From fdd3f2c128241b0f48522daa08b2b1ea3ad10b32 Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Mon, 13 Jan 2025 19:38:21 -0800 Subject: [PATCH] improves server efficiency --- server/src/http/websocket/mod.rs | 55 ++++++---------- server/src/serialization/file_ext.rs | 16 +++-- server/src/serialization/primitives.rs | 5 +- server/src/telemetry/history.rs | 86 ++++++++++++++------------ 4 files changed, 78 insertions(+), 84 deletions(-) diff --git a/server/src/http/websocket/mod.rs b/server/src/http/websocket/mod.rs index e749650..6e2911c 100644 --- a/server/src/http/websocket/mod.rs +++ b/server/src/http/websocket/mod.rs @@ -1,16 +1,18 @@ -use std::collections::HashMap; -use crate::http::websocket::request::{RegisterTlmListenerRequest, UnregisterTlmListenerRequest, WebsocketRequest}; +use crate::http::websocket::request::{ + RegisterTlmListenerRequest, UnregisterTlmListenerRequest, WebsocketRequest, +}; use crate::http::websocket::response::{TlmValueResponse, WebsocketResponse}; use crate::telemetry::management_service::TelemetryManagementService; use actix_web::{rt, web, HttpRequest, HttpResponse}; use actix_ws::{AggregatedMessage, ProtocolError, Session}; use anyhow::anyhow; use log::{error, trace}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use tokio::select; use tokio::sync::mpsc::Sender; -use tokio::time::{sleep, Instant}; -use tokio::{pin, select}; +use tokio::time::{sleep_until, Instant}; use tokio_util::sync::CancellationToken; use tonic::codegen::tokio_stream::StreamExt; @@ -32,48 +34,29 @@ fn handle_register_tlm_listener( let mut rx = tlm_data.data.subscribe(); let tx = tx.clone(); rt::spawn(async move { - let mut last_sent_at = Instant::now() - minimum_separation; - let mut last_value = None; - let sleep = sleep(Duration::from_millis(0)); - pin!(sleep); loop { - select! { - _ = tx.closed() => { - break; - } - _ = token.cancelled() => { - break; - } + let now = select! { + biased; + _ = tx.closed() => { break; } + _ = token.cancelled() => { break; } Ok(_) = rx.changed() => { let now = Instant::now(); let value = { let ref_val = rx.borrow_and_update(); ref_val.clone() }; - if last_sent_at + minimum_separation > now { - last_value = value; - sleep.as_mut().reset(last_sent_at + minimum_separation); - continue; - } else { - last_value = None; - last_sent_at = now; - } let _ = tx.send(TlmValueResponse { uuid: request.uuid.clone(), value, }.into()).await; + now } - () = &mut sleep => { - if let Some(value) = last_value { - let _ = tx.send(TlmValueResponse { - uuid: request.uuid.clone(), - value: Some(value), - }.into()).await; - } - last_value = None; - let now = Instant::now(); - last_sent_at = now; - } + }; + select! { + biased; + _ = tx.closed() => { break; } + _ = token.cancelled() => { break; } + _ = sleep_until(now + minimum_separation) => {} } } }); @@ -98,10 +81,10 @@ async fn handle_websocket_message( match request { WebsocketRequest::RegisterTlmListener(request) => { handle_register_tlm_listener(data, request, tx, tlm_listeners) - }, + } WebsocketRequest::UnregisterTlmListener(request) => { handle_unregister_tlm_listener(request, tlm_listeners) - }, + } }; } diff --git a/server/src/serialization/file_ext.rs b/server/src/serialization/file_ext.rs index 6282804..d85f410 100644 --- a/server/src/serialization/file_ext.rs +++ b/server/src/serialization/file_ext.rs @@ -1,23 +1,29 @@ -use std::fs::File; use std::io; +use std::io::{Read, Write}; pub trait FileWriteableType { - fn write_to_file(self, file: &mut File) -> io::Result<()>; + fn write_to_file(self, file: &mut impl Write) -> io::Result<()>; } pub trait FileReadableType: Sized { - fn read_from_file(file: &mut File) -> io::Result; + fn read_from_file(file: &mut impl Read) -> io::Result; } -pub trait FileExt { +pub trait WriteExt { fn write_data(&mut self, data: T) -> io::Result<()>; +} + +pub trait ReadExt { fn read_data(&mut self) -> io::Result; } -impl FileExt for File { +impl WriteExt for W { fn write_data(&mut self, data: T) -> io::Result<()> { data.write_to_file(self) } +} + +impl ReadExt for R { fn read_data(&mut self) -> io::Result { T::read_from_file(self) } diff --git a/server/src/serialization/primitives.rs b/server/src/serialization/primitives.rs index e20cb2f..3b3cd60 100644 --- a/server/src/serialization/primitives.rs +++ b/server/src/serialization/primitives.rs @@ -1,16 +1,15 @@ use crate::serialization::file_ext::{FileReadableType, FileWriteableType}; -use std::fs::File; use std::io::{Read, Write}; macro_rules! primitive_write_read { ( $primitive:ty, $length:expr ) => { impl FileWriteableType for $primitive { - fn write_to_file(self, file: &mut File) -> std::io::Result<()> { + fn write_to_file(self, file: &mut impl Write) -> std::io::Result<()> { file.write_all(&self.to_be_bytes()) } } impl FileReadableType for $primitive { - fn read_from_file(file: &mut File) -> std::io::Result { + fn read_from_file(file: &mut impl Read) -> std::io::Result { let mut buffer = [0u8; $length]; file.read_exact(&mut buffer)?; Ok(Self::from_be_bytes(buffer)) diff --git a/server/src/telemetry/history.rs b/server/src/telemetry/history.rs index 4754f01..102ada8 100644 --- a/server/src/telemetry/history.rs +++ b/server/src/telemetry/history.rs @@ -1,5 +1,5 @@ use crate::core::TelemetryDataType; -use crate::serialization::file_ext::FileExt; +use crate::serialization::file_ext::{ReadExt, WriteExt}; use crate::telemetry::data::TelemetryData; use crate::telemetry::data_item::TelemetryDataItem; use crate::telemetry::data_value::TelemetryDataValue; @@ -10,7 +10,7 @@ use log::{error, info}; use std::cmp::min; use std::collections::VecDeque; use std::fs::File; -use std::io::{Seek, SeekFrom, Write}; +use std::io::{BufReader, BufWriter, Seek, SeekFrom, Write}; use std::path::PathBuf; use std::sync::{Arc, RwLock}; use std::{fs, path}; @@ -137,7 +137,8 @@ struct HistorySegmentFile { start: DateTime, end: DateTime, length: u64, - file: File, + file: BufReader, + file_position: i64, } impl HistorySegmentFile { @@ -158,31 +159,16 @@ impl HistorySegmentFile { segment.start.to_rfc3339_opts(SecondsFormat::Secs, true) )); - let file = File::create(file)?; + let mut file = BufWriter::new(File::create(file)?); - let mut result = Self { - start: segment.start, - end: segment.end, - length: 0, - file, - }; - - let utc_offset_start = result.start - DateTime::UNIX_EPOCH; - let utc_offset_end = result.end - DateTime::UNIX_EPOCH; + let utc_offset_start = segment.start - DateTime::UNIX_EPOCH; + let utc_offset_end = segment.end - DateTime::UNIX_EPOCH; // Write the segment bounds - result - .file - .write_data::(utc_offset_start.num_seconds())?; - result - .file - .write_data::(utc_offset_start.subsec_nanos())?; - result - .file - .write_data::(utc_offset_end.num_seconds())?; - result - .file - .write_data::(utc_offset_end.subsec_nanos())?; + file.write_data::(utc_offset_start.num_seconds())?; + file.write_data::(utc_offset_start.subsec_nanos())?; + file.write_data::(utc_offset_end.num_seconds())?; + file.write_data::(utc_offset_end.subsec_nanos())?; let data = segment.data.get_mut().unwrap_or_else(|err| { error!( @@ -197,26 +183,35 @@ impl HistorySegmentFile { "Invalid Segment Cannot Be Saved to Disk" ); - result.length = data.timestamps.len() as u64; - result.file.write_data::(result.length)?; + let length = data.timestamps.len() as u64; + file.write_data::(length)?; // Write all the timestamps for timestamp in &data.timestamps { let utc_offset = *timestamp - DateTime::UNIX_EPOCH; - result.file.write_data::(utc_offset.num_seconds())?; - result.file.write_data::(utc_offset.subsec_nanos())?; + file.write_data::(utc_offset.num_seconds())?; + file.write_data::(utc_offset.subsec_nanos())?; } // Write all the values for value in &data.values { match value { - TelemetryDataValue::Float32(value) => result.file.write_data::(*value)?, - TelemetryDataValue::Float64(value) => result.file.write_data::(*value)?, + TelemetryDataValue::Float32(value) => file.write_data::(*value)?, + TelemetryDataValue::Float64(value) => file.write_data::(*value)?, } } - result.file.flush()?; - Ok(result) + file.flush()?; + + let mut file = BufReader::new(file.into_inner()?); + file.seek(SeekFrom::Start(0))?; + Ok(Self { + start: segment.start, + end: segment.end, + length: 0, + file, + file_position: 0, + }) } fn load_to_ram( @@ -229,6 +224,7 @@ impl HistorySegmentFile { }; self.file.seek(SeekFrom::Start(Self::HEADER_LENGTH))?; + self.file_position = Self::HEADER_LENGTH as i64; for _ in 0..self.length { segment_data.timestamps.push(self.read_date_time()?); } @@ -255,7 +251,7 @@ impl HistorySegmentFile { start.to_rfc3339_opts(SecondsFormat::Secs, true) )); - let mut file = File::open(file)?; + let mut file = BufReader::new(File::open(file)?); // Write the segment bounds let start_seconds = file.read_data::()?; @@ -269,11 +265,13 @@ impl HistorySegmentFile { let length = file.read_data::()?; + file.seek(SeekFrom::Start(0))?; Ok(HistorySegmentFile { start: DateTime::UNIX_EPOCH + start, end: DateTime::UNIX_EPOCH + end, length, file, + file_position: 0, }) } @@ -284,6 +282,8 @@ impl HistorySegmentFile { maximum_resolution: TimeDelta, telemetry_data_type: TelemetryDataType, ) -> anyhow::Result<(DateTime, Vec)> { + self.file_position = 0; + self.file.seek(SeekFrom::Start(0))?; let mut result = vec![]; let mut next_from = from; @@ -319,15 +319,17 @@ impl HistorySegmentFile { fn read_date_time(&mut self) -> anyhow::Result> { let seconds = self.file.read_data::()?; let nanos = self.file.read_data::()?; + self.file_position += 8 + 4; let start = TimeDelta::new(seconds, nanos as u32).context("Failed to reconstruct TimeDelta")?; Ok(DateTime::UNIX_EPOCH + start) } fn get_date_time(&mut self, index: u64) -> anyhow::Result> { - self.file.seek(SeekFrom::Start( - Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH, - ))?; + let desired_position = Self::HEADER_LENGTH + index * Self::TIMESTAMP_LENGTH; + let seek_amount = desired_position as i64 - self.file_position; + self.file_position += seek_amount; + self.file.seek_relative(seek_amount)?; self.read_date_time() } @@ -337,9 +339,11 @@ impl HistorySegmentFile { ) -> anyhow::Result { match telemetry_data_type { TelemetryDataType::Float32 => { + self.file_position += 4; Ok(TelemetryDataValue::Float32(self.file.read_data::()?)) } TelemetryDataType::Float64 => { + self.file_position += 8; Ok(TelemetryDataValue::Float64(self.file.read_data::()?)) } } @@ -354,9 +358,11 @@ impl HistorySegmentFile { TelemetryDataType::Float32 => 4, TelemetryDataType::Float64 => 8, }; - self.file.seek(SeekFrom::Start( - Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length, - ))?; + let desired_position = + Self::HEADER_LENGTH + self.length * Self::TIMESTAMP_LENGTH + index * item_length; + let seek_amount = desired_position as i64 - self.file_position; + self.file_position += seek_amount; + self.file.seek_relative(seek_amount)?; self.read_telemetry_item(telemetry_data_type) }