tcp_control_channel.rs 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. use crate::protocol::connection::{ControlChannel, ControlChannelStats};
  2. use async_trait::async_trait;
  3. use futures::stream::SplitSink;
  4. use futures::stream::SplitStream;
  5. use futures::stream::StreamExt;
  6. use futures::stream::TryStreamExt;
  7. use futures::SinkExt;
  8. use mumble_protocol::control::ControlPacket;
  9. use mumble_protocol::control::ServerControlCodec;
  10. use mumble_protocol::voice::{Clientbound, Serverbound};
  11. use std::io::Error as IoError;
  12. use std::io::Error;
  13. use std::io::ErrorKind;
  14. use std::sync::atomic::{AtomicU32, Ordering};
  15. use tokio::net::TcpStream;
  16. use tokio::sync::Mutex;
  17. use tokio_rustls::server::TlsStream;
  18. use tokio_util::codec::Decoder;
  19. use tokio_util::codec::Framed;
  20. pub struct TcpControlChannel {
  21. received: AtomicU32,
  22. sink: Mutex<
  23. SplitSink<Framed<TlsStream<TcpStream>, ServerControlCodec>, ControlPacket<Clientbound>>,
  24. >,
  25. stream: Mutex<SplitStream<Framed<TlsStream<TcpStream>, ServerControlCodec>>>,
  26. }
  27. impl TcpControlChannel {
  28. pub fn new(stream: TlsStream<TcpStream>) -> Self {
  29. let (sink, stream) = ServerControlCodec::new().framed(stream).split();
  30. TcpControlChannel {
  31. sink: Mutex::new(sink),
  32. stream: Mutex::new(stream),
  33. received: AtomicU32::new(0),
  34. }
  35. }
  36. }
  37. #[async_trait]
  38. impl ControlChannel for TcpControlChannel {
  39. async fn send(&self, message: ControlPacket<Clientbound>) -> Result<(), Error> {
  40. let mut sink = self.sink.lock().await;
  41. Ok(sink.send(message).await?)
  42. }
  43. async fn receive(&self) -> Result<ControlPacket<Serverbound>, Error> {
  44. let mut stream = self.stream.lock().await;
  45. let message = stream.try_next().await?;
  46. match message {
  47. Some(msg) => {
  48. self.received.fetch_add(1, Ordering::Relaxed);
  49. Ok(msg)
  50. }
  51. None => Err(IoError::new(ErrorKind::BrokenPipe, "stream closed")),
  52. }
  53. }
  54. fn get_stats(&self) -> ControlChannelStats {
  55. ControlChannelStats {
  56. received: self.received.load(Ordering::Acquire),
  57. }
  58. }
  59. }