Browse Source

Fix clippy warnings

Sergey Chushin 3 years ago
parent
commit
4ad767424c

+ 2 - 0
src/protocol/mod.rs

@@ -1,4 +1,6 @@
 pub mod connection;
+// TODO Remove dead code
+#[allow(dead_code)]
 pub mod parser;
 
 mod mumble;

+ 0 - 150
src/server/client/client.rs

@@ -1,150 +0,0 @@
-use crate::protocol::connection::{AudioChannel, ControlChannel};
-use crate::protocol::parser::{AudioData, AudioPacket, TextMessage, UserState};
-use crate::server::client::handler::{Config, Error, Handler};
-use crate::storage::Storage;
-use std::marker::PhantomData;
-use std::sync::Arc;
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::{Receiver, Sender};
-use tokio::task::JoinHandle;
-
-pub struct Client<C: ControlChannel, A: AudioChannel> {
-    event_sender: Sender<ServerEvent>,
-    audio_channel_sender: Sender<A>,
-    task: JoinHandle<()>,
-    control_channel_type: PhantomData<C>,
-    audio_channel_type: PhantomData<A>,
-}
-
-pub enum ClientEvent {
-    Talking(AudioData),
-    StateChanged(UserState),
-    TextMessage(TextMessage),
-    Disconnected,
-}
-
-pub enum ServerEvent {
-    Connected(u32),
-    Talking(AudioData),
-    StateChanged(UserState),
-    TextMessage(TextMessage),
-    Disconnected(u32),
-}
-
-impl<C: ControlChannel + 'static, A: AudioChannel + 'static> Client<C, A> {
-    pub async fn setup_connection(
-        session_id: u32,
-        storage: Arc<Storage>,
-        control_channel: C,
-        config: Config,
-    ) -> Result<(Self, Receiver<ClientEvent>), Error> {
-        let control_channel = Arc::new(control_channel);
-        let (event_sender, event_receiver) = mpsc::channel(1);
-        let (server_event_sender, server_event_receiver) = mpsc::channel(1);
-        let (audio_sender, audio_receiver) = mpsc::channel(1);
-        let handler: Handler<C, A> = Handler::new(
-            storage,
-            Arc::clone(&control_channel),
-            event_sender,
-            session_id,
-            config,
-        );
-        handler.handle_new_connection().await?;
-        let client = Client {
-            event_sender: server_event_sender,
-            audio_channel_sender: audio_sender,
-            task: Self::run_handler_loop(
-                handler,
-                control_channel,
-                server_event_receiver,
-                audio_receiver,
-            )
-            .await,
-            control_channel_type: Default::default(),
-            audio_channel_type: Default::default(),
-        };
-        Ok((client, event_receiver))
-    }
-
-    pub async fn send_event(&self, event: ServerEvent) {
-        self.event_sender.send(event).await;
-    }
-
-    pub async fn set_audio_channel(&mut self, channel: A) {
-        self.audio_channel_sender.send(channel).await;
-    }
-
-    async fn run_handler_loop(
-        mut handler: Handler<C, A>,
-        control_channel: Arc<C>,
-        mut event_receiver: Receiver<ServerEvent>,
-        mut channel_receiver: Receiver<A>,
-    ) -> JoinHandle<()> {
-        // TODO cleaner solution
-        tokio::spawn(async move {
-            let mut audio_channel: Option<Arc<A>> = None;
-            let msg_recv_fut = control_channel.receive();
-            let audio_recv_fut = Self::recv(audio_channel.clone());
-            tokio::pin!(msg_recv_fut, audio_recv_fut);
-
-            loop {
-                tokio::select! {
-                    result = &mut msg_recv_fut => {
-                        match result {
-                            Ok(msg) => {
-                                msg_recv_fut.set(control_channel.receive());
-                                handler.handle_message(msg).await;
-                            }
-                            Err(crate::protocol::connection::Error::Parsing(_)) => {
-                                // TODO
-                                // Ignore for now
-                                msg_recv_fut.set(control_channel.receive());
-                            }
-                            Err(_) => {
-                                handler.self_disconnected().await;
-                                break;
-                            }
-                        }
-                    }
-                    Some(event) = event_receiver.recv() => {
-                        handler.handle_server_event(event).await;
-                    }
-                    Some(channel) = channel_receiver.recv() => {
-                        let channel = Arc::new(channel);
-                        handler.set_audio_channel(Arc::clone(&channel));
-                        audio_channel = Some(channel);
-                        audio_recv_fut.set(Self::recv(audio_channel.clone()));
-                    }
-                    Some(result) = &mut audio_recv_fut => {
-                        match result {
-                            Ok(packet) => {
-                                audio_recv_fut.set(Self::recv(audio_channel.clone()));
-                                handler.handle_audio_packet(packet).await;
-                            }
-                            Err(_) => {
-                                handler.self_disconnected().await;
-                                break;
-                            }
-                        }
-                    }
-                };
-            }
-        })
-    }
-
-    async fn recv(
-        audio_channel: Option<Arc<A>>,
-    ) -> Option<Result<AudioPacket, crate::protocol::connection::Error>> {
-        if let Some(channel) = audio_channel {
-            Some(channel.receive().await)
-        } else {
-            std::future::pending().await
-        }
-    }
-}
-
-impl<C: ControlChannel, A: AudioChannel> Drop for Client<C, A> {
-    fn drop(&mut self) {
-        self.task.abort();
-    }
-}

