From 29f7f6d83be3a11bdf621d6598a251923507d759 Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Tue, 30 Dec 2025 01:54:39 -0500 Subject: [PATCH] moves code into the api layer --- Cargo.lock | 5 + api/Cargo.toml | 5 + api/src/client/error.rs | 31 +++ api/src/client/mod.rs | 283 +++++++++++++++++++++ api/src/lib.rs | 4 +- api/src/messages/mod.rs | 34 +++ api/src/messages/payload.rs | 17 ++ api/src/messages/telemetry_definition.rs | 19 ++ api/src/messages/telemetry_entry.rs | 14 + api/src/request.rs | 32 --- api/src/response.rs | 20 -- examples/simple_producer/src/main.rs | 252 ++---------------- server/src/http/backend/connection.rs | 8 +- server/src/telemetry/management_service.rs | 6 +- 14 files changed, 444 insertions(+), 286 deletions(-) create mode 100644 api/src/client/error.rs create mode 100644 api/src/client/mod.rs create mode 100644 api/src/messages/mod.rs create mode 100644 api/src/messages/payload.rs create mode 100644 api/src/messages/telemetry_definition.rs create mode 100644 api/src/messages/telemetry_entry.rs delete mode 100644 api/src/request.rs delete mode 100644 api/src/response.rs diff --git a/Cargo.lock b/Cargo.lock index 986edb0..5ad5a5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -257,9 +257,14 @@ version = "0.1.0" dependencies = [ "chrono", "derive_more", + "futures-util", "log", "serde", + "serde_json", "thiserror", + "tokio", + "tokio-tungstenite", + "tokio-util", "uuid", ] diff --git a/api/Cargo.toml b/api/Cargo.toml index 000a12d..4834d80 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -12,3 +12,8 @@ serde = { version = "1.0.228", features = ["derive"] } derive_more = { version = "2.1.0", features = ["from", "try_into"] } uuid = { version = "1.19.0", features = ["v4", "serde"] } chrono = { version = "0.4.39", features = ["serde"] } +tokio = { version = "1.43.0", features = ["rt", "macros"] } +tokio-tungstenite = { version = "0.28.0", features = ["rustls-tls-native-roots"] } +tokio-util = "0.7.17" +futures-util = "0.3.31" +serde_json = "1.0.145" diff --git a/api/src/client/error.rs b/api/src/client/error.rs new file mode 100644 index 0000000..67e675e --- /dev/null +++ b/api/src/client/error.rs @@ -0,0 +1,31 @@ +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ConnectError { + #[error(transparent)] + TungsteniteError(#[from] tokio_tungstenite::tungstenite::Error), + #[error(transparent)] + IoError(#[from] std::io::Error), +} + +#[derive(Error, Debug)] +pub enum MessageError { + #[error(transparent)] + TokioSendError(#[from] tokio::sync::mpsc::error::SendError<()>), + #[error(transparent)] + TokioTrySendError(#[from] tokio::sync::mpsc::error::TrySendError<()>), + #[error(transparent)] + TokioLockError(#[from] tokio::sync::TryLockError), +} + +#[derive(Error, Debug)] +pub enum RequestError { + #[error(transparent)] + TokioSendError(#[from] tokio::sync::mpsc::error::SendError<()>), + #[error(transparent)] + TokioLockError(#[from] tokio::sync::TryLockError), + #[error(transparent)] + RecvError(#[from] tokio::sync::oneshot::error::RecvError), + #[error(transparent)] + Inner(E), +} diff --git a/api/src/client/mod.rs b/api/src/client/mod.rs new file mode 100644 index 0000000..2e80251 --- /dev/null +++ b/api/src/client/mod.rs @@ -0,0 +1,283 @@ +pub mod error; + +use crate::client::error::{MessageError, RequestError}; +use crate::messages::payload::ResponseMessagePayload; +use crate::messages::{ClientMessage, RequestMessage, RequestResponse, ResponseMessage}; +use error::ConnectError; +use futures_util::stream::StreamExt; +use futures_util::SinkExt; +use log::{debug, error}; +use std::collections::HashMap; +use std::sync::mpsc::sync_channel; +use std::sync::Arc; +use std::thread; +use tokio::net::TcpStream; +use tokio::select; +use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::{mpsc, oneshot, RwLock, RwLockWriteGuard}; +use tokio_tungstenite::tungstenite::client::IntoClientRequest; +use tokio_tungstenite::tungstenite::handshake::client::Request; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +enum Callback { + None, + Once(oneshot::Sender), +} + +struct OutgoingMessage { + msg: RequestMessage, + callback: Callback, +} + +pub struct Client { + cancel: CancellationToken, + channel: Arc>>, +} + +struct ClientContext { + cancel: CancellationToken, + request: Request, +} + +impl Client { + pub fn connect(request: R) -> Result + where + R: IntoClientRequest, + { + let (tx, _rx) = mpsc::channel(1); + let cancel = CancellationToken::new(); + let channel = Arc::new(RwLock::new(tx)); + let context = ClientContext { + cancel: cancel.clone(), + request: request.into_client_request()?, + }; + + context.start(channel.clone())?; + + Ok(Self { cancel, channel }) + } + + pub async fn send_message(&self, msg: M) -> Result<(), MessageError> { + let sender = self.channel.read().await; + let data = sender.reserve().await?; + data.send(OutgoingMessage { + msg: RequestMessage { + uuid: Uuid::new_v4(), + response: None, + payload: msg.into(), + }, + callback: Callback::None, + }); + Ok(()) + } + + pub async fn send_message_if_connected( + &self, + msg: M, + ) -> Result<(), MessageError> { + let sender = self.channel.try_read()?; + let data = sender.reserve().await?; + data.send(OutgoingMessage { + msg: RequestMessage { + uuid: Uuid::new_v4(), + response: None, + payload: msg.into(), + }, + callback: Callback::None, + }); + Ok(()) + } + + pub fn try_send_message(&self, msg: M) -> Result<(), MessageError> { + let sender = self.channel.try_read()?; + let data = sender.try_reserve()?; + data.send(OutgoingMessage { + msg: RequestMessage { + uuid: Uuid::new_v4(), + response: None, + payload: msg.into(), + }, + callback: Callback::None, + }); + Ok(()) + } + + pub async fn send_request( + &self, + msg: M, + ) -> Result>::Error>> + { + let sender = self.channel.read().await; + let data = sender.reserve().await?; + + let (tx, rx) = oneshot::channel(); + data.send(OutgoingMessage { + msg: RequestMessage { + uuid: Uuid::new_v4(), + response: None, + payload: msg.into(), + }, + callback: Callback::Once(tx), + }); + + let response = rx.await?; + let response = M::Response::try_from(response.payload).map_err(RequestError::Inner)?; + + Ok(response) + } +} + +impl ClientContext { + fn start(mut self, channel: Arc>>) -> Result<(), ConnectError> { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + let (tx, rx) = sync_channel::<()>(1); + + let _detached = thread::Builder::new() + .name("tlm-client".to_string()) + .spawn(move || { + runtime.block_on(async { + let mut write_lock = channel.write().await; + + // This cannot fail + let _ = tx.send(()); + + while !self.cancel.is_cancelled() { + write_lock = self.run_connection(write_lock, &channel).await; + } + drop(write_lock); + }); + })?; + + // This cannot fail + let _ = rx.recv(); + + Ok(()) + } + + async fn run_connection<'a>( + &mut self, + mut write_lock: RwLockWriteGuard<'a, Sender>, + channel: &'a Arc>>, + ) -> RwLockWriteGuard<'a, Sender> { + let mut ws = match connect_async(self.request.clone()).await { + Ok((ws, _)) => ws, + Err(e) => { + error!("Connect Error: {e}"); + return write_lock; + } + }; + + let (tx, rx) = mpsc::channel(128); + *write_lock = tx; + drop(write_lock); + + let close_connection = self.handle_connection(&mut ws, rx).await; + + let write_lock = channel.write().await; + if close_connection { + if let Err(e) = ws.close(None).await { + println!("Close Error {e}"); + } + } + write_lock + } + + async fn handle_connection( + &mut self, + ws: &mut WebSocketStream>, + mut rx: Receiver, + ) -> bool { + let mut callbacks = HashMap::::new(); + loop { + select! { + _ = self.cancel.cancelled() => { break; }, + Some(msg) = ws.next() => { + match msg { + Ok(msg) => { + match msg { + Message::Text(msg) => { + let msg: ResponseMessage = match serde_json::from_str(&msg) { + Ok(m) => m, + Err(e) => { + error!("Failed to deserialize {e}"); + break; + } + }; + self.handle_incoming(msg, &mut callbacks).await; + } + Message::Binary(_) => unimplemented!("Binary Data Not Implemented"), + Message::Ping(data) => { + if let Err(e) = ws.send(Message::Pong(data)).await { + error!("Failed to send Pong {e}"); + break; + } + } + Message::Pong(_) => { + // Intentionally Left Empty + } + Message::Close(_) => { + debug!("Websocket Closed"); + return false; + } + Message::Frame(_) => unreachable!("Not Possible"), + } + } + Err(e) => { + error!("Receive Error {e}"); + break; + } + } + } + Some(msg) = rx.recv() => { + callbacks.insert(msg.msg.uuid, msg.callback); + let msg = match serde_json::to_string(&msg.msg) { + Ok(m) => m, + Err(e) => { + error!("Encode Error {e}"); + break; + } + }; + if let Err(e) = ws.send(Message::Text(msg.into())).await { + error!("Send Error {e}"); + break; + } + } + else => { break; }, + } + } + true + } + + async fn handle_incoming( + &mut self, + msg: ResponseMessage, + callbacks: &mut HashMap, + ) { + if let Some(response_uuid) = msg.response { + match callbacks.get(&response_uuid) { + Some(Callback::Once(_)) => { + let Some(Callback::Once(callback)) = callbacks.remove(&response_uuid) else { + return; + }; + let _ = callback.send(msg); + } + Some(Callback::None) => { + callbacks.remove(&response_uuid); + } + None => {} + } + } + } +} + +impl Drop for Client { + fn drop(&mut self) { + self.cancel.cancel(); + } +} diff --git a/api/src/lib.rs b/api/src/lib.rs index d45cf7a..ca58c52 100644 --- a/api/src/lib.rs +++ b/api/src/lib.rs @@ -1,4 +1,4 @@ +pub mod client; pub mod data_type; pub mod data_value; -pub mod request; -pub mod response; +pub mod messages; diff --git a/api/src/messages/mod.rs b/api/src/messages/mod.rs new file mode 100644 index 0000000..79392b5 --- /dev/null +++ b/api/src/messages/mod.rs @@ -0,0 +1,34 @@ +pub mod payload; +pub mod telemetry_definition; +pub mod telemetry_entry; + +use payload::{RequestMessagePayload, ResponseMessagePayload}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RequestMessage { + pub uuid: Uuid, + pub response: Option, + #[serde(flatten)] + pub payload: RequestMessagePayload, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ResponseMessage { + pub uuid: Uuid, + pub response: Option, + #[serde(flatten)] + pub payload: ResponseMessagePayload, +} + +pub trait ClientMessage: Into {} + +pub trait RequestResponse: Into { + type Response: TryFrom; +} + +// pub trait RegisterCallback { +// type Callback : TryFrom; +// type Response : Into; +// } diff --git a/api/src/messages/payload.rs b/api/src/messages/payload.rs new file mode 100644 index 0000000..176c42a --- /dev/null +++ b/api/src/messages/payload.rs @@ -0,0 +1,17 @@ +use crate::messages::telemetry_definition::{ + TelemetryDefinitionRequest, TelemetryDefinitionResponse, +}; +use crate::messages::telemetry_entry::TelemetryEntry; +use derive_more::{From, TryInto}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, From)] +pub enum RequestMessagePayload { + TelemetryDefinitionRequest(TelemetryDefinitionRequest), + TelemetryEntry(TelemetryEntry), +} + +#[derive(Debug, Clone, Serialize, Deserialize, From, TryInto)] +pub enum ResponseMessagePayload { + TelemetryDefinitionResponse(TelemetryDefinitionResponse), +} diff --git a/api/src/messages/telemetry_definition.rs b/api/src/messages/telemetry_definition.rs new file mode 100644 index 0000000..f3f1f2d --- /dev/null +++ b/api/src/messages/telemetry_definition.rs @@ -0,0 +1,19 @@ +use crate::data_type::DataType; +use crate::messages::RequestResponse; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryDefinitionRequest { + pub name: String, + pub data_type: DataType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryDefinitionResponse { + pub uuid: Uuid, +} + +impl RequestResponse for TelemetryDefinitionRequest { + type Response = TelemetryDefinitionResponse; +} diff --git a/api/src/messages/telemetry_entry.rs b/api/src/messages/telemetry_entry.rs new file mode 100644 index 0000000..3376e30 --- /dev/null +++ b/api/src/messages/telemetry_entry.rs @@ -0,0 +1,14 @@ +use crate::data_value::DataValue; +use crate::messages::ClientMessage; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryEntry { + pub uuid: Uuid, + pub value: DataValue, + pub timestamp: DateTime, +} + +impl ClientMessage for TelemetryEntry {} diff --git a/api/src/request.rs b/api/src/request.rs deleted file mode 100644 index 4653a09..0000000 --- a/api/src/request.rs +++ /dev/null @@ -1,32 +0,0 @@ -use crate::data_type::DataType; -use crate::data_value::DataValue; -use chrono::{DateTime, Utc}; -use derive_more::From; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TelemetryDefinitionRequest { - pub name: String, - pub data_type: DataType, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TelemetryEntry { - pub uuid: Uuid, - pub value: DataValue, - pub timestamp: DateTime, -} - -#[derive(Debug, Clone, Serialize, Deserialize, From)] -pub enum RequestMessagePayload { - TelemetryDefinitionRequest(TelemetryDefinitionRequest), - TelemetryEntry(TelemetryEntry), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RequestMessage { - pub uuid: Uuid, - #[serde(flatten)] - pub payload: RequestMessagePayload, -} diff --git a/api/src/response.rs b/api/src/response.rs deleted file mode 100644 index 5c7a41f..0000000 --- a/api/src/response.rs +++ /dev/null @@ -1,20 +0,0 @@ -use derive_more::{From, TryInto}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct TelemetryDefinitionResponse { - pub uuid: Uuid, -} - -#[derive(Debug, Clone, Serialize, Deserialize, From, TryInto)] -pub enum ResponseMessagePayload { - TelemetryDefinitionResponse(TelemetryDefinitionResponse), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ResponseMessage { - pub uuid: Uuid, - #[serde(flatten)] - pub payload: ResponseMessagePayload, -} diff --git a/examples/simple_producer/src/main.rs b/examples/simple_producer/src/main.rs index bec85d6..d176c8f 100644 --- a/examples/simple_producer/src/main.rs +++ b/examples/simple_producer/src/main.rs @@ -1,273 +1,71 @@ -use anyhow::anyhow; +use api::client::error::MessageError; +use api::client::Client; use api::data_type::DataType; use api::data_value::DataValue; -use api::request::{ - RequestMessage, RequestMessagePayload, TelemetryDefinitionRequest, TelemetryEntry, -}; -use api::response::{ResponseMessage, ResponseMessagePayload, TelemetryDefinitionResponse}; +use api::messages::telemetry_definition::TelemetryDefinitionRequest; +use api::messages::telemetry_entry::TelemetryEntry; use chrono::{DateTime, TimeDelta, Utc}; use futures_util::future::join_all; -use futures_util::{SinkExt, StreamExt}; use num_traits::FloatConst; -use std::collections::HashMap; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::net::TcpStream; -use tokio::sync::mpsc; -use tokio::sync::mpsc::channel; -use tokio::time::{sleep, sleep_until, Instant}; -use tokio::{pin, select}; -use tokio_tungstenite::tungstenite::client::IntoClientRequest; -use tokio_tungstenite::tungstenite::Message; -use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tokio::time::{sleep_until, Instant}; use tokio_util::sync::CancellationToken; use uuid::Uuid; struct Telemetry { - is_connected: Arc, - tx: mpsc::Sender, - callback_new: mpsc::Sender<(Uuid, mpsc::Sender)>, - callback_delete: mpsc::Sender, - cancel: CancellationToken, + client: Arc, } -struct TelemetryItemHandle<'a> { +struct TelemetryItemHandle { uuid: Uuid, - tlm: &'a Telemetry, + client: Arc, } impl Telemetry { - async fn connect(request: R) -> anyhow::Result>> - where - R: IntoClientRequest + Unpin, - { - let (client, _) = match connect_async(request).await { - Ok(o) => o, - Err(e) => { - sleep(Duration::from_secs(1)).await; - return Err(e.into()); - } - }; - Ok(client) - } - - pub async fn new(request: R) -> anyhow::Result - where - R: IntoClientRequest + Unpin + Clone + Send + 'static, - { - let cancel = CancellationToken::new(); - let cancel_stored = cancel.clone(); - let (tx_tx, mut tx_rx) = channel(128); - let (callback_new_tx, mut callback_new_rx) = channel(16); - let (callback_delete_tx, mut callback_delete_rx) = channel(16); - - let is_connected = Arc::new(AtomicBool::new(false)); - let connected = is_connected.clone(); - - tokio::spawn(async move { - while !cancel.is_cancelled() { - let client = Self::connect(request.clone()); - pin!(client); - let mut client = match loop { - break select! { - c = &mut client => { - c - }, - Some(_) = tx_rx.recv() => {continue;}, - Some(_) = callback_new_rx.recv() => {continue;}, - Some(_) = callback_delete_rx.recv() => {continue;}, - }; - } { - Ok(c) => c, - Err(e) => { - println!("Connect Error: {e}"); - continue; - } - }; - - let mut close_connection = true; - let mut callbacks = HashMap::>::new(); - connected.store(true, Ordering::SeqCst); - - loop { - select! { - biased; - _ = cancel.cancelled() => { break; }, - Some(msg) = callback_new_rx.recv() => { - let (uuid, callback) = msg; - callbacks.insert(uuid, callback); - }, - Some(msg) = callback_delete_rx.recv() => { - callbacks.remove(&msg); - }, - Some(msg) = client.next() => { - match msg { - Ok(msg) => { - match msg { - Message::Text(msg) => { - let msg: ResponseMessage = match serde_json::from_str(&msg) { - Ok(m) => m, - Err(e) => { - println!("Failed to deserialize {e}"); - break; - } - }; - if let Some(cb) = callbacks.get_mut(&msg.uuid) { - if let Err(e) = cb.send(msg.payload).await { - println!("Failed to call callback {e}"); - callbacks.remove(&msg.uuid); - } - } else { - unimplemented!("Unexpected Message: {msg:?}"); - } - } - Message::Binary(_) => unimplemented!("Binary Unsupported"), - Message::Ping(data) => { - if let Err(e) = client.send(Message::Pong(data)).await { - println!("Failed to send Pong {e}"); - break; - } - } - Message::Pong(_) => { - // Intentionally Left Empty - } - Message::Close(_) => { - println!("Websocket Closed"); - close_connection = false; - break; - } - Message::Frame(_) => unreachable!("Not Possible"), - } - } - Err(e) => { - println!("Receive Error {e}"); - break; - } - } - }, - Some(msg) = tx_rx.recv() => { - let msg = match serde_json::to_string(&msg) { - Ok(m) => m, - Err(e) => { - println!("Encode Error {e}"); - break; - } - }; - if let Err(e) = client.send(Message::Text(msg.into())).await { - println!("Send Error {e}"); - break; - } - }, - else => { break; }, - } - } - connected.store(false, Ordering::SeqCst); - if close_connection { - if let Err(e) = client.close(None).await { - println!("Close Error {e}"); - } - } - - // Callbacks gets dropped here - closing all listeners automatically - } - }); - - Ok(Self { - tx: tx_tx, - callback_new: callback_new_tx, - callback_delete: callback_delete_tx, - cancel: cancel_stored, - is_connected, - }) - } - - pub async fn wait_connected(&self) { - while !self.is_connected.load(Ordering::Relaxed) { - sleep(Duration::from_millis(10)).await; - } - } - - /// Send a message and don't expect a response - pub async fn send_message>( - &self, - payload: P, - ) -> anyhow::Result<()> { - self.tx - .send(RequestMessage { - uuid: Uuid::new_v4(), - payload: payload.into(), - }) - .await?; - Ok(()) - } - - async fn send_request< - P: Into, - R: TryFrom, - >( - &self, - payload: P, - ) -> anyhow::Result { - let uuid = Uuid::new_v4(); - let (tx, mut rx) = channel(1); - - self.wait_connected().await; - self.callback_new.send((uuid, tx)).await?; - self.tx - .send(RequestMessage { - uuid, - payload: payload.into(), - }) - .await?; - - let response = rx.recv().await.ok_or(anyhow!("No Response Received"))?; - - let response = R::try_from(response)?; - - self.callback_delete.send(uuid).await?; - - Ok(response) + pub fn new(client: Arc) -> Self { + Self { client } } pub async fn register( &self, name: String, data_type: DataType, - ) -> anyhow::Result> { - let response: TelemetryDefinitionResponse = self + ) -> anyhow::Result { + let response = self + .client .send_request(TelemetryDefinitionRequest { name, data_type }) .await?; Ok(TelemetryItemHandle { uuid: response.uuid, - tlm: self, + client: self.client.clone(), }) } } -impl Drop for Telemetry { - fn drop(&mut self) { - self.cancel.cancel(); - } -} - -impl<'a> TelemetryItemHandle<'a> { +impl TelemetryItemHandle { pub async fn publish(&self, value: DataValue, timestamp: DateTime) -> anyhow::Result<()> { - self.tlm - .send_message(TelemetryEntry { + self.client + .send_message_if_connected(TelemetryEntry { uuid: self.uuid, value, timestamp, }) - .await?; + .await + .or_else(|e| match e { + MessageError::TokioLockError(_) => Ok(()), + e => Err(e), + })?; + Ok(()) } } #[tokio::main] async fn main() -> anyhow::Result<()> { - let tlm = Telemetry::new("ws://[::1]:8080/backend").await?; + let client = Arc::new(Client::connect("ws://[::1]:8080/backend")?); + let tlm = Telemetry::new(client); { let time_offset = tlm diff --git a/server/src/http/backend/connection.rs b/server/src/http/backend/connection.rs index 895bdaf..1004b48 100644 --- a/server/src/http/backend/connection.rs +++ b/server/src/http/backend/connection.rs @@ -1,10 +1,11 @@ use crate::telemetry::management_service::TelemetryManagementService; use actix_ws::{AggregatedMessage, ProtocolError, Session}; use anyhow::bail; -use api::request::{RequestMessage, RequestMessagePayload}; -use api::response::ResponseMessage; +use api::messages::payload::RequestMessagePayload; +use api::messages::{RequestMessage, ResponseMessage}; use std::sync::Arc; use tokio::sync::mpsc::{Receiver, Sender}; +use uuid::Uuid; pub(super) struct BackendConnection { session: Session, @@ -31,7 +32,8 @@ impl BackendConnection { RequestMessagePayload::TelemetryDefinitionRequest(tlm_def) => { self.tx .send(ResponseMessage { - uuid: msg.uuid, + uuid: Uuid::new_v4(), + response: Some(msg.uuid), payload: self.tlm_management.register(tlm_def)?.into(), }) .await?; diff --git a/server/src/telemetry/management_service.rs b/server/src/telemetry/management_service.rs index 63e8c81..b4478b7 100644 --- a/server/src/telemetry/management_service.rs +++ b/server/src/telemetry/management_service.rs @@ -5,8 +5,10 @@ use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService}; use anyhow::bail; use api::data_type::DataType; use api::data_value::DataValue; -use api::request::{TelemetryDefinitionRequest, TelemetryEntry}; -use api::response::TelemetryDefinitionResponse; +use api::messages::telemetry_definition::{ + TelemetryDefinitionRequest, TelemetryDefinitionResponse, +}; +use api::messages::telemetry_entry::TelemetryEntry; use chrono::SecondsFormat; use log::{error, info, warn}; use papaya::{HashMap, HashMapRef, LocalGuard};