From ac710d4e4f2aac27d706c988c723518614610f6a Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Sat, 27 Dec 2025 15:01:34 -0500 Subject: [PATCH] initial backend command stuff --- Cargo.lock | 13 ++++ Cargo.toml | 2 +- examples/simple_command/Cargo.toml | 13 ++++ examples/simple_command/src/main.rs | 64 ++++++++++++++++ server/proto/core.proto | 19 +++++ server/src/command/mod.rs | 1 + server/src/command/service.rs | 115 ++++++++++++++++++++++++++++ server/src/grpc/cmd.rs | 83 ++++++++++++++++++++ server/src/grpc/mod.rs | 44 +++++++++++ server/src/{grpc.rs => grpc/tlm.rs} | 29 +------ server/src/http/api/cmd.rs | 17 ++++ server/src/http/api/mod.rs | 4 +- server/src/http/mod.rs | 4 + server/src/lib.rs | 8 +- 14 files changed, 385 insertions(+), 31 deletions(-) create mode 100644 examples/simple_command/Cargo.toml create mode 100644 examples/simple_command/src/main.rs create mode 100644 server/src/command/mod.rs create mode 100644 server/src/command/service.rs create mode 100644 server/src/grpc/cmd.rs create mode 100644 server/src/grpc/mod.rs rename server/src/{grpc.rs => grpc/tlm.rs} (85%) create mode 100644 server/src/http/api/cmd.rs diff --git a/Cargo.lock b/Cargo.lock index 66af3a9..762214e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2128,6 +2128,19 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simple_command" +version = "0.0.0" +dependencies = [ + "chrono", + "log", + "num-traits", + "server", + "tokio", + "tokio-util", + "tonic", +] + [[package]] name = "simple_producer" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index 8837d48..5567a9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["server", "examples/simple_producer"] +members = ["server", "examples/simple_producer", "examples/simple_command"] resolver = "2" [profile.dev.package.sqlx-macros] diff --git a/examples/simple_command/Cargo.toml b/examples/simple_command/Cargo.toml new file mode 100644 index 0000000..bf018d5 --- /dev/null +++ b/examples/simple_command/Cargo.toml @@ -0,0 +1,13 @@ + +[package] +name = "simple_command" +edition = "2021" + +[dependencies] +server = { path = "../../server" } +tonic = "0.12.3" +tokio = { version = "1.43.0", features = ["rt-multi-thread", "signal"] } +chrono = "0.4.39" +tokio-util = "0.7.13" +num-traits = "0.2.19" +log = "0.4.29" diff --git a/examples/simple_command/src/main.rs b/examples/simple_command/src/main.rs new file mode 100644 index 0000000..c6c3349 --- /dev/null +++ b/examples/simple_command/src/main.rs @@ -0,0 +1,64 @@ +use chrono::DateTime; +use server::core::command_service_client::CommandServiceClient; +use server::core::telemetry_value::Value; +use server::core::{CommandDefinitionRequest, CommandParameterDefinition, TelemetryDataType}; +use std::error::Error; +use tokio::select; +use tokio_util::sync::CancellationToken; +use tonic::codegen::tokio_stream::StreamExt; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cancellation_token = CancellationToken::new(); + { + let cancellation_token = cancellation_token.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + cancellation_token.cancel(); + }); + } + + let mut client = CommandServiceClient::connect("http://[::1]:50051").await?; + + let command_response = client + .new_command(CommandDefinitionRequest { + name: "simple_command/a".to_string(), + parameters: vec![ + CommandParameterDefinition { + name: "a".to_string(), + data_type: TelemetryDataType::Float32.into(), + }, + CommandParameterDefinition { + name: "b".to_string(), + data_type: TelemetryDataType::Float64.into(), + }, + CommandParameterDefinition { + name: "c".to_string(), + data_type: TelemetryDataType::Boolean.into(), + }, + ], + }) + .await?; + let mut command_stream = command_response.into_inner(); + + loop { + select! { + _ = cancellation_token.cancelled() => { + break; + }, + Some(command) = command_stream.next() => { + let command = command?; + + let timestamp = command.timestamp.expect("Missing Timestamp"); + let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32).expect("Could not construct date time"); + let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else { panic!("Wrong Type a"); }; + let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else { panic!("Wrong Type b"); }; + let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else { panic!("Wrong Type c"); }; + + println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); + }, + } + } + + Ok(()) +} diff --git a/server/proto/core.proto b/server/proto/core.proto index 27218fb..d6b98e6 100644 --- a/server/proto/core.proto +++ b/server/proto/core.proto @@ -48,3 +48,22 @@ service TelemetryService { rpc NewTelemetry (TelemetryDefinitionRequest) returns (TelemetryDefinitionResponse); rpc InsertTelemetry (stream TelemetryItem) returns (stream TelemetryInsertResponse); } + +message CommandParameterDefinition { + string name = 1; + TelemetryDataType data_type = 2; +} + +message CommandDefinitionRequest { + string name = 1; + repeated CommandParameterDefinition parameters = 2; +} + +message Command { + Timestamp timestamp = 1; + map parameters = 2; +} + +service CommandService { + rpc NewCommand (CommandDefinitionRequest) returns (stream Command); +} diff --git a/server/src/command/mod.rs b/server/src/command/mod.rs new file mode 100644 index 0000000..1f278a4 --- /dev/null +++ b/server/src/command/mod.rs @@ -0,0 +1 @@ +pub mod service; diff --git a/server/src/command/service.rs b/server/src/command/service.rs new file mode 100644 index 0000000..b4bcd83 --- /dev/null +++ b/server/src/command/service.rs @@ -0,0 +1,115 @@ +use crate::core::telemetry_value::Value; +use crate::core::{ + Command, CommandDefinitionRequest, TelemetryDataType, TelemetryValue, Timestamp, +}; +use anyhow::{bail, ensure}; +use chrono::{DateTime, Utc}; +use papaya::HashMap; +use std::collections::HashMap as StdHashMap; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; + +struct RegisteredCommand { + definition: CommandDefinitionRequest, + tx: Sender>, +} + +pub struct CommandManagementService { + registered_commands: HashMap, +} + +impl CommandManagementService { + pub fn new() -> Self { + Self { + registered_commands: HashMap::new(), + } + } + + pub async fn register_command( + &self, + command: CommandDefinitionRequest, + ) -> anyhow::Result>> { + let (tx, rx) = mpsc::channel(1); + + let registered_commands = self.registered_commands.pin_owned(); + if let Some(previous) = registered_commands.insert( + command.name.clone(), + RegisteredCommand { + definition: command, + tx, + }, + ) { + // If the receiver was already closed, we don't care (ignore error) + let _ = previous.tx.send(None).await; + } + + Ok(rx) + } + + pub async fn send_command( + &self, + name: impl Into, + parameters: serde_json::Map, + ) -> anyhow::Result<()> { + let timestamp = Utc::now(); + let offset_from_unix_epoch = + timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); + + let name = name.into(); + let registered_commands = self.registered_commands.pin(); + let Some(registration) = registered_commands.get(&name) else { + bail!("Command Not Found {name}"); + }; + + ensure!( + parameters.len() == registration.definition.parameters.len(), + "Command has {} parameters. {} expected", + parameters.len(), + registration.definition.parameters.len() + ); + let mut result_parameters = StdHashMap::new(); + for parameter in ®istration.definition.parameters { + let Some(param_value) = parameters.get(¶meter.name) else { + bail!("Command Missing Parameter: {}", parameter.name); + }; + let Some(param_value) = (match parameter.data_type() { + TelemetryDataType::Float32 => { + param_value.as_f64().map(|v| Value::Float32(v as f32)) + } + TelemetryDataType::Float64 => param_value.as_f64().map(Value::Float64), + TelemetryDataType::Boolean => param_value.as_bool().map(Value::Boolean), + }) else { + bail!( + "Parameter {} has the wrong type. {:?} expected", + parameter.name, + parameter.data_type() + ); + }; + result_parameters.insert( + parameter.name.clone(), + TelemetryValue { + value: Some(param_value), + }, + ); + } + + // Clone & Drop lets us use a standard pin instead of an owned pin + let tx = registration.tx.clone(); + drop(registered_commands); + + if let Err(e) = tx + .send(Some(Command { + timestamp: Some(Timestamp { + secs: offset_from_unix_epoch.num_seconds(), + nanos: offset_from_unix_epoch.subsec_nanos(), + }), + parameters: result_parameters, + })) + .await + { + bail!("Failed to send command {e}"); + } + + Ok(()) + } +} diff --git a/server/src/grpc/cmd.rs b/server/src/grpc/cmd.rs new file mode 100644 index 0000000..359b380 --- /dev/null +++ b/server/src/grpc/cmd.rs @@ -0,0 +1,83 @@ +use crate::command::service::CommandManagementService; +use crate::core::command_service_server::CommandService; +use crate::core::{Command, CommandDefinitionRequest}; +use log::{error, trace}; +use std::pin::Pin; +use std::sync::Arc; +use tokio::select; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tonic::codegen::tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::tokio_stream::Stream; +use tonic::{Request, Response, Status}; + +pub struct CoreCommandService { + pub command_service: Arc, + pub cancellation_token: CancellationToken, +} + +#[tonic::async_trait] +impl CommandService for CoreCommandService { + type NewCommandStream = Pin> + Send>>; + + async fn new_command( + &self, + request: Request, + ) -> Result, Status> { + trace!("CoreCommandService::new_command"); + + let cancel_token = self.cancellation_token.clone(); + let mut cmd_rx = match self + .command_service + .register_command(request.into_inner()) + .await + { + Ok(rx) => rx, + Err(e) => { + error!("Failed to register command: {e}"); + return Err(Status::internal("Failed to register command")); + } + }; + + let (tx, rx) = mpsc::channel(128); + + tokio::spawn(async move { + loop { + select! { + _ = cancel_token.cancelled() => break, + _ = tx.closed() => break, + Some(message) = cmd_rx.recv() => { + match message { + None => break, + Some(message) => { + match tx.send(Ok(message)).await { + Ok(()) => {}, + Err(e) => { + error!("Failed to send command data: {e}"); + break; + } + } + } + } + + }, + else => break, + } + } + cmd_rx.close(); + if !tx.is_closed() { + match tx + .send(Err(Status::resource_exhausted("End of Command Stream"))) + .await + { + Ok(()) => {} + Err(e) => { + error!("Failed to close old command sender {e}"); + } + } + } + }); + + Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) + } +} diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs new file mode 100644 index 0000000..1e7b01b --- /dev/null +++ b/server/src/grpc/mod.rs @@ -0,0 +1,44 @@ +mod cmd; +mod tlm; + +use crate::command::service::CommandManagementService; +use crate::core::command_service_server::CommandServiceServer; +use crate::core::telemetry_service_server::TelemetryServiceServer; +use crate::grpc::cmd::CoreCommandService; +use crate::grpc::tlm::CoreTelemetryService; +use crate::telemetry::management_service::TelemetryManagementService; +use log::{error, info}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tonic::transport::Server; + +pub fn setup( + token: CancellationToken, + telemetry_management_service: Arc, + command_service: Arc, +) -> anyhow::Result> { + let addr = "[::1]:50051".parse()?; + Ok(tokio::spawn(async move { + let tlm_service = CoreTelemetryService { + tlm_management: telemetry_management_service, + cancellation_token: token.clone(), + }; + + let cmd_service = CoreCommandService { + command_service, + cancellation_token: token.clone(), + }; + + info!("Starting gRPC Server"); + let result = Server::builder() + .add_service(TelemetryServiceServer::new(tlm_service)) + .add_service(CommandServiceServer::new(cmd_service)) + .serve_with_shutdown(addr, token.cancelled_owned()) + .await; + + if let Err(err) = result { + error!("gRPC Server Encountered An Error: {err}"); + } + })) +} diff --git a/server/src/grpc.rs b/server/src/grpc/tlm.rs similarity index 85% rename from server/src/grpc.rs rename to server/src/grpc/tlm.rs index b9682d3..fff260d 100644 --- a/server/src/grpc.rs +++ b/server/src/grpc/tlm.rs @@ -1,4 +1,4 @@ -use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer}; +use crate::core::telemetry_service_server::TelemetryService; use crate::core::telemetry_value::Value; use crate::core::{ TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse, @@ -9,16 +9,14 @@ use crate::telemetry::data_value::TelemetryDataValue; use crate::telemetry::history::TelemetryHistory; use crate::telemetry::management_service::TelemetryManagementService; use chrono::{DateTime, SecondsFormat}; -use log::{error, info, trace}; +use log::trace; use std::pin::Pin; use std::sync::Arc; use tokio::select; use tokio::sync::mpsc; -use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::{Stream, StreamExt}; -use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; pub struct CoreTelemetryService { @@ -141,26 +139,3 @@ impl CoreTelemetryService { Ok(TelemetryInsertResponse {}) } } - -pub fn setup( - token: CancellationToken, - telemetry_management_service: Arc, -) -> anyhow::Result> { - let addr = "[::1]:50051".parse()?; - Ok(tokio::spawn(async move { - let tlm_service = CoreTelemetryService { - tlm_management: telemetry_management_service, - cancellation_token: token.clone(), - }; - - info!("Starting gRPC Server"); - let result = Server::builder() - .add_service(TelemetryServiceServer::new(tlm_service)) - .serve_with_shutdown(addr, token.cancelled_owned()) - .await; - - if let Err(err) = result { - error!("gRPC Server Encountered An Error: {err}"); - } - })) -} diff --git a/server/src/http/api/cmd.rs b/server/src/http/api/cmd.rs new file mode 100644 index 0000000..d08c74f --- /dev/null +++ b/server/src/http/api/cmd.rs @@ -0,0 +1,17 @@ +use crate::command::service::CommandManagementService; +use crate::http::error::HttpServerResultError; +use actix_web::{post, web, Responder}; +use std::sync::Arc; + +#[post("/cmd/{name:[\\w\\d/_-]+}")] +pub(super) async fn send_command( + command_service: web::Data>, + name: web::Path, + parameters: web::Json>, +) -> Result { + command_service + .send_command(name.to_string(), parameters.into_inner()) + .await?; + + Ok(web::Json(())) +} diff --git a/server/src/http/api/mod.rs b/server/src/http/api/mod.rs index 4710521..82f5c15 100644 --- a/server/src/http/api/mod.rs +++ b/server/src/http/api/mod.rs @@ -1,3 +1,4 @@ +mod cmd; mod panels; mod tlm; @@ -11,5 +12,6 @@ pub fn setup_api(cfg: &mut web::ServiceConfig) { .service(panels::get_all) .service(panels::get_one) .service(panels::set) - .service(panels::delete); + .service(panels::delete) + .service(cmd::send_command); } diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index 3690c75..250eafb 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -2,6 +2,7 @@ mod api; mod error; mod websocket; +use crate::command::service::CommandManagementService; use crate::http::api::setup_api; use crate::http::websocket::setup_websocket; use crate::panels::PanelService; @@ -16,10 +17,12 @@ pub async fn setup( cancellation_token: CancellationToken, telemetry_definitions: Arc, panel_service: PanelService, + command_service: Arc, ) -> anyhow::Result<()> { let data = web::Data::new(telemetry_definitions); let cancel_token = web::Data::new(cancellation_token); let panel_service = web::Data::new(Arc::new(panel_service)); + let command_service = web::Data::new(command_service); info!("Starting HTTP Server"); HttpServer::new(move || { @@ -27,6 +30,7 @@ pub async fn setup( .app_data(data.clone()) .app_data(cancel_token.clone()) .app_data(panel_service.clone()) + .app_data(command_service.clone()) .service(web::scope("/ws").configure(setup_websocket)) .service(web::scope("/api").configure(setup_api)) .wrap(Logger::default()) diff --git a/server/src/lib.rs b/server/src/lib.rs index 616a2b2..0fdf332 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,3 +1,4 @@ +mod command; mod grpc; mod http; mod panels; @@ -9,6 +10,7 @@ pub mod core { tonic::include_proto!("core"); } +use crate::command::service::CommandManagementService; use crate::panels::PanelService; use crate::telemetry::history::TelemetryHistoryService; use crate::telemetry::management_service::TelemetryManagementService; @@ -49,11 +51,13 @@ pub async fn setup() -> anyhow::Result<()> { TelemetryHistoryService::new(telemetry_folder)?, )?); - let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?; + let cmd = Arc::new(CommandManagementService::new()); + + let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone(), cmd.clone())?; let panel_service = PanelService::new(sqlite.clone()); - let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service).await; + let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service, cmd).await; cancellation_token.cancel(); result?; // result is dropped grpc_server.await?; //grpc server is dropped