Quellcode durchsuchen

Use unbounded_channel for server and client events

Bounded channels can cause deadlock if the process is temporarily
starved of CPU cycles. Use unbounded ones to prevent this issue.
Frans Bergman vor 2 Jahren
Ursprung
Commit
c0f650ba6f

+ 8 - 8
src/server/client/client_worker.rs

@@ -7,11 +7,11 @@ use mumble_protocol::voice::{Clientbound, Serverbound, VoicePacket};
 use std::marker::PhantomData;
 use std::sync::Arc;
 use tokio::sync::mpsc;
-use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::sync::mpsc::{UnboundedReceiver, Receiver, UnboundedSender, Sender};
 use tokio::task::JoinHandle;
 
 pub struct ClientWorker<C: ControlChannel, A: AudioChannel> {
-    event_sender: Sender<ServerEvent>,
+    event_sender: UnboundedSender<ServerEvent>,
     audio_channel_sender: Sender<A>,
     task: JoinHandle<()>,
     control_channel_type: PhantomData<C>,
@@ -39,10 +39,10 @@ impl<C: ControlChannel + 'static, A: AudioChannel + 'static> ClientWorker<C, A>
         storage: Arc<Storage>,
         control_channel: C,
         config: Config,
-    ) -> Result<(Self, Receiver<ClientEvent>), ConnectionSetupError> {
+    ) -> Result<(Self, UnboundedReceiver<ClientEvent>), ConnectionSetupError> {
         let control_channel = Arc::new(control_channel);
-        let (event_sender, event_receiver) = mpsc::channel(1);
-        let (server_event_sender, server_event_receiver) = mpsc::channel(1);
+        let (event_sender, event_receiver) = mpsc::unbounded_channel();
+        let (server_event_sender, server_event_receiver) = mpsc::unbounded_channel();
         let (audio_sender, audio_receiver) = mpsc::channel(1);
         let handler: Handler<C, A> = Handler::new(
             storage,
@@ -69,7 +69,7 @@ impl<C: ControlChannel + 'static, A: AudioChannel + 'static> ClientWorker<C, A>
     }
 
     pub async fn send_event(&self, event: ServerEvent) {
-        if self.event_sender.send(event).await.is_err() {
+        if self.event_sender.send(event).is_err() {
             todo!()
         }
     }
@@ -83,7 +83,7 @@ impl<C: ControlChannel + 'static, A: AudioChannel + 'static> ClientWorker<C, A>
     async fn run_handler_loop(
         mut handler: Handler<C, A>,
         control_channel: Arc<C>,
-        event_receiver: Receiver<ServerEvent>,
+        event_receiver: UnboundedReceiver<ServerEvent>,
         channel_receiver: Receiver<A>,
     ) -> JoinHandle<()> {
         tokio::spawn(async move {
@@ -109,7 +109,7 @@ impl<C: ControlChannel + 'static, A: AudioChannel + 'static> ClientWorker<C, A>
     async fn handler_loop(
         handler: &mut Handler<C, A>,
         control_channel: Arc<C>,
-        mut event_receiver: Receiver<ServerEvent>,
+        mut event_receiver: UnboundedReceiver<ServerEvent>,
         mut channel_receiver: Receiver<A>,
     ) -> Result<(), HandlerError> {
         let mut audio_channel: Option<Arc<A>> = None;

+ 5 - 7
src/server/client/handler.rs

@@ -15,7 +15,7 @@ use std::marker::PhantomData;
 use std::num::NonZeroU32;
 use std::sync::Arc;
 use tokio::sync::mpsc::error::SendError;
-use tokio::sync::mpsc::Sender;
+use tokio::sync::mpsc::UnboundedSender as Sender;
 
 static PBKDF2_ALGORITHM: pbkdf2::Algorithm = pbkdf2::PBKDF2_HMAC_SHA256;
 
@@ -249,7 +249,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
                     payload,
                     position_info,
                 };
-                self.event_sender.send(ClientEvent::Talking(packet)).await?;
+                self.event_sender.send(ClientEvent::Talking(packet))?;
             }
         }
 
@@ -258,7 +258,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
 
     pub async fn self_disconnected(&self) -> Result<(), HandlerError> {
         self.storage.remove_by_session_id(self.session_id);
-        self.event_sender.send(ClientEvent::Disconnected).await?;
+        self.event_sender.send(ClientEvent::Disconnected)?;
         Ok(())
     }
 
@@ -287,8 +287,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
             message.set_actor(self.session_id);
         }
         self.event_sender
-            .send(ClientEvent::TextMessage(message))
-            .await?;
+            .send(ClientEvent::TextMessage(message))?;
         Ok(())
     }
 
@@ -309,8 +308,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         self.storage
             .update_session_data(self.session_id, session_data);
         self.event_sender
-            .send(ClientEvent::StateChanged(state))
-            .await?;
+            .send(ClientEvent::StateChanged(state))?;
 
         Ok(())
     }

+ 2 - 2
src/server/connection_worker.rs

@@ -10,7 +10,7 @@ use mumble_protocol::crypt::CryptState;
 use mumble_protocol::voice::{Clientbound, VoicePacket};
 use std::sync::Arc;
 use tokio::net::TcpStream;
-use tokio::sync::mpsc::Receiver;
+use tokio::sync::mpsc::UnboundedReceiver;
 use tokio_rustls::server::TlsStream;
 
 pub struct ConnectionWorker {
@@ -84,7 +84,7 @@ impl ConnectionWorker {
         Ok(())
     }
 
-    async fn event_loop(&self, mut event_receiver: Receiver<ClientEvent>) {
+    async fn event_loop(&self, mut event_receiver: UnboundedReceiver<ClientEvent>) {
         loop {
             let message = event_receiver.recv().await.unwrap();
             match message {