implement command responses

This commit is contained in:
2025-12-28 16:23:42 -05:00
parent c3253f3204
commit 678b10de08
8 changed files with 242 additions and 57 deletions

1
Cargo.lock generated
View File

@@ -2132,6 +2132,7 @@ dependencies = [
name = "simple_command" name = "simple_command"
version = "0.0.0" version = "0.0.0"
dependencies = [ dependencies = [
"anyhow",
"chrono", "chrono",
"log", "log",
"num-traits", "num-traits",

View File

@@ -11,3 +11,4 @@ chrono = "0.4.39"
tokio-util = "0.7.13" tokio-util = "0.7.13"
num-traits = "0.2.19" num-traits = "0.2.19"
log = "0.4.29" log = "0.4.29"
anyhow = "1.0.100"

View File

@@ -1,11 +1,79 @@
use chrono::DateTime; use chrono::DateTime;
use server::core::client_side_command::Inner;
use server::core::command_service_client::CommandServiceClient; use server::core::command_service_client::CommandServiceClient;
use server::core::telemetry_value::Value; use server::core::telemetry_value::Value;
use server::core::{CommandDefinitionRequest, CommandParameterDefinition, TelemetryDataType}; use server::core::{
ClientSideCommand, Command, CommandDefinitionRequest, CommandParameterDefinition,
CommandResponse, TelemetryDataType,
};
use std::error::Error; use std::error::Error;
use tokio::select; use tokio::select;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::StreamExt; use tonic::codegen::tokio_stream::StreamExt;
use tonic::transport::Channel;
struct CommandHandler {
handle: JoinHandle<()>,
}
impl CommandHandler {
pub async fn new<F: Fn(Command) -> CommandResponse + Send + 'static>(
cancellation_token: CancellationToken,
client: &mut CommandServiceClient<Channel>,
command_definition_request: CommandDefinitionRequest,
handler: F,
) -> anyhow::Result<Self> {
let (tx, rx) = mpsc::channel(4);
// The buffer size of 4 means this is safe to send immediately
tx.send(ClientSideCommand {
inner: Some(Inner::Request(command_definition_request)),
})
.await?;
let response = client.new_command(ReceiverStream::new(rx)).await?;
let mut cmd_stream = response.into_inner();
let handle = tokio::spawn(async move {
loop {
select! {
_ = cancellation_token.cancelled() => break,
Some(msg) = cmd_stream.next() => {
match msg {
Ok(cmd) => {
let uuid = cmd.uuid.clone();
let mut response = handler(cmd);
response.uuid = uuid;
match tx.send(ClientSideCommand {
inner: Some(Inner::Response(response))
}).await {
Ok(()) => {},
Err(e) => {
println!("SendError: {e}");
break;
}
}
}
Err(e) => {
println!("Error: {e}");
break;
}
}
},
else => break,
}
}
});
Ok(Self { handle })
}
pub async fn join(self) -> anyhow::Result<()> {
Ok(self.handle.await?)
}
}
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> { async fn main() -> Result<(), Box<dyn Error>> {
@@ -20,8 +88,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut client = CommandServiceClient::connect("http://[::1]:50051").await?; let mut client = CommandServiceClient::connect("http://[::1]:50051").await?;
let command_response = client let cmd_handler = CommandHandler::new(
.new_command(CommandDefinitionRequest { cancellation_token,
&mut client,
CommandDefinitionRequest {
name: "simple_command/a".to_string(), name: "simple_command/a".to_string(),
parameters: vec![ parameters: vec![
CommandParameterDefinition { CommandParameterDefinition {
@@ -37,28 +107,35 @@ async fn main() -> Result<(), Box<dyn Error>> {
data_type: TelemetryDataType::Boolean.into(), data_type: TelemetryDataType::Boolean.into(),
}, },
], ],
}) },
.await?; |command| {
let mut command_stream = command_response.into_inner(); let timestamp = command.timestamp.expect("Missing Timestamp");
let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32)
.expect("Could not construct date time");
let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else {
panic!("Wrong Type a");
};
let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else {
panic!("Wrong Type b");
};
let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else {
panic!("Wrong Type c");
};
loop { println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}");
select! {
_ = cancellation_token.cancelled() => {
break;
},
Some(command) = command_stream.next() => {
let command = command?;
let timestamp = command.timestamp.expect("Missing Timestamp"); CommandResponse {
let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32).expect("Could not construct date time"); uuid: command.uuid.clone(),
let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else { panic!("Wrong Type a"); }; success: true,
let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else { panic!("Wrong Type b"); }; response: format!(
let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else { panic!("Wrong Type c"); }; "Successfully Received Command! timestamp: {timestamp} a: {a} b: {b} c: {c}"
),
}
},
)
.await?;
println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); cmd_handler.join().await?;
},
}
}
Ok(()) Ok(())
} }

