Преглед изворни кода

Implement audio tunneling over control channel

Sergey Chushin пре 3 година
родитељ
комит
3d00d0fd52
3 измењених фајлова са 59 додато и 126 уклоњено
  1. 15 4
      src/client.rs
  2. 38 122
      src/protocol.rs
  3. 6 0
      src/server.rs

+ 15 - 4
src/client.rs

@@ -8,18 +8,21 @@ use tokio::task::JoinHandle;
 use crate::connection::Connection;
 use crate::db::{Db, User};
 use crate::proto::mumble::{Ping, UserRemove, UserState};
-use crate::protocol::{MumblePacket, MumblePacketWriter, VoicePacket};
+use crate::protocol::{MumblePacket, MumblePacketWriter, VoicePacket, AudioData};
 
 pub enum Message {
     UserConnected(u32),
     UserDisconnected(u32),
+    UserTalking(AudioData),
 }
 
 pub enum ResponseMessage {
-    Disconnected
+    Disconnected,
+    Talking(AudioData),
 }
 
 pub struct Client {
+    pub session_id: u32,
     inner_sender: UnboundedSender<InnerMessage>,
     handler_task: JoinHandle<()>,
     packet_task: JoinHandle<()>,
@@ -103,6 +106,7 @@ impl Client {
         });
 
         return (Client {
+            session_id,
             inner_sender,
             handler_task,
             packet_task,
@@ -138,9 +142,11 @@ impl<W> Handler<W>
                 match voice {
                     VoicePacket::Ping(_) => {
                         self.writer.write(MumblePacket::UdpTunnel(voice)).await;
-                        println!("VoicePing");
                     }
-                    VoicePacket::AudioData(_) => { println!("AudioData"); }
+                    VoicePacket::AudioData(mut audio_data) => {
+                        audio_data.session_id = Some(self.session_id);
+                        self.response_sender.send(ResponseMessage::Talking(audio_data));
+                    }
                 }
             }
             _ => println!("unimplemented!")
@@ -152,6 +158,7 @@ impl<W> Handler<W>
         match message {
             Message::UserConnected(session_id) => self.new_user_connected(session_id).await?,
             Message::UserDisconnected(session_id) => self.user_disconnected(session_id).await?,
+            Message::UserTalking(audio_data) => self.user_talking(audio_data).await?,
         }
 
         Ok(())
@@ -174,6 +181,10 @@ impl<W> Handler<W>
         self.db.remove_connected_user(self.session_id).await;
         self.response_sender.send(ResponseMessage::Disconnected);
     }
+
+    async fn user_talking(&mut self, audio_data: AudioData) -> Result<(), Error> {
+        Ok(self.writer.write(MumblePacket::UdpTunnel(VoicePacket::AudioData(audio_data))).await?)
+    }
 }
 
 impl From<User> for UserState {

+ 38 - 122
src/protocol.rs

@@ -86,23 +86,13 @@ pub struct MumblePacketWriter<W> {
 }
 
 pub struct VoicePing {
-    timestamp: u64,
+    bytes: Vec<u8>
 }
 
+#[derive(Clone)]
 pub struct AudioData {
-    codec: Codecs,
-    target: u8,
-    session_id: Option<u64>,
-    sequence_number: u64,
-    audio_payload: Vec<u8>,
-    positional_info: Option<[f32; 3]>,
-}
-
-enum Codecs {
-    CeltAlpha,
-    Speex,
-    CeltBeta,
-    Opus,
+    pub session_id: Option<u32>,
+    bytes: Vec<u8>,
 }
 
 pub fn new<S>(stream: S) -> (MumblePacketReader<ReadHalf<S>>, MumblePacketWriter<WriteHalf<S>>)
@@ -126,15 +116,11 @@ impl<R> MumblePacketReader<R>
     pub async fn read(&mut self) -> Result<MumblePacket, Error> {
         let packet_type = self.reader.read_u16().await?;
         let payload_length = self.reader.read_u32().await?;
-
-        if packet_type == UDP_TUNNEL {
-            return Ok(MumblePacket::UdpTunnel(self.read_voice_packet().await?));
-        }
-
         let payload = self.read_payload(payload_length).await?;
 
         match packet_type {
             VERSION => Ok(MumblePacket::Version(Version::parse_from_bytes(&payload)?)),
+            UDP_TUNNEL => Ok(MumblePacket::UdpTunnel(VoicePacket::parse_from_bytes(payload)?)),
             AUTHENTICATE => Ok(MumblePacket::Authenticate(Authenticate::parse_from_bytes(&payload)?)),
             PING => Ok(MumblePacket::Ping(Ping::parse_from_bytes(&payload)?)),
             REJECT => Ok(MumblePacket::Reject(Reject::parse_from_bytes(&payload)?)),
@@ -163,36 +149,6 @@ impl<R> MumblePacketReader<R>
         }
     }
 
-    async fn read_voice_packet(&mut self) -> Result<VoicePacket, Error> {
-        let header = self.reader.read_u8().await?;
-        let (audio_packet_type, target) = decode_header(header);
-
-        if audio_packet_type == 1 {
-            let timestamp = self.read_varint().await?;
-            return Ok(VoicePacket::Ping(VoicePing {
-                timestamp
-            }));
-        }
-
-        let codec = match audio_packet_type {
-            0 => Codecs::CeltAlpha,
-            2 => Codecs::Speex,
-            3 => Codecs::CeltBeta,
-            4 => Codecs::Opus,
-            _ => return Err(Error::ParsingError)
-        };
-        let sequence_number = self.read_varint().await?;
-        let audio_payload = self.read_audio_payload(&codec).await?;
-        Ok(VoicePacket::AudioData(AudioData {
-            codec,
-            target,
-            session_id: None,
-            sequence_number,
-            audio_payload,
-            positional_info: None, //TODO
-        }))
-    }
-
     async fn read_varint(&mut self) -> Result<u64, Error> { //TODO negative number decode
         let header = self.reader.read_u8().await?;
 
@@ -239,46 +195,6 @@ impl<R> MumblePacketReader<R>
         Err(Error::ParsingError)
     }
 
-    async fn read_audio_payload(&mut self, codec_type: &Codecs) -> Result<Vec<u8>, Error> {
-        match codec_type {
-            Codecs::CeltAlpha | Codecs::Speex | Codecs::CeltBeta => {
-                let mut payload = vec![];
-                loop {
-                    let header = self.reader.read_u8().await?;
-                    let continuation_bit = header & 0b1000_0000;
-                    let length = header & 0b0111_1111;
-                    payload.push(header);
-                    if length == 0 {
-                        payload.push(0);
-                        break;
-                    }
-                    for _ in 0..length {
-                        payload.push(self.reader.read_u8().await?)
-                    }
-
-                    if continuation_bit == 0 {
-                        break;
-                    }
-                    if payload.len() > MAX_AUDIO_PACKET_SIZE {
-                        return Err(Error::ParsingError);
-                    }
-                }
-                Ok(payload)
-            }
-            Codecs::Opus => {
-                let mut payload = vec![];
-                let header = self.read_varint().await?;
-                let length = header & 0x1fff;
-                payload.append(&mut encode_varint(header));
-
-                for _ in 0..length {
-                    payload.push(self.reader.read_u8().await?)
-                }
-                Ok(payload)
-            }
-        }
-    }
-
     async fn read_payload(&mut self, payload_length: u32) -> tokio::io::Result<Vec<u8>> {
         let mut payload = vec![0; payload_length as usize];
         self.reader.read_exact(&mut payload).await?;
@@ -347,6 +263,27 @@ impl<W> MumblePacketWriter<W>
     }
 }
 
