Implement Commanding (#6)

Reviewed-on: #6
Co-authored-by: Sergey Savelyev <sergeysav.nn@gmail.com>
Co-committed-by: Sergey Savelyev <sergeysav.nn@gmail.com>
This commit was merged in pull request #6.
This commit is contained in:
2025-12-28 13:39:12 -08:00
committed by sergeysav
parent 8cfaf468e9
commit f658b55586
33 changed files with 1389 additions and 98 deletions

View File

@@ -48,3 +48,36 @@ service TelemetryService {
rpc NewTelemetry (TelemetryDefinitionRequest) returns (TelemetryDefinitionResponse);
rpc InsertTelemetry (stream TelemetryItem) returns (stream TelemetryInsertResponse);
}
message CommandParameterDefinition {
string name = 1;
TelemetryDataType data_type = 2;
}
message CommandDefinitionRequest {
string name = 1;
repeated CommandParameterDefinition parameters = 2;
}
message Command {
UUID uuid = 1;
Timestamp timestamp = 2;
map<string, TelemetryValue> parameters = 3;
}
message CommandResponse {
UUID uuid = 1;
bool success = 2;
string response = 3;
}
message ClientSideCommand {
oneof inner {
CommandDefinitionRequest request = 1;
CommandResponse response = 2;
}
}
service CommandService {
rpc NewCommand (stream ClientSideCommand) returns (stream Command);
}

View File

@@ -0,0 +1,36 @@
use crate::command::service::RegisteredCommand;
use crate::core::TelemetryDataType;
use crate::telemetry::data_type::tlm_data_type_deserializer;
use crate::telemetry::data_type::tlm_data_type_serializer;
use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CommandParameterDefinition {
pub name: String,
#[serde(serialize_with = "tlm_data_type_serializer")]
#[serde(deserialize_with = "tlm_data_type_deserializer")]
pub data_type: TelemetryDataType,
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CommandDefinition {
pub name: String,
pub parameters: Vec<CommandParameterDefinition>,
}
impl From<RegisteredCommand> for CommandDefinition {
fn from(value: RegisteredCommand) -> Self {
Self {
name: value.name,
parameters: value
.definition
.parameters
.into_iter()
.map(|param| CommandParameterDefinition {
data_type: param.data_type(),
name: param.name,
})
.collect(),
}
}
}

View File

@@ -0,0 +1,42 @@
use crate::core::TelemetryDataType;
use actix_web::http::StatusCode;
use actix_web::ResponseError;
use thiserror::Error;
#[derive(Error, Debug)]
pub enum Error {
#[error("Command Not Found {0}")]
CommandNotFound(String),
#[error("Incorrect Number of Parameters Specified. {expected} expected. {actual} found.")]
IncorrectParameterCount { expected: usize, actual: usize },
#[error("Missing Parameter {0}.")]
MisingParameter(String),
#[error("Incorrect Parameter Type for {name}. {expected_type:?} expected.")]
WrongParameterType {
name: String,
expected_type: TelemetryDataType,
},
#[error("No Command Receiver")]
NoCommandReceiver,
#[error("Failed to Send")]
FailedToSend,
#[error("Failed to Receive Command Response")]
FailedToReceiveResponse,
#[error("Command Failure: {0}")]
CommandFailure(String),
}
impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match *self {
Error::CommandNotFound(_) => StatusCode::NOT_FOUND,
Error::IncorrectParameterCount { .. } => StatusCode::BAD_REQUEST,
Error::MisingParameter(_) => StatusCode::BAD_REQUEST,
Error::WrongParameterType { .. } => StatusCode::BAD_REQUEST,
Error::NoCommandReceiver => StatusCode::SERVICE_UNAVAILABLE,
Error::FailedToSend => StatusCode::INTERNAL_SERVER_ERROR,
Error::FailedToReceiveResponse => StatusCode::INTERNAL_SERVER_ERROR,
Error::CommandFailure(_) => StatusCode::BAD_REQUEST,
}
}
}

View File

@@ -0,0 +1,3 @@
mod definition;
pub mod error;
pub mod service;

View File

