123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566 |
- use crate::protocol::connection::{ControlChannel, ControlChannelStats};
- use async_trait::async_trait;
- use futures::stream::SplitSink;
- use futures::stream::SplitStream;
- use futures::stream::StreamExt;
- use futures::stream::TryStreamExt;
- use futures::SinkExt;
- use mumble_protocol::control::ControlPacket;
- use mumble_protocol::control::ServerControlCodec;
- use mumble_protocol::voice::{Clientbound, Serverbound};
- use std::io::Error as IoError;
- use std::io::Error;
- use std::io::ErrorKind;
- use std::sync::atomic::{AtomicU32, Ordering};
- use tokio::net::TcpStream;
- use tokio::sync::Mutex;
- use tokio_rustls::server::TlsStream;
- use tokio_util::codec::Decoder;
- use tokio_util::codec::Framed;
- pub struct TcpControlChannel {
- received: AtomicU32,
- sink: Mutex<
- SplitSink<Framed<TlsStream<TcpStream>, ServerControlCodec>, ControlPacket<Clientbound>>,
- >,
- stream: Mutex<SplitStream<Framed<TlsStream<TcpStream>, ServerControlCodec>>>,
- }
- impl TcpControlChannel {
- pub fn new(stream: TlsStream<TcpStream>) -> Self {
- let (sink, stream) = ServerControlCodec::new().framed(stream).split();
- TcpControlChannel {
- sink: Mutex::new(sink),
- stream: Mutex::new(stream),
- received: AtomicU32::new(0),
- }
- }
- }
- #[async_trait]
- impl ControlChannel for TcpControlChannel {
- async fn send(&self, message: ControlPacket<Clientbound>) -> Result<(), Error> {
- let mut sink = self.sink.lock().await;
- Ok(sink.send(message).await?)
- }
- async fn receive(&self) -> Result<ControlPacket<Serverbound>, Error> {
- let mut stream = self.stream.lock().await;
- let message = stream.try_next().await?;
- match message {
- Some(msg) => {
- self.received.fetch_add(1, Ordering::Relaxed);
- Ok(msg)
- }
- None => Err(IoError::new(ErrorKind::BrokenPipe, "stream closed")),
- }
- }
- fn get_stats(&self) -> ControlChannelStats {
- ControlChannelStats {
- received: self.received.load(Ordering::Acquire),
- }
- }
- }
|