فهرست منبع

Split stream into separate reader and writer

Sergey Chushin 3 سال پیش
والد
کامیت
91eb44714d
1فایلهای تغییر یافته به همراه219 افزوده شده و 222 حذف شده
  1. 219 222
      src/protocol.rs

+ 219 - 222
src/protocol.rs

@@ -1,10 +1,11 @@
-use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use protobuf::{Message, ProtobufError};
 use protobuf::{Message, ProtobufError};
-use crate::proto::mumble::{Version, Authenticate, Ping, Reject, ServerSync,
-                           ChannelRemove, ChannelState, UserRemove, UserState, BanList,
-                           TextMessage, PermissionDenied, ACL as Acl, QueryUsers, CryptSetup,
-                           ContextActionModify, ContextAction, UserList, VoiceTarget, PermissionQuery,
-                           CodecVersion, UserStats, RequestBlob, ServerConfig, SuggestConfig};
+use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
+
+use crate::proto::mumble::{ACL as Acl, Authenticate, BanList, ChannelRemove, ChannelState,
+                           CodecVersion, ContextAction, ContextActionModify, CryptSetup, PermissionDenied,
+                           PermissionQuery, Ping, QueryUsers, Reject, RequestBlob,
+                           ServerConfig, ServerSync, SuggestConfig, TextMessage, UserList,
+                           UserRemove, UserState, UserStats, Version, VoiceTarget};
 
 
 pub const MUMBLE_PROTOCOL_VERSION: u32 = 0b0000_0001_0011_0100;
 pub const MUMBLE_PROTOCOL_VERSION: u32 = 0b0000_0001_0011_0100;
 
 
@@ -62,30 +63,30 @@ pub enum MumblePacket {
     UserStats(UserStats),
     UserStats(UserStats),
     RequestBlob(RequestBlob),
     RequestBlob(RequestBlob),
     ServerConfig(ServerConfig),
     ServerConfig(ServerConfig),
-    SuggestConfig(SuggestConfig)
+    SuggestConfig(SuggestConfig),
 }
 }
 
 
 pub enum VoicePacket {
 pub enum VoicePacket {
     Ping(VoicePing),
     Ping(VoicePing),
-    AudioData(AudioData)
+    AudioData(AudioData),
 }
 }
 
 
 pub enum Error {
 pub enum Error {
     UnknownPacketType,
     UnknownPacketType,
     ConnectionError,
     ConnectionError,
-    ParsingError
+    ParsingError,
 }
 }
 
 
-pub struct MumblePacketStream<S> {
-    stream: S
+pub struct MumblePacketReader<R> {
+    reader: R,
 }
 }
 
 