+ 176 - 0
src/server/client/client_worker.rs

@@ -0,0 +1,176 @@
+use crate::protocol::connection::{AudioChannel, ControlChannel};
+use crate::protocol::parser::{AudioData, AudioPacket, TextMessage, UserState};
+use crate::server::client::handler::{Config, ConnectionSetupError, Handler, HandlerError};
+use crate::storage::Storage;
+use log::error;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinHandle;
+
+pub struct ClientWorker<C: ControlChannel, A: AudioChannel> {
+    event_sender: Sender<ServerEvent>,
+    audio_channel_sender: Sender<A>,
+    task: JoinHandle<()>,
+    control_channel_type: PhantomData<C>,
+    audio_channel_type: PhantomData<A>,
+}
+
+pub enum ClientEvent {
+    Talking(AudioData),
+    StateChanged(UserState),
+    TextMessage(TextMessage),
+    Disconnected,
+}
+
+pub enum ServerEvent {
+    Connected(u32),
+    Talking(AudioData),
+    StateChanged(UserState),
+    TextMessage(TextMessage),
+    Disconnected(u32),
+}
+
+impl<C: ControlChannel + 'static, A: AudioChannel + 'static> ClientWorker<C, A> {
+    pub async fn setup_connection(
+        session_id: u32,
+        storage: Arc<Storage>,
+        control_channel: C,
+        config: Config,
+    ) -> Result<(Self, Receiver<ClientEvent>), ConnectionSetupError> {
+        let control_channel = Arc::new(control_channel);
+        let (event_sender, event_receiver) = mpsc::channel(1);
+        let (server_event_sender, server_event_receiver) = mpsc::channel(1);
+        let (audio_sender, audio_receiver) = mpsc::channel(1);
+        let handler: Handler<C, A> = Handler::new(
+            storage,
+            Arc::clone(&control_channel),
+            event_sender,
+            session_id,
+            config,
+        );
+        handler.handle_new_connection().await?;
+        let client = ClientWorker {
+            event_sender: server_event_sender,
+            audio_channel_sender: audio_sender,
+            task: Self::run_handler_loop(
+                handler,
+                control_channel,
+                server_event_receiver,
+                audio_receiver,
+            )
+            .await,
+            control_channel_type: Default::default(),
+            audio_channel_type: Default::default(),
+        };
+        Ok((client, event_receiver))
+    }
+
+    pub async fn send_event(&self, event: ServerEvent) {
+        if self.event_sender.send(event).await.is_err() {
+            todo!()
+        }
+    }
+
+    pub async fn set_audio_channel(&mut self, channel: A) {
+        if self.audio_channel_sender.send(channel).await.is_err() {
+            todo!()
+        }
+    }
+
+    async fn run_handler_loop(
+        handler: Handler<C, A>,
+        control_channel: Arc<C>,
+        event_receiver: Receiver<ServerEvent>,
+        channel_receiver: Receiver<A>,
+    ) -> JoinHandle<()> {
+        tokio::spawn(async move {
+            match Self::handler_loop(handler, control_channel, event_receiver, channel_receiver)
+                .await
+            {
+                Err(HandlerError::PacketParsing(_) | HandlerError::IO(_)) => {
+                    todo!()
+                }
+                Err(HandlerError::EventReceiverClosed) => {
+                    error!("Server event receiver have been dropped");
+                }
+                Ok(_) => {}
+            }
+        })
+    }
+
+    // TODO cleaner solution
+    async fn handler_loop(
+        mut handler: Handler<C, A>,
+        control_channel: Arc<C>,
+        mut event_receiver: Receiver<ServerEvent>,
+        mut channel_receiver: Receiver<A>,
+    ) -> Result<(), HandlerError> {
+        let mut audio_channel: Option<Arc<A>> = None;
+        let msg_recv_fut = control_channel.receive();
+        let audio_recv_fut = Self::recv(audio_channel.clone());
+        tokio::pin!(msg_recv_fut, audio_recv_fut);
+
+        loop {
+            tokio::select! {
+                result = &mut msg_recv_fut => {
+                    match result {
+                        Ok(msg) => {
+                            msg_recv_fut.set(control_channel.receive());
+                            handler.handle_message(msg).await?;
+                        }
+                        Err(crate::protocol::connection::Error::Parsing(_)) => {
+                            // TODO
+                            // Ignore for now
+                            msg_recv_fut.set(control_channel.receive());
+                        }
+                        Err(_) => {
+                            handler.self_disconnected().await?;
+                            break;
+                        }
+                    }
+                }
+                Some(event) = event_receiver.recv() => {
+                    handler.handle_server_event(event).await?;
+                }
+                Some(channel) = channel_receiver.recv() => {
+                    let channel = Arc::new(channel);
+                    handler.set_audio_channel(Arc::clone(&channel));
+                    audio_channel = Some(channel);
+                    audio_recv_fut.set(Self::recv(audio_channel.clone()));
+                }
+                Some(result) = &mut audio_recv_fut => {
+                    match result {
+                        Ok(packet) => {
+                            audio_recv_fut.set(Self::recv(audio_channel.clone()));
+                            handler.handle_audio_packet(packet).await?;
+                        }
+                        Err(_) => {
+                            handler.self_disconnected().await?;
+                            break;
+                        }
+                    }
+                }
+            };
+        }
+
+        Ok(())
+    }
+
+    async fn recv(
+        audio_channel: Option<Arc<A>>,
+    ) -> Option<Result<AudioPacket, crate::protocol::connection::Error>> {
+        if let Some(channel) = audio_channel {
+            Some(channel.receive().await)
+        } else {
+            std::future::pending().await
+        }
+    }
+}
+
+impl<C: ControlChannel, A: AudioChannel> Drop for ClientWorker<C, A> {
+    fn drop(&mut self) {
+        self.task.abort();
+    }
+}

