From 6980b7f6aa785c027fec965cc2b18e58ccf8e07b Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Tue, 30 Dec 2025 14:19:41 -0500 Subject: [PATCH] move cmd off of grpc --- Cargo.lock | 2 +- api/src/client/mod.rs | 189 +++++++++++++++++++++++--- api/src/data_value.rs | 3 +- api/src/messages/callback.rs | 7 + api/src/messages/command.rs | 35 +++++ api/src/messages/mod.rs | 10 +- api/src/messages/payload.rs | 6 + examples/simple_command/Cargo.toml | 2 +- examples/simple_command/src/main.rs | 173 ++++++++--------------- server/build.rs | 1 - server/proto/core.proto | 60 -------- server/src/command/command_handle.rs | 20 +++ server/src/command/definition.rs | 21 +-- server/src/command/error.rs | 4 +- server/src/command/mod.rs | 1 + server/src/command/service.rs | 131 +++++++++++------- server/src/grpc/mod.rs | 33 ----- server/src/http/api/panels.rs | 15 +- server/src/http/backend/connection.rs | 38 +++++- server/src/http/backend/mod.rs | 5 +- server/src/http/error.rs | 2 +- server/src/lib.rs | 9 -- server/src/panels/mod.rs | 17 ++- server/src/telemetry/data_type.rs | 38 ------ server/src/telemetry/mod.rs | 1 - server/src/uuid.rs | 18 --- 26 files changed, 452 insertions(+), 389 deletions(-) create mode 100644 api/src/messages/callback.rs create mode 100644 api/src/messages/command.rs delete mode 100644 server/proto/core.proto create mode 100644 server/src/command/command_handle.rs delete mode 100644 server/src/grpc/mod.rs delete mode 100644 server/src/telemetry/data_type.rs delete mode 100644 server/src/uuid.rs diff --git a/Cargo.lock b/Cargo.lock index 5ad5a5e..f05e34c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2258,10 +2258,10 @@ name = "simple_command" version = "0.0.0" dependencies = [ "anyhow", + "api", "chrono", "log", "num-traits", - "server", "tokio", "tokio-util", "tonic", diff --git a/api/src/client/mod.rs b/api/src/client/mod.rs index 2e80251..c11a905 100644 --- a/api/src/client/mod.rs +++ b/api/src/client/mod.rs @@ -1,20 +1,23 @@ pub mod error; use crate::client::error::{MessageError, RequestError}; +use crate::messages::callback::GenericCallbackError; +use crate::messages::payload::RequestMessagePayload; use crate::messages::payload::ResponseMessagePayload; -use crate::messages::{ClientMessage, RequestMessage, RequestResponse, ResponseMessage}; +use crate::messages::{ + ClientMessage, RegisterCallback, RequestMessage, RequestResponse, ResponseMessage, +}; use error::ConnectError; use futures_util::stream::StreamExt; use futures_util::SinkExt; -use log::{debug, error}; +use log::{debug, error, warn}; use std::collections::HashMap; use std::sync::mpsc::sync_channel; use std::sync::Arc; use std::thread; use tokio::net::TcpStream; -use tokio::select; -use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::{mpsc, oneshot, RwLock, RwLockWriteGuard}; +use tokio::{select, spawn}; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::handshake::client::Request; use tokio_tungstenite::tungstenite::Message; @@ -22,9 +25,13 @@ use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; use tokio_util::sync::CancellationToken; use uuid::Uuid; +type RegisteredCallback = mpsc::Sender<(ResponseMessage, oneshot::Sender)>; +type ClientChannel = Arc>>; + enum Callback { None, Once(oneshot::Sender), + Registered(RegisteredCallback), } struct OutgoingMessage { @@ -34,7 +41,7 @@ struct OutgoingMessage { pub struct Client { cancel: CancellationToken, - channel: Arc>>, + channel: ClientChannel, } struct ClientContext { @@ -128,10 +135,101 @@ impl Client { Ok(response) } + + pub async fn register_callback_channel( + &self, + msg: M, + ) -> Result)>, MessageError> + where + ::Callback: Send + 'static, + ::Response: Send + 'static, + <::Callback as TryFrom>::Error: Send, + { + let sender = self.channel.read().await; + let data = sender.reserve().await?; + + let (inner_tx, mut inner_rx) = mpsc::channel(16); + let (outer_tx, outer_rx) = mpsc::channel(1); + + data.send(OutgoingMessage { + msg: RequestMessage { + uuid: Uuid::new_v4(), + response: None, + payload: msg.into(), + }, + callback: Callback::Registered(inner_tx), + }); + + spawn(async move { + // If the handler was unregistered we can stop + while let Some((msg, responder)) = inner_rx.recv().await { + let response: RequestMessagePayload = match M::Callback::try_from(msg.payload) { + Err(_) => GenericCallbackError::MismatchedType.into(), + Ok(o) => { + let (response_tx, response_rx) = oneshot::channel::(); + match outer_tx.send((o, response_tx)).await { + Err(_) => GenericCallbackError::CallbackClosed.into(), + Ok(()) => response_rx + .await + .map(M::Response::into) + .unwrap_or_else(|_| GenericCallbackError::CallbackClosed.into()), + } + } + }; + + if responder.send(response).is_err() { + // If the callback was unregistered we can stop + break; + } + } + }); + + Ok(outer_rx) + } + + pub async fn register_callback_fn( + &self, + msg: M, + mut f: F, + ) -> Result<(), MessageError> + where + F: FnMut(M::Callback) -> M::Response + Send + 'static, + { + let sender = self.channel.read().await; + let data = sender.reserve().await?; + + let (inner_tx, mut inner_rx) = mpsc::channel(16); + + data.send(OutgoingMessage { + msg: RequestMessage { + uuid: Uuid::new_v4(), + response: None, + payload: msg.into(), + }, + callback: Callback::Registered(inner_tx), + }); + + spawn(async move { + // If the handler was unregistered we can stop + while let Some((msg, responder)) = inner_rx.recv().await { + let response: RequestMessagePayload = match M::Callback::try_from(msg.payload) { + Err(_) => GenericCallbackError::MismatchedType.into(), + Ok(o) => f(o).into(), + }; + + if responder.send(response).is_err() { + // If the callback was unregistered we can stop + break; + } + } + }); + + Ok(()) + } } impl ClientContext { - fn start(mut self, channel: Arc>>) -> Result<(), ConnectError> { + fn start(mut self, channel: ClientChannel) -> Result<(), ConnectError> { let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; @@ -162,9 +260,9 @@ impl ClientContext { async fn run_connection<'a>( &mut self, - mut write_lock: RwLockWriteGuard<'a, Sender>, - channel: &'a Arc>>, - ) -> RwLockWriteGuard<'a, Sender> { + mut write_lock: RwLockWriteGuard<'a, mpsc::Sender>, + channel: &'a ClientChannel, + ) -> RwLockWriteGuard<'a, mpsc::Sender> { let mut ws = match connect_async(self.request.clone()).await { Ok((ws, _)) => ws, Err(e) => { @@ -177,7 +275,7 @@ impl ClientContext { *write_lock = tx; drop(write_lock); - let close_connection = self.handle_connection(&mut ws, rx).await; + let close_connection = self.handle_connection(&mut ws, rx, channel).await; let write_lock = channel.write().await; if close_connection { @@ -191,7 +289,8 @@ impl ClientContext { async fn handle_connection( &mut self, ws: &mut WebSocketStream>, - mut rx: Receiver, + mut rx: mpsc::Receiver, + channel: &ClientChannel, ) -> bool { let mut callbacks = HashMap::::new(); loop { @@ -209,7 +308,7 @@ impl ClientContext { break; } }; - self.handle_incoming(msg, &mut callbacks).await; + self.handle_incoming(msg, &mut callbacks, channel).await; } Message::Binary(_) => unimplemented!("Binary Data Not Implemented"), Message::Ping(data) => { @@ -235,7 +334,10 @@ impl ClientContext { } } Some(msg) = rx.recv() => { - callbacks.insert(msg.msg.uuid, msg.callback); + // Insert a callback if it isn't a None callback + if !matches!(msg.callback, Callback::None) { + callbacks.insert(msg.msg.uuid, msg.callback); + } let msg = match serde_json::to_string(&msg.msg) { Ok(m) => m, Err(e) => { @@ -258,22 +360,77 @@ impl ClientContext { &mut self, msg: ResponseMessage, callbacks: &mut HashMap, + channel: &ClientChannel, ) { if let Some(response_uuid) = msg.response { match callbacks.get(&response_uuid) { + Some(Callback::None) => { + callbacks.remove(&response_uuid); + unreachable!("We skip registering callbacks of None type"); + } Some(Callback::Once(_)) => { let Some(Callback::Once(callback)) = callbacks.remove(&response_uuid) else { return; }; let _ = callback.send(msg); } - Some(Callback::None) => { - callbacks.remove(&response_uuid); + Some(Callback::Registered(callback)) => { + let callback = callback.clone(); + spawn(Self::handle_registered_callback( + callback, + msg, + channel.clone(), + )); + } + None => { + warn!("No Callback Registered for {response_uuid}"); } - None => {} } } } + + async fn handle_registered_callback( + callback: RegisteredCallback, + msg: ResponseMessage, + channel: ClientChannel, + ) { + let (tx, rx) = oneshot::channel(); + + let uuid = msg.uuid; + + let response = match callback.send((msg, tx)).await { + Err(_) => GenericCallbackError::CallbackClosed.into(), + Ok(()) => rx + .await + .unwrap_or_else(|_| GenericCallbackError::CallbackClosed.into()), + }; + + if let Err(e) = Self::send_response(channel, response, uuid).await { + error!("Failed to send response {e}"); + } + } + + async fn send_response( + channel: ClientChannel, + payload: RequestMessagePayload, + response_uuid: Uuid, + ) -> Result<(), MessageError> { + // If this failed that means we're in the middle of reconnecting, so our callbacks + // are all being cleaned up as-is. No response needed. + let sender = channel.try_read()?; + let data = sender.reserve().await?; + + data.send(OutgoingMessage { + msg: RequestMessage { + uuid: Uuid::new_v4(), + response: Some(response_uuid), + payload, + }, + callback: Callback::None, + }); + + Ok(()) + } } impl Drop for Client { diff --git a/api/src/data_value.rs b/api/src/data_value.rs index 43e8d9e..35422a3 100644 --- a/api/src/data_value.rs +++ b/api/src/data_value.rs @@ -1,6 +1,7 @@ +use derive_more::TryInto; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, TryInto)] pub enum DataValue { Float32(f32), Float64(f64), diff --git a/api/src/messages/callback.rs b/api/src/messages/callback.rs new file mode 100644 index 0000000..950287d --- /dev/null +++ b/api/src/messages/callback.rs @@ -0,0 +1,7 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum GenericCallbackError { + CallbackClosed, + MismatchedType, +} diff --git a/api/src/messages/command.rs b/api/src/messages/command.rs new file mode 100644 index 0000000..b8c9a81 --- /dev/null +++ b/api/src/messages/command.rs @@ -0,0 +1,35 @@ +use crate::data_type::DataType; +use crate::data_value::DataValue; +use crate::messages::RegisterCallback; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandParameterDefinition { + pub name: String, + pub data_type: DataType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandDefinition { + pub name: String, + pub parameters: Vec, +} + +impl RegisterCallback for CommandDefinition { + type Callback = Command; + type Response = CommandResponse; +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Command { + pub timestamp: DateTime, + pub parameters: HashMap, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CommandResponse { + pub success: bool, + pub response: String, +} diff --git a/api/src/messages/mod.rs b/api/src/messages/mod.rs index 79392b5..f11f638 100644 --- a/api/src/messages/mod.rs +++ b/api/src/messages/mod.rs @@ -1,3 +1,5 @@ +pub mod callback; +pub mod command; pub mod payload; pub mod telemetry_definition; pub mod telemetry_entry; @@ -28,7 +30,7 @@ pub trait RequestResponse: Into { type Response: TryFrom; } -// pub trait RegisterCallback { -// type Callback : TryFrom; -// type Response : Into; -// } +pub trait RegisterCallback: Into { + type Callback: TryFrom; + type Response: Into; +} diff --git a/api/src/messages/payload.rs b/api/src/messages/payload.rs index 176c42a..0dbe173 100644 --- a/api/src/messages/payload.rs +++ b/api/src/messages/payload.rs @@ -1,3 +1,5 @@ +use crate::messages::callback::GenericCallbackError; +use crate::messages::command::{Command, CommandDefinition, CommandResponse}; use crate::messages::telemetry_definition::{ TelemetryDefinitionRequest, TelemetryDefinitionResponse, }; @@ -9,9 +11,13 @@ use serde::{Deserialize, Serialize}; pub enum RequestMessagePayload { TelemetryDefinitionRequest(TelemetryDefinitionRequest), TelemetryEntry(TelemetryEntry), + GenericCallbackError(GenericCallbackError), + CommandDefinition(CommandDefinition), + CommandResponse(CommandResponse), } #[derive(Debug, Clone, Serialize, Deserialize, From, TryInto)] pub enum ResponseMessagePayload { TelemetryDefinitionResponse(TelemetryDefinitionResponse), + Command(Command), } diff --git a/examples/simple_command/Cargo.toml b/examples/simple_command/Cargo.toml index b799df6..29d89d4 100644 --- a/examples/simple_command/Cargo.toml +++ b/examples/simple_command/Cargo.toml @@ -4,7 +4,7 @@ name = "simple_command" edition = "2021" [dependencies] -server = { path = "../../server" } +api = { path = "../../api" } tonic = "0.12.3" tokio = { version = "1.43.0", features = ["rt-multi-thread", "signal"] } chrono = "0.4.39" diff --git a/examples/simple_command/src/main.rs b/examples/simple_command/src/main.rs index ee6f512..2f2adf4 100644 --- a/examples/simple_command/src/main.rs +++ b/examples/simple_command/src/main.rs @@ -1,78 +1,36 @@ -use chrono::DateTime; -use server::core::client_side_command::Inner; -use server::core::command_service_client::CommandServiceClient; -use server::core::telemetry_value::Value; -use server::core::{ - ClientSideCommand, Command, CommandDefinitionRequest, CommandParameterDefinition, - CommandResponse, TelemetryDataType, +use anyhow::anyhow; +use api::client::Client; +use api::data_type::DataType; +use api::messages::command::{ + Command, CommandDefinition, CommandParameterDefinition, CommandResponse, }; use std::error::Error; -use tokio::select; -use tokio::sync::mpsc; -use tokio::task::JoinHandle; +use std::sync::Arc; use tokio_util::sync::CancellationToken; -use tonic::codegen::tokio_stream::wrappers::ReceiverStream; -use tonic::codegen::tokio_stream::StreamExt; -use tonic::transport::Channel; -struct CommandHandler { - handle: JoinHandle<()>, -} +fn handle_command(command: Command) -> anyhow::Result { + let timestamp = command.timestamp; + let a: f32 = (*command + .parameters + .get("a") + .ok_or(anyhow!("Parameter 'a' Missing"))?) + .try_into()?; + let b: f64 = (*command + .parameters + .get("b") + .ok_or(anyhow!("Parameter 'b' Missing"))?) + .try_into()?; + let c: bool = (*command + .parameters + .get("c") + .ok_or(anyhow!("Parameter 'c' Missing"))?) + .try_into()?; -impl CommandHandler { - pub async fn new CommandResponse + Send + 'static>( - cancellation_token: CancellationToken, - client: &mut CommandServiceClient, - command_definition_request: CommandDefinitionRequest, - handler: F, - ) -> anyhow::Result { - let (tx, rx) = mpsc::channel(4); + println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); - // The buffer size of 4 means this is safe to send immediately - tx.send(ClientSideCommand { - inner: Some(Inner::Request(command_definition_request)), - }) - .await?; - let response = client.new_command(ReceiverStream::new(rx)).await?; - let mut cmd_stream = response.into_inner(); - - let handle = tokio::spawn(async move { - loop { - select! { - _ = cancellation_token.cancelled() => break, - Some(msg) = cmd_stream.next() => { - match msg { - Ok(cmd) => { - let uuid = cmd.uuid.clone(); - let mut response = handler(cmd); - response.uuid = uuid; - match tx.send(ClientSideCommand { - inner: Some(Inner::Response(response)) - }).await { - Ok(()) => {}, - Err(e) => { - println!("SendError: {e}"); - break; - } - } - } - Err(e) => { - println!("Error: {e}"); - break; - } - } - }, - else => break, - } - } - }); - - Ok(Self { handle }) - } - - pub async fn join(self) -> anyhow::Result<()> { - Ok(self.handle.await?) - } + Ok(format!( + "Successfully Received Command! timestamp: {timestamp} a: {a} b: {b} c: {c}" + )) } #[tokio::main] @@ -86,56 +44,41 @@ async fn main() -> Result<(), Box> { }); } - let mut client = CommandServiceClient::connect("http://[::1]:50051").await?; + let client = Arc::new(Client::connect("ws://[::1]:8080/backend")?); - let cmd_handler = CommandHandler::new( - cancellation_token, - &mut client, - CommandDefinitionRequest { - name: "simple_command/a".to_string(), - parameters: vec![ - CommandParameterDefinition { - name: "a".to_string(), - data_type: TelemetryDataType::Float32.into(), + client + .register_callback_fn( + CommandDefinition { + name: "simple_command/a".to_string(), + parameters: vec![ + CommandParameterDefinition { + name: "a".to_string(), + data_type: DataType::Float32, + }, + CommandParameterDefinition { + name: "b".to_string(), + data_type: DataType::Float64, + }, + CommandParameterDefinition { + name: "c".to_string(), + data_type: DataType::Boolean, + }, + ], + }, + |command| match handle_command(command) { + Ok(response) => CommandResponse { + success: true, + response, }, - CommandParameterDefinition { - name: "b".to_string(), - data_type: TelemetryDataType::Float64.into(), + Err(error) => CommandResponse { + success: false, + response: error.to_string(), }, - CommandParameterDefinition { - name: "c".to_string(), - data_type: TelemetryDataType::Boolean.into(), - }, - ], - }, - |command| { - let timestamp = command.timestamp.expect("Missing Timestamp"); - let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32) - .expect("Could not construct date time"); - let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else { - panic!("Wrong Type a"); - }; - let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else { - panic!("Wrong Type b"); - }; - let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else { - panic!("Wrong Type c"); - }; + }, + ) + .await?; - println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); - - CommandResponse { - uuid: command.uuid.clone(), - success: true, - response: format!( - "Successfully Received Command! timestamp: {timestamp} a: {a} b: {b} c: {c}" - ), - } - }, - ) - .await?; - - cmd_handler.join().await?; + cancellation_token.cancelled().await; Ok(()) } diff --git a/server/build.rs b/server/build.rs index e763efd..d100728 100644 --- a/server/build.rs +++ b/server/build.rs @@ -1,5 +1,4 @@ fn main() -> Result<(), Box> { println!("cargo:rerun-if-changed=migrations"); - tonic_build::compile_protos("proto/core.proto")?; Ok(()) } diff --git a/server/proto/core.proto b/server/proto/core.proto deleted file mode 100644 index 961ae34..0000000 --- a/server/proto/core.proto +++ /dev/null @@ -1,60 +0,0 @@ - -syntax = "proto3"; -package core; - -enum TelemetryDataType { - Float32 = 0; - Float64 = 1; - Boolean = 2; -} - -message TelemetryValue { - oneof value { - float float_32 = 1; - double float_64 = 2; - bool boolean = 3; - } -} - -message UUID { - string value = 1; -} - -// UTC since UNIX -message Timestamp { - sfixed64 secs = 1; - sfixed32 nanos = 2; -} - -message CommandParameterDefinition { - string name = 1; - TelemetryDataType data_type = 2; -} - -message CommandDefinitionRequest { - string name = 1; - repeated CommandParameterDefinition parameters = 2; -} - -message Command { - UUID uuid = 1; - Timestamp timestamp = 2; - map parameters = 3; -} - -message CommandResponse { - UUID uuid = 1; - bool success = 2; - string response = 3; -} - -message ClientSideCommand { - oneof inner { - CommandDefinitionRequest request = 1; - CommandResponse response = 2; - } -} - -service CommandService { - rpc NewCommand (stream ClientSideCommand) returns (stream Command); -} diff --git a/server/src/command/command_handle.rs b/server/src/command/command_handle.rs new file mode 100644 index 0000000..9c7b68d --- /dev/null +++ b/server/src/command/command_handle.rs @@ -0,0 +1,20 @@ +use uuid::Uuid; + +pub struct CommandHandle { + name: String, + uuid: Uuid, +} + +impl CommandHandle { + pub fn new(name: String, uuid: Uuid) -> Self { + Self { name, uuid } + } + + pub fn name(&self) -> &str { + &self.name + } + + pub fn uuid(&self) -> &Uuid { + &self.uuid + } +} diff --git a/server/src/command/definition.rs b/server/src/command/definition.rs index d724c8a..70b0ceb 100644 --- a/server/src/command/definition.rs +++ b/server/src/command/definition.rs @@ -1,22 +1,5 @@ use crate::command::service::RegisteredCommand; -use crate::core::TelemetryDataType; -use crate::telemetry::data_type::tlm_data_type_deserializer; -use crate::telemetry::data_type::tlm_data_type_serializer; -use serde::{Deserialize, Serialize}; - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct CommandParameterDefinition { - pub name: String, - #[serde(serialize_with = "tlm_data_type_serializer")] - #[serde(deserialize_with = "tlm_data_type_deserializer")] - pub data_type: TelemetryDataType, -} - -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct CommandDefinition { - pub name: String, - pub parameters: Vec, -} +use api::messages::command::{CommandDefinition, CommandParameterDefinition}; impl From for CommandDefinition { fn from(value: RegisteredCommand) -> Self { @@ -27,7 +10,7 @@ impl From for CommandDefinition { .parameters .into_iter() .map(|param| CommandParameterDefinition { - data_type: param.data_type(), + data_type: param.data_type, name: param.name, }) .collect(), diff --git a/server/src/command/error.rs b/server/src/command/error.rs index 698de99..c218826 100644 --- a/server/src/command/error.rs +++ b/server/src/command/error.rs @@ -1,6 +1,6 @@ -use crate::core::TelemetryDataType; use actix_web::http::StatusCode; use actix_web::ResponseError; +use api::data_type::DataType; use thiserror::Error; #[derive(Error, Debug)] @@ -14,7 +14,7 @@ pub enum Error { #[error("Incorrect Parameter Type for {name}. {expected_type:?} expected.")] WrongParameterType { name: String, - expected_type: TelemetryDataType, + expected_type: DataType, }, #[error("No Command Receiver")] NoCommandReceiver, diff --git a/server/src/command/mod.rs b/server/src/command/mod.rs index e3c4024..adb99ed 100644 --- a/server/src/command/mod.rs +++ b/server/src/command/mod.rs @@ -1,3 +1,4 @@ +pub mod command_handle; mod definition; pub mod error; pub mod service; diff --git a/server/src/command/service.rs b/server/src/command/service.rs index 784b736..8aafb11 100644 --- a/server/src/command/service.rs +++ b/server/src/command/service.rs @@ -1,36 +1,40 @@ -use crate::command::definition::CommandDefinition; +use crate::command::command_handle::CommandHandle; use crate::command::error::Error as CmdError; use crate::command::error::Error::{ CommandFailure, CommandNotFound, FailedToReceiveResponse, FailedToSend, IncorrectParameterCount, MisingParameter, NoCommandReceiver, WrongParameterType, }; -use crate::core::telemetry_value::Value; -use crate::core::{ - Command, CommandDefinitionRequest, CommandResponse, TelemetryDataType, TelemetryValue, - Timestamp, Uuid, -}; -use chrono::{DateTime, Utc}; +use anyhow::bail; +use api::data_type::DataType; +use api::data_value::DataValue; +use api::messages::command::{Command, CommandDefinition, CommandResponse}; +use api::messages::ResponseMessage; +use chrono::Utc; use log::error; use papaya::HashMap; use std::collections::HashMap as StdHashMap; -use tokio::sync::mpsc; use tokio::sync::oneshot; +use tokio::sync::{mpsc, RwLock}; +use uuid::Uuid; #[derive(Clone)] pub(super) struct RegisteredCommand { pub(super) name: String, - pub(super) definition: CommandDefinitionRequest, - tx: mpsc::Sender)>>, + pub(super) definition: CommandDefinition, + response_uuid: Uuid, + tx: mpsc::Sender, } pub struct CommandManagementService { registered_commands: HashMap, + outstanding_responses: RwLock>>, } impl CommandManagementService { pub fn new() -> Self { Self { registered_commands: HashMap::new(), + outstanding_responses: RwLock::new(StdHashMap::new()), } } @@ -52,26 +56,26 @@ impl CommandManagementService { .map(|registration| registration.clone().into()) } - pub async fn register_command( + pub fn register_command( &self, - command: CommandDefinitionRequest, - ) -> anyhow::Result)>>> { - let (tx, rx) = mpsc::channel(1); - - let registered_commands = self.registered_commands.pin_owned(); - if let Some(previous) = registered_commands.insert( - command.name.clone(), + uuid: Uuid, + command: CommandDefinition, + tx: mpsc::Sender, + ) -> anyhow::Result { + let registered_commands = self.registered_commands.pin(); + // We don't care about the previously registered command + let name = command.name.clone(); + let _ = registered_commands.insert( + name.clone(), RegisteredCommand { - name: command.name.clone(), + response_uuid: uuid, + name: name.clone(), definition: command, tx, }, - ) { - // If the receiver was already closed, we don't care (ignore error) - let _ = previous.tx.send(None).await; - } + ); - Ok(rx) + Ok(CommandHandle::new(name, uuid)) } pub async fn send_command( @@ -80,8 +84,6 @@ impl CommandManagementService { parameters: serde_json::Map, ) -> Result { let timestamp = Utc::now(); - let offset_from_unix_epoch = - timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); let name = name.into(); let registered_commands = self.registered_commands.pin(); @@ -100,27 +102,21 @@ impl CommandManagementService { let Some(param_value) = parameters.get(¶meter.name) else { return Err(MisingParameter(parameter.name.clone())); }; - let Some(param_value) = (match parameter.data_type() { - TelemetryDataType::Float32 => { - param_value.as_f64().map(|v| Value::Float32(v as f32)) - } - TelemetryDataType::Float64 => param_value.as_f64().map(Value::Float64), - TelemetryDataType::Boolean => param_value.as_bool().map(Value::Boolean), + let Some(param_value) = (match parameter.data_type { + DataType::Float32 => param_value.as_f64().map(|v| DataValue::Float32(v as f32)), + DataType::Float64 => param_value.as_f64().map(DataValue::Float64), + DataType::Boolean => param_value.as_bool().map(DataValue::Boolean), }) else { return Err(WrongParameterType { name: parameter.name.clone(), - expected_type: parameter.data_type(), + expected_type: parameter.data_type, }); }; - result_parameters.insert( - parameter.name.clone(), - TelemetryValue { - value: Some(param_value), - }, - ); + result_parameters.insert(parameter.name.clone(), param_value); } // Clone & Drop lets us use a standard pin instead of an owned pin + let response_uuid = registration.response_uuid; let tx = registration.tx.clone(); drop(registered_commands); @@ -128,23 +124,27 @@ impl CommandManagementService { return Err(NoCommandReceiver); } - let uuid = Uuid::random(); + let uuid = Uuid::new_v4(); let (response_tx, response_rx) = oneshot::channel(); + + { + let mut outstanding_responses = self.outstanding_responses.write().await; + outstanding_responses.insert(uuid, response_tx); + } + if let Err(e) = tx - .send(Some(( - Command { - uuid: Some(uuid), - timestamp: Some(Timestamp { - secs: offset_from_unix_epoch.num_seconds(), - nanos: offset_from_unix_epoch.subsec_nanos(), - }), + .send(ResponseMessage { + uuid, + response: Some(response_uuid), + payload: Command { + timestamp, parameters: result_parameters, - }, - response_tx, - ))) + } + .into(), + }) .await { - error!("Failed to Send Command: {e}"); + error!("Failed to Send Command {e}"); return Err(FailedToSend); } @@ -162,4 +162,33 @@ impl CommandManagementService { } }) } + + pub async fn handle_command_response( + &self, + uuid: Uuid, + response: CommandResponse, + ) -> anyhow::Result<()> { + let responder = { + let mut outstanding_responses = self.outstanding_responses.write().await; + outstanding_responses.remove(&uuid) + }; + match responder { + None => bail!("Unexpected Command Response for Command {uuid}"), + Some(response_tx) => { + if let Err(e) = response_tx.send(response) { + bail!("Failed to send Command Response {e:?}"); + } + } + }; + + Ok(()) + } + + pub fn unregister(&self, command_handle: CommandHandle) { + let registered_commands = self.registered_commands.pin(); + // We don't care if this succeeded + let _ = registered_commands.remove_if(command_handle.name(), |_, registration| { + registration.response_uuid == *command_handle.uuid() + }); + } } diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs deleted file mode 100644 index 1e5eaf5..0000000 --- a/server/src/grpc/mod.rs +++ /dev/null @@ -1,33 +0,0 @@ -mod cmd; - -use crate::command::service::CommandManagementService; -use crate::core::command_service_server::CommandServiceServer; -use crate::grpc::cmd::CoreCommandService; -use log::{error, info}; -use std::sync::Arc; -use tokio::task::JoinHandle; -use tokio_util::sync::CancellationToken; -use tonic::transport::Server; - -pub fn setup( - token: CancellationToken, - command_service: Arc, -) -> anyhow::Result> { - let addr = "[::1]:50051".parse()?; - Ok(tokio::spawn(async move { - let cmd_service = CoreCommandService { - command_service, - cancellation_token: token.clone(), - }; - - info!("Starting gRPC Server"); - let result = Server::builder() - .add_service(CommandServiceServer::new(cmd_service)) - .serve_with_shutdown(addr, token.cancelled_owned()) - .await; - - if let Err(err) = result { - error!("gRPC Server Encountered An Error: {err}"); - } - })) -} diff --git a/server/src/http/api/panels.rs b/server/src/http/api/panels.rs index 055ec44..2341b60 100644 --- a/server/src/http/api/panels.rs +++ b/server/src/http/api/panels.rs @@ -4,6 +4,7 @@ use crate::panels::PanelService; use actix_web::{delete, get, post, put, web, Responder}; use serde::Deserialize; use std::sync::Arc; +use uuid::Uuid; #[derive(Deserialize)] struct CreateParam { @@ -13,7 +14,7 @@ struct CreateParam { #[derive(Deserialize)] struct IdParam { - id: String, + id: Uuid, } #[post("/panel")] @@ -22,7 +23,7 @@ pub(super) async fn new( data: web::Json, ) -> Result { let uuid = panels.create(&data.name, &data.data).await?; - Ok(web::Json(uuid.value)) + Ok(web::Json(uuid)) } #[get("/panel")] @@ -38,12 +39,10 @@ pub(super) async fn get_one( panels: web::Data>, path: web::Path, ) -> Result { - let result = panels.read(path.id.clone().into()).await?; + let result = panels.read(path.id).await?; match result { Some(result) => Ok(web::Json(result)), - None => Err(HttpServerResultError::PanelUuidNotFound { - uuid: path.id.clone(), - }), + None => Err(HttpServerResultError::PanelUuidNotFound { uuid: path.id }), } } @@ -53,7 +52,7 @@ pub(super) async fn set( path: web::Path, data: web::Json, ) -> Result { - panels.update(path.id.clone().into(), data.0).await?; + panels.update(path.id, data.0).await?; Ok(web::Json(())) } @@ -62,6 +61,6 @@ pub(super) async fn delete( panels: web::Data>, path: web::Path, ) -> Result { - panels.delete(path.id.clone().into()).await?; + panels.delete(path.id).await?; Ok(web::Json(())) } diff --git a/server/src/http/backend/connection.rs b/server/src/http/backend/connection.rs index 1004b48..1aac379 100644 --- a/server/src/http/backend/connection.rs +++ b/server/src/http/backend/connection.rs @@ -1,3 +1,5 @@ +use crate::command::command_handle::CommandHandle; +use crate::command::service::CommandManagementService; use crate::telemetry::management_service::TelemetryManagementService; use actix_ws::{AggregatedMessage, ProtocolError, Session}; use anyhow::bail; @@ -10,18 +12,26 @@ use uuid::Uuid; pub(super) struct BackendConnection { session: Session, tlm_management: Arc, + cmd_management: Arc, tx: Sender, + commands: Vec, pub rx: Receiver, pub should_close: bool, } impl BackendConnection { - pub fn new(session: Session, tlm_management: Arc) -> Self { + pub fn new( + session: Session, + tlm_management: Arc, + cmd_management: Arc, + ) -> Self { let (tx, rx) = tokio::sync::mpsc::channel::(128); Self { session, tlm_management, + cmd_management, tx, + commands: vec![], rx, should_close: false, } @@ -41,6 +51,21 @@ impl BackendConnection { RequestMessagePayload::TelemetryEntry(tlm_entry) => { self.tlm_management.add_tlm_item(tlm_entry)?; } + RequestMessagePayload::GenericCallbackError(_) => todo!(), + RequestMessagePayload::CommandDefinition(def) => { + let cmd = self + .cmd_management + .register_command(msg.uuid, def, self.tx.clone())?; + self.commands.push(cmd); + } + RequestMessagePayload::CommandResponse(response) => match msg.response { + None => bail!("Command Response Payload Must Respond to a Command"), + Some(uuid) => { + self.cmd_management + .handle_command_response(uuid, response) + .await?; + } + }, } Ok(()) } @@ -78,6 +103,15 @@ impl BackendConnection { pub async fn cleanup(mut self) { self.rx.close(); - let _ = self.session.close(None).await; + // Clone here to prevent conflict with the Drop trait + let _ = self.session.clone().close(None).await; + } +} + +impl Drop for BackendConnection { + fn drop(&mut self) { + for command in self.commands.drain(..) { + self.cmd_management.unregister(command); + } } } diff --git a/server/src/http/backend/mod.rs b/server/src/http/backend/mod.rs index 3285285..bb1580d 100644 --- a/server/src/http/backend/mod.rs +++ b/server/src/http/backend/mod.rs @@ -1,5 +1,6 @@ mod connection; +use crate::command::service::CommandManagementService; use crate::http::backend::connection::BackendConnection; use crate::telemetry::management_service::TelemetryManagementService; use actix_web::{rt, web, HttpRequest, HttpResponse}; @@ -14,6 +15,7 @@ async fn backend_connect( stream: web::Payload, cancel_token: web::Data, telemetry_management_service: web::Data>, + command_management_service: web::Data>, ) -> Result { trace!("backend_connect"); let (res, session, stream) = actix_ws::handle(&req, stream)?; @@ -25,9 +27,10 @@ async fn backend_connect( let cancel_token = cancel_token.get_ref().clone(); let tlm_management = telemetry_management_service.get_ref().clone(); + let cmd_management = command_management_service.get_ref().clone(); rt::spawn(async move { - let mut connection = BackendConnection::new(session, tlm_management); + let mut connection = BackendConnection::new(session, tlm_management, cmd_management); while !connection.should_close { let result = select! { _ = cancel_token.cancelled() => { diff --git a/server/src/http/error.rs b/server/src/http/error.rs index f2d8113..a37fff9 100644 --- a/server/src/http/error.rs +++ b/server/src/http/error.rs @@ -20,7 +20,7 @@ pub enum HttpServerResultError { #[error("Internal Error")] InternalError(#[from] anyhow::Error), #[error("Panel Uuid Not Found: {uuid}")] - PanelUuidNotFound { uuid: String }, + PanelUuidNotFound { uuid: Uuid }, #[error(transparent)] Command(#[from] crate::command::error::Error), } diff --git a/server/src/lib.rs b/server/src/lib.rs index 1b5a026..754e77f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,14 +1,8 @@ mod command; -mod grpc; mod http; mod panels; mod serialization; mod telemetry; -mod uuid; - -pub mod core { - tonic::include_proto!("core"); -} use crate::command::service::CommandManagementService; use crate::panels::PanelService; @@ -53,14 +47,11 @@ pub async fn setup() -> anyhow::Result<()> { let cmd = Arc::new(CommandManagementService::new()); - let grpc_server = grpc::setup(cancellation_token.clone(), cmd.clone())?; - let panel_service = PanelService::new(sqlite.clone()); let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service, cmd).await; cancellation_token.cancel(); result?; // result is dropped - grpc_server.await?; //grpc server is dropped drop(cancellation_token); // All cancellation tokens are now dropped sqlite.close().await; diff --git a/server/src/panels/mod.rs b/server/src/panels/mod.rs index bb92760..1c91e04 100644 --- a/server/src/panels/mod.rs +++ b/server/src/panels/mod.rs @@ -1,9 +1,9 @@ pub mod panel; -use crate::core::Uuid; use crate::panels::panel::{PanelRequired, PanelUpdate}; use panel::Panel; use sqlx::SqlitePool; +use uuid::Uuid; pub struct PanelService { pool: SqlitePool, @@ -15,7 +15,8 @@ impl PanelService { } pub async fn create(&self, name: &str, data: &str) -> anyhow::Result { - let id = Uuid::random(); + let id = Uuid::new_v4(); + let id_string = id.to_string(); let mut transaction = self.pool.begin().await?; @@ -24,7 +25,7 @@ impl PanelService { INSERT INTO PANELS (id, name, data, deleted) VALUES ($1, $2, $3, FALSE); "#, - id.value, + id_string, name, data ) @@ -65,7 +66,7 @@ impl PanelService { WHERE id = $1 AND deleted = FALSE "#, ) - .bind(id.value) + .bind(id.to_string()) .fetch_optional(&mut *transaction) .await?; @@ -75,6 +76,7 @@ impl PanelService { } pub async fn update(&self, id: Uuid, data: PanelUpdate) -> anyhow::Result<()> { + let id = id.to_string(); let mut transaction = self.pool.begin().await?; if let Some(name) = data.name { @@ -84,7 +86,7 @@ impl PanelService { SET name = $2 WHERE id = $1; "#, - id.value, + id, name ) .execute(&mut *transaction) @@ -97,7 +99,7 @@ impl PanelService { SET data = $2 WHERE id = $1; "#, - id.value, + id, data ) .execute(&mut *transaction) @@ -110,6 +112,7 @@ impl PanelService { } pub async fn delete(&self, id: Uuid) -> anyhow::Result<()> { + let id = id.to_string(); let mut transaction = self.pool.begin().await?; let _ = sqlx::query!( @@ -118,7 +121,7 @@ impl PanelService { SET deleted = TRUE WHERE id = $1; "#, - id.value, + id, ) .execute(&mut *transaction) .await?; diff --git a/server/src/telemetry/data_type.rs b/server/src/telemetry/data_type.rs deleted file mode 100644 index a19c64d..0000000 --- a/server/src/telemetry/data_type.rs +++ /dev/null @@ -1,38 +0,0 @@ -use crate::core::TelemetryDataType; -use serde::de::Visitor; -use serde::{Deserializer, Serializer}; -use std::fmt::Formatter; - -pub fn tlm_data_type_serializer( - tlm_data_type: &TelemetryDataType, - serializer: S, -) -> Result -where - S: Serializer, -{ - serializer.serialize_str(tlm_data_type.as_str_name()) -} - -struct TlmDataTypeVisitor; - -impl Visitor<'_> for TlmDataTypeVisitor { - type Value = TelemetryDataType; - - fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { - formatter.write_str("A &str") - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - TelemetryDataType::from_str_name(v).ok_or(E::custom("Invalid TelemetryDataType")) - } -} - -pub fn tlm_data_type_deserializer<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - deserializer.deserialize_str(TlmDataTypeVisitor) -} diff --git a/server/src/telemetry/mod.rs b/server/src/telemetry/mod.rs index d5272b6..bfc497c 100644 --- a/server/src/telemetry/mod.rs +++ b/server/src/telemetry/mod.rs @@ -1,6 +1,5 @@ pub mod data; pub mod data_item; -pub mod data_type; pub mod definition; pub mod history; pub mod management_service; diff --git a/server/src/uuid.rs b/server/src/uuid.rs deleted file mode 100644 index 87f5adf..0000000 --- a/server/src/uuid.rs +++ /dev/null @@ -1,18 +0,0 @@ -use crate::core::Uuid; -use rand::RngCore; - -impl Uuid { - pub fn random() -> Self { - let mut uuid = [0u8; 16]; - rand::rng().fill_bytes(&mut uuid); - Self { - value: hex::encode(uuid), - } - } -} - -impl From for Uuid { - fn from(value: String) -> Self { - Self { value } - } -}