From 9136c5fd710da8b2e451e89285de81442479ecc9 Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Wed, 1 Jan 2025 10:08:50 -0500 Subject: [PATCH] adds saving and loading history to and from disk --- .gitignore | 1 + Cargo.lock | 10 - frontend/.editorconfig | 2 +- frontend/src/components/TelemetryLine.vue | 6 +- frontend/src/graph/line.ts | 30 +- frontend/src/views/HomeView.vue | 18 +- server/Cargo.toml | 3 +- server/src/grpc.rs | 12 +- server/src/http/api/mod.rs | 3 +- server/src/http/mod.rs | 7 +- server/src/lib.rs | 22 +- server/src/main.rs | 2 +- server/src/telemetry/data.rs | 11 +- server/src/telemetry/history.rs | 455 +++++++++++++++++---- server/src/telemetry/management_service.rs | 166 ++++++-- 15 files changed, 602 insertions(+), 146 deletions(-) diff --git a/.gitignore b/.gitignore index cdff3d3..d40f21a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /target .idea/ +telemetry/ diff --git a/Cargo.lock b/Cargo.lock index 3bfff90..f9d8b1c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1512,7 +1512,6 @@ dependencies = [ "tokio-util", "tonic", "tonic-build", - "uuid", ] [[package]] @@ -1899,15 +1898,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "uuid" -version = "1.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" -dependencies = [ - "getrandom", -] - [[package]] name = "version_check" version = "0.9.5" diff --git a/frontend/.editorconfig b/frontend/.editorconfig index ecea360..9c0c7ad 100644 --- a/frontend/.editorconfig +++ b/frontend/.editorconfig @@ -1,6 +1,6 @@ [*.{js,jsx,mjs,cjs,ts,tsx,mts,cts,vue}] charset = utf-8 -indent_size = 2 +indent_size = 4 indent_style = space insert_final_newline = true trim_trailing_whitespace = true diff --git a/frontend/src/components/TelemetryLine.vue b/frontend/src/components/TelemetryLine.vue index 8a953d0..a171d97 100644 --- a/frontend/src/components/TelemetryLine.vue +++ b/frontend/src/components/TelemetryLine.vue @@ -30,14 +30,16 @@ const props = defineProps<{ }>(); const smoothing_distance_x = 5; +const maximum_minimum_separation_live = 100; // ms const text_offset = computed(() => 10); +const min_sep = computed(() => Math.min(props.minimum_separation || 0, maximum_minimum_separation_live)); const { data } = useTelemetry(() => props.data); const websocket = inject>(WEBSOCKET_SYMBOL)!; const value = websocket.value.listen_to_telemetry( data, - props.minimum_separation, + min_sep, ); const graph_data = inject(GRAPH_DATA)!; @@ -84,7 +86,7 @@ watch([value], ([val]) => { x: val_t, y: item_val, } as Point; - memo.value.insert(new_item); + memo.value.insert(new_item, props.minimum_separation); if (item_val < min.value) { min.value = item_val; } diff --git a/frontend/src/graph/line.ts b/frontend/src/graph/line.ts index 116ff11..f211200 100644 --- a/frontend/src/graph/line.ts +++ b/frontend/src/graph/line.ts @@ -34,8 +34,36 @@ export class PointLine { return left + (this.data[left].x < x ? 1 : 0); } - insert(point: Point) { + insert(point: Point, maximum_separation?: number) { const index = this.find_index(point.x); this.data.splice(index, 0, point); + if (maximum_separation !== undefined) { + this.reduce_to_maximum_separation(maximum_separation, [index - 1, index + 1]); + } + } + + reduce_to_maximum_separation(maximum_separation: number, range?: [number, number]) { + if (maximum_separation <= 0) { + return; + } + // Add a default range if it does not exist + range = range || [1, this.data.length - 2]; + // clamp it to the bounds + range = [Math.max(1, range[0]), Math.min(this.data.length - 2, range[1])]; + + // Loop over the indices in the range (backwards so removals don't break anything) + for (let i = range[1]; i >= range[0]; i--) { + const x_previous = this.data[i - 1].x; + const x_current = this.data[i].x; + const x_next = this.data[i + 1].x; + + const separation_before = x_current - x_previous; + const separation_after = x_next - x_current; + + // If the data points are too close on both sides then delete this point + if (separation_before < maximum_separation && separation_after < maximum_separation) { + this.data.splice(i, 1); + } + } } } diff --git a/frontend/src/views/HomeView.vue b/frontend/src/views/HomeView.vue index dd9be54..3b3ea57 100644 --- a/frontend/src/views/HomeView.vue +++ b/frontend/src/views/HomeView.vue @@ -28,40 +28,40 @@ provide(WEBSOCKET_SYMBOL, websocket); :height="400" :border_top_bottom="24" :border_left_right="128" - :duration="60 * 1000" + :duration="60 * 1000 * 10" > diff --git a/server/Cargo.toml b/server/Cargo.toml index 450df0e..3612552 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -11,7 +11,7 @@ log = "0.4.22" prost = "0.13.3" rand = "0.8.5" tonic = { version = "0.12.3" } -tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal"] } +tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "fs"] } chrono = "0.4.38" actix-web = "4.9.0" actix-ws = "0.3.0" @@ -23,7 +23,6 @@ papaya = "0.1.7" thiserror = "2.0.9" derive_more = { version = "1.0.0", features = ["from"] } anyhow = "1.0.95" -uuid = { version = "1.11.0", features = ["v4"] } [build-dependencies] tonic-build = "0.12.3" diff --git a/server/src/grpc.rs b/server/src/grpc.rs index 17c4629..a1b69fd 100644 --- a/server/src/grpc.rs +++ b/server/src/grpc.rs @@ -8,8 +8,7 @@ use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::data_value::TelemetryDataValue; use crate::telemetry::management_service::TelemetryManagementService; use chrono::{DateTime, SecondsFormat}; -use log::{error, trace}; -use std::error::Error; +use log::{error, info, trace}; use std::pin::Pin; use std::sync::Arc; use tokio::select; @@ -20,6 +19,7 @@ use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::{Stream, StreamExt}; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; +use crate::telemetry::history::TelemetryHistory; pub struct CoreTelemetryService { pub tlm_management: Arc, @@ -128,9 +128,7 @@ impl CoreTelemetryService { value: value.clone(), timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true), })); - tlm_data - .history - .insert(tlm_management.history_service(), value, timestamp); + TelemetryHistory::insert_sync(tlm_data.clone(), tlm_management.history_service(), value, timestamp); Ok(TelemetryInsertResponse {}) } @@ -139,7 +137,7 @@ impl CoreTelemetryService { pub fn setup( token: CancellationToken, telemetry_management_service: Arc, -) -> Result, Box> { +) -> anyhow::Result> { let addr = "[::1]:50051".parse()?; Ok(tokio::spawn(async move { let tlm_service = CoreTelemetryService { @@ -147,7 +145,7 @@ pub fn setup( cancellation_token: token.clone(), }; - trace!("Starting gRPC Server"); + info!("Starting gRPC Server"); let result = Server::builder() .add_service(TelemetryServiceServer::new(tlm_service)) .serve_with_shutdown(addr, token.cancelled_owned()) diff --git a/server/src/http/api/mod.rs b/server/src/http/api/mod.rs index 313e8f7..d12f2ae 100644 --- a/server/src/http/api/mod.rs +++ b/server/src/http/api/mod.rs @@ -54,10 +54,11 @@ async fn get_tlm_history( }; let maximum_resolution = TimeDelta::milliseconds(info.resolution); + let history_service = data.history_service(); let data = data.pin(); match data.get_by_uuid(&uuid) { None => Err(HttpServerResultError::TlmUuidNotFound { uuid }), - Some(tlm) => Ok(web::Json(tlm.history.get(from, to, maximum_resolution))), + Some(tlm) => Ok(web::Json(tlm.get(from, to, maximum_resolution, &history_service).await)), } } diff --git a/server/src/http/mod.rs b/server/src/http/mod.rs index 427f593..9b84c35 100644 --- a/server/src/http/mod.rs +++ b/server/src/http/mod.rs @@ -6,19 +6,18 @@ use crate::http::api::setup_api; use crate::http::websocket::setup_websocket; use crate::telemetry::management_service::TelemetryManagementService; use actix_web::{web, App, HttpServer}; -use log::trace; -use std::error::Error; +use log::info; use std::sync::Arc; use tokio_util::sync::CancellationToken; pub async fn setup( cancellation_token: CancellationToken, telemetry_definitions: Arc, -) -> Result<(), Box> { +) -> anyhow::Result<()> { let data = web::Data::new(telemetry_definitions); let cancel_token = web::Data::new(cancellation_token); - trace!("Starting HTTP Server"); + info!("Starting HTTP Server"); HttpServer::new(move || { App::new() .app_data(data.clone()) diff --git a/server/src/lib.rs b/server/src/lib.rs index fa1fcc9..8394caf 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -9,11 +9,11 @@ pub mod core { use crate::telemetry::history::TelemetryHistoryService; use crate::telemetry::management_service::TelemetryManagementService; -use std::error::Error; use std::sync::Arc; +use log::error; use tokio_util::sync::CancellationToken; -pub async fn setup() -> Result<(), Box> { +pub async fn setup() -> anyhow::Result<()> { let cancellation_token = CancellationToken::new(); { let cancellation_token = cancellation_token.clone(); @@ -24,15 +24,23 @@ pub async fn setup() -> Result<(), Box> { } let tlm = Arc::new(TelemetryManagementService::new( - TelemetryHistoryService::new(), - )); + TelemetryHistoryService::new()?, + )?); let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?; - let result = http::setup(cancellation_token.clone(), tlm).await; + let result = http::setup(cancellation_token.clone(), tlm.clone()).await; cancellation_token.cancel(); - result?; - grpc_server.await?; + result?; // result is dropped + grpc_server.await?; //grpc server is dropped + drop(cancellation_token); // All cancellation tokens are now dropped + + // Perform cleanup functions - at this point all servers have stopped and we can be sure that cleaning things up is safe + if let Some(tlm) = Arc::into_inner(tlm) { + tlm.cleanup().await?; + } else { + error!("Could not clean up Telemetry Management Service. Arc not released.") + } Ok(()) } diff --git a/server/src/main.rs b/server/src/main.rs index 84e0c26..b9aa770 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -2,7 +2,7 @@ use std::env; use std::str::FromStr; #[tokio::main] -async fn main() -> Result<(), Box> { +async fn main() -> anyhow::Result<()> { let log_file = env::var("LOG_FILE"); let log_level = match env::var("LOG_LEVEL") { Ok(log_level) => log::LevelFilter::from_str(&log_level).unwrap_or(log::LevelFilter::Info), diff --git a/server/src/telemetry/data.rs b/server/src/telemetry/data.rs index 228c795..d2bf1ec 100644 --- a/server/src/telemetry/data.rs +++ b/server/src/telemetry/data.rs @@ -1,6 +1,5 @@ use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::definition::TelemetryDefinition; -use crate::telemetry::history::TelemetryHistory; #[derive(Clone)] pub struct TelemetryData { @@ -8,7 +7,11 @@ pub struct TelemetryData { pub data: tokio::sync::watch::Sender>, } -pub struct TelemetryDataHistory { - pub data: TelemetryData, - pub history: TelemetryHistory, +impl From for TelemetryData { + fn from(value: TelemetryDefinition) -> Self { + Self { + definition: value, + data: tokio::sync::watch::channel(None).0 + } + } } diff --git a/server/src/telemetry/history.rs b/server/src/telemetry/history.rs index 95b3d5e..1220268 100644 --- a/server/src/telemetry/history.rs +++ b/server/src/telemetry/history.rs @@ -1,22 +1,56 @@ use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::data_value::TelemetryDataValue; -use chrono::{DateTime, SecondsFormat, TimeDelta, Utc}; -use log::error; +use chrono::{DateTime, DurationRound, SecondsFormat, TimeDelta, Utc}; +use log::{error, info}; use std::collections::VecDeque; -use std::sync::RwLock; +use std::{fs, path}; +use std::cmp::min; +use std::io::SeekFrom; +use std::path::PathBuf; +use std::sync::{Arc, RwLock}; +use anyhow::{ensure, Context}; +use tokio::fs::File; +use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; +use tokio::task::JoinHandle; +use crate::core::TelemetryDataType; +use crate::telemetry::data::TelemetryData; +use crate::telemetry::definition::TelemetryDefinition; + +const FOLDER_DURATION: TimeDelta = TimeDelta::hours(1); + +fn update_next_from(time_since_next_from: TimeDelta, maximum_resolution: TimeDelta, t: DateTime, next_from: DateTime) -> DateTime { + match ( + time_since_next_from.num_nanoseconds(), + maximum_resolution.num_nanoseconds(), + ) { + (_, Some(0)) => t, + (Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => { + let nanos_since_next_from = nanos_since_next_from as u64; + let maximum_resolution_nanos = maximum_resolution_nanos as u64; + let num_steps = + nanos_since_next_from.div_ceil(maximum_resolution_nanos); + if num_steps > i32::MAX as u64 { + t + maximum_resolution + } else { + next_from + maximum_resolution * num_steps as i32 + } + } + _ => t + maximum_resolution, // If there is a gap so big it can't be represented in 2^63 nanoseconds (over 200 years) just skip forward + } +} struct SegmentData { values: Vec, timestamps: Vec>, } -struct HistorySegment { +struct HistorySegmentRam { start: DateTime, end: DateTime, data: RwLock, } -impl HistorySegment { +impl HistorySegmentRam { fn new(start: DateTime, end: DateTime) -> Self { Self { start, @@ -34,7 +68,7 @@ impl HistorySegment { } let mut data = self.data.write().unwrap_or_else(|err| { - error!("HistorySegment::insert - data was poisoned: {}", err); + error!("HistorySegmentRam::insert - data was poisoned: {}", err); let lock = err.into_inner(); self.data.clear_poison(); lock @@ -60,38 +94,23 @@ impl HistorySegment { if from < self.end && self.start < to { // If there is overlap with the range let data = self.data.read().unwrap_or_else(|err| { - error!("HistorySegment::get - data was poisoned: {}", err); + error!("HistorySegmentRam::get - data was poisoned: {}", err); let lock = err.into_inner(); self.data.clear_poison(); lock }); let start = data.timestamps.partition_point(|x| x < &from); - let end = data.timestamps.partition_point(|x| x < &to); if start < data.timestamps.len() { - for i in start..end { + for i in start..data.timestamps.len() { let t = data.timestamps[i]; + if t >= self.end { + break; + } if t >= next_from { let time_since_next_from = t - next_from; - next_from = match ( - time_since_next_from.num_nanoseconds(), - maximum_resolution.num_nanoseconds(), - ) { - (_, Some(0)) => t, - (Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => { - let nanos_since_next_from = nanos_since_next_from as u64; - let maximum_resolution_nanos = maximum_resolution_nanos as u64; - let num_steps = - nanos_since_next_from.div_ceil(maximum_resolution_nanos); - if num_steps > i32::MAX as u64 { - t + maximum_resolution - } else { - next_from + maximum_resolution * num_steps as i32 - } - } - _ => t + maximum_resolution, // If there is a gap so big it can't be represented in 2^63 nanoseconds (over 200 years) just skip forward - }; + next_from = update_next_from(time_since_next_from, maximum_resolution, t, next_from); result.push(TelemetryDataItem { value: data.values[i].clone(), timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true), @@ -105,65 +124,298 @@ impl HistorySegment { } } -pub struct TelemetryHistory { - segments: RwLock>, +struct HistorySegmentDisk { + start: DateTime, + end: DateTime, + length: u64, + file: File } -impl TelemetryHistory { - pub fn new() -> Self { - Self { - segments: RwLock::new(VecDeque::new()), +impl HistorySegmentDisk { + const TIMESTAMP_LENGTH: u64 = 8 + 4; + const HEADER_LENGTH: u64 = Self::TIMESTAMP_LENGTH + Self::TIMESTAMP_LENGTH + 8; + + async fn save_to_disk(mut folder: PathBuf, mut segment: HistorySegmentRam) -> anyhow::Result { + // Get the path for the specific timestamp we want to save to disk + let folder_time = segment.start.duration_trunc(FOLDER_DURATION)?; + folder.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true)); + + // Create the necessary folders + fs::create_dir_all(&folder)?; + + let mut file = folder; + file.push(format!("{}.dat", segment.start.to_rfc3339_opts(SecondsFormat::Secs, true))); + + let file = File::create(file).await?; + + let mut result = Self { + start: segment.start, + end: segment.end, + length: 0, + file + }; + + let utc_offset_start = result.start - DateTime::UNIX_EPOCH; + let utc_offset_end = result.end - DateTime::UNIX_EPOCH; + + // Write the segment bounds + result.file.write_i64(utc_offset_start.num_seconds()).await?; + result.file.write_i32(utc_offset_start.subsec_nanos()).await?; + result.file.write_i64(utc_offset_end.num_seconds()).await?; + result.file.write_i32(utc_offset_end.subsec_nanos()).await?; + + let data = segment.data.get_mut().unwrap_or_else(|err| { + error!("HistorySegmentDisk::save_to_disk - data was poisoned: {}", err); + let lock = err.into_inner(); + lock + }); + + ensure!(data.timestamps.len() == data.values.len(), "Invalid Segment Cannot Be Saved to Disk"); + + result.length = data.timestamps.len() as u64; + result.file.write_u64(result.length).await?; + + // Write all the timestamps + for timestamp in &data.timestamps { + let utc_offset = *timestamp - DateTime::UNIX_EPOCH; + result.file.write_i64(utc_offset.num_seconds()).await?; + result.file.write_i32(utc_offset.subsec_nanos()).await?; + } + + // Write all the values + for value in &data.values { + match value { + TelemetryDataValue::Float32(value) => result.file.write_f32(*value).await?, + TelemetryDataValue::Float64(value) => result.file.write_f64(*value).await?, + } + } + + result.file.flush().await?; + Ok(result) + } + + async fn load_to_ram(mut self, telemetry_data_type: TelemetryDataType) -> anyhow::Result { + let mut segment_data = SegmentData { + values: Vec::with_capacity(self.length as usize), + timestamps: Vec::with_capacity(self.length as usize), + }; + + self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH)).await?; + for _ in 0..self.length { + segment_data.timestamps.push(self.read_date_time().await?); + } + for _ in 0..self.length { + segment_data.values.push(self.read_telemetry_item(telemetry_data_type).await?); + } + + Ok(HistorySegmentRam { + start: self.start, + end: self.end, + data: RwLock::new(segment_data), + }) + } + + async fn open(folder: PathBuf, start: DateTime) -> anyhow::Result { + // Get the path for the specific timestamp we want to save to disk + let folder_time = start.duration_trunc(FOLDER_DURATION)?; + let mut file = folder; + file.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true)); + file.push(format!("{}.dat", start.to_rfc3339_opts(SecondsFormat::Secs, true))); + + let mut file = File::open(file).await?; + + // Write the segment bounds + let start_seconds = file.read_i64().await?; + let start_nanos = file.read_i32().await?; + let end_seconds = file.read_i64().await?; + let end_nanos = file.read_i32().await?; + let start = TimeDelta::new(start_seconds, start_nanos as u32).context("Failed to reconstruct start TimeDelta")?; + let end = TimeDelta::new(end_seconds, end_nanos as u32).context("Failed to reconstruct end TimeDelta")?; + + let length = file.read_u64().await?; + + Ok(HistorySegmentDisk { + start: DateTime::UNIX_EPOCH + start, + end: DateTime::UNIX_EPOCH + end, + length, + file, + }) + } + + async fn get( + &mut self, + from: DateTime, + to: DateTime, + maximum_resolution: TimeDelta, + telemetry_data_type: TelemetryDataType, + ) -> anyhow::Result<(DateTime, Vec)> { + let mut result = vec![]; + + let mut next_from = from; + + if from < self.end && self.start < to { + let start = self.partition_point(from).await?; + if start < self.length { + for i in start..self.length { + let t = self.get_date_time(i).await?; + if t >= self.end { + break; + } + if t >= next_from { + let time_since_next_from = t - next_from; + next_from = update_next_from(time_since_next_from, maximum_resolution, t, next_from); + result.push(TelemetryDataItem { + value: self.get_telemetry_item(i, telemetry_data_type).await?, + timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true), + }); + } + } + } + } + + Ok((next_from, result)) + } + + async fn read_date_time(&mut self) -> anyhow::Result> { + let seconds = self.file.read_i64().await?; + let nanos = self.file.read_i32().await?; + let start = TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?; + Ok(DateTime::UNIX_EPOCH + start) + } + + async fn get_date_time(&mut self, index: u64) -> anyhow::Result> { + self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH)).await?; + self.read_date_time().await + } + + async fn read_telemetry_item(&mut self, telemetry_data_type: TelemetryDataType) -> anyhow::Result { + match telemetry_data_type { + TelemetryDataType::Float32 => Ok(TelemetryDataValue::Float32(self.file.read_f32().await?)), + TelemetryDataType::Float64 => Ok(TelemetryDataValue::Float64(self.file.read_f64().await?)), } } - pub fn insert( + async fn get_telemetry_item(&mut self, index: u64, telemetry_data_type: TelemetryDataType) -> anyhow::Result { + let item_length = match telemetry_data_type { + TelemetryDataType::Float32 => 4, + TelemetryDataType::Float64 => 8, + }; + self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length)).await?; + self.read_telemetry_item(telemetry_data_type).await + } + + async fn partition_point(&mut self, date_time: DateTime) -> anyhow::Result { + if self.length == 0 { + return Ok(0); + } + + // left should be too early to insert + // right should be too late to insert + let mut left = 0; + let mut size = self.length; + + while size > 1 { + let half = size / 2; + let mid = left + half; + + let is_less = self.get_date_time(mid).await? < date_time; + if is_less { + left = mid; + } + size -= half; + } + + Ok(left + if self.get_date_time(left).await? < date_time { 1 } else { 0 }) + } +} + +pub struct TelemetryHistory { + pub data: TelemetryData, + segments: tokio::sync::RwLock>, +} + + +impl From for TelemetryHistory { + fn from(value: TelemetryData) -> Self { + Self { + data: value, + segments: tokio::sync::RwLock::new(VecDeque::new()), + } + } +} + +impl From for TelemetryHistory { + fn from(value: TelemetryDefinition) -> Self { + >::into(value).into() + } +} + +impl TelemetryHistory { + fn cleanup_segment(&self, service: &TelemetryHistoryService, history_segment_ram: HistorySegmentRam) -> JoinHandle<()> { + let mut path = service.data_root_folder.clone(); + path.push(&self.data.definition.uuid); + tokio::spawn(async move { + match HistorySegmentDisk::save_to_disk(path, history_segment_ram).await { + // Immediately drop the segment - now that we've saved it to disk we don't need to keep it in memory + Ok(segment) => drop(segment), + Err(err) => { + error!("An error occurred saving telemetry history to disk: {}", err); + } + } + }) + } + + async fn get_disk_segment(&self, service: &TelemetryHistoryService, start: DateTime) -> anyhow::Result { + let mut path = service.data_root_folder.clone(); + path.push(&self.data.definition.uuid); + HistorySegmentDisk::open(path, start).await + } + + async fn create_ram_segment(&self, start: DateTime, service: &TelemetryHistoryService, telemetry_data_type: TelemetryDataType) -> HistorySegmentRam { + let ram = match self.get_disk_segment(service, start).await { + Ok(disk) => disk.load_to_ram(telemetry_data_type).await, + Err(e) => Err(e), + }; + + match ram { + Ok(ram) => ram, + Err(_) => HistorySegmentRam::new( + start, + start + service.segment_width, + ), + } + } + + pub async fn insert( &self, service: &TelemetryHistoryService, value: TelemetryDataValue, timestamp: DateTime, ) { - let segments = self.segments.read().unwrap_or_else(|err| { - error!("TelemetryHistory::insert - segments was poisoned: {}", err); - let lock = err.into_inner(); - self.segments.clear_poison(); - lock - }); + let segments = self.segments.read().await; let segments = if segments.is_empty() || segments[segments.len() - 1].end < timestamp { // We want to insert something that doesn't fit into our history drop(segments); - let mut segments = self.segments.write().unwrap_or_else(|err| { - error!("TelemetryHistory::insert - segments was poisoned: {}", err); - let lock = err.into_inner(); - self.segments.clear_poison(); - lock - }); + let mut segments = self.segments.write().await; if segments.len() == 0 { - segments.push_back(HistorySegment::new( - timestamp, - timestamp + service.segment_width, - )); + let start_time = timestamp.duration_trunc(service.segment_width).unwrap(); + segments.push_back(self.create_ram_segment(start_time, service, self.data.definition.data_type).await); } else { while segments[segments.len() - 1].end < timestamp { if segments.len() == service.max_segments { - let _ = segments.pop_front(); + if let Some(segment) = segments.pop_front() { + let _ = self.cleanup_segment(service, segment); + } } let start_time = segments[segments.len() - 1].end; - segments.push_back(HistorySegment::new( - start_time, - start_time + service.segment_width, - )); + segments.push_back(self.create_ram_segment(start_time, service, self.data.definition.data_type).await); } } drop(segments); - self.segments.read().unwrap_or_else(|err| { - error!("TelemetryHistory::insert - segments was poisoned: {}", err); - let lock = err.into_inner(); - self.segments.clear_poison(); - lock - }) + self.segments.read().await } else { segments }; @@ -174,23 +426,60 @@ impl TelemetryHistory { segments[segment_index].insert(value, timestamp); } - pub fn get( + pub fn insert_sync( + history: Arc, + service: Arc, + value: TelemetryDataValue, + timestamp: DateTime, + ) { + tokio::spawn(async move { + history.insert(&service, value, timestamp).await; + }); + } + + pub async fn get( &self, from: DateTime, to: DateTime, maximum_resolution: TimeDelta, + telemetry_history_service: &TelemetryHistoryService ) -> Vec { let mut result = vec![]; - let segments = self.segments.read().unwrap_or_else(|err| { - error!("TelemetryHistory::get - segments was poisoned: {}", err); - let lock = err.into_inner(); - self.segments.clear_poison(); - lock - }); + let segments = self.segments.read().await; let mut from = from; + { + let first_ram_segment = segments[0].start; + let start = from.duration_trunc(telemetry_history_service.segment_width).unwrap(); + let end = (to + telemetry_history_service.segment_width).duration_trunc(telemetry_history_service.segment_width).unwrap(); + + let end = min(end, first_ram_segment); + + let mut path = telemetry_history_service.data_root_folder.clone(); + path.push(&self.data.definition.uuid); + + let mut start = start; + while start < end { + match self.get_disk_segment(telemetry_history_service, start).await { + Ok(mut disk) => { + match disk.get(from, to, maximum_resolution, self.data.definition.data_type).await { + Ok((new_from, new_data)) => { + from = new_from; + result.extend(new_data); + }, + Err(err) => { + error!("Failed to get from disk segment: {err}"); + } + } + }, + Err(_) => {}, // Ignore errors + } + start += telemetry_history_service.segment_width; + } + } + for i in 0..segments.len() { let (new_from, new_data) = segments[i].get(from, to, maximum_resolution); from = new_from; @@ -199,18 +488,42 @@ impl TelemetryHistory { result } + + pub async fn cleanup(&self, service: &TelemetryHistoryService) -> anyhow::Result<()> { + let mut segments = self.segments.write().await; + + for segment in segments.drain(..) { + self.cleanup_segment(service, segment).await?; + } + + Ok(()) + } } pub struct TelemetryHistoryService { segment_width: TimeDelta, max_segments: usize, + data_root_folder: PathBuf, } impl TelemetryHistoryService { - pub fn new() -> Self { - Self { + pub fn new() -> anyhow::Result { + let result = Self { segment_width: TimeDelta::minutes(1), max_segments: 5, - } + data_root_folder: path::absolute("telemetry")?, + }; + + fs::create_dir_all(&result.data_root_folder)?; + + info!("Recording Telemetry Data to {}", result.data_root_folder.to_string_lossy()); + + Ok(result) + } + + pub fn get_metadata_file(&self) -> PathBuf { + let mut result = self.data_root_folder.clone(); + result.push("metadata.json"); + result } } diff --git a/server/src/telemetry/management_service.rs b/server/src/telemetry/management_service.rs index f027824..386fbd9 100644 --- a/server/src/telemetry/management_service.rs +++ b/server/src/telemetry/management_service.rs @@ -1,30 +1,84 @@ +use std::fs; +use std::fs::File; use crate::core::{TelemetryDefinitionRequest, Uuid}; -use crate::telemetry::data::{TelemetryData, TelemetryDataHistory}; +use crate::telemetry::data::TelemetryData; use crate::telemetry::definition::TelemetryDefinition; use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService}; use papaya::{HashMap, HashMapRef, LocalGuard}; -use std::error::Error; use std::hash::RandomState; +use std::io::{Read, Write}; +use std::sync::Arc; +use std::time::Duration; +use log::{error, info, warn}; +use tokio::sync::Mutex; +use tokio::time::sleep; + +const RELEASED_ATTEMPTS: usize = 5; pub struct TelemetryManagementService { uuid_index: HashMap, - tlm_data: HashMap, - telemetry_history_service: TelemetryHistoryService, + tlm_data: HashMap>, + telemetry_history_service: Arc, + metadata_file: Arc>, } impl TelemetryManagementService { - pub fn new(telemetry_history_service: TelemetryHistoryService) -> Self { - Self { - uuid_index: HashMap::new(), - tlm_data: HashMap::new(), - telemetry_history_service, + pub fn new(telemetry_history_service: TelemetryHistoryService) -> anyhow::Result { + let metadata_file = telemetry_history_service.get_metadata_file(); + + let uuid_index = HashMap::new(); + let tlm_data = HashMap::new(); + + // TODO: Load metadata from file + match File::open(&metadata_file) { + Ok(mut metadata_file) => { + let uuid_index = uuid_index.pin(); + let tlm_data = tlm_data.pin(); + + // Read all data from the file + let mut data = String::new(); + metadata_file.read_to_string(&mut data)?; + drop(metadata_file); + + // Each line is a separate entry + for line in data.split("\n") { + if line.is_empty() { + // Skip empty lines + continue; + } + // Skip invalid entries + match serde_json::from_str::(line) { + Ok(tlm_def) => { + let _ = uuid_index.insert(tlm_def.name.clone(), tlm_def.uuid.clone()); + let _ = tlm_data.insert(tlm_def.uuid.clone(), Arc::new(tlm_def.into())); + }, + Err(err) => { + error!("Failed to parse metadata entry {err}"); + }, + } + } + } + Err(err) => { + warn!("Failed to open metadata file {err}. Continuing"); + } } + + Ok(Self { + uuid_index, + tlm_data, + telemetry_history_service: Arc::new(telemetry_history_service), + metadata_file: Arc::new(Mutex::new(fs::OpenOptions::new() + .create(true) + .write(true) + .append(true) + .open(metadata_file)?)) + }) } pub fn register( &self, telemetry_definition_request: TelemetryDefinitionRequest, - ) -> Result> { + ) -> anyhow::Result { let uuid_index = self.uuid_index.pin(); let tlm_data = self.tlm_data.pin(); @@ -34,21 +88,47 @@ impl TelemetryManagementService { }) .clone(); - let _ = tlm_data.try_insert( + let inserted = tlm_data.try_insert( uuid.clone(), - TelemetryDataHistory { - data: TelemetryData { - definition: TelemetryDefinition { - uuid: uuid.clone(), - name: telemetry_definition_request.name.clone(), - data_type: telemetry_definition_request.data_type(), - }, - data: tokio::sync::watch::channel(None).0, - }, - history: TelemetryHistory::new(), - }, + Arc::new(TelemetryDefinition { + uuid: uuid.clone(), + name: telemetry_definition_request.name.clone(), + data_type: telemetry_definition_request.data_type(), + }.into()), ); + match inserted { + Ok(newly_inserted) => { + // This data also needs to be written to disk + let file = self.metadata_file.clone(); + let newly_inserted = newly_inserted.data.definition.clone(); + tokio::spawn(async move { + let mut file = file.lock_owned().await; + let newly_inserted = serde_json::to_string(&newly_inserted); + match newly_inserted { + Ok(newly_inserted) => { + if let Err(err) = file.write(newly_inserted.as_bytes()) { + error!("Failed to write TelemetryDefinition to file {err}"); + return; + } + if let Err(err) = file.write("\n".as_bytes()) { + error!("Failed to write newline to file {err}"); + return; + } + if let Err(err) = file.flush() { + error!("Failed to flush file {err}"); + return; + } + }, + Err(err) => { + error!("Failed to serialize TelemetryDefinition {err}"); + }, + } + }); + } + Err(_) => {}, + } + Ok(uuid) } @@ -72,17 +152,51 @@ impl TelemetryManagementService { } } - pub fn history_service(&self) -> &TelemetryHistoryService { - &self.telemetry_history_service + pub fn history_service(&self) -> Arc { + self.telemetry_history_service.clone() + } + + pub async fn cleanup(self) -> anyhow::Result<()> { + info!("Saving Telemetry"); + let history_service = self.telemetry_history_service; + let metadata_file = self.metadata_file; + let tlm_data = self.tlm_data.pin_owned(); + let mut tasks = vec![]; + + for _ in 0..RELEASED_ATTEMPTS { + if Arc::strong_count(&history_service) != 1 { + sleep(Duration::from_secs(1)).await; + } + } + if Arc::strong_count(&history_service) != 1 { + warn!("History Service Has not been released after {RELEASED_ATTEMPTS} attempts!"); + } + + for data_history in tlm_data.values() { + tasks.push(data_history.cleanup(&history_service)); + } + { + if let Some(tlm) = Arc::into_inner(metadata_file) { + tlm.into_inner().sync_all()?; + } else { + error!("Could not close metadata file: still in use") + } + } + + for task in tasks { + task.await?; + } + + Ok(()) } } pub struct TelemetryManagementServicePin<'a> { - tlm_data: HashMapRef<'a, String, TelemetryDataHistory, RandomState, LocalGuard<'a>>, + tlm_data: HashMapRef<'a, String, Arc, RandomState, LocalGuard<'a>>, } impl<'a> TelemetryManagementServicePin<'a> { - pub fn get_by_uuid(&'a self, uuid: &String) -> Option<&'a TelemetryDataHistory> { + pub fn get_by_uuid(&'a self, uuid: &String) -> Option<&'a Arc> { self.tlm_data.get(uuid) } }