use crate::client::error::MessageError; use crate::client::Client; use crate::data_value::DataValue; use crate::messages::telemetry_definition::TelemetryDefinitionRequest; use crate::messages::telemetry_entry::TelemetryEntry; use api_core::data_type::{DataType, ToDataType}; use chrono::{DateTime, Utc}; use std::marker::PhantomData; use std::sync::Arc; use tokio::sync::{oneshot, RwLock}; use tokio_util::sync::CancellationToken; use uuid::Uuid; pub struct TelemetryRegistry { client: Arc, } impl TelemetryRegistry { pub fn new(client: Arc) -> Self { Self { client } } #[inline] pub async fn register_generic( &self, name: impl Into, data_type: DataType, ) -> GenericTelemetryHandle { // inner for compilation performance async fn inner( client: Arc, name: String, data_type: DataType, ) -> GenericTelemetryHandle { let cancellation_token = CancellationToken::new(); let cancel_token = cancellation_token.clone(); let stored_client = client.clone(); let response_uuid = Arc::new(RwLock::new(Uuid::nil())); let response_uuid_inner = response_uuid.clone(); let (tx, rx) = oneshot::channel(); tokio::spawn(async move { let mut write_lock = Some(response_uuid_inner.write().await); let _ = tx.send(()); while !cancel_token.is_cancelled() { if let Ok(response) = client .send_request(TelemetryDefinitionRequest { name: name.clone(), data_type, }) .await { let mut lock = match write_lock { None => response_uuid_inner.write().await, Some(lock) => lock, }; // Update the value in the lock *lock = response.uuid; // Set this value so the loop works write_lock = None; } client.wait_disconnected().await; } }); // Wait until the write lock is acquired let _ = rx.await; // Wait until the write lock is released for the first time drop(response_uuid.read().await); GenericTelemetryHandle { cancellation_token, uuid: response_uuid, client: stored_client, data_type, } } inner(self.client.clone(), name.into(), data_type).await } #[inline] pub async fn register(&self, name: impl Into) -> TelemetryHandle { self.register_generic(name, T::DATA_TYPE).await.coerce() } } impl Drop for GenericTelemetryHandle { fn drop(&mut self) { self.cancellation_token.cancel(); } } pub struct GenericTelemetryHandle { cancellation_token: CancellationToken, uuid: Arc>, client: Arc, data_type: DataType, } impl GenericTelemetryHandle { pub async fn publish( &self, value: DataValue, timestamp: DateTime, ) -> Result<(), MessageError> { if value.to_data_type() != self.data_type { return Err(MessageError::IncorrectDataType { expected: self.data_type, actual: value.to_data_type(), }); } let Ok(lock) = self.uuid.try_read() else { return Ok(()); }; let uuid = *lock; drop(lock); self.client .send_message_if_connected(TelemetryEntry { uuid, value, timestamp, }) .await .or_else(|e| match e { MessageError::TokioLockError(_) => Ok(()), e => Err(e), })?; Ok(()) } #[inline] pub async fn publish_now(&self, value: DataValue) -> Result<(), MessageError> { self.publish(value, Utc::now()).await } fn coerce>(self) -> TelemetryHandle { TelemetryHandle:: { generic_handle: self, _phantom: PhantomData, } } } pub struct TelemetryHandle { generic_handle: GenericTelemetryHandle, _phantom: PhantomData, } impl TelemetryHandle { pub fn to_generic(self) -> GenericTelemetryHandle { self.generic_handle } pub fn as_generic(&self) -> &GenericTelemetryHandle { &self.generic_handle } } impl> TelemetryHandle { #[inline] pub async fn publish(&self, value: T, timestamp: DateTime) -> Result<(), MessageError> { self.as_generic().publish(value.into(), timestamp).await } #[inline] pub async fn publish_now(&self, value: T) -> Result<(), MessageError> { self.publish(value, Utc::now()).await } } #[cfg(test)] mod tests { use crate::client::error::MessageError; use crate::client::telemetry::TelemetryRegistry; use crate::client::tests::create_test_client; use crate::client::Callback; use crate::messages::payload::RequestMessagePayload; use crate::messages::telemetry_definition::{ TelemetryDefinitionRequest, TelemetryDefinitionResponse, }; use crate::messages::telemetry_entry::TelemetryEntry; use crate::messages::ResponseMessage; use api_core::data_type::DataType; use api_core::data_value::DataValue; use futures_util::FutureExt; use std::sync::Arc; use std::time::Duration; use tokio::task::yield_now; use tokio::time::timeout; use tokio::try_join; use uuid::Uuid; #[tokio::test] async fn generic() { // if _c drops then we are disconnected let (mut rx, _c, client) = create_test_client(); let tlm = TelemetryRegistry::new(Arc::new(client)); let tlm_handle = tlm.register_generic("generic", DataType::Float32); let tlm_uuid = Uuid::new_v4(); let expected_rx = async { let msg = rx.recv().await.unwrap(); let Callback::Once(responder) = msg.callback else { panic!("Expected Once Callback"); }; assert!(msg.msg.response.is_none()); let RequestMessagePayload::TelemetryDefinitionRequest(TelemetryDefinitionRequest { name, data_type, }) = msg.msg.payload else { panic!("Expected Telemetry Definition Request") }; assert_eq!(name, "generic".to_string()); assert_eq!(data_type, DataType::Float32); responder .send(ResponseMessage { uuid: Uuid::new_v4(), response: Some(msg.msg.uuid), payload: TelemetryDefinitionResponse { uuid: tlm_uuid }.into(), }) .unwrap(); }; let (tlm_handle, _) = try_join!( timeout(Duration::from_secs(1), tlm_handle), timeout(Duration::from_secs(1), expected_rx), ) .unwrap(); assert_eq!(*tlm_handle.uuid.try_read().unwrap(), tlm_uuid); // This should NOT block if there is space in the queue tlm_handle .publish_now(0.0f32.into()) .now_or_never() .unwrap() .unwrap(); let tlm_msg = timeout(Duration::from_secs(1), rx.recv()) .await .unwrap() .unwrap(); assert!(matches!(tlm_msg.callback, Callback::None)); match tlm_msg.msg.payload { RequestMessagePayload::TelemetryEntry(TelemetryEntry { uuid, value, .. }) => { assert_eq!(uuid, tlm_uuid); assert_eq!(value, DataValue::Float32(0.0f32)); } _ => panic!("Expected Telemetry Entry"), } } #[tokio::test] async fn mismatched_type() { let (mut rx, _, client) = create_test_client(); let tlm = TelemetryRegistry::new(Arc::new(client)); let tlm_handle = tlm.register_generic("generic", DataType::Float32); let tlm_uuid = Uuid::new_v4(); let expected_rx = async { let msg = rx.recv().await.unwrap(); let Callback::Once(responder) = msg.callback else { panic!("Expected Once Callback"); }; assert!(msg.msg.response.is_none()); let RequestMessagePayload::TelemetryDefinitionRequest(TelemetryDefinitionRequest { name, data_type, }) = msg.msg.payload else { panic!("Expected Telemetry Definition Request") }; assert_eq!(name, "generic".to_string()); assert_eq!(data_type, DataType::Float32); responder .send(ResponseMessage { uuid: Uuid::new_v4(), response: Some(msg.msg.uuid), payload: TelemetryDefinitionResponse { uuid: tlm_uuid }.into(), }) .unwrap(); }; let (tlm_handle, _) = try_join!( timeout(Duration::from_secs(1), tlm_handle), timeout(Duration::from_secs(1), expected_rx), ) .unwrap(); assert_eq!(*tlm_handle.uuid.try_read().unwrap(), tlm_uuid); match timeout( Duration::from_secs(1), tlm_handle.publish_now(0.0f64.into()), ) .await .unwrap() { Err(MessageError::IncorrectDataType { expected, actual }) => { assert_eq!(expected, DataType::Float32); assert_eq!(actual, DataType::Float64); } _ => panic!("Error Expected"), } } #[tokio::test] async fn typed() { // if _c drops then we are disconnected let (mut rx, _c, client) = create_test_client(); let tlm = TelemetryRegistry::new(Arc::new(client)); let tlm_handle = tlm.register::("typed"); let tlm_uuid = Uuid::new_v4(); let expected_rx = async { let msg = rx.recv().await.unwrap(); let Callback::Once(responder) = msg.callback else { panic!("Expected Once Callback"); }; assert!(msg.msg.response.is_none()); let RequestMessagePayload::TelemetryDefinitionRequest(TelemetryDefinitionRequest { name, data_type, }) = msg.msg.payload else { panic!("Expected Telemetry Definition Request") }; assert_eq!(name, "typed".to_string()); assert_eq!(data_type, DataType::Boolean); responder .send(ResponseMessage { uuid: Uuid::new_v4(), response: Some(msg.msg.uuid), payload: TelemetryDefinitionResponse { uuid: tlm_uuid }.into(), }) .unwrap(); }; let (tlm_handle, _) = try_join!( timeout(Duration::from_secs(1), tlm_handle), timeout(Duration::from_secs(1), expected_rx), ) .unwrap(); assert_eq!(*tlm_handle.as_generic().uuid.try_read().unwrap(), tlm_uuid); // This should NOT block if there is space in the queue tlm_handle .publish_now(true) .now_or_never() .unwrap() .unwrap(); // This should block as there should not be space in the queue assert!(tlm_handle.publish_now(false).now_or_never().is_none()); let tlm_msg = timeout(Duration::from_secs(1), rx.recv()) .await .unwrap() .unwrap(); assert!(matches!(tlm_msg.callback, Callback::None)); match tlm_msg.msg.payload { RequestMessagePayload::TelemetryEntry(TelemetryEntry { uuid, value, .. }) => { assert_eq!(uuid, tlm_uuid); assert_eq!(value, DataValue::Boolean(true)); } _ => panic!("Expected Telemetry Entry"), } let _make_generic_again = tlm_handle.to_generic(); } #[tokio::test] async fn reconnect() { // if _c drops then we are disconnected let (mut rx, connected, client) = create_test_client(); let tlm = TelemetryRegistry::new(Arc::new(client)); let tlm_handle = tlm.register_generic("generic", DataType::Float32); let tlm_uuid = Uuid::new_v4(); let expected_rx = async { let msg = rx.recv().await.unwrap(); let Callback::Once(responder) = msg.callback else { panic!("Expected Once Callback"); }; assert!(msg.msg.response.is_none()); let RequestMessagePayload::TelemetryDefinitionRequest(TelemetryDefinitionRequest { name, data_type, }) = msg.msg.payload else { panic!("Expected Telemetry Definition Request") }; assert_eq!(name, "generic".to_string()); assert_eq!(data_type, DataType::Float32); responder .send(ResponseMessage { uuid: Uuid::new_v4(), response: Some(msg.msg.uuid), payload: TelemetryDefinitionResponse { uuid: tlm_uuid }.into(), }) .unwrap(); }; let (tlm_handle, _) = try_join!( timeout(Duration::from_secs(1), tlm_handle), timeout(Duration::from_secs(1), expected_rx), ) .unwrap(); assert_eq!(*tlm_handle.uuid.try_read().unwrap(), tlm_uuid); // Notify Disconnect connected.send_replace(false); // Notify Reconnect connected.send_replace(true); { let new_tlm_uuid = Uuid::new_v4(); let msg = rx.recv().await.unwrap(); let Callback::Once(responder) = msg.callback else { panic!("Expected Once Callback"); }; assert!(msg.msg.response.is_none()); let RequestMessagePayload::TelemetryDefinitionRequest(TelemetryDefinitionRequest { name, data_type, }) = msg.msg.payload else { panic!("Expected Telemetry Definition Request") }; assert_eq!(name, "generic".to_string()); assert_eq!(data_type, DataType::Float32); responder .send(ResponseMessage { uuid: Uuid::new_v4(), response: Some(msg.msg.uuid), payload: TelemetryDefinitionResponse { uuid: new_tlm_uuid }.into(), }) .unwrap(); // Yield to the executor so that the UUIDs can be updated yield_now().await; assert_eq!(*tlm_handle.uuid.try_read().unwrap(), new_tlm_uuid); } } }