Implement Commanding #6

Merged
sergeysav merged 4 commits from sergeysav/commanding into main 2025-12-28 13:39:13 -08:00
14 changed files with 385 additions and 31 deletions
Showing only changes of commit ac710d4e4f - Show all commits

13
Cargo.lock generated
View File

@@ -2128,6 +2128,19 @@ dependencies = [
"rand_core 0.6.4",
]
[[package]]
name = "simple_command"
version = "0.0.0"
dependencies = [
"chrono",
"log",
"num-traits",
"server",
"tokio",
"tokio-util",
"tonic",
]
[[package]]
name = "simple_producer"
version = "0.0.0"

View File

@@ -1,5 +1,5 @@
[workspace]
members = ["server", "examples/simple_producer"]
members = ["server", "examples/simple_producer", "examples/simple_command"]
resolver = "2"
[profile.dev.package.sqlx-macros]

View File

@@ -0,0 +1,13 @@
[package]
name = "simple_command"
edition = "2021"
[dependencies]
server = { path = "../../server" }
tonic = "0.12.3"
tokio = { version = "1.43.0", features = ["rt-multi-thread", "signal"] }
chrono = "0.4.39"
tokio-util = "0.7.13"
num-traits = "0.2.19"
log = "0.4.29"

View File

@@ -0,0 +1,64 @@
use chrono::DateTime;
use server::core::command_service_client::CommandServiceClient;
use server::core::telemetry_value::Value;
use server::core::{CommandDefinitionRequest, CommandParameterDefinition, TelemetryDataType};
use std::error::Error;
use tokio::select;
use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::StreamExt;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let cancellation_token = CancellationToken::new();
{
let cancellation_token = cancellation_token.clone();
tokio::spawn(async move {
let _ = tokio::signal::ctrl_c().await;
cancellation_token.cancel();
});
}
let mut client = CommandServiceClient::connect("http://[::1]:50051").await?;
let command_response = client
.new_command(CommandDefinitionRequest {
name: "simple_command/a".to_string(),
parameters: vec![
CommandParameterDefinition {
name: "a".to_string(),
data_type: TelemetryDataType::Float32.into(),
},
CommandParameterDefinition {
name: "b".to_string(),
data_type: TelemetryDataType::Float64.into(),
},
CommandParameterDefinition {
name: "c".to_string(),
data_type: TelemetryDataType::Boolean.into(),
},
],
})
.await?;
let mut command_stream = command_response.into_inner();
loop {
select! {
_ = cancellation_token.cancelled() => {
break;
},
Some(command) = command_stream.next() => {
let command = command?;
let timestamp = command.timestamp.expect("Missing Timestamp");
let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32).expect("Could not construct date time");
let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else { panic!("Wrong Type a"); };
let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else { panic!("Wrong Type b"); };
let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else { panic!("Wrong Type c"); };
println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}");
},
}
}
Ok(())
}

View File

@@ -48,3 +48,22 @@ service TelemetryService {
rpc NewTelemetry (TelemetryDefinitionRequest) returns (TelemetryDefinitionResponse);
rpc InsertTelemetry (stream TelemetryItem) returns (stream TelemetryInsertResponse);
}
message CommandParameterDefinition {
string name = 1;
TelemetryDataType data_type = 2;
}
message CommandDefinitionRequest {
string name = 1;
repeated CommandParameterDefinition parameters = 2;
}
message Command {
Timestamp timestamp = 1;
map<string, TelemetryValue> parameters = 2;
}
service CommandService {
rpc NewCommand (CommandDefinitionRequest) returns (stream Command);
}

View File

@@ -0,0 +1 @@
pub mod service;

View File

