use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer}; use crate::core::telemetry_value::Value; use crate::core::{ TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse, TelemetryInsertResponse, TelemetryItem, TelemetryValue, Uuid, }; use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::data_value::TelemetryDataValue; use crate::telemetry::management_service::TelemetryManagementService; use chrono::{DateTime, SecondsFormat}; use log::{error, trace}; use std::error::Error; use std::pin::Pin; use std::sync::Arc; use tokio::select; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::{Stream, StreamExt}; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; pub struct CoreTelemetryService { pub tlm_management: Arc, pub cancellation_token: CancellationToken, } #[tonic::async_trait] impl TelemetryService for CoreTelemetryService { async fn new_telemetry( &self, request: Request, ) -> Result, Status> { trace!("CoreTelemetryService::new_telemetry"); self.tlm_management .register(request.into_inner()) .map(|uuid| { Response::new(TelemetryDefinitionResponse { uuid: Some(Uuid { value: uuid }), }) }) .map_err(|err| Status::already_exists(err.to_string())) } type InsertTelemetryStream = Pin> + Send>>; async fn insert_telemetry( &self, request: Request>, ) -> Result, Status> { trace!("CoreTelemetryService::insert_telemetry"); let cancel_token = self.cancellation_token.clone(); let tlm_management = self.tlm_management.clone(); let mut in_stream = request.into_inner(); let (tx, rx) = mpsc::channel(128); tokio::spawn(async move { loop { select! { _ = cancel_token.cancelled() => { break; }, Some(message) = in_stream.next() => { match message { Ok(tlm_item) => { tx .send(Self::handle_new_tlm_item(&tlm_management, &tlm_item)) .await .expect("working rx"); } Err(err) => { let _ = tx.send(Err(err)).await; } } }, else => break, } } }); Ok(Response::new(Box::pin(ReceiverStream::new(rx)))) } } impl CoreTelemetryService { fn handle_new_tlm_item( tlm_management: &Arc, tlm_item: &TelemetryItem, ) -> Result { trace!("CoreTelemetryService::handle_new_tlm_item {:?}", tlm_item); let Some(ref uuid) = tlm_item.uuid else { return Err(Status::failed_precondition("UUID Missing")); }; let tlm_management_pin = tlm_management.pin(); let Some(tlm_data) = tlm_management_pin.get_by_uuid(&uuid.value) else { return Err(Status::not_found("Telemetry Item Not Found")); }; let Some(TelemetryValue { value: Some(value) }) = tlm_item.value else { return Err(Status::failed_precondition("Value Missing")); }; let Some(timestamp) = tlm_item.timestamp else { return Err(Status::failed_precondition("Timestamp Missing")); }; let expected_type = match value { Value::Float32(_) => TelemetryDataType::Float32, Value::Float64(_) => TelemetryDataType::Float64, }; if expected_type != tlm_data.data.definition.data_type { return Err(Status::failed_precondition("Data Type Mismatch")); }; let Some(timestamp) = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32) else { return Err(Status::invalid_argument("Failed to construct UTC DateTime")); }; let value = match value { Value::Float32(x) => TelemetryDataValue::Float32(x), Value::Float64(x) => TelemetryDataValue::Float64(x), }; let _ = tlm_data.data.data.send_replace(Some(TelemetryDataItem { value: value.clone(), timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true), })); tlm_data.history.insert(tlm_management.history_service(), value, timestamp); Ok(TelemetryInsertResponse {}) } } pub fn setup( token: CancellationToken, telemetry_management_service: Arc, ) -> Result, Box> { let addr = "[::1]:50051".parse()?; Ok(tokio::spawn(async move { let tlm_service = CoreTelemetryService { tlm_management: telemetry_management_service, cancellation_token: token.clone(), }; trace!("Starting gRPC Server"); let result = Server::builder() .add_service(TelemetryServiceServer::new(tlm_service)) .serve_with_shutdown(addr, token.cancelled_owned()) .await; if let Err(err) = result { error!("gRPC Server Encountered An Error: {err}"); } })) }