+impl VoicePacket {
+    fn parse_from_bytes(bytes: Vec<u8>) -> Result<VoicePacket, Error> {
+        if bytes.is_empty() {
+            return Err(Error::ParsingError)
+        }
+
+        let header = bytes.first().unwrap();
+        let (packet_type, _) = decode_header(header.clone());
+        if packet_type == 1 {
+            return Ok(VoicePacket::Ping(VoicePing {
+                bytes
+            }))
+        }
+
+        Ok(VoicePacket::AudioData(AudioData {
+            session_id: None,
+            bytes,
+        }))
+    }
+}
+
 fn decode_header(header: u8) -> (u8, u8) {
     let packet_type = header >> 5;
     let target = header & 0b0001_1111;
@@ -402,43 +339,22 @@ fn encode_varint(number: u64) -> Vec<u8> { //TODO negative number encode
 }
 
 fn serialize_voice_packet(packet: VoicePacket) -> Vec<u8> {
-    let mut result = vec![];
-
     match packet {
-        VoicePacket::Ping(value) => {
-            result.push(0b0010_0000);
-            let mut varint = encode_varint(value.timestamp);
-            result.append(&mut varint);
-        }
-        VoicePacket::AudioData(mut value) => {
-            let packet_type = match value.codec {
-                Codecs::CeltAlpha => 0b0000_0000,
-                Codecs::Speex => 0b0100_0000,
-                Codecs::CeltBeta => 0b0110_0000,
-                Codecs::Opus => 0b1000_0000,
-            };
-            let header = encode_header(packet_type, value.target);
-            result.push(header);
-
-            if let Some(session_id) = value.session_id {
-                let mut session_id = encode_varint(session_id);
-                result.append(&mut session_id);
-            }
-
-            let mut sequence_number = encode_varint(value.sequence_number);
-            result.append(&mut sequence_number);
-
-            result.append(&mut value.audio_payload);
-
-            if let Some(position_info) = value.positional_info {
-                result.extend_from_slice(&position_info[0].to_be_bytes());
-                result.extend_from_slice(&position_info[1].to_be_bytes());
-                result.extend_from_slice(&position_info[2].to_be_bytes());
+        VoicePacket::Ping(ping) => ping.bytes,
+        VoicePacket::AudioData(audio_data) => {
+            if let Some(session_id) = audio_data.session_id {
+                let mut bytes = audio_data.bytes;
+                let mut varint = encode_varint(session_id as u64);
+                let mut result = Vec::with_capacity(bytes.len() + varint.len());
+                let header = bytes.remove(0);
+                result.push(header);
+                result.append(&mut varint);
+                result.append(&mut bytes);
+                return result;
             }
+            audio_data.bytes
         }
     }
-
-    result
 }
 
 impl From<std::io::Error> for Error {

+ 6 - 0
src/server.rs

@@ -87,6 +87,12 @@ async fn process(db: Arc<Db>, stream: TlsStream<TcpStream>, clients: Clients) {
                 }
                 return;
             }
+            ResponseMessage::Talking(audio_data) => {
+                let clients = clients.read().await;
+                for client in clients.values().filter(|client| client.session_id != session_id) {
+                    client.post_message(Message::UserTalking(audio_data.clone()));
+                }
+            }
         }
     }
 }