websocket to web
This commit is contained in:
@@ -1,10 +1,15 @@
|
||||
use actix_web::http::header::ContentType;
|
||||
use actix_web::http::StatusCode;
|
||||
use actix_web::{error, get, web, App, HttpResponse, HttpServer, Responder};
|
||||
use actix_web::{rt, error, get, web, App, HttpRequest, HttpResponse, HttpServer, Responder};
|
||||
use derive_more::{Display, Error};
|
||||
use std::sync::Arc;
|
||||
use log::trace;
|
||||
use crate::telemetry::TelemetryManagementService;
|
||||
use actix_ws::AggregatedMessage;
|
||||
use log::{error, trace};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::select;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::codegen::tokio_stream::StreamExt;
|
||||
use crate::telemetry::{TelemetryDataValue, TelemetryManagementService};
|
||||
|
||||
#[derive(Debug, Display, Error)]
|
||||
enum UserError {
|
||||
@@ -25,6 +30,21 @@ impl error::ResponseError for UserError {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
enum WebsocketRequest {
|
||||
RegisterTlmListener {
|
||||
uuid: String
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
enum WebsocketResponse {
|
||||
TlmValue {
|
||||
uuid: String,
|
||||
value: Option<TelemetryDataValue>
|
||||
}
|
||||
}
|
||||
|
||||
#[get("/tlm/{name:[\\w\\d/_-]+}")]
|
||||
async fn get_tlm_definition(data: web::Data<Arc<TelemetryManagementService>>, name: web::Path<String>) -> Result<impl Responder, UserError> {
|
||||
let string = name.to_string();
|
||||
@@ -36,18 +56,121 @@ async fn get_tlm_definition(data: web::Data<Arc<TelemetryManagementService>>, na
|
||||
Ok(web::Json(data.definition.clone()))
|
||||
}
|
||||
|
||||
async fn websocket_connect(req: HttpRequest, stream: web::Payload, data: web::Data<Arc<TelemetryManagementService>>, cancel_token: web::Data<CancellationToken>) -> Result<HttpResponse, actix_web::Error> {
|
||||
trace!("websocket_connect");
|
||||
let (res, mut session, stream) = actix_ws::handle(&req, stream)?;
|
||||
|
||||
let mut stream = stream
|
||||
.aggregate_continuations()
|
||||
// up to 1 MiB
|
||||
.max_continuation_size(2_usize.pow(20));
|
||||
|
||||
let cancel_token = cancel_token.get_ref().clone();
|
||||
|
||||
rt::spawn(async move {
|
||||
let (tx, mut rx) = tokio::sync::mpsc::channel::<WebsocketResponse>(128);
|
||||
loop {
|
||||
select! {
|
||||
_ = cancel_token.cancelled() => {
|
||||
break;
|
||||
},
|
||||
Some(msg) = rx.recv() => {
|
||||
let msg_json = match serde_json::to_string(&msg) {
|
||||
Ok(msg_json) => msg_json,
|
||||
Err(err) => {
|
||||
error!("JSON Serialization Error Encountered {err}");
|
||||
break;
|
||||
},
|
||||
};
|
||||
if let Err(err) = session.text(msg_json).await {
|
||||
error!("Tx Error Encountered {err}");
|
||||
break;
|
||||
}
|
||||
},
|
||||
Some(msg) = stream.next() => {
|
||||
let result = match msg {
|
||||
Ok(AggregatedMessage::Close(_)) => {
|
||||
break;
|
||||
},
|
||||
Ok(AggregatedMessage::Text(msg)) => {
|
||||
match serde_json::from_str::<WebsocketRequest>(&msg) {
|
||||
Ok(request) => {
|
||||
match request {
|
||||
WebsocketRequest::RegisterTlmListener{ uuid } => {
|
||||
if let Some(tlm_data) = data.get_by_uuid(&uuid).await {
|
||||
let mut rx = tlm_data.data.subscribe();
|
||||
let tx = tx.clone();
|
||||
rt::spawn(async move {
|
||||
loop {
|
||||
select! {
|
||||
_ = tx.closed() => {
|
||||
break;
|
||||
}
|
||||
Ok(_) = rx.changed() => {
|
||||
let value = rx.borrow_and_update().clone();
|
||||
let _ = tx.send(WebsocketResponse::TlmValue {
|
||||
uuid: uuid.clone(),
|
||||
value,
|
||||
}).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Err(err) => {
|
||||
error!("JSON Deserialization Error Encountered {err}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
},
|
||||
Ok(AggregatedMessage::Ping(msg)) => {
|
||||
session.pong(&msg).await
|
||||
},
|
||||
Err(err) => {
|
||||
error!("Rx Error Encountered {err}");
|
||||
break;
|
||||
},
|
||||
_ => {
|
||||
error!("Unexpected Message");
|
||||
break;
|
||||
}
|
||||
};
|
||||
if let Err(err) = result {
|
||||
error!("Tx Error Encountered {err}");
|
||||
break;
|
||||
}
|
||||
},
|
||||
else => {
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
rx.close();
|
||||
let _ = session.close(None).await;
|
||||
});
|
||||
|
||||
Ok(res)
|
||||
}
|
||||
|
||||
fn setup_api(cfg: &mut web::ServiceConfig) {
|
||||
cfg
|
||||
.service(get_tlm_definition);
|
||||
}
|
||||
|
||||
pub async fn setup(telemetry_definitions: Arc<TelemetryManagementService>) -> Result<(), Box<dyn Error>> {
|
||||
pub async fn setup(cancellation_token: CancellationToken, telemetry_definitions: Arc<TelemetryManagementService>) -> Result<(), Box<dyn Error>> {
|
||||
let data = web::Data::new(telemetry_definitions);
|
||||
let cancel_token = web::Data::new(cancellation_token);
|
||||
|
||||
trace!("Starting HTTP Server");
|
||||
HttpServer::new(move || {
|
||||
App::new()
|
||||
.app_data(data.clone())
|
||||
.app_data(cancel_token.clone())
|
||||
.route("/ws", web::get().to(websocket_connect))
|
||||
.service(web::scope("/api").configure(setup_api))
|
||||
})
|
||||
.bind("localhost:8080")?
|
||||
|
||||
@@ -26,7 +26,7 @@ pub async fn setup() -> Result<(), Box<dyn Error>> {
|
||||
|
||||
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?;
|
||||
|
||||
let result = http::setup(tlm).await;
|
||||
let result = http::setup(cancellation_token.clone(), tlm).await;
|
||||
cancellation_token.cancel();
|
||||
result?;
|
||||
grpc_server.await?;
|
||||
|
||||
Reference in New Issue
Block a user