main.rs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387
  1. extern crate argparse;
  2. extern crate bytes;
  3. extern crate futures;
  4. extern crate mpd;
  5. extern crate mumble_protocol;
  6. extern crate opus;
  7. mod commands;
  8. mod util;
  9. use argparse::ArgumentParser;
  10. use argparse::Store;
  11. use argparse::StoreTrue;
  12. use bytes::Bytes;
  13. use commands::create_commands;
  14. use commands::parse_command;
  15. use futures::channel::oneshot;
  16. use futures::join;
  17. use futures::stream::SplitSink;
  18. use futures::SinkExt;
  19. use futures::StreamExt;
  20. use mpd::Client;
  21. use mumble_protocol::control::msgs;
  22. use mumble_protocol::control::ClientControlCodec;
  23. use mumble_protocol::control::ControlPacket;
  24. use mumble_protocol::crypt::ClientCryptState;
  25. use mumble_protocol::crypt::CryptState;
  26. use mumble_protocol::voice::VoicePacket;
  27. use mumble_protocol::voice::VoicePacketPayload;
  28. use mumble_protocol::Clientbound;
  29. use mumble_protocol::Serverbound;
  30. use opus::Encoder;
  31. use std::collections::BTreeMap;
  32. use std::convert::Into;
  33. use std::convert::TryInto;
  34. use std::fs::File;
  35. use std::io::Read;
  36. use std::net::Ipv6Addr;
  37. use std::net::SocketAddr;
  38. use std::net::ToSocketAddrs;
  39. use std::thread;
  40. use std::time;
  41. use tokio::net::TcpStream;
  42. use tokio::net::UdpSocket;
  43. use tokio::sync::mpsc;
  44. use tokio_tls::TlsConnector;
  45. use tokio_util::codec::Decoder;
  46. use tokio_util::udp::UdpFramed;
  47. use util::display_song;
  /// Settings for the audio path that reads PCM data from a FIFO and encodes it with Opus.
  #[derive(Debug, Clone, PartialEq, Eq)]
  struct AudioSettings {
      /// Path of the FIFO (named pipe) the PCM audio data is read from.
      fifo_path: String,
      /// Opus encoder bitrate in bits per second.
      bitrate: i32,
  }
  52. async fn connect(
  53. server_addr: SocketAddr,
  54. server_host: String,
  55. user_name: String,
  56. accept_invalid_cert: bool,
  57. mpd_addr: String,
  58. crypt_state_sender: oneshot::Sender<ClientCryptState>,
  59. ) {
  60. // Wrap crypt_state_sender in Option, so we can call it only once
  61. let mut crypt_state_sender = Some(crypt_state_sender);
  62. // Connect to server via TCP
  63. let stream = TcpStream::connect(&server_addr)
  64. .await
  65. .expect("Failed to connect to server:");
  66. println!("TCP connected..");
  67. // Wrap the connection in TLS
  68. let mut builder = native_tls::TlsConnector::builder();
  69. builder.danger_accept_invalid_certs(accept_invalid_cert);
  70. let connector: TlsConnector = builder
  71. .build()
  72. .expect("Failed to create TLS connector")
  73. .into();
  74. let tls_stream = connector
  75. .connect(&server_host, stream)
  76. .await
  77. .expect("Failed to connect TLS: {}");
  78. println!("TLS connected..");
  79. // Wrap the TLS stream with Mumble's client-side control-channel codec
  80. let (mut sink, mut stream) = ClientControlCodec::new().framed(tls_stream).split();
  81. // Handshake (omitting `Version` message for brevity)
  82. let mut msg = msgs::Authenticate::new();
  83. msg.set_username(user_name);
  84. msg.set_opus(true);
  85. sink.send(msg.into()).await.unwrap();
  86. // Connect to MPD
  87. let mut conn = Client::connect(mpd_addr).unwrap();
  88. println!("Logging in..");
  89. let mut crypt_state = None;
  90. // Channel for packet events and periodic ping events
  91. let (tx, mut rx) = mpsc::channel(100);
  92. let mut tx1 = tx.clone();
  93. let mut tx2 = tx.clone();
  94. // Handle incoming packets
  95. tokio::spawn(async move {
  96. while let Some(packet) = stream.next().await {
  97. tx1.send(Some(packet)).await.unwrap()
  98. }
  99. });
  100. // Generate periodic tick events
  101. tokio::spawn(async move {
  102. loop {
  103. tx2.send(None).await.unwrap();
  104. thread::sleep(time::Duration::from_millis(2000));
  105. }
  106. });
  107. let mut paged_results = BTreeMap::new();
  108. let commands = create_commands();
  109. while let Some(i) = rx.recv().await {
  110. match i {
  111. Some(packet) => match packet.unwrap() {
  112. ControlPacket::TextMessage(mut msg) => {
  113. println!(
  114. "Got message from user with session ID {}: {}",
  115. msg.get_actor(),
  116. msg.get_message()
  117. );
  118. // Send reply back to server
  119. if let Some(reply) = parse_command(
  120. &mut conn,
  121. &mut paged_results,
  122. msg.get_actor(),
  123. &msg.take_message(),
  124. &commands,
  125. ) {
  126. let mut response = msgs::TextMessage::new();
  127. response.mut_session().push(msg.get_actor());
  128. response.set_message(reply);
  129. sink.send(response.into()).await.unwrap();
  130. }
  131. }
  132. ControlPacket::CryptSetup(msg) => {
  133. // Wait until we're fully connected before initiating UDP voice
  134. crypt_state = Some(ClientCryptState::new_from(
  135. msg.get_key()
  136. .try_into()
  137. .expect("Server sent private key with incorrect size"),
  138. msg.get_client_nonce()
  139. .try_into()
  140. .expect("Server sent client_nonce with incorrect size"),
  141. msg.get_server_nonce()
  142. .try_into()
  143. .expect("Server sent server_nonce with incorrect size"),
  144. ));
  145. }
  146. ControlPacket::ServerSync(_) => {
  147. println!("Logged in!");
  148. if let Some(sender) = crypt_state_sender.take() {
  149. let _ = sender.send(
  150. crypt_state
  151. .take()
  152. .expect("Server didn't send us any CryptSetup packet!"),
  153. );
  154. }
  155. }
  156. ControlPacket::Reject(msg) => {
  157. println!("Login rejected: {:?}", msg);
  158. }
  159. _ => {}
  160. },
  161. None => {
  162. // Update status
  163. let status = conn
  164. .currentsong()
  165. .ok()
  166. .flatten()
  167. .map(|s| display_song(&s))
  168. .flatten()
  169. .unwrap_or("".to_string());
  170. let mut response = msgs::UserState::new();
  171. response.set_comment(status);
  172. sink.send(response.into()).await.unwrap();
  173. sink.send(msgs::Ping::new().into()).await.unwrap();
  174. }
  175. }
  176. }
  177. }
  178. async fn handle_udp(
  179. server_addr: SocketAddr,
  180. crypt_state: oneshot::Receiver<ClientCryptState>,
  181. audio_settings: AudioSettings,
  182. ) {
  183. // Bind UDP socket
  184. let udp_socket = UdpSocket::bind((Ipv6Addr::from(0u128), 0u16))
  185. .await
  186. .expect("Failed to bind UDP socket");
  187. // Wait for initial CryptState
  188. let crypt_state = match crypt_state.await {
  189. Ok(crypt_state) => crypt_state,
  190. // disconnected before we received the CryptSetup packet, oh well
  191. Err(_) => return,
  192. };
  193. println!("UDP ready!");
  194. // Wrap the raw UDP packets in Mumble's crypto and voice codec (CryptState does both)
  195. let (mut sink, mut source) = UdpFramed::new(udp_socket, crypt_state).split();
  196. tokio::spawn(async move { send_audio(&mut sink, server_addr, audio_settings).await });
  197. // Handle incoming UDP packets
  198. while let Some(packet) = source.next().await {
  199. let (packet, _src_addr) = match packet {
  200. Ok(packet) => packet,
  201. Err(err) => {
  202. eprintln!("Got an invalid UDP packet: {}", err);
  203. // To be expected, considering this is the internet, just ignore it
  204. continue;
  205. }
  206. };
  207. match packet {
  208. VoicePacket::Ping { .. } => {
  209. // Note: A normal application would handle these and only use UDP for voice
  210. // once it has received one.
  211. println!("Pong");
  212. continue;
  213. }
  214. VoicePacket::Audio { .. } => {
  215. // Got audio, do nothing
  216. }
  217. }
  218. }
  219. }
  220. async fn send_audio(
  221. sink: &mut SplitSink<
  222. UdpFramed<CryptState<Serverbound, Clientbound>>,
  223. (VoicePacket<Serverbound>, std::net::SocketAddr),
  224. >,
  225. server_addr: SocketAddr,
  226. audio_settings: AudioSettings,
  227. ) {
  228. // Connect to unix named pipe
  229. let mut fifo = match File::open(&audio_settings.fifo_path) {
  230. Ok(f) => f,
  231. Err(e) => {
  232. eprintln!("Error opening FIFO at {}: {}", &audio_settings.fifo_path, e);
  233. return;
  234. }
  235. };
  236. // Init encoder
  237. let mut encoder =
  238. Encoder::new(48_000, opus::Channels::Stereo, opus::Application::Audio).unwrap();
  239. encoder.set_packet_loss_perc(10).unwrap();
  240. encoder
  241. .set_bitrate(opus::Bitrate::Bits(audio_settings.bitrate))
  242. .unwrap();
  243. // Must be multiple of 240 when sampling at 48kHz
  244. const BUF_SIZE: usize = 240 * 8;
  245. let mut buf = [0; BUF_SIZE];
  246. let mut opus_buf = [0; 72_000];
  247. let mut seq_num = 0;
  248. loop {
  249. match fifo.read_exact(&mut buf) {
  250. Ok(()) => {
  251. // Convert two u8 into a single i16
  252. let samples: Vec<i16> = buf
  253. .chunks(2)
  254. .into_iter()
  255. .map(|chunk| match chunk {
  256. [b, a] => ((*a as i16) << 8) + (*b as i16),
  257. _ => panic!(
  258. "BUG: Should be impossible, check that BUF_SIZE={} is even",
  259. BUF_SIZE
  260. ),
  261. })
  262. .collect();
  263. // Encode with Opus
  264. let payload = match encoder.encode(&samples, &mut opus_buf) {
  265. Ok(size) => {
  266. VoicePacketPayload::Opus(Bytes::from(opus_buf[0..size].to_owned()), false)
  267. }
  268. Err(e) => {
  269. println!("{:?}", e);
  270. continue;
  271. }
  272. };
  273. // Send!
  274. let reply = VoicePacket::Audio {
  275. _dst: std::marker::PhantomData,
  276. target: 0, // normal speech
  277. session_id: (), // unused for server-bound packets
  278. seq_num,
  279. payload,
  280. position_info: None,
  281. };
  282. sink.send((reply, server_addr)).await.unwrap();
  283. seq_num += 1;
  284. }
  285. Err(e) => {
  286. eprintln!("Error reading FIFO: {}", e);
  287. break;
  288. }
  289. }
  290. }
  291. }
  292. #[tokio::main]
  293. async fn main() {
  294. // Handle command line arguments
  295. let mut server_host = "".to_string();
  296. let mut server_port = 64738u16;
  297. let mut user_name = "mumble-bot-rs".to_string();
  298. let mut accept_invalid_cert = false;
  299. let mut bitrate = 96_000i32;
  300. let mut fifo_path = "".to_string();
  301. let mut mpd_addr = "127.0.0.1:6600".to_string();
  302. {
  303. let mut ap = ArgumentParser::new();
  304. ap.set_description("Mumble bot for streaming music from MPD");
  305. ap.refer(&mut server_host)
  306. .add_option(&["--host"], Store, "Hostname of mumble server")
  307. .required();
  308. ap.refer(&mut server_port)
  309. .add_option(&["--port"], Store, "Port of mumble server");
  310. ap.refer(&mut user_name)
  311. .add_option(&["--username"], Store, "User name used to connect");
  312. ap.refer(&mut accept_invalid_cert).add_option(
  313. &["--accept-invalid-cert"],
  314. StoreTrue,
  315. "Accept invalid TLS certificates",
  316. );
  317. ap.refer(&mut fifo_path)
  318. .add_option(
  319. &["--fifo-path"],
  320. Store,
  321. "Path to FIFO to read PCM audio data from",
  322. )
  323. .required();
  324. ap.refer(&mut bitrate)
  325. .add_option(&["--bitrate"], Store, "Opus encoder bitrate (bits/s)");
  326. ap.refer(&mut mpd_addr).add_option(
  327. &["--mpd-addr"],
  328. Store,
  329. "MPD TCP control socket address",
  330. );
  331. ap.parse_args_or_exit();
  332. }
  333. let server_addr = (server_host.as_ref(), server_port)
  334. .to_socket_addrs()
  335. .expect("Failed to parse server address")
  336. .next()
  337. .expect("Failed to resolve server address");
  338. // Oneshot channel for setting UDP CryptState from control task
  339. // For simplicity we don't deal with re-syncing, real applications would have to.
  340. let (crypt_state_sender, crypt_state_receiver) = oneshot::channel::<ClientCryptState>();
  341. // Run it
  342. join!(
  343. connect(
  344. server_addr,
  345. server_host,
  346. user_name,
  347. accept_invalid_cert,
  348. mpd_addr,
  349. crypt_state_sender,
  350. ),
  351. handle_udp(
  352. server_addr,
  353. crypt_state_receiver,
  354. AudioSettings { fifo_path, bitrate }
  355. )
  356. );
  357. }