View File

@@ -60,10 +60,24 @@ message CommandDefinitionRequest {
} }
message Command { message Command {
Timestamp timestamp = 1; UUID uuid = 1;
map<string, TelemetryValue> parameters = 2; Timestamp timestamp = 2;
map<string, TelemetryValue> parameters = 3;
}
message CommandResponse {
UUID uuid = 1;
bool success = 2;
string response = 3;
}
message ClientSideCommand {
oneof inner {
CommandDefinitionRequest request = 1;
CommandResponse response = 2;
}
} }
service CommandService { service CommandService {
rpc NewCommand (CommandDefinitionRequest) returns (stream Command); rpc NewCommand (stream ClientSideCommand) returns (stream Command);
} }

View File

@@ -20,6 +20,10 @@ pub enum Error {
NoCommandReceiver, NoCommandReceiver,
#[error("Failed to Send")] #[error("Failed to Send")]
FailedToSend, FailedToSend,
#[error("Failed to Receive Command Response")]
FailedToReceiveResponse,
#[error("Command Failure: {0}")]
CommandFailure(String),
} }
impl ResponseError for Error { impl ResponseError for Error {
@@ -31,6 +35,8 @@ impl ResponseError for Error {
Error::WrongParameterType { .. } => StatusCode::BAD_REQUEST, Error::WrongParameterType { .. } => StatusCode::BAD_REQUEST,
Error::NoCommandReceiver => StatusCode::SERVICE_UNAVAILABLE, Error::NoCommandReceiver => StatusCode::SERVICE_UNAVAILABLE,
Error::FailedToSend => StatusCode::INTERNAL_SERVER_ERROR, Error::FailedToSend => StatusCode::INTERNAL_SERVER_ERROR,
Error::FailedToReceiveResponse => StatusCode::INTERNAL_SERVER_ERROR,
Error::CommandFailure(_) => StatusCode::BAD_REQUEST,
} }
} }
} }

View File

