use chrono::DateTime; use num_traits::float::FloatConst; use server::core::telemetry_service_client::TelemetryServiceClient; use server::core::telemetry_value::Value; use server::core::{ TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp, Uuid, }; use std::error::Error; use std::time::Duration; use tokio::select; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::StreamExt; use tonic::codegen::StdError; use tonic::transport::Channel; use tonic::Request; struct Telemetry { client: TelemetryServiceClient, tx: Sender, cancel: CancellationToken, } struct TelemetryItemHandle { uuid: String, tx: Sender, } impl Telemetry { pub async fn new(dst: D) -> Result> where D: TryInto, D::Error: Into, { let mut client = TelemetryServiceClient::connect(dst).await?; let client_stored = client.clone(); let cancel = CancellationToken::new(); let cancel_stored = cancel.clone(); let (local_tx, mut local_rx) = mpsc::channel(16); tokio::spawn(async move { while !cancel.is_cancelled() { let (server_tx, server_rx) = mpsc::channel(16); let response_stream = client .insert_telemetry(ReceiverStream::new(server_rx)) .await; if let Ok(response_stream) = response_stream { let mut response_stream = response_stream.into_inner(); loop { select! { _ = cancel.cancelled() => { break; }, Some(item) = local_rx.recv() => { match server_tx.send(item).await { Ok(_) => {} Err(_) => break, } }, Some(response) = response_stream.next() => { match response { Ok(_) => {} Err(_) => { break; } } }, else => break, } } } else { tokio::time::sleep(Duration::from_secs(1)).await; } } }); Ok(Self { client: client_stored, tx: local_tx, cancel: cancel_stored, }) } pub async fn register( &mut self, name: String, data_type: TelemetryDataType, ) -> Result> { let response = self .client .new_telemetry(Request::new(TelemetryDefinitionRequest { name, data_type: data_type.into(), })) .await? .into_inner(); let Some(uuid) = response.uuid else { return Err("UUID Missing".into()); }; Ok(TelemetryItemHandle { uuid: uuid.value, tx: self.tx.clone(), }) } } impl Drop for Telemetry { fn drop(&mut self) { self.cancel.cancel(); } } impl TelemetryItemHandle { pub async fn publish( &self, value: Value, timestamp: DateTime, ) -> Result<(), Box> { let offset_from_unix_epoch = timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); self.tx .send(TelemetryItem { uuid: Some(Uuid { value: self.uuid.clone(), }), value: Some(TelemetryValue { value: Some(value) }), timestamp: Some(Timestamp { secs: offset_from_unix_epoch.num_seconds(), nanos: offset_from_unix_epoch.subsec_nanos(), }), }) .await?; Ok(()) } } #[tokio::main] async fn main() -> Result<(), Box> { let mut tlm = Telemetry::new("http://[::1]:50051").await?; let index_handle = tlm .register( "simple_producer/time_offset".into(), TelemetryDataType::Float64, ) .await?; let publish_offset = tlm .register( "simple_producer/publish_offset".into(), TelemetryDataType::Float64, ) .await?; let await_offset = tlm .register( "simple_producer/await_offset".into(), TelemetryDataType::Float64, ) .await?; let sin_tlm_handle = tlm .register("simple_producer/sin".into(), TelemetryDataType::Float32) .await?; let cos_tlm_handle = tlm .register("simple_producer/cos".into(), TelemetryDataType::Float64) .await?; let sin2_tlm_handle = tlm .register("simple_producer/sin2".into(), TelemetryDataType::Float32) .await?; let cos2_tlm_handle = tlm .register("simple_producer/cos2".into(), TelemetryDataType::Float64) .await?; let sin3_tlm_handle = tlm .register("simple_producer/sin3".into(), TelemetryDataType::Float32) .await?; let cos3_tlm_handle = tlm .register("simple_producer/cos3".into(), TelemetryDataType::Float64) .await?; let sin4_tlm_handle = tlm .register("simple_producer/sin4".into(), TelemetryDataType::Float32) .await?; let cos4_tlm_handle = tlm .register("simple_producer/cos4".into(), TelemetryDataType::Float64) .await?; let sin5_tlm_handle = tlm .register("simple_producer/sin5".into(), TelemetryDataType::Float32) .await?; let cos5_tlm_handle = tlm .register("simple_producer/cos5".into(), TelemetryDataType::Float64) .await?; let sin6_tlm_handle = tlm .register("simple_producer/sin6".into(), TelemetryDataType::Float32) .await?; let cos6_tlm_handle = tlm .register("simple_producer/cos6".into(), TelemetryDataType::Float64) .await?; let cancellation_token = CancellationToken::new(); { let cancellation_token = cancellation_token.clone(); tokio::spawn(async move { let _ = tokio::signal::ctrl_c().await; cancellation_token.cancel(); }); } let start_time = chrono::Utc::now(); let start_instant = Instant::now(); let mut next_time = start_instant; let mut index = 0; let mut tasks = vec![]; while !cancellation_token.is_cancelled() { next_time += Duration::from_millis(10); index += 1; tokio::time::sleep_until(next_time).await; let publish_time = start_time + chrono::TimeDelta::from_std(next_time - start_instant).unwrap(); let actual_time = Instant::now(); tasks.push(index_handle.publish( Value::Float64((actual_time - next_time).as_secs_f64()), chrono::Utc::now(), )); tasks.push(sin_tlm_handle.publish( Value::Float32((f32::TAU() * (index as f32) / (1000.0_f32)).sin()), publish_time, )); tasks.push(cos_tlm_handle.publish( Value::Float64((f64::TAU() * (index as f64) / (1000.0_f64)).cos()), publish_time, )); tasks.push(sin2_tlm_handle.publish( Value::Float32((f32::TAU() * (index as f32) / (500.0_f32)).sin()), publish_time, )); tasks.push(cos2_tlm_handle.publish( Value::Float64((f64::TAU() * (index as f64) / (500.0_f64)).cos()), publish_time, )); tasks.push(sin3_tlm_handle.publish( Value::Float32((f32::TAU() * (index as f32) / (333.0_f32)).sin()), publish_time, )); tasks.push(cos3_tlm_handle.publish( Value::Float64((f64::TAU() * (index as f64) / (333.0_f64)).cos()), publish_time, )); tasks.push(sin4_tlm_handle.publish( Value::Float32((f32::TAU() * (index as f32) / (250.0_f32)).sin()), publish_time, )); tasks.push(cos4_tlm_handle.publish( Value::Float64((f64::TAU() * (index as f64) / (250.0_f64)).cos()), publish_time, )); tasks.push(sin5_tlm_handle.publish( Value::Float32((f32::TAU() * (index as f32) / (200.0_f32)).sin()), publish_time, )); tasks.push(cos5_tlm_handle.publish( Value::Float64((f64::TAU() * (index as f64) / (200.0_f64)).cos()), publish_time, )); tasks.push(sin6_tlm_handle.publish( Value::Float32((f32::TAU() * (index as f32) / (166.0_f32)).sin()), publish_time, )); tasks.push(cos6_tlm_handle.publish( Value::Float64((f64::TAU() * (index as f64) / (166.0_f64)).cos()), publish_time, )); tasks.push(publish_offset.publish( Value::Float64((Instant::now() - actual_time).as_secs_f64()), chrono::Utc::now(), )); for task in tasks.drain(..) { task.await?; } tasks.push(await_offset.publish( Value::Float64((Instant::now() - actual_time).as_secs_f64()), chrono::Utc::now(), )); } Ok(()) }