fixes formatting and lints

This commit is contained in:
2024-12-27 11:05:47 -05:00
parent 147d1abaf8
commit 12f27bad69
11 changed files with 281 additions and 210 deletions

View File

@@ -1,20 +1,23 @@
use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer};
use crate::core::telemetry_value::Value;
use crate::core::{
TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse,
TelemetryInsertResponse, TelemetryItem, TelemetryValue, Uuid,
};
use crate::telemetry::{TelemetryDataItem, TelemetryDataValue, TelemetryManagementService};
use chrono::{DateTime, SecondsFormat};
use log::{error, trace};
use std::error::Error;
use std::pin::Pin;
use std::sync::Arc;
use chrono::{DateTime, SecondsFormat};
use log::{error, trace};
use tokio::select;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tonic::{Request, Response, Status, Streaming};
use tonic::codegen::tokio_stream::{Stream, StreamExt};
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::{Stream, StreamExt};
use tonic::transport::Server;
use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer};
use crate::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse, TelemetryInsertResponse, TelemetryItem, TelemetryValue, Uuid};
use crate::core::telemetry_value::Value;
use crate::telemetry::{TelemetryDataItem, TelemetryDataValue, TelemetryManagementService};
use tonic::{Request, Response, Status, Streaming};
pub struct CoreTelemetryService {
pub tlm_management: Arc<TelemetryManagementService>,
@@ -23,24 +26,29 @@ pub struct CoreTelemetryService {
#[tonic::async_trait]
impl TelemetryService for CoreTelemetryService {
async fn new_telemetry(&self, request: Request<TelemetryDefinitionRequest>) -> Result<Response<TelemetryDefinitionResponse>, Status> {
async fn new_telemetry(
&self,
request: Request<TelemetryDefinitionRequest>,
) -> Result<Response<TelemetryDefinitionResponse>, Status> {
trace!("CoreTelemetryService::new_telemetry");
self.tlm_management.register(request.into_inner()).await
.map(|uuid|
self.tlm_management
.register(request.into_inner())
.await
.map(|uuid| {
Response::new(TelemetryDefinitionResponse {
uuid: Some(Uuid {
value: uuid
}),
uuid: Some(Uuid { value: uuid }),
})
)
.map_err(|err|
Status::already_exists(err.to_string())
)
})
.map_err(|err| Status::already_exists(err.to_string()))
}
type InsertTelemetryStream = Pin<Box<dyn Stream<Item = Result<TelemetryInsertResponse, Status>> + Send>>;
type InsertTelemetryStream =
Pin<Box<dyn Stream<Item = Result<TelemetryInsertResponse, Status>> + Send>>;
async fn insert_telemetry(&self, request: Request<Streaming<TelemetryItem>>) -> Result<Response<Self::InsertTelemetryStream>, Status> {
async fn insert_telemetry(
&self,
request: Request<Streaming<TelemetryItem>>,
) -> Result<Response<Self::InsertTelemetryStream>, Status> {
trace!("CoreTelemetryService::insert_telemetry");
let cancel_token = self.cancellation_token.clone();
@@ -77,7 +85,10 @@ impl TelemetryService for CoreTelemetryService {
}
impl CoreTelemetryService {
async fn handle_new_tlm_item(tlm_management: &Arc<TelemetryManagementService>, tlm_item: &TelemetryItem) -> Result<TelemetryInsertResponse, Status> {
async fn handle_new_tlm_item(
tlm_management: &Arc<TelemetryManagementService>,
tlm_item: &TelemetryItem,
) -> Result<TelemetryInsertResponse, Status> {
trace!("CoreTelemetryService::handle_new_tlm_item {:?}", tlm_item);
let Some(ref uuid) = tlm_item.uuid else {
return Err(Status::failed_precondition("UUID Missing"));
@@ -102,7 +113,8 @@ impl CoreTelemetryService {
return Err(Status::failed_precondition("Data Type Mismatch"));
};
let Some(timestamp) = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32) else {
let Some(timestamp) = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32)
else {
return Err(Status::invalid_argument("Failed to construct UTC DateTime"));
};
@@ -111,19 +123,22 @@ impl CoreTelemetryService {
Value::Float32(x) => TelemetryDataValue::Float32(x),
Value::Float64(x) => TelemetryDataValue::Float64(x),
},
timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true)
timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true),
}));
Ok(TelemetryInsertResponse {})
}
}
pub fn setup(token: CancellationToken, telemetry_management_service: Arc<TelemetryManagementService>) -> Result<JoinHandle<()>, Box<dyn Error>> {
pub fn setup(
token: CancellationToken,
telemetry_management_service: Arc<TelemetryManagementService>,
) -> Result<JoinHandle<()>, Box<dyn Error>> {
let addr = "[::1]:50051".parse()?;
Ok(tokio::spawn(async move {
let tlm_service = CoreTelemetryService {
tlm_management: telemetry_management_service,
cancellation_token: token.clone()
cancellation_token: token.clone(),
};
trace!("Starting gRPC Server");