diff --git a/frontend/src/components/TelemetryLine.vue b/frontend/src/components/TelemetryLine.vue index 3960785..8a953d0 100644 --- a/frontend/src/components/TelemetryLine.vue +++ b/frontend/src/components/TelemetryLine.vue @@ -9,21 +9,24 @@ import { shallowRef, type ShallowRef, toValue, + triggerRef, watch, } from 'vue'; import { + type TelemetryDataItem, WEBSOCKET_SYMBOL, type WebsocketHandle, } from '@/composables/websocket'; import { GRAPH_DATA, type GraphData } from '@/graph/graph'; import { AXIS_DATA, type AxisData } from '@/graph/axis'; import ValueLabel from '@/components/ValueLabel.vue'; -import type { Point } from '@/graph/line'; +import { type Point, PointLine } from '@/graph/line'; const props = defineProps<{ data: string; minimum_separation?: number; class?: string; + fetch_history?: number; }>(); const smoothing_distance_x = 5; @@ -70,58 +73,100 @@ onUnmounted(() => { ); }); -const memo = shallowRef([]); +const memo = shallowRef(new PointLine()); watch([value], ([val]) => { const min_x = toValue(graph_data.min_x); if (val) { const val_t = Date.parse(val.timestamp); if (val_t >= min_x) { - // TODO: Insert this in the right spot in memo const item_val = val.value[data.value!.data_type] as number; - const new_memo = [ - { - x: val_t, - y: item_val, - }, - ].concat(memo.value); + const new_item = { + x: val_t, + y: item_val, + } as Point; + memo.value.insert(new_item); if (item_val < min.value) { min.value = item_val; } if (item_val > max.value) { max.value = item_val; } - memo.value = new_memo; + triggerRef(memo); debounced_recompute(); } } }); +watch( + [data, () => props.fetch_history], + async ([data]) => { + if (data) { + const uuid = data.uuid; + const type = data.data_type; + try { + const min_x = new Date(toValue(graph_data.min_x)); + const max_x = new Date(toValue(graph_data.max_x)); + const res = await fetch( + `/api/tlm/history/${uuid}?from=${min_x.toISOString()}&to=${max_x.toISOString()}&resolution=${props.minimum_separation || 0}`, + ); + const response = (await res.json()) as TelemetryDataItem[]; + for (const data_item of response) { + const val_t = Date.parse(data_item.timestamp); + const item_val = data_item.value[type] as number; + const new_item = { + x: val_t, + y: item_val, + } as Point; + memo.value.insert(new_item); + if (item_val < min.value) { + min.value = item_val; + } + if (item_val > max.value) { + max.value = item_val; + } + } + triggerRef(memo); + debounced_recompute(); + } catch (e) { + // TODO: Response? + console.log(e); + } + } + }, + { + immediate: true, + }, +); + watch([graph_data.min_x, graph_data.max_x], ([min_x, max_x]) => { let memo_changed = false; - const new_memo = ([] as Point[]).concat(memo.value); + if (min_x) { while ( - new_memo.length > 2 && - new_memo[new_memo.length - 2].x < toValue(min_x) + memo.value.data.length > 2 && + memo.value.data[1].x < toValue(min_x) ) { - new_memo.pop(); + memo.value.data.shift(); memo_changed = true; } } if (max_x) { - while (new_memo.length > 2 && new_memo[1].x > toValue(max_x)) { - new_memo.shift(); + while ( + memo.value.data.length > 2 && + memo.value.data[memo.value.data.length - 2].x > toValue(max_x) + ) { + memo.value.data.pop(); memo_changed = true; } } if (memo_changed) { let min_val = Infinity; let max_val = -Infinity; - for (const item of new_memo) { + for (const item of memo.value.data) { const item_val = item.y; min_val = Math.min(min_val, item_val); max_val = Math.max(max_val, item_val); } - memo.value = new_memo; + triggerRef(memo); debounced_recompute(); max.value = max_val; min.value = min_val; @@ -162,7 +207,7 @@ const group_transform = computed(() => { watch([recompute_points], () => { let new_points = ''; - if (memo.value.length == 0 || data.value == null) { + if (memo.value.data.length == 0 || data.value == null) { return ''; } @@ -172,7 +217,8 @@ watch([recompute_points], () => { let last_x = graph_data.x_map(future_number) + smoothing_distance_x; old_max.value = toValue(graph_data.max_x); - for (const data_item of memo.value) { + for (let i = memo.value.data.length - 1; i >= 0; i--) { + const data_item = memo.value.data[i]; const t = data_item.x; const v = data_item.y; const x = graph_data.x_map(t); diff --git a/frontend/src/graph/line.ts b/frontend/src/graph/line.ts index 81956cb..116ff11 100644 --- a/frontend/src/graph/line.ts +++ b/frontend/src/graph/line.ts @@ -2,3 +2,40 @@ export interface Point { x: number; y: number; } + +export class PointLine { + data: Point[]; + + constructor() { + this.data = []; + } + + find_index(x: number) { + if (this.data.length == 0) { + return 0; + } + + // left should be too early to insert + // right should be too late to insert + let left = 0; + let size = this.data.length; + + while (size > 1) { + const half = Math.floor(size / 2); + const mid = left + half; + + const is_less = this.data[mid].x < x; + if (is_less) { + left = mid; + } + size -= half; + } + + return left + (this.data[left].x < x ? 1 : 0); + } + + insert(point: Point) { + const index = this.find_index(point.x); + this.data.splice(index, 0, point); + } +} diff --git a/server/src/grpc.rs b/server/src/grpc.rs index 2097090..17c4629 100644 --- a/server/src/grpc.rs +++ b/server/src/grpc.rs @@ -128,7 +128,9 @@ impl CoreTelemetryService { value: value.clone(), timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true), })); - tlm_data.history.insert(tlm_management.history_service(), value, timestamp); + tlm_data + .history + .insert(tlm_management.history_service(), value, timestamp); Ok(TelemetryInsertResponse {}) } diff --git a/server/src/http/api/mod.rs b/server/src/http/api/mod.rs index 98e6cbb..313e8f7 100644 --- a/server/src/http/api/mod.rs +++ b/server/src/http/api/mod.rs @@ -1,10 +1,10 @@ use crate::http::error::HttpServerResultError; use crate::telemetry::management_service::TelemetryManagementService; use actix_web::{get, web, Responder}; -use log::{info, trace}; -use std::sync::Arc; use chrono::{DateTime, TimeDelta, Utc}; +use log::trace; use serde::Deserialize; +use std::sync::Arc; #[get("/tlm/info/{name:[\\w\\d/_-]+}")] async fn get_tlm_definition( @@ -24,7 +24,7 @@ async fn get_tlm_definition( struct HistoryQuery { from: String, to: String, - resolution: i64 + resolution: i64, } #[get("/tlm/history/{uuid:[0-9a-f]+}")] @@ -34,28 +34,33 @@ async fn get_tlm_history( info: web::Query, ) -> Result { let uuid = uuid.to_string(); - trace!("get_tlm_history {} from {} to {} resolution {}", uuid, info.from, info.to, info.resolution); + trace!( + "get_tlm_history {} from {} to {} resolution {}", + uuid, + info.from, + info.to, + info.resolution + ); - let Ok(from) = (&info.from).parse::>() else { - return Err(HttpServerResultError::InvalidDateTime { date_time: info.from.clone() }); + let Ok(from) = info.from.parse::>() else { + return Err(HttpServerResultError::InvalidDateTime { + date_time: info.from.clone(), + }); }; - let Ok(to) = (&info.to).parse::>() else { - return Err(HttpServerResultError::InvalidDateTime { date_time: info.to.clone() }); + let Ok(to) = info.to.parse::>() else { + return Err(HttpServerResultError::InvalidDateTime { + date_time: info.to.clone(), + }); }; let maximum_resolution = TimeDelta::milliseconds(info.resolution); - info!("get_tlm_history {} from {} to {} resolution {}", uuid, from, to, maximum_resolution); let data = data.pin(); match data.get_by_uuid(&uuid) { None => Err(HttpServerResultError::TlmUuidNotFound { uuid }), - Some(tlm) => { - Ok(web::Json(tlm.history.get(from, to, maximum_resolution))) - } + Some(tlm) => Ok(web::Json(tlm.history.get(from, to, maximum_resolution))), } } pub fn setup_api(cfg: &mut web::ServiceConfig) { - cfg - .service(get_tlm_definition) - .service(get_tlm_history); + cfg.service(get_tlm_definition).service(get_tlm_history); } diff --git a/server/src/lib.rs b/server/src/lib.rs index 441fb4f..fa1fcc9 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -7,11 +7,11 @@ pub mod core { tonic::include_proto!("core"); } +use crate::telemetry::history::TelemetryHistoryService; use crate::telemetry::management_service::TelemetryManagementService; use std::error::Error; use std::sync::Arc; use tokio_util::sync::CancellationToken; -use crate::telemetry::history::TelemetryHistoryService; pub async fn setup() -> Result<(), Box> { let cancellation_token = CancellationToken::new(); @@ -24,7 +24,7 @@ pub async fn setup() -> Result<(), Box> { } let tlm = Arc::new(TelemetryManagementService::new( - TelemetryHistoryService::new() + TelemetryHistoryService::new(), )); let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?; diff --git a/server/src/telemetry/history.rs b/server/src/telemetry/history.rs index cb036a5..d632f06 100644 --- a/server/src/telemetry/history.rs +++ b/server/src/telemetry/history.rs @@ -1,13 +1,13 @@ +use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::data_value::TelemetryDataValue; use chrono::{DateTime, SecondsFormat, TimeDelta, Utc}; +use log::error; use std::collections::VecDeque; use std::sync::RwLock; -use log::error; -use crate::telemetry::data_item::TelemetryDataItem; struct SegmentData { values: Vec, - timestamps: Vec> + timestamps: Vec>, } struct HistorySegment { @@ -47,12 +47,18 @@ impl HistorySegment { data.values.insert(index, value); } - fn get(&self, from: DateTime, to: DateTime, maximum_resolution: TimeDelta) -> (DateTime, Vec) { + fn get( + &self, + from: DateTime, + to: DateTime, + maximum_resolution: TimeDelta, + ) -> (DateTime, Vec) { let mut result = vec![]; let mut next_from = from; - if from < self.end && self.start < to { // If there is overlap with the range + if from < self.end && self.start < to { + // If there is overlap with the range let data = self.data.read().unwrap_or_else(|err| { error!("HistorySegment::get - data was poisoned: {}", err); let lock = err.into_inner(); @@ -73,17 +79,25 @@ impl HistorySegment { let t = data.timestamps[i]; if t >= next_from { let time_since_next_from = t - next_from; - next_from = match (time_since_next_from.num_nanoseconds(), maximum_resolution.num_nanoseconds()) { + next_from = match ( + time_since_next_from.num_nanoseconds(), + maximum_resolution.num_nanoseconds(), + ) { + (_, Some(0)) => t, (Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => { let nanos_since_next_from = nanos_since_next_from as u64; let maximum_resolution_nanos = maximum_resolution_nanos as u64; - let num_steps = nanos_since_next_from.div_ceil(maximum_resolution_nanos); + let num_steps = + nanos_since_next_from.div_ceil(maximum_resolution_nanos); let subsec_nanos = maximum_resolution.subsec_nanos() as u64; // This will break once this can't be represented in 2^63 nanoseconds (over 200 years) - let seconds = ((maximum_resolution.num_seconds() as u64) * num_steps) as i64; + let seconds = + ((maximum_resolution.num_seconds() as u64) * num_steps) as i64; let nanos = (num_steps * subsec_nanos) as i64; - next_from + TimeDelta::seconds(seconds) + TimeDelta::nanoseconds(nanos) - }, + next_from + + TimeDelta::seconds(seconds) + + TimeDelta::nanoseconds(nanos) + } _ => t + maximum_resolution, // If there is a gap so big it can't be represented in 2^63 nanoseconds (over 200 years) just skip forward }; result.push(TelemetryDataItem { @@ -104,14 +118,18 @@ pub struct TelemetryHistory { } impl TelemetryHistory { - pub fn new() -> Self { Self { segments: RwLock::new(VecDeque::new()), } } - pub fn insert(&self, service: &TelemetryHistoryService, value: TelemetryDataValue, timestamp: DateTime) { + pub fn insert( + &self, + service: &TelemetryHistoryService, + value: TelemetryDataValue, + timestamp: DateTime, + ) { let segments = self.segments.read().unwrap_or_else(|err| { error!("TelemetryHistory::insert - segments was poisoned: {}", err); let lock = err.into_inner(); @@ -130,14 +148,20 @@ impl TelemetryHistory { }); if segments.len() == 0 { - segments.push_back(HistorySegment::new(timestamp, timestamp + service.segment_width)); + segments.push_back(HistorySegment::new( + timestamp, + timestamp + service.segment_width, + )); } else { while segments[segments.len() - 1].end < timestamp { if segments.len() == service.max_segments { let _ = segments.pop_front(); } let start_time = segments[segments.len() - 1].end; - segments.push_back(HistorySegment::new(start_time, start_time + service.segment_width)); + segments.push_back(HistorySegment::new( + start_time, + start_time + service.segment_width, + )); } } @@ -158,7 +182,12 @@ impl TelemetryHistory { segments[segment_index].insert(value, timestamp); } - pub fn get(&self, from: DateTime, to: DateTime, maximum_resolution: TimeDelta) -> Vec { + pub fn get( + &self, + from: DateTime, + to: DateTime, + maximum_resolution: TimeDelta, + ) -> Vec { let mut result = vec![]; let segments = self.segments.read().unwrap_or_else(|err| { @@ -182,14 +211,14 @@ impl TelemetryHistory { pub struct TelemetryHistoryService { segment_width: TimeDelta, - max_segments: usize + max_segments: usize, } impl TelemetryHistoryService { pub fn new() -> Self { Self { segment_width: TimeDelta::minutes(1), - max_segments: 5 + max_segments: 5, } } } diff --git a/server/src/telemetry/management_service.rs b/server/src/telemetry/management_service.rs index 7669fbc..f027824 100644 --- a/server/src/telemetry/management_service.rs +++ b/server/src/telemetry/management_service.rs @@ -1,15 +1,15 @@ use crate::core::{TelemetryDefinitionRequest, Uuid}; use crate::telemetry::data::{TelemetryData, TelemetryDataHistory}; use crate::telemetry::definition::TelemetryDefinition; +use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService}; use papaya::{HashMap, HashMapRef, LocalGuard}; use std::error::Error; use std::hash::RandomState; -use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService}; pub struct TelemetryManagementService { uuid_index: HashMap, tlm_data: HashMap, - telemetry_history_service: TelemetryHistoryService + telemetry_history_service: TelemetryHistoryService, } impl TelemetryManagementService { @@ -17,7 +17,7 @@ impl TelemetryManagementService { Self { uuid_index: HashMap::new(), tlm_data: HashMap::new(), - telemetry_history_service + telemetry_history_service, } } @@ -60,12 +60,15 @@ impl TelemetryManagementService { pub fn get_by_uuid(&self, uuid: &String) -> Option { let tlm_data = self.tlm_data.pin(); - tlm_data.get(uuid).map(|data_history| &data_history.data).cloned() + tlm_data + .get(uuid) + .map(|data_history| &data_history.data) + .cloned() } pub fn pin(&self) -> TelemetryManagementServicePin { TelemetryManagementServicePin { - tlm_data: self.tlm_data.pin() + tlm_data: self.tlm_data.pin(), } } @@ -75,7 +78,7 @@ impl TelemetryManagementService { } pub struct TelemetryManagementServicePin<'a> { - tlm_data: HashMapRef<'a, String, TelemetryDataHistory, RandomState, LocalGuard<'a>> + tlm_data: HashMapRef<'a, String, TelemetryDataHistory, RandomState, LocalGuard<'a>>, } impl<'a> TelemetryManagementServicePin<'a> { @@ -83,4 +86,3 @@ impl<'a> TelemetryManagementServicePin<'a> { self.tlm_data.get(uuid) } } - diff --git a/server/src/telemetry/mod.rs b/server/src/telemetry/mod.rs index 6988350..f95ccc7 100644 --- a/server/src/telemetry/mod.rs +++ b/server/src/telemetry/mod.rs @@ -3,5 +3,5 @@ pub mod data_item; pub mod data_type; pub mod data_value; pub mod definition; -pub mod management_service; pub mod history; +pub mod management_service;