diff --git a/Cargo.lock b/Cargo.lock index c3b2d1e..f9d8b1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -268,9 +268,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.90" +version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37bf3594c4c988a53154954629820791dde498571819ae4ca50ca811e060cc95" +checksum = "34ac096ce696dc2fcabef30516bb13c0a68a11d30131d3df6f04711467681b04" [[package]] name = "async-stream" @@ -485,15 +485,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" -[[package]] -name = "convert_case" -version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec182b0ca2f35d8fc196cf3404988fd8b8c739a4d270ff118a398feb0cbec1ca" -dependencies = [ - "unicode-segmentation", -] - [[package]] name = "cookie" version = "0.16.2" @@ -554,7 +545,7 @@ version = "0.99.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f33878137e4dafd7fa914ad4e259e18a4e8e532b9617a2d0150262bf53abfce" dependencies = [ - "convert_case 0.4.0", + "convert_case", "proc-macro2", "quote", "rustc_version", @@ -576,11 +567,9 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7330aeadfbe296029522e6c40f315320aba36fc43a5b3632f3795348f3bd22" dependencies = [ - "convert_case 0.6.0", "proc-macro2", "quote", "syn", - "unicode-xid", ] [[package]] @@ -685,17 +674,6 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" -[[package]] -name = "futures-macro" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "futures-sink" version = "0.3.31" @@ -715,11 +693,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", - "futures-macro", "futures-task", "pin-project-lite", "pin-utils", - "slab", ] [[package]] @@ -1265,9 +1241,9 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.88" +version = "1.0.92" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c3a7fc5db1e57d5a779a352c8cdb57b29aa4c40cc69c3a68a7fedc815fbf2f9" +checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" dependencies = [ "unicode-ident", ] @@ -1520,10 +1496,10 @@ version = "0.1.0" dependencies = [ "actix-web", "actix-ws", + "anyhow", "chrono", "derive_more 1.0.0", "fern", - "futures-util", "hex", "log", "papaya", @@ -1531,6 +1507,7 @@ dependencies = [ "rand", "serde", "serde_json", + "thiserror", "tokio", "tokio-util", "tonic", @@ -1602,9 +1579,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.79" +version = "2.0.93" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +checksum = "9c786062daee0d6db1132800e623df74274a0a87322d8e183338e01b3d98d058" dependencies = [ "proc-macro2", "quote", @@ -1636,6 +1613,26 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "thiserror" +version = "2.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f072643fd0190df67a8bab670c20ef5d8737177d6ac6b2e9a236cb096206b2cc" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "2.0.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b50fa271071aae2e6ee85f842e2e28ba8cd2c5fb67f11fcb1fd70b276f9e7d4" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "time" version = "0.3.36" @@ -1890,18 +1887,6 @@ dependencies = [ "tinyvec", ] -[[package]] -name = "unicode-segmentation" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4c87d22b6e3f4a18d4d40ef354e97c90fcb14dd91d7dc0aa9d8a1172ebf7202" - -[[package]] -name = "unicode-xid" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "229730647fbc343e3a80e463c1db7f78f3855d3f3739bee0dda773c9a037c90a" - [[package]] name = "url" version = "2.5.2" diff --git a/examples/simple_producer/src/main.rs b/examples/simple_producer/src/main.rs index 26b6dcf..380aeb5 100644 --- a/examples/simple_producer/src/main.rs +++ b/examples/simple_producer/src/main.rs @@ -144,15 +144,24 @@ async fn main() -> Result<(), Box> { let mut tlm = Telemetry::new("http://[::1]:50051").await?; let index_handle = tlm - .register("simple_producer/time_offset".into(), TelemetryDataType::Float64) + .register( + "simple_producer/time_offset".into(), + TelemetryDataType::Float64, + ) .await?; let publish_offset = tlm - .register("simple_producer/publish_offset".into(), TelemetryDataType::Float64) + .register( + "simple_producer/publish_offset".into(), + TelemetryDataType::Float64, + ) .await?; let await_offset = tlm - .register("simple_producer/await_offset".into(), TelemetryDataType::Float64) + .register( + "simple_producer/await_offset".into(), + TelemetryDataType::Float64, + ) .await?; let sin_tlm_handle = tlm @@ -215,89 +224,75 @@ async fn main() -> Result<(), Box> { next_time += Duration::from_millis(10); index += 1; tokio::time::sleep_until(next_time).await; - let publish_time = start_time + chrono::TimeDelta::from_std(next_time - start_instant).unwrap(); + let publish_time = + start_time + chrono::TimeDelta::from_std(next_time - start_instant).unwrap(); let actual_time = Instant::now(); - tasks.push(index_handle - .publish( - Value::Float64((actual_time - next_time).as_secs_f64()), - chrono::Utc::now(), - )); - tasks.push(sin_tlm_handle - .publish( - Value::Float32((f32::TAU() * (index as f32) / (1000.0_f32)).sin()), - publish_time, - )); - tasks.push(cos_tlm_handle - .publish( - Value::Float64((f64::TAU() * (index as f64) / (1000.0_f64)).cos()), - publish_time, - )); - tasks.push(sin2_tlm_handle - .publish( - Value::Float32((f32::TAU() * (index as f32) / (500.0_f32)).sin()), - publish_time, - )); - tasks.push(cos2_tlm_handle - .publish( - Value::Float64((f64::TAU() * (index as f64) / (500.0_f64)).cos()), - publish_time, - )); - tasks.push(sin3_tlm_handle - .publish( - Value::Float32((f32::TAU() * (index as f32) / (333.0_f32)).sin()), - publish_time, - )); - tasks.push(cos3_tlm_handle - .publish( - Value::Float64((f64::TAU() * (index as f64) / (333.0_f64)).cos()), - publish_time, - )); - tasks.push(sin4_tlm_handle - .publish( - Value::Float32((f32::TAU() * (index as f32) / (250.0_f32)).sin()), - publish_time, - )); - tasks.push(cos4_tlm_handle - .publish( - Value::Float64((f64::TAU() * (index as f64) / (250.0_f64)).cos()), - publish_time, - )); - tasks.push(sin5_tlm_handle - .publish( - Value::Float32((f32::TAU() * (index as f32) / (200.0_f32)).sin()), - publish_time, - )); - tasks.push(cos5_tlm_handle - .publish( - Value::Float64((f64::TAU() * (index as f64) / (200.0_f64)).cos()), - publish_time, - )); - tasks.push(sin6_tlm_handle - .publish( - Value::Float32((f32::TAU() * (index as f32) / (166.0_f32)).sin()), - publish_time, - )); - tasks.push(cos6_tlm_handle - .publish( - Value::Float64((f64::TAU() * (index as f64) / (166.0_f64)).cos()), - publish_time, - )); + tasks.push(index_handle.publish( + Value::Float64((actual_time - next_time).as_secs_f64()), + chrono::Utc::now(), + )); + tasks.push(sin_tlm_handle.publish( + Value::Float32((f32::TAU() * (index as f32) / (1000.0_f32)).sin()), + publish_time, + )); + tasks.push(cos_tlm_handle.publish( + Value::Float64((f64::TAU() * (index as f64) / (1000.0_f64)).cos()), + publish_time, + )); + tasks.push(sin2_tlm_handle.publish( + Value::Float32((f32::TAU() * (index as f32) / (500.0_f32)).sin()), + publish_time, + )); + tasks.push(cos2_tlm_handle.publish( + Value::Float64((f64::TAU() * (index as f64) / (500.0_f64)).cos()), + publish_time, + )); + tasks.push(sin3_tlm_handle.publish( + Value::Float32((f32::TAU() * (index as f32) / (333.0_f32)).sin()), + publish_time, + )); + tasks.push(cos3_tlm_handle.publish( + Value::Float64((f64::TAU() * (index as f64) / (333.0_f64)).cos()), + publish_time, + )); + tasks.push(sin4_tlm_handle.publish( + Value::Float32((f32::TAU() * (index as f32) / (250.0_f32)).sin()), + publish_time, + )); + tasks.push(cos4_tlm_handle.publish( + Value::Float64((f64::TAU() * (index as f64) / (250.0_f64)).cos()), + publish_time, + )); + tasks.push(sin5_tlm_handle.publish( + Value::Float32((f32::TAU() * (index as f32) / (200.0_f32)).sin()), + publish_time, + )); + tasks.push(cos5_tlm_handle.publish( + Value::Float64((f64::TAU() * (index as f64) / (200.0_f64)).cos()), + publish_time, + )); + tasks.push(sin6_tlm_handle.publish( + Value::Float32((f32::TAU() * (index as f32) / (166.0_f32)).sin()), + publish_time, + )); + tasks.push(cos6_tlm_handle.publish( + Value::Float64((f64::TAU() * (index as f64) / (166.0_f64)).cos()), + publish_time, + )); - tasks.push(publish_offset - .publish( - Value::Float64((Instant::now() - actual_time).as_secs_f64()), - chrono::Utc::now(), - )); + tasks.push(publish_offset.publish( + Value::Float64((Instant::now() - actual_time).as_secs_f64()), + chrono::Utc::now(), + )); for task in tasks.drain(..) { task.await?; } - tasks.push(await_offset - .publish( - Value::Float64((Instant::now() - actual_time).as_secs_f64()), - chrono::Utc::now(), - )); + tasks.push(await_offset.publish( + Value::Float64((Instant::now() - actual_time).as_secs_f64()), + chrono::Utc::now(), + )); } Ok(()) diff --git a/server/Cargo.toml b/server/Cargo.toml index 469129e..a442ed5 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -18,10 +18,11 @@ actix-ws = "0.3.0" tokio-util = "0.7.12" serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.132" -derive_more = { version = "1.0.0", features = ["full"] } hex = "0.4.3" -futures-util = "0.3.31" papaya = "0.1.7" +thiserror = "2.0.9" +derive_more = { version = "1.0.0", features = ["from"] } +anyhow = "1.0.95" [build-dependencies] tonic-build = "0.12.3" diff --git a/server/src/grpc.rs b/server/src/grpc.rs index fb18f66..378e394 100644 --- a/server/src/grpc.rs +++ b/server/src/grpc.rs @@ -4,7 +4,9 @@ use crate::core::{ TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse, TelemetryInsertResponse, TelemetryItem, TelemetryValue, Uuid, }; -use crate::telemetry::{TelemetryDataItem, TelemetryDataValue, TelemetryManagementService}; +use crate::telemetry::data_item::TelemetryDataItem; +use crate::telemetry::data_value::TelemetryDataValue; +use crate::telemetry::management_service::TelemetryManagementService; use chrono::{DateTime, SecondsFormat}; use log::{error, trace}; use std::error::Error; diff --git a/server/src/http.rs b/server/src/http.rs deleted file mode 100644 index ef7b9ce..0000000 --- a/server/src/http.rs +++ /dev/null @@ -1,232 +0,0 @@ -use crate::telemetry::{TelemetryDataItem, TelemetryManagementService}; -use actix_web::http::header::ContentType; -use actix_web::http::StatusCode; -use actix_web::{error, get, rt, web, App, HttpRequest, HttpResponse, HttpServer, Responder}; -use actix_ws::AggregatedMessage; -use derive_more::{Display, Error}; -use log::{error, trace}; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; -use std::time::Duration; -use tokio::{pin, select}; -use tokio::sync::mpsc::Sender; -use tokio::time::{sleep, Instant}; -use tokio_util::sync::CancellationToken; -use tonic::codegen::tokio_stream::StreamExt; - -#[derive(Debug, Display, Error)] -enum UserError { - #[display("Telemetry Not Found: {tlm}")] - TlmNotFound { tlm: String }, -} - -impl error::ResponseError for UserError { - fn status_code(&self) -> StatusCode { - match *self { - UserError::TlmNotFound { .. } => StatusCode::NOT_FOUND, - } - } - fn error_response(&self) -> HttpResponse { - HttpResponse::build(self.status_code()) - .insert_header(ContentType::html()) - .body(self.to_string()) - } -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct RegisterTlmListenerRequest { - uuid: String, - minimum_separation_ms: u32, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -enum WebsocketRequest { - RegisterTlmListener(RegisterTlmListenerRequest), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -enum WebsocketResponse { - TlmValue { - uuid: String, - value: Option, - }, -} - -#[get("/tlm/{name:[\\w\\d/_-]+}")] -async fn get_tlm_definition( - data: web::Data>, - name: web::Path, -) -> Result { - let string = name.to_string(); - trace!("get_tlm_definition {}", string); - let Some(data) = data.get_by_name(&string) else { - return Err(UserError::TlmNotFound { tlm: string }); - }; - - Ok(web::Json(data.definition.clone())) -} - -fn handle_register_tlm_listener(data: &Arc, request: RegisterTlmListenerRequest, tx: &Sender) { - if let Some(tlm_data) = data.get_by_uuid(&request.uuid) { - let minimum_separation = Duration::from_millis(request.minimum_separation_ms as u64); - let mut rx = tlm_data.data.subscribe(); - let tx = tx.clone(); - rt::spawn(async move { - let mut last_sent_at = Instant::now() - minimum_separation; - let mut last_value = None; - let sleep = sleep(Duration::from_millis(0)); - pin!(sleep); - loop { - select! { - _ = tx.closed() => { - break; - } - Ok(_) = rx.changed() => { - let now = Instant::now(); - let value = { - let ref_val = rx.borrow_and_update(); - ref_val.clone() - }; - if last_sent_at + minimum_separation > now { - last_value = value; - sleep.as_mut().reset(last_sent_at + minimum_separation); - continue; - } else { - last_value = None; - last_sent_at = now; - } - let _ = tx.send(WebsocketResponse::TlmValue { - uuid: request.uuid.clone(), - value, - }).await; - } - () = &mut sleep => { - if let Some(value) = last_value { - let _ = tx.send(WebsocketResponse::TlmValue { - uuid: request.uuid.clone(), - value: Some(value), - }).await; - } - last_value = None; - let now = Instant::now(); - last_sent_at = now; - } - } - } - }); - } -} - -async fn handle_websocket_message(data: &Arc, request: WebsocketRequest, tx: &Sender) { - match request { - WebsocketRequest::RegisterTlmListener(request) => handle_register_tlm_listener(data, request, tx), - }; -} - -async fn websocket_connect( - req: HttpRequest, - stream: web::Payload, - data: web::Data>, - cancel_token: web::Data, -) -> Result { - trace!("websocket_connect"); - let (res, mut session, stream) = actix_ws::handle(&req, stream)?; - - let mut stream = stream - .aggregate_continuations() - // up to 1 MiB - .max_continuation_size(2_usize.pow(20)); - - let cancel_token = cancel_token.get_ref().clone(); - - rt::spawn(async move { - let (tx, mut rx) = tokio::sync::mpsc::channel::(128); - loop { - select! { - _ = cancel_token.cancelled() => { - break; - }, - Some(msg) = rx.recv() => { - let msg_json = match serde_json::to_string(&msg) { - Ok(msg_json) => msg_json, - Err(err) => { - error!("JSON Serialization Error Encountered {err}"); - break; - }, - }; - if let Err(err) = session.text(msg_json).await { - error!("Tx Error Encountered {err}"); - break; - } - }, - Some(msg) = stream.next() => { - let result = match msg { - Ok(AggregatedMessage::Close(_)) => { - break; - }, - Ok(AggregatedMessage::Text(msg)) => { - match serde_json::from_str::(&msg) { - Ok(request) => { - handle_websocket_message(data.get_ref(), request, &tx).await; - }, - Err(err) => { - error!("JSON Deserialization Error Encountered {err}"); - break; - } - } - Ok(()) - }, - Ok(AggregatedMessage::Ping(msg)) => { - session.pong(&msg).await - }, - Err(err) => { - error!("Rx Error Encountered {err}"); - break; - }, - _ => { - error!("Unexpected Message"); - break; - } - }; - if let Err(err) = result { - error!("Tx Error Encountered {err}"); - break; - } - }, - else => { - break; - }, - } - } - rx.close(); - let _ = session.close(None).await; - }); - - Ok(res) -} - -fn setup_api(cfg: &mut web::ServiceConfig) { - cfg.service(get_tlm_definition); -} - -pub async fn setup( - cancellation_token: CancellationToken, - telemetry_definitions: Arc, -) -> Result<(), Box> { - let data = web::Data::new(telemetry_definitions); - let cancel_token = web::Data::new(cancellation_token); - - trace!("Starting HTTP Server"); - HttpServer::new(move || { - App::new() - .app_data(data.clone()) - .app_data(cancel_token.clone()) - .route("/ws", web::get().to(websocket_connect)) - .service(web::scope("/api").configure(setup_api)) - }) - .bind("localhost:8080")? - .run() - .await?; - - Ok(()) -} diff --git a/server/src/http/api/mod.rs b/server/src/http/api/mod.rs new file mode 100644 index 0000000..4d5f88c --- /dev/null +++ b/server/src/http/api/mod.rs @@ -0,0 +1,23 @@ +use crate::http::error::HttpServerResultError; +use crate::telemetry::management_service::TelemetryManagementService; +use actix_web::{get, web, Responder}; +use log::trace; +use std::sync::Arc; + +#[get("/tlm/{name:[\\w\\d/_-]+}")] +async fn get_tlm_definition( + data: web::Data>, + name: web::Path, +) -> Result { + let string = name.to_string(); + trace!("get_tlm_definition {}", string); + let Some(data) = data.get_by_name(&string) else { + return Err(HttpServerResultError::TlmNotFound { tlm: string }); + }; + + Ok(web::Json(data.definition.clone())) +} + +pub fn setup_api(cfg: &mut web::ServiceConfig) { + cfg.service(get_tlm_definition); +} diff --git a/server/src/http/error.rs b/server/src/http/error.rs new file mode 100644 index 0000000..4168e6c --- /dev/null +++ b/server/src/http/error.rs @@ -0,0 +1,27 @@ +use actix_web::error::ResponseError; +use actix_web::http::header::ContentType; +use actix_web::http::StatusCode; +use actix_web::HttpResponse; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum WebsocketResponseError {} + +#[derive(Error, Debug)] +pub enum HttpServerResultError { + #[error("Telemetry Not Found: {tlm}")] + TlmNotFound { tlm: String }, +} + +impl ResponseError for HttpServerResultError { + fn status_code(&self) -> StatusCode { + match *self { + HttpServerResultError::TlmNotFound { .. } => StatusCode::NOT_FOUND, + } + } + fn error_response(&self) -> HttpResponse { + HttpResponse::build(self.status_code()) + .insert_header(ContentType::html()) + .body(self.to_string()) + } +} diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs new file mode 100644 index 0000000..427f593 --- /dev/null +++ b/server/src/http/mod.rs @@ -0,0 +1,34 @@ +mod api; +mod error; +mod websocket; + +use crate::http::api::setup_api; +use crate::http::websocket::setup_websocket; +use crate::telemetry::management_service::TelemetryManagementService; +use actix_web::{web, App, HttpServer}; +use log::trace; +use std::error::Error; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; + +pub async fn setup( + cancellation_token: CancellationToken, + telemetry_definitions: Arc, +) -> Result<(), Box> { + let data = web::Data::new(telemetry_definitions); + let cancel_token = web::Data::new(cancellation_token); + + trace!("Starting HTTP Server"); + HttpServer::new(move || { + App::new() + .app_data(data.clone()) + .app_data(cancel_token.clone()) + .service(web::scope("/ws").configure(setup_websocket)) + .service(web::scope("/api").configure(setup_api)) + }) + .bind("localhost:8080")? + .run() + .await?; + + Ok(()) +} diff --git a/server/src/http/websocket/mod.rs b/server/src/http/websocket/mod.rs new file mode 100644 index 0000000..1c52849 --- /dev/null +++ b/server/src/http/websocket/mod.rs @@ -0,0 +1,170 @@ +use crate::http::websocket::request::{RegisterTlmListenerRequest, WebsocketRequest}; +use crate::http::websocket::response::{TlmValueResponse, WebsocketResponse}; +use crate::telemetry::management_service::TelemetryManagementService; +use actix_web::{rt, web, HttpRequest, HttpResponse}; +use actix_ws::{AggregatedMessage, ProtocolError, Session}; +use anyhow::anyhow; +use log::{error, trace}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::mpsc::Sender; +use tokio::time::{sleep, Instant}; +use tokio::{pin, select}; +use tokio_util::sync::CancellationToken; +use tonic::codegen::tokio_stream::StreamExt; + +pub mod request; +pub mod response; + +fn handle_register_tlm_listener( + data: &Arc, + request: RegisterTlmListenerRequest, + tx: &Sender, +) { + if let Some(tlm_data) = data.get_by_uuid(&request.uuid) { + let minimum_separation = Duration::from_millis(request.minimum_separation_ms as u64); + let mut rx = tlm_data.data.subscribe(); + let tx = tx.clone(); + rt::spawn(async move { + let mut last_sent_at = Instant::now() - minimum_separation; + let mut last_value = None; + let sleep = sleep(Duration::from_millis(0)); + pin!(sleep); + loop { + select! { + _ = tx.closed() => { + break; + } + Ok(_) = rx.changed() => { + let now = Instant::now(); + let value = { + let ref_val = rx.borrow_and_update(); + ref_val.clone() + }; + if last_sent_at + minimum_separation > now { + last_value = value; + sleep.as_mut().reset(last_sent_at + minimum_separation); + continue; + } else { + last_value = None; + last_sent_at = now; + } + let _ = tx.send(TlmValueResponse { + uuid: request.uuid.clone(), + value, + }.into()).await; + } + () = &mut sleep => { + if let Some(value) = last_value { + let _ = tx.send(TlmValueResponse { + uuid: request.uuid.clone(), + value: Some(value), + }.into()).await; + } + last_value = None; + let now = Instant::now(); + last_sent_at = now; + } + } + } + }); + } +} + +async fn handle_websocket_message( + data: &Arc, + request: WebsocketRequest, + tx: &Sender, +) { + match request { + WebsocketRequest::RegisterTlmListener(request) => { + handle_register_tlm_listener(data, request, tx) + } + }; +} + +async fn handle_websocket_response( + msg: WebsocketResponse, + session: &mut Session, +) -> anyhow::Result { + let msg_json = match serde_json::to_string(&msg) { + Ok(msg_json) => msg_json, + Err(err) => { + return Err(anyhow!("JSON Serialization Error Encountered {err}")); + } + }; + if let Err(err) = session.text(msg_json).await { + return Err(anyhow!("Tx Error Encountered {err}")); + } + + Ok(true) +} + +async fn handle_websocket_incoming( + msg: Result, + data: &Arc, + session: &mut Session, + tx: &Sender, +) -> anyhow::Result { + match msg { + Ok(AggregatedMessage::Close(_)) => Ok(false), + Ok(AggregatedMessage::Text(msg)) => match serde_json::from_str::(&msg) { + Ok(request) => { + handle_websocket_message(data, request, tx).await; + Ok(true) + } + Err(err) => Err(anyhow!("JSON Deserialization Error Encountered {err}")), + }, + Ok(AggregatedMessage::Ping(msg)) => match session.pong(&msg).await { + Ok(_) => Ok(true), + Err(err) => Err(anyhow!("Pong Encountered Error: {err}")), + }, + Err(err) => Err(anyhow!("Rx Error Encountered {err}")), + _ => Err(anyhow!("Unexpected Message")), + } +} + +pub async fn websocket_connect( + req: HttpRequest, + stream: web::Payload, + data: web::Data>, + cancel_token: web::Data, +) -> Result { + trace!("websocket_connect"); + let (res, mut session, stream) = actix_ws::handle(&req, stream)?; + + let mut stream = stream + .aggregate_continuations() + // up to 1 MiB + .max_continuation_size(2_usize.pow(20)); + + let cancel_token = cancel_token.get_ref().clone(); + + rt::spawn(async move { + let (tx, mut rx) = tokio::sync::mpsc::channel::(128); + loop { + let result = select! { + _ = cancel_token.cancelled() => Ok(false), + Some(msg) = rx.recv() => handle_websocket_response(msg, &mut session).await, + Some(msg) = stream.next() => handle_websocket_incoming(msg, data.get_ref(), &mut session, &tx).await, + else => Ok(false), + }; + match result { + Ok(true) => {} + Ok(false) => break, + Err(err) => { + error!("{}", err); + break; + } + } + } + rx.close(); + let _ = session.close(None).await; + }); + + Ok(res) +} + +pub fn setup_websocket(cfg: &mut web::ServiceConfig) { + cfg.route("", web::get().to(websocket_connect)); +} diff --git a/server/src/http/websocket/request.rs b/server/src/http/websocket/request.rs new file mode 100644 index 0000000..04d1f99 --- /dev/null +++ b/server/src/http/websocket/request.rs @@ -0,0 +1,13 @@ +use derive_more::From; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RegisterTlmListenerRequest { + pub uuid: String, + pub minimum_separation_ms: u32, +} + +#[derive(Debug, Clone, Serialize, Deserialize, From)] +pub enum WebsocketRequest { + RegisterTlmListener(RegisterTlmListenerRequest), +} diff --git a/server/src/http/websocket/response.rs b/server/src/http/websocket/response.rs new file mode 100644 index 0000000..f056572 --- /dev/null +++ b/server/src/http/websocket/response.rs @@ -0,0 +1,14 @@ +use crate::telemetry::data_item::TelemetryDataItem; +use derive_more::From; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TlmValueResponse { + pub uuid: String, + pub value: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize, From)] +pub enum WebsocketResponse { + TlmValue(TlmValueResponse), +} diff --git a/server/src/lib.rs b/server/src/lib.rs index 782a9a9..966b38d 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -7,7 +7,7 @@ pub mod core { tonic::include_proto!("core"); } -use crate::telemetry::TelemetryManagementService; +use crate::telemetry::management_service::TelemetryManagementService; use std::error::Error; use std::sync::Arc; use tokio_util::sync::CancellationToken; diff --git a/server/src/telemetry.rs b/server/src/telemetry.rs deleted file mode 100644 index a2643c5..0000000 --- a/server/src/telemetry.rs +++ /dev/null @@ -1,113 +0,0 @@ -use crate::core::{TelemetryDataType, TelemetryDefinitionRequest, Uuid}; -use serde::de::Visitor; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use std::error::Error; -use std::fmt::Formatter; -use papaya::HashMap; - -fn tlm_data_type_serialzier( - tlm_data_type: &TelemetryDataType, - serializer: S, -) -> Result -where - S: Serializer, -{ - serializer.serialize_str(tlm_data_type.as_str_name()) -} - -struct TlmDataTypeVisitor; - -impl Visitor<'_> for TlmDataTypeVisitor { - type Value = TelemetryDataType; - - fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { - formatter.write_str("A &str") - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - TelemetryDataType::from_str_name(v).ok_or(E::custom("Invalid TelemetryDataType")) - } -} - -fn tlm_data_type_deserialzier<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - deserializer.deserialize_str(TlmDataTypeVisitor) -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TelemetryDefinition { - pub uuid: String, - pub name: String, - #[serde(serialize_with = "tlm_data_type_serialzier")] - #[serde(deserialize_with = "tlm_data_type_deserialzier")] - pub data_type: TelemetryDataType, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum TelemetryDataValue { - Float32(f32), - Float64(f64), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TelemetryDataItem { - pub value: TelemetryDataValue, - pub timestamp: String, -} - -#[derive(Clone)] -pub struct TelemetryData { - pub definition: TelemetryDefinition, - pub data: tokio::sync::watch::Sender>, -} - -pub struct TelemetryManagementService { - uuid_index: HashMap, - tlm_data: HashMap, -} - -impl TelemetryManagementService { - pub fn new() -> Self { - Self { - uuid_index: HashMap::new(), - tlm_data: HashMap::new(), - } - } - - pub fn register( - &self, - telemetry_definition_request: TelemetryDefinitionRequest, - ) -> Result> { - let uuid_index = self.uuid_index.pin(); - let tlm_data = self.tlm_data.pin(); - - let uuid = uuid_index.get_or_insert_with(telemetry_definition_request.name.clone(), || Uuid::random().value).clone(); - - let _ = tlm_data.try_insert(uuid.clone(), TelemetryData { - definition: TelemetryDefinition { - uuid: uuid.clone(), - name: telemetry_definition_request.name.clone(), - data_type: telemetry_definition_request.data_type(), - }, - data: tokio::sync::watch::channel(None).0, - }); - - Ok(uuid) - } - - pub fn get_by_name(&self, name: &String) -> Option { - let uuid_index = self.uuid_index.pin(); - let uuid = uuid_index.get(name)?; - self.get_by_uuid(uuid) - } - - pub fn get_by_uuid(&self, uuid: &String) -> Option { - let tlm_data = self.tlm_data.pin(); - tlm_data.get(uuid).cloned() - } -} diff --git a/server/src/telemetry/data.rs b/server/src/telemetry/data.rs new file mode 100644 index 0000000..7ca7b0d --- /dev/null +++ b/server/src/telemetry/data.rs @@ -0,0 +1,8 @@ +use crate::telemetry::data_item::TelemetryDataItem; +use crate::telemetry::definition::TelemetryDefinition; + +#[derive(Clone)] +pub struct TelemetryData { + pub definition: TelemetryDefinition, + pub data: tokio::sync::watch::Sender>, +} diff --git a/server/src/telemetry/data_item.rs b/server/src/telemetry/data_item.rs new file mode 100644 index 0000000..8c7e482 --- /dev/null +++ b/server/src/telemetry/data_item.rs @@ -0,0 +1,8 @@ +use crate::telemetry::data_value::TelemetryDataValue; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryDataItem { + pub value: TelemetryDataValue, + pub timestamp: String, +} diff --git a/server/src/telemetry/data_type.rs b/server/src/telemetry/data_type.rs new file mode 100644 index 0000000..a19c64d --- /dev/null +++ b/server/src/telemetry/data_type.rs @@ -0,0 +1,38 @@ +use crate::core::TelemetryDataType; +use serde::de::Visitor; +use serde::{Deserializer, Serializer}; +use std::fmt::Formatter; + +pub fn tlm_data_type_serializer( + tlm_data_type: &TelemetryDataType, + serializer: S, +) -> Result +where + S: Serializer, +{ + serializer.serialize_str(tlm_data_type.as_str_name()) +} + +struct TlmDataTypeVisitor; + +impl Visitor<'_> for TlmDataTypeVisitor { + type Value = TelemetryDataType; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("A &str") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + TelemetryDataType::from_str_name(v).ok_or(E::custom("Invalid TelemetryDataType")) + } +} + +pub fn tlm_data_type_deserializer<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ + deserializer.deserialize_str(TlmDataTypeVisitor) +} diff --git a/server/src/telemetry/data_value.rs b/server/src/telemetry/data_value.rs new file mode 100644 index 0000000..a056fda --- /dev/null +++ b/server/src/telemetry/data_value.rs @@ -0,0 +1,7 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TelemetryDataValue { + Float32(f32), + Float64(f64), +} diff --git a/server/src/telemetry/definition.rs b/server/src/telemetry/definition.rs new file mode 100644 index 0000000..95deb36 --- /dev/null +++ b/server/src/telemetry/definition.rs @@ -0,0 +1,13 @@ +use crate::core::TelemetryDataType; +use crate::telemetry::data_type::tlm_data_type_deserializer; +use crate::telemetry::data_type::tlm_data_type_serializer; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryDefinition { + pub uuid: String, + pub name: String, + #[serde(serialize_with = "tlm_data_type_serializer")] + #[serde(deserialize_with = "tlm_data_type_deserializer")] + pub data_type: TelemetryDataType, +} diff --git a/server/src/telemetry/management_service.rs b/server/src/telemetry/management_service.rs new file mode 100644 index 0000000..8708004 --- /dev/null +++ b/server/src/telemetry/management_service.rs @@ -0,0 +1,58 @@ +use crate::core::{TelemetryDefinitionRequest, Uuid}; +use crate::telemetry::data::TelemetryData; +use crate::telemetry::definition::TelemetryDefinition; +use papaya::HashMap; +use std::error::Error; + +pub struct TelemetryManagementService { + uuid_index: HashMap, + tlm_data: HashMap, +} + +impl TelemetryManagementService { + pub fn new() -> Self { + Self { + uuid_index: HashMap::new(), + tlm_data: HashMap::new(), + } + } + + pub fn register( + &self, + telemetry_definition_request: TelemetryDefinitionRequest, + ) -> Result> { + let uuid_index = self.uuid_index.pin(); + let tlm_data = self.tlm_data.pin(); + + let uuid = uuid_index + .get_or_insert_with(telemetry_definition_request.name.clone(), || { + Uuid::random().value + }) + .clone(); + + let _ = tlm_data.try_insert( + uuid.clone(), + TelemetryData { + definition: TelemetryDefinition { + uuid: uuid.clone(), + name: telemetry_definition_request.name.clone(), + data_type: telemetry_definition_request.data_type(), + }, + data: tokio::sync::watch::channel(None).0, + }, + ); + + Ok(uuid) + } + + pub fn get_by_name(&self, name: &String) -> Option { + let uuid_index = self.uuid_index.pin(); + let uuid = uuid_index.get(name)?; + self.get_by_uuid(uuid) + } + + pub fn get_by_uuid(&self, uuid: &String) -> Option { + let tlm_data = self.tlm_data.pin(); + tlm_data.get(uuid).cloned() + } +} diff --git a/server/src/telemetry/mod.rs b/server/src/telemetry/mod.rs new file mode 100644 index 0000000..3d3e27f --- /dev/null +++ b/server/src/telemetry/mod.rs @@ -0,0 +1,6 @@ +pub mod data; +pub mod data_item; +pub mod data_type; +pub mod data_value; +pub mod definition; +pub mod management_service;