use crate::http::websocket::request::{ RegisterTlmListenerRequest, UnregisterTlmListenerRequest, WebsocketRequest, }; use crate::http::websocket::response::{TlmValueResponse, WebsocketResponse}; use crate::telemetry::management_service::TelemetryManagementService; use actix_web::{rt, web, HttpRequest, HttpResponse}; use actix_ws::{AggregatedMessage, ProtocolError, Session}; use anyhow::anyhow; use futures_util::StreamExt; use log::{error, trace}; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::select; use tokio::sync::mpsc::Sender; use tokio::time::{sleep_until, Instant}; use tokio_util::sync::CancellationToken; use uuid::Uuid; pub mod request; pub mod response; fn handle_register_tlm_listener( data: &Arc, request: RegisterTlmListenerRequest, tx: &Sender, tlm_listeners: &mut HashMap, ) { if let Some(tlm_data) = data.get_by_uuid(&request.uuid) { let token = CancellationToken::new(); if let Some(token) = tlm_listeners.insert(tlm_data.definition.uuid, token.clone()) { token.cancel(); } let minimum_separation = Duration::from_millis(request.minimum_separation_ms as u64); let mut rx = tlm_data.data.subscribe(); let tx = tx.clone(); rt::spawn(async move { loop { let now = select! { biased; _ = tx.closed() => { break; } _ = token.cancelled() => { break; } Ok(_) = rx.changed() => { let now = Instant::now(); let value = { let ref_val = rx.borrow_and_update(); ref_val.clone() }; let _ = tx.send(TlmValueResponse { uuid: request.uuid, value, }.into()).await; now } }; select! { biased; _ = tx.closed() => { break; } _ = token.cancelled() => { break; } _ = sleep_until(now + minimum_separation) => {} } } }); } } fn handle_unregister_tlm_listener( request: UnregisterTlmListenerRequest, tlm_listeners: &mut HashMap, ) { if let Some(token) = tlm_listeners.remove(&request.uuid) { token.cancel(); } } async fn handle_websocket_message( data: &Arc, request: WebsocketRequest, tx: &Sender, tlm_listeners: &mut HashMap, ) { match request { WebsocketRequest::RegisterTlmListener(request) => { handle_register_tlm_listener(data, request, tx, tlm_listeners) } WebsocketRequest::UnregisterTlmListener(request) => { handle_unregister_tlm_listener(request, tlm_listeners) } }; } async fn handle_websocket_response( msg: WebsocketResponse, session: &mut Session, ) -> anyhow::Result { let msg_json = match serde_json::to_string(&msg) { Ok(msg_json) => msg_json, Err(err) => { return Err(anyhow!("JSON Serialization Error Encountered {err}")); } }; if let Err(err) = session.text(msg_json).await { return Err(anyhow!("Tx Error Encountered {err}")); } Ok(true) } async fn handle_websocket_incoming( msg: Result, data: &Arc, session: &mut Session, tx: &Sender, tlm_listeners: &mut HashMap, ) -> anyhow::Result { match msg { Ok(AggregatedMessage::Close(_)) => Ok(false), Ok(AggregatedMessage::Text(msg)) => match serde_json::from_str::(&msg) { Ok(request) => { handle_websocket_message(data, request, tx, tlm_listeners).await; Ok(true) } Err(err) => Err(anyhow!("JSON Deserialization Error Encountered {err}")), }, Ok(AggregatedMessage::Ping(msg)) => match session.pong(&msg).await { Ok(_) => Ok(true), Err(err) => Err(anyhow!("Pong Encountered Error: {err}")), }, Err(err) => Err(anyhow!("Rx Error Encountered {err}")), _ => Err(anyhow!("Unexpected Message")), } } async fn websocket_connect( req: HttpRequest, stream: web::Payload, data: web::Data>, cancel_token: web::Data, ) -> Result { trace!("websocket_connect"); let (res, mut session, stream) = actix_ws::handle(&req, stream)?; let mut stream = stream .aggregate_continuations() // up to 1 MiB .max_continuation_size(2_usize.pow(20)); let cancel_token = cancel_token.get_ref().clone(); rt::spawn(async move { let mut tlm_listeners = HashMap::new(); let (tx, mut rx) = tokio::sync::mpsc::channel::(128); loop { let result = select! { _ = cancel_token.cancelled() => Ok(false), Some(msg) = rx.recv() => handle_websocket_response(msg, &mut session).await, Some(msg) = stream.next() => handle_websocket_incoming(msg, data.get_ref(), &mut session, &tx, &mut tlm_listeners).await, else => Ok(false), }; match result { Ok(true) => {} Ok(false) => break, Err(err) => { error!("{}", err); break; } } } rx.close(); let _ = session.close(None).await; for (_, token) in tlm_listeners.drain() { token.cancel(); } }); Ok(res) } pub fn setup_websocket(cfg: &mut web::ServiceConfig) { cfg.route("", web::get().to(websocket_connect)); }