From c6b32312d899258b1a998f04ebd2cf4a43b5bdbe Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Sat, 19 Oct 2024 15:21:12 -0700 Subject: [PATCH] clean up producer --- Cargo.lock | 3 + examples/simple_producer/Cargo.toml | 3 + examples/simple_producer/src/main.rs | 175 ++++++++++++++++++++------- server/proto/core.proto | 6 +- 4 files changed, 140 insertions(+), 47 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 82a776d..a4bfd74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -886,8 +886,11 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" name = "simple_producer" version = "0.0.0" dependencies = [ + "chrono", + "num-traits", "server", "tokio", + "tokio-util", "tonic", ] diff --git a/examples/simple_producer/Cargo.toml b/examples/simple_producer/Cargo.toml index 48ed9c9..936d388 100644 --- a/examples/simple_producer/Cargo.toml +++ b/examples/simple_producer/Cargo.toml @@ -7,3 +7,6 @@ edition = "2021" server = { path = "../../server" } tonic = "0.12.3" tokio = { version = "1.40.0", features = ["rt-multi-thread"] } +chrono = "0.4.38" +tokio-util = "0.7.12" +num-traits = "0.2.19" diff --git a/examples/simple_producer/src/main.rs b/examples/simple_producer/src/main.rs index 1569b53..a53d111 100644 --- a/examples/simple_producer/src/main.rs +++ b/examples/simple_producer/src/main.rs @@ -1,63 +1,150 @@ +use chrono::DateTime; use server::core::telemetry_service_client::TelemetryServiceClient; use server::core::telemetry_value::Value; -use server::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp}; -use std::f32::consts::TAU; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use server::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp, Uuid}; +use std::error::Error; +use std::future::IntoFuture; +use std::time::Duration; use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; +use tokio::task::JoinHandle; use tokio::time::Instant; +use tokio_util::sync::CancellationToken; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::StreamExt; +use tonic::codegen::StdError; +use tonic::transport::Channel; use tonic::Request; +use num_traits::float::FloatConst; + +struct Telemetry { + client: TelemetryServiceClient, + tx: Sender, + handle: Option>, + cancel: CancellationToken, +} + +struct TelemetryItemHandle { + uuid: Vec, + tx: Sender, +} + +impl Telemetry { + pub async fn new(dst: D) -> Result> + where + D: TryInto, + D::Error: Into, + { + let mut client = TelemetryServiceClient::connect(dst).await?; + let mut client_stored = client.clone(); + let cancel = CancellationToken::new(); + let cancel_stored = cancel.clone(); + let (local_tx, mut local_rx) = mpsc::channel(128); + + let handle: JoinHandle<()> = tokio::spawn(async move { + while !cancel.is_cancelled() { + let (server_tx, server_rx) = mpsc::channel(1); + let response_stream = client.insert_telemetry(ReceiverStream::new(server_rx)).await; + if let Ok(response_stream) = response_stream { + let mut response_stream = response_stream.into_inner(); + while let Some(item) = local_rx.recv().await { + match server_tx.send(item).await { + Ok(_) => {}, + Err(_) => { + break + }, + } + if let Some(response) = response_stream.next().await { + match response { + Ok(_) => {}, + Err(_) => { + break; + }, + } + } else { + break; + } + } + } else { + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + () + }); + + Ok(Self { + client: client_stored, + tx: local_tx, + handle: Some(handle), + cancel: cancel_stored, + }) + } + + pub async fn register(&mut self, name: String, data_type: TelemetryDataType) -> Result> { + let response = self.client.new_telemetry(Request::new(TelemetryDefinitionRequest { + name, + data_type: data_type.into(), + })).await?.into_inner(); + + let Some(uuid) = response.uuid else { return Err("UUID Missing".into()); }; + + Ok(TelemetryItemHandle { + uuid: uuid.value, + tx: self.tx.clone(), + }) + } +} + +impl Drop for Telemetry { + fn drop(&mut self) { + self.cancel.cancel(); + } +} + +impl TelemetryItemHandle { + pub async fn publish(&self, value: Value, timestamp: DateTime) -> Result<(), Box> { + let offset_from_unix_epoch = timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); + self.tx.send(TelemetryItem { + uuid: Some(Uuid { + value: self.uuid.clone() + }), + value: Some(TelemetryValue { + value: Some(value) + }), + timestamp: Some(Timestamp { + secs: offset_from_unix_epoch.num_seconds(), + nanos: offset_from_unix_epoch.subsec_nanos(), + }), + }).await?; + Ok(()) + } +} #[tokio::main] -async fn main() -> Result<(), Box> { - let mut client = TelemetryServiceClient::connect("http://[::1]:50051").await?; +async fn main() -> Result<(), Box> { + let mut tlm = Telemetry::new("http://[::1]:50051").await?; - let request = Request::new(TelemetryDefinitionRequest { - name: "simple_producer/sin".into(), - data_type: TelemetryDataType::Float32.into(), - }); - - let response = client.new_telemetry(request).await?.into_inner(); - - println!("Response={:?}", response); - let Some(uuid) = response.uuid else { return Err("UUID Missing".into()); }; - - let (tx, rx) = mpsc::channel(128); - - let mut response_stream = client.insert_telemetry(ReceiverStream::new(rx)).await?.into_inner(); + let sin_tlm_handle = tlm.register("simple_producer/sin".into(), TelemetryDataType::Float32).await?; + let cos_tlm_handle = tlm.register("simple_producer/cos".into(), TelemetryDataType::Float64).await?; let mut next_time = Instant::now(); let mut index = 0; - loop { + for _ in 0..100 { next_time += Duration::from_millis(100); index += 10; tokio::time::sleep_until(next_time).await; - let now = SystemTime::now().duration_since(UNIX_EPOCH)?; - tx.send(TelemetryItem { - uuid: Some(uuid.clone()), - value: Some(TelemetryValue { - value: Some(Value::Float32( - (TAU * (index as f32) / (1000.0_f32)).sin() - )), - }), - timestamp: Some(Timestamp { - secs: now.as_secs(), - nanos: now.subsec_nanos(), - }), - }).await?; - if let Some(response) = response_stream.next().await { - match response { - Ok(_) => { - } - Err(_) => { - println!("Error"); - } - } - } else { - println!("Stream closed"); - break; - } + sin_tlm_handle.publish( + Value::Float32( + (f32::TAU() * (index as f32) / (1000.0_f32)).sin() + ), + chrono::Utc::now(), + ).await?; + cos_tlm_handle.publish( + Value::Float64( + (f64::TAU() * (index as f64) / (1000.0_f64)).cos() + ), + chrono::Utc::now(), + ).await?; } Ok(()) diff --git a/server/proto/core.proto b/server/proto/core.proto index d523ad8..fad7a20 100644 --- a/server/proto/core.proto +++ b/server/proto/core.proto @@ -10,7 +10,7 @@ enum TelemetryDataType { message TelemetryValue { oneof value { float float_32 = 1; - float float_64 = 2; + double float_64 = 2; } } @@ -20,8 +20,8 @@ message UUID { // UTC since UNIX message Timestamp { - fixed64 secs = 1; - fixed32 nanos = 2; + sfixed64 secs = 1; + sfixed32 nanos = 2; } message TelemetryDefinitionRequest {