From a3aeff1d6fbd2c2666574ae85b1d4cdbc3c82c8e Mon Sep 17 00:00:00 2001 From: Sergey Savelyev Date: Tue, 30 Dec 2025 18:33:42 -0500 Subject: [PATCH] improve reconnect logic --- Cargo.lock | 152 +++++++++++++++++++++++++-- Cargo.toml | 1 + api/Cargo.toml | 2 +- api/src/client/mod.rs | 53 +++++++++- api/src/messages/mod.rs | 4 + examples/simple_command/Cargo.toml | 2 + examples/simple_command/src/main.rs | 109 +++++++++++++------ examples/simple_producer/Cargo.toml | 2 + examples/simple_producer/src/main.rs | 67 ++++++++++-- 9 files changed, 342 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 6f47e24..99b6f8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -245,6 +245,56 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "anyhow" version = "1.0.100" @@ -392,6 +442,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -597,6 +653,29 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_filter" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bf3c259d255ca70051b30e2e95b5446cdb8949ac4cd22c0d7fd634d89f568e2" +dependencies = [ + "log", + "regex", +] + +[[package]] +name = "env_logger" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13c863f0904021b108aa8b2f55046443e6b1ebde8fd4a15c399893aae4fa069f" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "jiff", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1052,12 +1131,42 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itoa" version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jiff" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a87d9b8105c23642f50cbbae03d1f75d8422c5cb98ce7ee9271f7ff7505be6b8" +dependencies = [ + "jiff-static", + "log", + "portable-atomic", + "portable-atomic-util", + "serde_core", +] + +[[package]] +name = "jiff-static" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b787bebb543f8969132630c51fd0afab173a86c6abae56ff3b9e5e3e3f9f6e58" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "jobserver" version = "0.1.32" @@ -1267,6 +1376,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "openssl-probe" version = "0.2.0" @@ -1372,6 +1487,21 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "portable-atomic" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f89776e4d69bb58bc6993e99ffa1d11f228b839984854c7daeb5d37f87cbe950" + +[[package]] +name = "portable-atomic-util" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8a2f0d8d040d7848a709caf78912debcc3f33ee4b3cac47d73d1e1069e83507" +dependencies = [ + "portable-atomic", +] + [[package]] name = "potential_utf" version = "0.1.4" @@ -1398,18 +1528,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.92" +version = "1.0.104" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37d3544b3f2748c54e147655edb5025752e2303145b5aefb3c3ea2c78b973bb0" +checksum = "9695f8df41bb4f3d222c95a67532365f569318332d03d5f3f67f37b20e6ebdf0" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.37" +version = "1.0.42" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" dependencies = [ "proc-macro2", ] @@ -1805,6 +1935,8 @@ version = "0.0.0" dependencies = [ "anyhow", "api", + "env_logger", + "log", "tokio", "tokio-util", ] @@ -1816,7 +1948,9 @@ dependencies = [ "anyhow", "api", "chrono", + "env_logger", "futures-util", + "log", "num-traits", "tokio", "tokio-util", @@ -2095,9 +2229,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.93" +version = "2.0.112" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c786062daee0d6db1132800e623df74274a0a87322d8e183338e01b3d98d058" +checksum = "21f182278bf2d2bcb3c88b1b08a37df029d71ce3d3ae26168e3c653b213b99d4" dependencies = [ "proc-macro2", "quote", @@ -2395,6 +2529,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.19.0" diff --git a/Cargo.toml b/Cargo.toml index 6581938..4133001 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ actix-ws = "0.3.0" anyhow = "1.0.100" chrono = { version = "0.4.42" } derive_more = { version = "2.1.1" } +env_logger = "0.11.8" fern = "0.7.1" futures-util = "0.3.31" log = "0.4.29" diff --git a/api/Cargo.toml b/api/Cargo.toml index 3c84ef6..666a009 100644 --- a/api/Cargo.toml +++ b/api/Cargo.toml @@ -12,7 +12,7 @@ serde = { workspace = true, features = ["derive"] } derive_more = { workspace = true, features = ["from", "try_into"] } uuid = { workspace = true, features = ["serde"] } chrono = { workspace = true, features = ["serde"] } -tokio = { workspace = true, features = ["rt", "macros"] } +tokio = { workspace = true, features = ["rt", "macros", "time"] } tokio-tungstenite = { workspace = true, features = ["rustls-tls-native-roots"] } tokio-util = { workspace = true } futures-util = { workspace = true } diff --git a/api/src/client/mod.rs b/api/src/client/mod.rs index c11a905..e02c41c 100644 --- a/api/src/client/mod.rs +++ b/api/src/client/mod.rs @@ -10,13 +10,15 @@ use crate::messages::{ use error::ConnectError; use futures_util::stream::StreamExt; use futures_util::SinkExt; -use log::{debug, error, warn}; +use log::{debug, error, info, trace, warn}; use std::collections::HashMap; use std::sync::mpsc::sync_channel; use std::sync::Arc; use std::thread; +use std::time::Duration; use tokio::net::TcpStream; -use tokio::sync::{mpsc, oneshot, RwLock, RwLockWriteGuard}; +use tokio::sync::{mpsc, oneshot, watch, RwLock, RwLockWriteGuard}; +use tokio::time::sleep; use tokio::{select, spawn}; use tokio_tungstenite::tungstenite::client::IntoClientRequest; use tokio_tungstenite::tungstenite::handshake::client::Request; @@ -42,11 +44,13 @@ struct OutgoingMessage { pub struct Client { cancel: CancellationToken, channel: ClientChannel, + connected_state_rx: watch::Receiver, } struct ClientContext { cancel: CancellationToken, request: Request, + connected_state_tx: watch::Sender, } impl Client { @@ -57,14 +61,20 @@ impl Client { let (tx, _rx) = mpsc::channel(1); let cancel = CancellationToken::new(); let channel = Arc::new(RwLock::new(tx)); + let (connected_state_tx, connected_state_rx) = watch::channel(false); let context = ClientContext { cancel: cancel.clone(), request: request.into_client_request()?, + connected_state_tx, }; context.start(channel.clone())?; - Ok(Self { cancel, channel }) + Ok(Self { + cancel, + channel, + connected_state_rx, + }) } pub async fn send_message(&self, msg: M) -> Result<(), MessageError> { @@ -226,6 +236,28 @@ impl Client { Ok(()) } + + pub async fn wait_connected(&self) { + let mut connected_rx = self.connected_state_rx.clone(); + + // If we aren't currently connected + if !*connected_rx.borrow_and_update() { + // Wait for a change notification + // If the channel is closed there is nothing we can do + let _ = connected_rx.changed().await; + } + } + + pub async fn wait_disconnected(&self) { + let mut connected_rx = self.connected_state_rx.clone(); + + // If we are currently connected + if *connected_rx.borrow_and_update() { + // Wait for a change notification + // If the channel is closed there is nothing we can do + let _ = connected_rx.changed().await; + } + } } impl ClientContext { @@ -263,24 +295,33 @@ impl ClientContext { mut write_lock: RwLockWriteGuard<'a, mpsc::Sender>, channel: &'a ClientChannel, ) -> RwLockWriteGuard<'a, mpsc::Sender> { + debug!("Attempting to Connect to {}", self.request.uri()); let mut ws = match connect_async(self.request.clone()).await { Ok((ws, _)) => ws, Err(e) => { - error!("Connect Error: {e}"); + info!("Failed to Connect: {e}"); + sleep(Duration::from_secs(1)).await; return write_lock; } }; + info!("Connected to {}", self.request.uri()); let (tx, rx) = mpsc::channel(128); *write_lock = tx; drop(write_lock); + // Don't care about the previous value + let _ = self.connected_state_tx.send_replace(true); + let close_connection = self.handle_connection(&mut ws, rx, channel).await; let write_lock = channel.write().await; + // Send this after grabbing the lock - to prevent extra contention when others try to grab + // the lock to use that as a signal that we have reconnected + let _ = self.connected_state_tx.send_replace(false); if close_connection { if let Err(e) = ws.close(None).await { - println!("Close Error {e}"); + error!("Failed to Close the Connection: {e}"); } } write_lock @@ -301,6 +342,7 @@ impl ClientContext { Ok(msg) => { match msg { Message::Text(msg) => { + trace!("Incoming: {msg}"); let msg: ResponseMessage = match serde_json::from_str(&msg) { Ok(m) => m, Err(e) => { @@ -345,6 +387,7 @@ impl ClientContext { break; } }; + trace!("Outgoing: {msg}"); if let Err(e) = ws.send(Message::Text(msg.into())).await { error!("Send Error {e}"); break; diff --git a/api/src/messages/mod.rs b/api/src/messages/mod.rs index f11f638..3112e83 100644 --- a/api/src/messages/mod.rs +++ b/api/src/messages/mod.rs @@ -11,6 +11,8 @@ use uuid::Uuid; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RequestMessage { pub uuid: Uuid, + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] pub response: Option, #[serde(flatten)] pub payload: RequestMessagePayload, @@ -19,6 +21,8 @@ pub struct RequestMessage { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ResponseMessage { pub uuid: Uuid, + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] pub response: Option, #[serde(flatten)] pub payload: ResponseMessagePayload, diff --git a/examples/simple_command/Cargo.toml b/examples/simple_command/Cargo.toml index dab9b42..fdefbd1 100644 --- a/examples/simple_command/Cargo.toml +++ b/examples/simple_command/Cargo.toml @@ -6,5 +6,7 @@ edition = "2021" [dependencies] anyhow = { workspace = true } api = { path = "../../api" } +env_logger = { workspace = true } +log = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "signal"] } tokio-util = { workspace = true } diff --git a/examples/simple_command/src/main.rs b/examples/simple_command/src/main.rs index 2f2adf4..055c611 100644 --- a/examples/simple_command/src/main.rs +++ b/examples/simple_command/src/main.rs @@ -4,6 +4,7 @@ use api::data_type::DataType; use api::messages::command::{ Command, CommandDefinition, CommandParameterDefinition, CommandResponse, }; +use log::info; use std::error::Error; use std::sync::Arc; use tokio_util::sync::CancellationToken; @@ -26,15 +27,70 @@ fn handle_command(command: Command) -> anyhow::Result { .ok_or(anyhow!("Parameter 'c' Missing"))?) .try_into()?; - println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); + info!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}"); Ok(format!( "Successfully Received Command! timestamp: {timestamp} a: {a} b: {b} c: {c}" )) } +struct CommandHandle { + cancellation_token: CancellationToken, +} + +impl CommandHandle { + pub async fn register( + client: Arc, + command_definition: CommandDefinition, + mut callback: impl FnMut(Command) -> anyhow::Result + Send + 'static, + ) -> anyhow::Result { + let cancellation_token = CancellationToken::new(); + let result = Self { + cancellation_token: cancellation_token.clone(), + }; + + tokio::spawn(async move { + while !cancellation_token.is_cancelled() { + // This would only fail if the sender closed while trying to insert data + // It would wait until space is made + let Ok(mut rx) = client + .register_callback_channel(command_definition.clone()) + .await + else { + continue; + }; + + while let Some((cmd, responder)) = rx.recv().await { + let response = match callback(cmd) { + Ok(response) => CommandResponse { + success: true, + response, + }, + Err(err) => CommandResponse { + success: false, + response: err.to_string(), + }, + }; + // This should only err if we had an error elsewhere + let _ = responder.send(response); + } + } + }); + + Ok(result) + } +} + +impl Drop for CommandHandle { + fn drop(&mut self) { + self.cancellation_token.cancel(); + } +} + #[tokio::main] async fn main() -> Result<(), Box> { + env_logger::init(); + let cancellation_token = CancellationToken::new(); { let cancellation_token = cancellation_token.clone(); @@ -46,39 +102,32 @@ async fn main() -> Result<(), Box> { let client = Arc::new(Client::connect("ws://[::1]:8080/backend")?); - client - .register_callback_fn( - CommandDefinition { - name: "simple_command/a".to_string(), - parameters: vec![ - CommandParameterDefinition { - name: "a".to_string(), - data_type: DataType::Float32, - }, - CommandParameterDefinition { - name: "b".to_string(), - data_type: DataType::Float64, - }, - CommandParameterDefinition { - name: "c".to_string(), - data_type: DataType::Boolean, - }, - ], - }, - |command| match handle_command(command) { - Ok(response) => CommandResponse { - success: true, - response, + let handle = CommandHandle::register( + client, + CommandDefinition { + name: "simple_command/a".to_string(), + parameters: vec![ + CommandParameterDefinition { + name: "a".to_string(), + data_type: DataType::Float32, }, - Err(error) => CommandResponse { - success: false, - response: error.to_string(), + CommandParameterDefinition { + name: "b".to_string(), + data_type: DataType::Float64, }, - }, - ) - .await?; + CommandParameterDefinition { + name: "c".to_string(), + data_type: DataType::Boolean, + }, + ], + }, + handle_command, + ) + .await?; cancellation_token.cancelled().await; + drop(handle); + Ok(()) } diff --git a/examples/simple_producer/Cargo.toml b/examples/simple_producer/Cargo.toml index 5e3298a..9746d13 100644 --- a/examples/simple_producer/Cargo.toml +++ b/examples/simple_producer/Cargo.toml @@ -7,8 +7,10 @@ edition = "2021" anyhow = { workspace = true } api = { path = "../../api" } chrono = { workspace = true } +env_logger = { workspace = true } futures-util = { workspace = true } num-traits = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread", "signal", "time", "macros"] } tokio-util = { workspace = true } uuid = { workspace = true } +log = "0.4.29" diff --git a/examples/simple_producer/src/main.rs b/examples/simple_producer/src/main.rs index d176c8f..9426c1c 100644 --- a/examples/simple_producer/src/main.rs +++ b/examples/simple_producer/src/main.rs @@ -9,6 +9,7 @@ use futures_util::future::join_all; use num_traits::FloatConst; use std::sync::Arc; use std::time::Duration; +use tokio::sync::{oneshot, RwLock}; use tokio::time::{sleep_until, Instant}; use tokio_util::sync::CancellationToken; use uuid::Uuid; @@ -18,7 +19,8 @@ struct Telemetry { } struct TelemetryItemHandle { - uuid: Uuid, + cancellation_token: CancellationToken, + uuid: Arc>, client: Arc, } @@ -32,13 +34,48 @@ impl Telemetry { name: String, data_type: DataType, ) -> anyhow::Result { - let response = self - .client - .send_request(TelemetryDefinitionRequest { name, data_type }) - .await?; + let cancellation_token = CancellationToken::new(); + let cancel_token = cancellation_token.clone(); + let client = self.client.clone(); + + let response_uuid = Arc::new(RwLock::new(Uuid::nil())); + + let response_uuid_inner = response_uuid.clone(); + let (tx, rx) = oneshot::channel(); + + tokio::spawn(async move { + let mut write_lock = Some(response_uuid_inner.write().await); + let _ = tx.send(()); + while !cancel_token.is_cancelled() { + if let Ok(response) = client + .send_request(TelemetryDefinitionRequest { + name: name.clone(), + data_type, + }) + .await + { + let mut lock = match write_lock { + None => response_uuid_inner.write().await, + Some(lock) => lock, + }; + // Update the value in the lock + *lock = response.uuid; + // Set this value so the loop works + write_lock = None; + } + + client.wait_disconnected().await; + } + }); + + // Wait until the write lock is acquired + let _ = rx.await; + // Wait until the write lock is released for the first time + drop(response_uuid.read().await); Ok(TelemetryItemHandle { - uuid: response.uuid, + cancellation_token, + uuid: response_uuid, client: self.client.clone(), }) } @@ -46,9 +83,15 @@ impl Telemetry { impl TelemetryItemHandle { pub async fn publish(&self, value: DataValue, timestamp: DateTime) -> anyhow::Result<()> { + let Ok(lock) = self.uuid.try_read() else { + return Ok(()); + }; + let uuid = *lock; + drop(lock); + self.client .send_message_if_connected(TelemetryEntry { - uuid: self.uuid, + uuid, value, timestamp, }) @@ -62,8 +105,16 @@ impl TelemetryItemHandle { } } +impl Drop for TelemetryItemHandle { + fn drop(&mut self) { + self.cancellation_token.cancel(); + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { + env_logger::init(); + let client = Arc::new(Client::connect("ws://[::1]:8080/backend")?); let tlm = Telemetry::new(client); @@ -141,7 +192,7 @@ async fn main() -> anyhow::Result<()> { let mut index = 0; let mut tasks = vec![]; while !cancellation_token.is_cancelled() { - next_time += Duration::from_millis(10); + next_time += Duration::from_millis(1000); index += 1; sleep_until(next_time).await; let publish_time = start_time + TimeDelta::from_std(next_time - start_instant).unwrap();