@@ -0,0 +1,165 @@
use crate::command::definition::CommandDefinition;
use crate::command::error::Error as CmdError;
use crate::command::error::Error::{
CommandFailure, CommandNotFound, FailedToReceiveResponse, FailedToSend,
IncorrectParameterCount, MisingParameter, NoCommandReceiver, WrongParameterType,
};
use crate::core::telemetry_value::Value;
use crate::core::{
Command, CommandDefinitionRequest, CommandResponse, TelemetryDataType, TelemetryValue,
Timestamp, Uuid,
};
use chrono::{DateTime, Utc};
use log::error;
use papaya::HashMap;
use std::collections::HashMap as StdHashMap;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
#[derive(Clone)]
pub(super) struct RegisteredCommand {
pub(super) name: String,
pub(super) definition: CommandDefinitionRequest,
tx: mpsc::Sender<Option<(Command, oneshot::Sender<CommandResponse>)>>,
}
pub struct CommandManagementService {
registered_commands: HashMap<String, RegisteredCommand>,
}
impl CommandManagementService {
pub fn new() -> Self {
Self {
registered_commands: HashMap::new(),
}
}
pub fn get_commands(&self) -> anyhow::Result<Vec<CommandDefinition>> {
let mut result = vec![];
let registered_commands = self.registered_commands.pin();
for registration in registered_commands.values() {
result.push(registration.clone().into());
}
Ok(result)
}
pub fn get_command_definition(&self, name: &String) -> Option<CommandDefinition> {
self.registered_commands
.pin()
.get(name)
.map(|registration| registration.clone().into())
}
pub async fn register_command(
&self,
command: CommandDefinitionRequest,
) -> anyhow::Result<mpsc::Receiver<Option<(Command, oneshot::Sender<CommandResponse>)>>> {
let (tx, rx) = mpsc::channel(1);
let registered_commands = self.registered_commands.pin_owned();
if let Some(previous) = registered_commands.insert(
command.name.clone(),
RegisteredCommand {
name: command.name.clone(),
definition: command,
tx,
},
) {
// If the receiver was already closed, we don't care (ignore error)
let _ = previous.tx.send(None).await;
}
Ok(rx)
}
pub async fn send_command(
&self,
name: impl Into<String>,
parameters: serde_json::Map<String, serde_json::Value>,
) -> Result<String, CmdError> {
let timestamp = Utc::now();
let offset_from_unix_epoch =
timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch");
let name = name.into();
let registered_commands = self.registered_commands.pin();
let Some(registration) = registered_commands.get(&name) else {
return Err(CommandNotFound(name));
};
if parameters.len() != registration.definition.parameters.len() {
return Err(IncorrectParameterCount {
expected: registration.definition.parameters.len(),
actual: parameters.len(),
});
}
let mut result_parameters = StdHashMap::new();
for parameter in &registration.definition.parameters {
let Some(param_value) = parameters.get(&parameter.name) else {
return Err(MisingParameter(parameter.name.clone()));
};
let Some(param_value) = (match parameter.data_type() {
TelemetryDataType::Float32 => {
param_value.as_f64().map(|v| Value::Float32(v as f32))
}
TelemetryDataType::Float64 => param_value.as_f64().map(Value::Float64),
TelemetryDataType::Boolean => param_value.as_bool().map(Value::Boolean),
}) else {
return Err(WrongParameterType {
name: parameter.name.clone(),
expected_type: parameter.data_type(),
});
};
result_parameters.insert(
parameter.name.clone(),
TelemetryValue {
value: Some(param_value),
},
);
}
// Clone & Drop lets us use a standard pin instead of an owned pin
let tx = registration.tx.clone();
drop(registered_commands);
if tx.is_closed() {
return Err(NoCommandReceiver);
}
let uuid = Uuid::random();
let (response_tx, response_rx) = oneshot::channel();
if let Err(e) = tx
.send(Some((
Command {
uuid: Some(uuid),
timestamp: Some(Timestamp {
secs: offset_from_unix_epoch.num_seconds(),
nanos: offset_from_unix_epoch.subsec_nanos(),
}),
parameters: result_parameters,
},
response_tx,
)))
.await
{
error!("Failed to Send Command: {e}");
return Err(FailedToSend);
}
response_rx
.await
.map_err(|e| {
error!("Failed to Receive Command Response: {e}");
FailedToReceiveResponse
})
.and_then(|response| {
if response.success {
Ok(response.response)
} else {
Err(CommandFailure(response.response))
}
})
}
}

