소스 검색

Bunch of stuff

- Begin implementing authentication.
- Update dependencies.
- Migrate to the pure rust protobuf generation.
  Protoc is no longer needed.
- Restructure project.
Sergey Chushin 3 년 전
부모
커밋
022a2fdb03

+ 1 - 1
.gitignore

@@ -3,4 +3,4 @@
 rumble.iml
 rumble.iml
 
 
 # generated files
 # generated files
-src/proto/mumble.rs
+src/protocol/mumble.rs

+ 58 - 72
Cargo.lock

@@ -4,9 +4,9 @@ version = 3
 
 
 [[package]]
 [[package]]
 name = "aes"
 name = "aes"
-version = "0.7.4"
+version = "0.7.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "495ee669413bfbe9e8cace80f4d3d78e6d8c8d99579f97fb93bde351b185f2d4"
+checksum = "9e8b47f52ea9bae42228d07ec09eb676433d7c4ed1ebdf0f1d1c29ed446f1ab8"
 dependencies = [
 dependencies = [
  "cfg-if",
  "cfg-if",
  "cipher",
  "cipher",
@@ -32,6 +32,17 @@ dependencies = [
  "winapi",
  "winapi",
 ]
 ]
 
 
+[[package]]
+name = "async-trait"
+version = "0.1.51"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "44318e776df68115a881de9a8fd1b9e53368d7a4a5ce4cc48517da3393233a5e"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 [[package]]
 name = "atty"
 name = "atty"
 version = "0.2.14"
 version = "0.2.14"
@@ -126,9 +137,9 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "cpufeatures"
 name = "cpufeatures"
-version = "0.1.5"
+version = "0.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "66c99696f6c9dd7f35d486b9d04d7e6e202aa3e8c40d553f2fdf5e7e0c6a71ef"
+checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469"
 dependencies = [
 dependencies = [
  "libc",
  "libc",
 ]
 ]
@@ -167,16 +178,20 @@ dependencies = [
 ]
 ]
 
 
 [[package]]
 [[package]]
-name = "either"
-version = "1.6.1"
+name = "dashmap"
+version = "4.0.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
+checksum = "e77a43b28d0668df09411cb0bc9a8c2adc40f9a048afe863e05fd43251e8e39c"
+dependencies = [
+ "cfg-if",
+ "num_cpus",
+]
 
 
 [[package]]
 [[package]]
 name = "env_logger"
 name = "env_logger"
