diff --git a/Cargo.lock b/Cargo.lock index 66af3a9..8f99aab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2128,6 +2128,20 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "simple_command" +version = "0.0.0" +dependencies = [ + "anyhow", + "chrono", + "log", + "num-traits", + "server", + "tokio", + "tokio-util", + "tonic", +] + [[package]] name = "simple_producer" version = "0.0.0" diff --git a/Cargo.toml b/Cargo.toml index 8837d48..5567a9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["server", "examples/simple_producer"] +members = ["server", "examples/simple_producer", "examples/simple_command"] resolver = "2" [profile.dev.package.sqlx-macros] diff --git a/examples/simple_command/Cargo.toml b/examples/simple_command/Cargo.toml new file mode 100644 index 0000000..b799df6 --- /dev/null +++ b/examples/simple_command/Cargo.toml @@ -0,0 +1,14 @@ + +[package] +name = "simple_command" +edition = "2021" + +[dependencies] +server = { path = "../../server" } +tonic = "0.12.3" +tokio = { version = "1.43.0", features = ["rt-multi-thread", "signal"] } +chrono = "0.4.39" +tokio-util = "0.7.13" +num-traits = "0.2.19" +log = "0.4.29" +anyhow = "1.0.100" diff --git a/examples/simple_command/src/main.rs b/examples/simple_command/src/main.rs new file mode 100644 index 0000000..ee6f512 --- /dev/null +++ b/examples/simple_command/src/main.rs @@ -0,0 +1,141 @@ +use chrono::DateTime; +use server::core::client_side_command::Inner; +use server::core::command_service_client::CommandServiceClient; +use server::core::telemetry_value::Value; +use server::core::{ + ClientSideCommand, Command, CommandDefinitionRequest, CommandParameterDefinition, + CommandResponse, TelemetryDataType, +}; +use std::error::Error; +use tokio::select; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tokio_util::sync::CancellationToken; +use tonic::codegen::tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::tokio_stream::StreamExt; +use tonic::transport::Channel; + +struct CommandHandler { + handle: JoinHandle<()>, +} + +impl CommandHandler { + pub async fn new CommandResponse + Send + 'static>( + cancellation_token: CancellationToken, + client: &mut CommandServiceClient, + command_definition_request: CommandDefinitionRequest, + handler: F, + ) -> anyhow::Result { + let (tx, rx) = mpsc::channel(4); + + // The buffer size of 4 means this is safe to send immediately + tx.send(ClientSideCommand { + inner: Some(Inner::Request(command_definition_request)), + }) + .await?; + let response = client.new_command(ReceiverStream::new(rx)).await?; + let mut cmd_stream = response.into_inner(); + + let handle = tokio::spawn(async move { + loop { + select! { + _ = cancellation_token.cancelled() => break, + Some(msg) = cmd_stream.next() => { + match msg { + Ok(cmd) => { + let uuid = cmd.uuid.clone(); + let mut response = handler(cmd); + response.uuid = uuid; + match tx.send(ClientSideCommand { + inner: Some(Inner::Response(response)) + }).await { + Ok(()) => {}, + Err(e) => { + println!("SendError: {e}"); + break; + } + } + } + Err(e) => { + println!("Error: {e}"); + break; + } + } + }, + else => break, + } + } + }); + + Ok(Self { handle }) + } + + pub async fn join(self) -> anyhow::Result<()> { + Ok(self.handle.await?) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let cancellation_token = CancellationToken::new(); + { + let cancellation_token = cancellation_token.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + cancellation_token.cancel(); + }); + } + + let mut client = CommandServiceClient::connect("http://[::1]:50051").await?; + + let cmd_handler = CommandHandler::new( + cancellation_token, + &mut client, + CommandDefinitionRequest { + name: "simple_command/a".to_string(), + parameters: vec![ + CommandParameterDefinition { + name: "a".to_string(), + data_type: TelemetryDataType::Float32.into(), + }, + CommandParameterDefinition { + name: "b".to_string(), + data_type: TelemetryDataType::Float64.into(), + }, + CommandParameterDefinition { + name: "c".to_string(), + data_type: TelemetryDataType::Boolean.into(), + }, + ], + }, + |command| { + let timestamp = command.timestamp.expect("Missing Timestamp"); + let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32) + .expect("Could not construct date time"); + let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else { + panic!("Wrong Type a"); + }; + let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else { + panic!("Wrong Type b"); + }; + let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else { + panic!("Wrong Type c"); + }; + + println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); + + CommandResponse { + uuid: command.uuid.clone(), + success: true, + response: format!( + "Successfully Received Command! timestamp: {timestamp} a: {a} b: {b} c: {c}" + ), + } + }, + ) + .await?; + + cmd_handler.join().await?; + + Ok(()) +} diff --git a/frontend/src/components/CommandInput.vue b/frontend/src/components/CommandInput.vue new file mode 100644 index 0000000..32b8176 --- /dev/null +++ b/frontend/src/components/CommandInput.vue @@ -0,0 +1,40 @@ + + + + + diff --git a/frontend/src/components/CommandList.vue b/frontend/src/components/CommandList.vue new file mode 100644 index 0000000..b089ce1 --- /dev/null +++ b/frontend/src/components/CommandList.vue @@ -0,0 +1,70 @@ + + + + + diff --git a/frontend/src/components/CommandParameter.vue b/frontend/src/components/CommandParameter.vue new file mode 100644 index 0000000..07d6b3c --- /dev/null +++ b/frontend/src/components/CommandParameter.vue @@ -0,0 +1,23 @@ + + + + + diff --git a/frontend/src/components/CommandParameterDataConfigurator.vue b/frontend/src/components/CommandParameterDataConfigurator.vue new file mode 100644 index 0000000..94af44c --- /dev/null +++ b/frontend/src/components/CommandParameterDataConfigurator.vue @@ -0,0 +1,29 @@ + + + + + diff --git a/frontend/src/components/CommandParameterListConfigurator.vue b/frontend/src/components/CommandParameterListConfigurator.vue new file mode 100644 index 0000000..1df1e6a --- /dev/null +++ b/frontend/src/components/CommandParameterListConfigurator.vue @@ -0,0 +1,99 @@ + + + + + diff --git a/frontend/src/components/CommandSender.vue b/frontend/src/components/CommandSender.vue new file mode 100644 index 0000000..ef301e2 --- /dev/null +++ b/frontend/src/components/CommandSender.vue @@ -0,0 +1,68 @@ + + + + + diff --git a/frontend/src/components/DynamicComponent.vue b/frontend/src/components/DynamicComponent.vue index 14d947a..e8d30ab 100644 --- a/frontend/src/components/DynamicComponent.vue +++ b/frontend/src/components/DynamicComponent.vue @@ -1,6 +1,12 @@