adds frontend history

This commit is contained in:
2024-12-30 21:36:46 -05:00
parent aa1763cbe7
commit ff0a578940
8 changed files with 184 additions and 63 deletions

View File

@@ -9,21 +9,24 @@ import {
shallowRef, shallowRef,
type ShallowRef, type ShallowRef,
toValue, toValue,
triggerRef,
watch, watch,
} from 'vue'; } from 'vue';
import { import {
type TelemetryDataItem,
WEBSOCKET_SYMBOL, WEBSOCKET_SYMBOL,
type WebsocketHandle, type WebsocketHandle,
} from '@/composables/websocket'; } from '@/composables/websocket';
import { GRAPH_DATA, type GraphData } from '@/graph/graph'; import { GRAPH_DATA, type GraphData } from '@/graph/graph';
import { AXIS_DATA, type AxisData } from '@/graph/axis'; import { AXIS_DATA, type AxisData } from '@/graph/axis';
import ValueLabel from '@/components/ValueLabel.vue'; import ValueLabel from '@/components/ValueLabel.vue';
import type { Point } from '@/graph/line'; import { type Point, PointLine } from '@/graph/line';
const props = defineProps<{ const props = defineProps<{
data: string; data: string;
minimum_separation?: number; minimum_separation?: number;
class?: string; class?: string;
fetch_history?: number;
}>(); }>();
const smoothing_distance_x = 5; const smoothing_distance_x = 5;
@@ -70,58 +73,100 @@ onUnmounted(() => {
); );
}); });
const memo = shallowRef<Point[]>([]); const memo = shallowRef(new PointLine());
watch([value], ([val]) => { watch([value], ([val]) => {
const min_x = toValue(graph_data.min_x); const min_x = toValue(graph_data.min_x);
if (val) { if (val) {
const val_t = Date.parse(val.timestamp); const val_t = Date.parse(val.timestamp);
if (val_t >= min_x) { if (val_t >= min_x) {
// TODO: Insert this in the right spot in memo
const item_val = val.value[data.value!.data_type] as number; const item_val = val.value[data.value!.data_type] as number;
const new_memo = [ const new_item = {
{
x: val_t, x: val_t,
y: item_val, y: item_val,
}, } as Point;
].concat(memo.value); memo.value.insert(new_item);
if (item_val < min.value) { if (item_val < min.value) {
min.value = item_val; min.value = item_val;
} }
if (item_val > max.value) { if (item_val > max.value) {
max.value = item_val; max.value = item_val;
} }
memo.value = new_memo; triggerRef(memo);
debounced_recompute(); debounced_recompute();
} }
} }
}); });
watch(
[data, () => props.fetch_history],
async ([data]) => {
if (data) {
const uuid = data.uuid;
const type = data.data_type;
try {
const min_x = new Date(toValue(graph_data.min_x));
const max_x = new Date(toValue(graph_data.max_x));
const res = await fetch(
`/api/tlm/history/${uuid}?from=${min_x.toISOString()}&to=${max_x.toISOString()}&resolution=${props.minimum_separation || 0}`,
);
const response = (await res.json()) as TelemetryDataItem[];
for (const data_item of response) {
const val_t = Date.parse(data_item.timestamp);
const item_val = data_item.value[type] as number;
const new_item = {
x: val_t,
y: item_val,
} as Point;
memo.value.insert(new_item);
if (item_val < min.value) {
min.value = item_val;
}
if (item_val > max.value) {
max.value = item_val;
}
}
triggerRef(memo);
debounced_recompute();
} catch (e) {
// TODO: Response?
console.log(e);
}
}
},
{
immediate: true,
},
);
watch([graph_data.min_x, graph_data.max_x], ([min_x, max_x]) => { watch([graph_data.min_x, graph_data.max_x], ([min_x, max_x]) => {
let memo_changed = false; let memo_changed = false;
const new_memo = ([] as Point[]).concat(memo.value);
if (min_x) { if (min_x) {
while ( while (
new_memo.length > 2 && memo.value.data.length > 2 &&
new_memo[new_memo.length - 2].x < toValue(min_x) memo.value.data[1].x < toValue(min_x)
) { ) {
new_memo.pop(); memo.value.data.shift();
memo_changed = true; memo_changed = true;
} }
} }
if (max_x) { if (max_x) {
while (new_memo.length > 2 && new_memo[1].x > toValue(max_x)) { while (
new_memo.shift(); memo.value.data.length > 2 &&
memo.value.data[memo.value.data.length - 2].x > toValue(max_x)
) {
memo.value.data.pop();
memo_changed = true; memo_changed = true;
} }
} }
if (memo_changed) { if (memo_changed) {
let min_val = Infinity; let min_val = Infinity;
let max_val = -Infinity; let max_val = -Infinity;
for (const item of new_memo) { for (const item of memo.value.data) {
const item_val = item.y; const item_val = item.y;
min_val = Math.min(min_val, item_val); min_val = Math.min(min_val, item_val);
max_val = Math.max(max_val, item_val); max_val = Math.max(max_val, item_val);
} }
memo.value = new_memo; triggerRef(memo);
debounced_recompute(); debounced_recompute();
max.value = max_val; max.value = max_val;
min.value = min_val; min.value = min_val;
@@ -162,7 +207,7 @@ const group_transform = computed(() => {
watch([recompute_points], () => { watch([recompute_points], () => {
let new_points = ''; let new_points = '';
if (memo.value.length == 0 || data.value == null) { if (memo.value.data.length == 0 || data.value == null) {
return ''; return '';
} }
@@ -172,7 +217,8 @@ watch([recompute_points], () => {
let last_x = graph_data.x_map(future_number) + smoothing_distance_x; let last_x = graph_data.x_map(future_number) + smoothing_distance_x;
old_max.value = toValue(graph_data.max_x); old_max.value = toValue(graph_data.max_x);
for (const data_item of memo.value) { for (let i = memo.value.data.length - 1; i >= 0; i--) {
const data_item = memo.value.data[i];
const t = data_item.x; const t = data_item.x;
const v = data_item.y; const v = data_item.y;
const x = graph_data.x_map(t); const x = graph_data.x_map(t);

View File

@@ -2,3 +2,40 @@ export interface Point {
x: number; x: number;
y: number; y: number;
} }
export class PointLine {
data: Point[];
constructor() {
this.data = [];
}
find_index(x: number) {
if (this.data.length == 0) {
return 0;
}
// left should be too early to insert
// right should be too late to insert
let left = 0;
let size = this.data.length;
while (size > 1) {
const half = Math.floor(size / 2);
const mid = left + half;
const is_less = this.data[mid].x < x;
if (is_less) {
left = mid;
}
size -= half;
}
return left + (this.data[left].x < x ? 1 : 0);
}
insert(point: Point) {
const index = this.find_index(point.x);
this.data.splice(index, 0, point);
}
}

View File

@@ -128,7 +128,9 @@ impl CoreTelemetryService {
value: value.clone(), value: value.clone(),
timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true), timestamp: timestamp.to_rfc3339_opts(SecondsFormat::Millis, true),
})); }));
tlm_data.history.insert(tlm_management.history_service(), value, timestamp); tlm_data
.history
.insert(tlm_management.history_service(), value, timestamp);
Ok(TelemetryInsertResponse {}) Ok(TelemetryInsertResponse {})
} }

View File

@@ -1,10 +1,10 @@
use crate::http::error::HttpServerResultError; use crate::http::error::HttpServerResultError;
use crate::telemetry::management_service::TelemetryManagementService; use crate::telemetry::management_service::TelemetryManagementService;
use actix_web::{get, web, Responder}; use actix_web::{get, web, Responder};
use log::{info, trace};
use std::sync::Arc;
use chrono::{DateTime, TimeDelta, Utc}; use chrono::{DateTime, TimeDelta, Utc};
use log::trace;
use serde::Deserialize; use serde::Deserialize;
use std::sync::Arc;
#[get("/tlm/info/{name:[\\w\\d/_-]+}")] #[get("/tlm/info/{name:[\\w\\d/_-]+}")]
async fn get_tlm_definition( async fn get_tlm_definition(
@@ -24,7 +24,7 @@ async fn get_tlm_definition(
struct HistoryQuery { struct HistoryQuery {
from: String, from: String,
to: String, to: String,
resolution: i64 resolution: i64,
} }
#[get("/tlm/history/{uuid:[0-9a-f]+}")] #[get("/tlm/history/{uuid:[0-9a-f]+}")]
@@ -34,28 +34,33 @@ async fn get_tlm_history(
info: web::Query<HistoryQuery>, info: web::Query<HistoryQuery>,
) -> Result<impl Responder, HttpServerResultError> { ) -> Result<impl Responder, HttpServerResultError> {
let uuid = uuid.to_string(); let uuid = uuid.to_string();
trace!("get_tlm_history {} from {} to {} resolution {}", uuid, info.from, info.to, info.resolution); trace!(
"get_tlm_history {} from {} to {} resolution {}",
uuid,
info.from,
info.to,
info.resolution
);
let Ok(from) = (&info.from).parse::<DateTime<Utc>>() else { let Ok(from) = info.from.parse::<DateTime<Utc>>() else {
return Err(HttpServerResultError::InvalidDateTime { date_time: info.from.clone() }); return Err(HttpServerResultError::InvalidDateTime {
date_time: info.from.clone(),
});
}; };
let Ok(to) = (&info.to).parse::<DateTime<Utc>>() else { let Ok(to) = info.to.parse::<DateTime<Utc>>() else {
return Err(HttpServerResultError::InvalidDateTime { date_time: info.to.clone() }); return Err(HttpServerResultError::InvalidDateTime {
date_time: info.to.clone(),
});
}; };
let maximum_resolution = TimeDelta::milliseconds(info.resolution); let maximum_resolution = TimeDelta::milliseconds(info.resolution);
info!("get_tlm_history {} from {} to {} resolution {}", uuid, from, to, maximum_resolution);
let data = data.pin(); let data = data.pin();
match data.get_by_uuid(&uuid) { match data.get_by_uuid(&uuid) {
None => Err(HttpServerResultError::TlmUuidNotFound { uuid }), None => Err(HttpServerResultError::TlmUuidNotFound { uuid }),
Some(tlm) => { Some(tlm) => Ok(web::Json(tlm.history.get(from, to, maximum_resolution))),
Ok(web::Json(tlm.history.get(from, to, maximum_resolution)))
}
} }
} }
pub fn setup_api(cfg: &mut web::ServiceConfig) { pub fn setup_api(cfg: &mut web::ServiceConfig) {
cfg cfg.service(get_tlm_definition).service(get_tlm_history);
.service(get_tlm_definition)
.service(get_tlm_history);
} }

View File

@@ -7,11 +7,11 @@ pub mod core {
tonic::include_proto!("core"); tonic::include_proto!("core");
} }
use crate::telemetry::history::TelemetryHistoryService;
use crate::telemetry::management_service::TelemetryManagementService; use crate::telemetry::management_service::TelemetryManagementService;
use std::error::Error; use std::error::Error;
use std::sync::Arc; use std::sync::Arc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use crate::telemetry::history::TelemetryHistoryService;
pub async fn setup() -> Result<(), Box<dyn Error>> { pub async fn setup() -> Result<(), Box<dyn Error>> {
let cancellation_token = CancellationToken::new(); let cancellation_token = CancellationToken::new();
@@ -24,7 +24,7 @@ pub async fn setup() -> Result<(), Box<dyn Error>> {
} }
let tlm = Arc::new(TelemetryManagementService::new( let tlm = Arc::new(TelemetryManagementService::new(
TelemetryHistoryService::new() TelemetryHistoryService::new(),
)); ));
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?; let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?;

View File

@@ -1,13 +1,13 @@
use crate::telemetry::data_item::TelemetryDataItem;
use crate::telemetry::data_value::TelemetryDataValue; use crate::telemetry::data_value::TelemetryDataValue;
use chrono::{DateTime, SecondsFormat, TimeDelta, Utc}; use chrono::{DateTime, SecondsFormat, TimeDelta, Utc};
use log::error;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::sync::RwLock; use std::sync::RwLock;
use log::error;
use crate::telemetry::data_item::TelemetryDataItem;
struct SegmentData { struct SegmentData {
values: Vec<TelemetryDataValue>, values: Vec<TelemetryDataValue>,
timestamps: Vec<DateTime<Utc>> timestamps: Vec<DateTime<Utc>>,
} }
struct HistorySegment { struct HistorySegment {
@@ -47,12 +47,18 @@ impl HistorySegment {
data.values.insert(index, value); data.values.insert(index, value);
} }
fn get(&self, from: DateTime<Utc>, to: DateTime<Utc>, maximum_resolution: TimeDelta) -> (DateTime<Utc>, Vec<TelemetryDataItem>) { fn get(
&self,
from: DateTime<Utc>,
to: DateTime<Utc>,
maximum_resolution: TimeDelta,
) -> (DateTime<Utc>, Vec<TelemetryDataItem>) {
let mut result = vec![]; let mut result = vec![];
let mut next_from = from; let mut next_from = from;
if from < self.end && self.start < to { // If there is overlap with the range if from < self.end && self.start < to {
// If there is overlap with the range
let data = self.data.read().unwrap_or_else(|err| { let data = self.data.read().unwrap_or_else(|err| {
error!("HistorySegment::get - data was poisoned: {}", err); error!("HistorySegment::get - data was poisoned: {}", err);
let lock = err.into_inner(); let lock = err.into_inner();
@@ -73,17 +79,25 @@ impl HistorySegment {
let t = data.timestamps[i]; let t = data.timestamps[i];
if t >= next_from { if t >= next_from {
let time_since_next_from = t - next_from; let time_since_next_from = t - next_from;
next_from = match (time_since_next_from.num_nanoseconds(), maximum_resolution.num_nanoseconds()) { next_from = match (
time_since_next_from.num_nanoseconds(),
maximum_resolution.num_nanoseconds(),
) {
(_, Some(0)) => t,
(Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => { (Some(nanos_since_next_from), Some(maximum_resolution_nanos)) => {
let nanos_since_next_from = nanos_since_next_from as u64; let nanos_since_next_from = nanos_since_next_from as u64;
let maximum_resolution_nanos = maximum_resolution_nanos as u64; let maximum_resolution_nanos = maximum_resolution_nanos as u64;
let num_steps = nanos_since_next_from.div_ceil(maximum_resolution_nanos); let num_steps =
nanos_since_next_from.div_ceil(maximum_resolution_nanos);
let subsec_nanos = maximum_resolution.subsec_nanos() as u64; let subsec_nanos = maximum_resolution.subsec_nanos() as u64;
// This will break once this can't be represented in 2^63 nanoseconds (over 200 years) // This will break once this can't be represented in 2^63 nanoseconds (over 200 years)
let seconds = ((maximum_resolution.num_seconds() as u64) * num_steps) as i64; let seconds =
((maximum_resolution.num_seconds() as u64) * num_steps) as i64;
let nanos = (num_steps * subsec_nanos) as i64; let nanos = (num_steps * subsec_nanos) as i64;
next_from + TimeDelta::seconds(seconds) + TimeDelta::nanoseconds(nanos) next_from
}, + TimeDelta::seconds(seconds)
+ TimeDelta::nanoseconds(nanos)
}
_ => t + maximum_resolution, // If there is a gap so big it can't be represented in 2^63 nanoseconds (over 200 years) just skip forward _ => t + maximum_resolution, // If there is a gap so big it can't be represented in 2^63 nanoseconds (over 200 years) just skip forward
}; };
result.push(TelemetryDataItem { result.push(TelemetryDataItem {
@@ -104,14 +118,18 @@ pub struct TelemetryHistory {
} }
impl TelemetryHistory { impl TelemetryHistory {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
segments: RwLock::new(VecDeque::new()), segments: RwLock::new(VecDeque::new()),
} }
} }
pub fn insert(&self, service: &TelemetryHistoryService, value: TelemetryDataValue, timestamp: DateTime<Utc>) { pub fn insert(
&self,
service: &TelemetryHistoryService,
value: TelemetryDataValue,
timestamp: DateTime<Utc>,
) {
let segments = self.segments.read().unwrap_or_else(|err| { let segments = self.segments.read().unwrap_or_else(|err| {
error!("TelemetryHistory::insert - segments was poisoned: {}", err); error!("TelemetryHistory::insert - segments was poisoned: {}", err);
let lock = err.into_inner(); let lock = err.into_inner();
@@ -130,14 +148,20 @@ impl TelemetryHistory {
}); });
if segments.len() == 0 { if segments.len() == 0 {
segments.push_back(HistorySegment::new(timestamp, timestamp + service.segment_width)); segments.push_back(HistorySegment::new(
timestamp,
timestamp + service.segment_width,
));
} else { } else {
while segments[segments.len() - 1].end < timestamp { while segments[segments.len() - 1].end < timestamp {
if segments.len() == service.max_segments { if segments.len() == service.max_segments {
let _ = segments.pop_front(); let _ = segments.pop_front();
} }
let start_time = segments[segments.len() - 1].end; let start_time = segments[segments.len() - 1].end;
segments.push_back(HistorySegment::new(start_time, start_time + service.segment_width)); segments.push_back(HistorySegment::new(
start_time,
start_time + service.segment_width,
));
} }
} }
@@ -158,7 +182,12 @@ impl TelemetryHistory {
segments[segment_index].insert(value, timestamp); segments[segment_index].insert(value, timestamp);
} }
pub fn get(&self, from: DateTime<Utc>, to: DateTime<Utc>, maximum_resolution: TimeDelta) -> Vec<TelemetryDataItem> { pub fn get(
&self,
from: DateTime<Utc>,
to: DateTime<Utc>,
maximum_resolution: TimeDelta,
) -> Vec<TelemetryDataItem> {
let mut result = vec![]; let mut result = vec![];
let segments = self.segments.read().unwrap_or_else(|err| { let segments = self.segments.read().unwrap_or_else(|err| {
@@ -182,14 +211,14 @@ impl TelemetryHistory {
pub struct TelemetryHistoryService { pub struct TelemetryHistoryService {
segment_width: TimeDelta, segment_width: TimeDelta,
max_segments: usize max_segments: usize,
} }
impl TelemetryHistoryService { impl TelemetryHistoryService {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
segment_width: TimeDelta::minutes(1), segment_width: TimeDelta::minutes(1),
max_segments: 5 max_segments: 5,
} }
} }
} }

View File

@@ -1,15 +1,15 @@
use crate::core::{TelemetryDefinitionRequest, Uuid}; use crate::core::{TelemetryDefinitionRequest, Uuid};
use crate::telemetry::data::{TelemetryData, TelemetryDataHistory}; use crate::telemetry::data::{TelemetryData, TelemetryDataHistory};
use crate::telemetry::definition::TelemetryDefinition; use crate::telemetry::definition::TelemetryDefinition;
use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService};
use papaya::{HashMap, HashMapRef, LocalGuard}; use papaya::{HashMap, HashMapRef, LocalGuard};
use std::error::Error; use std::error::Error;
use std::hash::RandomState; use std::hash::RandomState;
use crate::telemetry::history::{TelemetryHistory, TelemetryHistoryService};
pub struct TelemetryManagementService { pub struct TelemetryManagementService {
uuid_index: HashMap<String, String>, uuid_index: HashMap<String, String>,
tlm_data: HashMap<String, TelemetryDataHistory>, tlm_data: HashMap<String, TelemetryDataHistory>,
telemetry_history_service: TelemetryHistoryService telemetry_history_service: TelemetryHistoryService,
} }
impl TelemetryManagementService { impl TelemetryManagementService {
@@ -17,7 +17,7 @@ impl TelemetryManagementService {
Self { Self {
uuid_index: HashMap::new(), uuid_index: HashMap::new(),
tlm_data: HashMap::new(), tlm_data: HashMap::new(),
telemetry_history_service telemetry_history_service,
} }
} }
@@ -60,12 +60,15 @@ impl TelemetryManagementService {
pub fn get_by_uuid(&self, uuid: &String) -> Option<TelemetryData> { pub fn get_by_uuid(&self, uuid: &String) -> Option<TelemetryData> {
let tlm_data = self.tlm_data.pin(); let tlm_data = self.tlm_data.pin();
tlm_data.get(uuid).map(|data_history| &data_history.data).cloned() tlm_data
.get(uuid)
.map(|data_history| &data_history.data)
.cloned()
} }
pub fn pin(&self) -> TelemetryManagementServicePin { pub fn pin(&self) -> TelemetryManagementServicePin {
TelemetryManagementServicePin { TelemetryManagementServicePin {
tlm_data: self.tlm_data.pin() tlm_data: self.tlm_data.pin(),
} }
} }
@@ -75,7 +78,7 @@ impl TelemetryManagementService {
} }
pub struct TelemetryManagementServicePin<'a> { pub struct TelemetryManagementServicePin<'a> {
tlm_data: HashMapRef<'a, String, TelemetryDataHistory, RandomState, LocalGuard<'a>> tlm_data: HashMapRef<'a, String, TelemetryDataHistory, RandomState, LocalGuard<'a>>,
} }
impl<'a> TelemetryManagementServicePin<'a> { impl<'a> TelemetryManagementServicePin<'a> {
@@ -83,4 +86,3 @@ impl<'a> TelemetryManagementServicePin<'a> {
self.tlm_data.get(uuid) self.tlm_data.get(uuid)
} }
} }

View File

@@ -3,5 +3,5 @@ pub mod data_item;
pub mod data_type; pub mod data_type;
pub mod data_value; pub mod data_value;
pub mod definition; pub mod definition;
pub mod management_service;
pub mod history; pub mod history;
pub mod management_service;