+ 69 - 45
src/server/client/handler.rs

@@ -1,15 +1,16 @@
-use crate::protocol::connection::{AudioChannel, ControlChannel};
+use crate::protocol::connection::{AudioChannel, ControlChannel, Error};
 use crate::protocol::parser::{
     AudioData, AudioPacket, Authenticate, ChannelState, CodecVersion, ControlMessage, CryptSetup,
-    Ping, ServerConfig, ServerSync, SessionId, TextMessage, UdpTunnel, UserRemove, UserState,
-    Version, MUMBLE_PROTOCOL_VERSION,
+    ParsingError, Ping, ServerConfig, ServerSync, SessionId, TextMessage, UdpTunnel, UserRemove,
+    UserState, Version, MUMBLE_PROTOCOL_VERSION,
 };
-use crate::server::client::client::{ClientEvent, ServerEvent};
+use crate::server::client::client_worker::{ClientEvent, ServerEvent};
 use crate::storage::{Guest, SessionData, Storage};
 use log::error;
 use ring::pbkdf2;
 use std::num::NonZeroU32;
 use std::sync::Arc;
+use tokio::sync::mpsc::error::SendError;
 use tokio::sync::mpsc::Sender;
 
 static PBKDF2_ALGORITHM: pbkdf2::Algorithm = pbkdf2::PBKDF2_HMAC_SHA256;
@@ -47,9 +48,15 @@ pub struct Config {
     pub pbkdf2_iterations: NonZeroU32,
 }
 
