use crate::serialization::file_ext::{ReadExt, WriteExt}; use crate::telemetry::data::TelemetryData; use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::definition::TelemetryDefinition; use anyhow::{anyhow, ensure, Context}; use api::data_type::DataType; use api::data_value::DataValue; use chrono::{DateTime, DurationRound, SecondsFormat, TimeDelta, Utc}; use log::{error, info}; use std::cmp::min; use std::collections::VecDeque; use std::fs; use std::fs::File; use std::io::{BufReader, BufWriter, Seek, SeekFrom, Write}; use std::path::PathBuf; use std::sync::{Arc, RwLock}; use tokio::task::{spawn_blocking, JoinHandle}; const FOLDER_DURATION: TimeDelta = TimeDelta::hours(1); fn update_next_from( time_since_next_from: TimeDelta, maximum_resolution: TimeDelta, t: DateTime, next_from: DateTime, ) -> DateTime { match ( time_since_next_from.num_nanoseconds(), maximum_resolution.num_nanoseconds(), ) { (_, Some(0)) => t, (Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => { let nanos_since_next_from = nanos_since_next_from as u64; let maximum_resolution_nanos = maximum_resolution_nanos as u64; let num_steps = nanos_since_next_from.div_ceil(maximum_resolution_nanos); if num_steps > i32::MAX as u64 { t + maximum_resolution } else { next_from + maximum_resolution * num_steps as i32 } } _ => t + maximum_resolution, // If there is a gap so big it can't be represented in 2^63 nanoseconds (over 200 years) just skip forward } } struct SegmentData { values: Vec, timestamps: Vec>, } struct HistorySegmentRam { start: DateTime, end: DateTime, data: RwLock, } impl HistorySegmentRam { fn new(start: DateTime, end: DateTime) -> Self { Self { start, end, data: RwLock::new(SegmentData { values: vec![], timestamps: vec![], }), } } fn insert(&self, value: DataValue, timestamp: DateTime) { if timestamp < self.start || timestamp >= self.end { return; } let mut data = self.data.write().unwrap_or_else(|err| { error!("HistorySegmentRam::insert - data was poisoned: {}", err); let lock = err.into_inner(); self.data.clear_poison(); lock }); // Find the point where we should insert this item let index = data.timestamps.partition_point(|item| item <= ×tamp); // Insert the item data.timestamps.insert(index, timestamp); data.values.insert(index, value); } fn get( &self, from: DateTime, to: DateTime, maximum_resolution: TimeDelta, ) -> (DateTime, Vec) { let mut result = vec![]; let mut next_from = from; if from < self.end && self.start < to { // If there is overlap with the range let data = self.data.read().unwrap_or_else(|err| { error!("HistorySegmentRam::get - data was poisoned: {}", err); let lock = err.into_inner(); self.data.clear_poison(); lock }); let start = data.timestamps.partition_point(|x| x < &from); if start < data.timestamps.len() { for i in start..data.timestamps.len() { let t = data.timestamps[i]; if t >= self.end { break; } if t >= next_from { let time_since_next_from = t - next_from; next_from = update_next_from( time_since_next_from, maximum_resolution, t, next_from, ); result.push(TelemetryDataItem { value: data.values[i], timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true), }); } } } } (next_from, result) } } struct HistorySegmentFile { start: DateTime, end: DateTime, length: u64, file: BufReader, file_position: i64, } impl HistorySegmentFile { const TIMESTAMP_LENGTH: u64 = 8 + 4; const HEADER_LENGTH: u64 = Self::TIMESTAMP_LENGTH + Self::TIMESTAMP_LENGTH + 8; fn save_to_disk(mut folder: PathBuf, mut segment: HistorySegmentRam) -> anyhow::Result { // Get the path for the specific timestamp we want to save to disk let folder_time = segment.start.duration_trunc(FOLDER_DURATION)?; folder.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true)); // Create the necessary folders fs::create_dir_all(&folder)?; let mut file = folder; file.push(format!( "{}.dat", segment.start.to_rfc3339_opts(SecondsFormat::Secs, true) )); let mut file = BufWriter::new(File::create(file)?); let utc_offset_start = segment.start - DateTime::UNIX_EPOCH; let utc_offset_end = segment.end - DateTime::UNIX_EPOCH; // Write the segment bounds file.write_data::(utc_offset_start.num_seconds())?; file.write_data::(utc_offset_start.subsec_nanos())?; file.write_data::(utc_offset_end.num_seconds())?; file.write_data::(utc_offset_end.subsec_nanos())?; let data = segment.data.get_mut().unwrap_or_else(|err| { error!( "HistorySegmentDisk::save_to_disk - data was poisoned: {}", err ); err.into_inner() }); ensure!( data.timestamps.len() == data.values.len(), "Invalid Segment Cannot Be Saved to Disk" ); let length = data.timestamps.len() as u64; file.write_data::(length)?; // Write all the timestamps for timestamp in &data.timestamps { let utc_offset = *timestamp - DateTime::UNIX_EPOCH; file.write_data::(utc_offset.num_seconds())?; file.write_data::(utc_offset.subsec_nanos())?; } // Write all the values for value in &data.values { match value { DataValue::Float32(value) => file.write_data::(*value)?, DataValue::Float64(value) => file.write_data::(*value)?, DataValue::Boolean(value) => file.write_data::(*value)?, } } file.flush()?; let mut file = BufReader::new(file.into_inner()?); file.seek(SeekFrom::Start(0))?; Ok(Self { start: segment.start, end: segment.end, length: 0, file, file_position: 0, }) } fn load_to_ram(mut self, telemetry_data_type: DataType) -> anyhow::Result { let mut segment_data = SegmentData { values: Vec::with_capacity(self.length as usize), timestamps: Vec::with_capacity(self.length as usize), }; self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH))?; self.file_position = Self::HEADER_LENGTH as i64; for _ in 0..self.length { segment_data.timestamps.push(self.read_date_time()?); } for _ in 0..self.length { segment_data .values .push(self.read_telemetry_item(telemetry_data_type)?); } Ok(HistorySegmentRam { start: self.start, end: self.end, data: RwLock::new(segment_data), }) } fn open(folder: PathBuf, start: DateTime) -> anyhow::Result { // Get the path for the specific timestamp we want to save to disk let folder_time = start.duration_trunc(FOLDER_DURATION)?; let mut file = folder; file.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true)); file.push(format!( "{}.dat", start.to_rfc3339_opts(SecondsFormat::Secs, true) )); let mut file = BufReader::new(File::open(file)?); // Write the segment bounds let start_seconds = file.read_data::()?; let start_nanos = file.read_data::()?; let end_seconds = file.read_data::()?; let end_nanos = file.read_data::()?; let start = TimeDelta::new(start_seconds, start_nanos as u32) .context("Failed to reconstruct start TimeDelta")?; let end = TimeDelta::new(end_seconds, end_nanos as u32) .context("Failed to reconstruct end TimeDelta")?; let length = file.read_data::()?; file.seek(SeekFrom::Start(0))?; Ok(HistorySegmentFile { start: DateTime::UNIX_EPOCH + start, end: DateTime::UNIX_EPOCH + end, length, file, file_position: 0, }) } fn get( &mut self, from: DateTime, to: DateTime, maximum_resolution: TimeDelta, telemetry_data_type: DataType, ) -> anyhow::Result<(DateTime, Vec)> { self.file_position = 0; self.file.seek(SeekFrom::Start(0))?; let mut result = vec![]; let mut next_from = from; if from < self.end && self.start < to { let start = self.partition_point(from)?; if start < self.length { for i in start..self.length { let t = self.get_date_time(i)?; if t >= self.end { break; } if t >= next_from { let time_since_next_from = t - next_from; next_from = update_next_from( time_since_next_from, maximum_resolution, t, next_from, ); result.push(TelemetryDataItem { value: self.get_telemetry_item(i, telemetry_data_type)?, timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true), }); } } } } Ok((next_from, result)) } fn read_date_time(&mut self) -> anyhow::Result> { let seconds = self.file.read_data::()?; let nanos = self.file.read_data::()?; self.file_position += 8 + 4; let start = TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?; Ok(DateTime::UNIX_EPOCH + start) } fn get_date_time(&mut self, index: u64) -> anyhow::Result> { let desired_position = Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH; let seek_amount = desired_position as i64 - self.file_position; self.file_position += seek_amount; self.file.seek_relative(seek_amount)?; self.read_date_time() } fn read_telemetry_item(&mut self, telemetry_data_type: DataType) -> anyhow::Result { match telemetry_data_type { DataType::Float32 => { self.file_position += 4; Ok(DataValue::Float32(self.file.read_data::()?)) } DataType::Float64 => { self.file_position += 8; Ok(DataValue::Float64(self.file.read_data::()?)) } DataType::Boolean => { self.file_position += 1; Ok(DataValue::Boolean(self.file.read_data::()?)) } } } fn get_telemetry_item( &mut self, index: u64, telemetry_data_type: DataType, ) -> anyhow::Result { let item_length = match telemetry_data_type { DataType::Float32 => 4, DataType::Float64 => 8, DataType::Boolean => 1, }; let desired_position = Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length; let seek_amount = desired_position as i64 - self.file_position; self.file_position += seek_amount; self.file.seek_relative(seek_amount)?; self.read_telemetry_item(telemetry_data_type) } fn partition_point(&mut self, date_time: DateTime) -> anyhow::Result { if self.length == 0 { return Ok(0); } // left should be too early to insert // right should be too late to insert let mut left = 0; let mut size = self.length; while size > 1 { let half = size / 2; let mid = left + half; let is_less = self.get_date_time(mid)? < date_time; if is_less { left = mid; } size -= half; } Ok(left + if self.get_date_time(left)? < date_time { 1 } else { 0 }) } } pub struct TelemetryHistory { pub data: TelemetryData, segments: tokio::sync::RwLock>, } impl From for TelemetryHistory { fn from(value: TelemetryData) -> Self { Self { data: value, segments: tokio::sync::RwLock::new(VecDeque::new()), } } } impl From for TelemetryHistory { fn from(value: TelemetryDefinition) -> Self { >::into(value).into() } } impl TelemetryHistory { fn cleanup_segment( &self, service: &TelemetryHistoryService, history_segment_ram: HistorySegmentRam, ) -> JoinHandle<()> { let mut path = service.data_root_folder.clone(); path.push(self.data.definition.uuid.as_hyphenated().to_string()); spawn_blocking(move || { match HistorySegmentFile::save_to_disk(path, history_segment_ram) { // Immediately drop the segment - now that we've saved it to disk we don't need to keep it in memory Ok(segment) => drop(segment), Err(err) => { error!( "An error occurred saving telemetry history to disk: {}", err ); } } }) } fn get_disk_segment( &self, service: &TelemetryHistoryService, start: DateTime, ) -> JoinHandle> { let mut path = service.data_root_folder.clone(); path.push(self.data.definition.uuid.as_hyphenated().to_string()); spawn_blocking(move || HistorySegmentFile::open(path, start)) } async fn create_ram_segment( &self, start: DateTime, service: &TelemetryHistoryService, telemetry_data_type: DataType, ) -> HistorySegmentRam { let ram = self .get_disk_segment(service, start) .await .unwrap_or_else(|e| Err(anyhow!("Join Error {e}"))) .map(|disk| spawn_blocking(move || disk.load_to_ram(telemetry_data_type))); let ram = match ram { Ok(ram) => ram.await.unwrap_or_else(|e| Err(anyhow!("Join Error {e}"))), Err(e) => Err(e), }; match ram { Ok(ram) => ram, _ => HistorySegmentRam::new(start, start + service.segment_width), } } pub async fn insert( &self, service: &TelemetryHistoryService, value: DataValue, timestamp: DateTime, ) { let segments = self.segments.read().await; let segments = if segments.is_empty() || segments[segments.len() - 1].end < timestamp { // We want to insert something that doesn't fit into our history drop(segments); let mut segments = self.segments.write().await; if segments.is_empty() { let start_time = timestamp.duration_trunc(service.segment_width).unwrap(); segments.push_back( self.create_ram_segment(start_time, service, self.data.definition.data_type) .await, ); } else { while segments[segments.len() - 1].end < timestamp { if segments.len() == service.max_segments { if let Some(segment) = segments.pop_front() { // We don't care about this future drop(self.cleanup_segment(service, segment)); } } let start_time = segments[segments.len() - 1].end; segments.push_back( self.create_ram_segment( start_time, service, self.data.definition.data_type, ) .await, ); } } drop(segments); self.segments.read().await } else { segments }; // Get the index of the first segment which has an end time AFTER the above timestamp let segment_index = segments.partition_point(|segment| segment.end < timestamp); segments[segment_index].insert(value, timestamp); } pub fn insert_sync( history: Arc, service: Arc, value: DataValue, timestamp: DateTime, ) { tokio::spawn(async move { history.insert(&service, value, timestamp).await; }); } pub async fn get( &self, from: DateTime, to: DateTime, maximum_resolution: TimeDelta, telemetry_history_service: &TelemetryHistoryService, ) -> Vec { let mut disk_result = vec![]; let mut ram_result = vec![]; let mut from = from; let mut to = to; let initial_to = to; let mut ram_from_result = from; { let segments = self.segments.read().await; let first_ram_segment = segments.front().map(|x| x.start); if let Some(first_ram_segment) = first_ram_segment { let mut ram_from = first_ram_segment; for i in 0..segments.len() { let (new_from, new_data) = segments[i].get(ram_from, to, maximum_resolution); ram_from = new_from; ram_result.extend(new_data); } from = min(from, first_ram_segment); to = min(to, first_ram_segment); ram_from_result = ram_from; } } { let start = from .duration_trunc(telemetry_history_service.segment_width) .unwrap(); let end = (to + telemetry_history_service.segment_width) .duration_trunc(telemetry_history_service.segment_width) .unwrap(); let mut path = telemetry_history_service.data_root_folder.clone(); path.push(self.data.definition.uuid.as_hyphenated().to_string()); let mut start = start; while start < end { // We're going to ignore errors with getting the disk segment if let Ok(Ok(mut disk)) = self .get_disk_segment(telemetry_history_service, start) .await { match disk.get(from, to, maximum_resolution, self.data.definition.data_type) { Ok((new_from, new_data)) => { from = new_from; disk_result.extend(new_data); } Err(err) => { error!("Failed to get from disk segment: {err}"); } } } start += telemetry_history_service.segment_width; } } { // Go through the ram segments a second time to capture any data added since we dealt // with the disk data from = ram_from_result; to = initial_to; let segments = self.segments.read().await; for i in 0..segments.len() { let (new_from, new_data) = segments[i].get(from, to, maximum_resolution); from = new_from; ram_result.extend(new_data); } } disk_result.extend(ram_result); disk_result } pub async fn cleanup(&self, service: &TelemetryHistoryService) -> anyhow::Result<()> { let mut segments = self.segments.write().await; let segments = segments .drain(..) .map(|segment| self.cleanup_segment(service, segment)) .collect::>(); for segment in segments { segment.await?; } Ok(()) } } pub struct TelemetryHistoryService { segment_width: TimeDelta, max_segments: usize, data_root_folder: PathBuf, } impl TelemetryHistoryService { pub fn new(data_folder: PathBuf) -> anyhow::Result { let result = Self { segment_width: TimeDelta::minutes(1), max_segments: 5, data_root_folder: data_folder, }; fs::create_dir_all(&result.data_root_folder)?; info!( "Recording Telemetry Data to {}", result.data_root_folder.to_string_lossy() ); Ok(result) } pub fn get_metadata_file(&self) -> PathBuf { self.data_root_folder.join("metadata.json") } }