diff --git a/server/src/telemetry/history.rs b/server/src/telemetry/history.rs index 102ada8..683c38d 100644 --- a/server/src/telemetry/history.rs +++ b/server/src/telemetry/history.rs @@ -505,7 +505,7 @@ impl TelemetryHistory { service, self.data.definition.data_type, ) - .await, + .await, ); } } @@ -540,14 +540,32 @@ impl TelemetryHistory { maximum_resolution: TimeDelta, telemetry_history_service: &TelemetryHistoryService, ) -> Vec { - let mut result = vec![]; + let mut disk_result = vec![]; + let mut ram_result = vec![]; - let segments = self.segments.read().await; let mut from = from; + let mut to = to; + let initial_to = to; + let mut ram_from_result = from; { + let segments = self.segments.read().await; let first_ram_segment = segments.front().map(|x| x.start); + if let Some(first_ram_segment) = first_ram_segment { + let mut ram_from = first_ram_segment; + for i in 0..segments.len() { + let (new_from, new_data) = segments[i].get(ram_from, to, maximum_resolution); + ram_from = new_from; + ram_result.extend(new_data); + } + from = min(from, first_ram_segment); + to = min(to, first_ram_segment); + ram_from_result = ram_from; + } + } + + { let start = from .duration_trunc(telemetry_history_service.segment_width) .unwrap(); @@ -555,12 +573,6 @@ impl TelemetryHistory { .duration_trunc(telemetry_history_service.segment_width) .unwrap(); - let end = if let Some(first_ram_segment) = first_ram_segment { - min(end, first_ram_segment) - } else { - end - }; - let mut path = telemetry_history_service.data_root_folder.clone(); path.push(&self.data.definition.uuid); @@ -574,7 +586,7 @@ impl TelemetryHistory { match disk.get(from, to, maximum_resolution, self.data.definition.data_type) { Ok((new_from, new_data)) => { from = new_from; - result.extend(new_data); + disk_result.extend(new_data); } Err(err) => { error!("Failed to get from disk segment: {err}"); @@ -585,13 +597,21 @@ impl TelemetryHistory { } } - for i in 0..segments.len() { - let (new_from, new_data) = segments[i].get(from, to, maximum_resolution); - from = new_from; - result.extend(new_data); + { + // Go through the ram segments a second time to capture any data added since we dealt + // with the disk data + from = ram_from_result; + to = initial_to; + let segments = self.segments.read().await; + for i in 0..segments.len() { + let (new_from, new_data) = segments[i].get(from, to, maximum_resolution); + from = new_from; + ram_result.extend(new_data); + } } - result + disk_result.extend(ram_result); + disk_result } pub async fn cleanup(&self, service: &TelemetryHistoryService) -> anyhow::Result<()> {