From ac710d4e4f2aac27d706c988c723518614610f6a Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Sat, 27 Dec 2025 15:01:34 -0500 Subject: [PATCH 1/4] initial backend command stuff --- Cargo.lock | 13 ++++ Cargo.toml | 2 +- examples/simple_command/Cargo.toml | 13 ++++ examples/simple_command/src/main.rs | 64 ++++++++++++++++ server/proto/core.proto | 19 +++++ server/src/command/mod.rs | 1 + server/src/command/service.rs | 115 ++++++++++++++++++++++++++++ server/src/grpc/cmd.rs | 83 ++++++++++++++++++++ server/src/grpc/mod.rs | 44 +++++++++++ server/src/{grpc.rs => grpc/tlm.rs} | 29 +------ server/src/http/api/cmd.rs | 17 ++++ server/src/http/api/mod.rs | 4 +- server/src/http/mod.rs | 4 + server/src/lib.rs | 8 +- 14 files changed, 385 insertions(+), 31 deletions(-) create mode 100644 examples/simple_command/Cargo.toml create mode 100644 examples/simple_command/src/main.rs create mode 100644 server/src/command/mod.rs create mode 100644 server/src/command/service.rs create mode 100644 server/src/grpc/cmd.rs create mode 100644 server/src/grpc/mod.rs rename server/src/{grpc.rs => grpc/tlm.rs} (85%) create mode 100644 server/src/http/api/cmd.rs diff --git a/Cargo.lock b/Cargo.lock index 66af3a9..762214e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2128,6 +2128,19 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simple_command" +version = "0.0.0" +dependencies = [ + "chrono", + "log", + "num-traits", + "server", + "tokio", + "tokio-util", + "tonic", +] + [[package]] name = "simple_producer" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index 8837d48..5567a9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["server", "examples/simple_producer"] +members = ["server", "examples/simple_producer", "examples/simple_command"] resolver = "2" [profile.dev.package.sqlx-macros] diff --git a/examples/simple_command/Cargo.toml b/examples/simple_command/Cargo.toml new file mode 100644 index 0000000..bf018d5 --- /dev/null +++ b/examples/simple_command/Cargo.toml @@ -0,0 +1,13 @@ + +[package] +name = "simple_command" +edition = "2021" + +[dependencies] +server = { path = "../../server" } +tonic = "0.12.3" +tokio = { version = "1.43.0", features = ["rt-multi-thread", "signal"] } +chrono = "0.4.39" +tokio-util = "0.7.13" +num-traits = "0.2.19" +log = "0.4.29" diff --git a/examples/simple_command/src/main.rs b/examples/simple_command/src/main.rs new file mode 100644 index 0000000..c6c3349 --- /dev/null +++ b/examples/simple_command/src/main.rs @@ -0,0 +1,64 @@ +use chrono::DateTime; +use server::core::command_service_client::CommandServiceClient; +use server::core::telemetry_value::Value; +use server::core::{CommandDefinitionRequest, CommandParameterDefinition, TelemetryDataType}; +use std::error::Error; +use tokio::select; +use tokio_util::sync::CancellationToken; +use tonic::codegen::tokio_stream::StreamExt; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cancellation_token = CancellationToken::new(); + { + let cancellation_token = cancellation_token.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + cancellation_token.cancel(); + }); + } + + let mut client = CommandServiceClient::connect("http://[::1]:50051").await?; + + let command_response = client + .new_command(CommandDefinitionRequest { + name: "simple_command/a".to_string(), + parameters: vec![ + CommandParameterDefinition { + name: "a".to_string(), + data_type: TelemetryDataType::Float32.into(), + }, + CommandParameterDefinition { + name: "b".to_string(), + data_type: TelemetryDataType::Float64.into(), + }, + CommandParameterDefinition { + name: "c".to_string(), + data_type: TelemetryDataType::Boolean.into(), + }, + ], + }) + .await?; + let mut command_stream = command_response.into_inner(); + + loop { + select! { + _ = cancellation_token.cancelled() => { + break; + }, + Some(command) = command_stream.next() => { + let command = command?; + + let timestamp = command.timestamp.expect("Missing Timestamp"); + let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32).expect("Could not construct date time"); + let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else { panic!("Wrong Type a"); }; + let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else { panic!("Wrong Type b"); }; + let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else { panic!("Wrong Type c"); }; + + println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); + }, + } + } + + Ok(()) +} diff --git a/server/proto/core.proto b/server/proto/core.proto index 27218fb..d6b98e6 100644 --- a/server/proto/core.proto +++ b/server/proto/core.proto @@ -48,3 +48,22 @@ service TelemetryService { rpc NewTelemetry (TelemetryDefinitionRequest) returns (TelemetryDefinitionResponse); rpc InsertTelemetry (stream TelemetryItem) returns (stream TelemetryInsertResponse); } + +message CommandParameterDefinition { + string name = 1; + TelemetryDataType data_type = 2; +} + +message CommandDefinitionRequest { + string name = 1; + repeated CommandParameterDefinition parameters = 2; +} + +message Command { + Timestamp timestamp = 1; + map parameters = 2; +} + +service CommandService { + rpc NewCommand (CommandDefinitionRequest) returns (stream Command); +} diff --git a/server/src/command/mod.rs b/server/src/command/mod.rs new file mode 100644 index 0000000..1f278a4 --- /dev/null +++ b/server/src/command/mod.rs @@ -0,0 +1 @@ +pub mod service; diff --git a/server/src/command/service.rs b/server/src/command/service.rs new file mode 100644 index 0000000..b4bcd83 --- /dev/null +++ b/server/src/command/service.rs @@ -0,0 +1,115 @@ +use crate::core::telemetry_value::Value; +use crate::core::{ + Command, CommandDefinitionRequest, TelemetryDataType, TelemetryValue, Timestamp, +}; +use anyhow::{bail, ensure}; +use chrono::{DateTime, Utc}; +use papaya::HashMap; +use std::collections::HashMap as StdHashMap; +use tokio::sync::mpsc; +use tokio::sync::mpsc::{Receiver, Sender}; + +struct RegisteredCommand { + definition: CommandDefinitionRequest, + tx: Sender>, +} + +pub struct CommandManagementService { + registered_commands: HashMap, +} + +impl CommandManagementService { + pub fn new() -> Self { + Self { + registered_commands: HashMap::new(), + } + } + + pub async fn register_command( + &self, + command: CommandDefinitionRequest, + ) -> anyhow::Result>> { + let (tx, rx) = mpsc::channel(1); + + let registered_commands = self.registered_commands.pin_owned(); + if let Some(previous) = registered_commands.insert( + command.name.clone(), + RegisteredCommand { + definition: command, + tx, + }, + ) { + // If the receiver was already closed, we don't care (ignore error) + let _ = previous.tx.send(None).await; + } + + Ok(rx) + } + + pub async fn send_command( + &self, + name: impl Into, + parameters: serde_json::Map, + ) -> anyhow::Result<()> { + let timestamp = Utc::now(); + let offset_from_unix_epoch = + timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); + + let name = name.into(); + let registered_commands = self.registered_commands.pin(); + let Some(registration) = registered_commands.get(&name) else { + bail!("Command Not Found {name}"); + }; + + ensure!( + parameters.len() == registration.definition.parameters.len(), + "Command has {} parameters. {} expected", + parameters.len(), + registration.definition.parameters.len() + ); + let mut result_parameters = StdHashMap::new(); + for parameter in ®istration.definition.parameters { + let Some(param_value) = parameters.get(¶meter.name) else { + bail!("Command Missing Parameter: {}", parameter.name); + }; + let Some(param_value) = (match parameter.data_type() { + TelemetryDataType::Float32 => { + param_value.as_f64().map(|v| Value::Float32(v as f32)) + } + TelemetryDataType::Float64 => param_value.as_f64().map(Value::Float64), + TelemetryDataType::Boolean => param_value.as_bool().map(Value::Boolean), + }) else { + bail!( + "Parameter {} has the wrong type. {:?} expected", + parameter.name, + parameter.data_type() + ); + }; + result_parameters.insert( + parameter.name.clone(), + TelemetryValue { + value: Some(param_value), + }, + ); + } + + // Clone & Drop lets us use a standard pin instead of an owned pin + let tx = registration.tx.clone(); + drop(registered_commands); + + if let Err(e) = tx + .send(Some(Command { + timestamp: Some(Timestamp { + secs: offset_from_unix_epoch.num_seconds(), + nanos: offset_from_unix_epoch.subsec_nanos(), + }), + parameters: result_parameters, + })) + .await + { + bail!("Failed to send command {e}"); + } + + Ok(()) + } +} diff --git a/server/src/grpc/cmd.rs b/server/src/grpc/cmd.rs new file mode 100644 index 0000000..359b380 --- /dev/null +++ b/server/src/grpc/cmd.rs @@ -0,0 +1,83 @@ +use crate::command::service::CommandManagementService; +use crate::core::command_service_server::CommandService; +use crate::core::{Command, CommandDefinitionRequest}; +use log::{error, trace}; +use std::pin::Pin; +use std::sync::Arc; +use tokio::select; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tonic::codegen::tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::tokio_stream::Stream; +use tonic::{Request, Response, Status}; + +pub struct CoreCommandService { + pub command_service: Arc, + pub cancellation_token: CancellationToken, +} + +#[tonic::async_trait] +impl CommandService for CoreCommandService { + type NewCommandStream = Pin> + Send>>; + + async fn new_command( + &self, + request: Request, + ) -> Result, Status> { + trace!("CoreCommandService::new_command"); + + let cancel_token = self.cancellation_token.clone(); + let mut cmd_rx = match self + .command_service + .register_command(request.into_inner()) + .await + { + Ok(rx) => rx, + Err(e) => { + error!("Failed to register command: {e}"); + return Err(Status::internal("Failed to register command")); + } + }; + + let (tx, rx) = mpsc::channel(128); + + tokio::spawn(async move { + loop { + select! { + _ = cancel_token.cancelled() => break, + _ = tx.closed() => break, + Some(message) = cmd_rx.recv() => { + match message { + None => break, + Some(message) => { + match tx.send(Ok(message)).await { + Ok(()) => {}, + Err(e) => { + error!("Failed to send command data: {e}"); + break; + } + } + } + } + + }, + else => break, + } + } + cmd_rx.close(); + if !tx.is_closed() { + match tx + .send(Err(Status::resource_exhausted("End of Command Stream"))) + .await + { + Ok(()) => {} + Err(e) => { + error!("Failed to close old command sender {e}"); + } + } + } + }); + + Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) + } +} diff --git a/server/src/grpc/mod.rs b/server/src/grpc/mod.rs new file mode 100644 index 0000000..1e7b01b --- /dev/null +++ b/server/src/grpc/mod.rs @@ -0,0 +1,44 @@ +mod cmd; +mod tlm; + +use crate::command::service::CommandManagementService; +use crate::core::command_service_server::CommandServiceServer; +use crate::core::telemetry_service_server::TelemetryServiceServer; +use crate::grpc::cmd::CoreCommandService; +use crate::grpc::tlm::CoreTelemetryService; +use crate::telemetry::management_service::TelemetryManagementService; +use log::{error, info}; +use std::sync::Arc; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tonic::transport::Server; + +pub fn setup( + token: CancellationToken, + telemetry_management_service: Arc, + command_service: Arc, +) -> anyhow::Result> { + let addr = "[::1]:50051".parse()?; + Ok(tokio::spawn(async move { + let tlm_service = CoreTelemetryService { + tlm_management: telemetry_management_service, + cancellation_token: token.clone(), + }; + + let cmd_service = CoreCommandService { + command_service, + cancellation_token: token.clone(), + }; + + info!("Starting gRPC Server"); + let result = Server::builder() + .add_service(TelemetryServiceServer::new(tlm_service)) + .add_service(CommandServiceServer::new(cmd_service)) + .serve_with_shutdown(addr, token.cancelled_owned()) + .await; + + if let Err(err) = result { + error!("gRPC Server Encountered An Error: {err}"); + } + })) +} diff --git a/server/src/grpc.rs b/server/src/grpc/tlm.rs similarity index 85% rename from server/src/grpc.rs rename to server/src/grpc/tlm.rs index b9682d3..fff260d 100644 --- a/server/src/grpc.rs +++ b/server/src/grpc/tlm.rs @@ -1,4 +1,4 @@ -use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer}; +use crate::core::telemetry_service_server::TelemetryService; use crate::core::telemetry_value::Value; use crate::core::{ TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse, @@ -9,16 +9,14 @@ use crate::telemetry::data_value::TelemetryDataValue; use crate::telemetry::history::TelemetryHistory; use crate::telemetry::management_service::TelemetryManagementService; use chrono::{DateTime, SecondsFormat}; -use log::{error, info, trace}; +use log::trace; use std::pin::Pin; use std::sync::Arc; use tokio::select; use tokio::sync::mpsc; -use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::{Stream, StreamExt}; -use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; pub struct CoreTelemetryService { @@ -141,26 +139,3 @@ impl CoreTelemetryService { Ok(TelemetryInsertResponse {}) } } - -pub fn setup( - token: CancellationToken, - telemetry_management_service: Arc, -) -> anyhow::Result> { - let addr = "[::1]:50051".parse()?; - Ok(tokio::spawn(async move { - let tlm_service = CoreTelemetryService { - tlm_management: telemetry_management_service, - cancellation_token: token.clone(), - }; - - info!("Starting gRPC Server"); - let result = Server::builder() - .add_service(TelemetryServiceServer::new(tlm_service)) - .serve_with_shutdown(addr, token.cancelled_owned()) - .await; - - if let Err(err) = result { - error!("gRPC Server Encountered An Error: {err}"); - } - })) -} diff --git a/server/src/http/api/cmd.rs b/server/src/http/api/cmd.rs new file mode 100644 index 0000000..d08c74f --- /dev/null +++ b/server/src/http/api/cmd.rs @@ -0,0 +1,17 @@ +use crate::command::service::CommandManagementService; +use crate::http::error::HttpServerResultError; +use actix_web::{post, web, Responder}; +use std::sync::Arc; + +#[post("/cmd/{name:[\\w\\d/_-]+}")] +pub(super) async fn send_command( + command_service: web::Data>, + name: web::Path, + parameters: web::Json>, +) -> Result { + command_service + .send_command(name.to_string(), parameters.into_inner()) + .await?; + + Ok(web::Json(())) +} diff --git a/server/src/http/api/mod.rs b/server/src/http/api/mod.rs index 4710521..82f5c15 100644 --- a/server/src/http/api/mod.rs +++ b/server/src/http/api/mod.rs @@ -1,3 +1,4 @@ +mod cmd; mod panels; mod tlm; @@ -11,5 +12,6 @@ pub fn setup_api(cfg: &mut web::ServiceConfig) { .service(panels::get_all) .service(panels::get_one) .service(panels::set) - .service(panels::delete); + .service(panels::delete) + .service(cmd::send_command); } diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index 3690c75..250eafb 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -2,6 +2,7 @@ mod api; mod error; mod websocket; +use crate::command::service::CommandManagementService; use crate::http::api::setup_api; use crate::http::websocket::setup_websocket; use crate::panels::PanelService; @@ -16,10 +17,12 @@ pub async fn setup( cancellation_token: CancellationToken, telemetry_definitions: Arc, panel_service: PanelService, + command_service: Arc, ) -> anyhow::Result<()> { let data = web::Data::new(telemetry_definitions); let cancel_token = web::Data::new(cancellation_token); let panel_service = web::Data::new(Arc::new(panel_service)); + let command_service = web::Data::new(command_service); info!("Starting HTTP Server"); HttpServer::new(move || { @@ -27,6 +30,7 @@ pub async fn setup( .app_data(data.clone()) .app_data(cancel_token.clone()) .app_data(panel_service.clone()) + .app_data(command_service.clone()) .service(web::scope("/ws").configure(setup_websocket)) .service(web::scope("/api").configure(setup_api)) .wrap(Logger::default()) diff --git a/server/src/lib.rs b/server/src/lib.rs index 616a2b2..0fdf332 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,3 +1,4 @@ +mod command; mod grpc; mod http; mod panels; @@ -9,6 +10,7 @@ pub mod core { tonic::include_proto!("core"); } +use crate::command::service::CommandManagementService; use crate::panels::PanelService; use crate::telemetry::history::TelemetryHistoryService; use crate::telemetry::management_service::TelemetryManagementService; @@ -49,11 +51,13 @@ pub async fn setup() -> anyhow::Result<()> { TelemetryHistoryService::new(telemetry_folder)?, )?); - let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?; + let cmd = Arc::new(CommandManagementService::new()); + + let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone(), cmd.clone())?; let panel_service = PanelService::new(sqlite.clone()); - let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service).await; + let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service, cmd).await; cancellation_token.cancel(); result?; // result is dropped grpc_server.await?; //grpc server is dropped -- 2.43.0 From 59a0c81eb42c41418205302ec4d602d3c8992df8 Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Sun, 28 Dec 2025 00:01:55 -0500 Subject: [PATCH 2/4] initial frontend command stuff --- frontend/src/components/CommandList.vue | 70 ++++++++++++++++++++ frontend/src/components/CommandParameter.vue | 37 +++++++++++ frontend/src/components/CommandSender.vue | 67 +++++++++++++++++++ frontend/src/composables/command.ts | 52 +++++++++++++++ frontend/src/panels/panel.ts | 7 +- frontend/src/router/index.ts | 9 ++- frontend/src/views/CommandListView.vue | 61 +++++++++++++++++ frontend/src/views/TelemetryListView.vue | 57 ++++++++-------- server/src/command/definition.rs | 36 ++++++++++ server/src/command/error.rs | 36 ++++++++++ server/src/command/mod.rs | 2 + server/src/command/service.rs | 67 ++++++++++++++----- server/src/http/api/cmd.rs | 21 +++++- server/src/http/api/mod.rs | 4 +- server/src/http/error.rs | 16 ++--- 15 files changed, 480 insertions(+), 62 deletions(-) create mode 100644 frontend/src/components/CommandList.vue create mode 100644 frontend/src/components/CommandParameter.vue create mode 100644 frontend/src/components/CommandSender.vue create mode 100644 frontend/src/composables/command.ts create mode 100644 frontend/src/views/CommandListView.vue create mode 100644 server/src/command/definition.rs create mode 100644 server/src/command/error.rs diff --git a/frontend/src/components/CommandList.vue b/frontend/src/components/CommandList.vue new file mode 100644 index 0000000..b089ce1 --- /dev/null +++ b/frontend/src/components/CommandList.vue @@ -0,0 +1,70 @@ + + + + + diff --git a/frontend/src/components/CommandParameter.vue b/frontend/src/components/CommandParameter.vue new file mode 100644 index 0000000..e1e5490 --- /dev/null +++ b/frontend/src/components/CommandParameter.vue @@ -0,0 +1,37 @@ + + + + + diff --git a/frontend/src/components/CommandSender.vue b/frontend/src/components/CommandSender.vue new file mode 100644 index 0000000..37d1315 --- /dev/null +++ b/frontend/src/components/CommandSender.vue @@ -0,0 +1,67 @@ + + + + + diff --git a/frontend/src/composables/command.ts b/frontend/src/composables/command.ts new file mode 100644 index 0000000..13f4439 --- /dev/null +++ b/frontend/src/composables/command.ts @@ -0,0 +1,52 @@ +import { ref, toValue, watchEffect } from 'vue'; +import { type MaybeRefOrGetter } from 'vue'; + +export interface CommandParameterDefinition { + name: string; + data_type: string; +} + +export interface CommandDefinition { + name: string; + parameters: CommandParameterDefinition[]; +} + +export function useAllCommands() { + const data = ref(null); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const error = ref(null); + + watchEffect(async () => { + try { + const res = await fetch(`/api/cmd`); + data.value = await res.json(); + error.value = null; + } catch (e) { + data.value = null; + error.value = e; + } + }); + + return { data, error }; +} + +export function ueCommand(name: MaybeRefOrGetter) { + const data = ref(null); + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const error = ref(null); + + watchEffect(async () => { + const name_value = toValue(name); + + try { + const res = await fetch(`/api/cmd/${name_value}`); + data.value = await res.json(); + error.value = null; + } catch (e) { + data.value = null; + error.value = e; + } + }); + + return { data, error }; +} diff --git a/frontend/src/panels/panel.ts b/frontend/src/panels/panel.ts index b721f93..45479c0 100644 --- a/frontend/src/panels/panel.ts +++ b/frontend/src/panels/panel.ts @@ -34,7 +34,12 @@ export function usePanelHeirarchy(): Ref { }, { name: 'Telemetry Elements', - to: { name: 'list' }, + to: { name: 'tlm' }, + type: PanelHeirarchyType.LEAF, + }, + { + name: 'Commands', + to: { name: 'cmd' }, type: PanelHeirarchyType.LEAF, }, { diff --git a/frontend/src/router/index.ts b/frontend/src/router/index.ts index e3a174c..13ef3a9 100644 --- a/frontend/src/router/index.ts +++ b/frontend/src/router/index.ts @@ -14,10 +14,15 @@ const router = createRouter({ component: () => import('../views/GraphView.vue'), }, { - path: '/list', - name: 'list', + path: '/tlm', + name: 'tlm', component: () => import('../views/TelemetryListView.vue'), }, + { + path: '/cmd', + name: 'cmd', + component: () => import('../views/CommandListView.vue'), + }, { path: '/chart', name: 'chart', diff --git a/frontend/src/views/CommandListView.vue b/frontend/src/views/CommandListView.vue new file mode 100644 index 0000000..c5f3f46 --- /dev/null +++ b/frontend/src/views/CommandListView.vue @@ -0,0 +1,61 @@ + + + + + diff --git a/frontend/src/views/TelemetryListView.vue b/frontend/src/views/TelemetryListView.vue index 672873d..2859bd8 100644 --- a/frontend/src/views/TelemetryListView.vue +++ b/frontend/src/views/TelemetryListView.vue @@ -6,7 +6,6 @@ import type { TelemetryDefinition } from '@/composables/telemetry'; import TelemetryInfo from '@/components/TelemetryInfo.vue'; import FlexDivider from '@/components/FlexDivider.vue'; import ScreenLayout from '@/components/layout/ScreenLayout.vue'; -import { Direction } from '@/composables/Direction.ts'; import { ScreenType } from '@/composables/ScreenType.ts'; const searchValue = ref(''); @@ -16,36 +15,38 @@ const mousedover = ref(null); diff --git a/server/src/command/definition.rs b/server/src/command/definition.rs new file mode 100644 index 0000000..d724c8a --- /dev/null +++ b/server/src/command/definition.rs @@ -0,0 +1,36 @@ +use crate::command::service::RegisteredCommand; +use crate::core::TelemetryDataType; +use crate::telemetry::data_type::tlm_data_type_deserializer; +use crate::telemetry::data_type::tlm_data_type_serializer; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CommandParameterDefinition { + pub name: String, + #[serde(serialize_with = "tlm_data_type_serializer")] + #[serde(deserialize_with = "tlm_data_type_deserializer")] + pub data_type: TelemetryDataType, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct CommandDefinition { + pub name: String, + pub parameters: Vec, +} + +impl From for CommandDefinition { + fn from(value: RegisteredCommand) -> Self { + Self { + name: value.name, + parameters: value + .definition + .parameters + .into_iter() + .map(|param| CommandParameterDefinition { + data_type: param.data_type(), + name: param.name, + }) + .collect(), + } + } +} diff --git a/server/src/command/error.rs b/server/src/command/error.rs new file mode 100644 index 0000000..aa0bbe0 --- /dev/null +++ b/server/src/command/error.rs @@ -0,0 +1,36 @@ +use crate::core::TelemetryDataType; +use actix_web::http::StatusCode; +use actix_web::ResponseError; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Command Not Found {0}")] + CommandNotFound(String), + #[error("Incorrect Number of Parameters Specified. {expected} expected. {actual} found.")] + IncorrectParameterCount { expected: usize, actual: usize }, + #[error("Missing Parameter {0}.")] + MisingParameter(String), + #[error("Incorrect Parameter Type for {name}. {expected_type:?} expected.")] + WrongParameterType { + name: String, + expected_type: TelemetryDataType, + }, + #[error("No Command Receiver")] + NoCommandReceiver, + #[error("Failed to Send")] + FailedToSend, +} + +impl ResponseError for Error { + fn status_code(&self) -> StatusCode { + match *self { + Error::CommandNotFound(_) => StatusCode::NOT_FOUND, + Error::IncorrectParameterCount { .. } => StatusCode::BAD_REQUEST, + Error::MisingParameter(_) => StatusCode::BAD_REQUEST, + Error::WrongParameterType { .. } => StatusCode::BAD_REQUEST, + Error::NoCommandReceiver => StatusCode::SERVICE_UNAVAILABLE, + Error::FailedToSend => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} diff --git a/server/src/command/mod.rs b/server/src/command/mod.rs index 1f278a4..e3c4024 100644 --- a/server/src/command/mod.rs +++ b/server/src/command/mod.rs @@ -1 +1,3 @@ +mod definition; +pub mod error; pub mod service; diff --git a/server/src/command/service.rs b/server/src/command/service.rs index b4bcd83..452451e 100644 --- a/server/src/command/service.rs +++ b/server/src/command/service.rs @@ -1,16 +1,24 @@ +use crate::command::definition::CommandDefinition; +use crate::command::error::Error as CmdError; +use crate::command::error::Error::{ + CommandNotFound, FailedToSend, IncorrectParameterCount, MisingParameter, NoCommandReceiver, + WrongParameterType, +}; use crate::core::telemetry_value::Value; use crate::core::{ Command, CommandDefinitionRequest, TelemetryDataType, TelemetryValue, Timestamp, }; -use anyhow::{bail, ensure}; use chrono::{DateTime, Utc}; +use log::error; use papaya::HashMap; use std::collections::HashMap as StdHashMap; use tokio::sync::mpsc; use tokio::sync::mpsc::{Receiver, Sender}; -struct RegisteredCommand { - definition: CommandDefinitionRequest, +#[derive(Clone)] +pub(super) struct RegisteredCommand { + pub(super) name: String, + pub(super) definition: CommandDefinitionRequest, tx: Sender>, } @@ -25,6 +33,24 @@ impl CommandManagementService { } } + pub fn get_commands(&self) -> anyhow::Result> { + let mut result = vec![]; + + let registered_commands = self.registered_commands.pin(); + for registration in registered_commands.values() { + result.push(registration.clone().into()); + } + + Ok(result) + } + + pub fn get_command_definition(&self, name: &String) -> Option { + self.registered_commands + .pin() + .get(name) + .map(|registration| registration.clone().into()) + } + pub async fn register_command( &self, command: CommandDefinitionRequest, @@ -35,6 +61,7 @@ impl CommandManagementService { if let Some(previous) = registered_commands.insert( command.name.clone(), RegisteredCommand { + name: command.name.clone(), definition: command, tx, }, @@ -50,7 +77,7 @@ impl CommandManagementService { &self, name: impl Into, parameters: serde_json::Map, - ) -> anyhow::Result<()> { + ) -> Result<(), CmdError> { let timestamp = Utc::now(); let offset_from_unix_epoch = timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); @@ -58,19 +85,19 @@ impl CommandManagementService { let name = name.into(); let registered_commands = self.registered_commands.pin(); let Some(registration) = registered_commands.get(&name) else { - bail!("Command Not Found {name}"); + return Err(CommandNotFound(name)); }; - ensure!( - parameters.len() == registration.definition.parameters.len(), - "Command has {} parameters. {} expected", - parameters.len(), - registration.definition.parameters.len() - ); + if parameters.len() != registration.definition.parameters.len() { + return Err(IncorrectParameterCount { + expected: registration.definition.parameters.len(), + actual: parameters.len(), + }); + } let mut result_parameters = StdHashMap::new(); for parameter in ®istration.definition.parameters { let Some(param_value) = parameters.get(¶meter.name) else { - bail!("Command Missing Parameter: {}", parameter.name); + return Err(MisingParameter(parameter.name.clone())); }; let Some(param_value) = (match parameter.data_type() { TelemetryDataType::Float32 => { @@ -79,11 +106,10 @@ impl CommandManagementService { TelemetryDataType::Float64 => param_value.as_f64().map(Value::Float64), TelemetryDataType::Boolean => param_value.as_bool().map(Value::Boolean), }) else { - bail!( - "Parameter {} has the wrong type. {:?} expected", - parameter.name, - parameter.data_type() - ); + return Err(WrongParameterType { + name: parameter.name.clone(), + expected_type: parameter.data_type(), + }); }; result_parameters.insert( parameter.name.clone(), @@ -97,6 +123,10 @@ impl CommandManagementService { let tx = registration.tx.clone(); drop(registered_commands); + if tx.is_closed() { + return Err(NoCommandReceiver); + } + if let Err(e) = tx .send(Some(Command { timestamp: Some(Timestamp { @@ -107,7 +137,8 @@ impl CommandManagementService { })) .await { - bail!("Failed to send command {e}"); + error!("Failed to Send Command: {e}"); + return Err(FailedToSend); } Ok(()) diff --git a/server/src/http/api/cmd.rs b/server/src/http/api/cmd.rs index d08c74f..39de4b0 100644 --- a/server/src/http/api/cmd.rs +++ b/server/src/http/api/cmd.rs @@ -1,6 +1,6 @@ use crate::command::service::CommandManagementService; use crate::http::error::HttpServerResultError; -use actix_web::{post, web, Responder}; +use actix_web::{get, post, web, Responder}; use std::sync::Arc; #[post("/cmd/{name:[\\w\\d/_-]+}")] @@ -13,5 +13,22 @@ pub(super) async fn send_command( .send_command(name.to_string(), parameters.into_inner()) .await?; - Ok(web::Json(())) + Ok(web::Json("Command Sent Successfully.")) +} + +#[get("/cmd")] +pub(super) async fn get_all( + command_service: web::Data>, +) -> Result { + Ok(web::Json(command_service.get_commands()?)) +} + +#[get("/cmd/{name:[\\w\\d/_-]+}")] +pub(super) async fn get_one( + command_service: web::Data>, + name: web::Path, +) -> Result { + Ok(web::Json( + command_service.get_command_definition(&name.to_string()), + )) } diff --git a/server/src/http/api/mod.rs b/server/src/http/api/mod.rs index 82f5c15..0c7875b 100644 --- a/server/src/http/api/mod.rs +++ b/server/src/http/api/mod.rs @@ -13,5 +13,7 @@ pub fn setup_api(cfg: &mut web::ServiceConfig) { .service(panels::get_one) .service(panels::set) .service(panels::delete) - .service(cmd::send_command); + .service(cmd::send_command) + .service(cmd::get_all) + .service(cmd::get_one); } diff --git a/server/src/http/error.rs b/server/src/http/error.rs index 4027309..36ae486 100644 --- a/server/src/http/error.rs +++ b/server/src/http/error.rs @@ -2,7 +2,6 @@ use actix_web::error::ResponseError; use actix_web::http::header::ContentType; use actix_web::http::StatusCode; use actix_web::HttpResponse; -use anyhow::Error; use thiserror::Error; #[derive(Error, Debug)] @@ -16,31 +15,28 @@ pub enum HttpServerResultError { #[error("Timed out")] Timeout, #[error("Internal Error")] - InternalError(anyhow::Error), + InternalError(#[from] anyhow::Error), #[error("Panel Uuid Not Found: {uuid}")] PanelUuidNotFound { uuid: String }, + #[error(transparent)] + Command(#[from] crate::command::error::Error), } impl ResponseError for HttpServerResultError { fn status_code(&self) -> StatusCode { - match *self { + match self { HttpServerResultError::TlmNameNotFound { .. } => StatusCode::NOT_FOUND, HttpServerResultError::TlmUuidNotFound { .. } => StatusCode::NOT_FOUND, HttpServerResultError::InvalidDateTime { .. } => StatusCode::BAD_REQUEST, HttpServerResultError::Timeout => StatusCode::GATEWAY_TIMEOUT, HttpServerResultError::InternalError { .. } => StatusCode::INTERNAL_SERVER_ERROR, HttpServerResultError::PanelUuidNotFound { .. } => StatusCode::NOT_FOUND, + HttpServerResultError::Command(inner) => inner.status_code(), } } fn error_response(&self) -> HttpResponse { HttpResponse::build(self.status_code()) - .insert_header(ContentType::html()) + .insert_header(ContentType::plaintext()) .body(self.to_string()) } } - -impl From for HttpServerResultError { - fn from(value: Error) -> Self { - Self::InternalError(value) - } -} -- 2.43.0 From c3253f3204b697d88d49f4a52b35cbac8d39ee85 Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Sun, 28 Dec 2025 11:48:11 -0500 Subject: [PATCH 3/4] allow panels to hold commands --- frontend/src/components/CommandInput.vue | 40 +++++ frontend/src/components/CommandParameter.vue | 30 +--- .../CommandParameterDataConfigurator.vue | 29 ++++ .../CommandParameterListConfigurator.vue | 99 +++++++++++ frontend/src/components/CommandSender.vue | 3 +- frontend/src/components/DynamicComponent.vue | 154 ++++++++++++++++-- frontend/src/components/TelemetryLine.vue | 25 ++- frontend/src/composables/command.ts | 5 +- frontend/src/composables/dynamic.ts | 37 +++++ frontend/src/composables/telemetry.ts | 3 +- frontend/src/views/PanelView.vue | 11 +- 11 files changed, 385 insertions(+), 51 deletions(-) create mode 100644 frontend/src/components/CommandInput.vue create mode 100644 frontend/src/components/CommandParameterDataConfigurator.vue create mode 100644 frontend/src/components/CommandParameterListConfigurator.vue diff --git a/frontend/src/components/CommandInput.vue b/frontend/src/components/CommandInput.vue new file mode 100644 index 0000000..32b8176 --- /dev/null +++ b/frontend/src/components/CommandInput.vue @@ -0,0 +1,40 @@ + + + + + diff --git a/frontend/src/components/CommandParameter.vue b/frontend/src/components/CommandParameter.vue index e1e5490..07d6b3c 100644 --- a/frontend/src/components/CommandParameter.vue +++ b/frontend/src/components/CommandParameter.vue @@ -1,36 +1,22 @@ diff --git a/frontend/src/components/CommandParameterDataConfigurator.vue b/frontend/src/components/CommandParameterDataConfigurator.vue new file mode 100644 index 0000000..94af44c --- /dev/null +++ b/frontend/src/components/CommandParameterDataConfigurator.vue @@ -0,0 +1,29 @@ + + + + + diff --git a/frontend/src/components/CommandParameterListConfigurator.vue b/frontend/src/components/CommandParameterListConfigurator.vue new file mode 100644 index 0000000..1df1e6a --- /dev/null +++ b/frontend/src/components/CommandParameterListConfigurator.vue @@ -0,0 +1,99 @@ + + + + + diff --git a/frontend/src/components/CommandSender.vue b/frontend/src/components/CommandSender.vue index 37d1315..ef301e2 100644 --- a/frontend/src/components/CommandSender.vue +++ b/frontend/src/components/CommandSender.vue @@ -3,12 +3,13 @@ import type { CommandDefinition } from '@/composables/command.ts'; import { ref } from 'vue'; import CommandParameter from '@/components/CommandParameter.vue'; import FlexDivider from '@/components/FlexDivider.vue'; +import type { DynamicDataType } from '@/composables/dynamic.ts'; const props = defineProps<{ command: CommandDefinition | null; }>(); -const parameters = ref({}); // eslint-disable-line @typescript-eslint/no-explicit-any +const parameters = ref<{ [key: string]: DynamicDataType }>({}); const busy = ref(false); const result = ref(''); diff --git a/frontend/src/components/DynamicComponent.vue b/frontend/src/components/DynamicComponent.vue index 14d947a..e8d30ab 100644 --- a/frontend/src/components/DynamicComponent.vue +++ b/frontend/src/components/DynamicComponent.vue @@ -1,6 +1,12 @@