diff --git a/frontend/src/components/GraphAxis.vue b/frontend/src/components/GraphAxis.vue index b2f6739..3cae71b 100644 --- a/frontend/src/components/GraphAxis.vue +++ b/frontend/src/components/GraphAxis.vue @@ -181,7 +181,8 @@ const lines = computed(() => { text_offset " :y="y_map(tick)" - > + > @@ -196,8 +197,8 @@ const lines = computed(() => { text_offset " :y="y_map(tick)" - > + > @@ -212,10 +213,9 @@ const lines = computed(() => { text_offset " :y="y_map(tick)" - > - - + + @@ -232,7 +232,8 @@ const lines = computed(() => { text_offset " :y="y_map(tick)" - > + > @@ -247,7 +248,8 @@ const lines = computed(() => { text_offset " :y="y_map(tick)" - > + > @@ -262,7 +264,8 @@ const lines = computed(() => { text_offset " :y="y_map(tick)" - > + > diff --git a/frontend/src/components/NumericText.vue b/frontend/src/components/NumericText.vue index 69b92aa..30ec6d8 100644 --- a/frontend/src/components/NumericText.vue +++ b/frontend/src/components/NumericText.vue @@ -1,27 +1,30 @@ - \ No newline at end of file + diff --git a/frontend/src/components/SvgGraph.vue b/frontend/src/components/SvgGraph.vue index e05526e..7646082 100644 --- a/frontend/src/components/SvgGraph.vue +++ b/frontend/src/components/SvgGraph.vue @@ -64,10 +64,18 @@ time_lines.reverse(); const text_offset = computed(() => 5); const legend_width = 160; -const border_left = computed(() => (props.left_axis ? 96 : 0) + (props.legend == GraphSide.Left ? legend_width : 0)); -const border_right = computed(() => (props.right_axis ? 80 : 0) + (props.legend == GraphSide.Right ? legend_width : 0)); +const border_left = computed( + () => + (props.left_axis ? 96 : 0) + + (props.legend == GraphSide.Left ? legend_width : 0), +); +const border_right = computed( + () => + (props.right_axis ? 80 : 0) + + (props.legend == GraphSide.Right ? legend_width : 0), +); const border_top = computed(() => 6); -const border_bottom = computed(() => props.hide_time_labels ? 6 : 24); +const border_bottom = computed(() => (props.hide_time_labels ? 6 : 24)); const max_x = now; const min_x = computed(() => max_x.value - window_duration.value); @@ -75,7 +83,8 @@ const min_x = computed(() => max_x.value - window_duration.value); const x_map = (x: number) => { const diff_x = max_x.value - min_x.value; return ( - ((width.value - border_left.value - border_right.value) * (x - min_x.value)) / + ((width.value - border_left.value - border_right.value) * + (x - min_x.value)) / diff_x + border_left.value ); @@ -83,8 +92,12 @@ const x_map = (x: number) => { const telemetry_lines = ref([]); -const legend_enabled = computed(() => props.legend === GraphSide.Left || props.legend === GraphSide.Right); -const legend_x = computed(() => (props.legend === GraphSide.Left) ? (8) : (width.value - legend_width + 8)); +const legend_enabled = computed( + () => props.legend === GraphSide.Left || props.legend === GraphSide.Right, +); +const legend_x = computed(() => + props.legend === GraphSide.Left ? 8 : width.value - legend_width + 8, +); const legend_y = computed(() => border_top.value); const legend_x_stride = computed(() => 0); const legend_y_stride = computed(() => 16); @@ -188,5 +201,4 @@ const lines = computed(() => { stroke: variables.$time-tick; fill: variables.$time-tick; } - diff --git a/frontend/src/components/TelemetryLine.vue b/frontend/src/components/TelemetryLine.vue index 68b4304..e0b1e4c 100644 --- a/frontend/src/components/TelemetryLine.vue +++ b/frontend/src/components/TelemetryLine.vue @@ -35,14 +35,13 @@ const legend_line_length = 8; const legend_text_offset = 4; const text_offset = computed(() => 10); -const min_sep = computed(() => Math.min(props.minimum_separation || 0, maximum_minimum_separation_live)); +const min_sep = computed(() => + Math.min(props.minimum_separation || 0, maximum_minimum_separation_live), +); const { data } = useTelemetry(() => props.data); const websocket = inject>(WEBSOCKET_SYMBOL)!; -const value = websocket.value.listen_to_telemetry( - data, - min_sep, -); +const value = websocket.value.listen_to_telemetry(data, min_sep); const graph_data = inject(GRAPH_DATA)!; const axis_data = inject(AXIS_DATA)!; @@ -128,7 +127,9 @@ watch( max.value = item_val; } } - memo.value.reduce_to_maximum_separation(props.minimum_separation || 0); + memo.value.reduce_to_maximum_separation( + props.minimum_separation || 0, + ); triggerRef(memo); debounced_recompute(); } catch (e) { @@ -251,28 +252,41 @@ const current_value = computed(() => { }); const legend_x = computed(() => { - return toValue(graph_data.legend_x) + toValue(graph_data.legend_x_stride) * index.value; + return ( + toValue(graph_data.legend_x) + + toValue(graph_data.legend_x_stride) * index.value + ); }); const legend_y = computed(() => { - return toValue(graph_data.legend_y) + toValue(graph_data.legend_y_stride) * index.value; + return ( + toValue(graph_data.legend_y) + + toValue(graph_data.legend_y_stride) * index.value + ); }); const legend_text = computed(() => { - const max_chars = (toValue(graph_data.legend_width) - legend_line_length - legend_text_offset * 2) / 7; + const max_chars = + (toValue(graph_data.legend_width) - + legend_line_length - + legend_text_offset * 2) / + 7; const start_text = props.data; if (start_text.length > max_chars) { - return start_text.substring(0, 3) + "..." + start_text.substring(start_text.length - max_chars + 6); + return ( + start_text.substring(0, 3) + + '...' + + start_text.substring(start_text.length - max_chars + 6) + ); } return start_text; }); const legend_line = computed(() => { - let x = legend_x.value; - let y = legend_y.value; + const x = legend_x.value; + const y = legend_y.value; return `${x},${y} ${x + legend_line_length},${y}`; }); - diff --git a/frontend/src/graph/line.ts b/frontend/src/graph/line.ts index f211200..91220ca 100644 --- a/frontend/src/graph/line.ts +++ b/frontend/src/graph/line.ts @@ -38,18 +38,27 @@ export class PointLine { const index = this.find_index(point.x); this.data.splice(index, 0, point); if (maximum_separation !== undefined) { - this.reduce_to_maximum_separation(maximum_separation, [index - 1, index + 1]); + this.reduce_to_maximum_separation(maximum_separation, [ + index - 1, + index + 1, + ]); } } - reduce_to_maximum_separation(maximum_separation: number, range?: [number, number]) { + reduce_to_maximum_separation( + maximum_separation: number, + range?: [number, number], + ) { if (maximum_separation <= 0) { return; } // Add a default range if it does not exist range = range || [1, this.data.length - 2]; // clamp it to the bounds - range = [Math.max(1, range[0]), Math.min(this.data.length - 2, range[1])]; + range = [ + Math.max(1, range[0]), + Math.min(this.data.length - 2, range[1]), + ]; // Loop over the indices in the range (backwards so removals don't break anything) for (let i = range[1]; i >= range[0]; i--) { @@ -61,7 +70,10 @@ export class PointLine { const separation_after = x_next - x_current; // If the data points are too close on both sides then delete this point - if (separation_before < maximum_separation && separation_after < maximum_separation) { + if ( + separation_before < maximum_separation && + separation_after < maximum_separation + ) { this.data.splice(i, 1); } } diff --git a/frontend/src/views/HomeView.vue b/frontend/src/views/HomeView.vue index 7315815..b47a155 100644 --- a/frontend/src/views/HomeView.vue +++ b/frontend/src/views/HomeView.vue @@ -66,18 +66,8 @@ provide(WEBSOCKET_SYMBOL, websocket); > - - - - + + diff --git a/server/src/grpc.rs b/server/src/grpc.rs index a1b69fd..e60d724 100644 --- a/server/src/grpc.rs +++ b/server/src/grpc.rs @@ -6,6 +6,7 @@ use crate::core::{ }; use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::data_value::TelemetryDataValue; +use crate::telemetry::history::TelemetryHistory; use crate::telemetry::management_service::TelemetryManagementService; use chrono::{DateTime, SecondsFormat}; use log::{error, info, trace}; @@ -19,7 +20,6 @@ use tonic::codegen::tokio_stream::wrappers::ReceiverStream; use tonic::codegen::tokio_stream::{Stream, StreamExt}; use tonic::transport::Server; use tonic::{Request, Response, Status, Streaming}; -use crate::telemetry::history::TelemetryHistory; pub struct CoreTelemetryService { pub tlm_management: Arc, @@ -128,7 +128,12 @@ impl CoreTelemetryService { value: value.clone(), timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true), })); - TelemetryHistory::insert_sync(tlm_data.clone(), tlm_management.history_service(), value, timestamp); + TelemetryHistory::insert_sync( + tlm_data.clone(), + tlm_management.history_service(), + value, + timestamp, + ); Ok(TelemetryInsertResponse {}) } diff --git a/server/src/http/api/mod.rs b/server/src/http/api/mod.rs index d12f2ae..95555dd 100644 --- a/server/src/http/api/mod.rs +++ b/server/src/http/api/mod.rs @@ -58,7 +58,10 @@ async fn get_tlm_history( let data = data.pin(); match data.get_by_uuid(&uuid) { None => Err(HttpServerResultError::TlmUuidNotFound { uuid }), - Some(tlm) => Ok(web::Json(tlm.get(from, to, maximum_resolution, &history_service).await)), + Some(tlm) => Ok(web::Json( + tlm.get(from, to, maximum_resolution, &history_service) + .await, + )), } } diff --git a/server/src/lib.rs b/server/src/lib.rs index 8394caf..9b0f18b 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -9,8 +9,8 @@ pub mod core { use crate::telemetry::history::TelemetryHistoryService; use crate::telemetry::management_service::TelemetryManagementService; -use std::sync::Arc; use log::error; +use std::sync::Arc; use tokio_util::sync::CancellationToken; pub async fn setup() -> anyhow::Result<()> { diff --git a/server/src/telemetry/data.rs b/server/src/telemetry/data.rs index d2bf1ec..82762ba 100644 --- a/server/src/telemetry/data.rs +++ b/server/src/telemetry/data.rs @@ -11,7 +11,7 @@ impl From for TelemetryData { fn from(value: TelemetryDefinition) -> Self { Self { definition: value, - data: tokio::sync::watch::channel(None).0 + data: tokio::sync::watch::channel(None).0, } } } diff --git a/server/src/telemetry/history.rs b/server/src/telemetry/history.rs index c2a24c9..b47b36b 100644 --- a/server/src/telemetry/history.rs +++ b/server/src/telemetry/history.rs @@ -1,24 +1,29 @@ +use crate::core::TelemetryDataType; +use crate::telemetry::data::TelemetryData; use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::data_value::TelemetryDataValue; +use crate::telemetry::definition::TelemetryDefinition; +use anyhow::{ensure, Context}; use chrono::{DateTime, DurationRound, SecondsFormat, TimeDelta, Utc}; use log::{error, info}; -use std::collections::VecDeque; -use std::{fs, path}; use std::cmp::min; +use std::collections::VecDeque; use std::io::SeekFrom; use std::path::PathBuf; use std::sync::{Arc, RwLock}; -use anyhow::{ensure, Context}; +use std::{fs, path}; use tokio::fs::File; use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt}; use tokio::task::JoinHandle; -use crate::core::TelemetryDataType; -use crate::telemetry::data::TelemetryData; -use crate::telemetry::definition::TelemetryDefinition; const FOLDER_DURATION: TimeDelta = TimeDelta::hours(1); -fn update_next_from(time_since_next_from: TimeDelta, maximum_resolution: TimeDelta, t: DateTime, next_from: DateTime) -> DateTime { +fn update_next_from( + time_since_next_from: TimeDelta, + maximum_resolution: TimeDelta, + t: DateTime, + next_from: DateTime, +) -> DateTime { match ( time_since_next_from.num_nanoseconds(), maximum_resolution.num_nanoseconds(), @@ -27,8 +32,7 @@ fn update_next_from(time_since_next_from: TimeDelta, maximum_resolution: TimeDel (Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => { let nanos_since_next_from = nanos_since_next_from as u64; let maximum_resolution_nanos = maximum_resolution_nanos as u64; - let num_steps = - nanos_since_next_from.div_ceil(maximum_resolution_nanos); + let num_steps = nanos_since_next_from.div_ceil(maximum_resolution_nanos); if num_steps > i32::MAX as u64 { t + maximum_resolution } else { @@ -110,7 +114,12 @@ impl HistorySegmentRam { } if t >= next_from { let time_since_next_from = t - next_from; - next_from = update_next_from(time_since_next_from, maximum_resolution, t, next_from); + next_from = update_next_from( + time_since_next_from, + maximum_resolution, + t, + next_from, + ); result.push(TelemetryDataItem { value: data.values[i].clone(), timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true), @@ -128,14 +137,17 @@ struct HistorySegmentDisk { start: DateTime, end: DateTime, length: u64, - file: File + file: File, } impl HistorySegmentDisk { const TIMESTAMP_LENGTH: u64 = 8 + 4; const HEADER_LENGTH: u64 = Self::TIMESTAMP_LENGTH + Self::TIMESTAMP_LENGTH + 8; - async fn save_to_disk(mut folder: PathBuf, mut segment: HistorySegmentRam) -> anyhow::Result { + async fn save_to_disk( + mut folder: PathBuf, + mut segment: HistorySegmentRam, + ) -> anyhow::Result { // Get the path for the specific timestamp we want to save to disk let folder_time = segment.start.duration_trunc(FOLDER_DURATION)?; folder.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true)); @@ -144,7 +156,10 @@ impl HistorySegmentDisk { fs::create_dir_all(&folder)?; let mut file = folder; - file.push(format!("{}.dat", segment.start.to_rfc3339_opts(SecondsFormat::Secs, true))); + file.push(format!( + "{}.dat", + segment.start.to_rfc3339_opts(SecondsFormat::Secs, true) + )); let file = File::create(file).await?; @@ -152,25 +167,36 @@ impl HistorySegmentDisk { start: segment.start, end: segment.end, length: 0, - file + file, }; let utc_offset_start = result.start - DateTime::UNIX_EPOCH; let utc_offset_end = result.end - DateTime::UNIX_EPOCH; // Write the segment bounds - result.file.write_i64(utc_offset_start.num_seconds()).await?; - result.file.write_i32(utc_offset_start.subsec_nanos()).await?; + result + .file + .write_i64(utc_offset_start.num_seconds()) + .await?; + result + .file + .write_i32(utc_offset_start.subsec_nanos()) + .await?; result.file.write_i64(utc_offset_end.num_seconds()).await?; result.file.write_i32(utc_offset_end.subsec_nanos()).await?; let data = segment.data.get_mut().unwrap_or_else(|err| { - error!("HistorySegmentDisk::save_to_disk - data was poisoned: {}", err); - let lock = err.into_inner(); - lock + error!( + "HistorySegmentDisk::save_to_disk - data was poisoned: {}", + err + ); + err.into_inner() }); - ensure!(data.timestamps.len() == data.values.len(), "Invalid Segment Cannot Be Saved to Disk"); + ensure!( + data.timestamps.len() == data.values.len(), + "Invalid Segment Cannot Be Saved to Disk" + ); result.length = data.timestamps.len() as u64; result.file.write_u64(result.length).await?; @@ -194,7 +220,10 @@ impl HistorySegmentDisk { Ok(result) } - async fn load_to_ram(mut self, telemetry_data_type: TelemetryDataType) -> anyhow::Result { + async fn load_to_ram( + mut self, + telemetry_data_type: TelemetryDataType, + ) -> anyhow::Result { let mut segment_data = SegmentData { values: Vec::with_capacity(self.length as usize), timestamps: Vec::with_capacity(self.length as usize), @@ -205,7 +234,9 @@ impl HistorySegmentDisk { segment_data.timestamps.push(self.read_date_time().await?); } for _ in 0..self.length { - segment_data.values.push(self.read_telemetry_item(telemetry_data_type).await?); + segment_data + .values + .push(self.read_telemetry_item(telemetry_data_type).await?); } Ok(HistorySegmentRam { @@ -220,7 +251,10 @@ impl HistorySegmentDisk { let folder_time = start.duration_trunc(FOLDER_DURATION)?; let mut file = folder; file.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true)); - file.push(format!("{}.dat", start.to_rfc3339_opts(SecondsFormat::Secs, true))); + file.push(format!( + "{}.dat", + start.to_rfc3339_opts(SecondsFormat::Secs, true) + )); let mut file = File::open(file).await?; @@ -229,8 +263,10 @@ impl HistorySegmentDisk { let start_nanos = file.read_i32().await?; let end_seconds = file.read_i64().await?; let end_nanos = file.read_i32().await?; - let start = TimeDelta::new(start_seconds, start_nanos as u32).context("Failed to reconstruct start TimeDelta")?; - let end = TimeDelta::new(end_seconds, end_nanos as u32).context("Failed to reconstruct end TimeDelta")?; + let start = TimeDelta::new(start_seconds, start_nanos as u32) + .context("Failed to reconstruct start TimeDelta")?; + let end = TimeDelta::new(end_seconds, end_nanos as u32) + .context("Failed to reconstruct end TimeDelta")?; let length = file.read_u64().await?; @@ -263,7 +299,12 @@ impl HistorySegmentDisk { } if t >= next_from { let time_since_next_from = t - next_from; - next_from = update_next_from(time_since_next_from, maximum_resolution, t, next_from); + next_from = update_next_from( + time_since_next_from, + maximum_resolution, + t, + next_from, + ); result.push(TelemetryDataItem { value: self.get_telemetry_item(i, telemetry_data_type).await?, timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true), @@ -279,28 +320,48 @@ impl HistorySegmentDisk { async fn read_date_time(&mut self) -> anyhow::Result> { let seconds = self.file.read_i64().await?; let nanos = self.file.read_i32().await?; - let start = TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?; + let start = + TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?; Ok(DateTime::UNIX_EPOCH + start) } async fn get_date_time(&mut self, index: u64) -> anyhow::Result> { - self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH)).await?; + self.file + .seek(SeekFrom::Start( + Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH, + )) + .await?; self.read_date_time().await } - async fn read_telemetry_item(&mut self, telemetry_data_type: TelemetryDataType) -> anyhow::Result { + async fn read_telemetry_item( + &mut self, + telemetry_data_type: TelemetryDataType, + ) -> anyhow::Result { match telemetry_data_type { - TelemetryDataType::Float32 => Ok(TelemetryDataValue::Float32(self.file.read_f32().await?)), - TelemetryDataType::Float64 => Ok(TelemetryDataValue::Float64(self.file.read_f64().await?)), + TelemetryDataType::Float32 => { + Ok(TelemetryDataValue::Float32(self.file.read_f32().await?)) + } + TelemetryDataType::Float64 => { + Ok(TelemetryDataValue::Float64(self.file.read_f64().await?)) + } } } - async fn get_telemetry_item(&mut self, index: u64, telemetry_data_type: TelemetryDataType) -> anyhow::Result { + async fn get_telemetry_item( + &mut self, + index: u64, + telemetry_data_type: TelemetryDataType, + ) -> anyhow::Result { let item_length = match telemetry_data_type { TelemetryDataType::Float32 => 4, TelemetryDataType::Float64 => 8, }; - self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length)).await?; + self.file + .seek(SeekFrom::Start( + Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length, + )) + .await?; self.read_telemetry_item(telemetry_data_type).await } @@ -325,7 +386,12 @@ impl HistorySegmentDisk { size -= half; } - Ok(left + if self.get_date_time(left).await? < date_time { 1 } else { 0 }) + Ok(left + + if self.get_date_time(left).await? < date_time { + 1 + } else { + 0 + }) } } @@ -334,7 +400,6 @@ pub struct TelemetryHistory { segments: tokio::sync::RwLock>, } - impl From for TelemetryHistory { fn from(value: TelemetryData) -> Self { Self { @@ -351,7 +416,11 @@ impl From for TelemetryHistory { } impl TelemetryHistory { - fn cleanup_segment(&self, service: &TelemetryHistoryService, history_segment_ram: HistorySegmentRam) -> JoinHandle<()> { + fn cleanup_segment( + &self, + service: &TelemetryHistoryService, + history_segment_ram: HistorySegmentRam, + ) -> JoinHandle<()> { let mut path = service.data_root_folder.clone(); path.push(&self.data.definition.uuid); tokio::spawn(async move { @@ -359,19 +428,31 @@ impl TelemetryHistory { // Immediately drop the segment - now that we've saved it to disk we don't need to keep it in memory Ok(segment) => drop(segment), Err(err) => { - error!("An error occurred saving telemetry history to disk: {}", err); + error!( + "An error occurred saving telemetry history to disk: {}", + err + ); } } }) } - async fn get_disk_segment(&self, service: &TelemetryHistoryService, start: DateTime) -> anyhow::Result { + async fn get_disk_segment( + &self, + service: &TelemetryHistoryService, + start: DateTime, + ) -> anyhow::Result { let mut path = service.data_root_folder.clone(); path.push(&self.data.definition.uuid); HistorySegmentDisk::open(path, start).await } - async fn create_ram_segment(&self, start: DateTime, service: &TelemetryHistoryService, telemetry_data_type: TelemetryDataType) -> HistorySegmentRam { + async fn create_ram_segment( + &self, + start: DateTime, + service: &TelemetryHistoryService, + telemetry_data_type: TelemetryDataType, + ) -> HistorySegmentRam { let ram = match self.get_disk_segment(service, start).await { Ok(disk) => disk.load_to_ram(telemetry_data_type).await, Err(e) => Err(e), @@ -379,10 +460,7 @@ impl TelemetryHistory { match ram { Ok(ram) => ram, - Err(_) => HistorySegmentRam::new( - start, - start + service.segment_width, - ), + Err(_) => HistorySegmentRam::new(start, start + service.segment_width), } } @@ -401,16 +479,27 @@ impl TelemetryHistory { if segments.len() == 0 { let start_time = timestamp.duration_trunc(service.segment_width).unwrap(); - segments.push_back(self.create_ram_segment(start_time, service, self.data.definition.data_type).await); + segments.push_back( + self.create_ram_segment(start_time, service, self.data.definition.data_type) + .await, + ); } else { while segments[segments.len() - 1].end < timestamp { if segments.len() == service.max_segments { if let Some(segment) = segments.pop_front() { - let _ = self.cleanup_segment(service, segment); + // We don't care about this future + drop(self.cleanup_segment(service, segment)); } } let start_time = segments[segments.len() - 1].end; - segments.push_back(self.create_ram_segment(start_time, service, self.data.definition.data_type).await); + segments.push_back( + self.create_ram_segment( + start_time, + service, + self.data.definition.data_type, + ) + .await, + ); } } @@ -442,7 +531,7 @@ impl TelemetryHistory { from: DateTime, to: DateTime, maximum_resolution: TimeDelta, - telemetry_history_service: &TelemetryHistoryService + telemetry_history_service: &TelemetryHistoryService, ) -> Vec { let mut result = vec![]; @@ -452,29 +541,41 @@ impl TelemetryHistory { { let first_ram_segment = segments.front().map(|x| x.start); - let start = from.duration_trunc(telemetry_history_service.segment_width).unwrap(); - let end = (to + telemetry_history_service.segment_width).duration_trunc(telemetry_history_service.segment_width).unwrap(); + let start = from + .duration_trunc(telemetry_history_service.segment_width) + .unwrap(); + let end = (to + telemetry_history_service.segment_width) + .duration_trunc(telemetry_history_service.segment_width) + .unwrap(); - let end = if let Some(first_ram_segment) = first_ram_segment { min(end, first_ram_segment) } else { end }; + let end = if let Some(first_ram_segment) = first_ram_segment { + min(end, first_ram_segment) + } else { + end + }; let mut path = telemetry_history_service.data_root_folder.clone(); path.push(&self.data.definition.uuid); let mut start = start; while start < end { - match self.get_disk_segment(telemetry_history_service, start).await { - Ok(mut disk) => { - match disk.get(from, to, maximum_resolution, self.data.definition.data_type).await { - Ok((new_from, new_data)) => { - from = new_from; - result.extend(new_data); - }, - Err(err) => { - error!("Failed to get from disk segment: {err}"); - } + // We're going to ignore errors with getting the disk segment + if let Ok(mut disk) = self + .get_disk_segment(telemetry_history_service, start) + .await + { + match disk + .get(from, to, maximum_resolution, self.data.definition.data_type) + .await + { + Ok((new_from, new_data)) => { + from = new_from; + result.extend(new_data); } - }, - Err(_) => {}, // Ignore errors + Err(err) => { + error!("Failed to get from disk segment: {err}"); + } + } } start += telemetry_history_service.segment_width; } @@ -516,7 +617,10 @@ impl TelemetryHistoryService { fs::create_dir_all(&result.data_root_folder)?; - info!("Recording Telemetry Data to {}", result.data_root_folder.to_string_lossy()); + info!( + "Recording Telemetry Data to {}", + result.data_root_folder.to_string_lossy() + ); Ok(result) } diff --git a/server/src/telemetry/management_service.rs b/server/src/telemetry/management_service.rs index 386fbd9..35bf063 100644 --- a/server/src/telemetry/management_service.rs +++ b/server/src/telemetry/management_service.rs @@ -1,15 +1,15 @@ -use std::fs; -use std::fs::File; use crate::core::{TelemetryDefinitionRequest, Uuid}; use crate::telemetry::data::TelemetryData; use crate::telemetry::definition::TelemetryDefinition; use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService}; +use log::{error, info, warn}; use papaya::{HashMap, HashMapRef, LocalGuard}; +use std::fs; +use std::fs::File; use std::hash::RandomState; use std::io::{Read, Write}; use std::sync::Arc; use std::time::Duration; -use log::{error, info, warn}; use tokio::sync::Mutex; use tokio::time::sleep; @@ -51,10 +51,10 @@ impl TelemetryManagementService { Ok(tlm_def) => { let _ = uuid_index.insert(tlm_def.name.clone(), tlm_def.uuid.clone()); let _ = tlm_data.insert(tlm_def.uuid.clone(), Arc::new(tlm_def.into())); - }, + } Err(err) => { error!("Failed to parse metadata entry {err}"); - }, + } } } } @@ -67,11 +67,12 @@ impl TelemetryManagementService { uuid_index, tlm_data, telemetry_history_service: Arc::new(telemetry_history_service), - metadata_file: Arc::new(Mutex::new(fs::OpenOptions::new() - .create(true) - .write(true) - .append(true) - .open(metadata_file)?)) + metadata_file: Arc::new(Mutex::new( + fs::OpenOptions::new() + .create(true) + .append(true) + .open(metadata_file)?, + )), }) } @@ -90,43 +91,42 @@ impl TelemetryManagementService { let inserted = tlm_data.try_insert( uuid.clone(), - Arc::new(TelemetryDefinition { - uuid: uuid.clone(), - name: telemetry_definition_request.name.clone(), - data_type: telemetry_definition_request.data_type(), - }.into()), + Arc::new( + TelemetryDefinition { + uuid: uuid.clone(), + name: telemetry_definition_request.name.clone(), + data_type: telemetry_definition_request.data_type(), + } + .into(), + ), ); - match inserted { - Ok(newly_inserted) => { - // This data also needs to be written to disk - let file = self.metadata_file.clone(); - let newly_inserted = newly_inserted.data.definition.clone(); - tokio::spawn(async move { - let mut file = file.lock_owned().await; - let newly_inserted = serde_json::to_string(&newly_inserted); - match newly_inserted { - Ok(newly_inserted) => { - if let Err(err) = file.write(newly_inserted.as_bytes()) { - error!("Failed to write TelemetryDefinition to file {err}"); - return; - } - if let Err(err) = file.write("\n".as_bytes()) { - error!("Failed to write newline to file {err}"); - return; - } - if let Err(err) = file.flush() { - error!("Failed to flush file {err}"); - return; - } - }, - Err(err) => { - error!("Failed to serialize TelemetryDefinition {err}"); - }, + if let Ok(newly_inserted) = inserted { + // This data also needs to be written to disk + let file = self.metadata_file.clone(); + let newly_inserted = newly_inserted.data.definition.clone(); + tokio::spawn(async move { + let mut file = file.lock_owned().await; + let newly_inserted = serde_json::to_string(&newly_inserted); + match newly_inserted { + Ok(newly_inserted) => { + if let Err(err) = file.write(newly_inserted.as_bytes()) { + error!("Failed to write TelemetryDefinition to file {err}"); + return; + } + if let Err(err) = file.write("\n".as_bytes()) { + error!("Failed to write newline to file {err}"); + return; + } + if let Err(err) = file.flush() { + error!("Failed to flush file {err}"); + } } - }); - } - Err(_) => {}, + Err(err) => { + error!("Failed to serialize TelemetryDefinition {err}"); + } + } + }); } Ok(uuid)