-pub enum Error {
+pub enum HandlerError {
     IO(std::io::Error),
-    PacketParsing(crate::protocol::parser::ParsingError),
+    PacketParsing(ParsingError),
+    EventReceiverClosed,
+}
+
+pub enum ConnectionSetupError {
+    IO(std::io::Error),
+    PacketParsing(ParsingError),
     Reject(Reject),
     WrongPacket,
 }
@@ -57,10 +64,10 @@ pub enum Error {
 pub enum Reject {
     InvalidUsername,
     UsernameInUse,
-    WrongVersion,
+    _WrongVersion,
     WrongUserPassword,
-    WrongServerPassword,
-    NoCertificate,
+    _WrongServerPassword,
+    _NoCertificate,
 }
 
 impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
@@ -88,17 +95,17 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         self.audio_channel = Some(channel);
     }
 
-    pub async fn handle_new_connection(&self) -> Result<(), Error> {
+    pub async fn handle_new_connection(&self) -> Result<(), ConnectionSetupError> {
         match self.control_channel.receive().await? {
             ControlMessage::Version(_) => {
                 // TODO check version
             }
-            _ => return Err(Error::WrongPacket),
+            _ => return Err(ConnectionSetupError::WrongPacket),
         };
         // TODO
         let auth = match self.control_channel.receive().await? {
             ControlMessage::Authenticate(auth) => auth,
-            _ => return Err(Error::WrongPacket),
+            _ => return Err(ConnectionSetupError::WrongPacket),
         };
         self.authenticate(auth).await?;
 
@@ -110,15 +117,14 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
             client_nonce: Some(Vec::from(self.config.client_nonce)),
             server_nonce: Some(Vec::from(self.config.server_nonce)),
         };
-        let channel_states: Vec<ChannelState> = self
+        let channel_states = self
             .storage
             .get_channels()
             .into_iter()
             .map(|channel| ChannelState {
                 id: Some(channel.id),
                 name: Some(channel.name),
-            })
-            .collect();
+            });
         let user_states: Vec<UserState> = self.get_user_states();
         let codec_version = CodecVersion {
             celt_alpha_version: self.config.alpha_codec_version,
@@ -138,7 +144,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
 
         self.control_channel.send(version).await?;
         self.control_channel.send(crypt_setup).await?;
-        for channel_state in channel_states.into_iter() {
+        for channel_state in channel_states {
             self.control_channel.send(channel_state).await?;
         }
         for user_state in user_states.into_iter() {
@@ -151,7 +157,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         Ok(())
     }
 
-    pub async fn handle_server_event(&self, event: ServerEvent) -> Result<(), Error> {
+    pub async fn handle_server_event(&self, event: ServerEvent) -> Result<(), HandlerError> {
         match event {
             ServerEvent::Connected(session_id) => self.new_user_connected(session_id).await?,
             ServerEvent::StateChanged(state) => self.user_state_changed(state).await?,
@@ -163,7 +169,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         Ok(())
     }
 
-    pub async fn handle_message(&self, packet: ControlMessage) -> Result<(), Error> {
+    pub async fn handle_message(&self, packet: ControlMessage) -> Result<(), HandlerError> {
         match packet {
             ControlMessage::Ping(ping) => self.control_ping(ping).await?,
             ControlMessage::TextMessage(message) => self.text_message(message).await?,
@@ -174,29 +180,32 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         Ok(())
     }
 
-    pub async fn handle_audio_packet(&self, packet: AudioPacket) -> Result<(), Error> {
+    pub async fn handle_audio_packet(&self, packet: AudioPacket) -> Result<(), HandlerError> {
         match packet {
             AudioPacket::Ping(_) => {
-                self.audio_channel.as_ref().unwrap().send(packet).await;
+                if let Some(channel) = self.audio_channel.as_ref() {
+                    channel.send(packet).await?;
+                }
             }
             AudioPacket::AudioData(mut audio_data) => {
                 audio_data.session_id = Some(SessionId::from(self.session_id));
                 self.event_sender
                     .send(ClientEvent::Talking(audio_data))
-                    .await;
+                    .await?;
             }
         }
 
         Ok(())
     }
 
-    pub async fn self_disconnected(&self) {
+    pub async fn self_disconnected(&self) -> Result<(), HandlerError> {
         self.storage.remove_by_session_id(self.session_id);
-        self.event_sender.send(ClientEvent::Disconnected).await;
+        self.event_sender.send(ClientEvent::Disconnected).await?;
+        Ok(())
     }
 
     // Control packets
-    async fn control_ping(&self, incoming: Ping) -> Result<(), Error> {
+    async fn control_ping(&self, incoming: Ping) -> Result<(), HandlerError> {
         let mut ping = Ping {
             timestamp: incoming.timestamp,
             good: None,
@@ -216,9 +225,9 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         Ok(())
     }
 
-    async fn text_message(&self, mut message: TextMessage) -> Result<(), Error> {
+    async fn text_message(&self, mut message: TextMessage) -> Result<(), HandlerError> {
         if self.config.max_message_length < message.message.len() as u32 {
-            // TODO send a permission denied message
+            // TODO send the permission denied message
             return Ok(());
         }
         if message.sender.is_none() {
@@ -226,11 +235,11 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         }
         self.event_sender
             .send(ClientEvent::TextMessage(message))
-            .await;
+            .await?;
         Ok(())
     }
 
-    async fn user_state(&self, mut state: UserState) -> Result<(), Error> {
+    async fn user_state(&self, mut state: UserState) -> Result<(), HandlerError> {
         if state.session_id.is_none() {
             state.session_id = Some(SessionId::from(self.session_id));
         }
@@ -248,12 +257,12 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
             .update_session_data(self.session_id, session_data);
         self.event_sender
             .send(ClientEvent::StateChanged(state))
-            .await;
+            .await?;
 
         Ok(())
     }
 
-    async fn tunnel(&self, tunnel: UdpTunnel) -> Result<(), Error> {
+    async fn tunnel(&self, tunnel: UdpTunnel) -> Result<(), HandlerError> {
         match tunnel.audio_packet {
             AudioPacket::Ping(_) => {
                 self.control_channel
@@ -264,7 +273,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
                 audio_data.session_id = Some(SessionId::from(self.session_id));
                 self.event_sender
                     .send(ClientEvent::Talking(audio_data))
-                    .await;
+                    .await?;
             }
         }
 
@@ -272,7 +281,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
     }
 
     // Server events
-    async fn new_user_connected(&self, session_id: u32) -> Result<(), Error> {
+    async fn new_user_connected(&self, session_id: u32) -> Result<(), HandlerError> {
         let id = Some(SessionId::from(session_id));
         if let Some(user) = self.storage.get_connected_user(session_id) {
             self.control_channel
@@ -297,12 +306,12 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         Ok(())
     }
 
-    async fn user_state_changed(&self, state: UserState) -> Result<(), Error> {
+    async fn user_state_changed(&self, state: UserState) -> Result<(), HandlerError> {
         self.control_channel.send(state).await?;
         Ok(())
     }
 
-    async fn user_talking(&self, audio_data: AudioData) -> Result<(), Error> {
+    async fn user_talking(&self, audio_data: AudioData) -> Result<(), HandlerError> {
         if let Some(data) = self.storage.get_session_data(self.session_id) {
             if data.self_deaf || data.deafened_by_admin {
                 return Ok(());
@@ -321,30 +330,30 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         Ok(())
     }
 
-    async fn user_disconnected(&self, session_id: u32) -> Result<(), Error> {
+    async fn user_disconnected(&self, session_id: u32) -> Result<(), HandlerError> {
         let user_remove = UserRemove {
             session_id: session_id.into(),
         };
         Ok(self.control_channel.send(user_remove).await?)
     }
 
-    async fn user_text_message(&self, message: TextMessage) -> Result<(), Error> {
+    async fn user_text_message(&self, message: TextMessage) -> Result<(), HandlerError> {
         self.control_channel.send(message).await?;
         Ok(())
     }
 
     // Utils
-    async fn authenticate(&self, auth: Authenticate) -> Result<(), Error> {
+    async fn authenticate(&self, auth: Authenticate) -> Result<(), ConnectionSetupError> {
         let username = match auth.username {
             Some(username) => username,
-            None => return Err(Error::Reject(Reject::InvalidUsername)),
+            None => return Err(ConnectionSetupError::Reject(Reject::InvalidUsername)),
         };
         if !validate_username(&username, self.config.max_username_length as usize) {
-            return Err(Error::Reject(Reject::InvalidUsername));
+            return Err(ConnectionSetupError::Reject(Reject::InvalidUsername));
         }
 
         if self.storage.username_in_connected(&username) {
-            return Err(Error::Reject(Reject::UsernameInUse));
+            return Err(ConnectionSetupError::Reject(Reject::UsernameInUse));
         }
 
         let user = match self.storage.get_user_by_username(username.clone()) {
@@ -363,7 +372,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
         ) {
             let password = match auth.password {
                 Some(password) => password,
-                None => return Err(Error::Reject(Reject::WrongUserPassword)),
+                None => return Err(ConnectionSetupError::Reject(Reject::WrongUserPassword)),
             };
             pbkdf2::verify(
                 PBKDF2_ALGORITHM,
@@ -372,7 +381,7 @@ impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
                 password.as_bytes(),
                 stored_password_hash,
             )
-            .map_err(|_| Error::Reject(Reject::WrongUserPassword))?;
+            .map_err(|_| ConnectionSetupError::Reject(Reject::WrongUserPassword))?;
         }
 
         self.storage.add_connected_user(user, self.session_id);
@@ -413,11 +422,26 @@ fn validate_username(username: &str, max_username_length: usize) -> bool {
         && username.len() <= max_username_length
 }
 
-impl From<crate::protocol::connection::Error> for Error {
+impl From<Error> for HandlerError {
     fn from(err: crate::protocol::connection::Error) -> Self {
         match err {
-            crate::protocol::connection::Error::IO(err) => Error::IO(err),
-            crate::protocol::connection::Error::Parsing(err) => Error::PacketParsing(err),
+            Error::IO(err) => HandlerError::IO(err),
+            Error::Parsing(err) => HandlerError::PacketParsing(err),
+        }
+    }
+}
+
+impl From<Error> for ConnectionSetupError {
+    fn from(err: Error) -> Self {
+        match err {
+            Error::IO(err) => ConnectionSetupError::IO(err),
+            Error::Parsing(err) => ConnectionSetupError::PacketParsing(err),
         }
     }
 }
+
+impl From<SendError<ClientEvent>> for HandlerError {
+    fn from(_: SendError<ClientEvent>) -> Self {
+        HandlerError::EventReceiverClosed
+    }
+}

+ 3 - 3
src/server/client/mod.rs

@@ -1,5 +1,5 @@
-mod client;
+mod client_worker;
 mod handler;
 
-pub use self::client::{Client, ClientEvent, ServerEvent};
-pub use self::handler::{Config, Error};
+pub use self::client_worker::{ClientWorker, ClientEvent, ServerEvent};
+pub use self::handler::{Config, ConnectionSetupError};

+ 5 - 5
src/server/connection_worker.rs

@@ -1,6 +1,6 @@
 use crate::crypto::Ocb2Aes128Crypto;
 use crate::protocol::parser::{AudioData, TextMessage, UserState};
-use crate::server::client::{Client, ClientEvent, Config, Error, ServerEvent};
+use crate::server::client::{ClientWorker, ClientEvent, Config, ConnectionSetupError, ServerEvent};
 use crate::server::session_pool::{SessionId, SessionPool};
 use crate::server::tcp_control_channel::TcpControlChannel;
 use crate::server::udp_worker::{UdpAudioChannel, UdpWorker};
@@ -16,14 +16,14 @@ pub struct ConnectionWorker {
     session_id: SessionId,
     session_pool: Arc<SessionPool>,
     storage: Arc<Storage>,
-    clients: Arc<DashMap<SessionId, Client<TcpControlChannel, UdpAudioChannel>>>,
+    clients: Arc<DashMap<SessionId, ClientWorker<TcpControlChannel, UdpAudioChannel>>>,
 }
 
 impl ConnectionWorker {
     pub fn new(
         session_pool: Arc<SessionPool>,
         storage: Arc<Storage>,
-        clients: Arc<DashMap<SessionId, Client<TcpControlChannel, UdpAudioChannel>>>,
+        clients: Arc<DashMap<SessionId, ClientWorker<TcpControlChannel, UdpAudioChannel>>>,
     ) -> Self {
         ConnectionWorker {
             session_id: session_pool.pop(),
@@ -51,12 +51,12 @@ impl ConnectionWorker {
         stream: TlsStream<TcpStream>,
         config: Config,
         worker: Arc<UdpWorker>,
-    ) -> Result<(), Error> {
+    ) -> Result<(), ConnectionSetupError> {
         let address = stream.get_ref().0.peer_addr().unwrap();
         let crypto =
             Ocb2Aes128Crypto::new(config.crypto_key, config.server_nonce, config.client_nonce);
         let control_channel = TcpControlChannel::new(stream);
-        let (client, event_receiver) = Client::setup_connection(
+        let (client, event_receiver) = ClientWorker::setup_connection(
             self.session_id,
             Arc::clone(&self.storage),
             control_channel,

+ 145 - 2
src/server/mod.rs

@@ -1,8 +1,151 @@
+use crate::protocol::parser::MUMBLE_PROTOCOL_VERSION;
+use crate::server::client::{ClientWorker, Config as ClientConfig};
+use crate::server::connection_worker::ConnectionWorker;
+use crate::server::session_pool::SessionPool;
+use crate::server::tcp_control_channel::TcpControlChannel;
+use crate::server::udp_worker::{ServerInfo, UdpAudioChannel, UdpWorker};
+use crate::storage::Storage;
+use dashmap::DashMap;
+use log::{error, info};
+use rand::prelude::StdRng;
+use rand::{Rng, SeedableRng};
+use std::net::{IpAddr, SocketAddr};
+use std::num::NonZeroU32;
+use std::sync::Arc;
+use tokio::net::{TcpListener, UdpSocket};
+use tokio_rustls::rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig};
+use tokio_rustls::TlsAcceptor;
+
 mod client;
 mod connection_worker;
-mod server;
 mod session_pool;
 mod tcp_control_channel;
 mod udp_worker;
 
-pub use self::server::{Config, Server};
+pub struct Config {
+    pub ip_address: IpAddr,
+    pub port: u16,
+    pub certificate: Certificate,
+    pub private_key: PrivateKey,
+    pub path_to_db_file: String,
+}
+
+pub struct Server {
+    config: Config,
+    rng: StdRng,
+    storage: Arc<Storage>,
+    clients: Arc<DashMap<SessionId, ClientWorker<TcpControlChannel, UdpAudioChannel>>>,
+    session_pool: Arc<SessionPool>,
+}
+type SessionId = u32;
+
+impl Server {
+    pub fn new(config: Config) -> Self {
+        let storage = Storage::open(&config.path_to_db_file);
+
+        Server {
+            config,
+            rng: StdRng::from_entropy(),
+            storage: Arc::new(storage),
+            clients: Default::default(),
+            session_pool: Arc::new(SessionPool::new()),
+        }
+    }
+
+    pub async fn run(mut self) {
+        let mut tls_config = ServerConfig::new(NoClientAuth::new());
+        let result = tls_config.set_single_cert(
+            vec![self.config.certificate.clone()],
+            self.config.private_key.clone(),
+        );
+        if let Err(err) = result {
+            error!("{}", err);
+            panic!();
+        }
+
+        let socket_address = SocketAddr::new(self.config.ip_address, self.config.port);
+        let tls_acceptor = TlsAcceptor::from(Arc::new(tls_config));
+        let tcp_listener = match TcpListener::bind(socket_address).await {
+            Ok(listener) => listener,
+            Err(_) => {
+                error!("Couldn't bind tcp socket to {}", socket_address);
+                panic!();
+            }
+        };
+        let udp_socket = match UdpSocket::bind(socket_address).await {
+            Ok(socket) => socket,
+            Err(_) => {
+                error!("Couldn't bind udp socket to {}", socket_address);
+                panic!();
+            }
+        };
+        let server_info = ServerInfo {
+            version: MUMBLE_PROTOCOL_VERSION.into(),
+            connected_users: self.storage.watch_connected_count(),
+            max_users: 10,
+            max_bandwidth: 128000,
+        };
+        let udp_worker = Arc::new(UdpWorker::start(udp_socket, server_info).await);
+        info!("Server listening on {}", socket_address);
+
+        loop {
+            let (tcp_stream, _) = match tcp_listener.accept().await {
+                Ok(stream) => stream,
+                Err(err) => {
+                    info!("Tcp error: {}", err);
+                    continue;
+                }
+            };
+            let tls_stream = match tls_acceptor.accept(tcp_stream).await {
+                Ok(stream) => stream,
+                Err(err) => {
+                    info!("Tls error: {}", err);
+                    continue;
+                }
+            };
+            let worker = ConnectionWorker::new(
+                Arc::clone(&self.session_pool),
+                Arc::clone(&self.storage),
+                Arc::clone(&self.clients),
+            );
+            worker
+                .start(
+                    tls_stream,
+                    self.create_client_config(),
+                    Arc::clone(&udp_worker),
+                )
+                .await;
+        }
+    }
+
+    fn create_client_config(&mut self) -> ClientConfig {
+        let crypto_key = self.generate_key();
+        let server_nonce = self.generate_key();
+        let client_nonce = self.generate_key();
+        ClientConfig {
+            crypto_key,
+            server_nonce,
+            client_nonce,
+            alpha_codec_version: -2147483637,
+            beta_codec_version: -2147483632,
+            prefer_alpha: true,
+            opus_support: true,
+            welcome_text: "Welcome".to_string(),
+            max_bandwidth: 128000,
+            max_users: 10,
+            allow_html: true,
+            max_message_length: 512,
+            max_image_message_length: 100000,
+            max_username_length: 64,
+            min_compatible_version: 0x10200,
+            server_password: None,
+            pbkdf2_iterations: NonZeroU32::new(100_000).unwrap(),
+        }
+    }
+
+    fn generate_key(&mut self) -> [u8; 16] {
+        let mut buffer = [0; 16];
+        self.rng.fill(&mut buffer);
+        buffer
+    }
+}

+ 0 - 145
src/server/server.rs

@@ -1,145 +0,0 @@
-use crate::protocol::parser::MUMBLE_PROTOCOL_VERSION;
-use crate::server::client::{Client, Config as ClientConfig};
-use crate::server::connection_worker::ConnectionWorker;
-use crate::server::session_pool::SessionPool;
-use crate::server::tcp_control_channel::TcpControlChannel;
-use crate::server::udp_worker::{ServerInfo, UdpAudioChannel, UdpWorker};
-use crate::storage::Storage;
-use dashmap::DashMap;
-use log::{error, info};
-use rand::prelude::StdRng;
-use rand::{Rng, SeedableRng};
-use std::net::{IpAddr, SocketAddr};
-use std::num::NonZeroU32;
-use std::sync::Arc;
-use tokio::net::{TcpListener, UdpSocket};
-use tokio_rustls::rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig};
-use tokio_rustls::TlsAcceptor;
-
-pub struct Config {
-    pub ip_address: IpAddr,
-    pub port: u16,
-    pub certificate: Certificate,
-    pub private_key: PrivateKey,
-    pub path_to_db_file: String,
-}
-
-pub struct Server {
-    config: Config,
-    rng: StdRng,
-    storage: Arc<Storage>,
-    clients: Arc<DashMap<SessionId, Client<TcpControlChannel, UdpAudioChannel>>>,
-    session_pool: Arc<SessionPool>,
-}
-type SessionId = u32;
-
-impl Server {
-    pub fn new(config: Config) -> Self {
-        let storage = Storage::open(&config.path_to_db_file);
-
-        Server {
-            config,
-            rng: StdRng::from_entropy(),
-            storage: Arc::new(storage),
-            clients: Default::default(),
-            session_pool: Arc::new(SessionPool::new()),
-        }
-    }
-
-    pub async fn run(mut self) {
-        let mut tls_config = ServerConfig::new(NoClientAuth::new());
-        let result = tls_config.set_single_cert(
-            vec![self.config.certificate.clone()],
-            self.config.private_key.clone(),
-        );
-        if let Err(err) = result {
-            error!("{}", err);
-            panic!();
-        }
-
-        let socket_address = SocketAddr::new(self.config.ip_address, self.config.port);
-        let tls_acceptor = TlsAcceptor::from(Arc::new(tls_config));
-        let tcp_listener = match TcpListener::bind(socket_address).await {
-            Ok(listener) => listener,
-            Err(_) => {
-                error!("Couldn't bind tcp socket to {}", socket_address);
-                panic!();
-            }
-        };
-        let udp_socket = match UdpSocket::bind(socket_address).await {
-            Ok(socket) => socket,
-            Err(_) => {
-                error!("Couldn't bind udp socket to {}", socket_address);
-                panic!();
-            }
-        };
-        let server_info = ServerInfo {
-            version: MUMBLE_PROTOCOL_VERSION.into(),
-            connected_users: self.storage.watch_connected_count(),
-            max_users: 10,
-            max_bandwidth: 128000,
-        };
-        let udp_worker = Arc::new(UdpWorker::start(udp_socket, server_info).await);
-        info!("Server listening on {}", socket_address);
-
-        loop {
-            let (tcp_stream, _) = match tcp_listener.accept().await {
-                Ok(stream) => stream,
-                Err(err) => {
-                    info!("Tcp error: {}", err);
-                    continue;
-                }
-            };
-            let tls_stream = match tls_acceptor.accept(tcp_stream).await {
-                Ok(stream) => stream,
-                Err(err) => {
-                    info!("Tls error: {}", err);
-                    continue;
-                }
-            };
-            let worker = ConnectionWorker::new(
-                Arc::clone(&self.session_pool),
-                Arc::clone(&self.storage),
-                Arc::clone(&self.clients),
-            );
-            worker
-                .start(
-                    tls_stream,
-                    self.create_client_config(),
-                    Arc::clone(&udp_worker),
-                )
-                .await;
-        }
-    }
-
-    fn create_client_config(&mut self) -> ClientConfig {
-        let crypto_key = self.generate_key();
-        let server_nonce = self.generate_key();
-        let client_nonce = self.generate_key();
-        ClientConfig {
-            crypto_key,
-            server_nonce,
-            client_nonce,
-            alpha_codec_version: -2147483637,
-            beta_codec_version: -2147483632,
-            prefer_alpha: true,
-            opus_support: true,
-            welcome_text: "Welcome".to_string(),
-            max_bandwidth: 128000,
-            max_users: 10,
-            allow_html: true,
-            max_message_length: 512,
-            max_image_message_length: 100000,
-            max_username_length: 64,
-            min_compatible_version: 0x10200,
-            server_password: None,
-            pbkdf2_iterations: NonZeroU32::new(100_000).unwrap(),
-        }
-    }
-
-    fn generate_key(&mut self) -> [u8; 16] {
-        let mut buffer = [0; 16];
-        self.rng.fill(&mut buffer);
-        buffer
-    }
-}

+ 12 - 8
src/server/udp_worker.rs

@@ -87,15 +87,19 @@ impl UdpWorker {
                         if len <= ENCRYPTION_OVERHEAD || len > MAX_DATAGRAM_SIZE {
                             continue;
                         } else if len == INFO_PING_SIZE {
-                            Self::response_to_ping(
+                            if let Err(err) = Self::response_to_ping(
                                 &buf[..12].try_into().unwrap(),
                                 &socket,
                                 address,
                                 &info,
                             )
-                            .await;
+                            .await
+                            {
+                                error!("UDP socket error: {}", err);
+                                break;
+                            };
                         } else if let Some(sender) = senders.get(&address).as_deref() {
-                            sender.send(Vec::from(&buf[..len])).await;
+                            sender.send(Vec::from(&buf[..len])).await.unwrap();
                         } else {
                             Self::check_queue(
                                 &queue,
@@ -106,8 +110,8 @@ impl UdpWorker {
                             );
                         }
                     }
-                    Err(error) => {
-                        error!("UDP task error: {}", error);
+                    Err(err) => {
+                        error!("UDP socket error: {}", err);
                         break;
                     }
                 }
@@ -142,7 +146,7 @@ impl UdpWorker {
                 destination: address,
                 senders,
             };
-            channel_sender.send(channel);
+            channel_sender.send(channel).ok();
 
             if list.is_empty() {
                 remove = true;
@@ -158,9 +162,9 @@ impl UdpWorker {
         socket: &UdpSocket,
         origin: SocketAddr,
         info: &ServerInfo,
-    ) {
+    ) -> std::io::Result<usize> {
         let bytes = Self::create_response(ping, info);
-        socket.send_to(&bytes, origin).await;
+        socket.send_to(&bytes, origin).await
     }
 
     fn create_response(ping: &[u8; INFO_PING_SIZE], info: &ServerInfo) -> [u8; RESPONSE_SIZE] {

+ 12 - 37
src/storage.rs

@@ -91,13 +91,12 @@ impl Storage {
             position: None,
         })
         .unwrap();
-        channels
-            .compare_and_swap(
-                to_bytes(ROOT_CHANNEL_ID),
-                Option::<&[u8]>::None,
-                Some(root_channel),
-            )
-            .unwrap();
+
+        if !channels.contains_key(to_bytes(ROOT_CHANNEL_ID)).unwrap() {
+            channels
+                .insert(to_bytes(ROOT_CHANNEL_ID), root_channel)
+                .unwrap();
+        }
 
         Storage {
             users,
@@ -195,23 +194,13 @@ impl Storage {
     }
 
     pub fn username_in_connected(&self, username: &str) -> bool {
-        if self
-            .guests
+        self.guests
             .iter()
-            .find(|entry| entry.value().username == username)
-            .is_some()
-        {
-            true
-        } else if self
-            .connected_users
-            .iter()
-            .find(|entry| entry.value().1 == username)
-            .is_some()
-        {
-            true
-        } else {
-            false
-        }
+            .any(|entry| entry.value().username == username)
+            || self
+                .connected_users
+                .iter()
+                .any(|entry| entry.value().1 == username)
     }
 
     pub fn watch_connected_count(&self) -> Arc<AtomicU32> {
@@ -240,20 +229,6 @@ impl Guest {
     }
 }
 
-impl SessionData {
-    fn new() -> Self {
-        SessionData {
-            muted_by_admin: false,
-            deafened_by_admin: false,
-            suppressed: false,
-            self_mute: false,
-            self_deaf: false,
-            priority_speaker: false,
-            recording: false,
-        }
-    }
-}
-
 fn to_bytes(number: u32) -> [u8; 4] {
     number.to_be_bytes()
 }