From 51af825b27f4ffce053e426d1f9c66e3b26b9d5b Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Sun, 20 Oct 2024 14:40:17 -0700 Subject: [PATCH] save data on backend --- server/src/grpc.rs | 95 ++++++++++++++++++--------------- server/src/http.rs | 20 +++---- server/src/lib.rs | 44 ++------------- server/src/telemetry.rs | 115 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 178 insertions(+), 96 deletions(-) create mode 100644 server/src/telemetry.rs diff --git a/server/src/grpc.rs b/server/src/grpc.rs index 639764e..9a15217 100644 --- a/server/src/grpc.rs +++ b/server/src/grpc.rs @@ -1,10 +1,9 @@ -use std::collections::HashMap; use std::error::Error; use std::pin::Pin; use std::sync::Arc; use log::{error, trace}; use tokio::select; -use tokio::sync::{mpsc, Mutex}; +use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tonic::{Request, Response, Status, Streaming}; @@ -12,56 +11,39 @@ use tonic::codegen::tokio_stream::{Stream, StreamExt}; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::transport::Server; use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer}; -use crate::core::{TelemetryDefinitionRequest, TelemetryDefinitionResponse, TelemetryInsertResponse, TelemetryItem, Uuid}; -use crate::TelemetryDefinition; +use crate::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse, TelemetryInsertResponse, TelemetryItem, TelemetryValue, Uuid}; +use crate::core::telemetry_value::Value; +use crate::telemetry::{TelemetryDataValue, TelemetryManagementService}; pub struct CoreTelemetryService { - pub map: Arc>>, + pub tlm_management: Arc, pub cancellation_token: CancellationToken, } #[tonic::async_trait] impl TelemetryService for CoreTelemetryService { async fn new_telemetry(&self, request: Request) -> Result, Status> { - trace!("TelemetryService::new_telemetry"); - - let telemetry_definition_request = request.into_inner(); - - let uuid = { - let mut lock = self.map.lock().await; - if let Some(definition) = lock.get(&telemetry_definition_request.name) { - trace!("Telemetry Definition Found {:?}", definition); - if definition.data_type != telemetry_definition_request.data_type() { - return Err(Status::already_exists("A telemetry item of the same name already exists")); - } - definition.uuid.clone() - } else { - trace!("Adding New Telemetry Definition {:?}", telemetry_definition_request); - let uuid = Uuid::random().value; - lock.insert(telemetry_definition_request.name.clone(), TelemetryDefinition { - uuid: uuid.clone(), - name: telemetry_definition_request.name.clone(), - data_type: telemetry_definition_request.data_type().clone(), - }); - uuid - } - }; - - let reply = TelemetryDefinitionResponse { - uuid: Some(Uuid { - value: uuid - }), - }; - - Ok(Response::new(reply)) + trace!("CoreTelemetryService::new_telemetry"); + self.tlm_management.register(request.into_inner()).await + .map(|uuid| + Response::new(TelemetryDefinitionResponse { + uuid: Some(Uuid { + value: uuid + }), + }) + ) + .map_err(|err| + Status::already_exists(err.to_string()) + ) } type InsertTelemetryStream = Pin> + Send>>; async fn insert_telemetry(&self, request: Request>) -> Result, Status> { - trace!("TelemetryService::insert_telemetry"); + trace!("CoreTelemetryService::insert_telemetry"); let cancel_token = self.cancellation_token.clone(); + let tlm_management = self.tlm_management.clone(); let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(128); @@ -74,9 +56,8 @@ impl TelemetryService for CoreTelemetryService { Some(message) = in_stream.next() => { match message { Ok(tlm_item) => { - trace!("tlm_item {:?}", tlm_item); tx - .send(Ok(TelemetryInsertResponse {})) + .send(Self::handle_new_tlm_item(&tlm_management, &tlm_item).await) .await .expect("working rx"); } @@ -94,18 +75,48 @@ impl TelemetryService for CoreTelemetryService { } } -pub fn setup(token: CancellationToken, telemetry_definitions: Arc>>) -> Result, Box> { +impl CoreTelemetryService { + async fn handle_new_tlm_item(tlm_management: &Arc, tlm_item: &TelemetryItem) -> Result { + trace!("CoreTelemetryService::handle_new_tlm_item {:?}", tlm_item); + let Some(ref uuid) = tlm_item.uuid else { + return Err(Status::failed_precondition("UUID Missing")); + }; + let Some(tlm_data) = tlm_management.get_by_uuid(&uuid.value).await else { + return Err(Status::not_found("Telemetry Item Not Found")); + }; + + let Some(TelemetryValue { value: Some(value) }) = tlm_item.value else { + return Err(Status::failed_precondition("Value Missing")); + }; + + let expected_type = match value { + Value::Float32(_) => TelemetryDataType::Float32, + Value::Float64(_) => TelemetryDataType::Float64, + }; + if expected_type != tlm_data.definition.data_type { + return Err(Status::failed_precondition("Data Type Mismatch")); + }; + + let _ = tlm_data.data.send_replace(Some(match value { + Value::Float32(x) => TelemetryDataValue::Float32(x), + Value::Float64(x) => TelemetryDataValue::Float64(x), + })); + + Ok(TelemetryInsertResponse {}) + } +} + +pub fn setup(token: CancellationToken, telemetry_management_service: Arc) -> Result, Box> { let addr = "[::1]:50051".parse()?; Ok(tokio::spawn(async move { let tlm_service = CoreTelemetryService { - map: telemetry_definitions, + tlm_management: telemetry_management_service, cancellation_token: token.clone() }; trace!("Starting gRPC Server"); let result = Server::builder() .add_service(TelemetryServiceServer::new(tlm_service)) - // .serve(addr) .serve_with_shutdown(addr, token.cancelled_owned()) .await; diff --git a/server/src/http.rs b/server/src/http.rs index 3654bd1..4f1dd54 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -1,12 +1,10 @@ -use crate::TelemetryDefinition; use actix_web::http::header::ContentType; use actix_web::http::StatusCode; use actix_web::{error, get, web, App, HttpResponse, HttpServer, Responder}; use derive_more::{Display, Error}; -use std::collections::HashMap; use std::sync::Arc; use log::trace; -use tokio::sync::Mutex; +use crate::telemetry::TelemetryManagementService; #[derive(Debug, Display, Error)] enum UserError { @@ -28,18 +26,14 @@ impl error::ResponseError for UserError { } #[get("/tlm/{name:[\\w\\d/_-]+}")] -async fn get_tlm_definition(data: web::Data>>>, name: web::Path) -> Result { +async fn get_tlm_definition(data: web::Data>, name: web::Path) -> Result { let string = name.to_string(); trace!("get_tlm_definition {}", string); - let data = data.lock().await; - let tlm_def = data.get(&string); - trace!("tlm_def {:?}", tlm_def.clone()); + let Some(data) = data.get_by_name(&string).await else { + return Err(UserError::TlmNotFound { tlm: string }); + }; - if let Some(tlm_def) = tlm_def { - Ok(web::Json(tlm_def.clone())) - } else { - Err(UserError::TlmNotFound { tlm: string }) - } + Ok(web::Json(data.definition.clone())) } fn setup_api(cfg: &mut web::ServiceConfig) { @@ -47,7 +41,7 @@ fn setup_api(cfg: &mut web::ServiceConfig) { .service(get_tlm_definition); } -pub async fn setup(telemetry_definitions: Arc>>) -> Result<(), Box> { +pub async fn setup(telemetry_definitions: Arc) -> Result<(), Box> { let data = web::Data::new(telemetry_definitions); trace!("Starting HTTP Server"); diff --git a/server/src/lib.rs b/server/src/lib.rs index ac32594..b83a089 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,54 +1,16 @@ mod uuid; mod grpc; mod http; +mod telemetry; pub mod core { tonic::include_proto!("core"); } -use std::collections::HashMap; -use crate::core::TelemetryDataType; +use crate::telemetry::TelemetryManagementService; use std::error::Error; -use std::fmt::Formatter; use std::sync::Arc; -use tokio::sync::Mutex; use tokio_util::sync::CancellationToken; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; -use serde::de::Visitor; - -fn tlm_data_type_serialzier(tlm_data_type: &TelemetryDataType, serializer: S) -> Result where S: Serializer { - serializer.serialize_str(tlm_data_type.as_str_name()) -} - -struct TlmDataTypeVisitor; - -impl<'de> Visitor<'de> for TlmDataTypeVisitor { - type Value = TelemetryDataType; - - fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { - formatter.write_str("A &str") - } - - fn visit_str(self, v: &str) -> Result - where - E: serde::de::Error, - { - TelemetryDataType::from_str_name(v).ok_or(E::custom("Invalid TelemetryDataType")) - } -} - -fn tlm_data_type_deserialzier<'de, D>(deserializer: D) -> Result where D: Deserializer<'de> { - deserializer.deserialize_str(TlmDataTypeVisitor) -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct TelemetryDefinition { - uuid: String, - name: String, - #[serde(serialize_with = "tlm_data_type_serialzier")] - #[serde(deserialize_with = "tlm_data_type_deserialzier")] - data_type: TelemetryDataType, -} pub async fn setup() -> Result<(), Box> { let cancellation_token = CancellationToken::new(); @@ -60,7 +22,7 @@ pub async fn setup() -> Result<(), Box> { }); } - let tlm = Arc::new(Mutex::new(HashMap::new())); + let tlm = Arc::new(TelemetryManagementService::new()); let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?; diff --git a/server/src/telemetry.rs b/server/src/telemetry.rs new file mode 100644 index 0000000..0411f4a --- /dev/null +++ b/server/src/telemetry.rs @@ -0,0 +1,115 @@ +use std::collections::HashMap; +use std::error::Error; +use std::fmt::Formatter; +use std::sync::Arc; +use log::trace; +use serde::de::Visitor; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use tokio::sync::Mutex; +use crate::core::{TelemetryDataType, TelemetryDefinitionRequest, Uuid}; + +fn tlm_data_type_serialzier(tlm_data_type: &TelemetryDataType, serializer: S) -> Result where S: Serializer { + serializer.serialize_str(tlm_data_type.as_str_name()) +} + +struct TlmDataTypeVisitor; + +impl<'de> Visitor<'de> for TlmDataTypeVisitor { + type Value = TelemetryDataType; + + fn expecting(&self, formatter: &mut Formatter) -> std::fmt::Result { + formatter.write_str("A &str") + } + + fn visit_str(self, v: &str) -> Result + where + E: serde::de::Error, + { + TelemetryDataType::from_str_name(v).ok_or(E::custom("Invalid TelemetryDataType")) + } +} + +fn tlm_data_type_deserialzier<'de, D>(deserializer: D) -> Result where D: Deserializer<'de> { + deserializer.deserialize_str(TlmDataTypeVisitor) +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TelemetryDefinition { + pub uuid: String, + pub name: String, + #[serde(serialize_with = "tlm_data_type_serialzier")] + #[serde(deserialize_with = "tlm_data_type_deserialzier")] + pub data_type: TelemetryDataType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TelemetryDataValue { + Float32(f32), + Float64(f64), +} + +#[derive(Clone)] +pub struct TelemetryData { + pub definition: TelemetryDefinition, + pub data: tokio::sync::watch::Sender>, +} + +pub struct TelemetryManagementService { + uuid_mapping: Arc>>, + tlm_mapping: Arc>>, +} + +impl TelemetryManagementService { + pub fn new() -> Self { + Self { + uuid_mapping: Arc::new(Mutex::new(HashMap::new())), + tlm_mapping: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub async fn register(&self, telemetry_definition_request: TelemetryDefinitionRequest) -> Result> { + let mut lock = self.uuid_mapping.lock().await; + if let Some(uuid) = lock.get(&telemetry_definition_request.name) { + trace!("Telemetry Definition Found {:?}", uuid); + let tlm_lock = self.tlm_mapping.lock().await; + if let Some(TelemetryData { definition, ..}) = tlm_lock.get(uuid) { + if definition.data_type != telemetry_definition_request.data_type() { + return Err("A telemetry item of the same name already exists".into()); + } + Ok(uuid.clone()) + } else { + Err("Could not find Telemetry Data".into()) + } + } else { + trace!("Adding New Telemetry Definition {:?}", telemetry_definition_request); + let mut tlm_lock = self.tlm_mapping.lock().await; + let uuid = Uuid::random().value; + lock.insert(telemetry_definition_request.name.clone(), uuid.clone()); + tlm_lock.insert(uuid.clone(), TelemetryData { + definition: TelemetryDefinition { + uuid: uuid.clone(), + name: telemetry_definition_request.name.clone(), + data_type: telemetry_definition_request.data_type().clone(), + }, + data: tokio::sync::watch::channel(None).0 + }); + Ok(uuid) + } + } + + pub async fn get_by_name(&self, name: &String) -> Option { + let Some(uuid) = ({ + let uuid_lock = self.uuid_mapping.lock().await; + uuid_lock.get(name).map(|inner| inner.clone()) + }) else { + return None; + }; + + self.get_by_uuid(&uuid).await + } + + pub async fn get_by_uuid(&self, uuid: &String) -> Option { + let tlm_lock = self.tlm_mapping.lock().await; + tlm_lock.get(uuid).map(|inner| inner.clone()) + } +}