diff --git a/examples/simple_producer/src/main.rs b/examples/simple_producer/src/main.rs index 6f07eef..d68821e 100644 --- a/examples/simple_producer/src/main.rs +++ b/examples/simple_producer/src/main.rs @@ -1,7 +1,10 @@ use chrono::DateTime; +use num_traits::float::FloatConst; use server::core::telemetry_service_client::TelemetryServiceClient; use server::core::telemetry_value::Value; -use server::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp, Uuid}; +use server::core::{ + TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp, Uuid, +}; use std::error::Error; use std::time::Duration; use tokio::sync::mpsc; @@ -13,7 +16,6 @@ use tonic::codegen::tokio_stream::StreamExt; use tonic::codegen::StdError; use tonic::transport::Channel; use tonic::Request; -use num_traits::float::FloatConst; struct Telemetry { client: TelemetryServiceClient, @@ -41,22 +43,22 @@ impl Telemetry { tokio::spawn(async move { while !cancel.is_cancelled() { let (server_tx, server_rx) = mpsc::channel(1); - let response_stream = client.insert_telemetry(ReceiverStream::new(server_rx)).await; + let response_stream = client + .insert_telemetry(ReceiverStream::new(server_rx)) + .await; if let Ok(response_stream) = response_stream { let mut response_stream = response_stream.into_inner(); while let Some(item) = local_rx.recv().await { match server_tx.send(item).await { - Ok(_) => {}, - Err(_) => { - break - }, + Ok(_) => {} + Err(_) => break, } if let Some(response) = response_stream.next().await { match response { - Ok(_) => {}, + Ok(_) => {} Err(_) => { break; - }, + } } } else { break; @@ -75,13 +77,23 @@ impl Telemetry { }) } - pub async fn register(&mut self, name: String, data_type: TelemetryDataType) -> Result> { - let response = self.client.new_telemetry(Request::new(TelemetryDefinitionRequest { - name, - data_type: data_type.into(), - })).await?.into_inner(); + pub async fn register( + &mut self, + name: String, + data_type: TelemetryDataType, + ) -> Result> { + let response = self + .client + .new_telemetry(Request::new(TelemetryDefinitionRequest { + name, + data_type: data_type.into(), + })) + .await? + .into_inner(); - let Some(uuid) = response.uuid else { return Err("UUID Missing".into()); }; + let Some(uuid) = response.uuid else { + return Err("UUID Missing".into()); + }; Ok(TelemetryItemHandle { uuid: uuid.value, @@ -97,20 +109,25 @@ impl Drop for Telemetry { } impl TelemetryItemHandle { - pub async fn publish(&self, value: Value, timestamp: DateTime) -> Result<(), Box> { - let offset_from_unix_epoch = timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); - self.tx.send(TelemetryItem { - uuid: Some(Uuid { - value: self.uuid.clone() - }), - value: Some(TelemetryValue { - value: Some(value) - }), - timestamp: Some(Timestamp { - secs: offset_from_unix_epoch.num_seconds(), - nanos: offset_from_unix_epoch.subsec_nanos(), - }), - }).await?; + pub async fn publish( + &self, + value: Value, + timestamp: DateTime, + ) -> Result<(), Box> { + let offset_from_unix_epoch = + timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch"); + self.tx + .send(TelemetryItem { + uuid: Some(Uuid { + value: self.uuid.clone(), + }), + value: Some(TelemetryValue { value: Some(value) }), + timestamp: Some(Timestamp { + secs: offset_from_unix_epoch.num_seconds(), + nanos: offset_from_unix_epoch.subsec_nanos(), + }), + }) + .await?; Ok(()) } } @@ -119,23 +136,47 @@ impl TelemetryItemHandle { async fn main() -> Result<(), Box> { let mut tlm = Telemetry::new("http://[::1]:50051").await?; - let sin_tlm_handle = tlm.register("simple_producer/sin".into(), TelemetryDataType::Float32).await?; - let cos_tlm_handle = tlm.register("simple_producer/cos".into(), TelemetryDataType::Float64).await?; + let sin_tlm_handle = tlm + .register("simple_producer/sin".into(), TelemetryDataType::Float32) + .await?; + let cos_tlm_handle = tlm + .register("simple_producer/cos".into(), TelemetryDataType::Float64) + .await?; - let sin2_tlm_handle = tlm.register("simple_producer/sin2".into(), TelemetryDataType::Float32).await?; - let cos2_tlm_handle = tlm.register("simple_producer/cos2".into(), TelemetryDataType::Float64).await?; + let sin2_tlm_handle = tlm + .register("simple_producer/sin2".into(), TelemetryDataType::Float32) + .await?; + let cos2_tlm_handle = tlm + .register("simple_producer/cos2".into(), TelemetryDataType::Float64) + .await?; - let sin3_tlm_handle = tlm.register("simple_producer/sin3".into(), TelemetryDataType::Float32).await?; - let cos3_tlm_handle = tlm.register("simple_producer/cos3".into(), TelemetryDataType::Float64).await?; + let sin3_tlm_handle = tlm + .register("simple_producer/sin3".into(), TelemetryDataType::Float32) + .await?; + let cos3_tlm_handle = tlm + .register("simple_producer/cos3".into(), TelemetryDataType::Float64) + .await?; - let sin4_tlm_handle = tlm.register("simple_producer/sin4".into(), TelemetryDataType::Float32).await?; - let cos4_tlm_handle = tlm.register("simple_producer/cos4".into(), TelemetryDataType::Float64).await?; + let sin4_tlm_handle = tlm + .register("simple_producer/sin4".into(), TelemetryDataType::Float32) + .await?; + let cos4_tlm_handle = tlm + .register("simple_producer/cos4".into(), TelemetryDataType::Float64) + .await?; - let sin5_tlm_handle = tlm.register("simple_producer/sin5".into(), TelemetryDataType::Float32).await?; - let cos5_tlm_handle = tlm.register("simple_producer/cos5".into(), TelemetryDataType::Float64).await?; + let sin5_tlm_handle = tlm + .register("simple_producer/sin5".into(), TelemetryDataType::Float32) + .await?; + let cos5_tlm_handle = tlm + .register("simple_producer/cos5".into(), TelemetryDataType::Float64) + .await?; - let sin6_tlm_handle = tlm.register("simple_producer/sin6".into(), TelemetryDataType::Float32).await?; - let cos6_tlm_handle = tlm.register("simple_producer/cos6".into(), TelemetryDataType::Float64).await?; + let sin6_tlm_handle = tlm + .register("simple_producer/sin6".into(), TelemetryDataType::Float32) + .await?; + let cos6_tlm_handle = tlm + .register("simple_producer/cos6".into(), TelemetryDataType::Float64) + .await?; let cancellation_token = CancellationToken::new(); { @@ -152,78 +193,78 @@ async fn main() -> Result<(), Box> { next_time += Duration::from_millis(100); index += 10; tokio::time::sleep_until(next_time).await; - sin_tlm_handle.publish( - Value::Float32( - (f32::TAU() * (index as f32) / (1000.0_f32)).sin() - ), - chrono::Utc::now(), - ).await?; - cos_tlm_handle.publish( - Value::Float64( - (f64::TAU() * (index as f64) / (1000.0_f64)).cos() - ), - chrono::Utc::now(), - ).await?; - sin2_tlm_handle.publish( - Value::Float32( - (f32::TAU() * (index as f32) / (500.0_f32)).sin() - ), - chrono::Utc::now(), - ).await?; - cos2_tlm_handle.publish( - Value::Float64( - (f64::TAU() * (index as f64) / (500.0_f64)).cos() - ), - chrono::Utc::now(), - ).await?; - sin3_tlm_handle.publish( - Value::Float32( - (f32::TAU() * (index as f32) / (333.0_f32)).sin() - ), - chrono::Utc::now(), - ).await?; - cos3_tlm_handle.publish( - Value::Float64( - (f64::TAU() * (index as f64) / (333.0_f64)).cos() - ), - chrono::Utc::now(), - ).await?; - sin4_tlm_handle.publish( - Value::Float32( - (f32::TAU() * (index as f32) / (250.0_f32)).sin() - ), - chrono::Utc::now(), - ).await?; - cos4_tlm_handle.publish( - Value::Float64( - (f64::TAU() * (index as f64) / (250.0_f64)).cos() - ), - chrono::Utc::now(), - ).await?; - sin5_tlm_handle.publish( - Value::Float32( - (f32::TAU() * (index as f32) / (200.0_f32)).sin() - ), - chrono::Utc::now(), - ).await?; - cos5_tlm_handle.publish( - Value::Float64( - (f64::TAU() * (index as f64) / (200.0_f64)).cos() - ), - chrono::Utc::now(), - ).await?; - sin6_tlm_handle.publish( - Value::Float32( - (f32::TAU() * (index as f32) / (166.0_f32)).sin() - ), - chrono::Utc::now(), - ).await?; - cos6_tlm_handle.publish( - Value::Float64( - (f64::TAU() * (index as f64) / (166.0_f64)).cos() - ), - chrono::Utc::now(), - ).await?; + sin_tlm_handle + .publish( + Value::Float32((f32::TAU() * (index as f32) / (1000.0_f32)).sin()), + chrono::Utc::now(), + ) + .await?; + cos_tlm_handle + .publish( + Value::Float64((f64::TAU() * (index as f64) / (1000.0_f64)).cos()), + chrono::Utc::now(), + ) + .await?; + sin2_tlm_handle + .publish( + Value::Float32((f32::TAU() * (index as f32) / (500.0_f32)).sin()), + chrono::Utc::now(), + ) + .await?; + cos2_tlm_handle + .publish( + Value::Float64((f64::TAU() * (index as f64) / (500.0_f64)).cos()), + chrono::Utc::now(), + ) + .await?; + sin3_tlm_handle + .publish( + Value::Float32((f32::TAU() * (index as f32) / (333.0_f32)).sin()), + chrono::Utc::now(), + ) + .await?; + cos3_tlm_handle + .publish( + Value::Float64((f64::TAU() * (index as f64) / (333.0_f64)).cos()), + chrono::Utc::now(), + ) + .await?; + sin4_tlm_handle + .publish( + Value::Float32((f32::TAU() * (index as f32) / (250.0_f32)).sin()), + chrono::Utc::now(), + ) + .await?; + cos4_tlm_handle + .publish( + Value::Float64((f64::TAU() * (index as f64) / (250.0_f64)).cos()), + chrono::Utc::now(), + ) + .await?; + sin5_tlm_handle + .publish( + Value::Float32((f32::TAU() * (index as f32) / (200.0_f32)).sin()), + chrono::Utc::now(), + ) + .await?; + cos5_tlm_handle + .publish( + Value::Float64((f64::TAU() * (index as f64) / (200.0_f64)).cos()), + chrono::Utc::now(), + ) + .await?; + sin6_tlm_handle + .publish( + Value::Float32((f32::TAU() * (index as f32) / (166.0_f32)).sin()), + chrono::Utc::now(), + ) + .await?; + cos6_tlm_handle + .publish( + Value::Float64((f64::TAU() * (index as f64) / (166.0_f64)).cos()), + chrono::Utc::now(), + ) + .await?; } Ok(()) diff --git a/frontend/src/components/SvgGraph.vue b/frontend/src/components/SvgGraph.vue index 5e2aef3..0e07e7e 100644 --- a/frontend/src/components/SvgGraph.vue +++ b/frontend/src/components/SvgGraph.vue @@ -140,8 +140,7 @@ const lines = computed(() => { :timestamp="tick" :utc="props.utc" :show_millis="duration < 1000" - > - + > diff --git a/frontend/src/components/TimeText.vue b/frontend/src/components/TimeText.vue index de86ceb..8cf1979 100644 --- a/frontend/src/components/TimeText.vue +++ b/frontend/src/components/TimeText.vue @@ -5,8 +5,8 @@ const props = defineProps<{ x: number; y: number; timestamp: number; - utc?: boolean - show_millis?: boolean + utc?: boolean; + show_millis?: boolean; }>(); // This function is slow @@ -61,18 +61,12 @@ function getDateString(date: Date) { const timetext = computed(() => { return getDateString(new Date(props.timestamp)); }); - - \ No newline at end of file + diff --git a/frontend/src/components/ValueLabel.vue b/frontend/src/components/ValueLabel.vue index 63780b3..61f4743 100644 --- a/frontend/src/components/ValueLabel.vue +++ b/frontend/src/components/ValueLabel.vue @@ -1,6 +1,5 @@ @@ -59,4 +57,4 @@ text { fill: var(--indexed-color); dominant-baseline: middle; } - \ No newline at end of file + diff --git a/server/build.rs b/server/build.rs index cdb3490..ebafb76 100644 --- a/server/build.rs +++ b/server/build.rs @@ -1,4 +1,3 @@ - fn main() -> Result<(), Box> { tonic_build::compile_protos("proto/core.proto")?; Ok(()) diff --git a/server/src/grpc.rs b/server/src/grpc.rs index 44625de..35d023a 100644 --- a/server/src/grpc.rs +++ b/server/src/grpc.rs @@ -1,20 +1,23 @@ +use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer}; +use crate::core::telemetry_value::Value; +use crate::core::{ + TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse, + TelemetryInsertResponse, TelemetryItem, TelemetryValue, Uuid, +}; +use crate::telemetry::{TelemetryDataItem, TelemetryDataValue, TelemetryManagementService}; +use chrono::{DateTime, SecondsFormat}; +use log::{error, trace}; use std::error::Error; use std::pin::Pin; use std::sync::Arc; -use chrono::{DateTime, SecondsFormat}; -use log::{error, trace}; use tokio::select; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; -use tonic::{Request, Response, Status, Streaming}; -use tonic::codegen::tokio_stream::{Stream, StreamExt}; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; +use tonic::codegen::tokio_stream::{Stream, StreamExt}; use tonic::transport::Server; -use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer}; -use crate::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse, TelemetryInsertResponse, TelemetryItem, TelemetryValue, Uuid}; -use crate::core::telemetry_value::Value; -use crate::telemetry::{TelemetryDataItem, TelemetryDataValue, TelemetryManagementService}; +use tonic::{Request, Response, Status, Streaming}; pub struct CoreTelemetryService { pub tlm_management: Arc, @@ -23,24 +26,29 @@ pub struct CoreTelemetryService { #[tonic::async_trait] impl TelemetryService for CoreTelemetryService { - async fn new_telemetry(&self, request: Request) -> Result, Status> { + async fn new_telemetry( + &self, + request: Request, + ) -> Result, Status> { trace!("CoreTelemetryService::new_telemetry"); - self.tlm_management.register(request.into_inner()).await - .map(|uuid| + self.tlm_management + .register(request.into_inner()) + .await + .map(|uuid| { Response::new(TelemetryDefinitionResponse { - uuid: Some(Uuid { - value: uuid - }), + uuid: Some(Uuid { value: uuid }), }) - ) - .map_err(|err| - Status::already_exists(err.to_string()) - ) + }) + .map_err(|err| Status::already_exists(err.to_string())) } - type InsertTelemetryStream = Pin> + Send>>; + type InsertTelemetryStream = + Pin> + Send>>; - async fn insert_telemetry(&self, request: Request>) -> Result, Status> { + async fn insert_telemetry( + &self, + request: Request>, + ) -> Result, Status> { trace!("CoreTelemetryService::insert_telemetry"); let cancel_token = self.cancellation_token.clone(); @@ -77,7 +85,10 @@ impl TelemetryService for CoreTelemetryService { } impl CoreTelemetryService { - async fn handle_new_tlm_item(tlm_management: &Arc, tlm_item: &TelemetryItem) -> Result { + async fn handle_new_tlm_item( + tlm_management: &Arc, + tlm_item: &TelemetryItem, + ) -> Result { trace!("CoreTelemetryService::handle_new_tlm_item {:?}", tlm_item); let Some(ref uuid) = tlm_item.uuid else { return Err(Status::failed_precondition("UUID Missing")); @@ -102,7 +113,8 @@ impl CoreTelemetryService { return Err(Status::failed_precondition("Data Type Mismatch")); }; - let Some(timestamp) = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32) else { + let Some(timestamp) = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32) + else { return Err(Status::invalid_argument("Failed to construct UTC DateTime")); }; @@ -111,19 +123,22 @@ impl CoreTelemetryService { Value::Float32(x) => TelemetryDataValue::Float32(x), Value::Float64(x) => TelemetryDataValue::Float64(x), }, - timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true) + timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true), })); Ok(TelemetryInsertResponse {}) } } -pub fn setup(token: CancellationToken, telemetry_management_service: Arc) -> Result, Box> { +pub fn setup( + token: CancellationToken, + telemetry_management_service: Arc, +) -> Result, Box> { let addr = "[::1]:50051".parse()?; Ok(tokio::spawn(async move { let tlm_service = CoreTelemetryService { tlm_management: telemetry_management_service, - cancellation_token: token.clone() + cancellation_token: token.clone(), }; trace!("Starting gRPC Server"); diff --git a/server/src/http.rs b/server/src/http.rs index 4fbaf1e..e35be4c 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -1,15 +1,15 @@ +use crate::telemetry::{TelemetryDataItem, TelemetryManagementService}; use actix_web::http::header::ContentType; use actix_web::http::StatusCode; -use actix_web::{rt, error, get, web, App, HttpRequest, HttpResponse, HttpServer, Responder}; -use derive_more::{Display, Error}; -use std::sync::Arc; +use actix_web::{error, get, rt, web, App, HttpRequest, HttpResponse, HttpServer, Responder}; use actix_ws::AggregatedMessage; +use derive_more::{Display, Error}; use log::{error, trace}; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use tokio::select; use tokio_util::sync::CancellationToken; use tonic::codegen::tokio_stream::StreamExt; -use crate::telemetry::{TelemetryDataItem, TelemetryManagementService}; #[derive(Debug, Display, Error)] enum UserError { @@ -18,35 +18,36 @@ enum UserError { } impl error::ResponseError for UserError { - fn error_response(&self) -> HttpResponse { - HttpResponse::build(self.status_code()) - .insert_header(ContentType::html()) - .body(self.to_string()) - } fn status_code(&self) -> StatusCode { match *self { UserError::TlmNotFound { .. } => StatusCode::NOT_FOUND, } } + fn error_response(&self) -> HttpResponse { + HttpResponse::build(self.status_code()) + .insert_header(ContentType::html()) + .body(self.to_string()) + } } #[derive(Debug, Clone, Serialize, Deserialize)] enum WebsocketRequest { - RegisterTlmListener { - uuid: String - } + RegisterTlmListener { uuid: String }, } #[derive(Debug, Clone, Serialize, Deserialize)] enum WebsocketResponse { TlmValue { uuid: String, - value: Option - } + value: Option, + }, } #[get("/tlm/{name:[\\w\\d/_-]+}")] -async fn get_tlm_definition(data: web::Data>, name: web::Path) -> Result { +async fn get_tlm_definition( + data: web::Data>, + name: web::Path, +) -> Result { let string = name.to_string(); trace!("get_tlm_definition {}", string); let Some(data) = data.get_by_name(&string).await else { @@ -56,7 +57,12 @@ async fn get_tlm_definition(data: web::Data>, na Ok(web::Json(data.definition.clone())) } -async fn websocket_connect(req: HttpRequest, stream: web::Payload, data: web::Data>, cancel_token: web::Data) -> Result { +async fn websocket_connect( + req: HttpRequest, + stream: web::Payload, + data: web::Data>, + cancel_token: web::Data, +) -> Result { trace!("websocket_connect"); let (res, mut session, stream) = actix_ws::handle(&req, stream)?; @@ -157,11 +163,13 @@ async fn websocket_connect(req: HttpRequest, stream: web::Payload, data: web::Da } fn setup_api(cfg: &mut web::ServiceConfig) { - cfg - .service(get_tlm_definition); + cfg.service(get_tlm_definition); } -pub async fn setup(cancellation_token: CancellationToken, telemetry_definitions: Arc) -> Result<(), Box> { +pub async fn setup( + cancellation_token: CancellationToken, + telemetry_definitions: Arc, +) -> Result<(), Box> { let data = web::Data::new(telemetry_definitions); let cancel_token = web::Data::new(cancellation_token); @@ -173,8 +181,9 @@ pub async fn setup(cancellation_token: CancellationToken, telemetry_definitions: .route("/ws", web::get().to(websocket_connect)) .service(web::scope("/api").configure(setup_api)) }) - .bind("localhost:8080")? - .run().await?; + .bind("localhost:8080")? + .run() + .await?; Ok(()) } diff --git a/server/src/lib.rs b/server/src/lib.rs index 6effe31..782a9a9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1,7 +1,7 @@ -mod uuid; mod grpc; mod http; mod telemetry; +mod uuid; pub mod core { tonic::include_proto!("core"); @@ -33,4 +33,3 @@ pub async fn setup() -> Result<(), Box> { Ok(()) } - diff --git a/server/src/main.rs b/server/src/main.rs index 17724e8..84e0c26 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -5,9 +5,8 @@ use std::str::FromStr; async fn main() -> Result<(), Box> { let log_file = env::var("LOG_FILE"); let log_level = match env::var("LOG_LEVEL") { - Ok(log_level) => log::LevelFilter::from_str(&log_level) - .unwrap_or(log::LevelFilter::Info), - Err(_) => log::LevelFilter::Info + Ok(log_level) => log::LevelFilter::from_str(&log_level).unwrap_or(log::LevelFilter::Info), + Err(_) => log::LevelFilter::Info, }; let mut log_config = fern::Dispatch::new() diff --git a/server/src/telemetry.rs b/server/src/telemetry.rs index e7d7473..980b0e5 100644 --- a/server/src/telemetry.rs +++ b/server/src/telemetry.rs @@ -1,14 +1,20 @@ +use crate::core::{TelemetryDataType, TelemetryDefinitionRequest, Uuid}; +use log::trace; +use serde::de::Visitor; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::collections::HashMap; use std::error::Error; use std::fmt::Formatter; use std::sync::Arc; -use log::trace; -use serde::de::Visitor; -use serde::{Deserialize, Deserializer, Serialize, Serializer}; use tokio::sync::Mutex; -use crate::core::{TelemetryDataType, TelemetryDefinitionRequest, Uuid}; -fn tlm_data_type_serialzier(tlm_data_type: &TelemetryDataType, serializer: S) -> Result where S: Serializer { +fn tlm_data_type_serialzier( + tlm_data_type: &TelemetryDataType, + serializer: S, +) -> Result +where + S: Serializer, +{ serializer.serialize_str(tlm_data_type.as_str_name()) } @@ -29,7 +35,10 @@ impl Visitor<'_> for TlmDataTypeVisitor { } } -fn tlm_data_type_deserialzier<'de, D>(deserializer: D) -> Result where D: Deserializer<'de> { +fn tlm_data_type_deserialzier<'de, D>(deserializer: D) -> Result +where + D: Deserializer<'de>, +{ deserializer.deserialize_str(TlmDataTypeVisitor) } @@ -51,7 +60,7 @@ pub enum TelemetryDataValue { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TelemetryDataItem { pub value: TelemetryDataValue, - pub timestamp: String + pub timestamp: String, } #[derive(Clone)] @@ -73,12 +82,15 @@ impl TelemetryManagementService { } } - pub async fn register(&self, telemetry_definition_request: TelemetryDefinitionRequest) -> Result> { + pub async fn register( + &self, + telemetry_definition_request: TelemetryDefinitionRequest, + ) -> Result> { let mut lock = self.uuid_mapping.lock().await; if let Some(uuid) = lock.get(&telemetry_definition_request.name) { trace!("Telemetry Definition Found {:?}", uuid); let tlm_lock = self.tlm_mapping.lock().await; - if let Some(TelemetryData { definition, ..}) = tlm_lock.get(uuid) { + if let Some(TelemetryData { definition, .. }) = tlm_lock.get(uuid) { if definition.data_type != telemetry_definition_request.data_type() { return Err("A telemetry item of the same name already exists".into()); } @@ -87,18 +99,24 @@ impl TelemetryManagementService { Err("Could not find Telemetry Data".into()) } } else { - trace!("Adding New Telemetry Definition {:?}", telemetry_definition_request); + trace!( + "Adding New Telemetry Definition {:?}", + telemetry_definition_request + ); let mut tlm_lock = self.tlm_mapping.lock().await; let uuid = Uuid::random().value; lock.insert(telemetry_definition_request.name.clone(), uuid.clone()); - tlm_lock.insert(uuid.clone(), TelemetryData { - definition: TelemetryDefinition { - uuid: uuid.clone(), - name: telemetry_definition_request.name.clone(), - data_type: telemetry_definition_request.data_type(), + tlm_lock.insert( + uuid.clone(), + TelemetryData { + definition: TelemetryDefinition { + uuid: uuid.clone(), + name: telemetry_definition_request.name.clone(), + data_type: telemetry_definition_request.data_type(), + }, + data: tokio::sync::watch::channel(None).0, }, - data: tokio::sync::watch::channel(None).0 - }); + ); Ok(uuid) } } diff --git a/server/src/uuid.rs b/server/src/uuid.rs index c6abaa7..1c9aacb 100644 --- a/server/src/uuid.rs +++ b/server/src/uuid.rs @@ -1,9 +1,9 @@ +use crate::core::Uuid; use rand::RngCore; -use crate::core::{Uuid}; impl Uuid { pub fn random() -> Self { - let mut uuid =[0u8; 16]; + let mut uuid = [0u8; 16]; rand::thread_rng().fill_bytes(&mut uuid); Self { value: hex::encode(uuid),