applies formatting and linting

This commit is contained in:
2025-01-01 15:51:19 -05:00
parent 825b85ddad
commit 59431ebfff
13 changed files with 325 additions and 179 deletions

View File

@@ -1,24 +1,29 @@
use crate::core::TelemetryDataType;
use crate::telemetry::data::TelemetryData;
use crate::telemetry::data_item::TelemetryDataItem;
use crate::telemetry::data_value::TelemetryDataValue;
use crate::telemetry::definition::TelemetryDefinition;
use anyhow::{ensure, Context};
use chrono::{DateTime, DurationRound, SecondsFormat, TimeDelta, Utc};
use log::{error, info};
use std::collections::VecDeque;
use std::{fs, path};
use std::cmp::min;
use std::collections::VecDeque;
use std::io::SeekFrom;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use anyhow::{ensure, Context};
use std::{fs, path};
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
use tokio::task::JoinHandle;
use crate::core::TelemetryDataType;
use crate::telemetry::data::TelemetryData;
use crate::telemetry::definition::TelemetryDefinition;
const FOLDER_DURATION: TimeDelta = TimeDelta::hours(1);
fn update_next_from(time_since_next_from: TimeDelta, maximum_resolution: TimeDelta, t: DateTime<Utc>, next_from: DateTime<Utc>) -> DateTime<Utc> {
fn update_next_from(
time_since_next_from: TimeDelta,
maximum_resolution: TimeDelta,
t: DateTime<Utc>,
next_from: DateTime<Utc>,
) -> DateTime<Utc> {
match (
time_since_next_from.num_nanoseconds(),
maximum_resolution.num_nanoseconds(),
@@ -27,8 +32,7 @@ fn update_next_from(time_since_next_from: TimeDelta, maximum_resolution: TimeDel
(Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => {
let nanos_since_next_from = nanos_since_next_from as u64;
let maximum_resolution_nanos = maximum_resolution_nanos as u64;
let num_steps =
nanos_since_next_from.div_ceil(maximum_resolution_nanos);
let num_steps = nanos_since_next_from.div_ceil(maximum_resolution_nanos);
if num_steps > i32::MAX as u64 {
t + maximum_resolution
} else {
@@ -110,7 +114,12 @@ impl HistorySegmentRam {
}
if t >= next_from {
let time_since_next_from = t - next_from;
next_from = update_next_from(time_since_next_from, maximum_resolution, t, next_from);
next_from = update_next_from(
time_since_next_from,
maximum_resolution,
t,
next_from,
);
result.push(TelemetryDataItem {
value: data.values[i].clone(),
timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true),
@@ -128,14 +137,17 @@ struct HistorySegmentDisk {
start: DateTime<Utc>,
end: DateTime<Utc>,
length: u64,
file: File
file: File,
}
impl HistorySegmentDisk {
const TIMESTAMP_LENGTH: u64 = 8 + 4;
const HEADER_LENGTH: u64 = Self::TIMESTAMP_LENGTH + Self::TIMESTAMP_LENGTH + 8;
async fn save_to_disk(mut folder: PathBuf, mut segment: HistorySegmentRam) -> anyhow::Result<Self> {
async fn save_to_disk(
mut folder: PathBuf,
mut segment: HistorySegmentRam,
) -> anyhow::Result<Self> {
// Get the path for the specific timestamp we want to save to disk
let folder_time = segment.start.duration_trunc(FOLDER_DURATION)?;
folder.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true));
@@ -144,7 +156,10 @@ impl HistorySegmentDisk {
fs::create_dir_all(&folder)?;
let mut file = folder;
file.push(format!("{}.dat", segment.start.to_rfc3339_opts(SecondsFormat::Secs, true)));
file.push(format!(
"{}.dat",
segment.start.to_rfc3339_opts(SecondsFormat::Secs, true)
));
let file = File::create(file).await?;
@@ -152,25 +167,36 @@ impl HistorySegmentDisk {
start: segment.start,
end: segment.end,
length: 0,
file
file,
};
let utc_offset_start = result.start - DateTime::UNIX_EPOCH;
let utc_offset_end = result.end - DateTime::UNIX_EPOCH;
// Write the segment bounds
result.file.write_i64(utc_offset_start.num_seconds()).await?;
result.file.write_i32(utc_offset_start.subsec_nanos()).await?;
result
.file
.write_i64(utc_offset_start.num_seconds())
.await?;
result
.file
.write_i32(utc_offset_start.subsec_nanos())
.await?;
result.file.write_i64(utc_offset_end.num_seconds()).await?;
result.file.write_i32(utc_offset_end.subsec_nanos()).await?;
let data = segment.data.get_mut().unwrap_or_else(|err| {
error!("HistorySegmentDisk::save_to_disk - data was poisoned: {}", err);
let lock = err.into_inner();
lock
error!(
"HistorySegmentDisk::save_to_disk - data was poisoned: {}",
err
);
err.into_inner()
});
ensure!(data.timestamps.len() == data.values.len(), "Invalid Segment Cannot Be Saved to Disk");
ensure!(
data.timestamps.len() == data.values.len(),
"Invalid Segment Cannot Be Saved to Disk"
);
result.length = data.timestamps.len() as u64;
result.file.write_u64(result.length).await?;
@@ -194,7 +220,10 @@ impl HistorySegmentDisk {
Ok(result)
}
async fn load_to_ram(mut self, telemetry_data_type: TelemetryDataType) -> anyhow::Result<HistorySegmentRam> {
async fn load_to_ram(
mut self,
telemetry_data_type: TelemetryDataType,
) -> anyhow::Result<HistorySegmentRam> {
let mut segment_data = SegmentData {
values: Vec::with_capacity(self.length as usize),
timestamps: Vec::with_capacity(self.length as usize),
@@ -205,7 +234,9 @@ impl HistorySegmentDisk {
segment_data.timestamps.push(self.read_date_time().await?);
}
for _ in 0..self.length {
segment_data.values.push(self.read_telemetry_item(telemetry_data_type).await?);
segment_data
.values
.push(self.read_telemetry_item(telemetry_data_type).await?);
}
Ok(HistorySegmentRam {
@@ -220,7 +251,10 @@ impl HistorySegmentDisk {
let folder_time = start.duration_trunc(FOLDER_DURATION)?;
let mut file = folder;
file.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true));
file.push(format!("{}.dat", start.to_rfc3339_opts(SecondsFormat::Secs, true)));
file.push(format!(
"{}.dat",
start.to_rfc3339_opts(SecondsFormat::Secs, true)
));
let mut file = File::open(file).await?;
@@ -229,8 +263,10 @@ impl HistorySegmentDisk {
let start_nanos = file.read_i32().await?;
let end_seconds = file.read_i64().await?;
let end_nanos = file.read_i32().await?;
let start = TimeDelta::new(start_seconds, start_nanos as u32).context("Failed to reconstruct start TimeDelta")?;
let end = TimeDelta::new(end_seconds, end_nanos as u32).context("Failed to reconstruct end TimeDelta")?;
let start = TimeDelta::new(start_seconds, start_nanos as u32)
.context("Failed to reconstruct start TimeDelta")?;
let end = TimeDelta::new(end_seconds, end_nanos as u32)
.context("Failed to reconstruct end TimeDelta")?;
let length = file.read_u64().await?;
@@ -263,7 +299,12 @@ impl HistorySegmentDisk {
}
if t >= next_from {
let time_since_next_from = t - next_from;
next_from = update_next_from(time_since_next_from, maximum_resolution, t, next_from);
next_from = update_next_from(
time_since_next_from,
maximum_resolution,
t,
next_from,
);
result.push(TelemetryDataItem {
value: self.get_telemetry_item(i, telemetry_data_type).await?,
timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true),
@@ -279,28 +320,48 @@ impl HistorySegmentDisk {
async fn read_date_time(&mut self) -> anyhow::Result<DateTime<Utc>> {
let seconds = self.file.read_i64().await?;
let nanos = self.file.read_i32().await?;
let start = TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?;
let start =
TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?;
Ok(DateTime::UNIX_EPOCH + start)
}
async fn get_date_time(&mut self, index: u64) -> anyhow::Result<DateTime<Utc>> {
self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH)).await?;
self.file
.seek(SeekFrom::Start(
Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH,
))
.await?;
self.read_date_time().await
}
async fn read_telemetry_item(&mut self, telemetry_data_type: TelemetryDataType) -> anyhow::Result<TelemetryDataValue> {
async fn read_telemetry_item(
&mut self,
telemetry_data_type: TelemetryDataType,
) -> anyhow::Result<TelemetryDataValue> {
match telemetry_data_type {
TelemetryDataType::Float32 => Ok(TelemetryDataValue::Float32(self.file.read_f32().await?)),
TelemetryDataType::Float64 => Ok(TelemetryDataValue::Float64(self.file.read_f64().await?)),
TelemetryDataType::Float32 => {
Ok(TelemetryDataValue::Float32(self.file.read_f32().await?))
}
TelemetryDataType::Float64 => {
Ok(TelemetryDataValue::Float64(self.file.read_f64().await?))
}
}
}
async fn get_telemetry_item(&mut self, index: u64, telemetry_data_type: TelemetryDataType) -> anyhow::Result<TelemetryDataValue> {
async fn get_telemetry_item(
&mut self,
index: u64,
telemetry_data_type: TelemetryDataType,
) -> anyhow::Result<TelemetryDataValue> {
let item_length = match telemetry_data_type {
TelemetryDataType::Float32 => 4,
TelemetryDataType::Float64 => 8,
};
self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length)).await?;
self.file
.seek(SeekFrom::Start(
Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length,
))
.await?;
self.read_telemetry_item(telemetry_data_type).await
}
@@ -325,7 +386,12 @@ impl HistorySegmentDisk {
size -= half;
}
Ok(left + if self.get_date_time(left).await? < date_time { 1 } else { 0 })
Ok(left
+ if self.get_date_time(left).await? < date_time {
1
} else {
0
})
}
}
@@ -334,7 +400,6 @@ pub struct TelemetryHistory {
segments: tokio::sync::RwLock<VecDeque<HistorySegmentRam>>,
}
impl From<TelemetryData> for TelemetryHistory {
fn from(value: TelemetryData) -> Self {
Self {
@@ -351,7 +416,11 @@ impl From<TelemetryDefinition> for TelemetryHistory {
}
impl TelemetryHistory {
fn cleanup_segment(&self, service: &TelemetryHistoryService, history_segment_ram: HistorySegmentRam) -> JoinHandle<()> {
fn cleanup_segment(
&self,
service: &TelemetryHistoryService,
history_segment_ram: HistorySegmentRam,
) -> JoinHandle<()> {
let mut path = service.data_root_folder.clone();
path.push(&self.data.definition.uuid);
tokio::spawn(async move {
@@ -359,19 +428,31 @@ impl TelemetryHistory {
// Immediately drop the segment - now that we've saved it to disk we don't need to keep it in memory
Ok(segment) => drop(segment),
Err(err) => {
error!("An error occurred saving telemetry history to disk: {}", err);
error!(
"An error occurred saving telemetry history to disk: {}",
err
);
}
}
})
}
async fn get_disk_segment(&self, service: &TelemetryHistoryService, start: DateTime<Utc>) -> anyhow::Result<HistorySegmentDisk> {
async fn get_disk_segment(
&self,
service: &TelemetryHistoryService,
start: DateTime<Utc>,
) -> anyhow::Result<HistorySegmentDisk> {
let mut path = service.data_root_folder.clone();
path.push(&self.data.definition.uuid);
HistorySegmentDisk::open(path, start).await
}
async fn create_ram_segment(&self, start: DateTime<Utc>, service: &TelemetryHistoryService, telemetry_data_type: TelemetryDataType) -> HistorySegmentRam {
async fn create_ram_segment(
&self,
start: DateTime<Utc>,
service: &TelemetryHistoryService,
telemetry_data_type: TelemetryDataType,
) -> HistorySegmentRam {
let ram = match self.get_disk_segment(service, start).await {
Ok(disk) => disk.load_to_ram(telemetry_data_type).await,
Err(e) => Err(e),
@@ -379,10 +460,7 @@ impl TelemetryHistory {
match ram {
Ok(ram) => ram,
Err(_) => HistorySegmentRam::new(
start,
start + service.segment_width,
),
Err(_) => HistorySegmentRam::new(start, start + service.segment_width),
}
}
@@ -401,16 +479,27 @@ impl TelemetryHistory {
if segments.len() == 0 {
let start_time = timestamp.duration_trunc(service.segment_width).unwrap();
segments.push_back(self.create_ram_segment(start_time, service, self.data.definition.data_type).await);
segments.push_back(
self.create_ram_segment(start_time, service, self.data.definition.data_type)
.await,
);
} else {
while segments[segments.len() - 1].end < timestamp {
if segments.len() == service.max_segments {
if let Some(segment) = segments.pop_front() {
let _ = self.cleanup_segment(service, segment);
// We don't care about this future
drop(self.cleanup_segment(service, segment));
}
}
let start_time = segments[segments.len() - 1].end;
segments.push_back(self.create_ram_segment(start_time, service, self.data.definition.data_type).await);
segments.push_back(
self.create_ram_segment(
start_time,
service,
self.data.definition.data_type,
)
.await,
);
}
}
@@ -442,7 +531,7 @@ impl TelemetryHistory {
from: DateTime<Utc>,
to: DateTime<Utc>,
maximum_resolution: TimeDelta,
telemetry_history_service: &TelemetryHistoryService
telemetry_history_service: &TelemetryHistoryService,
) -> Vec<TelemetryDataItem> {
let mut result = vec![];
@@ -452,29 +541,41 @@ impl TelemetryHistory {
{
let first_ram_segment = segments.front().map(|x| x.start);
let start = from.duration_trunc(telemetry_history_service.segment_width).unwrap();
let end = (to + telemetry_history_service.segment_width).duration_trunc(telemetry_history_service.segment_width).unwrap();
let start = from
.duration_trunc(telemetry_history_service.segment_width)
.unwrap();
let end = (to + telemetry_history_service.segment_width)
.duration_trunc(telemetry_history_service.segment_width)
.unwrap();
let end = if let Some(first_ram_segment) = first_ram_segment { min(end, first_ram_segment) } else { end };
let end = if let Some(first_ram_segment) = first_ram_segment {
min(end, first_ram_segment)
} else {
end
};
let mut path = telemetry_history_service.data_root_folder.clone();
path.push(&self.data.definition.uuid);
let mut start = start;
while start < end {
match self.get_disk_segment(telemetry_history_service, start).await {
Ok(mut disk) => {
match disk.get(from, to, maximum_resolution, self.data.definition.data_type).await {
Ok((new_from, new_data)) => {
from = new_from;
result.extend(new_data);
},
Err(err) => {
error!("Failed to get from disk segment: {err}");
}
// We're going to ignore errors with getting the disk segment
if let Ok(mut disk) = self
.get_disk_segment(telemetry_history_service, start)
.await
{
match disk
.get(from, to, maximum_resolution, self.data.definition.data_type)
.await
{
Ok((new_from, new_data)) => {
from = new_from;
result.extend(new_data);
}
},
Err(_) => {}, // Ignore errors
Err(err) => {
error!("Failed to get from disk segment: {err}");
}
}
}
start += telemetry_history_service.segment_width;
}
@@ -516,7 +617,10 @@ impl TelemetryHistoryService {
fs::create_dir_all(&result.data_root_folder)?;
info!("Recording Telemetry Data to {}", result.data_root_folder.to_string_lossy());
info!(
"Recording Telemetry Data to {}",
result.data_root_folder.to_string_lossy()
);
Ok(result)
}