This commit is contained in:
2026-05-23 13:37:01 -07:00
commit 528d5c0e3f
5 changed files with 202 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/target
.idea

7
Cargo.lock generated Normal file
View File

@@ -0,0 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "async_cyclic_app_demo"
version = "0.1.0"

7
Cargo.toml Normal file
View File

@@ -0,0 +1,7 @@
[package]
name = "async_cyclic_app_demo"
version = "0.1.0"
edition = "2024"
publish = ["gitea"]
[dependencies]

158
src/main.rs Normal file
View File

@@ -0,0 +1,158 @@
use crate::wait_step::wait_step;
use crate::MessageFromDevice::{CyclicData, NoOpResponse, SetConfigAResponse};
use crate::MessageToDevice::{NoOp, SetConfigA};
use std::collections::VecDeque;
use std::error::Error;
use std::pin::pin;
use std::task::{Context, Poll, Waker};
mod wait_step;
struct Device {
rx_stream: VecDeque<Result<Option<MessageFromDevice>, Box<dyn Error>>>
}
impl Device {
fn tx(&mut self, message_to_device: MessageToDevice) {
// This doesn't actually need to do anything
}
fn rx(&'_ mut self) -> Result<Option<MessageFromDevice>, Box<dyn Error>> {
self.rx_stream
.pop_front()
.unwrap_or_else(|| Err("I/O Error".into()))
}
}
enum MessageToDevice {
NoOp,
SetConfigA,
}
enum MessageFromDevice {
NoOpResponse,
SetConfigAResponse,
CyclicData {
tlm: i32
},
}
async fn application(mut device: Device) -> Result<(), Box<dyn Error>> {
// Startup Logic
println!("Future Startup");
// Wait (i.e. we've completed initialization)
wait_step().await;
// let handle_rx = |message_from_device: Result<Option<MessageFromDevice>, Box<dyn Error>>| -> Result<bool, Box<dyn Error>> {
// match message_from_device {
// Err(err) => Err(err),
// Ok(None) => Ok(false),
// // For this demo we don't actually care what the device sent us, but we do
// Ok(Some(_)) => Ok(true)
// }
// };
let mut tlm_value = 0;
// loop: i.e. always try to reconnect if needed
loop {
// Send a NoOp request
device.tx(NoOp);
println!("Sent Tx!");
// Wait until the next cycle
wait_step().await;
match device.rx()? {
None => continue, // Didn't hear back from the device, so back to the top of the loop (attempt reconnect)
Some(_) => {}, // We don't care what data the device sent us
}
println!("Comms Restored!");
// Yay we can hear from the device!
device.tx(SetConfigA);
println!("Configuring Device!");
let mut configured = false;
for _ in 0..3 { // 3 cycles to Rx the response
match device.rx()? {
Some(SetConfigAResponse) => {
configured = true;
break
},
_ => {},
}
wait_step().await;
}
if !configured {
continue;
}
println!("Configured Device!");
// Configuring is done - we now just need to listen from the device
// and monitor for loss of comms
let mut missed_message_count = 0;
while missed_message_count < 3{
wait_step().await;
match device.rx()? {
None => missed_message_count += 1,
Some(CyclicData {tlm}) => tlm_value = tlm,
_ => {},
}
println!("Latest Tlm: {tlm_value}");
}
println!("Loss of Comms!")
}
}
fn main() {
let mut device = Device { rx_stream: Default::default() };
// The device hasn't turned on yet (but no IO errors)
device.rx_stream.push_back(Ok(None));
device.rx_stream.push_back(Ok(None));
// We received the NoOp command
device.rx_stream.push_back(Ok(Some(NoOpResponse)));
// Set Config A
// No immediate response
device.rx_stream.push_back(Ok(None));
device.rx_stream.push_back(Ok(Some(SetConfigAResponse)));
// Monitoring
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 1 })));
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 2 })));
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 3 })));
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 4 })));
// Loss of comms
device.rx_stream.push_back(Ok(None));
device.rx_stream.push_back(Ok(None));
device.rx_stream.push_back(Ok(None));
// No NoOp Response but comms restored due to cyclic data
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 5 })));
// Attempting to reconfigure
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 6 })));
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 7 })));
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 8 })));
// Failed back to loss of comms
// We received the NoOp command
device.rx_stream.push_back(Ok(Some(NoOpResponse)));
// Configured successfully
device.rx_stream.push_back(Ok(Some(SetConfigAResponse)));
// Normal Telemetry
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 9 })));
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 10 })));
device.rx_stream.push_back(Ok(Some(CyclicData { tlm: 11 })));
println!("Startup");
let mut application_future = pin!(application(device));
// Poll once to get to the end of initialization
let _ = application_future.as_mut().poll(&mut Context::from_waker(Waker::noop()));
println!("Init Complete");
while let Poll::Pending = application_future.as_mut().poll(&mut Context::from_waker(Waker::noop())) {
println!("step()");
}
println!("Complete");
}

28
src/wait_step.rs Normal file
View File

@@ -0,0 +1,28 @@
use std::pin::Pin;
use std::task::{Context, Poll};
struct WaitStep {
should_wait: bool,
}
impl Future for WaitStep {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if self.should_wait {
// Required to call wake by poll spec
// This implementation won't actually care
cx.waker().wake_by_ref();
self.should_wait = false;
Poll::Pending
} else {
Poll::Ready(())
}
}
}
pub fn wait_step() -> impl Future<Output=()> {
WaitStep { should_wait: true }
}