From 6fdbb868b7afec90f07795a73dc7927337f440c0 Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Tue, 30 Dec 2025 19:15:49 -0500 Subject: [PATCH] add telemetry producer to api --- api/src/client/context.rs | 242 +++++++++++++++ api/src/client/mod.rs | 241 +------------- api/src/client/telemetry.rs | 106 +++++++ examples/simple_producer/src/main.rs | 449 ++++++++++++--------------- 4 files changed, 547 insertions(+), 491 deletions(-) create mode 100644 api/src/client/context.rs create mode 100644 api/src/client/telemetry.rs diff --git a/api/src/client/context.rs b/api/src/client/context.rs new file mode 100644 index 0000000..48a9e25 --- /dev/null +++ b/api/src/client/context.rs @@ -0,0 +1,242 @@ +use crate::client::error::{ConnectError, MessageError}; +use crate::client::{Callback, ClientChannel, OutgoingMessage, RegisteredCallback}; +use crate::messages::callback::GenericCallbackError; +use crate::messages::payload::RequestMessagePayload; +use crate::messages::{RequestMessage, ResponseMessage}; +use futures_util::{SinkExt, StreamExt}; +use log::{debug, error, info, trace, warn}; +use std::collections::HashMap; +use std::sync::mpsc::sync_channel; +use std::thread; +use std::time::Duration; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, oneshot, watch, RwLockWriteGuard}; +use tokio::time::sleep; +use tokio::{select, spawn}; +use tokio_tungstenite::tungstenite::handshake::client::Request; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +pub struct ClientContext { + pub cancel: CancellationToken, + pub request: Request, + pub connected_state_tx: watch::Sender, +} + +impl ClientContext { + pub fn start(mut self, channel: ClientChannel) -> Result<(), ConnectError> { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build()?; + + let (tx, rx) = sync_channel::<()>(1); + + let _detached = thread::Builder::new() + .name("tlm-client".to_string()) + .spawn(move || { + runtime.block_on(async { + let mut write_lock = channel.write().await; + + // This cannot fail + let _ = tx.send(()); + + while !self.cancel.is_cancelled() { + write_lock = self.run_connection(write_lock, &channel).await; + } + drop(write_lock); + }); + })?; + + // This cannot fail + let _ = rx.recv(); + + Ok(()) + } + + async fn run_connection<'a>( + &mut self, + mut write_lock: RwLockWriteGuard<'a, mpsc::Sender>, + channel: &'a ClientChannel, + ) -> RwLockWriteGuard<'a, mpsc::Sender> { + debug!("Attempting to Connect to {}", self.request.uri()); + let mut ws = match connect_async(self.request.clone()).await { + Ok((ws, _)) => ws, + Err(e) => { + info!("Failed to Connect: {e}"); + sleep(Duration::from_secs(1)).await; + return write_lock; + } + }; + info!("Connected to {}", self.request.uri()); + + let (tx, rx) = mpsc::channel(128); + *write_lock = tx; + drop(write_lock); + + // Don't care about the previous value + let _ = self.connected_state_tx.send_replace(true); + + let close_connection = self.handle_connection(&mut ws, rx, channel).await; + + let write_lock = channel.write().await; + // Send this after grabbing the lock - to prevent extra contention when others try to grab + // the lock to use that as a signal that we have reconnected + let _ = self.connected_state_tx.send_replace(false); + if close_connection { + if let Err(e) = ws.close(None).await { + error!("Failed to Close the Connection: {e}"); + } + } + write_lock + } + + async fn handle_connection( + &mut self, + ws: &mut WebSocketStream>, + mut rx: mpsc::Receiver, + channel: &ClientChannel, + ) -> bool { + let mut callbacks = HashMap::::new(); + loop { + select! { + _ = self.cancel.cancelled() => { break; }, + Some(msg) = ws.next() => { + match msg { + Ok(msg) => { + match msg { + Message::Text(msg) => { + trace!("Incoming: {msg}"); + let msg: ResponseMessage = match serde_json::from_str(&msg) { + Ok(m) => m, + Err(e) => { + error!("Failed to deserialize {e}"); + break; + } + }; + self.handle_incoming(msg, &mut callbacks, channel).await; + } + Message::Binary(_) => unimplemented!("Binary Data Not Implemented"), + Message::Ping(data) => { + if let Err(e) = ws.send(Message::Pong(data)).await { + error!("Failed to send Pong {e}"); + break; + } + } + Message::Pong(_) => { + // Intentionally Left Empty + } + Message::Close(_) => { + debug!("Websocket Closed"); + return false; + } + Message::Frame(_) => unreachable!("Not Possible"), + } + } + Err(e) => { + error!("Receive Error {e}"); + break; + } + } + } + Some(msg) = rx.recv() => { + // Insert a callback if it isn't a None callback + if !matches!(msg.callback, Callback::None) { + callbacks.insert(msg.msg.uuid, msg.callback); + } + let msg = match serde_json::to_string(&msg.msg) { + Ok(m) => m, + Err(e) => { + error!("Encode Error {e}"); + break; + } + }; + trace!("Outgoing: {msg}"); + if let Err(e) = ws.send(Message::Text(msg.into())).await { + error!("Send Error {e}"); + break; + } + } + else => { break; }, + } + } + true + } + + async fn handle_incoming( + &mut self, + msg: ResponseMessage, + callbacks: &mut HashMap, + channel: &ClientChannel, + ) { + if let Some(response_uuid) = msg.response { + match callbacks.get(&response_uuid) { + Some(Callback::None) => { + callbacks.remove(&response_uuid); + unreachable!("We skip registering callbacks of None type"); + } + Some(Callback::Once(_)) => { + let Some(Callback::Once(callback)) = callbacks.remove(&response_uuid) else { + return; + }; + let _ = callback.send(msg); + } + Some(Callback::Registered(callback)) => { + let callback = callback.clone(); + spawn(Self::handle_registered_callback( + callback, + msg, + channel.clone(), + )); + } + None => { + warn!("No Callback Registered for {response_uuid}"); + } + } + } + } + + async fn handle_registered_callback( + callback: RegisteredCallback, + msg: ResponseMessage, + channel: ClientChannel, + ) { + let (tx, rx) = oneshot::channel(); + + let uuid = msg.uuid; + + let response = match callback.send((msg, tx)).await { + Err(_) => GenericCallbackError::CallbackClosed.into(), + Ok(()) => rx + .await + .unwrap_or_else(|_| GenericCallbackError::CallbackClosed.into()), + }; + + if let Err(e) = Self::send_response(channel, response, uuid).await { + error!("Failed to send response {e}"); + } + } + + async fn send_response( + channel: ClientChannel, + payload: RequestMessagePayload, + response_uuid: Uuid, + ) -> Result<(), MessageError> { + // If this failed that means we're in the middle of reconnecting, so our callbacks + // are all being cleaned up as-is. No response needed. + let sender = channel.try_read()?; + let data = sender.reserve().await?; + + data.send(OutgoingMessage { + msg: RequestMessage { + uuid: Uuid::new_v4(), + response: Some(response_uuid), + payload, + }, + callback: Callback::None, + }); + + Ok(()) + } +} diff --git a/api/src/client/mod.rs b/api/src/client/mod.rs index e02c41c..ad5f53d 100644 --- a/api/src/client/mod.rs +++ b/api/src/client/mod.rs @@ -1,4 +1,6 @@ +mod context; pub mod error; +pub mod telemetry; use crate::client::error::{MessageError, RequestError}; use crate::messages::callback::GenericCallbackError; @@ -7,23 +9,12 @@ use crate::messages::payload::ResponseMessagePayload; use crate::messages::{ ClientMessage, RegisterCallback, RequestMessage, RequestResponse, ResponseMessage, }; +use context::ClientContext; use error::ConnectError; -use futures_util::stream::StreamExt; -use futures_util::SinkExt; -use log::{debug, error, info, trace, warn}; -use std::collections::HashMap; -use std::sync::mpsc::sync_channel; use std::sync::Arc; -use std::thread; -use std::time::Duration; -use tokio::net::TcpStream; -use tokio::sync::{mpsc, oneshot, watch, RwLock, RwLockWriteGuard}; -use tokio::time::sleep; -use tokio::{select, spawn}; +use tokio::spawn; +use tokio::sync::{mpsc, oneshot, watch, RwLock}; use tokio_tungstenite::tungstenite::client::IntoClientRequest; -use tokio_tungstenite::tungstenite::handshake::client::Request; -use tokio_tungstenite::tungstenite::Message; -use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; use tokio_util::sync::CancellationToken; use uuid::Uuid; @@ -47,12 +38,6 @@ pub struct Client { connected_state_rx: watch::Receiver, } -struct ClientContext { - cancel: CancellationToken, - request: Request, - connected_state_tx: watch::Sender, -} - impl Client { pub fn connect(request: R) -> Result where @@ -260,222 +245,6 @@ impl Client { } } -impl ClientContext { - fn start(mut self, channel: ClientChannel) -> Result<(), ConnectError> { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let (tx, rx) = sync_channel::<()>(1); - - let _detached = thread::Builder::new() - .name("tlm-client".to_string()) - .spawn(move || { - runtime.block_on(async { - let mut write_lock = channel.write().await; - - // This cannot fail - let _ = tx.send(()); - - while !self.cancel.is_cancelled() { - write_lock = self.run_connection(write_lock, &channel).await; - } - drop(write_lock); - }); - })?; - - // This cannot fail - let _ = rx.recv(); - - Ok(()) - } - - async fn run_connection<'a>( - &mut self, - mut write_lock: RwLockWriteGuard<'a, mpsc::Sender>, - channel: &'a ClientChannel, - ) -> RwLockWriteGuard<'a, mpsc::Sender> { - debug!("Attempting to Connect to {}", self.request.uri()); - let mut ws = match connect_async(self.request.clone()).await { - Ok((ws, _)) => ws, - Err(e) => { - info!("Failed to Connect: {e}"); - sleep(Duration::from_secs(1)).await; - return write_lock; - } - }; - info!("Connected to {}", self.request.uri()); - - let (tx, rx) = mpsc::channel(128); - *write_lock = tx; - drop(write_lock); - - // Don't care about the previous value - let _ = self.connected_state_tx.send_replace(true); - - let close_connection = self.handle_connection(&mut ws, rx, channel).await; - - let write_lock = channel.write().await; - // Send this after grabbing the lock - to prevent extra contention when others try to grab - // the lock to use that as a signal that we have reconnected - let _ = self.connected_state_tx.send_replace(false); - if close_connection { - if let Err(e) = ws.close(None).await { - error!("Failed to Close the Connection: {e}"); - } - } - write_lock - } - - async fn handle_connection( - &mut self, - ws: &mut WebSocketStream>, - mut rx: mpsc::Receiver, - channel: &ClientChannel, - ) -> bool { - let mut callbacks = HashMap::::new(); - loop { - select! { - _ = self.cancel.cancelled() => { break; }, - Some(msg) = ws.next() => { - match msg { - Ok(msg) => { - match msg { - Message::Text(msg) => { - trace!("Incoming: {msg}"); - let msg: ResponseMessage = match serde_json::from_str(&msg) { - Ok(m) => m, - Err(e) => { - error!("Failed to deserialize {e}"); - break; - } - }; - self.handle_incoming(msg, &mut callbacks, channel).await; - } - Message::Binary(_) => unimplemented!("Binary Data Not Implemented"), - Message::Ping(data) => { - if let Err(e) = ws.send(Message::Pong(data)).await { - error!("Failed to send Pong {e}"); - break; - } - } - Message::Pong(_) => { - // Intentionally Left Empty - } - Message::Close(_) => { - debug!("Websocket Closed"); - return false; - } - Message::Frame(_) => unreachable!("Not Possible"), - } - } - Err(e) => { - error!("Receive Error {e}"); - break; - } - } - } - Some(msg) = rx.recv() => { - // Insert a callback if it isn't a None callback - if !matches!(msg.callback, Callback::None) { - callbacks.insert(msg.msg.uuid, msg.callback); - } - let msg = match serde_json::to_string(&msg.msg) { - Ok(m) => m, - Err(e) => { - error!("Encode Error {e}"); - break; - } - }; - trace!("Outgoing: {msg}"); - if let Err(e) = ws.send(Message::Text(msg.into())).await { - error!("Send Error {e}"); - break; - } - } - else => { break; }, - } - } - true - } - - async fn handle_incoming( - &mut self, - msg: ResponseMessage, - callbacks: &mut HashMap, - channel: &ClientChannel, - ) { - if let Some(response_uuid) = msg.response { - match callbacks.get(&response_uuid) { - Some(Callback::None) => { - callbacks.remove(&response_uuid); - unreachable!("We skip registering callbacks of None type"); - } - Some(Callback::Once(_)) => { - let Some(Callback::Once(callback)) = callbacks.remove(&response_uuid) else { - return; - }; - let _ = callback.send(msg); - } - Some(Callback::Registered(callback)) => { - let callback = callback.clone(); - spawn(Self::handle_registered_callback( - callback, - msg, - channel.clone(), - )); - } - None => { - warn!("No Callback Registered for {response_uuid}"); - } - } - } - } - - async fn handle_registered_callback( - callback: RegisteredCallback, - msg: ResponseMessage, - channel: ClientChannel, - ) { - let (tx, rx) = oneshot::channel(); - - let uuid = msg.uuid; - - let response = match callback.send((msg, tx)).await { - Err(_) => GenericCallbackError::CallbackClosed.into(), - Ok(()) => rx - .await - .unwrap_or_else(|_| GenericCallbackError::CallbackClosed.into()), - }; - - if let Err(e) = Self::send_response(channel, response, uuid).await { - error!("Failed to send response {e}"); - } - } - - async fn send_response( - channel: ClientChannel, - payload: RequestMessagePayload, - response_uuid: Uuid, - ) -> Result<(), MessageError> { - // If this failed that means we're in the middle of reconnecting, so our callbacks - // are all being cleaned up as-is. No response needed. - let sender = channel.try_read()?; - let data = sender.reserve().await?; - - data.send(OutgoingMessage { - msg: RequestMessage { - uuid: Uuid::new_v4(), - response: Some(response_uuid), - payload, - }, - callback: Callback::None, - }); - - Ok(()) - } -} - impl Drop for Client { fn drop(&mut self) { self.cancel.cancel(); diff --git a/api/src/client/telemetry.rs b/api/src/client/telemetry.rs new file mode 100644 index 0000000..2a536e9 --- /dev/null +++ b/api/src/client/telemetry.rs @@ -0,0 +1,106 @@ +use crate::client::error::MessageError; +use crate::client::Client; +use crate::data_type::DataType; +use crate::data_value::DataValue; +use crate::messages::telemetry_definition::TelemetryDefinitionRequest; +use crate::messages::telemetry_entry::TelemetryEntry; +use chrono::{DateTime, Utc}; +use std::sync::Arc; +use tokio::sync::{oneshot, RwLock}; +use tokio_util::sync::CancellationToken; +use uuid::Uuid; + +pub struct Telemetry; + +impl Telemetry { + pub async fn register( + client: Arc, + name: String, + data_type: DataType, + ) -> TelemetryHandle { + let cancellation_token = CancellationToken::new(); + let cancel_token = cancellation_token.clone(); + let stored_client = client.clone(); + + let response_uuid = Arc::new(RwLock::new(Uuid::nil())); + + let response_uuid_inner = response_uuid.clone(); + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let mut write_lock = Some(response_uuid_inner.write().await); + let _ = tx.send(()); + while !cancel_token.is_cancelled() { + if let Ok(response) = client + .send_request(TelemetryDefinitionRequest { + name: name.clone(), + data_type, + }) + .await + { + let mut lock = match write_lock { + None => response_uuid_inner.write().await, + Some(lock) => lock, + }; + // Update the value in the lock + *lock = response.uuid; + // Set this value so the loop works + write_lock = None; + } + + client.wait_disconnected().await; + } + }); + + // Wait until the write lock is acquired + let _ = rx.await; + // Wait until the write lock is released for the first time + drop(response_uuid.read().await); + + TelemetryHandle { + cancellation_token, + uuid: response_uuid, + client: stored_client, + } + } +} + +pub struct TelemetryHandle { + cancellation_token: CancellationToken, + uuid: Arc>, + client: Arc, +} + +impl TelemetryHandle { + pub async fn publish( + &self, + value: DataValue, + timestamp: DateTime, + ) -> Result<(), MessageError> { + let Ok(lock) = self.uuid.try_read() else { + return Ok(()); + }; + let uuid = *lock; + drop(lock); + + self.client + .send_message_if_connected(TelemetryEntry { + uuid, + value, + timestamp, + }) + .await + .or_else(|e| match e { + MessageError::TokioLockError(_) => Ok(()), + e => Err(e), + })?; + + Ok(()) + } +} + +impl Drop for TelemetryHandle { + fn drop(&mut self) { + self.cancellation_token.cancel(); + } +} diff --git a/examples/simple_producer/src/main.rs b/examples/simple_producer/src/main.rs index 9426c1c..869a2ec 100644 --- a/examples/simple_producer/src/main.rs +++ b/examples/simple_producer/src/main.rs @@ -1,276 +1,215 @@ -use api::client::error::MessageError; +use api::client::telemetry::Telemetry; use api::client::Client; use api::data_type::DataType; use api::data_value::DataValue; -use api::messages::telemetry_definition::TelemetryDefinitionRequest; -use api::messages::telemetry_entry::TelemetryEntry; -use chrono::{DateTime, TimeDelta, Utc}; +use chrono::{TimeDelta, Utc}; use futures_util::future::join_all; use num_traits::FloatConst; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{oneshot, RwLock}; use tokio::time::{sleep_until, Instant}; use tokio_util::sync::CancellationToken; -use uuid::Uuid; - -struct Telemetry { - client: Arc, -} - -struct TelemetryItemHandle { - cancellation_token: CancellationToken, - uuid: Arc>, - client: Arc, -} - -impl Telemetry { - pub fn new(client: Arc) -> Self { - Self { client } - } - - pub async fn register( - &self, - name: String, - data_type: DataType, - ) -> anyhow::Result { - let cancellation_token = CancellationToken::new(); - let cancel_token = cancellation_token.clone(); - let client = self.client.clone(); - - let response_uuid = Arc::new(RwLock::new(Uuid::nil())); - - let response_uuid_inner = response_uuid.clone(); - let (tx, rx) = oneshot::channel(); - - tokio::spawn(async move { - let mut write_lock = Some(response_uuid_inner.write().await); - let _ = tx.send(()); - while !cancel_token.is_cancelled() { - if let Ok(response) = client - .send_request(TelemetryDefinitionRequest { - name: name.clone(), - data_type, - }) - .await - { - let mut lock = match write_lock { - None => response_uuid_inner.write().await, - Some(lock) => lock, - }; - // Update the value in the lock - *lock = response.uuid; - // Set this value so the loop works - write_lock = None; - } - - client.wait_disconnected().await; - } - }); - - // Wait until the write lock is acquired - let _ = rx.await; - // Wait until the write lock is released for the first time - drop(response_uuid.read().await); - - Ok(TelemetryItemHandle { - cancellation_token, - uuid: response_uuid, - client: self.client.clone(), - }) - } -} - -impl TelemetryItemHandle { - pub async fn publish(&self, value: DataValue, timestamp: DateTime) -> anyhow::Result<()> { - let Ok(lock) = self.uuid.try_read() else { - return Ok(()); - }; - let uuid = *lock; - drop(lock); - - self.client - .send_message_if_connected(TelemetryEntry { - uuid, - value, - timestamp, - }) - .await - .or_else(|e| match e { - MessageError::TokioLockError(_) => Ok(()), - e => Err(e), - })?; - - Ok(()) - } -} - -impl Drop for TelemetryItemHandle { - fn drop(&mut self) { - self.cancellation_token.cancel(); - } -} #[tokio::main] async fn main() -> anyhow::Result<()> { env_logger::init(); let client = Arc::new(Client::connect("ws://[::1]:8080/backend")?); - let tlm = Telemetry::new(client); + let time_offset = Telemetry::register( + client.clone(), + "simple_producer/time_offset".into(), + DataType::Float64, + ) + .await; + + let publish_offset = Telemetry::register( + client.clone(), + "simple_producer/publish_offset".into(), + DataType::Float64, + ) + .await; + + let await_offset = Telemetry::register( + client.clone(), + "simple_producer/await_offset".into(), + DataType::Float64, + ) + .await; + + let sin_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/sin".into(), + DataType::Float32, + ) + .await; + let cos_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/cos".into(), + DataType::Float64, + ) + .await; + let bool_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/bool".into(), + DataType::Boolean, + ) + .await; + + let sin2_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/sin2".into(), + DataType::Float32, + ) + .await; + let cos2_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/cos2".into(), + DataType::Float64, + ) + .await; + + let sin3_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/sin3".into(), + DataType::Float32, + ) + .await; + let cos3_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/cos3".into(), + DataType::Float64, + ) + .await; + + let sin4_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/sin4".into(), + DataType::Float32, + ) + .await; + let cos4_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/cos4".into(), + DataType::Float64, + ) + .await; + + let sin5_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/sin5".into(), + DataType::Float32, + ) + .await; + let cos5_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/cos5".into(), + DataType::Float64, + ) + .await; + + let sin6_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/sin6".into(), + DataType::Float32, + ) + .await; + let cos6_tlm_handle = Telemetry::register( + client.clone(), + "simple_producer/cos6".into(), + DataType::Float64, + ) + .await; + + let cancellation_token = CancellationToken::new(); { - let time_offset = tlm - .register("simple_producer/time_offset".into(), DataType::Float64) - .await?; - - let publish_offset = tlm - .register("simple_producer/publish_offset".into(), DataType::Float64) - .await?; - - let await_offset = tlm - .register("simple_producer/await_offset".into(), DataType::Float64) - .await?; - - let sin_tlm_handle = tlm - .register("simple_producer/sin".into(), DataType::Float32) - .await?; - let cos_tlm_handle = tlm - .register("simple_producer/cos".into(), DataType::Float64) - .await?; - let bool_tlm_handle = tlm - .register("simple_producer/bool".into(), DataType::Boolean) - .await?; - - let sin2_tlm_handle = tlm - .register("simple_producer/sin2".into(), DataType::Float32) - .await?; - let cos2_tlm_handle = tlm - .register("simple_producer/cos2".into(), DataType::Float64) - .await?; - - let sin3_tlm_handle = tlm - .register("simple_producer/sin3".into(), DataType::Float32) - .await?; - let cos3_tlm_handle = tlm - .register("simple_producer/cos3".into(), DataType::Float64) - .await?; - - let sin4_tlm_handle = tlm - .register("simple_producer/sin4".into(), DataType::Float32) - .await?; - let cos4_tlm_handle = tlm - .register("simple_producer/cos4".into(), DataType::Float64) - .await?; - - let sin5_tlm_handle = tlm - .register("simple_producer/sin5".into(), DataType::Float32) - .await?; - let cos5_tlm_handle = tlm - .register("simple_producer/cos5".into(), DataType::Float64) - .await?; - - let sin6_tlm_handle = tlm - .register("simple_producer/sin6".into(), DataType::Float32) - .await?; - let cos6_tlm_handle = tlm - .register("simple_producer/cos6".into(), DataType::Float64) - .await?; - - let cancellation_token = CancellationToken::new(); - { - let cancellation_token = cancellation_token.clone(); - tokio::spawn(async move { - let _ = tokio::signal::ctrl_c().await; - cancellation_token.cancel(); - println!("Cancellation Token Cancelled"); - }); - } - - let start_time = Utc::now(); - let start_instant = Instant::now(); - let mut next_time = start_instant; - let mut index = 0; - let mut tasks = vec![]; - while !cancellation_token.is_cancelled() { - next_time += Duration::from_millis(1000); - index += 1; - sleep_until(next_time).await; - let publish_time = start_time + TimeDelta::from_std(next_time - start_instant).unwrap(); - let actual_time = Instant::now(); - tasks.push(time_offset.publish( - DataValue::Float64((actual_time - next_time).as_secs_f64()), - Utc::now(), - )); - tasks.push(sin_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (1000.0_f32)).sin()), - publish_time, - )); - tasks.push(cos_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (1000.0_f64)).cos()), - publish_time, - )); - tasks.push( - bool_tlm_handle.publish(DataValue::Boolean(index % 1000 > 500), publish_time), - ); - tasks.push(sin2_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (500.0_f32)).sin()), - publish_time, - )); - tasks.push(cos2_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (500.0_f64)).cos()), - publish_time, - )); - tasks.push(sin3_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (333.0_f32)).sin()), - publish_time, - )); - tasks.push(cos3_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (333.0_f64)).cos()), - publish_time, - )); - tasks.push(sin4_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (250.0_f32)).sin()), - publish_time, - )); - tasks.push(cos4_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (250.0_f64)).cos()), - publish_time, - )); - tasks.push(sin5_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (200.0_f32)).sin()), - publish_time, - )); - tasks.push(cos5_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (200.0_f64)).cos()), - publish_time, - )); - tasks.push(sin6_tlm_handle.publish( - DataValue::Float32((f32::TAU() * (index as f32) / (166.0_f32)).sin()), - publish_time, - )); - tasks.push(cos6_tlm_handle.publish( - DataValue::Float64((f64::TAU() * (index as f64) / (166.0_f64)).cos()), - publish_time, - )); - - tasks.push(publish_offset.publish( - DataValue::Float64((Instant::now() - actual_time).as_secs_f64()), - Utc::now(), - )); - - // Join the tasks so they all run in parallel - for task in join_all(tasks.drain(..)).await { - task?; - } - - tasks.push(await_offset.publish( - DataValue::Float64((Instant::now() - actual_time).as_secs_f64()), - Utc::now(), - )); - } - println!("Exiting Loop"); + let cancellation_token = cancellation_token.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + cancellation_token.cancel(); + }); + } + + let start_time = Utc::now(); + let start_instant = Instant::now(); + let mut next_time = start_instant; + let mut index = 0; + let mut tasks = vec![]; + while !cancellation_token.is_cancelled() { + next_time += Duration::from_millis(10); + index += 1; + sleep_until(next_time).await; + let publish_time = start_time + TimeDelta::from_std(next_time - start_instant).unwrap(); + let actual_time = Instant::now(); + tasks.push(time_offset.publish( + DataValue::Float64((actual_time - next_time).as_secs_f64()), + Utc::now(), + )); + tasks.push(sin_tlm_handle.publish( + DataValue::Float32((f32::TAU() * (index as f32) / (1000.0_f32)).sin()), + publish_time, + )); + tasks.push(cos_tlm_handle.publish( + DataValue::Float64((f64::TAU() * (index as f64) / (1000.0_f64)).cos()), + publish_time, + )); + tasks.push(bool_tlm_handle.publish(DataValue::Boolean(index % 1000 > 500), publish_time)); + tasks.push(sin2_tlm_handle.publish( + DataValue::Float32((f32::TAU() * (index as f32) / (500.0_f32)).sin()), + publish_time, + )); + tasks.push(cos2_tlm_handle.publish( + DataValue::Float64((f64::TAU() * (index as f64) / (500.0_f64)).cos()), + publish_time, + )); + tasks.push(sin3_tlm_handle.publish( + DataValue::Float32((f32::TAU() * (index as f32) / (333.0_f32)).sin()), + publish_time, + )); + tasks.push(cos3_tlm_handle.publish( + DataValue::Float64((f64::TAU() * (index as f64) / (333.0_f64)).cos()), + publish_time, + )); + tasks.push(sin4_tlm_handle.publish( + DataValue::Float32((f32::TAU() * (index as f32) / (250.0_f32)).sin()), + publish_time, + )); + tasks.push(cos4_tlm_handle.publish( + DataValue::Float64((f64::TAU() * (index as f64) / (250.0_f64)).cos()), + publish_time, + )); + tasks.push(sin5_tlm_handle.publish( + DataValue::Float32((f32::TAU() * (index as f32) / (200.0_f32)).sin()), + publish_time, + )); + tasks.push(cos5_tlm_handle.publish( + DataValue::Float64((f64::TAU() * (index as f64) / (200.0_f64)).cos()), + publish_time, + )); + tasks.push(sin6_tlm_handle.publish( + DataValue::Float32((f32::TAU() * (index as f32) / (166.0_f32)).sin()), + publish_time, + )); + tasks.push(cos6_tlm_handle.publish( + DataValue::Float64((f64::TAU() * (index as f64) / (166.0_f64)).cos()), + publish_time, + )); + + tasks.push(publish_offset.publish( + DataValue::Float64((Instant::now() - actual_time).as_secs_f64()), + Utc::now(), + )); + + // Join the tasks so they all run in parallel + for task in join_all(tasks.drain(..)).await { + task?; + } + + tasks.push(await_offset.publish( + DataValue::Float64((Instant::now() - actual_time).as_secs_f64()), + Utc::now(), + )); } - drop(tlm); Ok(()) }