improves server efficiency

This commit is contained in:
2025-01-13 19:38:21 -08:00
parent 2cb1eec404
commit fdd3f2c128
4 changed files with 78 additions and 84 deletions

View File

@@ -1,16 +1,18 @@
use std::collections::HashMap; use crate::http::websocket::request::{
use crate::http::websocket::request::{RegisterTlmListenerRequest, UnregisterTlmListenerRequest, WebsocketRequest}; RegisterTlmListenerRequest, UnregisterTlmListenerRequest, WebsocketRequest,
};
use crate::http::websocket::response::{TlmValueResponse, WebsocketResponse}; use crate::http::websocket::response::{TlmValueResponse, WebsocketResponse};
use crate::telemetry::management_service::TelemetryManagementService; use crate::telemetry::management_service::TelemetryManagementService;
use actix_web::{rt, web, HttpRequest, HttpResponse}; use actix_web::{rt, web, HttpRequest, HttpResponse};
use actix_ws::{AggregatedMessage, ProtocolError, Session}; use actix_ws::{AggregatedMessage, ProtocolError, Session};
use anyhow::anyhow; use anyhow::anyhow;
use log::{error, trace}; use log::{error, trace};
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::select;
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use tokio::time::{sleep, Instant}; use tokio::time::{sleep_until, Instant};
use tokio::{pin, select};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::StreamExt; use tonic::codegen::tokio_stream::StreamExt;
@@ -32,48 +34,29 @@ fn handle_register_tlm_listener(
let mut rx = tlm_data.data.subscribe(); let mut rx = tlm_data.data.subscribe();
let tx = tx.clone(); let tx = tx.clone();
rt::spawn(async move { rt::spawn(async move {
let mut last_sent_at = Instant::now() - minimum_separation;
let mut last_value = None;
let sleep = sleep(Duration::from_millis(0));
pin!(sleep);
loop { loop {
select! { let now = select! {
_ = tx.closed() => { biased;
break; _ = tx.closed() => { break; }
} _ = token.cancelled() => { break; }
_ = token.cancelled() => {
break;
}
Ok(_) = rx.changed() => { Ok(_) = rx.changed() => {
let now = Instant::now(); let now = Instant::now();
let value = { let value = {
let ref_val = rx.borrow_and_update(); let ref_val = rx.borrow_and_update();
ref_val.clone() ref_val.clone()
}; };
if last_sent_at + minimum_separation > now {
last_value = value;
sleep.as_mut().reset(last_sent_at + minimum_separation);
continue;
} else {
last_value = None;
last_sent_at = now;
}
let _ = tx.send(TlmValueResponse { let _ = tx.send(TlmValueResponse {
uuid: request.uuid.clone(), uuid: request.uuid.clone(),
value, value,
}.into()).await; }.into()).await;
now
} }
() = &mut sleep => { };
if let Some(value) = last_value { select! {
let _ = tx.send(TlmValueResponse { biased;
uuid: request.uuid.clone(), _ = tx.closed() => { break; }
value: Some(value), _ = token.cancelled() => { break; }
}.into()).await; _ = sleep_until(now + minimum_separation) => {}
}
last_value = None;
let now = Instant::now();
last_sent_at = now;
}
} }
} }
}); });
@@ -98,10 +81,10 @@ async fn handle_websocket_message(
match request { match request {
WebsocketRequest::RegisterTlmListener(request) => { WebsocketRequest::RegisterTlmListener(request) => {
handle_register_tlm_listener(data, request, tx, tlm_listeners) handle_register_tlm_listener(data, request, tx, tlm_listeners)
}, }
WebsocketRequest::UnregisterTlmListener(request) => { WebsocketRequest::UnregisterTlmListener(request) => {
handle_unregister_tlm_listener(request, tlm_listeners) handle_unregister_tlm_listener(request, tlm_listeners)
}, }
}; };
} }

View File

