diff --git a/Cargo.lock b/Cargo.lock index f9d8b1c..3bfff90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1512,6 +1512,7 @@ dependencies = [ "tokio-util", "tonic", "tonic-build", + "uuid", ] [[package]] @@ -1898,6 +1899,15 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "uuid" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +dependencies = [ + "getrandom", +] + [[package]] name = "version_check" version = "0.9.5" diff --git a/frontend/src/composables/telemetry.ts b/frontend/src/composables/telemetry.ts index ce13cec..c0626e5 100644 --- a/frontend/src/composables/telemetry.ts +++ b/frontend/src/composables/telemetry.ts @@ -16,7 +16,7 @@ export function useTelemetry(name: MaybeRefOrGetter) { const name_value = toValue(name); try { - const res = await fetch(`/api/tlm/${name_value}`); + const res = await fetch(`/api/tlm/info/${name_value}`); data.value = await res.json(); error.value = null; } catch (e) { diff --git a/server/Cargo.toml b/server/Cargo.toml index a442ed5..450df0e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -23,6 +23,7 @@ papaya = "0.1.7" thiserror = "2.0.9" derive_more = { version = "1.0.0", features = ["from"] } anyhow = "1.0.95" +uuid = { version = "1.11.0", features = ["v4"] } [build-dependencies] tonic-build = "0.12.3" diff --git a/server/src/grpc.rs b/server/src/grpc.rs index 378e394..2097090 100644 --- a/server/src/grpc.rs +++ b/server/src/grpc.rs @@ -94,7 +94,8 @@ impl CoreTelemetryService { let Some(ref uuid) = tlm_item.uuid else { return Err(Status::failed_precondition("UUID Missing")); }; - let Some(tlm_data) = tlm_management.get_by_uuid(&uuid.value) else { + let tlm_management_pin = tlm_management.pin(); + let Some(tlm_data) = tlm_management_pin.get_by_uuid(&uuid.value) else { return Err(Status::not_found("Telemetry Item Not Found")); }; @@ -110,7 +111,7 @@ impl CoreTelemetryService { Value::Float32(_) => TelemetryDataType::Float32, Value::Float64(_) => TelemetryDataType::Float64, }; - if expected_type != tlm_data.definition.data_type { + if expected_type != tlm_data.data.definition.data_type { return Err(Status::failed_precondition("Data Type Mismatch")); }; @@ -119,13 +120,15 @@ impl CoreTelemetryService { return Err(Status::invalid_argument("Failed to construct UTC DateTime")); }; - let _ = tlm_data.data.send_replace(Some(TelemetryDataItem { - value: match value { - Value::Float32(x) => TelemetryDataValue::Float32(x), - Value::Float64(x) => TelemetryDataValue::Float64(x), - }, + let value = match value { + Value::Float32(x) => TelemetryDataValue::Float32(x), + Value::Float64(x) => TelemetryDataValue::Float64(x), + }; + let _ = tlm_data.data.data.send_replace(Some(TelemetryDataItem { + value: value.clone(), timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true), })); + tlm_data.history.insert(tlm_management.history_service(), value, timestamp); Ok(TelemetryInsertResponse {}) } diff --git a/server/src/http/api/mod.rs b/server/src/http/api/mod.rs index 4d5f88c..98e6cbb 100644 --- a/server/src/http/api/mod.rs +++ b/server/src/http/api/mod.rs @@ -1,10 +1,12 @@ use crate::http::error::HttpServerResultError; use crate::telemetry::management_service::TelemetryManagementService; use actix_web::{get, web, Responder}; -use log::trace; +use log::{info, trace}; use std::sync::Arc; +use chrono::{DateTime, TimeDelta, Utc}; +use serde::Deserialize; -#[get("/tlm/{name:[\\w\\d/_-]+}")] +#[get("/tlm/info/{name:[\\w\\d/_-]+}")] async fn get_tlm_definition( data: web::Data>, name: web::Path, @@ -12,12 +14,48 @@ async fn get_tlm_definition( let string = name.to_string(); trace!("get_tlm_definition {}", string); let Some(data) = data.get_by_name(&string) else { - return Err(HttpServerResultError::TlmNotFound { tlm: string }); + return Err(HttpServerResultError::TlmNameNotFound { tlm: string }); }; Ok(web::Json(data.definition.clone())) } -pub fn setup_api(cfg: &mut web::ServiceConfig) { - cfg.service(get_tlm_definition); +#[derive(Deserialize)] +struct HistoryQuery { + from: String, + to: String, + resolution: i64 +} + +#[get("/tlm/history/{uuid:[0-9a-f]+}")] +async fn get_tlm_history( + data: web::Data>, + uuid: web::Path, + info: web::Query, +) -> Result { + let uuid = uuid.to_string(); + trace!("get_tlm_history {} from {} to {} resolution {}", uuid, info.from, info.to, info.resolution); + + let Ok(from) = (&info.from).parse::>() else { + return Err(HttpServerResultError::InvalidDateTime { date_time: info.from.clone() }); + }; + let Ok(to) = (&info.to).parse::>() else { + return Err(HttpServerResultError::InvalidDateTime { date_time: info.to.clone() }); + }; + let maximum_resolution = TimeDelta::milliseconds(info.resolution); + info!("get_tlm_history {} from {} to {} resolution {}", uuid, from, to, maximum_resolution); + + let data = data.pin(); + match data.get_by_uuid(&uuid) { + None => Err(HttpServerResultError::TlmUuidNotFound { uuid }), + Some(tlm) => { + Ok(web::Json(tlm.history.get(from, to, maximum_resolution))) + } + } +} + +pub fn setup_api(cfg: &mut web::ServiceConfig) { + cfg + .service(get_tlm_definition) + .service(get_tlm_history); } diff --git a/server/src/http/error.rs b/server/src/http/error.rs index 4168e6c..fb1976b 100644 --- a/server/src/http/error.rs +++ b/server/src/http/error.rs @@ -9,14 +9,20 @@ pub enum WebsocketResponseError {} #[derive(Error, Debug)] pub enum HttpServerResultError { - #[error("Telemetry Not Found: {tlm}")] - TlmNotFound { tlm: String }, + #[error("Telemetry Name Not Found: {tlm}")] + TlmNameNotFound { tlm: String }, + #[error("Telemetry Uuid Not Found: {uuid}")] + TlmUuidNotFound { uuid: String }, + #[error("DateTime Parsing Error: {date_time}")] + InvalidDateTime { date_time: String }, } impl ResponseError for HttpServerResultError { fn status_code(&self) -> StatusCode { match *self { - HttpServerResultError::TlmNotFound { .. } => StatusCode::NOT_FOUND, + HttpServerResultError::TlmNameNotFound { .. } => StatusCode::NOT_FOUND, + HttpServerResultError::TlmUuidNotFound { .. } => StatusCode::NOT_FOUND, + HttpServerResultError::InvalidDateTime { .. } => StatusCode::BAD_REQUEST, } } fn error_response(&self) -> HttpResponse { diff --git a/server/src/lib.rs b/server/src/lib.rs index 966b38d..441fb4f 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -11,6 +11,7 @@ use crate::telemetry::management_service::TelemetryManagementService; use std::error::Error; use std::sync::Arc; use tokio_util::sync::CancellationToken; +use crate::telemetry::history::TelemetryHistoryService; pub async fn setup() -> Result<(), Box> { let cancellation_token = CancellationToken::new(); @@ -22,7 +23,9 @@ pub async fn setup() -> Result<(), Box> { }); } - let tlm = Arc::new(TelemetryManagementService::new()); + let tlm = Arc::new(TelemetryManagementService::new( + TelemetryHistoryService::new() + )); let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?; diff --git a/server/src/telemetry/data.rs b/server/src/telemetry/data.rs index 7ca7b0d..228c795 100644 --- a/server/src/telemetry/data.rs +++ b/server/src/telemetry/data.rs @@ -1,8 +1,14 @@ use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::definition::TelemetryDefinition; +use crate::telemetry::history::TelemetryHistory; #[derive(Clone)] pub struct TelemetryData { pub definition: TelemetryDefinition, pub data: tokio::sync::watch::Sender>, } + +pub struct TelemetryDataHistory { + pub data: TelemetryData, + pub history: TelemetryHistory, +} diff --git a/server/src/telemetry/history.rs b/server/src/telemetry/history.rs new file mode 100644 index 0000000..cb036a5 --- /dev/null +++ b/server/src/telemetry/history.rs @@ -0,0 +1,195 @@ +use crate::telemetry::data_value::TelemetryDataValue; +use chrono::{DateTime, SecondsFormat, TimeDelta, Utc}; +use std::collections::VecDeque; +use std::sync::RwLock; +use log::error; +use crate::telemetry::data_item::TelemetryDataItem; + +struct SegmentData { + values: Vec, + timestamps: Vec> +} + +struct HistorySegment { + start: DateTime, + end: DateTime, + data: RwLock, +} + +impl HistorySegment { + fn new(start: DateTime, end: DateTime) -> Self { + Self { + start, + end, + data: RwLock::new(SegmentData { + values: vec![], + timestamps: vec![], + }), + } + } + + fn insert(&self, value: TelemetryDataValue, timestamp: DateTime) { + if timestamp < self.start || timestamp >= self.end { + return; + } + + let mut data = self.data.write().unwrap_or_else(|err| { + error!("HistorySegment::insert - data was poisoned: {}", err); + let lock = err.into_inner(); + self.data.clear_poison(); + lock + }); + + // Find the point where we should insert this item + let index = data.timestamps.partition_point(|item| item <= ×tamp); + // Insert the item + data.timestamps.insert(index, timestamp); + data.values.insert(index, value); + } + + fn get(&self, from: DateTime, to: DateTime, maximum_resolution: TimeDelta) -> (DateTime, Vec) { + let mut result = vec![]; + + let mut next_from = from; + + if from < self.end && self.start < to { // If there is overlap with the range + let data = self.data.read().unwrap_or_else(|err| { + error!("HistorySegment::get - data was poisoned: {}", err); + let lock = err.into_inner(); + self.data.clear_poison(); + lock + }); + + let start = data.timestamps.partition_point(|x| x < &from); + let end = data.timestamps.partition_point(|x| x < &to); + + if start < data.timestamps.len() { + let t = data.timestamps[start]; + result.push(TelemetryDataItem { + value: data.values[start].clone(), + timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true), + }); + for i in (start + 1)..end { + let t = data.timestamps[i]; + if t >= next_from { + let time_since_next_from = t - next_from; + next_from = match (time_since_next_from.num_nanoseconds(), maximum_resolution.num_nanoseconds()) { + (Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => { + let nanos_since_next_from = nanos_since_next_from as u64; + let maximum_resolution_nanos = maximum_resolution_nanos as u64; + let num_steps = nanos_since_next_from.div_ceil(maximum_resolution_nanos); + let subsec_nanos = maximum_resolution.subsec_nanos() as u64; + // This will break once this can't be represented in 2^63 nanoseconds (over 200 years) + let seconds = ((maximum_resolution.num_seconds() as u64) * num_steps) as i64; + let nanos = (num_steps * subsec_nanos) as i64; + next_from + TimeDelta::seconds(seconds) + TimeDelta::nanoseconds(nanos) + }, + _ => t + maximum_resolution, // If there is a gap so big it can't be represented in 2^63 nanoseconds (over 200 years) just skip forward + }; + result.push(TelemetryDataItem { + value: data.values[i].clone(), + timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true), + }); + } + } + } + } + + (next_from, result) + } +} + +pub struct TelemetryHistory { + segments: RwLock>, +} + +impl TelemetryHistory { + + pub fn new() -> Self { + Self { + segments: RwLock::new(VecDeque::new()), + } + } + + pub fn insert(&self, service: &TelemetryHistoryService, value: TelemetryDataValue, timestamp: DateTime) { + let segments = self.segments.read().unwrap_or_else(|err| { + error!("TelemetryHistory::insert - segments was poisoned: {}", err); + let lock = err.into_inner(); + self.segments.clear_poison(); + lock + }); + + let segments = if segments.is_empty() || segments[segments.len() - 1].end < timestamp { + // We want to insert something that doesn't fit into our history + drop(segments); + let mut segments = self.segments.write().unwrap_or_else(|err| { + error!("TelemetryHistory::insert - segments was poisoned: {}", err); + let lock = err.into_inner(); + self.segments.clear_poison(); + lock + }); + + if segments.len() == 0 { + segments.push_back(HistorySegment::new(timestamp, timestamp + service.segment_width)); + } else { + while segments[segments.len() - 1].end < timestamp { + if segments.len() == service.max_segments { + let _ = segments.pop_front(); + } + let start_time = segments[segments.len() - 1].end; + segments.push_back(HistorySegment::new(start_time, start_time + service.segment_width)); + } + } + + drop(segments); + self.segments.read().unwrap_or_else(|err| { + error!("TelemetryHistory::insert - segments was poisoned: {}", err); + let lock = err.into_inner(); + self.segments.clear_poison(); + lock + }) + } else { + segments + }; + + // Get the index of the first segment which has an end time AFTER the above timestamp + let segment_index = segments.partition_point(|segment| segment.end < timestamp); + + segments[segment_index].insert(value, timestamp); + } + + pub fn get(&self, from: DateTime, to: DateTime, maximum_resolution: TimeDelta) -> Vec { + let mut result = vec![]; + + let segments = self.segments.read().unwrap_or_else(|err| { + error!("TelemetryHistory::get - segments was poisoned: {}", err); + let lock = err.into_inner(); + self.segments.clear_poison(); + lock + }); + + let mut from = from; + + for i in 0..segments.len() { + let (new_from, new_data) = segments[i].get(from, to, maximum_resolution); + from = new_from; + result.extend(new_data); + } + + result + } +} + +pub struct TelemetryHistoryService { + segment_width: TimeDelta, + max_segments: usize +} + +impl TelemetryHistoryService { + pub fn new() -> Self { + Self { + segment_width: TimeDelta::minutes(1), + max_segments: 5 + } + } +} diff --git a/server/src/telemetry/management_service.rs b/server/src/telemetry/management_service.rs index 8708004..7669fbc 100644 --- a/server/src/telemetry/management_service.rs +++ b/server/src/telemetry/management_service.rs @@ -1,19 +1,23 @@ use crate::core::{TelemetryDefinitionRequest, Uuid}; -use crate::telemetry::data::TelemetryData; +use crate::telemetry::data::{TelemetryData, TelemetryDataHistory}; use crate::telemetry::definition::TelemetryDefinition; -use papaya::HashMap; +use papaya::{HashMap, HashMapRef, LocalGuard}; use std::error::Error; +use std::hash::RandomState; +use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService}; pub struct TelemetryManagementService { uuid_index: HashMap, - tlm_data: HashMap, + tlm_data: HashMap, + telemetry_history_service: TelemetryHistoryService } impl TelemetryManagementService { - pub fn new() -> Self { + pub fn new(telemetry_history_service: TelemetryHistoryService) -> Self { Self { uuid_index: HashMap::new(), tlm_data: HashMap::new(), + telemetry_history_service } } @@ -32,13 +36,16 @@ impl TelemetryManagementService { let _ = tlm_data.try_insert( uuid.clone(), - TelemetryData { - definition: TelemetryDefinition { - uuid: uuid.clone(), - name: telemetry_definition_request.name.clone(), - data_type: telemetry_definition_request.data_type(), + TelemetryDataHistory { + data: TelemetryData { + definition: TelemetryDefinition { + uuid: uuid.clone(), + name: telemetry_definition_request.name.clone(), + data_type: telemetry_definition_request.data_type(), + }, + data: tokio::sync::watch::channel(None).0, }, - data: tokio::sync::watch::channel(None).0, + history: TelemetryHistory::new(), }, ); @@ -53,6 +60,27 @@ impl TelemetryManagementService { pub fn get_by_uuid(&self, uuid: &String) -> Option { let tlm_data = self.tlm_data.pin(); - tlm_data.get(uuid).cloned() + tlm_data.get(uuid).map(|data_history| &data_history.data).cloned() + } + + pub fn pin(&self) -> TelemetryManagementServicePin { + TelemetryManagementServicePin { + tlm_data: self.tlm_data.pin() + } + } + + pub fn history_service(&self) -> &TelemetryHistoryService { + &self.telemetry_history_service } } + +pub struct TelemetryManagementServicePin<'a> { + tlm_data: HashMapRef<'a, String, TelemetryDataHistory, RandomState, LocalGuard<'a>> +} + +impl<'a> TelemetryManagementServicePin<'a> { + pub fn get_by_uuid(&'a self, uuid: &String) -> Option<&'a TelemetryDataHistory> { + self.tlm_data.get(uuid) + } +} + diff --git a/server/src/telemetry/mod.rs b/server/src/telemetry/mod.rs index 3d3e27f..6988350 100644 --- a/server/src/telemetry/mod.rs +++ b/server/src/telemetry/mod.rs @@ -4,3 +4,4 @@ pub mod data_type; pub mod data_value; pub mod definition; pub mod management_service; +pub mod history;