@@ -0,0 +1,115 @@
use crate::core::telemetry_value::Value;
use crate::core::{
Command, CommandDefinitionRequest, TelemetryDataType, TelemetryValue, Timestamp,
};
use anyhow::{bail, ensure};
use chrono::{DateTime, Utc};
use papaya::HashMap;
use std::collections::HashMap as StdHashMap;
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Receiver, Sender};
struct RegisteredCommand {
definition: CommandDefinitionRequest,
tx: Sender<Option<Command>>,
}
pub struct CommandManagementService {
registered_commands: HashMap<String, RegisteredCommand>,
}
impl CommandManagementService {
pub fn new() -> Self {
Self {
registered_commands: HashMap::new(),
}
}
pub async fn register_command(
&self,
command: CommandDefinitionRequest,
) -> anyhow::Result<Receiver<Option<Command>>> {
let (tx, rx) = mpsc::channel(1);
let registered_commands = self.registered_commands.pin_owned();
if let Some(previous) = registered_commands.insert(
command.name.clone(),
RegisteredCommand {
definition: command,
tx,
},
) {
// If the receiver was already closed, we don't care (ignore error)
let _ = previous.tx.send(None).await;
}
Ok(rx)
}
pub async fn send_command(
&self,
name: impl Into<String>,
parameters: serde_json::Map<String, serde_json::Value>,
) -> anyhow::Result<()> {
let timestamp = Utc::now();
let offset_from_unix_epoch =
timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch");
let name = name.into();
let registered_commands = self.registered_commands.pin();
let Some(registration) = registered_commands.get(&name) else {
bail!("Command Not Found {name}");
};
ensure!(
parameters.len() == registration.definition.parameters.len(),
"Command has {} parameters. {} expected",
parameters.len(),
registration.definition.parameters.len()
);
let mut result_parameters = StdHashMap::new();
for parameter in &registration.definition.parameters {
let Some(param_value) = parameters.get(&parameter.name) else {
bail!("Command Missing Parameter: {}", parameter.name);
};
let Some(param_value) = (match parameter.data_type() {
TelemetryDataType::Float32 => {
param_value.as_f64().map(|v| Value::Float32(v as f32))
}
TelemetryDataType::Float64 => param_value.as_f64().map(Value::Float64),
TelemetryDataType::Boolean => param_value.as_bool().map(Value::Boolean),
}) else {
bail!(
"Parameter {} has the wrong type. {:?} expected",
parameter.name,
parameter.data_type()
);
};
result_parameters.insert(
parameter.name.clone(),
TelemetryValue {
value: Some(param_value),
},
);
}
// Clone & Drop lets us use a standard pin instead of an owned pin
let tx = registration.tx.clone();
drop(registered_commands);
if let Err(e) = tx
.send(Some(Command {
timestamp: Some(Timestamp {
secs: offset_from_unix_epoch.num_seconds(),
nanos: offset_from_unix_epoch.subsec_nanos(),
}),
parameters: result_parameters,
}))
.await
{
bail!("Failed to send command {e}");
}
Ok(())
}
}

83
server/src/grpc/cmd.rs Normal file
View File