-version = "0.8.4"
+version = "0.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a19187fea3ac7e84da7dacf48de0c45d63c6a76f9490dae389aead16c243fce3"
+checksum = "0b2cf0344971ee6c64c31be0d530793fba457d322dfec2810c453d0ef228f9c3"
 dependencies = [
 dependencies = [
  "atty",
  "atty",
  "humantime",
  "humantime",
@@ -266,9 +281,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
 
 
 [[package]]
 [[package]]
 name = "libc"
 name = "libc"
-version = "0.2.95"
+version = "0.2.101"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "789da6d93f1b866ffe175afc5322a4d76c038605a1c3319bb57b06967ca98a36"
+checksum = "3cb00336871be5ed2c8ed44b60ae9959dc5b9f08539422ed43f09e34ecaeba21"
 
 
 [[package]]
 [[package]]
 name = "lock_api"
 name = "lock_api"
@@ -404,39 +419,27 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "protobuf"
 name = "protobuf"
-version = "2.23.0"
+version = "2.25.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "45604fc7a88158e7d514d8e22e14ac746081e7a70d7690074dd0029ee37458d6"
+checksum = "23129d50f2c9355ced935fce8a08bd706ee2e7ce2b3b33bf61dace0e379ac63a"
 
 
 [[package]]
 [[package]]
 name = "protobuf-codegen"
 name = "protobuf-codegen"
-version = "2.23.0"
+version = "2.25.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb87f342b585958c1c086313dbc468dcac3edf5e90362111c26d7a58127ac095"
+checksum = "4ba98ce0dadaa6de1e7f1b6d82a0a73b03e0c049169a167c919d906b0875026c"
 dependencies = [
 dependencies = [
  "protobuf",
  "protobuf",
 ]
 ]
 
 
 [[package]]
 [[package]]
-name = "protoc"
-version = "2.23.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4677a99cc1f866078918c00773cbb46dd72eecad949a31981de5aad1ff9bcc8d"
-dependencies = [
- "log",
- "which",
-]
-
-[[package]]
-name = "protoc-rust"
-version = "2.23.0"
+name = "protobuf-codegen-pure"
+version = "2.25.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b35a8c288bd8b80ab9eb660b75de7b99be75ee1a0a3920f15b4924d38da43f6c"
+checksum = "a2bab16316ed0c5794a06c399af55a3ca7b93496cb35cb4c15bcc8f5d824f2b7"
 dependencies = [
 dependencies = [
  "protobuf",
  "protobuf",
  "protobuf-codegen",
  "protobuf-codegen",
- "protoc",
- "tempfile",
 ]
 ]
 
 
 [[package]]
 [[package]]
@@ -450,9 +453,9 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "rand"
 name = "rand"
-version = "0.8.3"
+version = "0.8.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0ef9e7e66b4468674bfcb0c81af8b7fa0bb154fa9f28eb840da5c447baeb8d7e"
+checksum = "2e7573632e6454cf6b99d7aac4ccca54be06da05aca2ef7423d22d27d4d4bcd8"
 dependencies = [
 dependencies = [
  "libc",
  "libc",
  "rand_chacha",
  "rand_chacha",
@@ -514,15 +517,6 @@ version = "0.6.25"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
 checksum = "f497285884f3fcff424ffc933e56d7cbca511def0c9831a7f9b5f6153e3cc89b"
 
 
-[[package]]
-name = "remove_dir_all"
-version = "0.5.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
-dependencies = [
- "winapi",
-]
-
 [[package]]
 [[package]]
 name = "ring"
 name = "ring"
 version = "0.16.20"
 version = "0.16.20"
@@ -543,13 +537,16 @@ name = "rumble"
 version = "0.1.0"
 version = "0.1.0"
 dependencies = [
 dependencies = [
  "aes",
  "aes",
+ "async-trait",
  "bincode",
  "bincode",
  "clap",
  "clap",
+ "dashmap",
  "env_logger",
  "env_logger",
  "log",
  "log",
  "protobuf",
  "protobuf",
- "protoc-rust",
+ "protobuf-codegen-pure",
  "rand",
  "rand",
+ "ring",
  "serde",
  "serde",
  "sled",
  "sled",
  "tokio",
  "tokio",
@@ -587,18 +584,18 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "serde"
 name = "serde"
-version = "1.0.126"
+version = "1.0.130"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
+checksum = "f12d06de37cf59146fbdecab66aa99f9fe4f78722e3607577a5375d66bd0c913"
 dependencies = [
 dependencies = [
  "serde_derive",
  "serde_derive",
 ]
 ]
 
 
 [[package]]
 [[package]]
 name = "serde_derive"
 name = "serde_derive"
-version = "1.0.126"
+version = "1.0.130"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "963a7dbc9895aeac7ac90e74f34a5d5261828f79df35cbed41e10189d3804d43"
+checksum = "d7bc1a1ab1961464eae040d96713baa5a724a8152c1222492465b54322ec508b"
 dependencies = [
 dependencies = [
  "proc-macro2",
  "proc-macro2",
  "quote",
  "quote",
@@ -650,20 +647,6 @@ dependencies = [
  "unicode-xid",
  "unicode-xid",
 ]
 ]
 
 
-[[package]]
-name = "tempfile"
-version = "3.2.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dac1c663cfc93810f88aed9b8941d48cabf856a1b111c29a40439018d870eb22"
-dependencies = [
- "cfg-if",
- "libc",
- "rand",
- "redox_syscall",
- "remove_dir_all",
- "winapi",
-]
-
 [[package]]
 [[package]]
 name = "termcolor"
 name = "termcolor"
 version = "1.1.2"
 version = "1.1.2"
@@ -684,9 +667,9 @@ dependencies = [
 
 
 [[package]]
 [[package]]
 name = "tokio"
 name = "tokio"
-version = "1.6.1"
+version = "1.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0a38d31d7831c6ed7aad00aa4c12d9375fd225a6dd77da1d25b707346319a975"
+checksum = "92036be488bb6594459f2e03b60e42df6f937fe6ca5c5ffdcb539c6b84dc40f5"
 dependencies = [
 dependencies = [
  "autocfg",
  "autocfg",
  "bytes",
  "bytes",
@@ -695,6 +678,19 @@ dependencies = [
  "mio",
  "mio",
  "num_cpus",
  "num_cpus",
  "pin-project-lite",
  "pin-project-lite",
+ "tokio-macros",
+ "winapi",
+]
+
+[[package]]
+name = "tokio-macros"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "54473be61f4ebe4efd09cec9bd5d16fa51d70ea0192213d754d2d500457db110"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
 ]
 ]
 
 
 [[package]]
 [[package]]
@@ -824,16 +820,6 @@ dependencies = [
  "untrusted",
  "untrusted",
 ]
 ]
 
 
-[[package]]
-name = "which"
-version = "4.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b55551e42cbdf2ce2bedd2203d0cc08dba002c27510f86dab6d0ce304cba3dfe"
-dependencies = [
- "either",
- "libc",
-]
-
 [[package]]
 [[package]]
 name = "winapi"
 name = "winapi"
 version = "0.3.9"
 version = "0.3.9"

+ 13 - 7
Cargo.toml

@@ -9,19 +9,25 @@ edition = "2018"
 
 
 [dependencies]
 [dependencies]
 clap = "2.33.3"
 clap = "2.33.3"
-tokio = { version = "1.6.1", features = ["rt-multi-thread", "net", "io-util", "sync"] }
+tokio = { version = "1.10.1", features = ["rt-multi-thread", "net", "io-util", "sync", "macros"] }
 tokio-rustls = "0.22.0"
 tokio-rustls = "0.22.0"
-protobuf = "2.23.0"
+protobuf = "2.25.1"
+dashmap = "4.0.2"
+async-trait = "0.1.51"
+
 sled = "0.34.6"
 sled = "0.34.6"
-serde = { version = "1.0.126", features = ["derive"] }
+serde = { version = "1.0.130", features = ["derive"] }
 bincode = "1.3.3"
 bincode = "1.3.3"
-aes = "0.7.3"
-rand = "0.8.3"
+
+ring = "0.16.20"
+aes = "0.7.5"
+rand = "0.8.4"
+
 log = "0.4.14"
 log = "0.4.14"
-env_logger = "0.8.4"
+env_logger = "0.9.0"
 
 
 [build-dependencies]
 [build-dependencies]
-protoc-rust = "2.23.0"
+protobuf-codegen-pure = "2.25.1"
 
 
 [profile.release]
 [profile.release]
 lto = true
 lto = true

+ 13 - 5
build.rs

@@ -1,8 +1,16 @@
+use protobuf_codegen_pure::Customize;
+
 fn main() {
 fn main() {
-    protoc_rust::Codegen::new()
-        .out_dir("src/proto")
-        .input("src/proto/mumble.proto")
-        .include("src/proto")
+    let customize = Customize {
+        expose_fields: Some(true),
+        generate_accessors: Some(false),
+        ..Default::default()
+    };
+    protobuf_codegen_pure::Codegen::new()
+        .out_dir("src/protocol")
+        .inputs(&["src/protocol/mumble.proto"])
+        .include("src/protocol")
+        .customize(customize)
         .run()
         .run()
-        .expect("protoc");
+        .expect("Codegen failed.");
 }
 }

+ 0 - 448
src/client.rs

@@ -1,448 +0,0 @@
-use std::sync::Arc;
-
-use log::error;
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::{Receiver, Sender};
-use tokio::task::JoinHandle;
-
-use crate::client::Error::StreamError;
-use crate::connection::{AudioChannel, ControlChannel};
-use crate::db::{Db, User};
-use crate::proto::mumble::{
-    ChannelState, CodecVersion, CryptSetup, Ping, ServerConfig, ServerSync, UserRemove, UserState,
-    Version,
-};
-use crate::protocol::{AudioData, AudioPacket, MumblePacket, MUMBLE_PROTOCOL_VERSION};
-use std::sync::atomic::Ordering;
-
-pub struct Client {
-    pub session_id: u32,
-    inner_event_sender: Sender<InnerEvent>,
-    handler_task: JoinHandle<()>,
-    control_channel_task: JoinHandle<()>,
-    audio_channel_task: Option<JoinHandle<()>>,
-}
-
-pub struct Config {
-    pub crypto_key: [u8; 16],
-    pub server_nonce: [u8; 16],
-    pub client_nonce: [u8; 16],
-    pub alpha_codec_version: i32,
-    pub beta_codec_version: i32,
-    pub prefer_alpha: bool,
-    pub opus_support: bool,
-    pub welcome_text: String,
-    pub max_bandwidth: u32,
-    pub max_users: u32,
-    pub allow_html: bool,
-    pub max_message_length: u32,
-    pub max_image_message_length: u32,
-}
-
-pub enum ClientEvent {
-    Talking(AudioData),
-    Disconnected,
-}
-
-pub enum ServerEvent {
-    UserConnected(u32),
-    UserDisconnected(u32),
-    UserTalking(AudioData),
-}
-
-pub enum Error {
-    AuthenticationError,
-    StreamError,
-    WrongPacket,
-}
-
-struct Handler {
-    db: Arc<Db>,
-    control_channel: Arc<ControlChannel>,
-    audio_channel: Option<Arc<AudioChannel>>,
-    client_event_sender: Sender<ClientEvent>,
-    session_id: u32,
-    crypto_resyncs: u32,
-}
-
-enum InnerEvent {
-    ServerEvent(ServerEvent),
-    ControlPacket(MumblePacket),
-    AudioPacket(AudioPacket),
-    AudioChannel(Arc<AudioChannel>),
-    SelfDisconnected,
-}
-
-impl Client {
-    pub async fn establish_connection(
-        db: Arc<Db>,
-        control_channel: ControlChannel,
-        config: Config,
-    ) -> Result<(Self, Receiver<ClientEvent>), Error> {
-        match control_channel.receive().await? {
-            MumblePacket::Version(version) => version,
-            _ => return Err(Error::WrongPacket),
-        };
-        let mut auth = match control_channel.receive().await? {
-            MumblePacket::Authenticate(auth) => auth,
-            _ => return Err(Error::WrongPacket),
-        };
-        if !auth.has_username() {
-            return Err(Error::AuthenticationError);
-        }
-        let session_id = db.add_new_user(auth.take_username()).await;
-
-        let version = {
-            let mut version = Version::new();
-            version.set_version(MUMBLE_PROTOCOL_VERSION);
-            MumblePacket::Version(version)
-        };
-        let crypt_setup = {
-            let key = config.crypto_key;
-            let server_nonce = config.server_nonce;
-            let client_nonce = config.client_nonce;
-            let mut crypt_setup = CryptSetup::new();
-            crypt_setup.set_key(Vec::from(key));
-            crypt_setup.set_server_nonce(Vec::from(server_nonce));
-            crypt_setup.set_client_nonce(Vec::from(client_nonce));
-            MumblePacket::CryptSetup(crypt_setup)
-        };
-        let codec_version = {
-            let mut codec_version = CodecVersion::new();
-            codec_version.set_alpha(config.alpha_codec_version);
-            codec_version.set_beta(config.beta_codec_version);
-            codec_version.set_prefer_alpha(config.prefer_alpha);
-            codec_version.set_opus(config.opus_support);
-            MumblePacket::CodecVersion(codec_version)
-        };
-        let channel_states: Vec<MumblePacket> = {
-            db.get_channels()
-                .await
-                .into_iter()
-                .map(|channel| {
-                    let mut channel_state = ChannelState::new();
-                    channel_state.set_channel_id(channel.id);
-                    channel_state.set_name(channel.name);
-                    MumblePacket::ChannelState(channel_state)
-                })
-                .collect()
-        };
-        let user_states: Vec<MumblePacket> = {
-            db.get_connected_users()
-                .await
-                .into_iter()
-                .map(|user| {
-                    let mut user_state = UserState::new();
-                    user_state.set_name(user.username);
-                    user_state.set_session(user.session_id);
-                    user_state.set_channel_id(user.channel_id);
-                    MumblePacket::UserState(user_state)
-                })
-                .collect()
-        };
-        let server_sync = {
-            let mut server_sync = ServerSync::new();
-            server_sync.set_session(session_id);
-            server_sync.set_welcome_text(config.welcome_text);
-            server_sync.set_max_bandwidth(config.max_bandwidth);
-            MumblePacket::ServerSync(server_sync)
-        };
-        let server_config = {
-            let mut server_config = ServerConfig::new();
-            server_config.set_max_users(config.max_users);
-            server_config.set_allow_html(config.allow_html);
-            server_config.set_message_length(config.max_message_length);
-            server_config.set_image_message_length(config.max_image_message_length);
-            MumblePacket::ServerConfig(server_config)
-        };
-
-        control_channel.send(version).await?;
-        control_channel.send(crypt_setup).await?;
-        control_channel.send(codec_version).await?;
-        control_channel.send_multiple(channel_states).await?;
-        control_channel.send_multiple(user_states).await?;
-        control_channel.send(server_sync).await?;
-        control_channel.send(server_config).await?;
-
-        let (client, response_receiver) = Client::new(control_channel, db, session_id).await;
-        Ok((client, response_receiver))
-    }
-
-    pub async fn set_audio_channel(&mut self, audio_channel: AudioChannel) {
-        let audio_channel = Arc::new(audio_channel);
-        let channel = Arc::clone(&audio_channel);
-        let sender = self.inner_event_sender.clone();
-        self.audio_channel_task = Some(tokio::spawn(async move {
-            loop {
-                match channel.receive().await {
-                    Ok(packet) => {
-                        sender.send(InnerEvent::AudioPacket(packet)).await;
-                    }
-                    Err(_) => break,
-                }
-            }
-        }));
-
-        self.inner_event_sender
-            .send(InnerEvent::AudioChannel(audio_channel))
-            .await;
-    }
-
-    pub async fn send_event(&self, event: ServerEvent) {
-        self.inner_event_sender
-            .send(InnerEvent::ServerEvent(event))
-            .await;
-    }
-
-    async fn new(
-        control_channel: ControlChannel,
-        db: Arc<Db>,
-        session_id: u32,
-    ) -> (Self, Receiver<ClientEvent>) {
-        let (inner_event_sender, inner_event_receiver) = mpsc::channel(1);
-        let (client_event_sender, response_receiver) = mpsc::channel(1);
-        let control_channel = Arc::new(control_channel);
-        let handler = Handler {
-            db,
-            control_channel: Arc::clone(&control_channel),
-            audio_channel: None,
-            client_event_sender,
-            session_id,
-            crypto_resyncs: 0,
-        };
-        let client = Client {
-            session_id,
-            inner_event_sender: inner_event_sender.clone(),
-            handler_task: Self::run_handler_task(handler, inner_event_receiver).await,
-            control_channel_task: Self::run_control_channel_task(
-                control_channel,
-                inner_event_sender,
-            )
-            .await,
-            audio_channel_task: None,
-        };
-
-        return (client, response_receiver);
-    }
-
-    async fn run_handler_task(
-        mut handler: Handler,
-        mut inner_event_receiver: Receiver<InnerEvent>,
-    ) -> JoinHandle<()> {
-        tokio::spawn(async move {
-            loop {
-                let message = match inner_event_receiver.recv().await {
-                    Some(msg) => msg,
-                    None => {
-                        error!("Handler task closed unexpectedly");
-                        break;
-                    }
-                };
-
-                let result = match message {
-                    InnerEvent::ServerEvent(event) => handler.handle_server_event(event).await,
-                    InnerEvent::ControlPacket(packet) => {
-                        handler.handle_control_packet(packet).await
-                    }
-                    InnerEvent::AudioPacket(audio) => handler.handle_audio_packet(audio).await,
-                    InnerEvent::AudioChannel(channel) => {
-                        handler.audio_channel = Some(channel);
-                        Ok(())
-                    }
-                    InnerEvent::SelfDisconnected => {
-                        handler.self_disconnected().await;
-                        break;
-                    }
-                };
-
-                if let Err(_) = result {
-                    // TODO
-                    error!("Handler task error");
-                }
-            }
-        })
-    }
-
-    async fn run_control_channel_task(
-        control_channel: Arc<ControlChannel>,
-        sender: Sender<InnerEvent>,
-    ) -> JoinHandle<()> {
-        tokio::spawn(async move {
-            loop {
-                match control_channel.receive().await {
-                    Ok(packet) => sender.send(InnerEvent::ControlPacket(packet)).await,
-                    Err(_) => {
-                        // TODO
-                        sender.send(InnerEvent::SelfDisconnected).await;
-                        return;
-                    }
-                };
-            }
-        })
-    }
-}
-
-impl Handler {
-    async fn handle_server_event(&self, message: ServerEvent) -> Result<(), Error> {
-        match message {
-            ServerEvent::UserConnected(session_id) => self.new_user_connected(session_id).await?,
-            ServerEvent::UserDisconnected(session_id) => self.user_disconnected(session_id).await?,
-            ServerEvent::UserTalking(audio_data) => self.user_talking(audio_data).await?,
-        }
-
-        Ok(())
-    }
-
-    async fn handle_control_packet(&self, packet: MumblePacket) -> Result<(), Error> {
-        match packet {
-            MumblePacket::Ping(ping) => self.handle_control_channel_ping(ping).await?,
-            MumblePacket::UdpTunnel(packet) => self.handle_tunnel(packet).await?,
-            MumblePacket::ChannelRemove(_) => error!("ChannelRemove unimplemented!"),
-            MumblePacket::ChannelState(_) => error!("ChannelState unimplemented!"),
-            MumblePacket::UserRemove(_) => error!("UserRemove unimplemented!"),
-            MumblePacket::UserState(_) => error!("UserState unimplemented!"),
-            MumblePacket::BanList(_) => error!("BanList unimplemented!"),
-            MumblePacket::TextMessage(_) => error!("TextMessage unimplemented!"),
-            MumblePacket::QueryUsers(_) => error!("TextMessage unimplemented!"),
-            MumblePacket::CryptSetup(_) => error!("CryptSetup unimplemented!"),
-            MumblePacket::ContextAction(_) => error!("ContextAction unimplemented!"),
-            MumblePacket::UserList(_) => error!("UserList unimplemented!"),
-            MumblePacket::VoiceTarget(_) => error!("VoiceTarget unimplemented!"),
-            MumblePacket::PermissionQuery(_) => error!("PermissionQuery unimplemented!"),
-            MumblePacket::UserStats(_) => error!("UserStats unimplemented!"),
-            MumblePacket::RequestBlob(_) => error!("RequestBlob unimplemented!"),
-            MumblePacket::Acl(_) => error!("Acl unimplemented!"),
-            // The rest is only sent by the server
-            _ => return Err(Error::WrongPacket),
-        }
-        Ok(())
-    }
-
-    async fn handle_control_channel_ping(&self, ping: Ping) -> Result<(), Error> {
-        let timestamp = ping.get_timestamp();
-        let mut ping = Ping::new();
-        if ping.has_timestamp() {
-            ping.set_timestamp(timestamp);
-        }
-        if let Some(channel) = self.audio_channel.as_ref() {
-            ping.set_good(channel.good.load(Ordering::Acquire));
-            ping.set_late(channel.late.load(Ordering::Acquire));
-            ping.set_lost(channel.lost.load(Ordering::Acquire));
-            ping.set_resync(self.crypto_resyncs);
-        }
-
-        self.control_channel.send(MumblePacket::Ping(ping)).await?;
-        Ok(())
-    }
-
-    async fn handle_tunnel(&self, packet: AudioPacket) -> Result<(), Error> {
-        match packet {
-            AudioPacket::Ping(_) => {
-                self.control_channel
-                    .send(MumblePacket::UdpTunnel(packet))
-                    .await?;
-            }
-            AudioPacket::AudioData(mut audio_data) => {
-                audio_data.session_id = Some(self.session_id);
-                self.client_event_sender
-                    .send(ClientEvent::Talking(audio_data))
-                    .await;
-            }
-        }
-
-        Ok(())
-    }
-
-    async fn handle_audio_packet(&self, packet: AudioPacket) -> Result<(), Error> {
-        match packet {
-            AudioPacket::Ping(_) => {
-                self.audio_channel.as_ref().unwrap().send(packet).await;
-            }
-            AudioPacket::AudioData(mut audio_data) => {
-                audio_data.session_id = Some(self.session_id);
-                self.client_event_sender
-                    .send(ClientEvent::Talking(audio_data))
-                    .await;
-            }
-        }
-
-        Ok(())
-    }
-
-    async fn new_user_connected(&self, session_id: u32) -> Result<(), Error> {
-        if let Some(user) = self.db.get_user_by_session_id(session_id).await {
-            self.control_channel.send(MumblePacket::from(user)).await?;
-        }
-        Ok(())
-    }
-
-    async fn user_disconnected(&self, session_id: u32) -> Result<(), Error> {
-        let mut user_remove = UserRemove::new();
-        user_remove.set_session(session_id);
-        Ok(self
-            .control_channel
-            .send(MumblePacket::UserRemove(user_remove))
-            .await?)
-    }
-
-    async fn self_disconnected(&self) {
-        self.db.remove_connected_user(self.session_id).await;
-        self.client_event_sender
-            .send(ClientEvent::Disconnected)
-            .await;
-    }
-
-    async fn user_talking(&self, audio_data: AudioData) -> Result<(), Error> {
-        let audio_packet = AudioPacket::AudioData(audio_data);
-        if let Some(channel) = self.audio_channel.as_ref() {
-            channel.send(audio_packet).await?;
-        } else {
-            self.control_channel
-                .send(MumblePacket::UdpTunnel(audio_packet))
-                .await;
-        }
-
-        Ok(())
-    }
-}
-
-impl Drop for Client {
-    fn drop(&mut self) {
-        self.handler_task.abort();
-        self.control_channel_task.abort();
-        if let Some(audio_task) = self.audio_channel_task.as_ref() {
-            audio_task.abort();
-        }
-    }
-}
-
-impl From<User> for UserState {
-    fn from(user: User) -> Self {
-        let mut user_state = UserState::new();
-        if let Some(id) = user.id {
-            user_state.set_user_id(id)
-        }
-        user_state.set_name(user.username);
-        user_state.set_channel_id(user.channel_id);
-        user_state.set_session(user.session_id);
-        user_state
-    }
-}
-
-impl From<User> for MumblePacket {
-    fn from(user: User) -> Self {
-        MumblePacket::UserState(UserState::from(user))
-    }
-}
-
-impl From<crate::protocol::Error> for Error {
-    fn from(_: crate::protocol::Error) -> Self {
-        StreamError
-    }
-}
-
-impl From<crate::connection::Error> for Error {
-    fn from(_: crate::connection::Error) -> Self {
-        StreamError
-    }
-}

+ 0 - 149
src/connection.rs

@@ -1,149 +0,0 @@
-use std::sync::Arc;
-
-use crate::protocol::{AudioPacket, MumblePacket};
-
-use crate::crypto::Ocb2Aes128Crypto;
-
-use std::net::SocketAddr;
-use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
-use tokio::net::{TcpStream, UdpSocket};
-use tokio::sync::Mutex;
-
-use std::sync::atomic::{AtomicU32, Ordering};
-use tokio::sync::mpsc::Receiver;
-use tokio_rustls::TlsStream;
-
-pub struct ControlChannel {
-    pub packets_received: AtomicU32,
-    writer: Mutex<WriteHalf<TlsStream<TcpStream>>>,
-    reader: Mutex<ReadHalf<TlsStream<TcpStream>>>,
-}
-
-pub struct AudioChannel {
-    pub good: AtomicU32,
-    pub late: AtomicU32,
-    pub lost: AtomicU32,
-    pub packets_received: AtomicU32,
-    raw_bytes_receiver: Mutex<Receiver<Vec<u8>>>,
-    crypto: Mutex<Ocb2Aes128Crypto>,
-    socket: Arc<UdpSocket>,
-    destination: SocketAddr,
-}
-
-pub enum Error {
-    IOError(std::io::Error),
-    ParsingError(crate::protocol::Error),
-    CryptError(crate::crypto::Error),
-}
-
-impl ControlChannel {
-    pub fn new(stream: TlsStream<TcpStream>) -> Self {
-        let (reader, writer) = tokio::io::split(stream);
-
-        ControlChannel {
-            writer: Mutex::new(writer),
-            reader: Mutex::new(reader),
-            packets_received: AtomicU32::new(0),
-        }
-    }
-
-    pub async fn receive(&self) -> Result<MumblePacket, Error> {
-        let mut packet_type = [0; 2];
-        let mut length = [0; 4];
-        let mut reader = self.reader.lock().await;
-        reader.read_exact(&mut packet_type).await?;
-        reader.read_exact(&mut length).await?;
-        let (packet_type, length) = MumblePacket::parse_prefix(packet_type, length);
-
-        let mut payload = vec![0; length as usize];
-        reader.read_exact(&mut payload).await?;
-        let packet = MumblePacket::parse_payload(packet_type, &payload)?;
-
-        self.packets_received.fetch_add(1, Ordering::Relaxed);
-        Ok(packet)
-    }
-
-    pub async fn send(&self, packet: MumblePacket) -> Result<(), Error> {
-        let bytes = packet.serialize();
-        let mut writer = self.writer.lock().await;
-        writer.write_all(&bytes).await?;
-        writer.flush().await?;
-        Ok(())
-    }
-
-    pub async fn send_multiple(&self, packets: Vec<MumblePacket>) -> Result<(), Error> {
-        for packet in packets {
-            self.send(packet).await?;
-        }
-
-        Ok(())
-    }
-}
-
-impl AudioChannel {
-    pub fn new(
-        raw_bytes_receiver: Receiver<Vec<u8>>,
-        socket: Arc<UdpSocket>,
-        crypto: Ocb2Aes128Crypto,
-        destination: SocketAddr,
-    ) -> Self {
-        let crypto = Mutex::new(crypto);
-
-        AudioChannel {
-            raw_bytes_receiver: Mutex::new(raw_bytes_receiver),
-            crypto,
-            socket,
-            destination,
-            good: AtomicU32::new(0),
-            late: AtomicU32::new(0),
-            lost: AtomicU32::new(0),
-            packets_received: AtomicU32::new(0),
-        }
-    }
-
-    pub async fn receive(&self) -> Result<AudioPacket, Error> {
-        let mut receiver = self.raw_bytes_receiver.lock().await;
-        let bytes = match receiver.recv().await {
-            Some(bytes) => bytes,
-            None => unimplemented!(),
-        };
-        let mut crypto = self.crypto.lock().await;
-        let decrypted = crypto.decrypt(&bytes)?;
-        self.good.swap(crypto.good, Ordering::Release);
-        self.late.swap(crypto.late, Ordering::Release);
-        self.lost.swap(crypto.lost, Ordering::Release);
-        drop(crypto);
-
-        let packet = AudioPacket::parse(decrypted)?;
-        self.packets_received.fetch_add(1, Ordering::Relaxed);
-        Ok(packet)
-    }
-
-    pub async fn send(&self, packet: AudioPacket) -> Result<(), Error> {
-        let bytes = packet.serialize();
-        let encrypted = {
-            let mut crypto = self.crypto.lock().await;
-            crypto.encrypt(&bytes)?
-        };
-        self.socket.send_to(&encrypted, self.destination).await?;
-        Ok(())
-    }
-}
-
-impl From<std::io::Error> for Error {
-    fn from(error: std::io::Error) -> Self {
-        Error::IOError(error)
-    }
-}
-
-impl From<crate::protocol::Error> for Error {
-    fn from(error: crate::protocol::Error) -> Self {
-        Error::ParsingError(error)
-    }
-}
-
-impl From<crate::crypto::Error> for Error {
-    fn from(error: crate::crypto::Error) -> Self {
-        Error::CryptError(error)
-    }
-}

+ 5 - 7
src/crypto.rs

@@ -1,7 +1,6 @@
-use std::cmp::Ordering;
-
 use aes::cipher::generic_array::GenericArray;
 use aes::cipher::generic_array::GenericArray;
 use aes::{Aes128, BlockDecrypt, BlockEncrypt, NewBlockCipher};
 use aes::{Aes128, BlockDecrypt, BlockEncrypt, NewBlockCipher};
+use std::cmp::Ordering;
 
 
 const AES_BLOCK_SIZE: usize = 16;
 const AES_BLOCK_SIZE: usize = 16;
 const SHIFT_BITS: u8 = 7;
 const SHIFT_BITS: u8 = 7;
@@ -25,7 +24,6 @@ pub enum Error {
 }
 }
 
 
 // Based on the official Mumble project CryptState implementation
 // Based on the official Mumble project CryptState implementation
-// TODO refactor this mess
 impl Ocb2Aes128Crypto {
 impl Ocb2Aes128Crypto {
     pub fn new(key: Key, encrypt_iv: Nonce, decrypt_iv: Nonce) -> Ocb2Aes128Crypto {
     pub fn new(key: Key, encrypt_iv: Nonce, decrypt_iv: Nonce) -> Ocb2Aes128Crypto {
         Ocb2Aes128Crypto {
         Ocb2Aes128Crypto {
@@ -51,7 +49,7 @@ impl Ocb2Aes128Crypto {
         }
         }
 
 
         let mut result = vec![0; plain.len() + 4];
         let mut result = vec![0; plain.len() + 4];
-        if !self.ocb_encrypt(&plain, &mut result[4..], self.encrypt_iv, &mut tag, true) {
+        if !self.ocb_encrypt(plain, &mut result[4..], self.encrypt_iv, &mut tag, true) {
             return Err(Error::Fail);
             return Err(Error::Fail);
         }
         }
 
 
@@ -209,14 +207,14 @@ impl Ocb2Aes128Crypto {
             }
             }
 
 
             s2(&mut delta);
             s2(&mut delta);
-            xor(&mut tmp, &delta, &plain);
+            xor(&mut tmp, &delta, plain);
             if flip_a_bit {
             if flip_a_bit {
                 tmp[0] ^= 1;
                 tmp[0] ^= 1;
             }
             }
             self.cipher
             self.cipher
                 .encrypt_block(GenericArray::from_mut_slice(&mut tmp));
                 .encrypt_block(GenericArray::from_mut_slice(&mut tmp));
             xor(encrypted, &delta, &tmp);
             xor(encrypted, &delta, &tmp);
-            xor_a(&mut checksum, &plain);
+            xor_a(&mut checksum, plain);
             if flip_a_bit {
             if flip_a_bit {
                 checksum[0] ^= 1;
                 checksum[0] ^= 1;
             }
             }
@@ -268,7 +266,7 @@ impl Ocb2Aes128Crypto {
             self.cipher
             self.cipher
                 .decrypt_block(GenericArray::from_mut_slice(&mut tmp));
                 .decrypt_block(GenericArray::from_mut_slice(&mut tmp));
             xor(plain, &delta, &tmp);
             xor(plain, &delta, &tmp);
-            xor_a(&mut checksum, &plain);
+            xor_a(&mut checksum, plain);
 
 
             encrypted = &encrypted[AES_BLOCK_SIZE..];
             encrypted = &encrypted[AES_BLOCK_SIZE..];
             plain = &mut plain[AES_BLOCK_SIZE..];
             plain = &mut plain[AES_BLOCK_SIZE..];

+ 0 - 110
src/db.rs

@@ -1,110 +0,0 @@
-use std::collections::HashMap;
-
-use serde::{Deserialize, Serialize};
-use std::sync::atomic::{AtomicU32, Ordering};
-use tokio::sync::RwLock;
-
-const ROOT_CHANNEL_ID: u32 = 0;
-const USER_TREE_NAME: &[u8] = b"users";
-const CHANNEL_TREE_NAME: &[u8] = b"channels";
-
-type SessionId = u32;
-
-pub struct Db {
-    db: sled::Db,
-    users: sled::Tree,
-    channels: sled::Tree,
-    connected_users: RwLock<HashMap<SessionId, User>>,
-    next_session_id: AtomicU32,
-}
-
-#[derive(Clone)]
-pub struct User {
-    pub id: Option<u32>,
-    pub username: String,
-    pub channel_id: u32,
-    pub session_id: SessionId,
-}
-
-#[derive(Serialize, Deserialize)]
-struct PersistentUserData {
-    id: u32,
-    username: String,
-    channel_id: u32,
-}
-
-#[derive(Serialize, Deserialize)]
-pub struct Channel {
-    pub id: u32,
-    pub name: String,
-}
-
-impl Db {
-    pub fn open(path_to_db_file: &str) -> Self {
-        let db = sled::open(path_to_db_file).expect("Unable to open database");
-        let users = db.open_tree(USER_TREE_NAME).unwrap();
-        let channels = db.open_tree(CHANNEL_TREE_NAME).unwrap();
-
-        let root_channel = bincode::serialize(&Channel {
-            id: 0,
-            name: "Root".to_string(),
-        })
-        .unwrap();
-        channels
-            .compare_and_swap(
-                ROOT_CHANNEL_ID.to_be_bytes(),
-                Option::<&[u8]>::None,
-                Some(root_channel),
-            )
-            .unwrap();
-
-        Db {
-            db,
-            users,
-            channels,
-            connected_users: RwLock::new(HashMap::new()),
-            next_session_id: AtomicU32::new(0),
-        }
-    }
-
-    pub async fn add_new_user(&self, username: String) -> u32 {
-        let session_id = self.next_session_id.fetch_add(1, Ordering::SeqCst);
-        let mut connected_users = self.connected_users.write().await;
-        connected_users.insert(
-            session_id,
-            User {
-                id: None,
-                username,
-                channel_id: ROOT_CHANNEL_ID,
-                session_id,
-            },
-        );
-        session_id
-    }
-
-    pub async fn get_channels(&self) -> Vec<Channel> {
-        self.channels
-            .iter()
-            .values()
-            .map(|channel| bincode::deserialize(&channel.unwrap()).unwrap())
-            .collect()
-    }
-
-    pub async fn get_connected_users(&self) -> Vec<User> {
-        let users = self.connected_users.read().await;
-        users.values().cloned().collect()
-    }
-
-    pub async fn get_user_by_session_id(&self, session_id: u32) -> Option<User> {
-        let connected_users = self.connected_users.read().await;
-        if let Some(user) = connected_users.get(&session_id) {
-            return Some(user.clone());
-        }
-        None
-    }
-
-    pub async fn remove_connected_user(&self, session_id: u32) {
-        let mut connected_users = self.connected_users.write().await;
-        connected_users.remove(&session_id);
-    }
-}

+ 4 - 8
src/main.rs

@@ -1,18 +1,14 @@
-use std::fs::File;
-use std::io::BufReader;
-
 use crate::server::Server;
 use crate::server::Server;
 use clap::{App, Arg};
 use clap::{App, Arg};
+use std::fs::File;
+use std::io::BufReader;
 use tokio::runtime::Builder;
 use tokio::runtime::Builder;
 use tokio_rustls::rustls::{internal::pemfile, Certificate, PrivateKey};
 use tokio_rustls::rustls::{internal::pemfile, Certificate, PrivateKey};
 
 
-mod client;
-mod connection;
 mod crypto;
 mod crypto;
-mod db;
-mod proto;
 mod protocol;
 mod protocol;
 mod server;
 mod server;
+mod storage;
 
 
 fn main() {
 fn main() {
     env_logger::init();
     env_logger::init();
@@ -59,7 +55,7 @@ fn main() {
     let keyfile = matches.value_of("private key").unwrap();
     let keyfile = matches.value_of("private key").unwrap();
     let path = "db/".to_string();
     let path = "db/".to_string();
 
 
-    let config = server::Config {
+    let config = crate::server::Config {
         ip_address: ip.parse().unwrap(),
         ip_address: ip.parse().unwrap(),
         port: port.parse().unwrap(),
         port: port.parse().unwrap(),
         certificate: read_certificate(cert_file),
         certificate: read_certificate(cert_file),

+ 0 - 1
src/proto/mod.rs

@@ -1 +0,0 @@
-pub mod mumble;

+ 0 - 386
src/protocol.rs

@@ -1,386 +0,0 @@
-use protobuf::{Message, ProtobufError};
-
-use crate::proto::mumble::{
-    Authenticate, BanList, ChannelRemove, ChannelState, CodecVersion, ContextAction,
-    ContextActionModify, CryptSetup, PermissionDenied, PermissionQuery, Ping, QueryUsers, Reject,
-    RequestBlob, ServerConfig, ServerSync, SuggestConfig, TextMessage, UserList, UserRemove,
-    UserState, UserStats, Version, VoiceTarget, ACL as Acl,
-};
-
-pub const MUMBLE_PROTOCOL_VERSION: u32 = 0x10304;
-pub const MAX_AUDIO_PACKET_SIZE: usize = 1020;
-
-const VERSION: u16 = 0;
-const UDP_TUNNEL: u16 = 1;
-const AUTHENTICATE: u16 = 2;
-const PING: u16 = 3;
-const REJECT: u16 = 4;
-const SERVER_SYNC: u16 = 5;
-const CHANNEL_REMOVE: u16 = 6;
-const CHANNEL_STATE: u16 = 7;
-const USER_REMOVE: u16 = 8;
-const USER_STATE: u16 = 9;
-const BAN_LIST: u16 = 10;
-const TEXT_MESSAGE: u16 = 11;
-const PERMISSION_DENIED: u16 = 12;
-const ACL: u16 = 13;
-const QUERY_USERS: u16 = 14;
-const CRYPT_SETUP: u16 = 15;
-const CONTEXT_ACTION_MODIFY: u16 = 16;
-const CONTEXT_ACTION: u16 = 17;
-const USER_LIST: u16 = 18;
-const VOICE_TARGET: u16 = 19;
-const PERMISSION_QUERY: u16 = 20;
-const CODEC_VERSION: u16 = 21;
-const USER_STATS: u16 = 22;
-const REQUEST_BLOB: u16 = 23;
-const SERVER_CONFIG: u16 = 24;
-const SUGGEST_CONFIG: u16 = 25;
-
-const TYPE_SIZE: usize = 2;
-const LENGTH_SIZE: usize = 4;
-
-pub enum MumblePacket {
-    Version(Version),
-    UdpTunnel(AudioPacket),
-    Authenticate(Authenticate),
-    Ping(Ping),
-    Reject(Reject),
-    ServerSync(ServerSync),
-    ChannelRemove(ChannelRemove),
-    ChannelState(ChannelState),
-    UserRemove(UserRemove),
-    UserState(UserState),
-    BanList(BanList),
-    TextMessage(TextMessage),
-    PermissionDenied(PermissionDenied),
-    Acl(Acl),
-    QueryUsers(QueryUsers),
-    CryptSetup(CryptSetup),
-    ContextActionModify(ContextActionModify),
-    ContextAction(ContextAction),
-    UserList(UserList),
-    VoiceTarget(VoiceTarget),
-    PermissionQuery(PermissionQuery),
-    CodecVersion(CodecVersion),
-    UserStats(UserStats),
-    RequestBlob(RequestBlob),
-    ServerConfig(ServerConfig),
-    SuggestConfig(SuggestConfig),
-}
-
-pub enum AudioPacket {
-    Ping(AudioPing),
-    AudioData(AudioData),
-}
-
-pub enum Error {
-    UnknownPacketType,
-    ParsingError,
-}
-
-pub struct AudioPing {
-    bytes: Vec<u8>,
-}
-
-#[derive(Clone)]
-pub struct AudioData {
-    pub session_id: Option<u32>,
-    bytes: Vec<u8>,
-}
-
-impl MumblePacket {
-    pub fn parse_prefix(packet_type: [u8; TYPE_SIZE], length: [u8; LENGTH_SIZE]) -> (u16, u32) {
-        (u16::from_be_bytes(packet_type), u32::from_be_bytes(length))
-    }
-
-    pub fn parse_payload(packet_type: u16, payload: &[u8]) -> Result<Self, Error> {
-        match packet_type {
-            VERSION => Ok(MumblePacket::Version(Version::parse_from_bytes(payload)?)),
-            UDP_TUNNEL => Ok(MumblePacket::UdpTunnel(AudioPacket::parse(
-                payload.to_vec(),
-            )?)),
-            AUTHENTICATE => Ok(MumblePacket::Authenticate(Authenticate::parse_from_bytes(
-                payload,
-            )?)),
-            PING => Ok(MumblePacket::Ping(Ping::parse_from_bytes(payload)?)),
-            REJECT => Ok(MumblePacket::Reject(Reject::parse_from_bytes(payload)?)),
-            SERVER_SYNC => Ok(MumblePacket::ServerSync(ServerSync::parse_from_bytes(
-                payload,
-            )?)),
-            CHANNEL_REMOVE => Ok(MumblePacket::ChannelRemove(
-                ChannelRemove::parse_from_bytes(payload)?,
-            )),
-            CHANNEL_STATE => Ok(MumblePacket::ChannelState(ChannelState::parse_from_bytes(
-                payload,
-            )?)),
-            USER_REMOVE => Ok(MumblePacket::UserRemove(UserRemove::parse_from_bytes(
-                payload,
-            )?)),
-            USER_STATE => Ok(MumblePacket::UserState(UserState::parse_from_bytes(
-                payload,
-            )?)),
-            BAN_LIST => Ok(MumblePacket::BanList(BanList::parse_from_bytes(payload)?)),
-            TEXT_MESSAGE => Ok(MumblePacket::TextMessage(TextMessage::parse_from_bytes(
-                payload,
-            )?)),
-            PERMISSION_DENIED => Ok(MumblePacket::PermissionDenied(
-                PermissionDenied::parse_from_bytes(payload)?,
-            )),
-            ACL => Ok(MumblePacket::Acl(Acl::parse_from_bytes(payload)?)),
-            QUERY_USERS => Ok(MumblePacket::QueryUsers(QueryUsers::parse_from_bytes(
-                payload,
-            )?)),
-            CRYPT_SETUP => Ok(MumblePacket::CryptSetup(CryptSetup::parse_from_bytes(
-                payload,
-            )?)),
-            CONTEXT_ACTION_MODIFY => Ok(MumblePacket::ContextActionModify(
-                ContextActionModify::parse_from_bytes(payload)?,
-            )),
-            CONTEXT_ACTION => Ok(MumblePacket::ContextAction(
-                ContextAction::parse_from_bytes(payload)?,
-            )),
-            USER_LIST => Ok(MumblePacket::UserList(UserList::parse_from_bytes(payload)?)),
-            VOICE_TARGET => Ok(MumblePacket::VoiceTarget(VoiceTarget::parse_from_bytes(
-                payload,
-            )?)),
-            PERMISSION_QUERY => Ok(MumblePacket::PermissionQuery(
-                PermissionQuery::parse_from_bytes(payload)?,
-            )),
-            CODEC_VERSION => Ok(MumblePacket::CodecVersion(CodecVersion::parse_from_bytes(
-                payload,
-            )?)),
-            USER_STATS => Ok(MumblePacket::UserStats(UserStats::parse_from_bytes(
-                payload,
-            )?)),
-            REQUEST_BLOB => Ok(MumblePacket::RequestBlob(RequestBlob::parse_from_bytes(
-                payload,
-            )?)),
-            SERVER_CONFIG => Ok(MumblePacket::ServerConfig(ServerConfig::parse_from_bytes(
-                payload,
-            )?)),
-            SUGGEST_CONFIG => Ok(MumblePacket::SuggestConfig(
-                SuggestConfig::parse_from_bytes(&payload)?,
-            )),
-            _ => Err(Error::UnknownPacketType),
-        }
-    }
-
-    pub fn serialize(self) -> Vec<u8> {
-        match self {
-            MumblePacket::UdpTunnel(voice_packet) => {
-                let bytes = voice_packet.serialize();
-                return UDP_TUNNEL
-                    .to_be_bytes()
-                    .iter()
-                    .cloned()
-                    .chain((bytes.len() as u32).to_be_bytes().iter().cloned())
-                    .chain(bytes)
-                    .collect();
-            }
-            MumblePacket::Version(value) => Self::serialize_protobuf_packet(&value, VERSION),
-            MumblePacket::Authenticate(value) => {
-                Self::serialize_protobuf_packet(&value, AUTHENTICATE)
-            }
-            MumblePacket::Ping(value) => Self::serialize_protobuf_packet(&value, PING),
-            MumblePacket::Reject(value) => Self::serialize_protobuf_packet(&value, REJECT),
-            MumblePacket::ServerSync(value) => Self::serialize_protobuf_packet(&value, SERVER_SYNC),
-            MumblePacket::ChannelRemove(value) => {
-                Self::serialize_protobuf_packet(&value, CHANNEL_REMOVE)
-            }
-            MumblePacket::ChannelState(value) => {
-                Self::serialize_protobuf_packet(&value, CHANNEL_STATE)
-            }
-            MumblePacket::UserRemove(value) => Self::serialize_protobuf_packet(&value, USER_REMOVE),
-            MumblePacket::UserState(value) => Self::serialize_protobuf_packet(&value, USER_STATE),
-            MumblePacket::BanList(value) => Self::serialize_protobuf_packet(&value, BAN_LIST),
-            MumblePacket::TextMessage(value) => {
-                Self::serialize_protobuf_packet(&value, TEXT_MESSAGE)
-            }
-            MumblePacket::PermissionDenied(value) => {
-                Self::serialize_protobuf_packet(&value, PERMISSION_DENIED)
-            }
-            MumblePacket::Acl(value) => Self::serialize_protobuf_packet(&value, ACL),
-            MumblePacket::QueryUsers(value) => Self::serialize_protobuf_packet(&value, QUERY_USERS),
-            MumblePacket::CryptSetup(value) => Self::serialize_protobuf_packet(&value, CRYPT_SETUP),
-            MumblePacket::ContextActionModify(value) => {
-                Self::serialize_protobuf_packet(&value, CONTEXT_ACTION_MODIFY)
-            }
-            MumblePacket::ContextAction(value) => {
-                Self::serialize_protobuf_packet(&value, CONTEXT_ACTION)
-            }
-            MumblePacket::UserList(value) => Self::serialize_protobuf_packet(&value, USER_LIST),
-            MumblePacket::VoiceTarget(value) => {
-                Self::serialize_protobuf_packet(&value, VOICE_TARGET)
-            }
-            MumblePacket::PermissionQuery(value) => {
-                Self::serialize_protobuf_packet(&value, PERMISSION_QUERY)
-            }
-            MumblePacket::CodecVersion(value) => {
-                Self::serialize_protobuf_packet(&value, CODEC_VERSION)
-            }
-            MumblePacket::UserStats(value) => Self::serialize_protobuf_packet(&value, USER_STATS),
-            MumblePacket::RequestBlob(value) => {
-                Self::serialize_protobuf_packet(&value, REQUEST_BLOB)
-            }
-            MumblePacket::ServerConfig(value) => {
-                Self::serialize_protobuf_packet(&value, SERVER_CONFIG)
-            }
-            MumblePacket::SuggestConfig(value) => {
-                Self::serialize_protobuf_packet(&value, SUGGEST_CONFIG)
-            }
-        }
-    }
-
-    fn serialize_protobuf_packet<T>(packet: &T, packet_type: u16) -> Vec<u8>
-    where
-        T: Message,
-    {
-        let bytes = packet.write_to_bytes().unwrap();
-        return packet_type
-            .to_be_bytes()
-            .iter()
-            .cloned()
-            .chain((bytes.len() as u32).to_be_bytes().iter().cloned())
-            .chain(bytes)
-            .collect();
-    }
-}
-
-impl AudioPacket {
-    pub fn parse(bytes: Vec<u8>) -> Result<Self, Error> {
-        if bytes.is_empty() || bytes.len() > MAX_AUDIO_PACKET_SIZE {
-            return Err(Error::ParsingError);
-        }
-
-        let header = bytes.first().unwrap();
-        let (packet_type, _) = decode_header(*header);
-        if packet_type == 1 {
-            return Ok(AudioPacket::Ping(AudioPing { bytes }));
-        }
-
-        Ok(AudioPacket::AudioData(AudioData {
-            session_id: None,
-            bytes,
-        }))
-    }
-
-    pub fn serialize(self) -> Vec<u8> {
-        match self {
-            AudioPacket::Ping(ping) => ping.bytes,
-            AudioPacket::AudioData(audio_data) => {
-                if let Some(session_id) = audio_data.session_id {
-                    let mut bytes = audio_data.bytes;
-                    let varint = encode_varint(session_id as u64);
-                    return std::iter::once(bytes.remove(0))
-                        .chain(varint)
-                        .chain(bytes)
-                        .collect();
-                }
-                audio_data.bytes
-            }
-        }
-    }
-}
-
-fn decode_header(header: u8) -> (u8, u8) {
-    let packet_type = header >> 5;
-    let target = header & 0b0001_1111;
-    (packet_type, target)
-}
-
-fn encode_varint(number: u64) -> Vec<u8> {
-    let mut result = vec![];
-
-    if number < 0x80 {
-        //7-bit number
-        result.push(number as u8);
-    } else if number < 0x4000 {
-        //14-bit number
-        result.push(((number >> 8) | 0x80) as u8);
-        result.push((number & 0xFF) as u8);
-    } else if number < 0x200000 {
-        //21-bit number
-        result.push(((number >> 16) | 0xC0) as u8);
-        result.push(((number >> 8) & 0xFF) as u8);
-        result.push((number & 0xFF) as u8);
-    } else if number < 0x10000000 {
-        //28-bit number
-        result.push(((number >> 24) | 0xE0) as u8);
-        result.push(((number >> 16) & 0xFF) as u8);
-        result.push(((number >> 8) & 0xFF) as u8);
-        result.push((number & 0xFF) as u8);
-    } else if number < 0x100000000 {
-        //32-bit number
-        result.push(0xF0);
-        result.push(((number >> 24) & 0xFF) as u8);
-        result.push(((number >> 16) & 0xFF) as u8);
-        result.push(((number >> 8) & 0xFF) as u8);
-        result.push((number & 0xFF) as u8);
-    } else {
-        //64-bit number
-        result.push(0xF4);
-        result.push(((number >> 56) & 0xFF) as u8);
-        result.push(((number >> 48) & 0xFF) as u8);
-        result.push(((number >> 40) & 0xFF) as u8);
-        result.push(((number >> 32) & 0xFF) as u8);
-        result.push(((number >> 24) & 0xFF) as u8);
-        result.push(((number >> 16) & 0xFF) as u8);
-        result.push(((number >> 8) & 0xFF) as u8);
-        result.push((number & 0xFF) as u8);
-    }
-
-    result
-}
-
-impl From<ProtobufError> for Error {
-    fn from(_: ProtobufError) -> Self {
-        Error::ParsingError
-    }
-}
-
-//TODO write more tests
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_decode_header() {
-        assert_eq!(decode_header(0b0100_1000), (2, 8));
-        assert_eq!(decode_header(0b0111_1111), (3, 31));
-        assert_eq!(decode_header(0b1000_0000), (4, 0));
-    }
-
-    #[test]
-    fn test_encode_varint() {
-        let varint_7bit_positive = vec![0b0000_1000];
-        let varint_14bit_positive = vec![0b1010_0010, 0b0000_0011];
-        let varint_21bit_positive = vec![0b1101_0100, 0b0000_0000, 0b0000_0000];
-        let varint_28bit_positive = vec![0b1110_1100, 0b0100_0000, 0b0010_0000, 0b0000_0001];
-        let varint_32bit_positive = vec![
-            0b1111_0000,
-            0b1100_0000,
-            0b0000_0000,
-            0b0000_0000,
-            0b0000_0001,
-        ];
-        let varint_64bit_positive = vec![
-            0b1111_0100,
-            0b1100_0000,
-            0b0000_0000,
-            0b0000_0000,
-            0b0000_0001,
-            0b0000_0000,
-            0b0000_0000,
-            0b0000_0000,
-            0b0001_0000,
-        ];
-
-        assert_eq!(encode_varint(0x8), varint_7bit_positive);
-        assert_eq!(encode_varint(0x2203), varint_14bit_positive);
-        assert_eq!(encode_varint(0x140000), varint_21bit_positive);
-        assert_eq!(encode_varint(0xc402001), varint_28bit_positive);
-        assert_eq!(encode_varint(0xc0000001), varint_32bit_positive);
-        assert_eq!(encode_varint(0xc000000100000010), varint_64bit_positive);
-    }
-}

+ 37 - 0
src/protocol/connection.rs

@@ -0,0 +1,37 @@
+use crate::protocol::parser::{AudioPacket, ControlMessage, Message};
+use async_trait::async_trait;
+
+#[async_trait]
+pub trait ControlChannel: Send + Sync {
+    async fn send(&self, message: impl Message + 'async_trait) -> Result<(), Error>;
+
+    async fn receive(&self) -> Result<ControlMessage, Error>;
+
+    fn get_stats(&self) -> ControlChannelStats;
+}
+
+#[async_trait]
+pub trait AudioChannel: Send + Sync {
+    async fn send(&self, packet: AudioPacket) -> Result<(), Error>;
+
+    async fn receive(&self) -> Result<AudioPacket, Error>;
+
+    fn get_stats(&self) -> AudioChannelStats;
+}
+
+pub struct ControlChannelStats {
+    pub received: u32,
+}
+
+pub struct AudioChannelStats {
+    pub good: u32,
+    pub late: u32,
+    pub lost: u32,
+    pub received: u32,
+}
+
+#[derive(Debug)]
+pub enum Error {
+    IO(std::io::Error),
+    Parsing(crate::protocol::parser::ParsingError),
+}

+ 4 - 0
src/protocol/mod.rs

@@ -0,0 +1,4 @@
+pub mod connection;
+pub mod parser;
+
+mod mumble;

+ 0 - 0
src/proto/mumble.proto → src/protocol/mumble.proto


+ 678 - 0
src/protocol/parser.rs

@@ -0,0 +1,678 @@
+use crate::protocol::mumble;
+use protobuf::{Message as ProtobufMessage, ProtobufError, SingularField};
+use std::num::NonZeroU32;
+
+pub const MUMBLE_PROTOCOL_VERSION: ProtocolVersion = ProtocolVersion {
+    major: 1,
+    minor: 3,
+    patch: 4,
+};
+
+const VERSION: u16 = 0;
+const UDP_TUNNEL: u16 = 1;
+const AUTHENTICATE: u16 = 2;
+const PING: u16 = 3;
+const REJECT: u16 = 4;
+const SERVER_SYNC: u16 = 5;
+const CHANNEL_REMOVE: u16 = 6;
+const CHANNEL_STATE: u16 = 7;
+const USER_REMOVE: u16 = 8;
+const USER_STATE: u16 = 9;
+const BAN_LIST: u16 = 10;
+const TEXT_MESSAGE: u16 = 11;
+const PERMISSION_DENIED: u16 = 12;
+const ACL: u16 = 13;
+const QUERY_USERS: u16 = 14;
+const CRYPT_SETUP: u16 = 15;
+const CONTEXT_ACTION_MODIFY: u16 = 16;
+const CONTEXT_ACTION: u16 = 17;
+const USER_LIST: u16 = 18;
+const VOICE_TARGET: u16 = 19;
+const PERMISSION_QUERY: u16 = 20;
+const CODEC_VERSION: u16 = 21;
+const USER_STATS: u16 = 22;
+const REQUEST_BLOB: u16 = 23;
+const SERVER_CONFIG: u16 = 24;
+
+const SUGGEST_CONFIG: u16 = 25;
+const TYPE_SIZE: usize = 2;
+const LENGTH_SIZE: usize = 4;
+
+pub trait Message: Into<ControlMessage> + Send {
+    fn serialize(self) -> Vec<u8>;
+}
+
+pub enum ControlMessage {
+    Acl(),
+    Authenticate(Authenticate),
+    BanList(),
+    ChannelRemove(),
+    ChannelState(ChannelState),
+    CodecVersion(CodecVersion),
+    ContextAction(),
+    ContextActionModify(),
+    CryptSetup(CryptSetup),
+    PermissionDenied(),
+    PermissionQuery(),
+    Ping(Ping),
+    QueryUsers(),
+    Reject(),
+    RequestBlob(),
+    ServerConfig(ServerConfig),
+    ServerSync(ServerSync),
+    SuggestConfig(),
+    TextMessage(),
+    UserList(),
+    UserRemove(UserRemove),
+    UserState(UserState),
+    UserStats(),
+    UdpTunnel(UdpTunnel),
+    Version(Version),
+    VoiceTarget(),
+}
+
+pub enum AudioPacket {
+    Ping(AudioPing),
+    AudioData(AudioData),
+}
+
+#[derive(Clone)]
+pub enum SessionId {
+    // 0 is reserved for SuperUser
+    SuperUser,
+    User(NonZeroU32),
+}
+
+#[derive(Debug)]
+pub enum ParsingError {
+    MalformedInput,
+}
+
+#[derive(Clone)]
+pub struct AudioData {
+    pub session_id: Option<SessionId>,
+    packet: Vec<u8>,
+}
+
+pub struct AudioPing {
+    packet: Vec<u8>,
+}
+
+#[derive(Eq, PartialEq, Debug, Clone)]
+pub struct ProtocolVersion {
+    pub major: u16,
+    pub minor: u8,
+    pub patch: u8,
+}
+
+#[derive(Default)]
+pub struct Authenticate {
+    pub username: Option<String>,
+    pub password: Option<String>,
+}
+
+#[derive(Default)]
+pub struct ChannelState {
+    pub id: Option<u32>,
+    pub name: Option<String>,
+}
+
+pub struct CodecVersion {
+    pub celt_alpha_version: i32,
+    pub celt_beta_version: i32,
+    pub prefer_alpha: bool,
+    pub opus_support: bool,
+}
+
+#[derive(Default)]
+pub struct CryptSetup {
+    pub key: Option<Vec<u8>>,
+    pub client_nonce: Option<Vec<u8>>,
+    pub server_nonce: Option<Vec<u8>>,
+}
+
+#[derive(Default)]
+pub struct Ping {
+    pub timestamp: Option<u64>,
+    pub good: Option<u32>,
+    pub late: Option<u32>,
+    pub lost: Option<u32>,
+    pub resyncs: Option<u32>,
+}
+
+pub struct ServerConfig {
+    pub max_users: u32,
+    pub max_message_length: u32,
+}
+
+pub struct ServerSync {
+    pub user_session_id: SessionId,
+    pub max_bandwidth: u32,
+    pub welcome_text: String,
+}
+
+pub struct UserRemove {
+    pub session_id: SessionId,
+}
+
+#[derive(Default)]
+pub struct UserState {
+    pub session_id: Option<SessionId>,
+    pub name: Option<String>,
+    pub channel_id: Option<u32>,
+}
+
+pub struct UdpTunnel {
+    pub audio_packet: AudioPacket,
+}
+
+#[derive(Default)]
+pub struct Version {
+    pub version: Option<ProtocolVersion>,
+}
+
+impl ControlMessage {
+    pub fn parse_prefix(packet_type: [u8; TYPE_SIZE], length: [u8; LENGTH_SIZE]) -> (u16, u32) {
+        (u16::from_be_bytes(packet_type), u32::from_be_bytes(length))
+    }
+
+    pub fn parse_payload(packet_type: u16, payload: &[u8]) -> Result<Self, ParsingError> {
+        Ok(match packet_type {
+            AUTHENTICATE => {
+                Authenticate::from(mumble::Authenticate::parse_from_bytes(payload)?).into()
+            }
+            CRYPT_SETUP => CryptSetup::from(mumble::CryptSetup::parse_from_bytes(payload)?).into(),
+            PING => Ping::from(mumble::Ping::parse_from_bytes(payload)?).into(),
+            UDP_TUNNEL => ControlMessage::UdpTunnel(UdpTunnel {
+                audio_packet: AudioPacket::parse(payload.to_vec())?,
+            }),
+            VERSION => Version::from(mumble::Version::parse_from_bytes(payload)?).into(),
+            // TODO
+            _ => return Err(ParsingError::MalformedInput),
+        })
+    }
+}
+
+impl AudioPacket {
+    pub fn parse(bytes: Vec<u8>) -> Result<Self, ParsingError> {
+        let header = bytes.first().unwrap();
+        let (packet_type, _) = decode_header(*header);
+        if packet_type == 1 {
+            return Ok(AudioPacket::Ping(AudioPing { packet: bytes }));
+        }
+
+        Ok(AudioPacket::AudioData(AudioData {
+            session_id: None,
+            packet: bytes,
+        }))
+    }
+
+    pub fn serialize(self) -> Vec<u8> {
+        match self {
+            AudioPacket::Ping(ping) => ping.packet,
+            AudioPacket::AudioData(audio_data) => {
+                if let Some(session_id) = audio_data.session_id {
+                    let mut bytes = audio_data.packet;
+                    let varint = encode_varint(u32::from(session_id) as u64);
+                    return std::iter::once(bytes.remove(0))
+                        .chain(varint)
+                        .chain(bytes)
+                        .collect();
+                }
+                audio_data.packet
+            }
+        }
+    }
+}
+
+fn serialize_protobuf_message<T>(message: T, packet_type: u16) -> Vec<u8>
+where
+    T: ProtobufMessage,
+{
+    let bytes = message.write_to_bytes().unwrap();
+    return packet_type
+        .to_be_bytes()
+        .iter()
+        .cloned()
+        .chain((bytes.len() as u32).to_be_bytes().iter().cloned())
+        .chain(bytes.into_iter())
+        .collect();
+}
+
+fn decode_header(header: u8) -> (u8, u8) {
+    let packet_type = header >> 5;
+    let target = header & 0b0001_1111;
+    (packet_type, target)
+}
+
+fn encode_varint(number: u64) -> Vec<u8> {
+    let mut result = vec![];
+
+    if number < 0x80 {
+        //7-bit number
+        result.push(number as u8);
+    } else if number < 0x4000 {
+        //14-bit number
+        result.push(((number >> 8) | 0x80) as u8);
+        result.push((number & 0xFF) as u8);
+    } else if number < 0x200000 {
+        //21-bit number
+        result.push(((number >> 16) | 0xC0) as u8);
+        result.push(((number >> 8) & 0xFF) as u8);
+        result.push((number & 0xFF) as u8);
+    } else if number < 0x10000000 {
+        //28-bit number
+        result.push(((number >> 24) | 0xE0) as u8);
+        result.push(((number >> 16) & 0xFF) as u8);
+        result.push(((number >> 8) & 0xFF) as u8);
+        result.push((number & 0xFF) as u8);
+    } else if number < 0x100000000 {
+        //32-bit number
+        result.push(0xF0);
+        result.push(((number >> 24) & 0xFF) as u8);
+        result.push(((number >> 16) & 0xFF) as u8);
+        result.push(((number >> 8) & 0xFF) as u8);
+        result.push((number & 0xFF) as u8);
+    } else {
+        //64-bit number
+        result.push(0xF4);
+        result.push(((number >> 56) & 0xFF) as u8);
+        result.push(((number >> 48) & 0xFF) as u8);
+        result.push(((number >> 40) & 0xFF) as u8);
+        result.push(((number >> 32) & 0xFF) as u8);
+        result.push(((number >> 24) & 0xFF) as u8);
+        result.push(((number >> 16) & 0xFF) as u8);
+        result.push(((number >> 8) & 0xFF) as u8);
+        result.push((number & 0xFF) as u8);
+    }
+
+    result
+}
+
+impl Message for Authenticate {
+    fn serialize(self) -> Vec<u8> {
+        let proto = mumble::Authenticate {
+            username: SingularField::from(self.username),
+            password: SingularField::from(self.password),
+            ..Default::default()
+        };
+        serialize_protobuf_message(proto, AUTHENTICATE)
+    }
+}
+
+impl Message for ChannelState {
+    fn serialize(self) -> Vec<u8> {
+        let proto = mumble::ChannelState {
+            channel_id: self.id,
+            name: SingularField::from(self.name),
+            ..Default::default()
+        };
+        serialize_protobuf_message(proto, CHANNEL_STATE)
+    }
+}
+
+impl Message for CodecVersion {
+    fn serialize(self) -> Vec<u8> {
+        let proto = mumble::CodecVersion {
+            alpha: Some(self.celt_alpha_version),
+            beta: Some(self.celt_beta_version),
+            prefer_alpha: Some(self.prefer_alpha),
+            opus: Some(self.opus_support),
+            ..Default::default()
+        };
+
+        serialize_protobuf_message(proto, CODEC_VERSION)
+    }
+}
+
+impl Message for CryptSetup {
+    fn serialize(self) -> Vec<u8> {
+        let proto = mumble::CryptSetup {
+            key: SingularField::from(self.key),
+            client_nonce: SingularField::from(self.client_nonce),
+            server_nonce: SingularField::from(self.server_nonce),
+            ..Default::default()
+        };
+        serialize_protobuf_message(proto, CRYPT_SETUP)
+    }
+}
+
+impl Message for Ping {
+    fn serialize(self) -> Vec<u8> {
+        let proto = mumble::Ping {
+            timestamp: self.timestamp,
+            good: self.good,
+            late: self.late,
+            lost: self.lost,
+            resync: self.resyncs,
+            ..Default::default()
+        };
+        serialize_protobuf_message(proto, PING)
+    }
+}
+
+impl Message for ServerConfig {
+    fn serialize(self) -> Vec<u8> {
+        let proto = mumble::ServerConfig {
+            max_users: Some(self.max_users),
+            message_length: Some(self.max_message_length),
+            ..Default::default()
+        };
+        serialize_protobuf_message(proto, SERVER_CONFIG)
+    }
+}
+
+impl Message for ServerSync {
+    fn serialize(self) -> Vec<u8> {
+        let proto = mumble::ServerSync {
+            session: Some(u32::from(self.user_session_id)),
+            max_bandwidth: Some(self.max_bandwidth),
+            welcome_text: SingularField::some(self.welcome_text),
+            ..Default::default()
+        };
+        serialize_protobuf_message(proto, SERVER_SYNC)
+    }
+}
+
+impl Message for UserRemove {
+    fn serialize(self) -> Vec<u8> {
+        let proto = mumble::UserRemove {
+            session: Some(u32::from(self.session_id)),
+            ..Default::default()
+        };
+        serialize_protobuf_message(proto, USER_REMOVE)
+    }
+}
+
+impl Message for UserState {
+    fn serialize(self) -> Vec<u8> {
+        let mut proto = mumble::UserState {
+            name: SingularField::from(self.name),
+            channel_id: self.channel_id,
+            ..Default::default()
+        };
+        if let Some(session) = self.session_id {
+            proto.session = Some(u32::from(session));
+        }
+
+        serialize_protobuf_message(proto, USER_STATE)
+    }
+}
+
+impl Message for UdpTunnel {
+    fn serialize(self) -> Vec<u8> {
+        let bytes = self.audio_packet.serialize();
+        return UDP_TUNNEL
+            .to_be_bytes()
+            .iter()
+            .cloned()
+            .chain((bytes.len() as u32).to_be_bytes().iter().cloned())
+            .chain(bytes)
+            .collect();
+    }
+}
+
+impl Message for Version {
+    fn serialize(self) -> Vec<u8> {
+        let mut proto = mumble::Version::new();
+        if let Some(version) = self.version {
+            proto.version = Some(u32::from(version))
+        }
+
+        serialize_protobuf_message(proto, VERSION)
+    }
+}
+
+impl From<Authenticate> for ControlMessage {
+    fn from(auth: Authenticate) -> Self {
+        ControlMessage::Authenticate(auth)
+    }
+}
+
+impl From<ChannelState> for ControlMessage {
+    fn from(state: ChannelState) -> Self {
+        ControlMessage::ChannelState(state)
+    }
+}
+
+impl From<CodecVersion> for ControlMessage {
+    fn from(codec_version: CodecVersion) -> Self {
+        ControlMessage::CodecVersion(codec_version)
+    }
+}
+
+impl From<CryptSetup> for ControlMessage {
+    fn from(crypt: CryptSetup) -> Self {
+        ControlMessage::CryptSetup(crypt)
+    }
+}
+
+impl From<Ping> for ControlMessage {
+    fn from(ping: Ping) -> Self {
+        ControlMessage::Ping(ping)
+    }
+}
+
+impl From<ServerConfig> for ControlMessage {
+    fn from(config: ServerConfig) -> Self {
+        ControlMessage::ServerConfig(config)
+    }
+}
+
+impl From<ServerSync> for ControlMessage {
+    fn from(sync: ServerSync) -> Self {
+        ControlMessage::ServerSync(sync)
+    }
+}
+
+impl From<UserRemove> for ControlMessage {
+    fn from(remove: UserRemove) -> Self {
+        ControlMessage::UserRemove(remove)
+    }
+}
+
+impl From<UserState> for ControlMessage {
+    fn from(state: UserState) -> Self {
+        ControlMessage::UserState(state)
+    }
+}
+
+impl From<UdpTunnel> for ControlMessage {
+    fn from(tunnel: UdpTunnel) -> Self {
+        ControlMessage::UdpTunnel(tunnel)
+    }
+}
+
+impl From<Version> for ControlMessage {
+    fn from(version: Version) -> Self {
+        ControlMessage::Version(version)
+    }
+}
+
+impl From<mumble::Authenticate> for Authenticate {
+    fn from(auth: mumble::Authenticate) -> Self {
+        Authenticate {
+            username: auth.username.into_option(),
+            password: auth.password.into_option(),
+        }
+    }
+}
+
+impl From<mumble::CryptSetup> for CryptSetup {
+    fn from(crypt: mumble::CryptSetup) -> Self {
+        CryptSetup {
+            key: crypt.key.into_option(),
+            client_nonce: crypt.client_nonce.into_option(),
+            server_nonce: crypt.server_nonce.into_option(),
+        }
+    }
+}
+
+impl From<mumble::Ping> for Ping {
+    fn from(ping: mumble::Ping) -> Self {
+        Ping {
+            timestamp: ping.timestamp,
+            good: ping.good,
+            late: ping.resync,
+            lost: ping.lost,
+            resyncs: ping.resync,
+        }
+    }
+}
+
+impl From<mumble::Version> for Version {
+    fn from(version: mumble::Version) -> Self {
+        let mut protocol_version = None;
+        if let Some(version_number) = version.version {
+            protocol_version = Some(ProtocolVersion::from(version_number))
+        }
+        Version {
+            version: protocol_version,
+        }
+    }
+}
+
+impl From<AudioPacket> for UdpTunnel {
+    fn from(packet: AudioPacket) -> Self {
+        UdpTunnel {
+            audio_packet: packet,
+        }
+    }
+}
+
+impl From<ProtocolVersion> for u32 {
+    fn from(version: ProtocolVersion) -> Self {
+        ((version.major as u32) << 16) | ((version.minor as u32) << 8) | (version.patch as u32)
+    }
+}
+
+impl From<u32> for ProtocolVersion {
+    fn from(encoded: u32) -> Self {
+        ProtocolVersion {
+            major: ((encoded & 0xFFFF0000) >> 16) as u16,
+            minor: ((encoded & 0xFF00) >> 8) as u8,
+            patch: (encoded & 0xFF) as u8,
+        }
+    }
+}
+
+impl From<u32> for SessionId {
+    fn from(session_id: u32) -> Self {
+        if let Some(non_zero) = NonZeroU32::new(session_id) {
+            SessionId::User(non_zero)
+        } else {
+            SessionId::SuperUser
+        }
+    }
+}
+
+impl From<SessionId> for u32 {
+    fn from(session_id: SessionId) -> Self {
+        match session_id {
+            SessionId::SuperUser => 0,
+            SessionId::User(id) => id.into(),
+        }
+    }
+}
+
+impl From<ProtobufError> for ParsingError {
+    fn from(_: ProtobufError) -> Self {
+        ParsingError::MalformedInput
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_protocol_version_conversion() {
+        let encoded = 0x10302;
+        let version = ProtocolVersion {
+            major: 1,
+            minor: 3,
+            patch: 2,
+        };
+        assert_eq!(encoded, u32::from(version.clone()));
+        assert_eq!(ProtocolVersion::from(encoded), version);
+    }
+
+    #[test]
+    fn test_serialize_protobuf_message() {
+        let version = mumble::Version {
+            version: Some(12345),
+            release: SingularField::some("release".to_owned()),
+            os: SingularField::some("os".to_owned()),
+            os_version: SingularField::some("os_version".to_owned()),
+            ..Default::default()
+        };
+        let version_serialized = vec![
+            0x0, 0x0, 0x0, 0x0, 0x0, 0x1c, 0x08, 0xb9, 0x60, 0x12, 0x07, 0x72, 0x65, 0x6c, 0x65,
+            0x61, 0x73, 0x65, 0x1a, 0x02, 0x6f, 0x73, 0x22, 0x0a, 0x6f, 0x73, 0x5f, 0x76, 0x65,
+            0x72, 0x73, 0x69, 0x6f, 0x6e,
+        ];
+
+        let user_state = mumble::UserState {
+            session: Some(42),
+            mute: Some(false),
+            ..Default::default()
+        };
+        let user_state_serialized = vec![0x0, 0x9, 0x0, 0x0, 0x0, 0x4, 0x08, 0x2a, 0x30, 0x00];
+
+        let ping = mumble::Ping {
+            timestamp: Some(123456789),
+            ..Default::default()
+        };
+        let ping_serialized = vec![0x0, 0x3, 0x0, 0x0, 0x0, 0x5, 0x8, 0x95, 0x9a, 0xef, 0x3a];
+
+        assert_eq!(
+            serialize_protobuf_message(version, VERSION),
+            version_serialized
+        );
+        assert_eq!(
+            serialize_protobuf_message(user_state, USER_STATE),
+            user_state_serialized
+        );
+        assert_eq!(serialize_protobuf_message(ping, PING), ping_serialized);
+    }
+
+    #[test]
+    fn test_decode_header() {
+        assert_eq!(decode_header(0b0100_1000), (2, 8));
+        assert_eq!(decode_header(0b0111_1111), (3, 31));
+        assert_eq!(decode_header(0b1000_0000), (4, 0));
+    }
+
+    #[test]
+    fn test_encode_varint() {
+        let varint_7bit_positive = vec![0b0000_1000];
+        let varint_14bit_positive = vec![0b1010_0010, 0b0000_0011];
+        let varint_21bit_positive = vec![0b1101_0100, 0b0000_0000, 0b0000_0000];
+        let varint_28bit_positive = vec![0b1110_1100, 0b0100_0000, 0b0010_0000, 0b0000_0001];
+        let varint_32bit_positive = vec![
+            0b1111_0000,
+            0b1100_0000,
+            0b0000_0000,
+            0b0000_0000,
+            0b0000_0001,
+        ];
+        let varint_64bit_positive = vec![
+            0b1111_0100,
+            0b1100_0000,
+            0b0000_0000,
+            0b0000_0000,
+            0b0000_0001,
+            0b0000_0000,
+            0b0000_0000,
+            0b0000_0000,
+            0b0001_0000,
+        ];
+
+        assert_eq!(encode_varint(0x8), varint_7bit_positive);
+        assert_eq!(encode_varint(0x2203), varint_14bit_positive);
+        assert_eq!(encode_varint(0x140000), varint_21bit_positive);
+        assert_eq!(encode_varint(0xc402001), varint_28bit_positive);
+        assert_eq!(encode_varint(0xc0000001), varint_32bit_positive);
+        assert_eq!(encode_varint(0xc000000100000010), varint_64bit_positive);
+    }
+}

+ 0 - 308
src/server.rs

@@ -1,308 +0,0 @@
-use std::collections::HashMap;
-use std::net::{IpAddr, SocketAddr};
-use std::sync::Arc;
-
-use tokio::net::{TcpListener, TcpStream, UdpSocket};
-use tokio::sync::{mpsc, Mutex, RwLock};
-use tokio_rustls::rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig};
-use tokio_rustls::{TlsAcceptor, TlsStream};
-
-use crate::client::{Client, ClientEvent, ServerEvent};
-use crate::connection::{AudioChannel, ControlChannel};
-use crate::crypto::Ocb2Aes128Crypto;
-use crate::db::Db;
-use crate::protocol::AudioData;
-use rand::prelude::StdRng;
-use rand::{Rng, SeedableRng};
-
-use tokio::sync::mpsc::{Receiver, Sender};
-
-use log::{error, info, warn};
-
-pub const MAX_UDP_DATAGRAM_SIZE: usize = 1024;
-
-pub struct Config {
-    pub ip_address: IpAddr,
-    pub port: u16,
-    pub certificate: Certificate,
-    pub private_key: PrivateKey,
-    pub path_to_db_file: String,
-}
-
-pub struct Server {
-    config: Config,
-    db: Arc<Db>,
-    clients: RwLock<HashMap<SessionId, Client>>,
-    waiting_for_audio_channel: Mutex<Vec<(SessionId, IpAddr, Ocb2Aes128Crypto)>>,
-    address_to_channel: RwLock<HashMap<SocketAddr, Sender<Vec<u8>>>>,
-}
-
-type SessionId = u32;
-
-impl Server {
-    pub fn new(config: Config) -> Arc<Self> {
-        let path_to_db_file = config.path_to_db_file.clone();
-
-        Arc::new(Server {
-            config,
-            clients: RwLock::new(HashMap::new()),
-            db: Arc::new(Db::open(&path_to_db_file)),
-            waiting_for_audio_channel: Mutex::new(vec![]),
-            address_to_channel: RwLock::new(HashMap::new()),
-        })
-    }
-
-    pub async fn run(self: Arc<Self>) {
-        let mut tls_config = ServerConfig::new(NoClientAuth::new());
-        let result = tls_config.set_single_cert(
-            vec![self.config.certificate.clone()],
-            self.config.private_key.clone(),
-        );
-        if let Err(err) = result {
-            error!("{}", err);
-            panic!();
-        }
-
-        let socket_address = SocketAddr::new(self.config.ip_address, self.config.port);
-        let tls_acceptor = TlsAcceptor::from(Arc::new(tls_config));
-        let tcp_listener = match TcpListener::bind(socket_address).await {
-            Ok(listener) => listener,
-            Err(_) => {
-                error!("Couldn't bind tcp socket to {}", socket_address);
-                panic!();
-            }
-        };
-        let udp_socket = match UdpSocket::bind(socket_address).await {
-            Ok(socket) => socket,
-            Err(_) => {
-                error!("Couldn't bind udp socket to {}", socket_address);
-                panic!();
-            }
-        };
-        info!("Server listening on {}", socket_address);
-
-        Arc::clone(&self).run_udp_task(udp_socket).await;
-        Arc::clone(&self)
-            .listen_for_new_connections(tcp_listener, tls_acceptor)
-            .await;
-    }
-
-    async fn run_udp_task(self: Arc<Self>, socket: UdpSocket) {
-        let socket = Arc::new(socket);
-        tokio::spawn(async move {
-            let mut buf = [0; MAX_UDP_DATAGRAM_SIZE];
-            loop {
-                if let Ok((len, socket_address)) = socket.recv_from(&mut buf).await {
-                    if !Arc::clone(&self)
-                        .send_to_audio_channel(&buf[..len], &socket_address)
-                        .await
-                    {
-                        // TODO Move to a separate task
-                        Arc::clone(&self)
-                            .match_address_to_channel(
-                                &buf[..len],
-                                socket_address,
-                                Arc::clone(&socket),
-                            )
-                            .await;
-                    }
-                }
-            }
-        });
-    }
-
-    async fn send_to_audio_channel(self: &Arc<Self>, buf: &[u8], address: &SocketAddr) -> bool {
-        let connected = self.address_to_channel.read().await;
-        if let Some(sender) = connected.get(address) {
-            sender.send(Vec::from(buf)).await;
-            return true;
-        }
-
-        false
-    }
-
-    async fn match_address_to_channel(
-        self: &Arc<Self>,
-        buf: &[u8],
-        address: SocketAddr,
-        udp_socket: Arc<UdpSocket>,
-    ) {
-        let mut waiting = self.waiting_for_audio_channel.lock().await;
-        let index = match waiting
-            .iter_mut()
-            .position(|(_, ip, crypto)| &address.ip() == ip && crypto.decrypt(buf).is_ok())
-        {
-            Some(index) => index,
-            None => return,
-        };
-        let (session_id, _, crypto) = waiting.remove(index);
-        drop(waiting);
-
-        let (sender, receiver) = mpsc::channel(1);
-        let mut clients = self.clients.write().await;
-        if let Some(client) = clients.get_mut(&session_id) {
-            let audio_channel = AudioChannel::new(receiver, udp_socket, crypto, address);
-            client.set_audio_channel(audio_channel).await;
-        }
-        drop(clients);
-
-        let mut address_to_channel = self.address_to_channel.write().await;
-        address_to_channel.insert(address, sender);
-    }
-
-    async fn listen_for_new_connections(
-        self: Arc<Self>,
-        listener: TcpListener,
-        acceptor: TlsAcceptor,
-    ) {
-        loop {
-            let (stream, _) = match listener.accept().await {
-                Ok(stream) => stream,
-                Err(_) => continue,
-            };
-            let acceptor = acceptor.clone();
-            let server = Arc::clone(&self);
-
-            tokio::spawn(async move {
-                let stream = acceptor.accept(stream).await;
-                if let Ok(stream) = stream {
-                    server.process_new_connection(TlsStream::from(stream)).await;
-                }
-            });
-        }
-    }
-
-    async fn process_new_connection(self: Arc<Self>, stream: TlsStream<TcpStream>) {
-        let address = stream.get_ref().0.peer_addr().unwrap();
-        info!("New connection: {}", address);
-
-        let (session_id, mut responder) = match self.new_client(stream).await {
-            Ok(id) => {
-                info!("Connection established successfully {}", address);
-                id
-            }
-            Err(_) => {
-                info!("Failed to establish connection {}", address);
-                return;
-            }
-        };
-
-        loop {
-            let message = match responder.recv().await {
-                Some(msg) => msg,
-                None => {
-                    warn!("Connection closed unexpectedly");
-                    return;
-                }
-            };
-
-            match message {
-                ClientEvent::Disconnected => {
-                    self.client_disconnected(session_id).await;
-                    info!("Disconnected {}", address);
-                    return;
-                }
-                ClientEvent::Talking(audio_data) => {
-                    self.client_talking(session_id, audio_data).await;
-                }
-            }
-        }
-    }
-
-    async fn client_disconnected(&self, session_id: SessionId) {
-        let mut clients = self.clients.write().await;
-        clients.remove(&session_id);
-        for client in clients.values() {
-            client
-                .send_event(ServerEvent::UserDisconnected(session_id))
-                .await;
-        }
-        drop(clients);
-
-        //TODO optimize
-        let mut waiting = self.waiting_for_audio_channel.lock().await;
-        if let Some(index) = waiting.iter().position(|(id, _, _)| session_id == *id) {
-            waiting.remove(index);
-        } else {
-            drop(waiting);
-
-            let mut address_to_channel = self.address_to_channel.write().await;
-            if let Some(key) = address_to_channel
-                .keys()
-                .find(|key| address_to_channel.get(key).unwrap().is_closed())
-                .cloned()
-            {
-                address_to_channel.remove(&key);
-            }
-        }
-    }
-
-    async fn client_talking(&self, session_id: SessionId, audio: AudioData) {
-        let clients = self.clients.read().await;
-        for client in clients
-            .values()
-            .filter(|client| client.session_id != session_id)
-        {
-            client
-                .send_event(ServerEvent::UserTalking(audio.clone()))
-                .await;
-        }
-    }
-
-    async fn new_client(
-        self: &Arc<Self>,
-        stream: TlsStream<TcpStream>,
-    ) -> Result<(SessionId, Receiver<ClientEvent>), crate::client::Error> {
-        let ip = stream.get_ref().0.peer_addr().unwrap().ip();
-        let config = self.create_client_config();
-        let crypto =
-            Ocb2Aes128Crypto::new(config.crypto_key, config.server_nonce, config.client_nonce);
-        let (client, receiver) =
-            Client::establish_connection(Arc::clone(&self.db), ControlChannel::new(stream), config)
-                .await?;
-
-        let session_id = client.session_id;
-        let mut clients = self.clients.write().await;
-        for client in clients.values() {
-            client
-                .send_event(ServerEvent::UserConnected(session_id))
-                .await;
-        }
-        clients.insert(session_id, client);
-        drop(clients);
-
-        let mut waiting = self.waiting_for_audio_channel.lock().await;
-        waiting.push((session_id, ip, crypto));
-        drop(waiting);
-
-        Ok((session_id, receiver))
-    }
-
-    fn create_client_config(&self) -> crate::client::Config {
-        let crypto_key = self.generate_key();
-        let server_nonce = self.generate_key();
-        let client_nonce = self.generate_key();
-        crate::client::Config {
-            crypto_key,
-            server_nonce,
-            client_nonce,
-            alpha_codec_version: 0,
-            beta_codec_version: 0,
-            prefer_alpha: true,
-            opus_support: true,
-            welcome_text: "Welcome".to_string(),
-            max_bandwidth: 128000,
-            max_users: 10,
-            allow_html: true,
-            max_message_length: 512,
-            max_image_message_length: 100000,
-        }
-    }
-
-    fn generate_key(&self) -> [u8; 16] {
-        let mut buffer = [0; 16];
-        let mut rng = StdRng::from_entropy();
-        rng.fill(&mut buffer);
-        buffer
-    }
-}

+ 146 - 0
src/server/client/client.rs

@@ -0,0 +1,146 @@
+use crate::protocol::connection::{AudioChannel, ControlChannel};
+use crate::protocol::parser::{AudioData, AudioPacket};
+use crate::server::client::handler::{Config, Error, Handler};
+use crate::storage::Storage;
+use std::marker::PhantomData;
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::{Receiver, Sender};
+use tokio::task::JoinHandle;
+
+pub struct Client<C: ControlChannel, A: AudioChannel> {
+    event_sender: Sender<ServerEvent>,
+    audio_channel_sender: Sender<A>,
+    task: JoinHandle<()>,
+    control_channel_type: PhantomData<C>,
+    audio_channel_type: PhantomData<A>,
+}
+
+pub enum ClientEvent {
+    Talking(AudioData),
+    Disconnected,
+}
+
+pub enum ServerEvent {
+    Connected(u32),
+    Talking(AudioData),
+    Disconnected(u32),
+}
+
+impl<C: ControlChannel + 'static, A: AudioChannel + 'static> Client<C, A> {
+    pub async fn setup_connection(
+        session_id: u32,
+        storage: Arc<Storage>,
+        control_channel: C,
+        config: Config,
+    ) -> Result<(Self, Receiver<ClientEvent>), Error> {
+        let control_channel = Arc::new(control_channel);
+        let (event_sender, event_receiver) = mpsc::channel(1);
+        let (server_event_sender, server_event_receiver) = mpsc::channel(1);
+        let (audio_sender, audio_receiver) = mpsc::channel(1);
+        let handler: Handler<C, A> = Handler::new(
+            storage,
+            Arc::clone(&control_channel),
+            event_sender,
+            session_id,
+            config,
+        );
+        handler.handle_new_connection().await?;
+        let client = Client {
+            event_sender: server_event_sender,
+            audio_channel_sender: audio_sender,
+            task: Self::run_handler_loop(
+                handler,
+                control_channel,
+                server_event_receiver,
+                audio_receiver,
+            )
+            .await,
+            control_channel_type: Default::default(),
+            audio_channel_type: Default::default(),
+        };
+        Ok((client, event_receiver))
+    }
+
+    pub async fn send_event(&self, event: ServerEvent) {
+        self.event_sender.send(event).await;
+    }
+
+    pub async fn set_audio_channel(&mut self, channel: A) {
+        self.audio_channel_sender.send(channel).await;
+    }
+
+    async fn run_handler_loop(
+        mut handler: Handler<C, A>,
+        control_channel: Arc<C>,
+        mut event_receiver: Receiver<ServerEvent>,
+        mut channel_receiver: Receiver<A>,
+    ) -> JoinHandle<()> {
+        // TODO cleaner solution
+        tokio::spawn(async move {
+            let mut audio_channel: Option<Arc<A>> = None;
+            let msg_recv_fut = control_channel.receive();
+            let audio_recv_fut = Self::recv(audio_channel.clone());
+            tokio::pin!(msg_recv_fut, audio_recv_fut);
+
+            loop {
+                tokio::select! {
+                    result = &mut msg_recv_fut => {
+                        match result {
+                            Ok(msg) => {
+                                msg_recv_fut.set(control_channel.receive());
+                                handler.handle_message(msg).await;
+                            }
+                            Err(crate::protocol::connection::Error::Parsing(_)) => {
+                                // TODO
+                                // Ignore for now
+                                msg_recv_fut.set(control_channel.receive());
+                            }
+                            Err(_) => {
+                                handler.self_disconnected().await;
+                                break;
+                            }
+                        }
+                    }
+                    Some(event) = event_receiver.recv() => {
+                        handler.handle_server_event(event).await;
+                    }
+                    Some(channel) = channel_receiver.recv() => {
+                        let channel = Arc::new(channel);
+                        handler.set_audio_channel(Arc::clone(&channel));
+                        audio_channel = Some(channel);
+                        audio_recv_fut.set(Self::recv(audio_channel.clone()));
+                    }
+                    Some(result) = &mut audio_recv_fut => {
+                        match result {
+                            Ok(packet) => {
+                                audio_recv_fut.set(Self::recv(audio_channel.clone()));
+                                handler.handle_audio_packet(packet).await;
+                            }
+                            Err(_) => {
+                                handler.self_disconnected().await;
+                                break;
+                            }
+                        }
+                    }
+                };
+            }
+        })
+    }
+
+    async fn recv(
+        audio_channel: Option<Arc<A>>,
+    ) -> Option<Result<AudioPacket, crate::protocol::connection::Error>> {
+        if let Some(channel) = audio_channel {
+            Some(channel.receive().await)
+        } else {
+            std::future::pending().await
+        }
+    }
+}
+
+impl<C: ControlChannel, A: AudioChannel> Drop for Client<C, A> {
+    fn drop(&mut self) {
+        self.task.abort();
+    }
+}

+ 362 - 0
src/server/client/handler.rs

@@ -0,0 +1,362 @@
+use crate::protocol::connection::{AudioChannel, ControlChannel};
+use crate::protocol::parser::{
+    AudioData, AudioPacket, Authenticate, ChannelState, CodecVersion, ControlMessage, CryptSetup,
+    Ping, ServerConfig, ServerSync, SessionId, UdpTunnel, UserRemove, UserState, Version,
+    MUMBLE_PROTOCOL_VERSION,
+};
+use crate::server::client::client::{ClientEvent, ServerEvent};
+use crate::storage::{Guest, Storage};
+use log::error;
+use ring::pbkdf2;
+use std::num::NonZeroU32;
+use std::sync::Arc;
+use tokio::sync::mpsc::Sender;
+
+static PBKDF2_ALGORITHM: pbkdf2::Algorithm = pbkdf2::PBKDF2_HMAC_SHA256;
+
+type Key = [u8; 16];
+type Nonce = [u8; 16];
+
+pub struct Handler<C: ControlChannel, A: AudioChannel> {
+    storage: Arc<Storage>,
+    control_channel: Arc<C>,
+    audio_channel: Option<Arc<A>>,
+    event_sender: Sender<ClientEvent>,
+    config: Config,
+    session_id: u32,
+    crypto_resyncs: u32,
+}
+
+pub struct Config {
+    pub crypto_key: Key,
+    pub server_nonce: Nonce,
+    pub client_nonce: Nonce,
+    pub alpha_codec_version: i32,
+    pub beta_codec_version: i32,
+    pub prefer_alpha: bool,
+    pub opus_support: bool,
+    pub welcome_text: String,
+    pub max_bandwidth: u32,
+    pub max_users: u32,
+    pub allow_html: bool,
+    pub max_message_length: u32,
+    pub max_image_message_length: u32,
+    pub max_username_length: u32,
+    pub min_compatible_version: u32,
+    pub server_password: Option<String>,
+    pub pbkdf2_iterations: NonZeroU32,
+}
+
+pub enum Error {
+    IO(std::io::Error),
+    PacketParsing(crate::protocol::parser::ParsingError),
+    Reject(Reject),
+    WrongPacket,
+}
+
+pub enum Reject {
+    InvalidUsername,
+    UsernameInUse,
+    WrongVersion,
+    WrongUserPassword,
+    WrongServerPassword,
+    NoCertificate,
+}
+
+impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
+    pub fn new(
+        storage: Arc<Storage>,
+        control_channel: Arc<C>,
+        event_sender: Sender<ClientEvent>,
+        session_id: u32,
+        config: Config,
+    ) -> Self {
+        Handler {
+            storage,
+            control_channel,
+            audio_channel: None,
+            event_sender,
+            session_id,
+            crypto_resyncs: 0,
+            config,
+        }
+    }
+}
+
+impl<C: ControlChannel, A: AudioChannel> Handler<C, A> {
+    pub fn set_audio_channel(&mut self, channel: Arc<A>) {
+        self.audio_channel = Some(channel);
+    }
+
+    pub async fn handle_new_connection(&self) -> Result<(), Error> {
+        match self.control_channel.receive().await? {
+            ControlMessage::Version(_) => {
+                // TODO check version
+            }
+            _ => return Err(Error::WrongPacket),
+        };
+        // TODO
+        let auth = match self.control_channel.receive().await? {
+            ControlMessage::Authenticate(auth) => auth,
+            _ => return Err(Error::WrongPacket),
+        };
+        self.authenticate(auth).await?;
+
+        let version = Version {
+            version: Some(MUMBLE_PROTOCOL_VERSION),
+        };
+        let crypt_setup = CryptSetup {
+            key: Some(Vec::from(self.config.crypto_key)),
+            client_nonce: Some(Vec::from(self.config.client_nonce)),
+            server_nonce: Some(Vec::from(self.config.server_nonce)),
+        };
+        let channel_states: Vec<ChannelState> = self
+            .storage
+            .get_channels()
+            .into_iter()
+            .map(|channel| ChannelState {
+                id: Some(channel.id),
+                name: Some(channel.name),
+            })
+            .collect();
+        let user_states: Vec<UserState> = self.get_user_states();
+        let codec_version = CodecVersion {
+            celt_alpha_version: self.config.alpha_codec_version,
+            celt_beta_version: self.config.beta_codec_version,
+            prefer_alpha: self.config.prefer_alpha,
+            opus_support: self.config.opus_support,
+        };
+        let server_sync = ServerSync {
+            user_session_id: SessionId::from(self.session_id),
+            max_bandwidth: self.config.max_bandwidth,
+            welcome_text: self.config.welcome_text.clone(),
+        };
+        let server_config = ServerConfig {
+            max_users: self.config.max_users,
+            max_message_length: self.config.max_message_length,
+        };
+
+        self.control_channel.send(version).await?;
+        self.control_channel.send(crypt_setup).await?;
+        for channel_state in channel_states.into_iter() {
+            self.control_channel.send(channel_state).await?;
+        }
+        for user_state in user_states.into_iter() {
+            self.control_channel.send(user_state).await?;
+        }
+        self.control_channel.send(codec_version).await?;
+        self.control_channel.send(server_sync).await?;
+        self.control_channel.send(server_config).await?;
+
+        Ok(())
+    }
+
+    pub async fn handle_server_event(&self, event: ServerEvent) -> Result<(), Error> {
+        match event {
+            ServerEvent::Connected(session_id) => self.new_user_connected(session_id).await?,
+            ServerEvent::Disconnected(session_id) => self.disconnected(session_id).await?,
+            ServerEvent::Talking(audio_data) => self.talking(audio_data).await?,
+        }
+
+        Ok(())
+    }
+
+    pub async fn handle_message(&self, packet: ControlMessage) -> Result<(), Error> {
+        match packet {
+            ControlMessage::Ping(ping) => self.control_ping(ping).await?,
+            ControlMessage::UdpTunnel(tunnel) => self.tunnel(tunnel).await?,
+            _ => error!("unimplemented!"),
+        }
+        Ok(())
+    }
+
+    pub async fn handle_audio_packet(&self, packet: AudioPacket) -> Result<(), Error> {
+        match packet {
+            AudioPacket::Ping(_) => {
+                self.audio_channel.as_ref().unwrap().send(packet).await;
+            }
+            AudioPacket::AudioData(mut audio_data) => {
+                audio_data.session_id = Some(SessionId::from(self.session_id));
+                self.event_sender
+                    .send(ClientEvent::Talking(audio_data))
+                    .await;
+            }
+        }
+
+        Ok(())
+    }
+
+    pub async fn self_disconnected(&self) {
+        self.storage.remove_by_session_id(self.session_id);
+        self.event_sender.send(ClientEvent::Disconnected).await;
+    }
+
+    // Control packets
+    async fn control_ping(&self, incoming: Ping) -> Result<(), Error> {
+        let mut ping = Ping {
+            timestamp: incoming.timestamp,
+            good: None,
+            late: None,
+            lost: None,
+            resyncs: None,
+        };
+        if let Some(channel) = self.audio_channel.as_ref() {
+            let stats = channel.get_stats();
+            ping.good = Some(stats.good);
+            ping.late = Some(stats.late);
+            ping.lost = Some(stats.lost);
+            ping.resyncs = Some(self.crypto_resyncs);
+        }
+
+        self.control_channel.send(ping).await?;
+        Ok(())
+    }
+
+    async fn tunnel(&self, tunnel: UdpTunnel) -> Result<(), Error> {
+        match tunnel.audio_packet {
+            AudioPacket::Ping(_) => {
+                self.control_channel
+                    .send(UdpTunnel::from(tunnel.audio_packet))
+                    .await?;
+            }
+            AudioPacket::AudioData(mut audio_data) => {
+                audio_data.session_id = Some(SessionId::from(self.session_id));
+                self.event_sender
+                    .send(ClientEvent::Talking(audio_data))
+                    .await;
+            }
+        }
+
+        Ok(())
+    }
+
+    // Server events
+    async fn new_user_connected(&self, session_id: u32) -> Result<(), Error> {
+        let id = Some(SessionId::from(session_id));
+        if let Some(user) = self.storage.get_connected_user(session_id) {
+            self.control_channel
+                .send(UserState {
+                    session_id: id,
+                    name: Some(user.username),
+                    channel_id: Some(user.channel_id),
+                })
+                .await?;
+        } else if let Some(guest) = self.storage.get_guest(session_id) {
+            self.control_channel
+                .send(UserState {
+                    session_id: id,
+                    name: Some(guest.username),
+                    channel_id: Some(guest.channel_id),
+                })
+                .await?;
+        }
+
+        Ok(())
+    }
+
+    async fn talking(&self, audio_data: AudioData) -> Result<(), Error> {
+        let audio_packet = AudioPacket::AudioData(audio_data);
+        if let Some(channel) = self.audio_channel.as_ref() {
+            channel.send(audio_packet).await?;
+        } else {
+            self.control_channel
+                .send(UdpTunnel::from(audio_packet))
+                .await?;
+        }
+
+        Ok(())
+    }
+
+    async fn disconnected(&self, session_id: u32) -> Result<(), Error> {
+        let user_remove = UserRemove {
+            session_id: session_id.into(),
+        };
+        Ok(self.control_channel.send(user_remove).await?)
+    }
+
+    // Utils
+    async fn authenticate(&self, auth: Authenticate) -> Result<(), Error> {
+        let username = match auth.username {
+            Some(username) => username,
+            None => return Err(Error::Reject(Reject::InvalidUsername)),
+        };
+        if !validate_username(&username, self.config.max_username_length as usize) {
+            return Err(Error::Reject(Reject::InvalidUsername));
+        }
+
+        if self.storage.username_in_connected(&username) {
+            return Err(Error::Reject(Reject::UsernameInUse));
+        }
+
+        let user = match self.storage.get_user_by_username(username.clone()) {
+            Some(user) => user,
+            None => {
+                self.storage
+                    .add_guest(Guest::new(username, self.session_id, 0));
+                return Ok(());
+            }
+        };
+
+        if let (Some(stored_password_hash), Some(iterations), Some(salt)) = (
+            &user.password_hash,
+            user.pbkdf2_iterations,
+            &user.password_salt,
+        ) {
+            let password = match auth.password {
+                Some(password) => password,
+                None => return Err(Error::Reject(Reject::WrongUserPassword)),
+            };
+            pbkdf2::verify(
+                PBKDF2_ALGORITHM,
+                iterations,
+                salt,
+                password.as_bytes(),
+                stored_password_hash,
+            )
+            .map_err(|_| Error::Reject(Reject::WrongUserPassword))?;
+        }
+
+        self.storage.add_connected_user(user, self.session_id);
+
+        Ok(())
+    }
+
+    fn get_user_states(&self) -> Vec<UserState> {
+        let guests = self.storage.get_guests();
+        let users = self.storage.get_connected_users();
+        let mut states = Vec::with_capacity(guests.len() + users.len());
+        for guest in guests {
+            let state = UserState {
+                session_id: Some(SessionId::from(guest.session_id)),
+                name: Some(guest.username),
+                channel_id: Some(guest.channel_id),
+            };
+            states.push(state);
+        }
+
+        for (session_id, user) in users {
+            let state = UserState {
+                session_id: Some(SessionId::from(session_id)),
+                name: Some(user.username),
+                channel_id: Some(user.channel_id),
+            };
+            states.push(state);
+        }
+        states
+    }
+}
+
+fn validate_username(username: &str, max_username_length: usize) -> bool {
+    !username.is_empty()
+        && username.trim().len() == username.len()
+        && username.len() <= max_username_length
+}
+
+impl From<crate::protocol::connection::Error> for Error {
+    fn from(err: crate::protocol::connection::Error) -> Self {
+        match err {
+            crate::protocol::connection::Error::IO(err) => Error::IO(err),
+            crate::protocol::connection::Error::Parsing(err) => Error::PacketParsing(err),
+        }
+    }
+}

+ 5 - 0
src/server/client/mod.rs

@@ -0,0 +1,5 @@
+mod client;
+mod handler;
+
+pub use self::client::{Client, ClientEvent, ServerEvent};
+pub use self::handler::{Config, Error};

+ 124 - 0
src/server/connection_worker.rs

@@ -0,0 +1,124 @@
+use crate::crypto::Ocb2Aes128Crypto;
+use crate::protocol::parser::AudioData;
+use crate::server::client::{Client, ClientEvent, Config, Error, ServerEvent};
+use crate::server::session_pool::{SessionId, SessionPool};
+use crate::server::tcp_control_channel::TcpControlChannel;
+use crate::server::udp_audio_channel::{UdpAudioChannel, UdpWorker};
+use crate::storage::Storage;
+use dashmap::DashMap;
+use log::info;
+use std::sync::Arc;
+use tokio::net::TcpStream;
+use tokio::sync::mpsc::Receiver;
+use tokio_rustls::server::TlsStream;
+
+pub struct ConnectionWorker {
+    session_id: SessionId,
+    session_pool: Arc<SessionPool>,
+    storage: Arc<Storage>,
+    clients: Arc<DashMap<SessionId, Client<TcpControlChannel, UdpAudioChannel>>>,
+}
+
+impl ConnectionWorker {
+    pub fn new(
+        session_pool: Arc<SessionPool>,
+        storage: Arc<Storage>,
+        clients: Arc<DashMap<SessionId, Client<TcpControlChannel, UdpAudioChannel>>>,
+    ) -> Self {
+        ConnectionWorker {
+            session_id: session_pool.pop(),
+            session_pool,
+            storage,
+            clients,
+        }
+    }
+
+    pub async fn start(self, stream: TlsStream<TcpStream>, config: Config, worker: Arc<UdpWorker>) {
+        tokio::spawn(async move {
+            let address = stream.get_ref().0.peer_addr().unwrap();
+            if self
+                .process_new_connection(stream, config, worker)
+                .await
+                .is_err()
+            {
+                info!("Failed to establish connection {}", address)
+            }
+        });
+    }
+
+    async fn process_new_connection(
+        self,
+        stream: TlsStream<TcpStream>,
+        config: Config,
+        worker: Arc<UdpWorker>,
+    ) -> Result<(), Error> {
+        let address = stream.get_ref().0.peer_addr().unwrap();
+        let crypto =
+            Ocb2Aes128Crypto::new(config.crypto_key, config.server_nonce, config.client_nonce);
+        let control_channel = TcpControlChannel::new(stream);
+        let (client, event_receiver) = Client::setup_connection(
+            self.session_id,
+            Arc::clone(&self.storage),
+            control_channel,
+            config,
+        )
+        .await?;
+        info!("Connection established successfully {}", address);
+
+        let session_id = self.session_id;
+        for client in self.clients.iter() {
+            client.send_event(ServerEvent::Connected(session_id)).await;
+        }
+        self.clients.insert(session_id, client);
+
+        let clients = Arc::clone(&self.clients);
+        let task = tokio::spawn(async move {
+            let audio_channel = worker.new_audio_channel(address.ip(), crypto).await;
+            if let Some(client) = clients.get_mut(&session_id).as_deref_mut() {
+                client.set_audio_channel(audio_channel).await;
+            }
+        });
+
+        self.event_loop(event_receiver).await;
+        task.abort();
+        info!("Disconnected {}", address);
+
+        Ok(())
+    }
+
+    async fn event_loop(&self, mut event_receiver: Receiver<ClientEvent>) {
+        loop {
+            let message = event_receiver.recv().await.unwrap();
+            match message {
+                ClientEvent::Disconnected => {
+                    self.client_disconnected().await;
+                    return;
+                }
+                ClientEvent::Talking(audio_data) => {
+                    self.client_talking(audio_data).await;
+                }
+            }
+        }
+    }
+
+    async fn client_disconnected(&self) {
+        self.clients.remove(&self.session_id);
+        for client in self.clients.iter() {
+            client
+                .send_event(ServerEvent::Disconnected(self.session_id))
+                .await;
+        }
+
+        self.session_pool.push(self.session_id);
+    }
+
+    async fn client_talking(&self, audio: AudioData) {
+        for client in self
+            .clients
+            .iter()
+            .filter(|el| *el.key() != self.session_id)
+        {
+            client.send_event(ServerEvent::Talking(audio.clone())).await;
+        }
+    }
+}

+ 8 - 0
src/server/mod.rs

@@ -0,0 +1,8 @@
+mod client;
+mod connection_worker;
+mod server;
+mod session_pool;
+mod tcp_control_channel;
+mod udp_audio_channel;
+
+pub use self::server::{Config, Server};

+ 138 - 0
src/server/server.rs

@@ -0,0 +1,138 @@
+use crate::server::client::{Client, Config as ClientConfig};
+use crate::server::connection_worker::ConnectionWorker;
+use crate::server::session_pool::SessionPool;
+use crate::server::tcp_control_channel::TcpControlChannel;
+use crate::server::udp_audio_channel::{UdpAudioChannel, UdpWorker};
+use crate::storage::Storage;
+use dashmap::DashMap;
+use log::{error, info};
+use rand::prelude::StdRng;
+use rand::{Rng, SeedableRng};
+use std::net::{IpAddr, SocketAddr};
+use std::num::NonZeroU32;
+use std::sync::Arc;
+use tokio::net::{TcpListener, UdpSocket};
+use tokio_rustls::rustls::{Certificate, NoClientAuth, PrivateKey, ServerConfig};
+use tokio_rustls::TlsAcceptor;
+
+pub struct Config {
+    pub ip_address: IpAddr,
+    pub port: u16,
+    pub certificate: Certificate,
+    pub private_key: PrivateKey,
+    pub path_to_db_file: String,
+}
+
+pub struct Server {
+    config: Config,
+    rng: StdRng,
+    storage: Arc<Storage>,
+    clients: Arc<DashMap<SessionId, Client<TcpControlChannel, UdpAudioChannel>>>,
+    session_pool: Arc<SessionPool>,
+}
+type SessionId = u32;
+
+impl Server {
+    pub fn new(config: Config) -> Self {
+        let storage = Storage::open(&config.path_to_db_file);
+
+        Server {
+            config,
+            rng: StdRng::from_entropy(),
+            storage: Arc::new(storage),
+            clients: Default::default(),
+            session_pool: Arc::new(SessionPool::new()),
+        }
+    }
+
+    pub async fn run(mut self) {
+        let mut tls_config = ServerConfig::new(NoClientAuth::new());
+        let result = tls_config.set_single_cert(
+            vec![self.config.certificate.clone()],
+            self.config.private_key.clone(),
+        );
+        if let Err(err) = result {
+            error!("{}", err);
+            panic!();
+        }
+
+        let socket_address = SocketAddr::new(self.config.ip_address, self.config.port);
+        let tls_acceptor = TlsAcceptor::from(Arc::new(tls_config));
+        let tcp_listener = match TcpListener::bind(socket_address).await {
+            Ok(listener) => listener,
+            Err(_) => {
+                error!("Couldn't bind tcp socket to {}", socket_address);
+                panic!();
+            }
+        };
+        let udp_socket = match UdpSocket::bind(socket_address).await {
+            Ok(socket) => socket,
+            Err(_) => {
+                error!("Couldn't bind udp socket to {}", socket_address);
+                panic!();
+            }
+        };
+        let udp_worker = Arc::new(UdpWorker::start(udp_socket).await);
+        info!("Server listening on {}", socket_address);
+
+        loop {
+            let (tcp_stream, _) = match tcp_listener.accept().await {
+                Ok(stream) => stream,
+                Err(err) => {
+                    info!("Tcp error: {}", err);
+                    continue;
+                }
+            };
+            let tls_stream = match tls_acceptor.accept(tcp_stream).await {
+                Ok(stream) => stream,
+                Err(err) => {
+                    info!("Tls error: {}", err);
+                    continue;
+                }
+            };
+            let worker = ConnectionWorker::new(
+                Arc::clone(&self.session_pool),
+                Arc::clone(&self.storage),
+                Arc::clone(&self.clients),
+            );
+            worker
+                .start(
+                    tls_stream,
+                    self.create_client_config(),
+                    Arc::clone(&udp_worker),
+                )
+                .await;
+        }
+    }
+
+    fn create_client_config(&mut self) -> ClientConfig {
+        let crypto_key = self.generate_key();
+        let server_nonce = self.generate_key();
+        let client_nonce = self.generate_key();
+        ClientConfig {
+            crypto_key,
+            server_nonce,
+            client_nonce,
+            alpha_codec_version: -2147483637,
+            beta_codec_version: -2147483632,
+            prefer_alpha: true,
+            opus_support: true,
+            welcome_text: "Welcome".to_string(),
+            max_bandwidth: 128000,
+            max_users: 10,
+            allow_html: true,
+            max_message_length: 512,
+            max_image_message_length: 100000,
+            max_username_length: 64,
+            min_compatible_version: 0x10200,
+            server_password: None,
+            pbkdf2_iterations: NonZeroU32::new(100_000).unwrap(),
+        }
+    }
+
+    fn generate_key(&mut self) -> [u8; 16] {
+        let mut buffer = [0; 16];
+        self.rng.fill(&mut buffer);
+        buffer
+    }
+}

+ 34 - 0
src/server/session_pool.rs

@@ -0,0 +1,34 @@
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Mutex;
+
+pub type SessionId = u32;
+
+pub struct SessionPool {
+    next_session_id: AtomicU32,
+    released: Mutex<Vec<SessionId>>,
+}
+
+impl SessionPool {
+    pub fn new() -> Self {
+        SessionPool {
+            // 0 is reserved for SuperUser
+            next_session_id: AtomicU32::new(1),
+            released: Mutex::new(vec![]),
+        }
+    }
+
+    pub fn pop(&self) -> SessionId {
+        let mut released = self.released.lock().unwrap();
+        if released.is_empty() {
+            self.next_session_id.fetch_add(1, Ordering::SeqCst)
+        } else {
+            let index = released.len() - 1;
+            released.remove(index)
+        }
+    }
+
+    pub fn push(&self, id: SessionId) {
+        let mut released = self.released.lock().unwrap();
+        released.push(id);
+    }
+}

+ 80 - 0
src/server/tcp_control_channel.rs

@@ -0,0 +1,80 @@
+use crate::protocol::connection::{ControlChannel, ControlChannelStats, Error};
+use crate::protocol::parser::{ControlMessage, Message, ParsingError};
+use async_trait::async_trait;
+use std::io::ErrorKind;
+use std::sync::atomic::{AtomicU32, Ordering};
+use tokio::io::{AsyncReadExt, AsyncWriteExt, ReadHalf, WriteHalf};
+use tokio::net::TcpStream;
+use tokio::sync::Mutex;
+use tokio_rustls::server::TlsStream;
+
+const MAX_PROTOBUF_MESSAGE_SIZE: u32 = 8 * 1024 * 1024 - 1;
+
+pub struct TcpControlChannel {
+    received: AtomicU32,
+    writer: Mutex<WriteHalf<TlsStream<TcpStream>>>,
+    reader: Mutex<ReadHalf<TlsStream<TcpStream>>>,
+}
+
+impl TcpControlChannel {
+    pub fn new(stream: TlsStream<TcpStream>) -> Self {
+        let (reader, writer) = tokio::io::split(stream);
+
+        TcpControlChannel {
+            writer: Mutex::new(writer),
+            reader: Mutex::new(reader),
+            received: AtomicU32::new(0),
+        }
+    }
+}
+
+#[async_trait]
+impl ControlChannel for TcpControlChannel {
+    async fn send(&self, message: impl Message + 'async_trait) -> Result<(), Error> {
+        let bytes = message.serialize();
+        let mut writer = self.writer.lock().await;
+        writer.write_all(&bytes).await?;
+        Ok(())
+    }
+
+    async fn receive(&self) -> Result<ControlMessage, Error> {
+        let mut packet_type = [0; 2];
+        let mut length = [0; 4];
+        let mut reader = self.reader.lock().await;
+        reader.read_exact(&mut packet_type).await?;
+        reader.read_exact(&mut length).await?;
+        let (packet_type, length) = ControlMessage::parse_prefix(packet_type, length);
+
+        if length > MAX_PROTOBUF_MESSAGE_SIZE {
+            return Err(Error::IO(std::io::Error::new(
+                ErrorKind::Other,
+                "too big message",
+            )));
+        }
+
+        let mut payload = vec![0; length as usize];
+        reader.read_exact(&mut payload).await?;
+        let message = ControlMessage::parse_payload(packet_type, &payload)?;
+
+        self.received.fetch_add(1, Ordering::Relaxed);
+        Ok(message)
+    }
+
+    fn get_stats(&self) -> ControlChannelStats {
+        ControlChannelStats {
+            received: self.received.load(Ordering::Acquire),
+        }
+    }
+}
+
+impl From<std::io::Error> for Error {
+    fn from(err: std::io::Error) -> Self {
+        Error::IO(err)
+    }
+}
+
+impl From<ParsingError> for Error {
+    fn from(err: ParsingError) -> Self {
+        Error::Parsing(err)
+    }
+}

+ 151 - 0
src/server/udp_audio_channel.rs

@@ -0,0 +1,151 @@
+use crate::crypto;
+use crate::crypto::Ocb2Aes128Crypto;
+use crate::protocol::connection::{AudioChannel, AudioChannelStats, Error};
+use crate::protocol::parser::AudioPacket;
+use async_trait::async_trait;
+use std::net::{IpAddr, SocketAddr};
+use std::sync::atomic::{AtomicU32, Ordering};
+use std::sync::Arc;
+use tokio::net::UdpSocket;
+use tokio::sync::broadcast::error::RecvError;
+use tokio::sync::broadcast::{Receiver, Sender};
+use tokio::sync::{broadcast, Mutex};
+use tokio::task::JoinHandle;
+
+const MAX_AUDIO_PACKET_SIZE: usize = 1020;
+const ENCRYPTION_OVERHEAD: usize = 4;
+const MAX_DATAGRAM_SIZE: usize = MAX_AUDIO_PACKET_SIZE + ENCRYPTION_OVERHEAD;
+
+type Data = Arc<(Vec<u8>, SocketAddr)>;
+
+pub struct UdpWorker {
+    sender: Sender<Data>,
+    socket: Arc<UdpSocket>,
+    task: JoinHandle<()>,
+}
+
+pub struct UdpAudioChannel {
+    good: AtomicU32,
+    late: AtomicU32,
+    lost: AtomicU32,
+    received: AtomicU32,
+    receiver: Mutex<Receiver<Data>>,
+    crypto: Mutex<Ocb2Aes128Crypto>,
+    socket: Arc<UdpSocket>,
+    destination: SocketAddr,
+}
+
+impl UdpWorker {
+    pub async fn start(socket: UdpSocket) -> Self {
+        let (sender, _) = broadcast::channel(8);
+        let socket = Arc::new(socket);
+        let udp_socket = Arc::clone(&socket);
+        let broadcast_sender = sender.clone();
+        let task = tokio::spawn(async move {
+            let mut buf = [0; MAX_DATAGRAM_SIZE];
+            loop {
+                if let Ok((len, socket_address)) = udp_socket.recv_from(&mut buf).await {
+                    broadcast_sender.send(Arc::new((Vec::from(&buf[..len]), socket_address)));
+                }
+            }
+        });
+        UdpWorker {
+            sender,
+            socket,
+            task,
+        }
+    }
+
+    pub async fn new_audio_channel(
+        &self,
+        ip: IpAddr,
+        mut crypto: Ocb2Aes128Crypto,
+    ) -> UdpAudioChannel {
+        loop {
+            let mut receiver = self.sender.subscribe();
+            let data = match receiver.recv().await {
+                Ok(data) => data,
+                Err(RecvError::Lagged(_)) => receiver.recv().await.unwrap(),
+                Err(_) => panic!(),
+            };
+            let (bytes, address) = data.as_ref();
+
+            if address.ip() == ip && crypto.decrypt(bytes).is_ok() {
+                return UdpAudioChannel {
+                    good: AtomicU32::new(1),
+                    late: AtomicU32::new(0),
+                    lost: AtomicU32::new(0),
+                    received: AtomicU32::new(1),
+                    receiver: Mutex::new(receiver),
+                    crypto: Mutex::new(crypto),
+                    socket: Arc::clone(&self.socket),
+                    destination: address.clone(),
+                };
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl AudioChannel for UdpAudioChannel {
+    async fn send(&self, packet: AudioPacket) -> Result<(), Error> {
+        let bytes = packet.serialize();
+        let encrypted = {
+            let mut crypto = self.crypto.lock().await;
+            crypto.encrypt(&bytes)?
+        };
+        self.socket.send_to(&encrypted, self.destination).await?;
+        Ok(())
+    }
+
+    async fn receive(&self) -> Result<AudioPacket, Error> {
+        let mut receiver = self.receiver.lock().await;
+        let data = loop {
+            let data = match receiver.recv().await {
+                Ok(data) => data,
+                Err(RecvError::Lagged(_)) => receiver.recv().await.unwrap(),
+                Err(_) => panic!(),
+            };
+
+            if data.1 == self.destination {
+                break data;
+            }
+        };
+        drop(receiver);
+
+        let mut crypto = self.crypto.lock().await;
+        let decrypted = crypto.decrypt(&data.0)?;
+        self.good.swap(crypto.good, Ordering::Release);
+        self.late.swap(crypto.late, Ordering::Release);
+        self.lost.swap(crypto.lost, Ordering::Release);
+        drop(crypto);
+
+        let packet = AudioPacket::parse(decrypted)?;
+        self.received.fetch_add(1, Ordering::Relaxed);
+        Ok(packet)
+    }
+
+    fn get_stats(&self) -> AudioChannelStats {
+        AudioChannelStats {
+            good: self.good.load(Ordering::Acquire),
+            late: self.late.load(Ordering::Acquire),
+            lost: self.lost.load(Ordering::Acquire),
+            received: self.received.load(Ordering::Acquire),
+        }
+    }
+}
+
+impl Drop for UdpWorker {
+    fn drop(&mut self) {
+        self.task.abort();
+    }
+}
+
+impl From<crypto::Error> for Error {
+    fn from(_: crypto::Error) -> Self {
+        Error::IO(std::io::Error::new(
+            std::io::ErrorKind::InvalidData,
+            "crypto fail",
+        ))
+    }
+}

+ 241 - 0
src/storage.rs

@@ -0,0 +1,241 @@
+use dashmap::DashMap;
+use serde::{Deserialize, Serialize};
+use std::num::NonZeroU32;
+
+const ROOT_CHANNEL_ID: u32 = 0;
+const USER_TREE_NAME: &[u8] = b"users";
+const CHANNEL_TREE_NAME: &[u8] = b"channels";
+
+type Sha1Hash = [u8; 20];
+type SessionId = u32;
+type UserId = u32;
+type ChannelId = u32;
+type Username = String;
+
+pub struct Storage {
+    users: sled::Tree,
+    channels: sled::Tree,
+    session_data: DashMap<SessionId, SessionData>,
+    guests: DashMap<SessionId, Guest>,
+    connected_users: DashMap<SessionId, (UserId, Username)>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct User {
+    pub id: UserId,
+    pub username: Username,
+    pub channel_id: ChannelId,
+    pub comment: Option<String>,
+    pub texture: Option<Vec<u8>>,
+    pub certificate_hash: Option<Sha1Hash>,
+    pub comment_hash: Option<Sha1Hash>,
+    pub texture_hash: Option<Sha1Hash>,
+    pub password_hash: Option<Vec<u8>>,
+    pub password_salt: Option<Vec<u8>>,
+    pub pbkdf2_iterations: Option<NonZeroU32>,
+}
+
+#[derive(Serialize, Deserialize)]
+pub struct Channel {
+    pub id: ChannelId,
+    pub name: String,
+    pub parent_id: Option<u32>,
+    pub linked_channels: Vec<u32>,
+    pub description: Option<String>,
+    pub description_hash: Option<Sha1Hash>,
+    pub max_users: Option<u32>,
+    pub temporary: bool,
+    pub position: Option<u32>,
+}
+
+#[derive(Clone)]
+pub struct Guest {
+    pub session_id: SessionId,
+    pub username: Username,
+    pub channel_id: ChannelId,
+    pub comment: Option<String>,
+    pub texture: Option<Vec<u8>>,
+    pub comment_hash: Option<Sha1Hash>,
+    pub texture_hash: Option<Sha1Hash>,
+}
+
+#[derive(Clone)]
+pub struct SessionData {
+    pub muted_by_admin: bool,
+    pub deafened_by_admin: bool,
+    pub suppressed: bool,
+    pub muted: bool,
+    pub deafened: bool,
+    pub priority_speaker: bool,
+    pub recording: bool,
+}
+
+impl Storage {
+    pub fn open(path_to_db_file: &str) -> Self {
+        let db = sled::open(path_to_db_file).expect("Unable to open database");
+        let users = db.open_tree(USER_TREE_NAME).unwrap();
+        let channels = db.open_tree(CHANNEL_TREE_NAME).unwrap();
+
+        let root_channel = bincode::serialize(&Channel {
+            id: 0,
+            name: "Root".to_string(),
+            parent_id: None,
+            linked_channels: vec![],
+            description: None,
+            description_hash: None,
+            max_users: None,
+            temporary: false,
+            position: None,
+        })
+        .unwrap();
+        channels
+            .compare_and_swap(
+                to_bytes(ROOT_CHANNEL_ID),
+                Option::<&[u8]>::None,
+                Some(root_channel),
+            )
+            .unwrap();
+
+        Storage {
+            users,
+            channels,
+            session_data: DashMap::new(),
+            guests: DashMap::new(),
+            connected_users: DashMap::new(),
+        }
+    }
+
+    pub fn add_guest(&self, guest: Guest) {
+        self.guests.insert(guest.session_id, guest);
+    }
+
+    pub fn add_connected_user(&self, user: User, session_id: SessionId) {
+        self.connected_users
+            .insert(session_id, (user.id, user.username));
+    }
+
+    pub fn get_channels(&self) -> Vec<Channel> {
+        self.channels
+            .iter()
+            .values()
+            .map(|channel| bincode::deserialize(&channel.unwrap()).unwrap())
+            .collect()
+    }
+
+    pub fn get_guests(&self) -> Vec<Guest> {
+        self.guests.iter().map(|el| el.value().clone()).collect()
+    }
+
+    pub fn get_connected_users(&self) -> Vec<(SessionId, User)> {
+        let connected_users: Vec<(SessionId, UserId)> = self
+            .connected_users
+            .iter()
+            .map(|el| (*el.key(), el.value().0))
+            .collect();
+        let mut users = Vec::with_capacity(connected_users.len());
+        for (session_id, user_id) in connected_users {
+            if let Ok(Some(bytes)) = self.users.get(&to_bytes(user_id)) {
+                let user: User = bincode::deserialize(&bytes).unwrap();
+                users.push((session_id, user));
+            }
+        }
+        users
+    }
+
+    pub fn get_guest(&self, id: SessionId) -> Option<Guest> {
+        self.guests.get(&id).map(|entry| entry.value().clone())
+    }
+
+    pub fn get_connected_user(&self, id: SessionId) -> Option<User> {
+        if let Some(entry) = self.connected_users.get(&id) {
+            self.get_user_by_id(entry.value().0)
+        } else {
+            None
+        }
+    }
+
+    pub fn get_session_data(&self, id: SessionId) -> Option<SessionData> {
+        self.session_data
+            .get(&id)
+            .map(|entry| entry.value().clone())
+    }
+
+    pub fn get_user_by_id(&self, id: UserId) -> Option<User> {
+        if let Ok(Some(user)) = self.users.get(bincode::serialize(&id).unwrap()) {
+            return bincode::deserialize(&user).unwrap();
+        }
+        None
+    }
+
+    pub fn get_user_by_username(&self, username: Username) -> Option<User> {
+        self.users
+            .iter()
+            .filter_map(|value| value.ok())
+            .find_map(|(_, user)| {
+                let user: User = bincode::deserialize(&user).unwrap();
+                if user.username == username {
+                    Some(user)
+                } else {
+                    None
+                }
+            })
+    }
+
+    pub fn username_in_connected(&self, username: &str) -> bool {
+        if self
+            .guests
+            .iter()
+            .find(|entry| entry.value().username == username)
+            .is_some()
+        {
+            true
+        } else if self
+            .connected_users
+            .iter()
+            .find(|entry| entry.value().1 == username)
+            .is_some()
+        {
+            true
+        } else {
+            false
+        }
+    }
+
+    pub fn remove_by_session_id(&self, id: SessionId) {
+        self.connected_users.remove(&id);
+        self.guests.remove(&id);
+        self.session_data.remove(&id);
+    }
+}
+
+impl Guest {
+    pub fn new(username: String, session_id: SessionId, channel_id: ChannelId) -> Self {
+        Guest {
+            session_id,
+            username,
+            channel_id,
+            comment: None,
+            texture: None,
+            comment_hash: None,
+            texture_hash: None,
+        }
+    }
+}
+
+impl SessionData {
+    fn new() -> Self {
+        SessionData {
+            muted_by_admin: false,
+            deafened_by_admin: false,
+            suppressed: false,
+            muted: false,
+            deafened: false,
+            priority_speaker: false,
+            recording: false,
+        }
+    }
+}
+
+fn to_bytes(number: u32) -> [u8; 4] {
+    number.to_be_bytes()
+}