150
server/src/grpc/cmd.rs Normal file
View File

@@ -0,0 +1,150 @@
use crate::command::service::CommandManagementService;
use crate::core::client_side_command::Inner;
use crate::core::command_service_server::CommandService;
use crate::core::{ClientSideCommand, Command, CommandResponse, Uuid};
use log::{error, trace};
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::{Stream, StreamExt};
use tonic::{Request, Response, Status, Streaming};
pub struct CoreCommandService {
pub command_service: Arc<CommandManagementService>,
pub cancellation_token: CancellationToken,
}
#[tonic::async_trait]
impl CommandService for CoreCommandService {
type NewCommandStream = Pin<Box<dyn Stream<Item = Result<Command, Status>> + Send>>;
async fn new_command(
&self,
request: Request<Streaming<ClientSideCommand>>,
) -> Result<Response<Self::NewCommandStream>, Status> {
trace!("CoreCommandService::new_command");
let cancel_token = self.cancellation_token.clone();
let mut in_stream = request.into_inner();
let cmd_request = select! {
_ = cancel_token.cancelled() => return Err(Status::internal("Shutting Down")),
Some(message) = in_stream.next() => {
match message {
Ok(ClientSideCommand {
inner: Some(Inner::Request(cmd_request))
}) => cmd_request,
Err(err) => {
error!("Error in Stream: {err}");
return Err(Status::cancelled("Error in Stream"));
},
_ => {
return Err(Status::invalid_argument("First message must be request"));
},
}
},
else => return Err(Status::internal("Shutting Down")),
};
let mut cmd_rx = match self.command_service.register_command(cmd_request).await {
Ok(rx) => rx,
Err(e) => {
error!("Failed to register command: {e}");
return Err(Status::internal("Failed to register command"));
}
};
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
let mut result = Status::resource_exhausted("End of Command Stream");
let mut in_progress = HashMap::<String, oneshot::Sender<CommandResponse>>::new();
loop {
select! {
_ = cancel_token.cancelled() => break,
_ = tx.closed() => break,
Some(message) = cmd_rx.recv() => {
match message {
None => break,
Some(message) => {
let key = message.0.uuid.clone().unwrap().value;
in_progress.insert(key.clone(), message.1);
match tx.send(Ok(message.0)).await {
Ok(()) => {},
Err(e) => {
error!("Failed to send command data: {e}");
if in_progress.remove(&key).unwrap().send(CommandResponse {
uuid: Some(Uuid::from(key)),
success: false,
response: "Failed to send command data.".to_string(),
}).is_err() {
error!("Failed to send command response on failure to send command data");
}
break;
}
}
}
}
},
Some(message) = in_stream.next() => {
match message {
Ok(message) => {
match message.inner {
Some(Inner::Response(response)) => {
if let Some(uuid) = &response.uuid {
match in_progress.remove(&uuid.value) {
Some(sender) => {
if sender.send(response).is_err() {
error!("Failed to send command response on success")
}
}
None => {
result = Status::invalid_argument("Invalid Command UUID");
break;
}
}
}
}
_ => {
result = Status::invalid_argument("Subsequent Message Must Be Command Responses");
break;
}
}
}
Err(e) => {
error!("Received error from command handler {e}");
break
},
}
}
else => break,
}
}
cmd_rx.close();
if !tx.is_closed() {
match tx.send(Err(result)).await {
Ok(()) => {}
Err(e) => {
error!("Failed to close old command sender {e}");
}
}
}
for (key, sender) in in_progress.drain() {
if sender.send(CommandResponse {
uuid: Some(Uuid::from(key)),
success: false,
response: "Command Handler Shut Down".to_string(),
}).is_err() {
error!("Failed to send command response on shutdown");
}
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
}

44
server/src/grpc/mod.rs Normal file
View File

@@ -0,0 +1,44 @@
mod cmd;
mod tlm;
use crate::command::service::CommandManagementService;
use crate::core::command_service_server::CommandServiceServer;
use crate::core::telemetry_service_server::TelemetryServiceServer;
use crate::grpc::cmd::CoreCommandService;
use crate::grpc::tlm::CoreTelemetryService;
use crate::telemetry::management_service::TelemetryManagementService;
use log::{error, info};
use std::sync::Arc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tonic::transport::Server;
pub fn setup(
token: CancellationToken,
telemetry_management_service: Arc<TelemetryManagementService>,
command_service: Arc<CommandManagementService>,
) -> anyhow::Result<JoinHandle<()>> {
let addr = "[::1]:50051".parse()?;
Ok(tokio::spawn(async move {
let tlm_service = CoreTelemetryService {
tlm_management: telemetry_management_service,
cancellation_token: token.clone(),
};
let cmd_service = CoreCommandService {
command_service,
cancellation_token: token.clone(),
};
info!("Starting gRPC Server");
let result = Server::builder()
.add_service(TelemetryServiceServer::new(tlm_service))
.add_service(CommandServiceServer::new(cmd_service))
.serve_with_shutdown(addr, token.cancelled_owned())
.await;
if let Err(err) = result {
error!("gRPC Server Encountered An Error: {err}");
}
}))
}

View File

@@ -1,4 +1,4 @@
use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer};
use crate::core::telemetry_service_server::TelemetryService;
use crate::core::telemetry_value::Value;
use crate::core::{
TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse,
@@ -9,16 +9,14 @@ use crate::telemetry::data_value::TelemetryDataValue;
use crate::telemetry::history::TelemetryHistory;
use crate::telemetry::management_service::TelemetryManagementService;
use chrono::{DateTime, SecondsFormat};
use log::{error, info, trace};
use log::trace;
use std::pin::Pin;
use std::sync::Arc;
use tokio::select;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::{Stream, StreamExt};
use tonic::transport::Server;
use tonic::{Request, Response, Status, Streaming};
pub struct CoreTelemetryService {
@@ -141,26 +139,3 @@ impl CoreTelemetryService {
Ok(TelemetryInsertResponse {})
}
}
pub fn setup(
token: CancellationToken,
telemetry_management_service: Arc<TelemetryManagementService>,
) -> anyhow::Result<JoinHandle<()>> {
let addr = "[::1]:50051".parse()?;
Ok(tokio::spawn(async move {
let tlm_service = CoreTelemetryService {
tlm_management: telemetry_management_service,
cancellation_token: token.clone(),
};
info!("Starting gRPC Server");
let result = Server::builder()
.add_service(TelemetryServiceServer::new(tlm_service))
.serve_with_shutdown(addr, token.cancelled_owned())
.await;
if let Err(err) = result {
error!("gRPC Server Encountered An Error: {err}");
}
}))
}

View File

@@ -0,0 +1,34 @@
use crate::command::service::CommandManagementService;
use crate::http::error::HttpServerResultError;
use actix_web::{get, post, web, Responder};
use std::sync::Arc;
#[post("/cmd/{name:[\\w\\d/_-]+}")]
pub(super) async fn send_command(
command_service: web::Data<Arc<CommandManagementService>>,
name: web::Path<String>,
parameters: web::Json<serde_json::Map<String, serde_json::Value>>,
) -> Result<impl Responder, HttpServerResultError> {
let result = command_service
.send_command(name.to_string(), parameters.into_inner())
.await?;
Ok(web::Json(result))
}
#[get("/cmd")]
pub(super) async fn get_all(
command_service: web::Data<Arc<CommandManagementService>>,
) -> Result<impl Responder, HttpServerResultError> {
Ok(web::Json(command_service.get_commands()?))
}
#[get("/cmd/{name:[\\w\\d/_-]+}")]
pub(super) async fn get_one(
command_service: web::Data<Arc<CommandManagementService>>,
name: web::Path<String>,
) -> Result<impl Responder, HttpServerResultError> {
Ok(web::Json(
command_service.get_command_definition(&name.to_string()),
))
}

View File

@@ -1,3 +1,4 @@
mod cmd;
mod panels;
mod tlm;
@@ -11,5 +12,8 @@ pub fn setup_api(cfg: &mut web::ServiceConfig) {
.service(panels::get_all)
.service(panels::get_one)
.service(panels::set)
.service(panels::delete);
.service(panels::delete)
.service(cmd::send_command)
.service(cmd::get_all)
.service(cmd::get_one);
}

View File

@@ -2,7 +2,6 @@ use actix_web::error::ResponseError;
use actix_web::http::header::ContentType;
use actix_web::http::StatusCode;
use actix_web::HttpResponse;
use anyhow::Error;
use thiserror::Error;
#[derive(Error, Debug)]
@@ -16,31 +15,28 @@ pub enum HttpServerResultError {
#[error("Timed out")]
Timeout,
#[error("Internal Error")]
InternalError(anyhow::Error),
InternalError(#[from] anyhow::Error),
#[error("Panel Uuid Not Found: {uuid}")]
PanelUuidNotFound { uuid: String },
#[error(transparent)]
Command(#[from] crate::command::error::Error),
}
impl ResponseError for HttpServerResultError {
fn status_code(&self) -> StatusCode {
match *self {
match self {
HttpServerResultError::TlmNameNotFound { .. } => StatusCode::NOT_FOUND,
HttpServerResultError::TlmUuidNotFound { .. } => StatusCode::NOT_FOUND,
HttpServerResultError::InvalidDateTime { .. } => StatusCode::BAD_REQUEST,
HttpServerResultError::Timeout => StatusCode::GATEWAY_TIMEOUT,
HttpServerResultError::InternalError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
HttpServerResultError::PanelUuidNotFound { .. } => StatusCode::NOT_FOUND,
HttpServerResultError::Command(inner) => inner.status_code(),
}
}
fn error_response(&self) -> HttpResponse {
HttpResponse::build(self.status_code())
.insert_header(ContentType::html())
.insert_header(ContentType::plaintext())
.body(self.to_string())
}
}
impl From<anyhow::Error> for HttpServerResultError {
fn from(value: Error) -> Self {
Self::InternalError(value)
}
}

View File

@@ -2,6 +2,7 @@ mod api;
mod error;
mod websocket;
use crate::command::service::CommandManagementService;
use crate::http::api::setup_api;
use crate::http::websocket::setup_websocket;
use crate::panels::PanelService;
@@ -16,10 +17,12 @@ pub async fn setup(
cancellation_token: CancellationToken,
telemetry_definitions: Arc<TelemetryManagementService>,
panel_service: PanelService,
command_service: Arc<CommandManagementService>,
) -> anyhow::Result<()> {
let data = web::Data::new(telemetry_definitions);
let cancel_token = web::Data::new(cancellation_token);
let panel_service = web::Data::new(Arc::new(panel_service));
let command_service = web::Data::new(command_service);
info!("Starting HTTP Server");
HttpServer::new(move || {
@@ -27,6 +30,7 @@ pub async fn setup(
.app_data(data.clone())
.app_data(cancel_token.clone())
.app_data(panel_service.clone())
.app_data(command_service.clone())
.service(web::scope("/ws").configure(setup_websocket))
.service(web::scope("/api").configure(setup_api))
.wrap(Logger::default())

View File

@@ -1,3 +1,4 @@
mod command;
mod grpc;
mod http;
mod panels;
@@ -9,6 +10,7 @@ pub mod core {
tonic::include_proto!("core");
}
use crate::command::service::CommandManagementService;
use crate::panels::PanelService;
use crate::telemetry::history::TelemetryHistoryService;
use crate::telemetry::management_service::TelemetryManagementService;
@@ -49,11 +51,13 @@ pub async fn setup() -> anyhow::Result<()> {
TelemetryHistoryService::new(telemetry_folder)?,
)?);
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?;
let cmd = Arc::new(CommandManagementService::new());
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone(), cmd.clone())?;
let panel_service = PanelService::new(sqlite.clone());
let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service).await;
let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service, cmd).await;
cancellation_token.cancel();
result?; // result is dropped
grpc_server.await?; //grpc server is dropped