123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387 |
- extern crate argparse;
- extern crate bytes;
- extern crate futures;
- extern crate mpd;
- extern crate mumble_protocol;
- extern crate opus;
- mod commands;
- mod util;
- use argparse::ArgumentParser;
- use argparse::Store;
- use argparse::StoreTrue;
- use bytes::Bytes;
- use commands::create_commands;
- use commands::parse_command;
- use futures::channel::oneshot;
- use futures::join;
- use futures::stream::SplitSink;
- use futures::SinkExt;
- use futures::StreamExt;
- use mpd::Client;
- use mumble_protocol::control::msgs;
- use mumble_protocol::control::ClientControlCodec;
- use mumble_protocol::control::ControlPacket;
- use mumble_protocol::crypt::ClientCryptState;
- use mumble_protocol::crypt::CryptState;
- use mumble_protocol::voice::VoicePacket;
- use mumble_protocol::voice::VoicePacketPayload;
- use mumble_protocol::Clientbound;
- use mumble_protocol::Serverbound;
- use opus::Encoder;
- use std::collections::BTreeMap;
- use std::convert::Into;
- use std::convert::TryInto;
- use std::fs::File;
- use std::io::Read;
- use std::net::Ipv6Addr;
- use std::net::SocketAddr;
- use std::net::ToSocketAddrs;
- use std::thread;
- use std::time;
- use tokio::net::TcpStream;
- use tokio::net::UdpSocket;
- use tokio::sync::mpsc;
- use tokio_tls::TlsConnector;
- use tokio_util::codec::Decoder;
- use tokio_util::udp::UdpFramed;
- use util::display_song;
- struct AudioSettings {
- fifo_path: String,
- bitrate: i32,
- }
- async fn connect(
- server_addr: SocketAddr,
- server_host: String,
- user_name: String,
- accept_invalid_cert: bool,
- mpd_addr: String,
- crypt_state_sender: oneshot::Sender<ClientCryptState>,
- ) {
- // Wrap crypt_state_sender in Option, so we can call it only once
- let mut crypt_state_sender = Some(crypt_state_sender);
- // Connect to server via TCP
- let stream = TcpStream::connect(&server_addr)
- .await
- .expect("Failed to connect to server:");
- println!("TCP connected..");
- // Wrap the connection in TLS
- let mut builder = native_tls::TlsConnector::builder();
- builder.danger_accept_invalid_certs(accept_invalid_cert);
- let connector: TlsConnector = builder
- .build()
- .expect("Failed to create TLS connector")
- .into();
- let tls_stream = connector
- .connect(&server_host, stream)
- .await
- .expect("Failed to connect TLS: {}");
- println!("TLS connected..");
- // Wrap the TLS stream with Mumble's client-side control-channel codec
- let (mut sink, mut stream) = ClientControlCodec::new().framed(tls_stream).split();
- // Handshake (omitting `Version` message for brevity)
- let mut msg = msgs::Authenticate::new();
- msg.set_username(user_name);
- msg.set_opus(true);
- sink.send(msg.into()).await.unwrap();
- // Connect to MPD
- let mut conn = Client::connect(mpd_addr).unwrap();
- println!("Logging in..");
- let mut crypt_state = None;
- // Channel for packet events and periodic ping events
- let (tx, mut rx) = mpsc::channel(100);
- let mut tx1 = tx.clone();
- let mut tx2 = tx.clone();
- // Handle incoming packets
- tokio::spawn(async move {
- while let Some(packet) = stream.next().await {
- tx1.send(Some(packet)).await.unwrap()
- }
- });
- // Generate periodic tick events
- tokio::spawn(async move {
- loop {
- tx2.send(None).await.unwrap();
- thread::sleep(time::Duration::from_millis(2000));
- }
- });
- let mut paged_results = BTreeMap::new();
- let commands = create_commands();
- while let Some(i) = rx.recv().await {
- match i {
- Some(packet) => match packet.unwrap() {
- ControlPacket::TextMessage(mut msg) => {
- println!(
- "Got message from user with session ID {}: {}",
- msg.get_actor(),
- msg.get_message()
- );
- // Send reply back to server
- if let Some(reply) = parse_command(
- &mut conn,
- &mut paged_results,
- msg.get_actor(),
- &msg.take_message(),
- &commands,
- ) {
- let mut response = msgs::TextMessage::new();
- response.mut_session().push(msg.get_actor());
- response.set_message(reply);
- sink.send(response.into()).await.unwrap();
- }
- }
- ControlPacket::CryptSetup(msg) => {
- // Wait until we're fully connected before initiating UDP voice
- crypt_state = Some(ClientCryptState::new_from(
- msg.get_key()
- .try_into()
- .expect("Server sent private key with incorrect size"),
- msg.get_client_nonce()
- .try_into()
- .expect("Server sent client_nonce with incorrect size"),
- msg.get_server_nonce()
- .try_into()
- .expect("Server sent server_nonce with incorrect size"),
- ));
- }
- ControlPacket::ServerSync(_) => {
- println!("Logged in!");
- if let Some(sender) = crypt_state_sender.take() {
- let _ = sender.send(
- crypt_state
- .take()
- .expect("Server didn't send us any CryptSetup packet!"),
- );
- }
- }
- ControlPacket::Reject(msg) => {
- println!("Login rejected: {:?}", msg);
- }
- _ => {}
- },
- None => {
- // Update status
- let status = conn
- .currentsong()
- .ok()
- .flatten()
- .map(|s| display_song(&s))
- .flatten()
- .unwrap_or("".to_string());
- let mut response = msgs::UserState::new();
- response.set_comment(status);
- sink.send(response.into()).await.unwrap();
- sink.send(msgs::Ping::new().into()).await.unwrap();
- }
- }
- }
- }
- async fn handle_udp(
- server_addr: SocketAddr,
- crypt_state: oneshot::Receiver<ClientCryptState>,
- audio_settings: AudioSettings,
- ) {
- // Bind UDP socket
- let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
- .await
- .expect("Failed to bind UDP socket");
- // Wait for initial CryptState
- let crypt_state = match crypt_state.await {
- Ok(crypt_state) => crypt_state,
- // disconnected before we received the CryptSetup packet, oh well
- Err(_) => return,
- };
- println!("UDP ready!");
- // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both)
- let (mut sink, mut source) = UdpFramed::new(udp_socket, crypt_state).split();
- tokio::spawn(async move { send_audio(&mut sink, server_addr, audio_settings).await });
- // Handle incoming UDP packets
- while let Some(packet) = source.next().await {
- let (packet, _src_addr) = match packet {
- Ok(packet) => packet,
- Err(err) => {
- eprintln!("Got an invalid UDP packet: {}", err);
- // To be expected, considering this is the internet, just ignore it
- continue;
- }
- };
- match packet {
- VoicePacket::Ping { .. } => {
- // Note: A normal application would handle these and only use UDP for voice
- // once it has received one.
- println!("Pong");
- continue;
- }
- VoicePacket::Audio { .. } => {
- // Got audio, do nothing
- }
- }
- }
- }
- async fn send_audio(
- sink: &mut SplitSink<
- UdpFramed<CryptState<Serverbound, Clientbound>>,
- (VoicePacket<Serverbound>, std::net::SocketAddr),
- >,
- server_addr: SocketAddr,
- audio_settings: AudioSettings,
- ) {
- // Connect to unix named pipe
- let mut fifo = match File::open(&audio_settings.fifo_path) {
- Ok(f) => f,
- Err(e) => {
- eprintln!("Error opening FIFO at {}: {}", &audio_settings.fifo_path, e);
- return;
- }
- };
- // Init encoder
- let mut encoder =
- Encoder::new(48_000, opus::Channels::Stereo, opus::Application::Audio).unwrap();
- encoder.set_packet_loss_perc(10).unwrap();
- encoder
- .set_bitrate(opus::Bitrate::Bits(audio_settings.bitrate))
- .unwrap();
- // Must be multiple of 240 when sampling at 48kHz
- const BUF_SIZE: usize = 240 * 8;
- let mut buf = [0; BUF_SIZE];
- let mut opus_buf = [0; 72_000];
- let mut seq_num = 0;
- loop {
- match fifo.read_exact(&mut buf) {
- Ok(()) => {
- // Convert two u8 into a single i16
- let samples: Vec<i16> = buf
- .chunks(2)
- .into_iter()
- .map(|chunk| match chunk {
- [b, a] => ((*a as i16) << 8) + (*b as i16),
- _ => panic!(
- "BUG: Should be impossible, check that BUF_SIZE={} is even",
- BUF_SIZE
- ),
- })
- .collect();
- // Encode with Opus
- let payload = match encoder.encode(&samples, &mut opus_buf) {
- Ok(size) => {
- VoicePacketPayload::Opus(Bytes::from(opus_buf[0..size].to_owned()), false)
- }
- Err(e) => {
- println!("{:?}", e);
- continue;
- }
- };
- // Send!
- let reply = VoicePacket::Audio {
- _dst: std::marker::PhantomData,
- target: 0, // normal speech
- session_id: (), // unused for server-bound packets
- seq_num,
- payload,
- position_info: None,
- };
- sink.send((reply, server_addr)).await.unwrap();
- seq_num += 1;
- }
- Err(e) => {
- eprintln!("Error reading FIFO: {}", e);
- break;
- }
- }
- }
- }
- #[tokio::main]
- async fn main() {
- // Handle command line arguments
- let mut server_host = "".to_string();
- let mut server_port = 64738u16;
- let mut user_name = "mumble-bot-rs".to_string();
- let mut accept_invalid_cert = false;
- let mut bitrate = 96_000i32;
- let mut fifo_path = "".to_string();
- let mut mpd_addr = "127.0.0.1:6600".to_string();
- {
- let mut ap = ArgumentParser::new();
- ap.set_description("Mumble bot for streaming music from MPD");
- ap.refer(&mut server_host)
- .add_option(&["--host"], Store, "Hostname of mumble server")
- .required();
- ap.refer(&mut server_port)
- .add_option(&["--port"], Store, "Port of mumble server");
- ap.refer(&mut user_name)
- .add_option(&["--username"], Store, "User name used to connect");
- ap.refer(&mut accept_invalid_cert).add_option(
- &["--accept-invalid-cert"],
- StoreTrue,
- "Accept invalid TLS certificates",
- );
- ap.refer(&mut fifo_path)
- .add_option(
- &["--fifo-path"],
- Store,
- "Path to FIFO to read PCM audio data from",
- )
- .required();
- ap.refer(&mut bitrate)
- .add_option(&["--bitrate"], Store, "Opus encoder bitrate (bits/s)");
- ap.refer(&mut mpd_addr).add_option(
- &["--mpd-addr"],
- Store,
- "MPD TCP control socket address",
- );
- ap.parse_args_or_exit();
- }
- let server_addr = (server_host.as_ref(), server_port)
- .to_socket_addrs()
- .expect("Failed to parse server address")
- .next()
- .expect("Failed to resolve server address");
- // Oneshot channel for setting UDP CryptState from control task
- // For simplicity we don't deal with re-syncing, real applications would have to.
- let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>();
- // Run it
- join!(
- connect(
- server_addr,
- server_host,
- user_name,
- accept_invalid_cert,
- mpd_addr,
- crypt_state_sender,
- ),
- handle_udp(
- server_addr,
- crypt_state_receiver,
- AudioSettings { fifo_path, bitrate }
- )
- );
- }
|