initial comms

This commit is contained in:
2025-10-25 12:23:23 -07:00
parent fd63bdc0c9
commit b067ae5cec
17 changed files with 536 additions and 356 deletions

114
flight/src/scheduler/mod.rs Normal file
View File

@@ -0,0 +1,114 @@
use crate::on_drop::on_drop;
use anyhow::Result;
use log::trace;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::thread;
use std::thread::{sleep, Scope};
use std::time::{Duration, Instant};
#[derive(Clone, Debug)]
pub struct TaskHandle<Message> {
#[allow(dead_code)]
pub name: String,
pub sender: Sender<Message>,
}
#[allow(dead_code)]
pub trait Task {
type Message;
fn run(self, receiver: Receiver<Self::Message>, running: Arc<AtomicBool>);
}
pub trait CyclicTask {
type Message;
fn step(&mut self, receiver: &Receiver<Self::Message>, step_time: Instant);
}
pub struct Scheduler<'s, 'e>
{
scope: &'s Scope<'s, 'e>,
running: Arc<AtomicBool>,
}
impl<'s, 'e> Scheduler<'s, 'e> {
pub fn new<'env, F, R>(running: Arc<AtomicBool>, f: F) -> R
where
F: FnOnce(Scheduler<'_, 'env>) -> R,
{
trace!("Scheduler::new(running: {running:?}, f)");
thread::scope(|scope: &Scope| {
// This will automatically set running to false when it drops
// This means that if the function returns any side branches
// checking running will shut down
let _shutdown_threads = on_drop(|| running.store(false, Ordering::Relaxed));
f(Scheduler {
scope,
running: running.clone(),
})
})
}
#[allow(dead_code)]
pub fn run<T>(
&self,
name: impl Into<String>,
task: T,
) -> Result<TaskHandle<T::Message>> where
T: Task + Send + Debug + 's,
T::Message: Send,
{
let name = name.into();
trace!("Scheduler::run(name: {name}, task: {task:?})");
let running = self.running.clone();
let (sender, receiver) = channel::<T::Message>();
let _ = thread::Builder::new()
.name(name.clone())
.spawn_scoped(self.scope, move || {
task.run(receiver, running);
})?;
Ok(TaskHandle {
name,
sender,
})
}
pub fn run_cyclic<T>(
&self,
name: impl Into<String>,
mut task: T,
frequency: u64,
) -> Result<TaskHandle<T::Message>> where
T: CyclicTask + Send + Debug + 's,
T::Message: Send,
{
let name = name.into();
trace!("Scheduler::run_cyclic(name: {name}, task: {task:?}, frequency: {frequency})");
let running = self.running.clone();
let (sender, receiver) = channel::<T::Message>();
let _ = thread::Builder::new()
.name(name.clone())
.spawn_scoped(self.scope, move || {
let period = Duration::from_nanos(1_000_000_000 / frequency);
let mut cycle_start_time = Instant::now();
while running.load(Ordering::Relaxed) {
task.step(&receiver, cycle_start_time);
cycle_start_time += period;
let sleep_duration = cycle_start_time - Instant::now();
if sleep_duration > Duration::ZERO {
sleep(sleep_duration);
}
}
})?;
Ok(TaskHandle {
name,
sender,
})
}
}