allows restricting the streaming speed for the frontend

This commit is contained in:
2024-12-30 13:23:21 -05:00
parent be85ea3aa6
commit 10e80a0c2d
9 changed files with 234 additions and 175 deletions

View File

@@ -33,7 +33,6 @@ impl TelemetryService for CoreTelemetryService {
trace!("CoreTelemetryService::new_telemetry");
self.tlm_management
.register(request.into_inner())
.await
.map(|uuid| {
Response::new(TelemetryDefinitionResponse {
uuid: Some(Uuid { value: uuid }),
@@ -66,7 +65,7 @@ impl TelemetryService for CoreTelemetryService {
match message {
Ok(tlm_item) => {
tx
.send(Self::handle_new_tlm_item(&tlm_management, &tlm_item).await)
.send(Self::handle_new_tlm_item(&tlm_management, &tlm_item))
.await
.expect("working rx");
}
@@ -85,7 +84,7 @@ impl TelemetryService for CoreTelemetryService {
}
impl CoreTelemetryService {
async fn handle_new_tlm_item(
fn handle_new_tlm_item(
tlm_management: &Arc<TelemetryManagementService>,
tlm_item: &TelemetryItem,
) -> Result<TelemetryInsertResponse, Status> {
@@ -93,7 +92,7 @@ impl CoreTelemetryService {
let Some(ref uuid) = tlm_item.uuid else {
return Err(Status::failed_precondition("UUID Missing"));
};
let Some(tlm_data) = tlm_management.get_by_uuid(&uuid.value).await else {
let Some(tlm_data) = tlm_management.get_by_uuid(&uuid.value) else {
return Err(Status::not_found("Telemetry Item Not Found"));
};

View File

@@ -7,7 +7,10 @@ use derive_more::{Display, Error};
use log::{error, trace};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::select;
use std::time::Duration;
use tokio::{pin, select};
use tokio::sync::mpsc::Sender;
use tokio::time::{sleep, Instant};
use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::StreamExt;
@@ -30,9 +33,15 @@ impl error::ResponseError for UserError {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RegisterTlmListenerRequest {
uuid: String,
minimum_separation_ms: u32,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
enum WebsocketRequest {
RegisterTlmListener { uuid: String },
RegisterTlmListener(RegisterTlmListenerRequest),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -50,13 +59,70 @@ async fn get_tlm_definition(
) -> Result<impl Responder, UserError> {
let string = name.to_string();
trace!("get_tlm_definition {}", string);
let Some(data) = data.get_by_name(&string).await else {
let Some(data) = data.get_by_name(&string) else {
return Err(UserError::TlmNotFound { tlm: string });
};
Ok(web::Json(data.definition.clone()))
}
fn handle_register_tlm_listener(data: &Arc<TelemetryManagementService>, request: RegisterTlmListenerRequest, tx: &Sender<WebsocketResponse>) {
if let Some(tlm_data) = data.get_by_uuid(&request.uuid) {
let minimum_separation = Duration::from_millis(request.minimum_separation_ms as u64);
let mut rx = tlm_data.data.subscribe();
let tx = tx.clone();
rt::spawn(async move {
let mut last_sent_at = Instant::now() - minimum_separation;
let mut last_value = None;
let sleep = sleep(Duration::from_millis(0));
pin!(sleep);
loop {
select! {
_ = tx.closed() => {
break;
}
Ok(_) = rx.changed() => {
let now = Instant::now();
let value = {
let ref_val = rx.borrow_and_update();
ref_val.clone()
};
if last_sent_at + minimum_separation > now {
last_value = value;
sleep.as_mut().reset(last_sent_at + minimum_separation);
continue;
} else {
last_value = None;
last_sent_at = now;
}
let _ = tx.send(WebsocketResponse::TlmValue {
uuid: request.uuid.clone(),
value,
}).await;
}
() = &mut sleep => {
if let Some(value) = last_value {
let _ = tx.send(WebsocketResponse::TlmValue {
uuid: request.uuid.clone(),
value: Some(value),
}).await;
}
last_value = None;
let now = Instant::now();
last_sent_at = now;
}
}
}
});
}
}
async fn handle_websocket_message(data: &Arc<TelemetryManagementService>, request: WebsocketRequest, tx: &Sender<WebsocketResponse>) {
match request {
WebsocketRequest::RegisterTlmListener(request) => handle_register_tlm_listener(data, request, tx),
};
}
async fn websocket_connect(
req: HttpRequest,
stream: web::Payload,
@@ -101,33 +167,7 @@ async fn websocket_connect(
Ok(AggregatedMessage::Text(msg)) => {
match serde_json::from_str::<WebsocketRequest>(&msg) {
Ok(request) => {
match request {
WebsocketRequest::RegisterTlmListener{ uuid } => {
if let Some(tlm_data) = data.get_by_uuid(&uuid).await {
let mut rx = tlm_data.data.subscribe();
let tx = tx.clone();
rt::spawn(async move {
loop {
select! {
_ = tx.closed() => {
break;
}
Ok(_) = rx.changed() => {
let value = {
let ref_val = rx.borrow_and_update();
ref_val.clone()
};
let _ = tx.send(WebsocketResponse::TlmValue {
uuid: uuid.clone(),
value,
}).await;
}
}
}
});
}
}
}
handle_websocket_message(data.get_ref(), request, &tx).await;
},
Err(err) => {
error!("JSON Deserialization Error Encountered {err}");
@@ -184,9 +224,9 @@ pub async fn setup(
.route("/ws", web::get().to(websocket_connect))
.service(web::scope("/api").configure(setup_api))
})
.bind("localhost:8080")?
.run()
.await?;
.bind("localhost:8080")?
.run()
.await?;
Ok(())
}

View File

@@ -1,12 +1,9 @@
use crate::core::{TelemetryDataType, TelemetryDefinitionRequest, Uuid};
use log::trace;
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use std::collections::HashMap;
use std::error::Error;
use std::fmt::Formatter;
use std::sync::Arc;
use tokio::sync::RwLock;
use papaya::HashMap;
fn tlm_data_type_serialzier<S>(
tlm_data_type: &TelemetryDataType,
@@ -70,67 +67,47 @@ pub struct TelemetryData {
}
pub struct TelemetryManagementService {
uuid_mapping: Arc<RwLock<HashMap<String, String>>>,
tlm_mapping: Arc<RwLock<HashMap<String, TelemetryData>>>,
uuid_index: HashMap<String, String>,
tlm_data: HashMap<String, TelemetryData>,
}
impl TelemetryManagementService {
pub fn new() -> Self {
Self {
uuid_mapping: Arc::new(RwLock::new(HashMap::new())),
tlm_mapping: Arc::new(RwLock::new(HashMap::new())),
uuid_index: HashMap::new(),
tlm_data: HashMap::new(),
}
}
pub async fn register(
pub fn register(
&self,
telemetry_definition_request: TelemetryDefinitionRequest,
) -> Result<String, Box<dyn Error>> {
let lock = self.uuid_mapping.read().await;
if let Some(uuid) = lock.get(&telemetry_definition_request.name) {
trace!("Telemetry Definition Found {:?}", uuid);
let tlm_lock = self.tlm_mapping.read().await;
if let Some(TelemetryData { definition, .. }) = tlm_lock.get(uuid) {
if definition.data_type != telemetry_definition_request.data_type() {
return Err("A telemetry item of the same name already exists".into());
}
Ok(uuid.clone())
} else {
Err("Could not find Telemetry Data".into())
}
} else {
trace!(
"Adding New Telemetry Definition {:?}",
telemetry_definition_request
);
drop(lock);
let mut lock = self.uuid_mapping.write().await;
let mut tlm_lock = self.tlm_mapping.write().await;
let uuid = Uuid::random().value;
lock.insert(telemetry_definition_request.name.clone(), uuid.clone());
tlm_lock.insert(
uuid.clone(),
TelemetryData {
definition: TelemetryDefinition {
uuid: uuid.clone(),
name: telemetry_definition_request.name.clone(),
data_type: telemetry_definition_request.data_type(),
},
data: tokio::sync::watch::channel(None).0,
},
);
Ok(uuid)
}
let uuid_index = self.uuid_index.pin();
let tlm_data = self.tlm_data.pin();
let uuid = uuid_index.get_or_insert_with(telemetry_definition_request.name.clone(), || Uuid::random().value).clone();
let _ = tlm_data.try_insert(uuid.clone(), TelemetryData {
definition: TelemetryDefinition {
uuid: uuid.clone(),
name: telemetry_definition_request.name.clone(),
data_type: telemetry_definition_request.data_type(),
},
data: tokio::sync::watch::channel(None).0,
});
Ok(uuid)
}
pub async fn get_by_name(&self, name: &String) -> Option<TelemetryData> {
let uuid_lock = self.uuid_mapping.read().await;
let uuid = uuid_lock.get(name)?;
self.get_by_uuid(uuid).await
pub fn get_by_name(&self, name: &String) -> Option<TelemetryData> {
let uuid_index = self.uuid_index.pin();
let uuid = uuid_index.get(name)?;
self.get_by_uuid(uuid)
}
pub async fn get_by_uuid(&self, uuid: &String) -> Option<TelemetryData> {
let tlm_lock = self.tlm_mapping.read().await;
tlm_lock.get(uuid).cloned()
pub fn get_by_uuid(&self, uuid: &String) -> Option<TelemetryData> {
let tlm_data = self.tlm_data.pin();
tlm_data.get(uuid).cloned()
}
}