@@ -1,23 +1,29 @@
use std::fs::File;
use std::io; use std::io;
use std::io::{Read, Write};
pub trait FileWriteableType { pub trait FileWriteableType {
fn write_to_file(self, file: &mut File) -> io::Result<()>; fn write_to_file(self, file: &mut impl Write) -> io::Result<()>;
} }
pub trait FileReadableType: Sized { pub trait FileReadableType: Sized {
fn read_from_file(file: &mut File) -> io::Result<Self>; fn read_from_file(file: &mut impl Read) -> io::Result<Self>;
} }
pub trait FileExt { pub trait WriteExt {
fn write_data<T: FileWriteableType>(&mut self, data: T) -> io::Result<()>; fn write_data<T: FileWriteableType>(&mut self, data: T) -> io::Result<()>;
}
pub trait ReadExt {
fn read_data<T: FileReadableType>(&mut self) -> io::Result<T>; fn read_data<T: FileReadableType>(&mut self) -> io::Result<T>;
} }
impl FileExt for File { impl<W: Write> WriteExt for W {
fn write_data<T: FileWriteableType>(&mut self, data: T) -> io::Result<()> { fn write_data<T: FileWriteableType>(&mut self, data: T) -> io::Result<()> {
data.write_to_file(self) data.write_to_file(self)
} }
}
impl<R: Read> ReadExt for R {
fn read_data<T: FileReadableType>(&mut self) -> io::Result<T> { fn read_data<T: FileReadableType>(&mut self) -> io::Result<T> {
T::read_from_file(self) T::read_from_file(self)
} }

View File

@@ -1,16 +1,15 @@
use crate::serialization::file_ext::{FileReadableType, FileWriteableType}; use crate::serialization::file_ext::{FileReadableType, FileWriteableType};
use std::fs::File;
use std::io::{Read, Write}; use std::io::{Read, Write};
macro_rules! primitive_write_read { macro_rules! primitive_write_read {
( $primitive:ty, $length:expr ) => { ( $primitive:ty, $length:expr ) => {
impl FileWriteableType for $primitive { impl FileWriteableType for $primitive {
fn write_to_file(self, file: &mut File) -> std::io::Result<()> { fn write_to_file(self, file: &mut impl Write) -> std::io::Result<()> {
file.write_all(&self.to_be_bytes()) file.write_all(&self.to_be_bytes())
} }
} }
impl FileReadableType for $primitive { impl FileReadableType for $primitive {
fn read_from_file(file: &mut File) -> std::io::Result<Self> { fn read_from_file(file: &mut impl Read) -> std::io::Result<Self> {
let mut buffer = [0u8; $length]; let mut buffer = [0u8; $length];
file.read_exact(&mut buffer)?; file.read_exact(&mut buffer)?;
Ok(Self::from_be_bytes(buffer)) Ok(Self::from_be_bytes(buffer))

View File

@@ -1,5 +1,5 @@
use crate::core::TelemetryDataType; use crate::core::TelemetryDataType;
use crate::serialization::file_ext::FileExt; use crate::serialization::file_ext::{ReadExt, WriteExt};
use crate::telemetry::data::TelemetryData; use crate::telemetry::data::TelemetryData;
use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::data_item::TelemetryDataItem;
use crate::telemetry::data_value::TelemetryDataValue; use crate::telemetry::data_value::TelemetryDataValue;
@@ -10,7 +10,7 @@ use log::{error, info};
use std::cmp::min; use std::cmp::min;
use std::collections::VecDeque; use std::collections::VecDeque;
use std::fs::File; use std::fs::File;
use std::io::{Seek, SeekFrom, Write}; use std::io::{BufReader, BufWriter, Seek, SeekFrom, Write};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::{fs, path}; use std::{fs, path};
@@ -137,7 +137,8 @@ struct HistorySegmentFile {
start: DateTime<Utc>, start: DateTime<Utc>,
end: DateTime<Utc>, end: DateTime<Utc>,
length: u64, length: u64,
file: File, file: BufReader<File>,
file_position: i64,
} }
impl HistorySegmentFile { impl HistorySegmentFile {
@@ -158,31 +159,16 @@ impl HistorySegmentFile {
segment.start.to_rfc3339_opts(SecondsFormat::Secs, true) segment.start.to_rfc3339_opts(SecondsFormat::Secs, true)
)); ));
let file = File::create(file)?; let mut file = BufWriter::new(File::create(file)?);
let mut result = Self { let utc_offset_start = segment.start - DateTime::UNIX_EPOCH;
start: segment.start, let utc_offset_end = segment.end - DateTime::UNIX_EPOCH;
end: segment.end,
length: 0,
file,
};
let utc_offset_start = result.start - DateTime::UNIX_EPOCH;
let utc_offset_end = result.end - DateTime::UNIX_EPOCH;
// Write the segment bounds // Write the segment bounds
result file.write_data::<i64>(utc_offset_start.num_seconds())?;
.file file.write_data::<i32>(utc_offset_start.subsec_nanos())?;
.write_data::<i64>(utc_offset_start.num_seconds())?; file.write_data::<i64>(utc_offset_end.num_seconds())?;
result file.write_data::<i32>(utc_offset_end.subsec_nanos())?;
.file
.write_data::<i32>(utc_offset_start.subsec_nanos())?;
result
.file
.write_data::<i64>(utc_offset_end.num_seconds())?;
result
.file
.write_data::<i32>(utc_offset_end.subsec_nanos())?;
let data = segment.data.get_mut().unwrap_or_else(|err| { let data = segment.data.get_mut().unwrap_or_else(|err| {
error!( error!(
@@ -197,26 +183,35 @@ impl HistorySegmentFile {
"Invalid Segment Cannot Be Saved to Disk" "Invalid Segment Cannot Be Saved to Disk"
); );
result.length = data.timestamps.len() as u64; let length = data.timestamps.len() as u64;
result.file.write_data::<u64>(result.length)?; file.write_data::<u64>(length)?;
// Write all the timestamps // Write all the timestamps
for timestamp in &data.timestamps { for timestamp in &data.timestamps {
let utc_offset = *timestamp - DateTime::UNIX_EPOCH; let utc_offset = *timestamp - DateTime::UNIX_EPOCH;
result.file.write_data::<i64>(utc_offset.num_seconds())?; file.write_data::<i64>(utc_offset.num_seconds())?;
result.file.write_data::<i32>(utc_offset.subsec_nanos())?; file.write_data::<i32>(utc_offset.subsec_nanos())?;
} }
// Write all the values // Write all the values
for value in &data.values { for value in &data.values {
match value { match value {
TelemetryDataValue::Float32(value) => result.file.write_data::<f32>(*value)?, TelemetryDataValue::Float32(value) => file.write_data::<f32>(*value)?,
TelemetryDataValue::Float64(value) => result.file.write_data::<f64>(*value)?, TelemetryDataValue::Float64(value) => file.write_data::<f64>(*value)?,
} }
} }
result.file.flush()?; file.flush()?;
Ok(result)
let mut file = BufReader::new(file.into_inner()?);
file.seek(SeekFrom::Start(0))?;
Ok(Self {
start: segment.start,
end: segment.end,
length: 0,
file,
file_position: 0,
})
} }
fn load_to_ram( fn load_to_ram(
@@ -229,6 +224,7 @@ impl HistorySegmentFile {
}; };
self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH))?; self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH))?;
self.file_position = Self::HEADER_LENGTH as i64;
for _ in 0..self.length { for _ in 0..self.length {
segment_data.timestamps.push(self.read_date_time()?); segment_data.timestamps.push(self.read_date_time()?);
} }
@@ -255,7 +251,7 @@ impl HistorySegmentFile {
start.to_rfc3339_opts(SecondsFormat::Secs, true) start.to_rfc3339_opts(SecondsFormat::Secs, true)
)); ));
let mut file = File::open(file)?; let mut file = BufReader::new(File::open(file)?);
// Write the segment bounds // Write the segment bounds
let start_seconds = file.read_data::<i64>()?; let start_seconds = file.read_data::<i64>()?;
@@ -269,11 +265,13 @@ impl HistorySegmentFile {
let length = file.read_data::<u64>()?; let length = file.read_data::<u64>()?;
file.seek(SeekFrom::Start(0))?;
Ok(HistorySegmentFile { Ok(HistorySegmentFile {
start: DateTime::UNIX_EPOCH + start, start: DateTime::UNIX_EPOCH + start,
end: DateTime::UNIX_EPOCH + end, end: DateTime::UNIX_EPOCH + end,
length, length,
file, file,
file_position: 0,
}) })
} }
@@ -284,6 +282,8 @@ impl HistorySegmentFile {
maximum_resolution: TimeDelta, maximum_resolution: TimeDelta,
telemetry_data_type: TelemetryDataType, telemetry_data_type: TelemetryDataType,
) -> anyhow::Result<(DateTime<Utc>, Vec<TelemetryDataItem>)> { ) -> anyhow::Result<(DateTime<Utc>, Vec<TelemetryDataItem>)> {
self.file_position = 0;
self.file.seek(SeekFrom::Start(0))?;
let mut result = vec![]; let mut result = vec![];
let mut next_from = from; let mut next_from = from;
@@ -319,15 +319,17 @@ impl HistorySegmentFile {
fn read_date_time(&mut self) -> anyhow::Result<DateTime<Utc>> { fn read_date_time(&mut self) -> anyhow::Result<DateTime<Utc>> {
let seconds = self.file.read_data::<i64>()?; let seconds = self.file.read_data::<i64>()?;
let nanos = self.file.read_data::<i32>()?; let nanos = self.file.read_data::<i32>()?;
self.file_position += 8 + 4;
let start = let start =
TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?; TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?;
Ok(DateTime::UNIX_EPOCH + start) Ok(DateTime::UNIX_EPOCH + start)
} }
fn get_date_time(&mut self, index: u64) -> anyhow::Result<DateTime<Utc>> { fn get_date_time(&mut self, index: u64) -> anyhow::Result<DateTime<Utc>> {
self.file.seek(SeekFrom::Start( let desired_position = Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH;
Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH, let seek_amount = desired_position as i64 - self.file_position;
))?; self.file_position += seek_amount;
self.file.seek_relative(seek_amount)?;
self.read_date_time() self.read_date_time()
} }
@@ -337,9 +339,11 @@ impl HistorySegmentFile {
) -> anyhow::Result<TelemetryDataValue> { ) -> anyhow::Result<TelemetryDataValue> {
match telemetry_data_type { match telemetry_data_type {
TelemetryDataType::Float32 => { TelemetryDataType::Float32 => {
self.file_position += 4;
Ok(TelemetryDataValue::Float32(self.file.read_data::<f32>()?)) Ok(TelemetryDataValue::Float32(self.file.read_data::<f32>()?))
} }
TelemetryDataType::Float64 => { TelemetryDataType::Float64 => {
self.file_position += 8;
Ok(TelemetryDataValue::Float64(self.file.read_data::<f64>()?)) Ok(TelemetryDataValue::Float64(self.file.read_data::<f64>()?))
} }
} }
@@ -354,9 +358,11 @@ impl HistorySegmentFile {
TelemetryDataType::Float32 => 4, TelemetryDataType::Float32 => 4,
TelemetryDataType::Float64 => 8, TelemetryDataType::Float64 => 8,
}; };
self.file.seek(SeekFrom::Start( let desired_position =
Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length, Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length;
))?; let seek_amount = desired_position as i64 - self.file_position;
self.file_position += seek_amount;
self.file.seek_relative(seek_amount)?;
self.read_telemetry_item(telemetry_data_type) self.read_telemetry_item(telemetry_data_type)
} }