This commit is contained in:
2024-12-30 17:22:16 -05:00
parent 10e80a0c2d
commit c7ca250b66
20 changed files with 529 additions and 472 deletions

View File

@@ -0,0 +1,170 @@
use crate::http::websocket::request::{RegisterTlmListenerRequest, WebsocketRequest};
use crate::http::websocket::response::{TlmValueResponse, WebsocketResponse};
use crate::telemetry::management_service::TelemetryManagementService;
use actix_web::{rt, web, HttpRequest, HttpResponse};
use actix_ws::{AggregatedMessage, ProtocolError, Session};
use anyhow::anyhow;
use log::{error, trace};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc::Sender;
use tokio::time::{sleep, Instant};
use tokio::{pin, select};
use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::StreamExt;
pub mod request;
pub mod response;
fn handle_register_tlm_listener(
data: &Arc<TelemetryManagementService>,
request: RegisterTlmListenerRequest,
tx: &Sender<WebsocketResponse>,
) {
if let Some(tlm_data) = data.get_by_uuid(&request.uuid) {
let minimum_separation = Duration::from_millis(request.minimum_separation_ms as u64);
let mut rx = tlm_data.data.subscribe();
let tx = tx.clone();
rt::spawn(async move {
let mut last_sent_at = Instant::now() - minimum_separation;
let mut last_value = None;
let sleep = sleep(Duration::from_millis(0));
pin!(sleep);
loop {
select! {
_ = tx.closed() => {
break;
}
Ok(_) = rx.changed() => {
let now = Instant::now();
let value = {
let ref_val = rx.borrow_and_update();
ref_val.clone()
};
if last_sent_at + minimum_separation > now {
last_value = value;
sleep.as_mut().reset(last_sent_at + minimum_separation);
continue;
} else {
last_value = None;
last_sent_at = now;
}
let _ = tx.send(TlmValueResponse {
uuid: request.uuid.clone(),
value,
}.into()).await;
}
() = &mut sleep => {
if let Some(value) = last_value {
let _ = tx.send(TlmValueResponse {
uuid: request.uuid.clone(),
value: Some(value),
}.into()).await;
}
last_value = None;
let now = Instant::now();
last_sent_at = now;
}
}
}
});
}
}
async fn handle_websocket_message(
data: &Arc<TelemetryManagementService>,
request: WebsocketRequest,
tx: &Sender<WebsocketResponse>,
) {
match request {
WebsocketRequest::RegisterTlmListener(request) => {
handle_register_tlm_listener(data, request, tx)
}
};
}
async fn handle_websocket_response(
msg: WebsocketResponse,
session: &mut Session,
) -> anyhow::Result<bool> {
let msg_json = match serde_json::to_string(&msg) {
Ok(msg_json) => msg_json,
Err(err) => {
return Err(anyhow!("JSON Serialization Error Encountered {err}"));
}
};
if let Err(err) = session.text(msg_json).await {
return Err(anyhow!("Tx Error Encountered {err}"));
}
Ok(true)
}
async fn handle_websocket_incoming(
msg: Result<AggregatedMessage, ProtocolError>,
data: &Arc<TelemetryManagementService>,
session: &mut Session,
tx: &Sender<WebsocketResponse>,
) -> anyhow::Result<bool> {
match msg {
Ok(AggregatedMessage::Close(_)) => Ok(false),
Ok(AggregatedMessage::Text(msg)) => match serde_json::from_str::<WebsocketRequest>(&msg) {
Ok(request) => {
handle_websocket_message(data, request, tx).await;
Ok(true)
}
Err(err) => Err(anyhow!("JSON Deserialization Error Encountered {err}")),
},
Ok(AggregatedMessage::Ping(msg)) => match session.pong(&msg).await {
Ok(_) => Ok(true),
Err(err) => Err(anyhow!("Pong Encountered Error: {err}")),
},
Err(err) => Err(anyhow!("Rx Error Encountered {err}")),
_ => Err(anyhow!("Unexpected Message")),
}
}
pub async fn websocket_connect(
req: HttpRequest,
stream: web::Payload,
data: web::Data<Arc<TelemetryManagementService>>,
cancel_token: web::Data<CancellationToken>,
) -> Result<HttpResponse, actix_web::Error> {
trace!("websocket_connect");
let (res, mut session, stream) = actix_ws::handle(&req, stream)?;
let mut stream = stream
.aggregate_continuations()
// up to 1 MiB
.max_continuation_size(2_usize.pow(20));
let cancel_token = cancel_token.get_ref().clone();
rt::spawn(async move {
let (tx, mut rx) = tokio::sync::mpsc::channel::<WebsocketResponse>(128);
loop {
let result = select! {
_ = cancel_token.cancelled() => Ok(false),
Some(msg) = rx.recv() => handle_websocket_response(msg, &mut session).await,
Some(msg) = stream.next() => handle_websocket_incoming(msg, data.get_ref(), &mut session, &tx).await,
else => Ok(false),
};
match result {
Ok(true) => {}
Ok(false) => break,
Err(err) => {
error!("{}", err);
break;
}
}
}
rx.close();
let _ = session.close(None).await;
});
Ok(res)
}
pub fn setup_websocket(cfg: &mut web::ServiceConfig) {
cfg.route("", web::get().to(websocket_connect));
}