fixes history loading slowly due to context switches

This commit is contained in:
2025-01-04 19:13:32 -05:00
parent c69022448f
commit 4dd7cea97d
12 changed files with 278 additions and 122 deletions

View File

@@ -1,20 +1,20 @@
use crate::core::TelemetryDataType;
use crate::serialization::file_ext::FileExt;
use crate::telemetry::data::TelemetryData;
use crate::telemetry::data_item::TelemetryDataItem;
use crate::telemetry::data_value::TelemetryDataValue;
use crate::telemetry::definition::TelemetryDefinition;
use anyhow::{ensure, Context};
use anyhow::{anyhow, ensure, Context};
use chrono::{DateTime, DurationRound, SecondsFormat, TimeDelta, Utc};
use log::{error, info};
use std::cmp::min;
use std::collections::VecDeque;
use std::io::SeekFrom;
use std::fs::File;
use std::io::{Seek, SeekFrom, Write};
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use std::{fs, path};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::task::JoinHandle;
use tokio::task::{spawn_blocking, JoinHandle};
const FOLDER_DURATION: TimeDelta = TimeDelta::hours(1);
@@ -133,21 +133,18 @@ impl HistorySegmentRam {
}
}
struct HistorySegmentDisk {
struct HistorySegmentFile {
start: DateTime<Utc>,
end: DateTime<Utc>,
length: u64,
file: File,
}
impl HistorySegmentDisk {
impl HistorySegmentFile {
const TIMESTAMP_LENGTH: u64 = 8 + 4;
const HEADER_LENGTH: u64 = Self::TIMESTAMP_LENGTH + Self::TIMESTAMP_LENGTH + 8;
async fn save_to_disk(
mut folder: PathBuf,
mut segment: HistorySegmentRam,
) -> anyhow::Result<Self> {
fn save_to_disk(mut folder: PathBuf, mut segment: HistorySegmentRam) -> anyhow::Result<Self> {
// Get the path for the specific timestamp we want to save to disk
let folder_time = segment.start.duration_trunc(FOLDER_DURATION)?;
folder.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true));
@@ -161,7 +158,7 @@ impl HistorySegmentDisk {
segment.start.to_rfc3339_opts(SecondsFormat::Secs, true)
));
let file = File::create(file).await?;
let file = File::create(file)?;
let mut result = Self {
start: segment.start,
@@ -176,14 +173,16 @@ impl HistorySegmentDisk {
// Write the segment bounds
result
.file
.write_i64(utc_offset_start.num_seconds())
.await?;
.write_data::<i64>(utc_offset_start.num_seconds())?;
result
.file
.write_i32(utc_offset_start.subsec_nanos())
.await?;
result.file.write_i64(utc_offset_end.num_seconds()).await?;
result.file.write_i32(utc_offset_end.subsec_nanos()).await?;
.write_data::<i32>(utc_offset_start.subsec_nanos())?;
result
.file
.write_data::<i64>(utc_offset_end.num_seconds())?;
result
.file
.write_data::<i32>(utc_offset_end.subsec_nanos())?;
let data = segment.data.get_mut().unwrap_or_else(|err| {
error!(
@@ -199,28 +198,28 @@ impl HistorySegmentDisk {
);
result.length = data.timestamps.len() as u64;
result.file.write_u64(result.length).await?;
result.file.write_data::<u64>(result.length)?;
// Write all the timestamps
for timestamp in &data.timestamps {
let utc_offset = *timestamp - DateTime::UNIX_EPOCH;
result.file.write_i64(utc_offset.num_seconds()).await?;
result.file.write_i32(utc_offset.subsec_nanos()).await?;
result.file.write_data::<i64>(utc_offset.num_seconds())?;
result.file.write_data::<i32>(utc_offset.subsec_nanos())?;
}
// Write all the values
for value in &data.values {
match value {
TelemetryDataValue::Float32(value) => result.file.write_f32(*value).await?,
TelemetryDataValue::Float64(value) => result.file.write_f64(*value).await?,
TelemetryDataValue::Float32(value) => result.file.write_data::<f32>(*value)?,
TelemetryDataValue::Float64(value) => result.file.write_data::<f64>(*value)?,
}
}
result.file.flush().await?;
result.file.flush()?;
Ok(result)
}
async fn load_to_ram(
fn load_to_ram(
mut self,
telemetry_data_type: TelemetryDataType,
) -> anyhow::Result<HistorySegmentRam> {
@@ -229,14 +228,14 @@ impl HistorySegmentDisk {
timestamps: Vec::with_capacity(self.length as usize),
};
self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH)).await?;
self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH))?;
for _ in 0..self.length {
segment_data.timestamps.push(self.read_date_time().await?);
segment_data.timestamps.push(self.read_date_time()?);
}
for _ in 0..self.length {
segment_data
.values
.push(self.read_telemetry_item(telemetry_data_type).await?);
.push(self.read_telemetry_item(telemetry_data_type)?);
}
Ok(HistorySegmentRam {
@@ -246,7 +245,7 @@ impl HistorySegmentDisk {
})
}
async fn open(folder: PathBuf, start: DateTime<Utc>) -> anyhow::Result<Self> {
fn open(folder: PathBuf, start: DateTime<Utc>) -> anyhow::Result<Self> {
// Get the path for the specific timestamp we want to save to disk
let folder_time = start.duration_trunc(FOLDER_DURATION)?;
let mut file = folder;
@@ -256,21 +255,21 @@ impl HistorySegmentDisk {
start.to_rfc3339_opts(SecondsFormat::Secs, true)
));
let mut file = File::open(file).await?;
let mut file = File::open(file)?;
// Write the segment bounds
let start_seconds = file.read_i64().await?;
let start_nanos = file.read_i32().await?;
let end_seconds = file.read_i64().await?;
let end_nanos = file.read_i32().await?;
let start_seconds = file.read_data::<i64>()?;
let start_nanos = file.read_data::<i32>()?;
let end_seconds = file.read_data::<i64>()?;
let end_nanos = file.read_data::<i32>()?;
let start = TimeDelta::new(start_seconds, start_nanos as u32)
.context("Failed to reconstruct start TimeDelta")?;
let end = TimeDelta::new(end_seconds, end_nanos as u32)
.context("Failed to reconstruct end TimeDelta")?;
let length = file.read_u64().await?;
let length = file.read_data::<u64>()?;
Ok(HistorySegmentDisk {
Ok(HistorySegmentFile {
start: DateTime::UNIX_EPOCH + start,
end: DateTime::UNIX_EPOCH + end,
length,
@@ -278,7 +277,7 @@ impl HistorySegmentDisk {
})
}
async fn get(
fn get(
&mut self,
from: DateTime<Utc>,
to: DateTime<Utc>,
@@ -290,10 +289,10 @@ impl HistorySegmentDisk {
let mut next_from = from;
if from < self.end && self.start < to {
let start = self.partition_point(from).await?;
let start = self.partition_point(from)?;
if start < self.length {
for i in start..self.length {
let t = self.get_date_time(i).await?;
let t = self.get_date_time(i)?;
if t >= self.end {
break;
}
@@ -306,7 +305,7 @@ impl HistorySegmentDisk {
next_from,
);
result.push(TelemetryDataItem {
value: self.get_telemetry_item(i, telemetry_data_type).await?,
value: self.get_telemetry_item(i, telemetry_data_type)?,
timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true),
});
}
@@ -317,38 +316,36 @@ impl HistorySegmentDisk {
Ok((next_from, result))
}
async fn read_date_time(&mut self) -> anyhow::Result<DateTime<Utc>> {
let seconds = self.file.read_i64().await?;
let nanos = self.file.read_i32().await?;
fn read_date_time(&mut self) -> anyhow::Result<DateTime<Utc>> {
let seconds = self.file.read_data::<i64>()?;
let nanos = self.file.read_data::<i32>()?;
let start =
TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?;
Ok(DateTime::UNIX_EPOCH + start)
}
async fn get_date_time(&mut self, index: u64) -> anyhow::Result<DateTime<Utc>> {
self.file
.seek(SeekFrom::Start(
Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH,
))
.await?;
self.read_date_time().await
fn get_date_time(&mut self, index: u64) -> anyhow::Result<DateTime<Utc>> {
self.file.seek(SeekFrom::Start(
Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH,
))?;
self.read_date_time()
}
async fn read_telemetry_item(
fn read_telemetry_item(
&mut self,
telemetry_data_type: TelemetryDataType,
) -> anyhow::Result<TelemetryDataValue> {
match telemetry_data_type {
TelemetryDataType::Float32 => {
Ok(TelemetryDataValue::Float32(self.file.read_f32().await?))
Ok(TelemetryDataValue::Float32(self.file.read_data::<f32>()?))
}
TelemetryDataType::Float64 => {
Ok(TelemetryDataValue::Float64(self.file.read_f64().await?))
Ok(TelemetryDataValue::Float64(self.file.read_data::<f64>()?))
}
}
}
async fn get_telemetry_item(
fn get_telemetry_item(
&mut self,
index: u64,
telemetry_data_type: TelemetryDataType,
@@ -357,15 +354,13 @@ impl HistorySegmentDisk {
TelemetryDataType::Float32 => 4,
TelemetryDataType::Float64 => 8,
};
self.file
.seek(SeekFrom::Start(
Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length,
))
.await?;
self.read_telemetry_item(telemetry_data_type).await
self.file.seek(SeekFrom::Start(
Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length,
))?;
self.read_telemetry_item(telemetry_data_type)
}
async fn partition_point(&mut self, date_time: DateTime<Utc>) -> anyhow::Result<u64> {
fn partition_point(&mut self, date_time: DateTime<Utc>) -> anyhow::Result<u64> {
if self.length == 0 {
return Ok(0);
}
@@ -379,7 +374,7 @@ impl HistorySegmentDisk {
let half = size / 2;
let mid = left + half;
let is_less = self.get_date_time(mid).await? < date_time;
let is_less = self.get_date_time(mid)? < date_time;
if is_less {
left = mid;
}
@@ -387,7 +382,7 @@ impl HistorySegmentDisk {
}
Ok(left
+ if self.get_date_time(left).await? < date_time {
+ if self.get_date_time(left)? < date_time {
1
} else {
0
@@ -423,8 +418,8 @@ impl TelemetryHistory {
) -> JoinHandle<()> {
let mut path = service.data_root_folder.clone();
path.push(&self.data.definition.uuid);
tokio::spawn(async move {
match HistorySegmentDisk::save_to_disk(path, history_segment_ram).await {
spawn_blocking(move || {
match HistorySegmentFile::save_to_disk(path, history_segment_ram) {
// Immediately drop the segment - now that we've saved it to disk we don't need to keep it in memory
Ok(segment) => drop(segment),
Err(err) => {
@@ -437,14 +432,14 @@ impl TelemetryHistory {
})
}
async fn get_disk_segment(
fn get_disk_segment(
&self,
service: &TelemetryHistoryService,
start: DateTime<Utc>,
) -> anyhow::Result<HistorySegmentDisk> {
) -> JoinHandle<anyhow::Result<HistorySegmentFile>> {
let mut path = service.data_root_folder.clone();
path.push(&self.data.definition.uuid);
HistorySegmentDisk::open(path, start).await
spawn_blocking(move || HistorySegmentFile::open(path, start))
}
async fn create_ram_segment(
@@ -453,14 +448,20 @@ impl TelemetryHistory {
service: &TelemetryHistoryService,
telemetry_data_type: TelemetryDataType,
) -> HistorySegmentRam {
let ram = match self.get_disk_segment(service, start).await {
Ok(disk) => disk.load_to_ram(telemetry_data_type).await,
let ram = self
.get_disk_segment(service, start)
.await
.unwrap_or_else(|e| Err(anyhow!("Join Error {e}")))
.map(|disk| spawn_blocking(move || disk.load_to_ram(telemetry_data_type)));
let ram = match ram {
Ok(ram) => ram.await.unwrap_or_else(|e| Err(anyhow!("Join Error {e}"))),
Err(e) => Err(e),
};
match ram {
Ok(ram) => ram,
Err(_) => HistorySegmentRam::new(start, start + service.segment_width),
_ => HistorySegmentRam::new(start, start + service.segment_width),
}
}
@@ -560,14 +561,11 @@ impl TelemetryHistory {
let mut start = start;
while start < end {
// We're going to ignore errors with getting the disk segment
if let Ok(mut disk) = self
if let Ok(Ok(mut disk)) = self
.get_disk_segment(telemetry_history_service, start)
.await
{
match disk
.get(from, to, maximum_resolution, self.data.definition.data_type)
.await
{
match disk.get(from, to, maximum_resolution, self.data.definition.data_type) {
Ok((new_from, new_data)) => {
from = new_from;
result.extend(new_data);
@@ -593,8 +591,13 @@ impl TelemetryHistory {
pub async fn cleanup(&self, service: &TelemetryHistoryService) -> anyhow::Result<()> {
let mut segments = self.segments.write().await;
for segment in segments.drain(..) {
self.cleanup_segment(service, segment).await?;
let segments = segments
.drain(..)
.map(|segment| self.cleanup_segment(service, segment))
.collect::<Vec<_>>();
for segment in segments {
segment.await?;
}
Ok(())