allows scrolling backwards through history
This commit is contained in:
@@ -6,7 +6,7 @@ use crate::http::api::setup_api;
|
||||
use crate::http::websocket::setup_websocket;
|
||||
use crate::telemetry::management_service::TelemetryManagementService;
|
||||
use actix_web::{web, App, HttpServer};
|
||||
use log::{error, info};
|
||||
use log::info;
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
@@ -29,7 +29,5 @@ pub async fn setup(
|
||||
.run()
|
||||
.await?;
|
||||
|
||||
error!("http setup end");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use crate::http::websocket::request::{RegisterTlmListenerRequest, WebsocketRequest};
|
||||
use std::collections::HashMap;
|
||||
use crate::http::websocket::request::{RegisterTlmListenerRequest, UnregisterTlmListenerRequest, WebsocketRequest};
|
||||
use crate::http::websocket::response::{TlmValueResponse, WebsocketResponse};
|
||||
use crate::telemetry::management_service::TelemetryManagementService;
|
||||
use actix_web::{rt, web, HttpRequest, HttpResponse};
|
||||
@@ -20,8 +21,13 @@ fn handle_register_tlm_listener(
|
||||
data: &Arc<TelemetryManagementService>,
|
||||
request: RegisterTlmListenerRequest,
|
||||
tx: &Sender<WebsocketResponse>,
|
||||
tlm_listeners: &mut HashMap<String, CancellationToken>,
|
||||
) {
|
||||
if let Some(tlm_data) = data.get_by_uuid(&request.uuid) {
|
||||
let token = CancellationToken::new();
|
||||
if let Some(token) = tlm_listeners.insert(tlm_data.definition.uuid.clone(), token.clone()) {
|
||||
token.cancel();
|
||||
}
|
||||
let minimum_separation = Duration::from_millis(request.minimum_separation_ms as u64);
|
||||
let mut rx = tlm_data.data.subscribe();
|
||||
let tx = tx.clone();
|
||||
@@ -35,6 +41,9 @@ fn handle_register_tlm_listener(
|
||||
_ = tx.closed() => {
|
||||
break;
|
||||
}
|
||||
_ = token.cancelled() => {
|
||||
break;
|
||||
}
|
||||
Ok(_) = rx.changed() => {
|
||||
let now = Instant::now();
|
||||
let value = {
|
||||
@@ -71,15 +80,28 @@ fn handle_register_tlm_listener(
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_unregister_tlm_listener(
|
||||
request: UnregisterTlmListenerRequest,
|
||||
tlm_listeners: &mut HashMap<String, CancellationToken>,
|
||||
) {
|
||||
if let Some(token) = tlm_listeners.remove(&request.uuid) {
|
||||
token.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_websocket_message(
|
||||
data: &Arc<TelemetryManagementService>,
|
||||
request: WebsocketRequest,
|
||||
tx: &Sender<WebsocketResponse>,
|
||||
tlm_listeners: &mut HashMap<String, CancellationToken>,
|
||||
) {
|
||||
match request {
|
||||
WebsocketRequest::RegisterTlmListener(request) => {
|
||||
handle_register_tlm_listener(data, request, tx)
|
||||
}
|
||||
handle_register_tlm_listener(data, request, tx, tlm_listeners)
|
||||
},
|
||||
WebsocketRequest::UnregisterTlmListener(request) => {
|
||||
handle_unregister_tlm_listener(request, tlm_listeners)
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
@@ -105,12 +127,13 @@ async fn handle_websocket_incoming(
|
||||
data: &Arc<TelemetryManagementService>,
|
||||
session: &mut Session,
|
||||
tx: &Sender<WebsocketResponse>,
|
||||
tlm_listeners: &mut HashMap<String, CancellationToken>,
|
||||
) -> anyhow::Result<bool> {
|
||||
match msg {
|
||||
Ok(AggregatedMessage::Close(_)) => Ok(false),
|
||||
Ok(AggregatedMessage::Text(msg)) => match serde_json::from_str::<WebsocketRequest>(&msg) {
|
||||
Ok(request) => {
|
||||
handle_websocket_message(data, request, tx).await;
|
||||
handle_websocket_message(data, request, tx, tlm_listeners).await;
|
||||
Ok(true)
|
||||
}
|
||||
Err(err) => Err(anyhow!("JSON Deserialization Error Encountered {err}")),
|
||||
@@ -141,12 +164,13 @@ pub async fn websocket_connect(
|
||||
let cancel_token = cancel_token.get_ref().clone();
|
||||
|
||||
rt::spawn(async move {
|
||||
let mut tlm_listeners = HashMap::new();
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<WebsocketResponse>(128);
|
||||
loop {
|
||||
let result = select! {
|
||||
_ = cancel_token.cancelled() => Ok(false),
|
||||
Some(msg) = rx.recv() => handle_websocket_response(msg, &mut session).await,
|
||||
Some(msg) = stream.next() => handle_websocket_incoming(msg, data.get_ref(), &mut session, &tx).await,
|
||||
Some(msg) = stream.next() => handle_websocket_incoming(msg, data.get_ref(), &mut session, &tx, &mut tlm_listeners).await,
|
||||
else => Ok(false),
|
||||
};
|
||||
match result {
|
||||
@@ -160,6 +184,9 @@ pub async fn websocket_connect(
|
||||
}
|
||||
rx.close();
|
||||
let _ = session.close(None).await;
|
||||
for (_, token) in tlm_listeners.drain() {
|
||||
token.cancel();
|
||||
}
|
||||
});
|
||||
|
||||
Ok(res)
|
||||
|
||||
@@ -7,7 +7,13 @@ pub struct RegisterTlmListenerRequest {
|
||||
pub minimum_separation_ms: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct UnregisterTlmListenerRequest {
|
||||
pub uuid: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, From)]
|
||||
pub enum WebsocketRequest {
|
||||
RegisterTlmListener(RegisterTlmListenerRequest),
|
||||
UnregisterTlmListener(UnregisterTlmListenerRequest),
|
||||
}
|
||||
|
||||
@@ -38,8 +38,6 @@ pub async fn setup() -> anyhow::Result<()> {
|
||||
grpc_server.await?; //grpc server is dropped
|
||||
drop(cancellation_token); // All cancellation tokens are now dropped
|
||||
|
||||
error!("after awaits");
|
||||
|
||||
// Perform cleanup functions - at this point all servers have stopped and we can be sure that cleaning things up is safe
|
||||
for _ in 0..15 {
|
||||
if Arc::strong_count(&tlm) != 1 {
|
||||
|
||||
Reference in New Issue
Block a user