From b8475a12adfc963df6dbcde2b0df1669e4a1a6a3 Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Wed, 31 Dec 2025 11:15:39 -0500 Subject: [PATCH] improve telemetry ergonomics --- api-core/Cargo.toml | 2 +- api-core/src/data_type.rs | 3 +- api-core/src/data_value.rs | 4 +- api/src/client/command.rs | 17 +- api/src/client/config.rs | 11 ++ api/src/client/context.rs | 4 +- api/src/client/mod.rs | 13 ++ api/src/client/telemetry.rs | 159 +++++++++++++------ examples/simple_command/src/main.rs | 9 +- examples/simple_producer/src/main.rs | 222 +++++++-------------------- 10 files changed, 210 insertions(+), 234 deletions(-) create mode 100644 api/src/client/config.rs diff --git a/api-core/Cargo.toml b/api-core/Cargo.toml index a895fe2..19e0b66 100644 --- a/api-core/Cargo.toml +++ b/api-core/Cargo.toml @@ -7,6 +7,6 @@ authors = ["Sergey "] [dependencies] chrono = { workspace = true, features = ["serde"] } -derive_more = { workspace = true, features = ["try_into"] } +derive_more = { workspace = true, features = ["from", "try_into"] } serde = { workspace = true, features = ["derive"] } thiserror = { workspace = true } diff --git a/api-core/src/data_type.rs b/api-core/src/data_type.rs index 3600b09..ad4c584 100644 --- a/api-core/src/data_type.rs +++ b/api-core/src/data_type.rs @@ -1,3 +1,4 @@ +use crate::data_value::DataValue; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] @@ -7,7 +8,7 @@ pub enum DataType { Boolean, } -pub trait ToDataType { +pub trait ToDataType: Into { const DATA_TYPE: DataType; } diff --git a/api-core/src/data_value.rs b/api-core/src/data_value.rs index 35422a3..3f257eb 100644 --- a/api-core/src/data_value.rs +++ b/api-core/src/data_value.rs @@ -1,7 +1,7 @@ -use derive_more::TryInto; +use derive_more::{From, TryInto}; use serde::{Deserialize, Serialize}; -#[derive(Debug, Clone, Copy, Serialize, Deserialize, TryInto)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, From, TryInto)] pub enum DataValue { Float32(f32), Float64(f64), diff --git a/api/src/client/command.rs b/api/src/client/command.rs index 5d2cfa0..84b3dc3 100644 --- a/api/src/client/command.rs +++ b/api/src/client/command.rs @@ -5,12 +5,18 @@ use std::fmt::Display; use std::sync::Arc; use tokio_util::sync::CancellationToken; -pub struct Commanding; +pub struct CommandRegistry { + client: Arc, +} + +impl CommandRegistry { + pub fn new(client: Arc) -> Self { + Self { client } + } -impl Commanding { pub fn register_handler( - client: Arc, - command_name: String, + &self, + command_name: impl Into, mut callback: F, ) -> CommandHandle where @@ -20,8 +26,9 @@ impl Commanding { let result = CommandHandle { cancellation_token: cancellation_token.clone(), }; + let client = self.client.clone(); - let command_definition = C::create(command_name); + let command_definition = C::create(command_name.into()); tokio::spawn(async move { while !cancellation_token.is_cancelled() { diff --git a/api/src/client/config.rs b/api/src/client/config.rs new file mode 100644 index 0000000..18569df --- /dev/null +++ b/api/src/client/config.rs @@ -0,0 +1,11 @@ +pub struct ClientConfiguration { + pub send_buffer_size: usize, +} + +impl Default for ClientConfiguration { + fn default() -> Self { + Self { + send_buffer_size: 128, + } + } +} diff --git a/api/src/client/context.rs b/api/src/client/context.rs index 48a9e25..362b92f 100644 --- a/api/src/client/context.rs +++ b/api/src/client/context.rs @@ -1,3 +1,4 @@ +use crate::client::config::ClientConfiguration; use crate::client::error::{ConnectError, MessageError}; use crate::client::{Callback, ClientChannel, OutgoingMessage, RegisteredCallback}; use crate::messages::callback::GenericCallbackError; @@ -23,6 +24,7 @@ pub struct ClientContext { pub cancel: CancellationToken, pub request: Request, pub connected_state_tx: watch::Sender, + pub client_configuration: ClientConfiguration, } impl ClientContext { @@ -71,7 +73,7 @@ impl ClientContext { }; info!("Connected to {}", self.request.uri()); - let (tx, rx) = mpsc::channel(128); + let (tx, rx) = mpsc::channel(self.client_configuration.send_buffer_size); *write_lock = tx; drop(write_lock); diff --git a/api/src/client/mod.rs b/api/src/client/mod.rs index 23de189..d333c51 100644 --- a/api/src/client/mod.rs +++ b/api/src/client/mod.rs @@ -1,8 +1,10 @@ pub mod command; +mod config; mod context; pub mod error; pub mod telemetry; +use crate::client::config::ClientConfiguration; use crate::client::error::{MessageError, RequestError}; use crate::messages::callback::GenericCallbackError; use crate::messages::payload::RequestMessagePayload; @@ -41,6 +43,16 @@ pub struct Client { impl Client { pub fn connect(request: R) -> Result + where + R: IntoClientRequest, + { + Self::connect_with_config(request, ClientConfiguration::default()) + } + + pub fn connect_with_config( + request: R, + config: ClientConfiguration, + ) -> Result where R: IntoClientRequest, { @@ -52,6 +64,7 @@ impl Client { cancel: cancel.clone(), request: request.into_client_request()?, connected_state_tx, + client_configuration: config, }; context.start(channel.clone())?; diff --git a/api/src/client/telemetry.rs b/api/src/client/telemetry.rs index 2a536e9..7cf0b70 100644 --- a/api/src/client/telemetry.rs +++ b/api/src/client/telemetry.rs @@ -1,77 +1,104 @@ use crate::client::error::MessageError; use crate::client::Client; -use crate::data_type::DataType; use crate::data_value::DataValue; use crate::messages::telemetry_definition::TelemetryDefinitionRequest; use crate::messages::telemetry_entry::TelemetryEntry; +use api_core::data_type::{DataType, ToDataType}; use chrono::{DateTime, Utc}; +use std::marker::PhantomData; use std::sync::Arc; use tokio::sync::{oneshot, RwLock}; use tokio_util::sync::CancellationToken; use uuid::Uuid; -pub struct Telemetry; +pub struct TelemetryRegistry { + client: Arc, +} -impl Telemetry { - pub async fn register( - client: Arc, - name: String, +impl TelemetryRegistry { + pub fn new(client: Arc) -> Self { + Self { client } + } + + #[inline] + pub async fn register_generic( + &self, + name: impl Into, data_type: DataType, - ) -> TelemetryHandle { - let cancellation_token = CancellationToken::new(); - let cancel_token = cancellation_token.clone(); - let stored_client = client.clone(); + ) -> GenericTelemetryHandle { + // inner for compilation performance + async fn inner( + client: Arc, + name: String, + data_type: DataType, + ) -> GenericTelemetryHandle { + let cancellation_token = CancellationToken::new(); + let cancel_token = cancellation_token.clone(); + let stored_client = client.clone(); - let response_uuid = Arc::new(RwLock::new(Uuid::nil())); + let response_uuid = Arc::new(RwLock::new(Uuid::nil())); - let response_uuid_inner = response_uuid.clone(); - let (tx, rx) = oneshot::channel(); + let response_uuid_inner = response_uuid.clone(); + let (tx, rx) = oneshot::channel(); - tokio::spawn(async move { - let mut write_lock = Some(response_uuid_inner.write().await); - let _ = tx.send(()); - while !cancel_token.is_cancelled() { - if let Ok(response) = client - .send_request(TelemetryDefinitionRequest { - name: name.clone(), - data_type, - }) - .await - { - let mut lock = match write_lock { - None => response_uuid_inner.write().await, - Some(lock) => lock, - }; - // Update the value in the lock - *lock = response.uuid; - // Set this value so the loop works - write_lock = None; + tokio::spawn(async move { + let mut write_lock = Some(response_uuid_inner.write().await); + let _ = tx.send(()); + while !cancel_token.is_cancelled() { + if let Ok(response) = client + .send_request(TelemetryDefinitionRequest { + name: name.clone(), + data_type, + }) + .await + { + let mut lock = match write_lock { + None => response_uuid_inner.write().await, + Some(lock) => lock, + }; + // Update the value in the lock + *lock = response.uuid; + // Set this value so the loop works + write_lock = None; + } + + client.wait_disconnected().await; } + }); - client.wait_disconnected().await; + // Wait until the write lock is acquired + let _ = rx.await; + // Wait until the write lock is released for the first time + drop(response_uuid.read().await); + + GenericTelemetryHandle { + cancellation_token, + uuid: response_uuid, + client: stored_client, } - }); - - // Wait until the write lock is acquired - let _ = rx.await; - // Wait until the write lock is released for the first time - drop(response_uuid.read().await); - - TelemetryHandle { - cancellation_token, - uuid: response_uuid, - client: stored_client, } + inner(self.client.clone(), name.into(), data_type).await + } + + #[inline] + pub async fn register(&self, name: impl Into) -> TelemetryHandle { + self.register_generic(name, T::DATA_TYPE).await.coerce() } } -pub struct TelemetryHandle { +impl Drop for GenericTelemetryHandle { + fn drop(&mut self) { + self.cancellation_token.cancel(); + } +} + +pub struct GenericTelemetryHandle { cancellation_token: CancellationToken, uuid: Arc>, client: Arc, } -impl TelemetryHandle { +impl GenericTelemetryHandle { pub async fn publish( &self, value: DataValue, @@ -97,10 +124,42 @@ impl TelemetryHandle { Ok(()) } -} -impl Drop for TelemetryHandle { - fn drop(&mut self) { - self.cancellation_token.cancel(); + #[inline] + pub async fn publish_now(&self, value: DataValue) -> Result<(), MessageError> { + self.publish(value, Utc::now()).await + } + + fn coerce>(self) -> TelemetryHandle { + TelemetryHandle:: { + generic_handle: self, + _phantom: PhantomData, + } + } +} + +pub struct TelemetryHandle { + generic_handle: GenericTelemetryHandle, + _phantom: PhantomData, +} + +impl TelemetryHandle { + pub fn to_generic(self) -> GenericTelemetryHandle { + self.generic_handle + } + pub fn as_generic(&self) -> &GenericTelemetryHandle { + &self.generic_handle + } +} + +impl> TelemetryHandle { + #[inline] + pub async fn publish(&self, value: T, timestamp: DateTime) -> Result<(), MessageError> { + self.as_generic().publish(value.into(), timestamp).await + } + + #[inline] + pub async fn publish_now(&self, value: T) -> Result<(), MessageError> { + self.publish(value, Utc::now()).await } } diff --git a/examples/simple_command/src/main.rs b/examples/simple_command/src/main.rs index e994e1b..5a9c832 100644 --- a/examples/simple_command/src/main.rs +++ b/examples/simple_command/src/main.rs @@ -1,4 +1,4 @@ -use api::client::command::Commanding; +use api::client::command::CommandRegistry; use api::client::Client; use api::macros::IntoCommandDefinition; use api::messages::command::CommandHeader; @@ -20,13 +20,14 @@ async fn main() -> Result<(), Box> { }); } - let client = Arc::new(Client::connect("ws://[::1]:8080/backend")?); + let client = Arc::new(Client::connect("ws://localhost:8080/backend")?); + let cmd = CommandRegistry::new(client); - let handle = - Commanding::register_handler(client, "simple_command/a".to_string(), handle_command); + let handle = cmd.register_handler("simple_command/a", handle_command); cancellation_token.cancelled().await; + // This will automatically drop when we return drop(handle); Ok(()) diff --git a/examples/simple_producer/src/main.rs b/examples/simple_producer/src/main.rs index 869a2ec..e7ce4c3 100644 --- a/examples/simple_producer/src/main.rs +++ b/examples/simple_producer/src/main.rs @@ -1,10 +1,9 @@ -use api::client::telemetry::Telemetry; +use api::client::telemetry::TelemetryRegistry; use api::client::Client; -use api::data_type::DataType; -use api::data_value::DataValue; use chrono::{TimeDelta, Utc}; use futures_util::future::join_all; use num_traits::FloatConst; +use std::f64; use std::sync::Arc; use std::time::Duration; use tokio::time::{sleep_until, Instant}; @@ -14,111 +13,31 @@ use tokio_util::sync::CancellationToken; async fn main() -> anyhow::Result<()> { env_logger::init(); - let client = Arc::new(Client::connect("ws://[::1]:8080/backend")?); + let client = Arc::new(Client::connect("ws://localhost:8080/backend")?); + let tlm = TelemetryRegistry::new(client); - let time_offset = Telemetry::register( - client.clone(), - "simple_producer/time_offset".into(), - DataType::Float64, - ) + let time_offset = tlm.register::("simple_producer/time_offset").await; + + let publish_offset = tlm.register::("simple_producer/publish_offset").await; + + let await_offset = tlm.register::("simple_producer/await_offset").await; + + let bool_tlm_handle = tlm.register::("simple_producer/bool").await; + + let sin_handles = join_all((1..=6).map(|i| { + tlm.register::(format!( + "simple_producer/sin{}", + if i == 1 { "".into() } else { i.to_string() } + )) + })) .await; - let publish_offset = Telemetry::register( - client.clone(), - "simple_producer/publish_offset".into(), - DataType::Float64, - ) - .await; - - let await_offset = Telemetry::register( - client.clone(), - "simple_producer/await_offset".into(), - DataType::Float64, - ) - .await; - - let sin_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/sin".into(), - DataType::Float32, - ) - .await; - let cos_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/cos".into(), - DataType::Float64, - ) - .await; - let bool_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/bool".into(), - DataType::Boolean, - ) - .await; - - let sin2_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/sin2".into(), - DataType::Float32, - ) - .await; - let cos2_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/cos2".into(), - DataType::Float64, - ) - .await; - - let sin3_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/sin3".into(), - DataType::Float32, - ) - .await; - let cos3_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/cos3".into(), - DataType::Float64, - ) - .await; - - let sin4_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/sin4".into(), - DataType::Float32, - ) - .await; - let cos4_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/cos4".into(), - DataType::Float64, - ) - .await; - - let sin5_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/sin5".into(), - DataType::Float32, - ) - .await; - let cos5_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/cos5".into(), - DataType::Float64, - ) - .await; - - let sin6_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/sin6".into(), - DataType::Float32, - ) - .await; - let cos6_tlm_handle = Telemetry::register( - client.clone(), - "simple_producer/cos6".into(), - DataType::Float64, - ) + let cos_handles = join_all((1..=6).map(|i| { + tlm.register::(format!( + "simple_producer/cos{}", + if i == 1 { "".into() } else { i.to_string() } + )) + })) .await; let cancellation_token = CancellationToken::new(); @@ -134,81 +53,44 @@ async fn main() -> anyhow::Result<()> { let start_instant = Instant::now(); let mut next_time = start_instant; let mut index = 0; - let mut tasks = vec![]; while !cancellation_token.is_cancelled() { next_time += Duration::from_millis(10); index += 1; sleep_until(next_time).await; let publish_time = start_time + TimeDelta::from_std(next_time - start_instant).unwrap(); let actual_time = Instant::now(); - tasks.push(time_offset.publish( - DataValue::Float64((actual_time - next_time).as_secs_f64()), - Utc::now(), - )); - tasks.push(sin_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (1000.0_f32)).sin()), - publish_time, - )); - tasks.push(cos_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (1000.0_f64)).cos()), - publish_time, - )); - tasks.push(bool_tlm_handle.publish(DataValue::Boolean(index % 1000 > 500), publish_time)); - tasks.push(sin2_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (500.0_f32)).sin()), - publish_time, - )); - tasks.push(cos2_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (500.0_f64)).cos()), - publish_time, - )); - tasks.push(sin3_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (333.0_f32)).sin()), - publish_time, - )); - tasks.push(cos3_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (333.0_f64)).cos()), - publish_time, - )); - tasks.push(sin4_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (250.0_f32)).sin()), - publish_time, - )); - tasks.push(cos4_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (250.0_f64)).cos()), - publish_time, - )); - tasks.push(sin5_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (200.0_f32)).sin()), - publish_time, - )); - tasks.push(cos5_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (200.0_f64)).cos()), - publish_time, - )); - tasks.push(sin6_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (166.0_f32)).sin()), - publish_time, - )); - tasks.push(cos6_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (166.0_f64)).cos()), - publish_time, - )); - tasks.push(publish_offset.publish( - DataValue::Float64((Instant::now() - actual_time).as_secs_f64()), - Utc::now(), - )); + // Due to how telemetry handles are implemented, unless the send buffer is full awaiting + // these will return immediately + time_offset + .publish_now((actual_time - next_time).as_secs_f64()) + .await?; + bool_tlm_handle + .publish(index % 1000 > 500, publish_time) + .await?; - // Join the tasks so they all run in parallel - for task in join_all(tasks.drain(..)).await { - task?; + for (i, sin) in sin_handles.iter().enumerate() { + sin.publish( + (f32::TAU() * (index as f32) / (1000.0_f32 / (i + 1) as f32)).sin(), + publish_time, + ) + .await?; + } + for (i, cos) in cos_handles.iter().enumerate() { + cos.publish( + (f64::TAU() * (index as f64) / (1000.0_f64 / (i + 1) as f64)).cos(), + publish_time, + ) + .await?; } - tasks.push(await_offset.publish( - DataValue::Float64((Instant::now() - actual_time).as_secs_f64()), - Utc::now(), - )); + publish_offset + .publish((Instant::now() - actual_time).as_secs_f64(), Utc::now()) + .await?; + + await_offset + .publish((Instant::now() - actual_time).as_secs_f64(), Utc::now()) + .await?; } Ok(())