@@ -0,0 +1,83 @@
use crate::command::service::CommandManagementService;
use crate::core::command_service_server::CommandService;
use crate::core::{Command, CommandDefinitionRequest};
use log::{error, trace};
use std::pin::Pin;
use std::sync::Arc;
use tokio::select;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::Stream;
use tonic::{Request, Response, Status};
pub struct CoreCommandService {
pub command_service: Arc<CommandManagementService>,
pub cancellation_token: CancellationToken,
}
#[tonic::async_trait]
impl CommandService for CoreCommandService {
type NewCommandStream = Pin<Box<dyn Stream<Item = Result<Command, Status>> + Send>>;
async fn new_command(
&self,
request: Request<CommandDefinitionRequest>,
) -> Result<Response<Self::NewCommandStream>, Status> {
trace!("CoreCommandService::new_command");
let cancel_token = self.cancellation_token.clone();
let mut cmd_rx = match self
.command_service
.register_command(request.into_inner())
.await
{
Ok(rx) => rx,
Err(e) => {
error!("Failed to register command: {e}");
return Err(Status::internal("Failed to register command"));
}
};
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
loop {
select! {
_ = cancel_token.cancelled() => break,
_ = tx.closed() => break,
Some(message) = cmd_rx.recv() => {
match message {
None => break,
Some(message) => {
match tx.send(Ok(message)).await {
Ok(()) => {},
Err(e) => {
error!("Failed to send command data: {e}");
break;
}
}
}
}
},
else => break,
}
}
cmd_rx.close();
if !tx.is_closed() {
match tx
.send(Err(Status::resource_exhausted("End of Command Stream")))
.await
{
Ok(()) => {}
Err(e) => {
error!("Failed to close old command sender {e}");
}
}
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
}

44
server/src/grpc/mod.rs Normal file
View File

@@ -0,0 +1,44 @@
mod cmd;
mod tlm;
use crate::command::service::CommandManagementService;
use crate::core::command_service_server::CommandServiceServer;
use crate::core::telemetry_service_server::TelemetryServiceServer;
use crate::grpc::cmd::CoreCommandService;
use crate::grpc::tlm::CoreTelemetryService;
use crate::telemetry::management_service::TelemetryManagementService;
use log::{error, info};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tonic::transport::Server;
pub fn setup(
token: CancellationToken,
telemetry_management_service: Arc<TelemetryManagementService>,
command_service: Arc<CommandManagementService>,
) -> anyhow::Result<JoinHandle<()>> {
let addr = "[::1]:50051".parse()?;
Ok(tokio::spawn(async move {
let tlm_service = CoreTelemetryService {
tlm_management: telemetry_management_service,
cancellation_token: token.clone(),
};
let cmd_service = CoreCommandService {
command_service,
cancellation_token: token.clone(),
};
info!("Starting gRPC Server");
let result = Server::builder()
.add_service(TelemetryServiceServer::new(tlm_service))
.add_service(CommandServiceServer::new(cmd_service))
.serve_with_shutdown(addr, token.cancelled_owned())
.await;
if let Err(err) = result {
error!("gRPC Server Encountered An Error: {err}");
}
}))
}

View File

@@ -1,4 +1,4 @@
use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer};
use crate::core::telemetry_service_server::TelemetryService;
use crate::core::telemetry_value::Value;
use crate::core::{
TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse,
@@ -9,16 +9,14 @@ use crate::telemetry::data_value::TelemetryDataValue;
use crate::telemetry::history::TelemetryHistory;
use crate::telemetry::management_service::TelemetryManagementService;
use chrono::{DateTime, SecondsFormat};
use log::{error, info, trace};
use log::trace;
use std::pin::Pin;
use std::sync::Arc;
use tokio::select;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::{Stream, StreamExt};
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
pub struct CoreTelemetryService {
@@ -141,26 +139,3 @@ impl CoreTelemetryService {
Ok(TelemetryInsertResponse {})
}
}
pub fn setup(
token: CancellationToken,
telemetry_management_service: Arc<TelemetryManagementService>,
) -> anyhow::Result<JoinHandle<()>> {
let addr = "[::1]:50051".parse()?;
Ok(tokio::spawn(async move {
let tlm_service = CoreTelemetryService {
tlm_management: telemetry_management_service,
cancellation_token: token.clone(),
};
info!("Starting gRPC Server");
let result = Server::builder()
.add_service(TelemetryServiceServer::new(tlm_service))
.serve_with_shutdown(addr, token.cancelled_owned())
.await;
if let Err(err) = result {
error!("gRPC Server Encountered An Error: {err}");
}
}))
}

View File

@@ -0,0 +1,17 @@
use crate::command::service::CommandManagementService;
use crate::http::error::HttpServerResultError;
use actix_web::{post, web, Responder};
use std::sync::Arc;
#[post("/cmd/{name:[\\w\\d/_-]+}")]
pub(super) async fn send_command(
command_service: web::Data<Arc<CommandManagementService>>,
name: web::Path<String>,
parameters: web::Json<serde_json::Map<String, serde_json::Value>>,
) -> Result<impl Responder, HttpServerResultError> {
command_service
.send_command(name.to_string(), parameters.into_inner())
.await?;
Ok(web::Json(()))
}

View File

@@ -1,3 +1,4 @@
mod cmd;
mod panels;
mod tlm;
@@ -11,5 +12,6 @@ pub fn setup_api(cfg: &mut web::ServiceConfig) {
.service(panels::get_all)
.service(panels::get_one)
.service(panels::set)
.service(panels::delete);
.service(panels::delete)
.service(cmd::send_command);
}

View File

@@ -2,6 +2,7 @@ mod api;
mod error;
mod websocket;
use crate::command::service::CommandManagementService;
use crate::http::api::setup_api;
use crate::http::websocket::setup_websocket;
use crate::panels::PanelService;
@@ -16,10 +17,12 @@ pub async fn setup(
cancellation_token: CancellationToken,
telemetry_definitions: Arc<TelemetryManagementService>,
panel_service: PanelService,
command_service: Arc<CommandManagementService>,
) -> anyhow::Result<()> {
let data = web::Data::new(telemetry_definitions);
let cancel_token = web::Data::new(cancellation_token);
let panel_service = web::Data::new(Arc::new(panel_service));
let command_service = web::Data::new(command_service);
info!("Starting HTTP Server");
HttpServer::new(move || {
@@ -27,6 +30,7 @@ pub async fn setup(
.app_data(data.clone())
.app_data(cancel_token.clone())
.app_data(panel_service.clone())
.app_data(command_service.clone())
.service(web::scope("/ws").configure(setup_websocket))
.service(web::scope("/api").configure(setup_api))
.wrap(Logger::default())

View File

@@ -1,3 +1,4 @@
mod command;
mod grpc;
mod http;
mod panels;
@@ -9,6 +10,7 @@ pub mod core {
tonic::include_proto!("core");
}
use crate::command::service::CommandManagementService;
use crate::panels::PanelService;
use crate::telemetry::history::TelemetryHistoryService;
use crate::telemetry::management_service::TelemetryManagementService;
@@ -49,11 +51,13 @@ pub async fn setup() -> anyhow::Result<()> {
TelemetryHistoryService::new(telemetry_folder)?,
)?);
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?;
let cmd = Arc::new(CommandManagementService::new());
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone(), cmd.clone())?;
let panel_service = PanelService::new(sqlite.clone());
let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service).await;
let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service, cmd).await;
cancellation_token.cancel();
result?; // result is dropped
grpc_server.await?; //grpc server is dropped