commit 528d5c0e3fc86dbcc2cfb66c0036b931fd77b957 Author: Sergey Savelyev Date: Sat May 23 13:37:01 2026 -0700 demo diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..3a8cabc --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +.idea diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..5888969 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "async_cyclic_app_demo" +version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..094f25a --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "async_cyclic_app_demo" +version = "0.1.0" +edition = "2024" +publish = ["gitea"] + +[dependencies] diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..eb38f57 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,158 @@ +use crate::wait_step::wait_step; +use crate::MessageFromDevice::{CyclicData, NoOpResponse, SetConfigAResponse}; +use crate::MessageToDevice::{NoOp, SetConfigA}; +use std::collections::VecDeque; +use std::error::Error; +use std::pin::pin; +use std::task::{Context, Poll, Waker}; + +mod wait_step; + +struct Device { + rx_stream: VecDeque, Box>> +} + +impl Device { + fn tx(&mut self, message_to_device: MessageToDevice) { + // This doesn't actually need to do anything + } + + fn rx(&'_ mut self) -> Result, Box> { + self.rx_stream + .pop_front() + .unwrap_or_else(|| Err("I/O Error".into())) + } +} + +enum MessageToDevice { + NoOp, + SetConfigA, +} + +enum MessageFromDevice { + NoOpResponse, + SetConfigAResponse, + CyclicData { + tlm: i32 + }, +} + +async fn application(mut device: Device) -> Result<(), Box> { + // Startup Logic + println!("Future Startup"); + + // Wait (i.e. we've completed initialization) + wait_step().await; + + // let handle_rx = |message_from_device: Result, Box>| -> Result> { + // match message_from_device { + // Err(err) => Err(err), + // Ok(None) => Ok(false), + // // For this demo we don't actually care what the device sent us, but we do + // Ok(Some(_)) => Ok(true) + // } + // }; + + let mut tlm_value = 0; + + // loop: i.e. always try to reconnect if needed + loop { + // Send a NoOp request + device.tx(NoOp); + + println!("Sent Tx!"); + // Wait until the next cycle + wait_step().await; + + match device.rx()? { + None => continue, // Didn't hear back from the device, so back to the top of the loop (attempt reconnect) + Some(_) => {}, // We don't care what data the device sent us + } + + println!("Comms Restored!"); + + // Yay we can hear from the device! + device.tx(SetConfigA); + println!("Configuring Device!"); + let mut configured = false; + for _ in 0..3 { // 3 cycles to Rx the response + match device.rx()? { + Some(SetConfigAResponse) => { + configured = true; + break + }, + _ => {}, + } + wait_step().await; + } + if !configured { + continue; + } + + println!("Configured Device!"); + + // Configuring is done - we now just need to listen from the device + // and monitor for loss of comms + let mut missed_message_count = 0; + while missed_message_count < 3{ + wait_step().await; + match device.rx()? { + None => missed_message_count += 1, + Some(CyclicData {tlm}) => tlm_value = tlm, + _ => {}, + } + println!("Latest Tlm: {tlm_value}"); + } + + println!("Loss of Comms!") + } +} + +fn main() { + let mut device = Device { rx_stream: Default::default() }; + // The device hasn't turned on yet (but no IO errors) + device.rx_stream.push_back(Ok(None)); + device.rx_stream.push_back(Ok(None)); + // We received the NoOp command + device.rx_stream.push_back(Ok(Some(NoOpResponse))); + // Set Config A + // No immediate response + device.rx_stream.push_back(Ok(None)); + device.rx_stream.push_back(Ok(Some(SetConfigAResponse))); + // Monitoring + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 1 }))); + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 2 }))); + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 3 }))); + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 4 }))); + // Loss of comms + device.rx_stream.push_back(Ok(None)); + device.rx_stream.push_back(Ok(None)); + device.rx_stream.push_back(Ok(None)); + // No NoOp Response but comms restored due to cyclic data + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 5 }))); + // Attempting to reconfigure + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 6 }))); + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 7 }))); + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 8 }))); + // Failed back to loss of comms + // We received the NoOp command + device.rx_stream.push_back(Ok(Some(NoOpResponse))); + // Configured successfully + device.rx_stream.push_back(Ok(Some(SetConfigAResponse))); + // Normal Telemetry + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 9 }))); + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 10 }))); + device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 11 }))); + + println!("Startup"); + let mut application_future = pin!(application(device)); + // Poll once to get to the end of initialization + let _ = application_future.as_mut().poll(&mut Context::from_waker(Waker::noop())); + println!("Init Complete"); + + while let Poll::Pending = application_future.as_mut().poll(&mut Context::from_waker(Waker::noop())) { + println!("step()"); + } + + println!("Complete"); +} diff --git a/src/wait_step.rs b/src/wait_step.rs new file mode 100644 index 0000000..6d507d6 --- /dev/null +++ b/src/wait_step.rs @@ -0,0 +1,28 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +struct WaitStep { + should_wait: bool, +} + +impl Future for WaitStep { + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + if self.should_wait { + // Required to call wake by poll spec + // This implementation won't actually care + cx.waker().wake_by_ref(); + + self.should_wait = false; + + Poll::Pending + } else { + Poll::Ready(()) + } + } +} + +pub fn wait_step() -> impl Future { + WaitStep { should_wait: true } +}