-pub struct VoicePacketStream<S> {
-    stream: S
+pub struct MumblePacketWriter<W> {
+    writer: W,
 }
 }
 
 
 pub struct VoicePing {
 pub struct VoicePing {
-    timestamp: u64
+    timestamp: u64,
 }
 }
 
 
 pub struct AudioData {
 pub struct AudioData {
@@ -94,7 +95,7 @@ pub struct AudioData {
     session_id: Option<u64>,
     session_id: Option<u64>,
     sequence_number: u64,
     sequence_number: u64,
     audio_payload: Vec<u8>,
     audio_payload: Vec<u8>,
-    positional_info: Option<[f32; 3]>
+    positional_info: Option<[f32; 3]>,
 }
 }
 
 
 enum Codecs {
 enum Codecs {
@@ -104,20 +105,26 @@ enum Codecs {
     Opus,
     Opus,
 }
 }
 
 
-impl<S> MumblePacketStream<S>
+pub fn new<S, R, W>(stream: S) -> (MumblePacketReader<R>, MumblePacketWriter<W>)
     where
     where
         S: AsyncRead + AsyncWrite + Unpin + Send,
         S: AsyncRead + AsyncWrite + Unpin + Send,
+        R: AsyncRead + Unpin + Send,
+        W: AsyncWrite + Unpin + Send,
 {
 {
-    pub fn new(stream: S) -> Self {
-        MumblePacketStream { stream }
-    }
+    let (reader, writer) = tokio::io::split(stream);
+    (MumblePacketReader { reader }, MumblePacketWriter { writer })
+}
 
 
+impl<R> MumblePacketReader<R>
+    where
+        R: AsyncRead + Unpin + Send,
+{
     pub async fn read(&mut self) -> Result<MumblePacket, Error> {
     pub async fn read(&mut self) -> Result<MumblePacket, Error> {
-        let packet_type = self.stream.read_u16().await?;
-        let payload_length = self.stream.read_u32().await?;
+        let packet_type = self.reader.read_u16().await?;
+        let payload_length = self.reader.read_u32().await?;
 
 
         if packet_type == UDP_TUNNEL {
         if packet_type == UDP_TUNNEL {
-            return Ok(MumblePacket::UdpTunnel(self.read_voice_packet().await?))
+            return Ok(MumblePacket::UdpTunnel(self.read_voice_packet().await?));
         }
         }
 
 
         let payload = self.read_payload(payload_length).await?;
         let payload = self.read_payload(payload_length).await?;
@@ -152,52 +159,38 @@ impl<S> MumblePacketStream<S>
         }
         }
     }
     }
 
 
-    pub async fn write(&mut self, packet: MumblePacket) -> Result<(), Error> {
-        match packet {
-            MumblePacket::UdpTunnel(value) => {
-                let bytes = Self::serialize_voice_packet(value);
-                self.stream.write_u16(UDP_TUNNEL).await?;
-                self.stream.write_u32(bytes.len() as u32).await?;
-                self.stream.write_all(&bytes).await?;
-            }
-            MumblePacket::Version(value) => self.write_protobuf_packet(value, VERSION).await?,
-            MumblePacket::Authenticate(value) => self.write_protobuf_packet(value, AUTHENTICATE).await?,
-            MumblePacket::Ping(value) => self.write_protobuf_packet(value, PING).await?,
-            MumblePacket::Reject(value) => self.write_protobuf_packet(value, REJECT).await?,
-            MumblePacket::ServerSync(value) => self.write_protobuf_packet(value, SERVER_SYNC).await?,
-            MumblePacket::ChannelRemove(value) => self.write_protobuf_packet(value, CHANNEL_REMOVE).await?,
-            MumblePacket::ChannelState(value) => self.write_protobuf_packet(value, CHANNEL_STATE).await?,
-            MumblePacket::UserRemove(value) => self.write_protobuf_packet(value, USER_REMOVE).await?,
-            MumblePacket::UserState(value) => self.write_protobuf_packet(value, USER_STATE).await?,
-            MumblePacket::BanList(value) => self.write_protobuf_packet(value, BAN_LIST).await?,
-            MumblePacket::TextMessage(value) => self.write_protobuf_packet(value, TEXT_MESSAGE).await?,
-            MumblePacket::PermissionDenied(value) => self.write_protobuf_packet(value, PERMISSION_DENIED).await?,
-            MumblePacket::Acl(value) => self.write_protobuf_packet(value, ACL).await?,
-            MumblePacket::QueryUsers(value) => self.write_protobuf_packet(value, QUERY_USERS).await?,
-            MumblePacket::CryptSetup(value) => self.write_protobuf_packet(value, CRYPT_SETUP).await?,
-            MumblePacket::ContextActionModify(value) => self.write_protobuf_packet(value, CONTEXT_ACTION_MODIFY).await?,
-            MumblePacket::ContextAction(value) => self.write_protobuf_packet(value, CONTEXT_ACTION).await?,
-            MumblePacket::UserList(value) => self.write_protobuf_packet(value, USER_LIST).await?,
-            MumblePacket::VoiceTarget(value) => self.write_protobuf_packet(value, VOICE_TARGET).await?,
-            MumblePacket::PermissionQuery(value) => self.write_protobuf_packet(value, PERMISSION_QUERY).await?,
-            MumblePacket::CodecVersion(value) => self.write_protobuf_packet(value, CODEC_VERSION).await?,
-            MumblePacket::UserStats(value) => self.write_protobuf_packet(value, USER_STATS).await?,
-            MumblePacket::RequestBlob(value) => self.write_protobuf_packet(value, REQUEST_BLOB).await?,
-            MumblePacket::ServerConfig(value) => self.write_protobuf_packet(value, SERVER_CONFIG).await?,
-            MumblePacket::SuggestConfig(value) => self.write_protobuf_packet(value, SUGGEST_CONFIG).await?,
-        }
+    async fn read_voice_packet(&mut self) -> Result<VoicePacket, Error> {
+        let header = self.reader.read_u8().await?;
+        let (audio_packet_type, target) = decode_header(header);
 
 
-        Ok(())
-    }
+        if audio_packet_type == 1 {
+            let timestamp = self.read_varint().await?;
+            return Ok(VoicePacket::Ping(VoicePing {
+                timestamp
+            }));
+        }
 
 
-    async fn read_payload(&mut self, payload_length: u32) -> tokio::io::Result<Vec<u8>> {
-        let mut payload = vec![0; payload_length as usize];
-        self.stream.read_exact(&mut payload).await?;
-        Ok(payload)
+        let codec = match audio_packet_type {
+            0 => Codecs::CeltAlpha,
+            2 => Codecs::Speex,
+            3 => Codecs::CeltBeta,
+            4 => Codecs::Opus,
+            _ => return Err(Error::ParsingError)
+        };
+        let sequence_number = self.read_varint().await?;
+        let audio_payload = self.read_audio_payload(&codec).await?;
+        Ok(VoicePacket::AudioData(AudioData {
+            codec,
+            target,
+            session_id: None,
+            sequence_number,
+            audio_payload,
+            positional_info: None, //TODO
+        }))
     }
     }
 
 
     async fn read_varint(&mut self) -> Result<u64, Error> { //TODO negative number decode
     async fn read_varint(&mut self) -> Result<u64, Error> { //TODO negative number decode
-        let header = self.stream.read_u8().await?;
+        let header = self.reader.read_u8().await?;
 
 
         //7-bit number
         //7-bit number
         if (header & 0b1000_0000) == 0b0000_0000 {
         if (header & 0b1000_0000) == 0b0000_0000 {
@@ -207,77 +200,47 @@ impl<S> MumblePacketStream<S>
         if (header & 0b1100_0000) == 0b1000_0000 {
         if (header & 0b1100_0000) == 0b1000_0000 {
             let first_number_byte = header ^ 0b1000_0000;
             let first_number_byte = header ^ 0b1000_0000;
             return Ok(
             return Ok(
-                ((first_number_byte       as u64) << 8) |
-                (self.stream.read_u8().await? as u64)
+                ((first_number_byte as u64) << 8) |
+                    (self.reader.read_u8().await? as u64)
             );
             );
         }
         }
         //21-bit number
         //21-bit number
         if (header & 0b1110_0000) == 0b1100_0000 {
         if (header & 0b1110_0000) == 0b1100_0000 {
             let first_number_byte = header ^ 0b1100_0000;
             let first_number_byte = header ^ 0b1100_0000;
             return Ok(
             return Ok(
-                ((first_number_byte       as u64) << 16) |
-                ((self.stream.read_u8().await? as u64) << 8 ) |
-                (self.stream.read_u8().await? as u64)
+                ((first_number_byte as u64) << 16) |
+                    ((self.reader.read_u8().await? as u64) << 8) |
+                    (self.reader.read_u8().await? as u64)
             );
             );
         }
         }
         //28-bit number
         //28-bit number
         if (header & 0b1111_0000) == 0b1110_0000 {
         if (header & 0b1111_0000) == 0b1110_0000 {
             let first_number_byte = header ^ 0b1110_0000;
             let first_number_byte = header ^ 0b1110_0000;
             return Ok(
             return Ok(
-                ((first_number_byte       as u64) << 24) |
-                ((self.stream.read_u8().await? as u64) << 16) |
-                ((self.stream.read_u8().await? as u64) << 8 ) |
-                (self.stream.read_u8().await? as u64)
+                ((first_number_byte as u64) << 24) |
+                    ((self.reader.read_u8().await? as u64) << 16) |
+                    ((self.reader.read_u8().await? as u64) << 8) |
+                    (self.reader.read_u8().await? as u64)
             );
             );
         }
         }
         //32-bit number
         //32-bit number
         if (header & 0b1111_1100) == 0b1111_0000 {
         if (header & 0b1111_1100) == 0b1111_0000 {
-            return Ok(self.stream.read_u32().await? as u64);
+            return Ok(self.reader.read_u32().await? as u64);
         }
         }
         //64-bit number
         //64-bit number
         if (header & 0b1111_1100) == 0b1111_0100 {
         if (header & 0b1111_1100) == 0b1111_0100 {
-            return Ok(self.stream.read_u64().await?);
+            return Ok(self.reader.read_u64().await?);
         }
         }
 
 
         Err(Error::ParsingError)
         Err(Error::ParsingError)
     }
     }
 
 
-    async fn read_voice_packet(&mut self) -> Result<VoicePacket, Error> {
-        let header = self.stream.read_u8().await?;
-        let (audio_packet_type, target) = Self::decode_header(header);
-
-        if audio_packet_type == 1 {
-            let timestamp = self.read_varint().await?;
-            return Ok(VoicePacket::Ping(VoicePing {
-                timestamp
-            }));
-        }
-
-        let codec = match audio_packet_type {
-            0 => Codecs::CeltAlpha,
-            2 => Codecs::Speex,
-            3 => Codecs::CeltBeta,
-            4 => Codecs::Opus,
-            _ => return Err(Error::ParsingError)
-        };
-        let sequence_number = self.read_varint().await?;
-        let audio_payload = self.read_audio_payload(&codec).await?;
-        Ok(VoicePacket::AudioData(AudioData {
-            codec,
-            target,
-            session_id: None,
-            sequence_number,
-            audio_payload,
-            positional_info: None //TODO
-        }))
-    }
-
     async fn read_audio_payload(&mut self, codec_type: &Codecs) -> Result<Vec<u8>, Error> {
     async fn read_audio_payload(&mut self, codec_type: &Codecs) -> Result<Vec<u8>, Error> {
         match codec_type {
         match codec_type {
             Codecs::CeltAlpha | Codecs::Speex | Codecs::CeltBeta => {
             Codecs::CeltAlpha | Codecs::Speex | Codecs::CeltBeta => {
                 let mut payload = vec![];
                 let mut payload = vec![];
                 loop {
                 loop {
-                    let header = self.stream.read_u8().await?;
+                    let header = self.reader.read_u8().await?;
                     let continuation_bit = header & 0b1000_0000;
                     let continuation_bit = header & 0b1000_0000;
                     let length = header & 0b0111_1111;
                     let length = header & 0b0111_1111;
                     payload.push(header);
                     payload.push(header);
@@ -286,7 +249,7 @@ impl<S> MumblePacketStream<S>
                         break;
                         break;
                     }
                     }
                     for _ in 0..length {
                     for _ in 0..length {
-                        payload.push(self.stream.read_u8().await?)
+                        payload.push(self.reader.read_u8().await?)
                     }
                     }
 
 
                     if continuation_bit == 0 {
                     if continuation_bit == 0 {
@@ -302,137 +265,170 @@ impl<S> MumblePacketStream<S>
                 let mut payload = vec![];
                 let mut payload = vec![];
                 let header = self.read_varint().await?;
                 let header = self.read_varint().await?;
                 let length = header & 0x1fff;
                 let length = header & 0x1fff;
-                payload.append(&mut Self::encode_varint(header));
+                payload.append(&mut encode_varint(header));
 
 
                 for _ in 0..length {
                 for _ in 0..length {
-                    payload.push(self.stream.read_u8().await?)
+                    payload.push(self.reader.read_u8().await?)
                 }
                 }
                 Ok(payload)
                 Ok(payload)
             }
             }
         }
         }
     }
     }
 
 
-    async fn write_protobuf_packet<T> (&mut self, packet: T, packet_type: u16) -> Result<(), Error>
-    where T: Message
+    async fn read_payload(&mut self, payload_length: u32) -> tokio::io::Result<Vec<u8>> {
+        let mut payload = vec![0; payload_length as usize];
+        self.reader.read_exact(&mut payload).await?;
+        Ok(payload)
+    }
+}
+
+impl<W> MumblePacketWriter<W>
+    where
+        W: AsyncWrite + Unpin + Send,
+{
+    pub async fn write(&mut self, packet: MumblePacket) -> Result<(), Error> {
+        match packet {
+            MumblePacket::UdpTunnel(value) => {
+                let bytes = serialize_voice_packet(value);
+                self.writer.write_u16(UDP_TUNNEL).await?;
+                self.writer.write_u32(bytes.len() as u32).await?;
+                self.writer.write_all(&bytes).await?;
+            }
+            MumblePacket::Version(value) => self.write_protobuf_packet(value, VERSION).await?,
+            MumblePacket::Authenticate(value) => self.write_protobuf_packet(value, AUTHENTICATE).await?,
+            MumblePacket::Ping(value) => self.write_protobuf_packet(value, PING).await?,
+            MumblePacket::Reject(value) => self.write_protobuf_packet(value, REJECT).await?,
+            MumblePacket::ServerSync(value) => self.write_protobuf_packet(value, SERVER_SYNC).await?,
+            MumblePacket::ChannelRemove(value) => self.write_protobuf_packet(value, CHANNEL_REMOVE).await?,
+            MumblePacket::ChannelState(value) => self.write_protobuf_packet(value, CHANNEL_STATE).await?,
+            MumblePacket::UserRemove(value) => self.write_protobuf_packet(value, USER_REMOVE).await?,
+            MumblePacket::UserState(value) => self.write_protobuf_packet(value, USER_STATE).await?,
+            MumblePacket::BanList(value) => self.write_protobuf_packet(value, BAN_LIST).await?,
+            MumblePacket::TextMessage(value) => self.write_protobuf_packet(value, TEXT_MESSAGE).await?,
+            MumblePacket::PermissionDenied(value) => self.write_protobuf_packet(value, PERMISSION_DENIED).await?,
+            MumblePacket::Acl(value) => self.write_protobuf_packet(value, ACL).await?,
+            MumblePacket::QueryUsers(value) => self.write_protobuf_packet(value, QUERY_USERS).await?,
+            MumblePacket::CryptSetup(value) => self.write_protobuf_packet(value, CRYPT_SETUP).await?,
+            MumblePacket::ContextActionModify(value) => self.write_protobuf_packet(value, CONTEXT_ACTION_MODIFY).await?,
+            MumblePacket::ContextAction(value) => self.write_protobuf_packet(value, CONTEXT_ACTION).await?,
+            MumblePacket::UserList(value) => self.write_protobuf_packet(value, USER_LIST).await?,
+            MumblePacket::VoiceTarget(value) => self.write_protobuf_packet(value, VOICE_TARGET).await?,
+            MumblePacket::PermissionQuery(value) => self.write_protobuf_packet(value, PERMISSION_QUERY).await?,
+            MumblePacket::CodecVersion(value) => self.write_protobuf_packet(value, CODEC_VERSION).await?,
+            MumblePacket::UserStats(value) => self.write_protobuf_packet(value, USER_STATS).await?,
+            MumblePacket::RequestBlob(value) => self.write_protobuf_packet(value, REQUEST_BLOB).await?,
+            MumblePacket::ServerConfig(value) => self.write_protobuf_packet(value, SERVER_CONFIG).await?,
+            MumblePacket::SuggestConfig(value) => self.write_protobuf_packet(value, SUGGEST_CONFIG).await?,
+        }
+
+        self.writer.flush().await?;
+        Ok(())
+    }
+
+    async fn write_protobuf_packet<T>(&mut self, packet: T, packet_type: u16) -> Result<(), Error>
+        where T: Message
     {
     {
         let bytes = packet.write_to_bytes()?;
         let bytes = packet.write_to_bytes()?;
-        self.stream.write_u16(packet_type).await?;
-        self.stream.write_u32(bytes.len() as u32).await?;
-        self.stream.write_all(&bytes).await?;
+        self.writer.write_u16(packet_type).await?;
+        self.writer.write_u32(bytes.len() as u32).await?;
+        self.writer.write_all(&bytes).await?;
 
 
         Ok(())
         Ok(())
     }
     }
+}
 
 
-    fn decode_header(header: u8) -> (u8, u8) {
-        let packet_type = header >> 5;
-        let target = header & 0b0001_1111;
-        (packet_type, target)
-    }
+fn decode_header(header: u8) -> (u8, u8) {
+    let packet_type = header >> 5;
+    let target = header & 0b0001_1111;
+    (packet_type, target)
+}
 
 
-    fn encode_header(packet_type: u8, target: u8) -> u8 {
-        (packet_type << 5) | target
-    }
+fn encode_header(packet_type: u8, target: u8) -> u8 {
+    (packet_type << 5) | target
+}
 
 
-    fn encode_varint(number: u64) -> Vec<u8> { //TODO negative number encode
-        let mut result = vec![];
-
-        if number < 0x80 {
-            //7-bit number
-            result.push(number as u8);
-        } else if number < 0x4000 {
-            //14-bit number
-            result.push(((number >> 8) | 0x80) as u8);
-            result.push((number & 0xFF) as u8);
-        } else if number < 0x200000 {
-            //21-bit number
-            result.push(((number >> 16) | 0xC0) as u8);
-            result.push(((number >> 8) & 0xFF) as u8);
-            result.push((number & 0xFF) as u8);
-        } else if number < 0x10000000 {
-            //28-bit number
-            result.push(((number >> 24) | 0xE0) as u8);
-            result.push(((number >> 16) & 0xFF) as u8);
-            result.push(((number >> 8) & 0xFF) as u8);
-            result.push((number & 0xFF) as u8);
-        } else if number < 0x100000000 {
-            //32-bit number
-            result.push(0xF0);
-            result.push(((number >> 24) & 0xFF) as u8);
-            result.push(((number >> 16) & 0xFF) as u8);
-            result.push(((number >> 8) & 0xFF) as u8);
-            result.push((number & 0xFF) as u8);
-        } else {
-            //64-bit number
-            result.push(0xF4);
-            result.push(((number >> 56) & 0xFF) as u8);
-            result.push(((number >> 48) & 0xFF) as u8);
-            result.push(((number >> 40) & 0xFF) as u8);
-            result.push(((number >> 32) & 0xFF) as u8);
-            result.push(((number >> 24) & 0xFF) as u8);
-            result.push(((number >> 16) & 0xFF) as u8);
-            result.push(((number >> 8) & 0xFF) as u8);
-            result.push((number & 0xFF) as u8);
-        }
+fn encode_varint(number: u64) -> Vec<u8> { //TODO negative number encode
+    let mut result = vec![];
 
 
-        result
+    if number < 0x80 {
+        //7-bit number
+        result.push(number as u8);
+    } else if number < 0x4000 {
+        //14-bit number
+        result.push(((number >> 8) | 0x80) as u8);
+        result.push((number & 0xFF) as u8);
+    } else if number < 0x200000 {
+        //21-bit number
+        result.push(((number >> 16) | 0xC0) as u8);
+        result.push(((number >> 8) & 0xFF) as u8);
+        result.push((number & 0xFF) as u8);
+    } else if number < 0x10000000 {
+        //28-bit number
+        result.push(((number >> 24) | 0xE0) as u8);
+        result.push(((number >> 16) & 0xFF) as u8);
+        result.push(((number >> 8) & 0xFF) as u8);
+        result.push((number & 0xFF) as u8);
+    } else if number < 0x100000000 {
+        //32-bit number
+        result.push(0xF0);
+        result.push(((number >> 24) & 0xFF) as u8);
+        result.push(((number >> 16) & 0xFF) as u8);
+        result.push(((number >> 8) & 0xFF) as u8);
+        result.push((number & 0xFF) as u8);
+    } else {
+        //64-bit number
+        result.push(0xF4);
+        result.push(((number >> 56) & 0xFF) as u8);
+        result.push(((number >> 48) & 0xFF) as u8);
+        result.push(((number >> 40) & 0xFF) as u8);
+        result.push(((number >> 32) & 0xFF) as u8);
+        result.push(((number >> 24) & 0xFF) as u8);
+        result.push(((number >> 16) & 0xFF) as u8);
+        result.push(((number >> 8) & 0xFF) as u8);
+        result.push((number & 0xFF) as u8);
     }
     }
 
 
-    fn serialize_voice_packet(packet: VoicePacket) -> Vec<u8> {
-        let mut result = vec![];
+    result
+}
 
 
-        match packet {
-            VoicePacket::Ping(value) => {
-                result.push(0b0010_0000);
-                let mut varint = Self::encode_varint(value.timestamp);
-                result.append(&mut varint);
+fn serialize_voice_packet(packet: VoicePacket) -> Vec<u8> {
+    let mut result = vec![];
+
+    match packet {
+        VoicePacket::Ping(value) => {
+            result.push(0b0010_0000);
+            let mut varint = encode_varint(value.timestamp);
+            result.append(&mut varint);
+        }
+        VoicePacket::AudioData(mut value) => {
+            let packet_type = match value.codec {
+                Codecs::CeltAlpha => 0b0000_0000,
+                Codecs::Speex => 0b0100_0000,
+                Codecs::CeltBeta => 0b0110_0000,
+                Codecs::Opus => 0b1000_0000,
+            };
+            let header = encode_header(packet_type, value.target);
+            result.push(header);
+
+            if let Some(session_id) = value.session_id {
+                let mut session_id = encode_varint(session_id);
+                result.append(&mut session_id);
             }
             }
-            VoicePacket::AudioData(mut value) => {
-                let packet_type = match value.codec {
-                    Codecs::CeltAlpha => 0b0000_0000,
-                    Codecs::Speex => 0b0100_0000,
-                    Codecs::CeltBeta => 0b0110_0000,
-                    Codecs::Opus => 0b1000_0000,
-                };
-                let header = Self::encode_header(packet_type, value.target);
-                result.push(header);
-
-                if let Some(session_id) = value.session_id {
-                    let mut session_id = Self::encode_varint(session_id);
-                    result.append(&mut session_id);
-                }
 
 
-                let mut sequence_number = Self::encode_varint(value.sequence_number);
-                result.append(&mut sequence_number);
+            let mut sequence_number = encode_varint(value.sequence_number);
+            result.append(&mut sequence_number);
 
 
-                result.append(&mut value.audio_payload);
+            result.append(&mut value.audio_payload);
 
 
-                if let Some(position_info) = value.positional_info {
-                    result.extend_from_slice(&position_info[0].to_be_bytes());
-                    result.extend_from_slice(&position_info[1].to_be_bytes());
-                    result.extend_from_slice(&position_info[2].to_be_bytes());
-                }
+            if let Some(position_info) = value.positional_info {
+                result.extend_from_slice(&position_info[0].to_be_bytes());
+                result.extend_from_slice(&position_info[1].to_be_bytes());
+                result.extend_from_slice(&position_info[2].to_be_bytes());
             }
             }
         }
         }
-
-        result
-    }
-}
-
-impl <S> VoicePacketStream<S>
-    where
-        S: AsyncRead + AsyncWrite + Unpin + Send,
-{
-    pub fn new(stream: S) -> Self {
-        VoicePacketStream { stream }
-    }
-
-    pub async fn read(&mut self) -> Result<VoicePacket, Error> {
-        unimplemented!() //TODO
     }
     }
 
 
-    pub async fn write(&mut self, packet: VoicePacket) -> Result<(), Error> {
-        unimplemented!() //TODO
-    }
+    result
 }
 }
 
 
 impl From<std::io::Error> for Error {
 impl From<std::io::Error> for Error {
@@ -452,21 +448,22 @@ impl From<ProtobufError> for Error {
 
 
 #[cfg(test)]
 #[cfg(test)]
 mod tests {
 mod tests {
-    use super::*;
     use tokio::net::TcpStream;
     use tokio::net::TcpStream;
 
 
+    use super::*;
+
     #[test]
     #[test]
     fn test_decode_header() {
     fn test_decode_header() {
-        assert_eq!(MumblePacketStream::<TcpStream>::decode_header(0b0100_1000), (2, 8));
-        assert_eq!(MumblePacketStream::<TcpStream>::decode_header(0b0111_1111), (3, 31));
-        assert_eq!(MumblePacketStream::<TcpStream>::decode_header(0b1000_0000), (4, 0));
+        assert_eq!(decode_header(0b0100_1000), (2, 8));
+        assert_eq!(decode_header(0b0111_1111), (3, 31));
+        assert_eq!(decode_header(0b1000_0000), (4, 0));
     }
     }
 
 
     #[test]
     #[test]
     fn test_encode_header() {
     fn test_encode_header() {
-        assert_eq!(MumblePacketStream::<TcpStream>::encode_header(2, 8), 0b0100_1000);
-        assert_eq!(MumblePacketStream::<TcpStream>::encode_header(3, 31), 0b0111_1111);
-        assert_eq!(MumblePacketStream::<TcpStream>::encode_header(4, 0), 0b1000_0000);
+        assert_eq!(encode_header(2, 8), 0b0100_1000);
+        assert_eq!(encode_header(3, 31), 0b0111_1111);
+        assert_eq!(encode_header(4, 0), 0b1000_0000);
     }
     }
 
 
     #[test]
     #[test]
@@ -482,12 +479,12 @@ mod tests {
             vec![0b1111_0100, 0b1100_0000, 0b0000_0000, 0b0000_0000, 0b0000_0001,
             vec![0b1111_0100, 0b1100_0000, 0b0000_0000, 0b0000_0000, 0b0000_0001,
                  0b0000_0000, 0b0000_0000, 0b0000_0000, 0b0001_0000];
                  0b0000_0000, 0b0000_0000, 0b0000_0000, 0b0001_0000];
 
 
-        assert_eq!(MumblePacketStream::<TcpStream>::encode_varint(0x8), varint_7bit_positive);
-        assert_eq!(MumblePacketStream::<TcpStream>::encode_varint(0x2203), varint_14bit_positive);
-        assert_eq!(MumblePacketStream::<TcpStream>::encode_varint(0x140000), varint_21bit_positive);
-        assert_eq!(MumblePacketStream::<TcpStream>::encode_varint(0xc402001), varint_28bit_positive);
-        assert_eq!(MumblePacketStream::<TcpStream>::encode_varint(0xc0000001), varint_32bit_positive);
-        assert_eq!(MumblePacketStream::<TcpStream>::encode_varint(0xc000000100000010), varint_64bit_positive);
+        assert_eq!(encode_varint(0x8), varint_7bit_positive);
+        assert_eq!(encode_varint(0x2203), varint_14bit_positive);
+        assert_eq!(encode_varint(0x140000), varint_21bit_positive);
+        assert_eq!(encode_varint(0xc402001), varint_28bit_positive);
+        assert_eq!(encode_varint(0xc0000001), varint_32bit_positive);
+        assert_eq!(encode_varint(0xc000000100000010), varint_64bit_positive);
     }
     }
 }
 }