diff --git a/Cargo.lock b/Cargo.lock index f6bcf4e..c3b2d1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "actix-codec" @@ -1146,6 +1146,15 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "papaya" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ce63bf9dca3eab259cffd421f05661b3386aee36276f5aed9f71450b98f5c5c" +dependencies = [ + "seize", +] + [[package]] name = "parking_lot" version = "0.12.3" @@ -1445,6 +1454,16 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" +[[package]] +name = "seize" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d84b0c858bdd30cb56f5597f8b3bf702ec23829e652cc636a1e5a7b9de46ae93" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "semver" version = "1.0.23" @@ -1507,6 +1526,7 @@ dependencies = [ "futures-util", "hex", "log", + "papaya", "prost", "rand", "serde", diff --git a/examples/simple_producer/src/main.rs b/examples/simple_producer/src/main.rs index 85760a5..26b6dcf 100644 --- a/examples/simple_producer/src/main.rs +++ b/examples/simple_producer/src/main.rs @@ -7,6 +7,7 @@ use server::core::{ }; use std::error::Error; use std::time::Duration; +use tokio::select; use tokio::sync::mpsc; use tokio::sync::mpsc::Sender; use tokio::time::Instant; @@ -38,30 +39,36 @@ impl Telemetry { let client_stored = client.clone(); let cancel = CancellationToken::new(); let cancel_stored = cancel.clone(); - let (local_tx, mut local_rx) = mpsc::channel(128); + let (local_tx, mut local_rx) = mpsc::channel(16); tokio::spawn(async move { while !cancel.is_cancelled() { - let (server_tx, server_rx) = mpsc::channel(1); + let (server_tx, server_rx) = mpsc::channel(16); let response_stream = client .insert_telemetry(ReceiverStream::new(server_rx)) .await; if let Ok(response_stream) = response_stream { let mut response_stream = response_stream.into_inner(); - while let Some(item) = local_rx.recv().await { - match server_tx.send(item).await { - Ok(_) => {} - Err(_) => break, - } - if let Some(response) = response_stream.next().await { - match response { - Ok(_) => {} - Err(_) => { - break; + loop { + select! { + _ = cancel.cancelled() => { + break; + }, + Some(item) = local_rx.recv() => { + match server_tx.send(item).await { + Ok(_) => {} + Err(_) => break, } - } - } else { - break; + }, + Some(response) = response_stream.next() => { + match response { + Ok(_) => {} + Err(_) => { + break; + } + } + }, + else => break, } } } else { @@ -136,6 +143,18 @@ impl TelemetryItemHandle { async fn main() -> Result<(), Box> { let mut tlm = Telemetry::new("http://[::1]:50051").await?; + let index_handle = tlm + .register("simple_producer/time_offset".into(), TelemetryDataType::Float64) + .await?; + + let publish_offset = tlm + .register("simple_producer/publish_offset".into(), TelemetryDataType::Float64) + .await?; + + let await_offset = tlm + .register("simple_producer/await_offset".into(), TelemetryDataType::Float64) + .await?; + let sin_tlm_handle = tlm .register("simple_producer/sin".into(), TelemetryDataType::Float32) .await?; @@ -187,84 +206,98 @@ async fn main() -> Result<(), Box> { }); } - let mut next_time = Instant::now(); + let start_time = chrono::Utc::now(); + let start_instant = Instant::now(); + let mut next_time = start_instant; let mut index = 0; + let mut tasks = vec![]; while !cancellation_token.is_cancelled() { next_time += Duration::from_millis(10); index += 1; tokio::time::sleep_until(next_time).await; - sin_tlm_handle + let publish_time = start_time + chrono::TimeDelta::from_std(next_time - start_instant).unwrap(); + let actual_time = Instant::now(); + tasks.push(index_handle + .publish( + Value::Float64((actual_time - next_time).as_secs_f64()), + chrono::Utc::now(), + )); + tasks.push(sin_tlm_handle .publish( Value::Float32((f32::TAU() * (index as f32) / (1000.0_f32)).sin()), - chrono::Utc::now(), - ) - .await?; - cos_tlm_handle + publish_time, + )); + tasks.push(cos_tlm_handle .publish( Value::Float64((f64::TAU() * (index as f64) / (1000.0_f64)).cos()), - chrono::Utc::now(), - ) - .await?; - sin2_tlm_handle + publish_time, + )); + tasks.push(sin2_tlm_handle .publish( Value::Float32((f32::TAU() * (index as f32) / (500.0_f32)).sin()), - chrono::Utc::now(), - ) - .await?; - cos2_tlm_handle + publish_time, + )); + tasks.push(cos2_tlm_handle .publish( Value::Float64((f64::TAU() * (index as f64) / (500.0_f64)).cos()), - chrono::Utc::now(), - ) - .await?; - sin3_tlm_handle + publish_time, + )); + tasks.push(sin3_tlm_handle .publish( Value::Float32((f32::TAU() * (index as f32) / (333.0_f32)).sin()), - chrono::Utc::now(), - ) - .await?; - cos3_tlm_handle + publish_time, + )); + tasks.push(cos3_tlm_handle .publish( Value::Float64((f64::TAU() * (index as f64) / (333.0_f64)).cos()), - chrono::Utc::now(), - ) - .await?; - sin4_tlm_handle + publish_time, + )); + tasks.push(sin4_tlm_handle .publish( Value::Float32((f32::TAU() * (index as f32) / (250.0_f32)).sin()), - chrono::Utc::now(), - ) - .await?; - cos4_tlm_handle + publish_time, + )); + tasks.push(cos4_tlm_handle .publish( Value::Float64((f64::TAU() * (index as f64) / (250.0_f64)).cos()), - chrono::Utc::now(), - ) - .await?; - sin5_tlm_handle + publish_time, + )); + tasks.push(sin5_tlm_handle .publish( Value::Float32((f32::TAU() * (index as f32) / (200.0_f32)).sin()), - chrono::Utc::now(), - ) - .await?; - cos5_tlm_handle + publish_time, + )); + tasks.push(cos5_tlm_handle .publish( Value::Float64((f64::TAU() * (index as f64) / (200.0_f64)).cos()), - chrono::Utc::now(), - ) - .await?; - sin6_tlm_handle + publish_time, + )); + tasks.push(sin6_tlm_handle .publish( Value::Float32((f32::TAU() * (index as f32) / (166.0_f32)).sin()), - chrono::Utc::now(), - ) - .await?; - cos6_tlm_handle + publish_time, + )); + tasks.push(cos6_tlm_handle .publish( Value::Float64((f64::TAU() * (index as f64) / (166.0_f64)).cos()), + publish_time, + )); + + tasks.push(publish_offset + .publish( + Value::Float64((Instant::now() - actual_time).as_secs_f64()), chrono::Utc::now(), - ) - .await?; + )); + + for task in tasks.drain(..) { + task.await?; + } + + tasks.push(await_offset + .publish( + Value::Float64((Instant::now() - actual_time).as_secs_f64()), + chrono::Utc::now(), + )); } Ok(()) diff --git a/frontend/src/components/GraphAxis.vue b/frontend/src/components/GraphAxis.vue index 66cde27..e9e9472 100644 --- a/frontend/src/components/GraphAxis.vue +++ b/frontend/src/components/GraphAxis.vue @@ -237,7 +237,7 @@ const lines = computed(() => { diff --git a/server/Cargo.toml b/server/Cargo.toml index b6b82c2..469129e 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -21,6 +21,7 @@ serde_json = "1.0.132" derive_more = { version = "1.0.0", features = ["full"] } hex = "0.4.3" futures-util = "0.3.31" +papaya = "0.1.7" [build-dependencies] tonic-build = "0.12.3" diff --git a/server/src/http.rs b/server/src/http.rs index e35be4c..3db6061 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -113,7 +113,10 @@ async fn websocket_connect( break; } Ok(_) = rx.changed() => { - let value = rx.borrow_and_update().clone(); + let value = { + let ref_val = rx.borrow_and_update(); + ref_val.clone() + }; let _ = tx.send(WebsocketResponse::TlmValue { uuid: uuid.clone(), value, diff --git a/server/src/telemetry.rs b/server/src/telemetry.rs index 980b0e5..e23d492 100644 --- a/server/src/telemetry.rs +++ b/server/src/telemetry.rs @@ -6,7 +6,7 @@ use std::collections::HashMap; use std::error::Error; use std::fmt::Formatter; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::RwLock; fn tlm_data_type_serialzier( tlm_data_type: &TelemetryDataType, @@ -70,15 +70,15 @@ pub struct TelemetryData { } pub struct TelemetryManagementService { - uuid_mapping: Arc>>, - tlm_mapping: Arc>>, + uuid_mapping: Arc>>, + tlm_mapping: Arc>>, } impl TelemetryManagementService { pub fn new() -> Self { Self { - uuid_mapping: Arc::new(Mutex::new(HashMap::new())), - tlm_mapping: Arc::new(Mutex::new(HashMap::new())), + uuid_mapping: Arc::new(RwLock::new(HashMap::new())), + tlm_mapping: Arc::new(RwLock::new(HashMap::new())), } } @@ -86,10 +86,10 @@ impl TelemetryManagementService { &self, telemetry_definition_request: TelemetryDefinitionRequest, ) -> Result> { - let mut lock = self.uuid_mapping.lock().await; + let lock = self.uuid_mapping.read().await; if let Some(uuid) = lock.get(&telemetry_definition_request.name) { trace!("Telemetry Definition Found {:?}", uuid); - let tlm_lock = self.tlm_mapping.lock().await; + let tlm_lock = self.tlm_mapping.read().await; if let Some(TelemetryData { definition, .. }) = tlm_lock.get(uuid) { if definition.data_type != telemetry_definition_request.data_type() { return Err("A telemetry item of the same name already exists".into()); @@ -103,7 +103,9 @@ impl TelemetryManagementService { "Adding New Telemetry Definition {:?}", telemetry_definition_request ); - let mut tlm_lock = self.tlm_mapping.lock().await; + drop(lock); + let mut lock = self.uuid_mapping.write().await; + let mut tlm_lock = self.tlm_mapping.write().await; let uuid = Uuid::random().value; lock.insert(telemetry_definition_request.name.clone(), uuid.clone()); tlm_lock.insert( @@ -122,16 +124,13 @@ impl TelemetryManagementService { } pub async fn get_by_name(&self, name: &String) -> Option { - let uuid = { - let uuid_lock = self.uuid_mapping.lock().await; - uuid_lock.get(name).cloned()? - }; - - self.get_by_uuid(&uuid).await + let uuid_lock = self.uuid_mapping.read().await; + let uuid = uuid_lock.get(name)?; + self.get_by_uuid(uuid).await } pub async fn get_by_uuid(&self, uuid: &String) -> Option { - let tlm_lock = self.tlm_mapping.lock().await; + let tlm_lock = self.tlm_mapping.read().await; tlm_lock.get(uuid).cloned() } }