From f47512010f24e88690d0471c42941da69249d593 Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Sat, 19 Oct 2024 15:51:35 -0700 Subject: [PATCH] stores telemetry definitions --- examples/simple_producer/src/main.rs | 8 ++--- server/src/lib.rs | 51 +++++++++++++++++++++++----- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/examples/simple_producer/src/main.rs b/examples/simple_producer/src/main.rs index a53d111..d0e82c2 100644 --- a/examples/simple_producer/src/main.rs +++ b/examples/simple_producer/src/main.rs @@ -3,11 +3,9 @@ use server::core::telemetry_service_client::TelemetryServiceClient; use server::core::telemetry_value::Value; use server::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp, Uuid}; use std::error::Error; -use std::future::IntoFuture; use std::time::Duration; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; -use tokio::task::JoinHandle; use tokio::time::Instant; use tokio_util::sync::CancellationToken; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; @@ -20,7 +18,6 @@ use num_traits::float::FloatConst; struct Telemetry { client: TelemetryServiceClient, tx: Sender, - handle: Option>, cancel: CancellationToken, } @@ -36,12 +33,12 @@ impl Telemetry { D::Error: Into, { let mut client = TelemetryServiceClient::connect(dst).await?; - let mut client_stored = client.clone(); + let client_stored = client.clone(); let cancel = CancellationToken::new(); let cancel_stored = cancel.clone(); let (local_tx, mut local_rx) = mpsc::channel(128); - let handle: JoinHandle<()> = tokio::spawn(async move { + tokio::spawn(async move { while !cancel.is_cancelled() { let (server_tx, server_rx) = mpsc::channel(1); let response_stream = client.insert_telemetry(ReceiverStream::new(server_rx)).await; @@ -75,7 +72,6 @@ impl Telemetry { Ok(Self { client: client_stored, tx: local_tx, - handle: Some(handle), cancel: cancel_stored, }) } diff --git a/server/src/lib.rs b/server/src/lib.rs index 5d3e177..6328ab3 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -4,28 +4,61 @@ pub mod core { tonic::include_proto!("core"); } +use std::collections::HashMap; use std::error::Error; use std::pin::Pin; use log::{trace}; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, Mutex}; use tonic::{Request, Response, Status, Streaming}; use tonic::codegen::tokio_stream::{Stream, StreamExt}; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::transport::Server; use core::telemetry_service_server::TelemetryService; -use crate::core::{TelemetryDefinitionRequest, TelemetryDefinitionResponse, TelemetryInsertResponse, TelemetryItem, Uuid}; +use crate::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse, TelemetryInsertResponse, TelemetryItem, Uuid}; use crate::core::telemetry_service_server::TelemetryServiceServer; -#[derive(Debug, Default)] -pub struct CoreTelemetryService {} +#[derive(Debug)] +struct TelemetryDefinition { + uuid: Vec, + // name: String, + data_type: TelemetryDataType, +} + +pub struct CoreTelemetryService { + map: Mutex> +} #[tonic::async_trait] impl TelemetryService for CoreTelemetryService { async fn new_telemetry(&self, request: Request) -> Result, Status> { - trace!("TelemetryService::new_telemetry {:?}", request); + trace!("TelemetryService::new_telemetry"); + + let telemetry_definition_request = request.into_inner(); + + let uuid = { + let mut lock = self.map.lock().await; + if let Some(definition) = lock.get(&telemetry_definition_request.name) { + trace!("Telemetry Definition Found {:?}", definition); + if definition.data_type != telemetry_definition_request.data_type() { + return Err(Status::already_exists("A telemetry item of the same name already exists")); + } + definition.uuid.clone() + } else { + trace!("Adding New Telemetry Definition {:?}", telemetry_definition_request); + let uuid = Uuid::random().value; + lock.insert(telemetry_definition_request.name.clone(), TelemetryDefinition { + uuid: uuid.clone(), + // name: telemetry_definition_request.name.clone(), + data_type: telemetry_definition_request.data_type().clone(), + }); + uuid + } + }; let reply = TelemetryDefinitionResponse { - uuid: Some(Uuid::random()), + uuid: Some(Uuid { + value: uuid + }), }; Ok(Response::new(reply)) @@ -34,7 +67,7 @@ impl TelemetryService for CoreTelemetryService { type InsertTelemetryStream = Pin> + Send>>; async fn insert_telemetry(&self, request: Request>) -> Result, Status> { - trace!("TelemetryService::insert_telemetry {:?}", request); + trace!("TelemetryService::insert_telemetry"); let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(128); @@ -62,7 +95,9 @@ impl TelemetryService for CoreTelemetryService { pub async fn setup() -> Result<(), Box> { let addr = "[::1]:50051".parse()?; - let tlm_service = CoreTelemetryService::default(); + let tlm_service = CoreTelemetryService { + map: Mutex::new(HashMap::new()), + }; Server::builder() .add_service(TelemetryServiceServer::new(tlm_service))