From 678b10de08f6abf1589440c44136e04da89b01e7 Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Sun, 28 Dec 2025 16:23:42 -0500 Subject: [PATCH] implement command responses --- Cargo.lock | 1 + examples/simple_command/Cargo.toml | 1 + examples/simple_command/src/main.rs | 121 +++++++++++++++++++++++----- server/proto/core.proto | 20 ++++- server/src/command/error.rs | 6 ++ server/src/command/service.rs | 49 +++++++---- server/src/grpc/cmd.rs | 97 ++++++++++++++++++---- server/src/http/api/cmd.rs | 4 +- 8 files changed, 242 insertions(+), 57 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 762214e..8f99aab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2132,6 +2132,7 @@ dependencies = [ name = "simple_command" version = "0.0.0" dependencies = [ + "anyhow", "chrono", "log", "num-traits", diff --git a/examples/simple_command/Cargo.toml b/examples/simple_command/Cargo.toml index bf018d5..b799df6 100644 --- a/examples/simple_command/Cargo.toml +++ b/examples/simple_command/Cargo.toml @@ -11,3 +11,4 @@ chrono = "0.4.39" tokio-util = "0.7.13" num-traits = "0.2.19" log = "0.4.29" +anyhow = "1.0.100" diff --git a/examples/simple_command/src/main.rs b/examples/simple_command/src/main.rs index c6c3349..ee6f512 100644 --- a/examples/simple_command/src/main.rs +++ b/examples/simple_command/src/main.rs @@ -1,11 +1,79 @@ use chrono::DateTime; +use server::core::client_side_command::Inner; use server::core::command_service_client::CommandServiceClient; use server::core::telemetry_value::Value; -use server::core::{CommandDefinitionRequest, CommandParameterDefinition, TelemetryDataType}; +use server::core::{ + ClientSideCommand, Command, CommandDefinitionRequest, CommandParameterDefinition, + CommandResponse, TelemetryDataType, +}; use std::error::Error; use tokio::select; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; +use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::StreamExt; +use tonic::transport::Channel; + +struct CommandHandler { + handle: JoinHandle<()>, +} + +impl CommandHandler { + pub async fn new CommandResponse + Send + 'static>( + cancellation_token: CancellationToken, + client: &mut CommandServiceClient, + command_definition_request: CommandDefinitionRequest, + handler: F, + ) -> anyhow::Result { + let (tx, rx) = mpsc::channel(4); + + // The buffer size of 4 means this is safe to send immediately + tx.send(ClientSideCommand { + inner: Some(Inner::Request(command_definition_request)), + }) + .await?; + let response = client.new_command(ReceiverStream::new(rx)).await?; + let mut cmd_stream = response.into_inner(); + + let handle = tokio::spawn(async move { + loop { + select! { + _ = cancellation_token.cancelled() => break, + Some(msg) = cmd_stream.next() => { + match msg { + Ok(cmd) => { + let uuid = cmd.uuid.clone(); + let mut response = handler(cmd); + response.uuid = uuid; + match tx.send(ClientSideCommand { + inner: Some(Inner::Response(response)) + }).await { + Ok(()) => {}, + Err(e) => { + println!("SendError: {e}"); + break; + } + } + } + Err(e) => { + println!("Error: {e}"); + break; + } + } + }, + else => break, + } + } + }); + + Ok(Self { handle }) + } + + pub async fn join(self) -> anyhow::Result<()> { + Ok(self.handle.await?) + } +} #[tokio::main] async fn main() -> Result<(), Box> { @@ -20,8 +88,10 @@ async fn main() -> Result<(), Box> { let mut client = CommandServiceClient::connect("http://[::1]:50051").await?; - let command_response = client - .new_command(CommandDefinitionRequest { + let cmd_handler = CommandHandler::new( + cancellation_token, + &mut client, + CommandDefinitionRequest { name: "simple_command/a".to_string(), parameters: vec![ CommandParameterDefinition { @@ -37,28 +107,35 @@ async fn main() -> Result<(), Box> { data_type: TelemetryDataType::Boolean.into(), }, ], - }) - .await?; - let mut command_stream = command_response.into_inner(); + }, + |command| { + let timestamp = command.timestamp.expect("Missing Timestamp"); + let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32) + .expect("Could not construct date time"); + let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else { + panic!("Wrong Type a"); + }; + let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else { + panic!("Wrong Type b"); + }; + let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else { + panic!("Wrong Type c"); + }; - loop { - select! { - _ = cancellation_token.cancelled() => { - break; - }, - Some(command) = command_stream.next() => { - let command = command?; + println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); - let timestamp = command.timestamp.expect("Missing Timestamp"); - let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32).expect("Could not construct date time"); - let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else { panic!("Wrong Type a"); }; - let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else { panic!("Wrong Type b"); }; - let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else { panic!("Wrong Type c"); }; + CommandResponse { + uuid: command.uuid.clone(), + success: true, + response: format!( + "Successfully Received Command! timestamp: {timestamp} a: {a} b: {b} c: {c}" + ), + } + }, + ) + .await?; - println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); - }, - } - } + cmd_handler.join().await?; Ok(()) } diff --git a/server/proto/core.proto b/server/proto/core.proto index d6b98e6..18d0ea9 100644 --- a/server/proto/core.proto +++ b/server/proto/core.proto @@ -60,10 +60,24 @@ message CommandDefinitionRequest { } message Command { - Timestamp timestamp = 1; - map parameters = 2; + UUID uuid = 1; + Timestamp timestamp = 2; + map parameters = 3; +} + +message CommandResponse { + UUID uuid = 1; + bool success = 2; + string response = 3; +} + +message ClientSideCommand { + oneof inner { + CommandDefinitionRequest request = 1; + CommandResponse response = 2; + } } service CommandService { - rpc NewCommand (CommandDefinitionRequest) returns (stream Command); + rpc NewCommand (stream ClientSideCommand) returns (stream Command); } diff --git a/server/src/command/error.rs b/server/src/command/error.rs index aa0bbe0..698de99 100644 --- a/server/src/command/error.rs +++ b/server/src/command/error.rs @@ -20,6 +20,10 @@ pub enum Error { NoCommandReceiver, #[error("Failed to Send")] FailedToSend, + #[error("Failed to Receive Command Response")] + FailedToReceiveResponse, + #[error("Command Failure: {0}")] + CommandFailure(String), } impl ResponseError for Error { @@ -31,6 +35,8 @@ impl ResponseError for Error { Error::WrongParameterType { .. } => StatusCode::BAD_REQUEST, Error::NoCommandReceiver => StatusCode::SERVICE_UNAVAILABLE, Error::FailedToSend => StatusCode::INTERNAL_SERVER_ERROR, + Error::FailedToReceiveResponse => StatusCode::INTERNAL_SERVER_ERROR, + Error::CommandFailure(_) => StatusCode::BAD_REQUEST, } } } diff --git a/server/src/command/service.rs b/server/src/command/service.rs index 452451e..784b736 100644 --- a/server/src/command/service.rs +++ b/server/src/command/service.rs @@ -1,25 +1,26 @@ use crate::command::definition::CommandDefinition; use crate::command::error::Error as CmdError; use crate::command::error::Error::{ - CommandNotFound, FailedToSend, IncorrectParameterCount, MisingParameter, NoCommandReceiver, - WrongParameterType, + CommandFailure, CommandNotFound, FailedToReceiveResponse, FailedToSend, + IncorrectParameterCount, MisingParameter, NoCommandReceiver, WrongParameterType, }; use crate::core::telemetry_value::Value; use crate::core::{ - Command, CommandDefinitionRequest, TelemetryDataType, TelemetryValue, Timestamp, + Command, CommandDefinitionRequest, CommandResponse, TelemetryDataType, TelemetryValue, + Timestamp, Uuid, }; use chrono::{DateTime, Utc}; use log::error; use papaya::HashMap; use std::collections::HashMap as StdHashMap; use tokio::sync::mpsc; -use tokio::sync::mpsc::{Receiver, Sender}; +use tokio::sync::oneshot; #[derive(Clone)] pub(super) struct RegisteredCommand { pub(super) name: String, pub(super) definition: CommandDefinitionRequest, - tx: Sender>, + tx: mpsc::Sender)>>, } pub struct CommandManagementService { @@ -54,7 +55,7 @@ impl CommandManagementService { pub async fn register_command( &self, command: CommandDefinitionRequest, - ) -> anyhow::Result>> { + ) -> anyhow::Result)>>> { let (tx, rx) = mpsc::channel(1); let registered_commands = self.registered_commands.pin_owned(); @@ -77,7 +78,7 @@ impl CommandManagementService { &self, name: impl Into, parameters: serde_json::Map, - ) -> Result<(), CmdError> { + ) -> Result { let timestamp = Utc::now(); let offset_from_unix_epoch = timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); @@ -127,20 +128,38 @@ impl CommandManagementService { return Err(NoCommandReceiver); } + let uuid = Uuid::random(); + let (response_tx, response_rx) = oneshot::channel(); if let Err(e) = tx - .send(Some(Command { - timestamp: Some(Timestamp { - secs: offset_from_unix_epoch.num_seconds(), - nanos: offset_from_unix_epoch.subsec_nanos(), - }), - parameters: result_parameters, - })) + .send(Some(( + Command { + uuid: Some(uuid), + timestamp: Some(Timestamp { + secs: offset_from_unix_epoch.num_seconds(), + nanos: offset_from_unix_epoch.subsec_nanos(), + }), + parameters: result_parameters, + }, + response_tx, + ))) .await { error!("Failed to Send Command: {e}"); return Err(FailedToSend); } - Ok(()) + response_rx + .await + .map_err(|e| { + error!("Failed to Receive Command Response: {e}"); + FailedToReceiveResponse + }) + .and_then(|response| { + if response.success { + Ok(response.response) + } else { + Err(CommandFailure(response.response)) + } + }) } } diff --git a/server/src/grpc/cmd.rs b/server/src/grpc/cmd.rs index 359b380..900201e 100644 --- a/server/src/grpc/cmd.rs +++ b/server/src/grpc/cmd.rs @@ -1,15 +1,18 @@ use crate::command::service::CommandManagementService; +use crate::core::client_side_command::Inner; use crate::core::command_service_server::CommandService; -use crate::core::{Command, CommandDefinitionRequest}; +use crate::core::{ClientSideCommand, Command, CommandResponse, Uuid}; use log::{error, trace}; +use std::collections::HashMap; use std::pin::Pin; use std::sync::Arc; use tokio::select; use tokio::sync::mpsc; +use tokio::sync::oneshot; use tokio_util::sync::CancellationToken; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; -use tonic::codegen::tokio_stream::Stream; -use tonic::{Request, Response, Status}; +use tonic::codegen::tokio_stream::{Stream, StreamExt}; +use tonic::{Request, Response, Status, Streaming}; pub struct CoreCommandService { pub command_service: Arc, @@ -22,16 +25,33 @@ impl CommandService for CoreCommandService { async fn new_command( &self, - request: Request, + request: Request>, ) -> Result, Status> { trace!("CoreCommandService::new_command"); let cancel_token = self.cancellation_token.clone(); - let mut cmd_rx = match self - .command_service - .register_command(request.into_inner()) - .await - { + let mut in_stream = request.into_inner(); + + let cmd_request = select! { + _ = cancel_token.cancelled() => return Err(Status::internal("Shutting Down")), + Some(message) = in_stream.next() => { + match message { + Ok(ClientSideCommand { + inner: Some(Inner::Request(cmd_request)) + }) => cmd_request, + Err(err) => { + error!("Error in Stream: {err}"); + return Err(Status::cancelled("Error in Stream")); + }, + _ => { + return Err(Status::invalid_argument("First message must be request")); + }, + } + }, + else => return Err(Status::internal("Shutting Down")), + }; + + let mut cmd_rx = match self.command_service.register_command(cmd_request).await { Ok(rx) => rx, Err(e) => { error!("Failed to register command: {e}"); @@ -42,6 +62,8 @@ impl CommandService for CoreCommandService { let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { + let mut result = Status::resource_exhausted("End of Command Stream"); + let mut in_progress = HashMap::>::new(); loop { select! { _ = cancel_token.cancelled() => break, @@ -50,32 +72,77 @@ impl CommandService for CoreCommandService { match message { None => break, Some(message) => { - match tx.send(Ok(message)).await { + let key = message.0.uuid.clone().unwrap().value; + in_progress.insert(key.clone(), message.1); + match tx.send(Ok(message.0)).await { Ok(()) => {}, Err(e) => { error!("Failed to send command data: {e}"); + if in_progress.remove(&key).unwrap().send(CommandResponse { + uuid: Some(Uuid::from(key)), + success: false, + response: "Failed to send command data.".to_string(), + }).is_err() { + error!("Failed to send command response on failure to send command data"); + } break; } } } } - }, + Some(message) = in_stream.next() => { + match message { + Ok(message) => { + match message.inner { + Some(Inner::Response(response)) => { + if let Some(uuid) = &response.uuid { + match in_progress.remove(&uuid.value) { + Some(sender) => { + if sender.send(response).is_err() { + error!("Failed to send command response on success") + } + } + None => { + result = Status::invalid_argument("Invalid Command UUID"); + break; + } + } + } + } + _ => { + result = Status::invalid_argument("Subsequent Message Must Be Command Responses"); + break; + } + } + } + Err(e) => { + error!("Received error from command handler {e}"); + break + }, + } + } else => break, } } cmd_rx.close(); if !tx.is_closed() { - match tx - .send(Err(Status::resource_exhausted("End of Command Stream"))) - .await - { + match tx.send(Err(result)).await { Ok(()) => {} Err(e) => { error!("Failed to close old command sender {e}"); } } } + for (key, sender) in in_progress.drain() { + if sender.send(CommandResponse { + uuid: Some(Uuid::from(key)), + success: false, + response: "Command Handler Shut Down".to_string(), + }).is_err() { + error!("Failed to send command response on shutdown"); + } + } }); Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) diff --git a/server/src/http/api/cmd.rs b/server/src/http/api/cmd.rs index 39de4b0..ed3c8cb 100644 --- a/server/src/http/api/cmd.rs +++ b/server/src/http/api/cmd.rs @@ -9,11 +9,11 @@ pub(super) async fn send_command( name: web::Path, parameters: web::Json>, ) -> Result { - command_service + let result = command_service .send_command(name.to_string(), parameters.into_inner()) .await?; - Ok(web::Json("Command Sent Successfully.")) + Ok(web::Json(result)) } #[get("/cmd")]