@@ -1,25 +1,26 @@
use crate::command::definition::CommandDefinition; use crate::command::definition::CommandDefinition;
use crate::command::error::Error as CmdError; use crate::command::error::Error as CmdError;
use crate::command::error::Error::{ use crate::command::error::Error::{
CommandNotFound, FailedToSend, IncorrectParameterCount, MisingParameter, NoCommandReceiver, CommandFailure, CommandNotFound, FailedToReceiveResponse, FailedToSend,
WrongParameterType, IncorrectParameterCount, MisingParameter, NoCommandReceiver, WrongParameterType,
}; };
use crate::core::telemetry_value::Value; use crate::core::telemetry_value::Value;
use crate::core::{ use crate::core::{
Command, CommandDefinitionRequest, TelemetryDataType, TelemetryValue, Timestamp, Command, CommandDefinitionRequest, CommandResponse, TelemetryDataType, TelemetryValue,
Timestamp, Uuid,
}; };
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use log::error; use log::error;
use papaya::HashMap; use papaya::HashMap;
use std::collections::HashMap as StdHashMap; use std::collections::HashMap as StdHashMap;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender}; use tokio::sync::oneshot;
#[derive(Clone)] #[derive(Clone)]
pub(super) struct RegisteredCommand { pub(super) struct RegisteredCommand {
pub(super) name: String, pub(super) name: String,
pub(super) definition: CommandDefinitionRequest, pub(super) definition: CommandDefinitionRequest,
tx: Sender<Option<Command>>, tx: mpsc::Sender<Option<(Command, oneshot::Sender<CommandResponse>)>>,
} }
pub struct CommandManagementService { pub struct CommandManagementService {
@@ -54,7 +55,7 @@ impl CommandManagementService {
pub async fn register_command( pub async fn register_command(
&self, &self,
command: CommandDefinitionRequest, command: CommandDefinitionRequest,
) -> anyhow::Result<Receiver<Option<Command>>> { ) -> anyhow::Result<mpsc::Receiver<Option<(Command, oneshot::Sender<CommandResponse>)>>> {
let (tx, rx) = mpsc::channel(1); let (tx, rx) = mpsc::channel(1);
let registered_commands = self.registered_commands.pin_owned(); let registered_commands = self.registered_commands.pin_owned();
@@ -77,7 +78,7 @@ impl CommandManagementService {
&self, &self,
name: impl Into<String>, name: impl Into<String>,
parameters: serde_json::Map<String, serde_json::Value>, parameters: serde_json::Map<String, serde_json::Value>,
) -> Result<(), CmdError> { ) -> Result<String, CmdError> {
let timestamp = Utc::now(); let timestamp = Utc::now();
let offset_from_unix_epoch = let offset_from_unix_epoch =
timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch");
@@ -127,20 +128,38 @@ impl CommandManagementService {
return Err(NoCommandReceiver); return Err(NoCommandReceiver);
} }
let uuid = Uuid::random();
let (response_tx, response_rx) = oneshot::channel();
if let Err(e) = tx if let Err(e) = tx
.send(Some(Command { .send(Some((
timestamp: Some(Timestamp { Command {
secs: offset_from_unix_epoch.num_seconds(), uuid: Some(uuid),
nanos: offset_from_unix_epoch.subsec_nanos(), timestamp: Some(Timestamp {
}), secs: offset_from_unix_epoch.num_seconds(),
parameters: result_parameters, nanos: offset_from_unix_epoch.subsec_nanos(),
})) }),
parameters: result_parameters,
},
response_tx,
)))
.await .await
{ {
error!("Failed to Send Command: {e}"); error!("Failed to Send Command: {e}");
return Err(FailedToSend); return Err(FailedToSend);
} }
Ok(()) response_rx
.await
.map_err(|e| {
error!("Failed to Receive Command Response: {e}");
FailedToReceiveResponse
})
.and_then(|response| {
if response.success {
Ok(response.response)
} else {
Err(CommandFailure(response.response))
}
})
} }
} }

View File

@@ -1,15 +1,18 @@
use crate::command::service::CommandManagementService; use crate::command::service::CommandManagementService;
use crate::core::client_side_command::Inner;
use crate::core::command_service_server::CommandService; use crate::core::command_service_server::CommandService;
use crate::core::{Command, CommandDefinitionRequest}; use crate::core::{ClientSideCommand, Command, CommandResponse, Uuid};
use log::{error, trace}; use log::{error, trace};
use std::collections::HashMap;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use tokio::select; use tokio::select;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::Stream; use tonic::codegen::tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status}; use tonic::{Request, Response, Status, Streaming};
pub struct CoreCommandService { pub struct CoreCommandService {
pub command_service: Arc<CommandManagementService>, pub command_service: Arc<CommandManagementService>,
@@ -22,16 +25,33 @@ impl CommandService for CoreCommandService {
async fn new_command( async fn new_command(
&self, &self,
request: Request<CommandDefinitionRequest>, request: Request<Streaming<ClientSideCommand>>,
) -> Result<Response<Self::NewCommandStream>, Status> { ) -> Result<Response<Self::NewCommandStream>, Status> {
trace!("CoreCommandService::new_command"); trace!("CoreCommandService::new_command");
let cancel_token = self.cancellation_token.clone(); let cancel_token = self.cancellation_token.clone();
let mut cmd_rx = match self let mut in_stream = request.into_inner();
.command_service
.register_command(request.into_inner()) let cmd_request = select! {
.await _ = cancel_token.cancelled() => return Err(Status::internal("Shutting Down")),
{ Some(message) = in_stream.next() => {
match message {
Ok(ClientSideCommand {
inner: Some(Inner::Request(cmd_request))
}) => cmd_request,
Err(err) => {
error!("Error in Stream: {err}");
return Err(Status::cancelled("Error in Stream"));
},
_ => {
return Err(Status::invalid_argument("First message must be request"));
},
}
},
else => return Err(Status::internal("Shutting Down")),
};
let mut cmd_rx = match self.command_service.register_command(cmd_request).await {
Ok(rx) => rx, Ok(rx) => rx,
Err(e) => { Err(e) => {
error!("Failed to register command: {e}"); error!("Failed to register command: {e}");
@@ -42,6 +62,8 @@ impl CommandService for CoreCommandService {
let (tx, rx) = mpsc::channel(128); let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move { tokio::spawn(async move {
let mut result = Status::resource_exhausted("End of Command Stream");
let mut in_progress = HashMap::<String, oneshot::Sender<CommandResponse>>::new();
loop { loop {
select! { select! {
_ = cancel_token.cancelled() => break, _ = cancel_token.cancelled() => break,
@@ -50,32 +72,77 @@ impl CommandService for CoreCommandService {
match message { match message {
None => break, None => break,
Some(message) => { Some(message) => {
match tx.send(Ok(message)).await { let key = message.0.uuid.clone().unwrap().value;
in_progress.insert(key.clone(), message.1);
match tx.send(Ok(message.0)).await {
Ok(()) => {}, Ok(()) => {},
Err(e) => { Err(e) => {
error!("Failed to send command data: {e}"); error!("Failed to send command data: {e}");
if in_progress.remove(&key).unwrap().send(CommandResponse {
uuid: Some(Uuid::from(key)),
success: false,
response: "Failed to send command data.".to_string(),
}).is_err() {
error!("Failed to send command response on failure to send command data");
}
break; break;
} }
} }
} }
} }
}, },
Some(message) = in_stream.next() => {
match message {
Ok(message) => {
match message.inner {
Some(Inner::Response(response)) => {
if let Some(uuid) = &response.uuid {
match in_progress.remove(&uuid.value) {
Some(sender) => {
if sender.send(response).is_err() {
error!("Failed to send command response on success")
}
}
None => {
result = Status::invalid_argument("Invalid Command UUID");
break;
}
}
}
}
_ => {
result = Status::invalid_argument("Subsequent Message Must Be Command Responses");
break;
}
}
}
Err(e) => {
error!("Received error from command handler {e}");
break
},
}
}
else => break, else => break,
} }
} }
cmd_rx.close(); cmd_rx.close();
if !tx.is_closed() { if !tx.is_closed() {
match tx match tx.send(Err(result)).await {
.send(Err(Status::resource_exhausted("End of Command Stream")))
.await
{
Ok(()) => {} Ok(()) => {}
Err(e) => { Err(e) => {
error!("Failed to close old command sender {e}"); error!("Failed to close old command sender {e}");
} }
} }
} }
for (key, sender) in in_progress.drain() {
if sender.send(CommandResponse {
uuid: Some(Uuid::from(key)),
success: false,
response: "Command Handler Shut Down".to_string(),
}).is_err() {
error!("Failed to send command response on shutdown");
}
}
}); });
Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) Ok(Response::new(Box::pin(ReceiverStream::new(rx))))

View File

@@ -9,11 +9,11 @@ pub(super) async fn send_command(
name: web::Path<String>, name: web::Path<String>,
parameters: web::Json<serde_json::Map<String, serde_json::Value>>, parameters: web::Json<serde_json::Map<String, serde_json::Value>>,
) -> Result<impl Responder, HttpServerResultError> { ) -> Result<impl Responder, HttpServerResultError> {
command_service let result = command_service
.send_command(name.to_string(), parameters.into_inner()) .send_command(name.to_string(), parameters.into_inner())
.await?; .await?;
Ok(web::Json("Command Sent Successfully.")) Ok(web::Json(result))
} }
#[get("/cmd")] #[get("/cmd")]