adds saving and loading history to and from disk
This commit is contained in:
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
|
|
||||||
/target
|
/target
|
||||||
.idea/
|
.idea/
|
||||||
|
telemetry/
|
||||||
|
|||||||
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -1512,7 +1512,6 @@ dependencies = [
|
|||||||
"tokio-util",
|
"tokio-util",
|
||||||
"tonic",
|
"tonic",
|
||||||
"tonic-build",
|
"tonic-build",
|
||||||
"uuid",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -1899,15 +1898,6 @@ dependencies = [
|
|||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "uuid"
|
|
||||||
version = "1.11.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
|
|
||||||
dependencies = [
|
|
||||||
"getrandom",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "version_check"
|
name = "version_check"
|
||||||
version = "0.9.5"
|
version = "0.9.5"
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
[*.{js,jsx,mjs,cjs,ts,tsx,mts,cts,vue}]
|
[*.{js,jsx,mjs,cjs,ts,tsx,mts,cts,vue}]
|
||||||
charset = utf-8
|
charset = utf-8
|
||||||
indent_size = 2
|
indent_size = 4
|
||||||
indent_style = space
|
indent_style = space
|
||||||
insert_final_newline = true
|
insert_final_newline = true
|
||||||
trim_trailing_whitespace = true
|
trim_trailing_whitespace = true
|
||||||
|
|||||||
@@ -30,14 +30,16 @@ const props = defineProps<{
|
|||||||
}>();
|
}>();
|
||||||
|
|
||||||
const smoothing_distance_x = 5;
|
const smoothing_distance_x = 5;
|
||||||
|
const maximum_minimum_separation_live = 100; // ms
|
||||||
|
|
||||||
const text_offset = computed(() => 10);
|
const text_offset = computed(() => 10);
|
||||||
|
const min_sep = computed(() => Math.min(props.minimum_separation || 0, maximum_minimum_separation_live));
|
||||||
|
|
||||||
const { data } = useTelemetry(() => props.data);
|
const { data } = useTelemetry(() => props.data);
|
||||||
const websocket = inject<ShallowRef<WebsocketHandle>>(WEBSOCKET_SYMBOL)!;
|
const websocket = inject<ShallowRef<WebsocketHandle>>(WEBSOCKET_SYMBOL)!;
|
||||||
const value = websocket.value.listen_to_telemetry(
|
const value = websocket.value.listen_to_telemetry(
|
||||||
data,
|
data,
|
||||||
props.minimum_separation,
|
min_sep,
|
||||||
);
|
);
|
||||||
|
|
||||||
const graph_data = inject<GraphData>(GRAPH_DATA)!;
|
const graph_data = inject<GraphData>(GRAPH_DATA)!;
|
||||||
@@ -84,7 +86,7 @@ watch([value], ([val]) => {
|
|||||||
x: val_t,
|
x: val_t,
|
||||||
y: item_val,
|
y: item_val,
|
||||||
} as Point;
|
} as Point;
|
||||||
memo.value.insert(new_item);
|
memo.value.insert(new_item, props.minimum_separation);
|
||||||
if (item_val < min.value) {
|
if (item_val < min.value) {
|
||||||
min.value = item_val;
|
min.value = item_val;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -34,8 +34,36 @@ export class PointLine {
|
|||||||
return left + (this.data[left].x < x ? 1 : 0);
|
return left + (this.data[left].x < x ? 1 : 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
insert(point: Point) {
|
insert(point: Point, maximum_separation?: number) {
|
||||||
const index = this.find_index(point.x);
|
const index = this.find_index(point.x);
|
||||||
this.data.splice(index, 0, point);
|
this.data.splice(index, 0, point);
|
||||||
|
if (maximum_separation !== undefined) {
|
||||||
|
this.reduce_to_maximum_separation(maximum_separation, [index - 1, index + 1]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reduce_to_maximum_separation(maximum_separation: number, range?: [number, number]) {
|
||||||
|
if (maximum_separation <= 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Add a default range if it does not exist
|
||||||
|
range = range || [1, this.data.length - 2];
|
||||||
|
// clamp it to the bounds
|
||||||
|
range = [Math.max(1, range[0]), Math.min(this.data.length - 2, range[1])];
|
||||||
|
|
||||||
|
// Loop over the indices in the range (backwards so removals don't break anything)
|
||||||
|
for (let i = range[1]; i >= range[0]; i--) {
|
||||||
|
const x_previous = this.data[i - 1].x;
|
||||||
|
const x_current = this.data[i].x;
|
||||||
|
const x_next = this.data[i + 1].x;
|
||||||
|
|
||||||
|
const separation_before = x_current - x_previous;
|
||||||
|
const separation_after = x_next - x_current;
|
||||||
|
|
||||||
|
// If the data points are too close on both sides then delete this point
|
||||||
|
if (separation_before < maximum_separation && separation_after < maximum_separation) {
|
||||||
|
this.data.splice(i, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,40 +28,40 @@ provide(WEBSOCKET_SYMBOL, websocket);
|
|||||||
:height="400"
|
:height="400"
|
||||||
:border_top_bottom="24"
|
:border_top_bottom="24"
|
||||||
:border_left_right="128"
|
:border_left_right="128"
|
||||||
:duration="60 * 1000"
|
:duration="60 * 1000 * 10"
|
||||||
>
|
>
|
||||||
<Axis>
|
<Axis>
|
||||||
<Line
|
<Line
|
||||||
data="simple_producer/sin"
|
data="simple_producer/sin"
|
||||||
:minimum_separation="100"
|
:minimum_separation="2000"
|
||||||
></Line>
|
></Line>
|
||||||
<Line
|
<Line
|
||||||
data="simple_producer/cos4"
|
data="simple_producer/cos4"
|
||||||
:minimum_separation="100"
|
:minimum_separation="2000"
|
||||||
></Line>
|
></Line>
|
||||||
<Line
|
<Line
|
||||||
data="simple_producer/sin2"
|
data="simple_producer/sin2"
|
||||||
:minimum_separation="100"
|
:minimum_separation="2000"
|
||||||
></Line>
|
></Line>
|
||||||
<Line
|
<Line
|
||||||
data="simple_producer/cos"
|
data="simple_producer/cos"
|
||||||
:minimum_separation="100"
|
:minimum_separation="2000"
|
||||||
></Line>
|
></Line>
|
||||||
<Line
|
<Line
|
||||||
data="simple_producer/sin3"
|
data="simple_producer/sin3"
|
||||||
:minimum_separation="100"
|
:minimum_separation="2000"
|
||||||
></Line>
|
></Line>
|
||||||
<Line
|
<Line
|
||||||
data="simple_producer/cos2"
|
data="simple_producer/cos2"
|
||||||
:minimum_separation="100"
|
:minimum_separation="2000"
|
||||||
></Line>
|
></Line>
|
||||||
<Line
|
<Line
|
||||||
data="simple_producer/sin4"
|
data="simple_producer/sin4"
|
||||||
:minimum_separation="100"
|
:minimum_separation="2000"
|
||||||
></Line>
|
></Line>
|
||||||
<Line
|
<Line
|
||||||
data="simple_producer/cos3"
|
data="simple_producer/cos3"
|
||||||
:minimum_separation="100"
|
:minimum_separation="2000"
|
||||||
></Line>
|
></Line>
|
||||||
</Axis>
|
</Axis>
|
||||||
</Graph>
|
</Graph>
|
||||||
|
|||||||
@@ -11,7 +11,7 @@ log = "0.4.22"
|
|||||||
prost = "0.13.3"
|
prost = "0.13.3"
|
||||||
rand = "0.8.5"
|
rand = "0.8.5"
|
||||||
tonic = { version = "0.12.3" }
|
tonic = { version = "0.12.3" }
|
||||||
tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal"] }
|
tokio = { version = "1.40.0", features = ["rt-multi-thread", "signal", "fs"] }
|
||||||
chrono = "0.4.38"
|
chrono = "0.4.38"
|
||||||
actix-web = "4.9.0"
|
actix-web = "4.9.0"
|
||||||
actix-ws = "0.3.0"
|
actix-ws = "0.3.0"
|
||||||
@@ -23,7 +23,6 @@ papaya = "0.1.7"
|
|||||||
thiserror = "2.0.9"
|
thiserror = "2.0.9"
|
||||||
derive_more = { version = "1.0.0", features = ["from"] }
|
derive_more = { version = "1.0.0", features = ["from"] }
|
||||||
anyhow = "1.0.95"
|
anyhow = "1.0.95"
|
||||||
uuid = { version = "1.11.0", features = ["v4"] }
|
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
tonic-build = "0.12.3"
|
tonic-build = "0.12.3"
|
||||||
|
|||||||
@@ -8,8 +8,7 @@ use crate::telemetry::data_item::TelemetryDataItem;
|
|||||||
use crate::telemetry::data_value::TelemetryDataValue;
|
use crate::telemetry::data_value::TelemetryDataValue;
|
||||||
use crate::telemetry::management_service::TelemetryManagementService;
|
use crate::telemetry::management_service::TelemetryManagementService;
|
||||||
use chrono::{DateTime, SecondsFormat};
|
use chrono::{DateTime, SecondsFormat};
|
||||||
use log::{error, trace};
|
use log::{error, info, trace};
|
||||||
use std::error::Error;
|
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
@@ -20,6 +19,7 @@ use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
|
|||||||
use tonic::codegen::tokio_stream::{Stream, StreamExt};
|
use tonic::codegen::tokio_stream::{Stream, StreamExt};
|
||||||
use tonic::transport::Server;
|
use tonic::transport::Server;
|
||||||
use tonic::{Request, Response, Status, Streaming};
|
use tonic::{Request, Response, Status, Streaming};
|
||||||
|
use crate::telemetry::history::TelemetryHistory;
|
||||||
|
|
||||||
pub struct CoreTelemetryService {
|
pub struct CoreTelemetryService {
|
||||||
pub tlm_management: Arc<TelemetryManagementService>,
|
pub tlm_management: Arc<TelemetryManagementService>,
|
||||||
@@ -128,9 +128,7 @@ impl CoreTelemetryService {
|
|||||||
value: value.clone(),
|
value: value.clone(),
|
||||||
timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true),
|
timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true),
|
||||||
}));
|
}));
|
||||||
tlm_data
|
TelemetryHistory::insert_sync(tlm_data.clone(), tlm_management.history_service(), value, timestamp);
|
||||||
.history
|
|
||||||
.insert(tlm_management.history_service(), value, timestamp);
|
|
||||||
|
|
||||||
Ok(TelemetryInsertResponse {})
|
Ok(TelemetryInsertResponse {})
|
||||||
}
|
}
|
||||||
@@ -139,7 +137,7 @@ impl CoreTelemetryService {
|
|||||||
pub fn setup(
|
pub fn setup(
|
||||||
token: CancellationToken,
|
token: CancellationToken,
|
||||||
telemetry_management_service: Arc<TelemetryManagementService>,
|
telemetry_management_service: Arc<TelemetryManagementService>,
|
||||||
) -> Result<JoinHandle<()>, Box<dyn Error>> {
|
) -> anyhow::Result<JoinHandle<()>> {
|
||||||
let addr = "[::1]:50051".parse()?;
|
let addr = "[::1]:50051".parse()?;
|
||||||
Ok(tokio::spawn(async move {
|
Ok(tokio::spawn(async move {
|
||||||
let tlm_service = CoreTelemetryService {
|
let tlm_service = CoreTelemetryService {
|
||||||
@@ -147,7 +145,7 @@ pub fn setup(
|
|||||||
cancellation_token: token.clone(),
|
cancellation_token: token.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
trace!("Starting gRPC Server");
|
info!("Starting gRPC Server");
|
||||||
let result = Server::builder()
|
let result = Server::builder()
|
||||||
.add_service(TelemetryServiceServer::new(tlm_service))
|
.add_service(TelemetryServiceServer::new(tlm_service))
|
||||||
.serve_with_shutdown(addr, token.cancelled_owned())
|
.serve_with_shutdown(addr, token.cancelled_owned())
|
||||||
|
|||||||
@@ -54,10 +54,11 @@ async fn get_tlm_history(
|
|||||||
};
|
};
|
||||||
let maximum_resolution = TimeDelta::milliseconds(info.resolution);
|
let maximum_resolution = TimeDelta::milliseconds(info.resolution);
|
||||||
|
|
||||||
|
let history_service = data.history_service();
|
||||||
let data = data.pin();
|
let data = data.pin();
|
||||||
match data.get_by_uuid(&uuid) {
|
match data.get_by_uuid(&uuid) {
|
||||||
None => Err(HttpServerResultError::TlmUuidNotFound { uuid }),
|
None => Err(HttpServerResultError::TlmUuidNotFound { uuid }),
|
||||||
Some(tlm) => Ok(web::Json(tlm.history.get(from, to, maximum_resolution))),
|
Some(tlm) => Ok(web::Json(tlm.get(from, to, maximum_resolution, &history_service).await)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,19 +6,18 @@ use crate::http::api::setup_api;
|
|||||||
use crate::http::websocket::setup_websocket;
|
use crate::http::websocket::setup_websocket;
|
||||||
use crate::telemetry::management_service::TelemetryManagementService;
|
use crate::telemetry::management_service::TelemetryManagementService;
|
||||||
use actix_web::{web, App, HttpServer};
|
use actix_web::{web, App, HttpServer};
|
||||||
use log::trace;
|
use log::info;
|
||||||
use std::error::Error;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
pub async fn setup(
|
pub async fn setup(
|
||||||
cancellation_token: CancellationToken,
|
cancellation_token: CancellationToken,
|
||||||
telemetry_definitions: Arc<TelemetryManagementService>,
|
telemetry_definitions: Arc<TelemetryManagementService>,
|
||||||
) -> Result<(), Box<dyn Error>> {
|
) -> anyhow::Result<()> {
|
||||||
let data = web::Data::new(telemetry_definitions);
|
let data = web::Data::new(telemetry_definitions);
|
||||||
let cancel_token = web::Data::new(cancellation_token);
|
let cancel_token = web::Data::new(cancellation_token);
|
||||||
|
|
||||||
trace!("Starting HTTP Server");
|
info!("Starting HTTP Server");
|
||||||
HttpServer::new(move || {
|
HttpServer::new(move || {
|
||||||
App::new()
|
App::new()
|
||||||
.app_data(data.clone())
|
.app_data(data.clone())
|
||||||
|
|||||||
@@ -9,11 +9,11 @@ pub mod core {
|
|||||||
|
|
||||||
use crate::telemetry::history::TelemetryHistoryService;
|
use crate::telemetry::history::TelemetryHistoryService;
|
||||||
use crate::telemetry::management_service::TelemetryManagementService;
|
use crate::telemetry::management_service::TelemetryManagementService;
|
||||||
use std::error::Error;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use log::error;
|
||||||
use tokio_util::sync::CancellationToken;
|
use tokio_util::sync::CancellationToken;
|
||||||
|
|
||||||
pub async fn setup() -> Result<(), Box<dyn Error>> {
|
pub async fn setup() -> anyhow::Result<()> {
|
||||||
let cancellation_token = CancellationToken::new();
|
let cancellation_token = CancellationToken::new();
|
||||||
{
|
{
|
||||||
let cancellation_token = cancellation_token.clone();
|
let cancellation_token = cancellation_token.clone();
|
||||||
@@ -24,15 +24,23 @@ pub async fn setup() -> Result<(), Box<dyn Error>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let tlm = Arc::new(TelemetryManagementService::new(
|
let tlm = Arc::new(TelemetryManagementService::new(
|
||||||
TelemetryHistoryService::new(),
|
TelemetryHistoryService::new()?,
|
||||||
));
|
)?);
|
||||||
|
|
||||||
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?;
|
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?;
|
||||||
|
|
||||||
let result = http::setup(cancellation_token.clone(), tlm).await;
|
let result = http::setup(cancellation_token.clone(), tlm.clone()).await;
|
||||||
cancellation_token.cancel();
|
cancellation_token.cancel();
|
||||||
result?;
|
result?; // result is dropped
|
||||||
grpc_server.await?;
|
grpc_server.await?; //grpc server is dropped
|
||||||
|
drop(cancellation_token); // All cancellation tokens are now dropped
|
||||||
|
|
||||||
|
// Perform cleanup functions - at this point all servers have stopped and we can be sure that cleaning things up is safe
|
||||||
|
if let Some(tlm) = Arc::into_inner(tlm) {
|
||||||
|
tlm.cleanup().await?;
|
||||||
|
} else {
|
||||||
|
error!("Could not clean up Telemetry Management Service. Arc not released.")
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ use std::env;
|
|||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> anyhow::Result<()> {
|
||||||
let log_file = env::var("LOG_FILE");
|
let log_file = env::var("LOG_FILE");
|
||||||
let log_level = match env::var("LOG_LEVEL") {
|
let log_level = match env::var("LOG_LEVEL") {
|
||||||
Ok(log_level) => log::LevelFilter::from_str(&log_level).unwrap_or(log::LevelFilter::Info),
|
Ok(log_level) => log::LevelFilter::from_str(&log_level).unwrap_or(log::LevelFilter::Info),
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
use crate::telemetry::data_item::TelemetryDataItem;
|
use crate::telemetry::data_item::TelemetryDataItem;
|
||||||
use crate::telemetry::definition::TelemetryDefinition;
|
use crate::telemetry::definition::TelemetryDefinition;
|
||||||
use crate::telemetry::history::TelemetryHistory;
|
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TelemetryData {
|
pub struct TelemetryData {
|
||||||
@@ -8,7 +7,11 @@ pub struct TelemetryData {
|
|||||||
pub data: tokio::sync::watch::Sender<Option<TelemetryDataItem>>,
|
pub data: tokio::sync::watch::Sender<Option<TelemetryDataItem>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TelemetryDataHistory {
|
impl From<TelemetryDefinition> for TelemetryData {
|
||||||
pub data: TelemetryData,
|
fn from(value: TelemetryDefinition) -> Self {
|
||||||
pub history: TelemetryHistory,
|
Self {
|
||||||
|
definition: value,
|
||||||
|
data: tokio::sync::watch::channel(None).0
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,22 +1,56 @@
|
|||||||
use crate::telemetry::data_item::TelemetryDataItem;
|
use crate::telemetry::data_item::TelemetryDataItem;
|
||||||
use crate::telemetry::data_value::TelemetryDataValue;
|
use crate::telemetry::data_value::TelemetryDataValue;
|
||||||
use chrono::{DateTime, SecondsFormat, TimeDelta, Utc};
|
use chrono::{DateTime, DurationRound, SecondsFormat, TimeDelta, Utc};
|
||||||
use log::error;
|
use log::{error, info};
|
||||||
use std::collections::VecDeque;
|
use std::collections::VecDeque;
|
||||||
use std::sync::RwLock;
|
use std::{fs, path};
|
||||||
|
use std::cmp::min;
|
||||||
|
use std::io::SeekFrom;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::{Arc, RwLock};
|
||||||
|
use anyhow::{ensure, Context};
|
||||||
|
use tokio::fs::File;
|
||||||
|
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
use crate::core::TelemetryDataType;
|
||||||
|
use crate::telemetry::data::TelemetryData;
|
||||||
|
use crate::telemetry::definition::TelemetryDefinition;
|
||||||
|
|
||||||
|
const FOLDER_DURATION: TimeDelta = TimeDelta::hours(1);
|
||||||
|
|
||||||
|
fn update_next_from(time_since_next_from: TimeDelta, maximum_resolution: TimeDelta, t: DateTime<Utc>, next_from: DateTime<Utc>) -> DateTime<Utc> {
|
||||||
|
match (
|
||||||
|
time_since_next_from.num_nanoseconds(),
|
||||||
|
maximum_resolution.num_nanoseconds(),
|
||||||
|
) {
|
||||||
|
(_, Some(0)) => t,
|
||||||
|
(Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => {
|
||||||
|
let nanos_since_next_from = nanos_since_next_from as u64;
|
||||||
|
let maximum_resolution_nanos = maximum_resolution_nanos as u64;
|
||||||
|
let num_steps =
|
||||||
|
nanos_since_next_from.div_ceil(maximum_resolution_nanos);
|
||||||
|
if num_steps > i32::MAX as u64 {
|
||||||
|
t + maximum_resolution
|
||||||
|
} else {
|
||||||
|
next_from + maximum_resolution * num_steps as i32
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => t + maximum_resolution, // If there is a gap so big it can't be represented in 2^63 nanoseconds (over 200 years) just skip forward
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct SegmentData {
|
struct SegmentData {
|
||||||
values: Vec<TelemetryDataValue>,
|
values: Vec<TelemetryDataValue>,
|
||||||
timestamps: Vec<DateTime<Utc>>,
|
timestamps: Vec<DateTime<Utc>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct HistorySegment {
|
struct HistorySegmentRam {
|
||||||
start: DateTime<Utc>,
|
start: DateTime<Utc>,
|
||||||
end: DateTime<Utc>,
|
end: DateTime<Utc>,
|
||||||
data: RwLock<SegmentData>,
|
data: RwLock<SegmentData>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl HistorySegment {
|
impl HistorySegmentRam {
|
||||||
fn new(start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
|
fn new(start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
start,
|
start,
|
||||||
@@ -34,7 +68,7 @@ impl HistorySegment {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut data = self.data.write().unwrap_or_else(|err| {
|
let mut data = self.data.write().unwrap_or_else(|err| {
|
||||||
error!("HistorySegment::insert - data was poisoned: {}", err);
|
error!("HistorySegmentRam::insert - data was poisoned: {}", err);
|
||||||
let lock = err.into_inner();
|
let lock = err.into_inner();
|
||||||
self.data.clear_poison();
|
self.data.clear_poison();
|
||||||
lock
|
lock
|
||||||
@@ -60,38 +94,23 @@ impl HistorySegment {
|
|||||||
if from < self.end && self.start < to {
|
if from < self.end && self.start < to {
|
||||||
// If there is overlap with the range
|
// If there is overlap with the range
|
||||||
let data = self.data.read().unwrap_or_else(|err| {
|
let data = self.data.read().unwrap_or_else(|err| {
|
||||||
error!("HistorySegment::get - data was poisoned: {}", err);
|
error!("HistorySegmentRam::get - data was poisoned: {}", err);
|
||||||
let lock = err.into_inner();
|
let lock = err.into_inner();
|
||||||
self.data.clear_poison();
|
self.data.clear_poison();
|
||||||
lock
|
lock
|
||||||
});
|
});
|
||||||
|
|
||||||
let start = data.timestamps.partition_point(|x| x < &from);
|
let start = data.timestamps.partition_point(|x| x < &from);
|
||||||
let end = data.timestamps.partition_point(|x| x < &to);
|
|
||||||
|
|
||||||
if start < data.timestamps.len() {
|
if start < data.timestamps.len() {
|
||||||
for i in start..end {
|
for i in start..data.timestamps.len() {
|
||||||
let t = data.timestamps[i];
|
let t = data.timestamps[i];
|
||||||
|
if t >= self.end {
|
||||||
|
break;
|
||||||
|
}
|
||||||
if t >= next_from {
|
if t >= next_from {
|
||||||
let time_since_next_from = t - next_from;
|
let time_since_next_from = t - next_from;
|
||||||
next_from = match (
|
next_from = update_next_from(time_since_next_from, maximum_resolution, t, next_from);
|
||||||
time_since_next_from.num_nanoseconds(),
|
|
||||||
maximum_resolution.num_nanoseconds(),
|
|
||||||
) {
|
|
||||||
(_, Some(0)) => t,
|
|
||||||
(Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => {
|
|
||||||
let nanos_since_next_from = nanos_since_next_from as u64;
|
|
||||||
let maximum_resolution_nanos = maximum_resolution_nanos as u64;
|
|
||||||
let num_steps =
|
|
||||||
nanos_since_next_from.div_ceil(maximum_resolution_nanos);
|
|
||||||
if num_steps > i32::MAX as u64 {
|
|
||||||
t + maximum_resolution
|
|
||||||
} else {
|
|
||||||
next_from + maximum_resolution * num_steps as i32
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_ => t + maximum_resolution, // If there is a gap so big it can't be represented in 2^63 nanoseconds (over 200 years) just skip forward
|
|
||||||
};
|
|
||||||
result.push(TelemetryDataItem {
|
result.push(TelemetryDataItem {
|
||||||
value: data.values[i].clone(),
|
value: data.values[i].clone(),
|
||||||
timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true),
|
timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true),
|
||||||
@@ -105,65 +124,298 @@ impl HistorySegment {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct HistorySegmentDisk {
|
||||||
|
start: DateTime<Utc>,
|
||||||
|
end: DateTime<Utc>,
|
||||||
|
length: u64,
|
||||||
|
file: File
|
||||||
|
}
|
||||||
|
|
||||||
|
impl HistorySegmentDisk {
|
||||||
|
const TIMESTAMP_LENGTH: u64 = 8 + 4;
|
||||||
|
const HEADER_LENGTH: u64 = Self::TIMESTAMP_LENGTH + Self::TIMESTAMP_LENGTH + 8;
|
||||||
|
|
||||||
|
async fn save_to_disk(mut folder: PathBuf, mut segment: HistorySegmentRam) -> anyhow::Result<Self> {
|
||||||
|
// Get the path for the specific timestamp we want to save to disk
|
||||||
|
let folder_time = segment.start.duration_trunc(FOLDER_DURATION)?;
|
||||||
|
folder.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true));
|
||||||
|
|
||||||
|
// Create the necessary folders
|
||||||
|
fs::create_dir_all(&folder)?;
|
||||||
|
|
||||||
|
let mut file = folder;
|
||||||
|
file.push(format!("{}.dat", segment.start.to_rfc3339_opts(SecondsFormat::Secs, true)));
|
||||||
|
|
||||||
|
let file = File::create(file).await?;
|
||||||
|
|
||||||
|
let mut result = Self {
|
||||||
|
start: segment.start,
|
||||||
|
end: segment.end,
|
||||||
|
length: 0,
|
||||||
|
file
|
||||||
|
};
|
||||||
|
|
||||||
|
let utc_offset_start = result.start - DateTime::UNIX_EPOCH;
|
||||||
|
let utc_offset_end = result.end - DateTime::UNIX_EPOCH;
|
||||||
|
|
||||||
|
// Write the segment bounds
|
||||||
|
result.file.write_i64(utc_offset_start.num_seconds()).await?;
|
||||||
|
result.file.write_i32(utc_offset_start.subsec_nanos()).await?;
|
||||||
|
result.file.write_i64(utc_offset_end.num_seconds()).await?;
|
||||||
|
result.file.write_i32(utc_offset_end.subsec_nanos()).await?;
|
||||||
|
|
||||||
|
let data = segment.data.get_mut().unwrap_or_else(|err| {
|
||||||
|
error!("HistorySegmentDisk::save_to_disk - data was poisoned: {}", err);
|
||||||
|
let lock = err.into_inner();
|
||||||
|
lock
|
||||||
|
});
|
||||||
|
|
||||||
|
ensure!(data.timestamps.len() == data.values.len(), "Invalid Segment Cannot Be Saved to Disk");
|
||||||
|
|
||||||
|
result.length = data.timestamps.len() as u64;
|
||||||
|
result.file.write_u64(result.length).await?;
|
||||||
|
|
||||||
|
// Write all the timestamps
|
||||||
|
for timestamp in &data.timestamps {
|
||||||
|
let utc_offset = *timestamp - DateTime::UNIX_EPOCH;
|
||||||
|
result.file.write_i64(utc_offset.num_seconds()).await?;
|
||||||
|
result.file.write_i32(utc_offset.subsec_nanos()).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Write all the values
|
||||||
|
for value in &data.values {
|
||||||
|
match value {
|
||||||
|
TelemetryDataValue::Float32(value) => result.file.write_f32(*value).await?,
|
||||||
|
TelemetryDataValue::Float64(value) => result.file.write_f64(*value).await?,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result.file.flush().await?;
|
||||||
|
Ok(result)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn load_to_ram(mut self, telemetry_data_type: TelemetryDataType) -> anyhow::Result<HistorySegmentRam> {
|
||||||
|
let mut segment_data = SegmentData {
|
||||||
|
values: Vec::with_capacity(self.length as usize),
|
||||||
|
timestamps: Vec::with_capacity(self.length as usize),
|
||||||
|
};
|
||||||
|
|
||||||
|
self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH)).await?;
|
||||||
|
for _ in 0..self.length {
|
||||||
|
segment_data.timestamps.push(self.read_date_time().await?);
|
||||||
|
}
|
||||||
|
for _ in 0..self.length {
|
||||||
|
segment_data.values.push(self.read_telemetry_item(telemetry_data_type).await?);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(HistorySegmentRam {
|
||||||
|
start: self.start,
|
||||||
|
end: self.end,
|
||||||
|
data: RwLock::new(segment_data),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn open(folder: PathBuf, start: DateTime<Utc>) -> anyhow::Result<Self> {
|
||||||
|
// Get the path for the specific timestamp we want to save to disk
|
||||||
|
let folder_time = start.duration_trunc(FOLDER_DURATION)?;
|
||||||
|
let mut file = folder;
|
||||||
|
file.push(folder_time.to_rfc3339_opts(SecondsFormat::Secs, true));
|
||||||
|
file.push(format!("{}.dat", start.to_rfc3339_opts(SecondsFormat::Secs, true)));
|
||||||
|
|
||||||
|
let mut file = File::open(file).await?;
|
||||||
|
|
||||||
|
// Write the segment bounds
|
||||||
|
let start_seconds = file.read_i64().await?;
|
||||||
|
let start_nanos = file.read_i32().await?;
|
||||||
|
let end_seconds = file.read_i64().await?;
|
||||||
|
let end_nanos = file.read_i32().await?;
|
||||||
|
let start = TimeDelta::new(start_seconds, start_nanos as u32).context("Failed to reconstruct start TimeDelta")?;
|
||||||
|
let end = TimeDelta::new(end_seconds, end_nanos as u32).context("Failed to reconstruct end TimeDelta")?;
|
||||||
|
|
||||||
|
let length = file.read_u64().await?;
|
||||||
|
|
||||||
|
Ok(HistorySegmentDisk {
|
||||||
|
start: DateTime::UNIX_EPOCH + start,
|
||||||
|
end: DateTime::UNIX_EPOCH + end,
|
||||||
|
length,
|
||||||
|
file,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get(
|
||||||
|
&mut self,
|
||||||
|
from: DateTime<Utc>,
|
||||||
|
to: DateTime<Utc>,
|
||||||
|
maximum_resolution: TimeDelta,
|
||||||
|
telemetry_data_type: TelemetryDataType,
|
||||||
|
) -> anyhow::Result<(DateTime<Utc>, Vec<TelemetryDataItem>)> {
|
||||||
|
let mut result = vec![];
|
||||||
|
|
||||||
|
let mut next_from = from;
|
||||||
|
|
||||||
|
if from < self.end && self.start < to {
|
||||||
|
let start = self.partition_point(from).await?;
|
||||||
|
if start < self.length {
|
||||||
|
for i in start..self.length {
|
||||||
|
let t = self.get_date_time(i).await?;
|
||||||
|
if t >= self.end {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if t >= next_from {
|
||||||
|
let time_since_next_from = t - next_from;
|
||||||
|
next_from = update_next_from(time_since_next_from, maximum_resolution, t, next_from);
|
||||||
|
result.push(TelemetryDataItem {
|
||||||
|
value: self.get_telemetry_item(i, telemetry_data_type).await?,
|
||||||
|
timestamp: t.to_rfc3339_opts(SecondsFormat::Millis, true),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok((next_from, result))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_date_time(&mut self) -> anyhow::Result<DateTime<Utc>> {
|
||||||
|
let seconds = self.file.read_i64().await?;
|
||||||
|
let nanos = self.file.read_i32().await?;
|
||||||
|
let start = TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?;
|
||||||
|
Ok(DateTime::UNIX_EPOCH + start)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_date_time(&mut self, index: u64) -> anyhow::Result<DateTime<Utc>> {
|
||||||
|
self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH)).await?;
|
||||||
|
self.read_date_time().await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn read_telemetry_item(&mut self, telemetry_data_type: TelemetryDataType) -> anyhow::Result<TelemetryDataValue> {
|
||||||
|
match telemetry_data_type {
|
||||||
|
TelemetryDataType::Float32 => Ok(TelemetryDataValue::Float32(self.file.read_f32().await?)),
|
||||||
|
TelemetryDataType::Float64 => Ok(TelemetryDataValue::Float64(self.file.read_f64().await?)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_telemetry_item(&mut self, index: u64, telemetry_data_type: TelemetryDataType) -> anyhow::Result<TelemetryDataValue> {
|
||||||
|
let item_length = match telemetry_data_type {
|
||||||
|
TelemetryDataType::Float32 => 4,
|
||||||
|
TelemetryDataType::Float64 => 8,
|
||||||
|
};
|
||||||
|
self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length)).await?;
|
||||||
|
self.read_telemetry_item(telemetry_data_type).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn partition_point(&mut self, date_time: DateTime<Utc>) -> anyhow::Result<u64> {
|
||||||
|
if self.length == 0 {
|
||||||
|
return Ok(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// left should be too early to insert
|
||||||
|
// right should be too late to insert
|
||||||
|
let mut left = 0;
|
||||||
|
let mut size = self.length;
|
||||||
|
|
||||||
|
while size > 1 {
|
||||||
|
let half = size / 2;
|
||||||
|
let mid = left + half;
|
||||||
|
|
||||||
|
let is_less = self.get_date_time(mid).await? < date_time;
|
||||||
|
if is_less {
|
||||||
|
left = mid;
|
||||||
|
}
|
||||||
|
size -= half;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(left + if self.get_date_time(left).await? < date_time { 1 } else { 0 })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub struct TelemetryHistory {
|
pub struct TelemetryHistory {
|
||||||
segments: RwLock<VecDeque<HistorySegment>>,
|
pub data: TelemetryData,
|
||||||
|
segments: tokio::sync::RwLock<VecDeque<HistorySegmentRam>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
impl From<TelemetryData> for TelemetryHistory {
|
||||||
|
fn from(value: TelemetryData) -> Self {
|
||||||
|
Self {
|
||||||
|
data: value,
|
||||||
|
segments: tokio::sync::RwLock::new(VecDeque::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<TelemetryDefinition> for TelemetryHistory {
|
||||||
|
fn from(value: TelemetryDefinition) -> Self {
|
||||||
|
<TelemetryDefinition as Into<TelemetryData>>::into(value).into()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TelemetryHistory {
|
impl TelemetryHistory {
|
||||||
pub fn new() -> Self {
|
fn cleanup_segment(&self, service: &TelemetryHistoryService, history_segment_ram: HistorySegmentRam) -> JoinHandle<()> {
|
||||||
Self {
|
let mut path = service.data_root_folder.clone();
|
||||||
segments: RwLock::new(VecDeque::new()),
|
path.push(&self.data.definition.uuid);
|
||||||
|
tokio::spawn(async move {
|
||||||
|
match HistorySegmentDisk::save_to_disk(path, history_segment_ram).await {
|
||||||
|
// Immediately drop the segment - now that we've saved it to disk we don't need to keep it in memory
|
||||||
|
Ok(segment) => drop(segment),
|
||||||
|
Err(err) => {
|
||||||
|
error!("An error occurred saving telemetry history to disk: {}", err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_disk_segment(&self, service: &TelemetryHistoryService, start: DateTime<Utc>) -> anyhow::Result<HistorySegmentDisk> {
|
||||||
|
let mut path = service.data_root_folder.clone();
|
||||||
|
path.push(&self.data.definition.uuid);
|
||||||
|
HistorySegmentDisk::open(path, start).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_ram_segment(&self, start: DateTime<Utc>, service: &TelemetryHistoryService, telemetry_data_type: TelemetryDataType) -> HistorySegmentRam {
|
||||||
|
let ram = match self.get_disk_segment(service, start).await {
|
||||||
|
Ok(disk) => disk.load_to_ram(telemetry_data_type).await,
|
||||||
|
Err(e) => Err(e),
|
||||||
|
};
|
||||||
|
|
||||||
|
match ram {
|
||||||
|
Ok(ram) => ram,
|
||||||
|
Err(_) => HistorySegmentRam::new(
|
||||||
|
start,
|
||||||
|
start + service.segment_width,
|
||||||
|
),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn insert(
|
pub async fn insert(
|
||||||
&self,
|
&self,
|
||||||
service: &TelemetryHistoryService,
|
service: &TelemetryHistoryService,
|
||||||
value: TelemetryDataValue,
|
value: TelemetryDataValue,
|
||||||
timestamp: DateTime<Utc>,
|
timestamp: DateTime<Utc>,
|
||||||
) {
|
) {
|
||||||
let segments = self.segments.read().unwrap_or_else(|err| {
|
let segments = self.segments.read().await;
|
||||||
error!("TelemetryHistory::insert - segments was poisoned: {}", err);
|
|
||||||
let lock = err.into_inner();
|
|
||||||
self.segments.clear_poison();
|
|
||||||
lock
|
|
||||||
});
|
|
||||||
|
|
||||||
let segments = if segments.is_empty() || segments[segments.len() - 1].end < timestamp {
|
let segments = if segments.is_empty() || segments[segments.len() - 1].end < timestamp {
|
||||||
// We want to insert something that doesn't fit into our history
|
// We want to insert something that doesn't fit into our history
|
||||||
drop(segments);
|
drop(segments);
|
||||||
let mut segments = self.segments.write().unwrap_or_else(|err| {
|
let mut segments = self.segments.write().await;
|
||||||
error!("TelemetryHistory::insert - segments was poisoned: {}", err);
|
|
||||||
let lock = err.into_inner();
|
|
||||||
self.segments.clear_poison();
|
|
||||||
lock
|
|
||||||
});
|
|
||||||
|
|
||||||
if segments.len() == 0 {
|
if segments.len() == 0 {
|
||||||
segments.push_back(HistorySegment::new(
|
let start_time = timestamp.duration_trunc(service.segment_width).unwrap();
|
||||||
timestamp,
|
segments.push_back(self.create_ram_segment(start_time, service, self.data.definition.data_type).await);
|
||||||
timestamp + service.segment_width,
|
|
||||||
));
|
|
||||||
} else {
|
} else {
|
||||||
while segments[segments.len() - 1].end < timestamp {
|
while segments[segments.len() - 1].end < timestamp {
|
||||||
if segments.len() == service.max_segments {
|
if segments.len() == service.max_segments {
|
||||||
let _ = segments.pop_front();
|
if let Some(segment) = segments.pop_front() {
|
||||||
|
let _ = self.cleanup_segment(service, segment);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
let start_time = segments[segments.len() - 1].end;
|
let start_time = segments[segments.len() - 1].end;
|
||||||
segments.push_back(HistorySegment::new(
|
segments.push_back(self.create_ram_segment(start_time, service, self.data.definition.data_type).await);
|
||||||
start_time,
|
|
||||||
start_time + service.segment_width,
|
|
||||||
));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(segments);
|
drop(segments);
|
||||||
self.segments.read().unwrap_or_else(|err| {
|
self.segments.read().await
|
||||||
error!("TelemetryHistory::insert - segments was poisoned: {}", err);
|
|
||||||
let lock = err.into_inner();
|
|
||||||
self.segments.clear_poison();
|
|
||||||
lock
|
|
||||||
})
|
|
||||||
} else {
|
} else {
|
||||||
segments
|
segments
|
||||||
};
|
};
|
||||||
@@ -174,23 +426,60 @@ impl TelemetryHistory {
|
|||||||
segments[segment_index].insert(value, timestamp);
|
segments[segment_index].insert(value, timestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get(
|
pub fn insert_sync(
|
||||||
|
history: Arc<Self>,
|
||||||
|
service: Arc<TelemetryHistoryService>,
|
||||||
|
value: TelemetryDataValue,
|
||||||
|
timestamp: DateTime<Utc>,
|
||||||
|
) {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
history.insert(&service, value, timestamp).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get(
|
||||||
&self,
|
&self,
|
||||||
from: DateTime<Utc>,
|
from: DateTime<Utc>,
|
||||||
to: DateTime<Utc>,
|
to: DateTime<Utc>,
|
||||||
maximum_resolution: TimeDelta,
|
maximum_resolution: TimeDelta,
|
||||||
|
telemetry_history_service: &TelemetryHistoryService
|
||||||
) -> Vec<TelemetryDataItem> {
|
) -> Vec<TelemetryDataItem> {
|
||||||
let mut result = vec![];
|
let mut result = vec![];
|
||||||
|
|
||||||
let segments = self.segments.read().unwrap_or_else(|err| {
|
let segments = self.segments.read().await;
|
||||||
error!("TelemetryHistory::get - segments was poisoned: {}", err);
|
|
||||||
let lock = err.into_inner();
|
|
||||||
self.segments.clear_poison();
|
|
||||||
lock
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut from = from;
|
let mut from = from;
|
||||||
|
|
||||||
|
{
|
||||||
|
let first_ram_segment = segments[0].start;
|
||||||
|
let start = from.duration_trunc(telemetry_history_service.segment_width).unwrap();
|
||||||
|
let end = (to + telemetry_history_service.segment_width).duration_trunc(telemetry_history_service.segment_width).unwrap();
|
||||||
|
|
||||||
|
let end = min(end, first_ram_segment);
|
||||||
|
|
||||||
|
let mut path = telemetry_history_service.data_root_folder.clone();
|
||||||
|
path.push(&self.data.definition.uuid);
|
||||||
|
|
||||||
|
let mut start = start;
|
||||||
|
while start < end {
|
||||||
|
match self.get_disk_segment(telemetry_history_service, start).await {
|
||||||
|
Ok(mut disk) => {
|
||||||
|
match disk.get(from, to, maximum_resolution, self.data.definition.data_type).await {
|
||||||
|
Ok((new_from, new_data)) => {
|
||||||
|
from = new_from;
|
||||||
|
result.extend(new_data);
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
error!("Failed to get from disk segment: {err}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(_) => {}, // Ignore errors
|
||||||
|
}
|
||||||
|
start += telemetry_history_service.segment_width;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for i in 0..segments.len() {
|
for i in 0..segments.len() {
|
||||||
let (new_from, new_data) = segments[i].get(from, to, maximum_resolution);
|
let (new_from, new_data) = segments[i].get(from, to, maximum_resolution);
|
||||||
from = new_from;
|
from = new_from;
|
||||||
@@ -199,18 +488,42 @@ impl TelemetryHistory {
|
|||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn cleanup(&self, service: &TelemetryHistoryService) -> anyhow::Result<()> {
|
||||||
|
let mut segments = self.segments.write().await;
|
||||||
|
|
||||||
|
for segment in segments.drain(..) {
|
||||||
|
self.cleanup_segment(service, segment).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TelemetryHistoryService {
|
pub struct TelemetryHistoryService {
|
||||||
segment_width: TimeDelta,
|
segment_width: TimeDelta,
|
||||||
max_segments: usize,
|
max_segments: usize,
|
||||||
|
data_root_folder: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TelemetryHistoryService {
|
impl TelemetryHistoryService {
|
||||||
pub fn new() -> Self {
|
pub fn new() -> anyhow::Result<Self> {
|
||||||
Self {
|
let result = Self {
|
||||||
segment_width: TimeDelta::minutes(1),
|
segment_width: TimeDelta::minutes(1),
|
||||||
max_segments: 5,
|
max_segments: 5,
|
||||||
|
data_root_folder: path::absolute("telemetry")?,
|
||||||
|
};
|
||||||
|
|
||||||
|
fs::create_dir_all(&result.data_root_folder)?;
|
||||||
|
|
||||||
|
info!("Recording Telemetry Data to {}", result.data_root_folder.to_string_lossy());
|
||||||
|
|
||||||
|
Ok(result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_metadata_file(&self) -> PathBuf {
|
||||||
|
let mut result = self.data_root_folder.clone();
|
||||||
|
result.push("metadata.json");
|
||||||
|
result
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,30 +1,84 @@
|
|||||||
|
use std::fs;
|
||||||
|
use std::fs::File;
|
||||||
use crate::core::{TelemetryDefinitionRequest, Uuid};
|
use crate::core::{TelemetryDefinitionRequest, Uuid};
|
||||||
use crate::telemetry::data::{TelemetryData, TelemetryDataHistory};
|
use crate::telemetry::data::TelemetryData;
|
||||||
use crate::telemetry::definition::TelemetryDefinition;
|
use crate::telemetry::definition::TelemetryDefinition;
|
||||||
use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService};
|
use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService};
|
||||||
use papaya::{HashMap, HashMapRef, LocalGuard};
|
use papaya::{HashMap, HashMapRef, LocalGuard};
|
||||||
use std::error::Error;
|
|
||||||
use std::hash::RandomState;
|
use std::hash::RandomState;
|
||||||
|
use std::io::{Read, Write};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::time::Duration;
|
||||||
|
use log::{error, info, warn};
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
use tokio::time::sleep;
|
||||||
|
|
||||||
|
const RELEASED_ATTEMPTS: usize = 5;
|
||||||
|
|
||||||
pub struct TelemetryManagementService {
|
pub struct TelemetryManagementService {
|
||||||
uuid_index: HashMap<String, String>,
|
uuid_index: HashMap<String, String>,
|
||||||
tlm_data: HashMap<String, TelemetryDataHistory>,
|
tlm_data: HashMap<String, Arc<TelemetryHistory>>,
|
||||||
telemetry_history_service: TelemetryHistoryService,
|
telemetry_history_service: Arc<TelemetryHistoryService>,
|
||||||
|
metadata_file: Arc<Mutex<File>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TelemetryManagementService {
|
impl TelemetryManagementService {
|
||||||
pub fn new(telemetry_history_service: TelemetryHistoryService) -> Self {
|
pub fn new(telemetry_history_service: TelemetryHistoryService) -> anyhow::Result<Self> {
|
||||||
Self {
|
let metadata_file = telemetry_history_service.get_metadata_file();
|
||||||
uuid_index: HashMap::new(),
|
|
||||||
tlm_data: HashMap::new(),
|
let uuid_index = HashMap::new();
|
||||||
telemetry_history_service,
|
let tlm_data = HashMap::new();
|
||||||
|
|
||||||
|
// TODO: Load metadata from file
|
||||||
|
match File::open(&metadata_file) {
|
||||||
|
Ok(mut metadata_file) => {
|
||||||
|
let uuid_index = uuid_index.pin();
|
||||||
|
let tlm_data = tlm_data.pin();
|
||||||
|
|
||||||
|
// Read all data from the file
|
||||||
|
let mut data = String::new();
|
||||||
|
metadata_file.read_to_string(&mut data)?;
|
||||||
|
drop(metadata_file);
|
||||||
|
|
||||||
|
// Each line is a separate entry
|
||||||
|
for line in data.split("\n") {
|
||||||
|
if line.is_empty() {
|
||||||
|
// Skip empty lines
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
// Skip invalid entries
|
||||||
|
match serde_json::from_str::<TelemetryDefinition>(line) {
|
||||||
|
Ok(tlm_def) => {
|
||||||
|
let _ = uuid_index.insert(tlm_def.name.clone(), tlm_def.uuid.clone());
|
||||||
|
let _ = tlm_data.insert(tlm_def.uuid.clone(), Arc::new(tlm_def.into()));
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
error!("Failed to parse metadata entry {err}");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
warn!("Failed to open metadata file {err}. Continuing");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Self {
|
||||||
|
uuid_index,
|
||||||
|
tlm_data,
|
||||||
|
telemetry_history_service: Arc::new(telemetry_history_service),
|
||||||
|
metadata_file: Arc::new(Mutex::new(fs::OpenOptions::new()
|
||||||
|
.create(true)
|
||||||
|
.write(true)
|
||||||
|
.append(true)
|
||||||
|
.open(metadata_file)?))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn register(
|
pub fn register(
|
||||||
&self,
|
&self,
|
||||||
telemetry_definition_request: TelemetryDefinitionRequest,
|
telemetry_definition_request: TelemetryDefinitionRequest,
|
||||||
) -> Result<String, Box<dyn Error>> {
|
) -> anyhow::Result<String> {
|
||||||
let uuid_index = self.uuid_index.pin();
|
let uuid_index = self.uuid_index.pin();
|
||||||
let tlm_data = self.tlm_data.pin();
|
let tlm_data = self.tlm_data.pin();
|
||||||
|
|
||||||
@@ -34,21 +88,47 @@ impl TelemetryManagementService {
|
|||||||
})
|
})
|
||||||
.clone();
|
.clone();
|
||||||
|
|
||||||
let _ = tlm_data.try_insert(
|
let inserted = tlm_data.try_insert(
|
||||||
uuid.clone(),
|
uuid.clone(),
|
||||||
TelemetryDataHistory {
|
Arc::new(TelemetryDefinition {
|
||||||
data: TelemetryData {
|
|
||||||
definition: TelemetryDefinition {
|
|
||||||
uuid: uuid.clone(),
|
uuid: uuid.clone(),
|
||||||
name: telemetry_definition_request.name.clone(),
|
name: telemetry_definition_request.name.clone(),
|
||||||
data_type: telemetry_definition_request.data_type(),
|
data_type: telemetry_definition_request.data_type(),
|
||||||
},
|
}.into()),
|
||||||
data: tokio::sync::watch::channel(None).0,
|
|
||||||
},
|
|
||||||
history: TelemetryHistory::new(),
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
|
|
||||||
|
match inserted {
|
||||||
|
Ok(newly_inserted) => {
|
||||||
|
// This data also needs to be written to disk
|
||||||
|
let file = self.metadata_file.clone();
|
||||||
|
let newly_inserted = newly_inserted.data.definition.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut file = file.lock_owned().await;
|
||||||
|
let newly_inserted = serde_json::to_string(&newly_inserted);
|
||||||
|
match newly_inserted {
|
||||||
|
Ok(newly_inserted) => {
|
||||||
|
if let Err(err) = file.write(newly_inserted.as_bytes()) {
|
||||||
|
error!("Failed to write TelemetryDefinition to file {err}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if let Err(err) = file.write("\n".as_bytes()) {
|
||||||
|
error!("Failed to write newline to file {err}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if let Err(err) = file.flush() {
|
||||||
|
error!("Failed to flush file {err}");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
error!("Failed to serialize TelemetryDefinition {err}");
|
||||||
|
},
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(_) => {},
|
||||||
|
}
|
||||||
|
|
||||||
Ok(uuid)
|
Ok(uuid)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -72,17 +152,51 @@ impl TelemetryManagementService {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn history_service(&self) -> &TelemetryHistoryService {
|
pub fn history_service(&self) -> Arc<TelemetryHistoryService> {
|
||||||
&self.telemetry_history_service
|
self.telemetry_history_service.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn cleanup(self) -> anyhow::Result<()> {
|
||||||
|
info!("Saving Telemetry");
|
||||||
|
let history_service = self.telemetry_history_service;
|
||||||
|
let metadata_file = self.metadata_file;
|
||||||
|
let tlm_data = self.tlm_data.pin_owned();
|
||||||
|
let mut tasks = vec![];
|
||||||
|
|
||||||
|
for _ in 0..RELEASED_ATTEMPTS {
|
||||||
|
if Arc::strong_count(&history_service) != 1 {
|
||||||
|
sleep(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if Arc::strong_count(&history_service) != 1 {
|
||||||
|
warn!("History Service Has not been released after {RELEASED_ATTEMPTS} attempts!");
|
||||||
|
}
|
||||||
|
|
||||||
|
for data_history in tlm_data.values() {
|
||||||
|
tasks.push(data_history.cleanup(&history_service));
|
||||||
|
}
|
||||||
|
{
|
||||||
|
if let Some(tlm) = Arc::into_inner(metadata_file) {
|
||||||
|
tlm.into_inner().sync_all()?;
|
||||||
|
} else {
|
||||||
|
error!("Could not close metadata file: still in use")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for task in tasks {
|
||||||
|
task.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct TelemetryManagementServicePin<'a> {
|
pub struct TelemetryManagementServicePin<'a> {
|
||||||
tlm_data: HashMapRef<'a, String, TelemetryDataHistory, RandomState, LocalGuard<'a>>,
|
tlm_data: HashMapRef<'a, String, Arc<TelemetryHistory>, RandomState, LocalGuard<'a>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<'a> TelemetryManagementServicePin<'a> {
|
impl<'a> TelemetryManagementServicePin<'a> {
|
||||||
pub fn get_by_uuid(&'a self, uuid: &String) -> Option<&'a TelemetryDataHistory> {
|
pub fn get_by_uuid(&'a self, uuid: &String) -> Option<&'a Arc<TelemetryHistory>> {
|
||||||
self.tlm_data.get(uuid)
|
self.tlm_data.get(uuid)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user