initial state vector implementation

This commit is contained in:
2025-10-25 16:11:39 -07:00
parent b067ae5cec
commit e0f17649b2
8 changed files with 244 additions and 28 deletions

View File

@@ -1,8 +1,8 @@
use crate::scheduler::CyclicTask;
use crate::scheduler::{CyclicTask, TaskHandle};
use anyhow::Result;
use log::{error, trace};
use nautilus_common::command::Command;
use nautilus_common::telemetry::Telemetry;
use nautilus_common::telemetry::{Telemetry, TelemetryMessage};
use nautilus_common::udp::{UdpRecvCborError, UdpSocketExt};
use std::fmt::Debug;
use std::io::Cursor;
@@ -12,6 +12,18 @@ use std::sync::mpsc::Receiver;
use std::sync::Arc;
use std::time::Instant;
pub type TelemetrySender = TaskHandle<Telemetry>;
impl TelemetrySender {
pub fn send(&self, telemetry_message: TelemetryMessage) {
// Ignore failure
let _ = self.sender.send(Telemetry {
timestamp: chrono::Utc::now(),
message: telemetry_message,
});
}
}
#[derive(Debug)]
pub struct CommsTask<A: ToSocketAddrs> {
udp: UdpSocket,
@@ -39,9 +51,9 @@ impl<A: ToSocketAddrs + Debug> CommsTask<A> {
}
impl<A: ToSocketAddrs> CyclicTask for CommsTask<A> {
type Message = ();
type Message = Telemetry;
fn step(&mut self, _receiver: &Receiver<Self::Message>, _step_time: Instant) {
fn step(&mut self, receiver: &Receiver<Self::Message>, _step_time: Instant) {
let mut buffer = Cursor::new([0u8; 512]);
match self.udp.recv_cbor::<Command, _>(&mut buffer) {
@@ -56,9 +68,11 @@ impl<A: ToSocketAddrs> CyclicTask for CommsTask<A> {
}
}
let tlm = Telemetry::Timestamp(chrono::Utc::now());
if let Err(err) = self.udp.send_cbor(&tlm, &mut buffer, &self.ground_address) {
error!("Tx Error: {err}");
// Intentionally ignore Err case
while let Ok(tlm) = receiver.try_recv() {
if let Err(err) = self.udp.send_cbor(&tlm, &mut buffer, &self.ground_address) {
error!("Tx Error: {err}");
}
}
}
}

View File

@@ -13,4 +13,5 @@ pub trait Mcp23017 {
}
pub use driver::Mcp23017Driver;
pub use task::Mcp23017State;
pub use task::Mcp23017Task;

View File

@@ -1,6 +1,7 @@
use crate::hardware::mcp23017::Mcp23017;
use crate::hardware::pin::PinDevice;
use crate::scheduler::{CyclicTask, TaskHandle};
use crate::state_vector::{SectionIdentifier, SectionWriter, StateVector};
use embedded_hal::digital::PinState;
use log::trace;
use std::fmt::{Debug, Formatter};
@@ -31,12 +32,18 @@ impl PinDevice for TaskHandle<Mcp23017Message> {
}
}
pub struct Mcp23017Task<M: Mcp23017> {
mcp23017: M,
pins: AllPins,
#[derive(Default)]
pub struct Mcp23017State {
pub pins: [bool; 16],
}
impl<M: Mcp23017 + Debug> Debug for Mcp23017Task<M> {
pub struct Mcp23017Task<'a, M: Mcp23017> {
mcp23017: M,
pins: AllPins,
state: SectionWriter<'a, Mcp23017State>,
}
impl<M: Mcp23017 + Debug> Debug for Mcp23017Task<'_, M> {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Mcp23017Task {{ mcp23017: {:?} }}", self.mcp23017)
}
@@ -64,6 +71,7 @@ struct PinData {
next_priority: u8,
default: PinState,
changed: bool,
value: PinState,
}
impl PinData {
@@ -77,16 +85,18 @@ impl PinData {
next_priority: 0,
default: PinState::Low,
changed: false,
value: PinState::Low,
}
}
fn get(&mut self, now: Instant) -> PinState {
fn evaluate(&mut self, now: Instant) {
// Do this twice to check both the current and the current next
// If the current is currently invalid, we'd upgrade the next to current
for _ in 0..2 {
let is_current_valid = self.valid_until.map(|current| current >= now).unwrap_or(false);
if is_current_valid {
return self.state;
self.value = self.state;
return;
} else {
if self.valid_until.is_some() {
self.changed = true;
@@ -100,7 +110,7 @@ impl PinData {
}
}
self.default
self.value = self.default;
}
fn set(&mut self, value: PinState, valid_until: Instant, priority: u8) {
@@ -123,6 +133,7 @@ impl PinData {
self.valid_until = Some(valid_until);
self.priority = priority;
self.changed = true;
self.value = value;
} else {
// This is not the highest priority thing
if self.priority >= self.next_priority {
@@ -138,17 +149,22 @@ impl PinData {
}
}
impl<M: Mcp23017 + Debug> Mcp23017Task<M> {
pub fn new(mcp23017: M) -> Self {
impl<'a, M: Mcp23017 + Debug> Mcp23017Task<'a, M> {
pub fn new(mcp23017: M, state_vector: &'a StateVector) -> Self {
trace!("Mcp23017Task::new(mcp23017: {mcp23017:?})");
Self {
mcp23017,
pins: AllPins::new(),
state: state_vector.create_section(Mcp23017State::default()),
}
}
pub fn get_state(&self) -> SectionIdentifier {
self.state.get_identifier()
}
}
impl<M: Mcp23017> CyclicTask for Mcp23017Task<M> {
impl<M: Mcp23017> CyclicTask for Mcp23017Task<'_, M> {
type Message = Mcp23017Message;
fn step(
@@ -158,6 +174,10 @@ impl<M: Mcp23017> CyclicTask for Mcp23017Task<M> {
) {
let mut changed = false;
for pin in 0u8..16u8 {
self.pins.pins[pin as usize].evaluate(step_time);
}
while let Ok(recv) = receiver.try_recv() {
match recv {
Mcp23017Message::SetPin { pin, value, valid_until, priority } => {
@@ -169,17 +189,20 @@ impl<M: Mcp23017> CyclicTask for Mcp23017Task<M> {
}
for pin in 0u8..16u8 {
// This shouldn't be able to fail
// TODO: handle error case
let state = self.pins.pins[pin as usize].get(step_time);
let state = self.pins.pins[pin as usize].value;
if self.pins.pins[pin as usize].changed {
self.pins.pins[pin as usize].changed = false;
// This shouldn't be able to fail
// TODO: handle error case
let _ = self.mcp23017.set_pin(pin, state);
changed = true;
}
}
if changed {
let _ = self.mcp23017.flush();
self.state.update(|s| {
s.pins = self.pins.pins.map(|pin| pin.value == PinState::High);
});
}
}
}

View File

@@ -1,16 +1,18 @@
use crate::comms::CommsTask;
use crate::hardware::channelization::{LED_A, LED_B};
use crate::hardware::initialize;
use crate::hardware::mcp23017::{Mcp23017, Mcp23017Task};
use crate::hardware::mcp23017::{Mcp23017, Mcp23017State, Mcp23017Task};
use crate::hardware::mct8316a::Mct8316a;
use crate::hardware::pin::Pin;
use crate::hardware::Hardware;
use crate::scheduler::Scheduler;
use crate::state_vector::StateVector;
use anyhow::Result;
use embedded_hal::digital::PinState;
use embedded_hal::pwm::SetDutyCycle;
use log::{debug, info};
use nautilus_common::add_ctrlc_handler;
use nautilus_common::telemetry::{SwitchBank, TelemetryMessage};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread::sleep;
@@ -27,6 +29,8 @@ pub fn run() -> Result<()> {
let running = Arc::new(AtomicBool::new(true));
add_ctrlc_handler(running.clone())?;
let state_vector = StateVector::new();
let hal = initialize()?;
let mut mcp23017_a = hal.new_mcp23017_a()?;
@@ -43,10 +47,31 @@ pub fn run() -> Result<()> {
mct8316.init()?;
Scheduler::new(running.clone(), |s| {
let task_a = s.run_cyclic("mcp23017-a", Mcp23017Task::new(mcp23017_a), 10)?;
let task_b = s.run_cyclic("mcp23017-b", Mcp23017Task::new(mcp23017_b), 10)?;
let task_a = Mcp23017Task::new(mcp23017_a, &state_vector);
let a_id = task_a.get_state();
let task_a = s.run_cyclic("mcp23017-a", task_a, 10)?;
let _comms = s.run_cyclic("comms", CommsTask::new(15000, "192.168.50.157:14000", running.clone())?, 1)?;
let task_b = Mcp23017Task::new(mcp23017_b, &state_vector);
let b_id = task_b.get_state();
let task_b = s.run_cyclic("mcp23017-b", task_b, 10)?;
let comms = s.run_cyclic("comms", CommsTask::new(15000, "192.168.50.157:14000", running.clone())?, 10)?;
let sv = &state_vector;
s.run_cyclic("telemetry-producer", move || {
sv.access_section(&a_id, |state: &Mcp23017State| {
comms.send(TelemetryMessage::SwitchState {
bank: SwitchBank::A,
switches: state.pins.clone(),
})
});
sv.access_section(&b_id, |state: &Mcp23017State| {
comms.send(TelemetryMessage::SwitchState {
bank: SwitchBank::B,
switches: state.pins.clone(),
})
});
}, 1)?;
let mut led_pin_a = LED_A.new(&task_a, &task_b)?;
let mut led_pin_b = LED_B.new(&task_a, &task_b)?;
@@ -84,6 +109,8 @@ pub fn run() -> Result<()> {
drop(hal);
drop(state_vector);
Ok(())
}
@@ -94,3 +121,4 @@ mod on_drop;
mod rcs;
mod comms;
mod scheduler;
mod state_vector;

View File

@@ -29,6 +29,17 @@ pub trait CyclicTask {
fn step(&mut self, receiver: &Receiver<Self::Message>, step_time: Instant);
}
impl<F> CyclicTask for F
where
F: Fn() -> (),
{
type Message = ();
fn step(&mut self, _receiver: &Receiver<Self::Message>, _step_time: Instant) {
self();
}
}
pub struct Scheduler<'s, 'e>
{
scope: &'s Scope<'s, 'e>,
@@ -84,11 +95,11 @@ impl<'s, 'e> Scheduler<'s, 'e> {
mut task: T,
frequency: u64,
) -> Result<TaskHandle<T::Message>> where
T: CyclicTask + Send + Debug + 's,
T: CyclicTask + Send + 's,
T::Message: Send,
{
let name = name.into();
trace!("Scheduler::run_cyclic(name: {name}, task: {task:?}, frequency: {frequency})");
trace!("Scheduler::run_cyclic(name: {name}, task, frequency: {frequency})");
let running = self.running.clone();
let (sender, receiver) = channel::<T::Message>();
let _ = thread::Builder::new()

View File

@@ -0,0 +1,122 @@
use std::any::Any;
use std::collections::HashMap;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::RwLock;
pub struct StateVector {
next_section: AtomicUsize,
sections: RwLock<HashMap<SectionIdentifier, Box<RwLock<dyn Any + Send + Sync>>>>,
}
#[derive(Clone, Eq, PartialEq, Hash)]
pub struct SectionIdentifier(usize);
pub struct SectionWriter<'a, T> {
id: SectionIdentifier,
state_vector: &'a StateVector,
_phantom_data: PhantomData<T>,
}
impl<'a, T: 'static> SectionWriter<'a, T> {
pub fn get_identifier(&self) -> SectionIdentifier {
self.id.clone()
}
pub fn update<F, R>(&self, f: F) -> R
where
F: FnOnce(&mut T) -> R,
{
self.state_vector.sections.clear_poison();
let sections = self.state_vector.sections.read().unwrap();
let section = sections.get(&self.id).unwrap();
let mut data = section.write().unwrap();
let result = data.downcast_mut::<T>().unwrap();
f(result)
}
}
impl StateVector {
pub fn new() -> Self {
Self {
next_section: AtomicUsize::new(0usize),
sections: RwLock::new(HashMap::new()),
}
}
pub fn create_section<T>(&self, initial_value: T) -> SectionWriter<'_, T>
where
T: Send + Sync + 'static,
{
let id = SectionIdentifier(self.next_section.fetch_add(1usize, Ordering::SeqCst));
let lock = Box::new(RwLock::new(initial_value));
self.sections.clear_poison();
self.sections.write().unwrap().insert(id.clone(), lock);
SectionWriter {
id,
state_vector: &self,
_phantom_data: PhantomData,
}
}
pub fn access_section<T, F, R>(&self, id: &SectionIdentifier, f: F) -> Option<R>
where
T: 'static,
F: FnOnce(&T) -> R,
{
self.sections.clear_poison();
let Ok(sections) = self.sections.read() else { return None; };
let Some(section) = sections.get(id) else { return None; };
section.clear_poison();
let Ok(data) = section.read() else { return None; };
let Some(inner) = data.downcast_ref::<T>() else { return None; };
Some(f(inner))
}
}
#[cfg(test)]
mod tests {
use super::*;
use anyhow::Result;
#[derive(Default)]
struct TestType {
value1: i32,
value2: i32,
}
#[test]
fn test_two_sections() -> Result<()> {
let state_vector = StateVector::new();
let section_1 = state_vector.create_section(TestType::default());
let section_2 = state_vector.create_section(TestType::default());
section_1.update(|s| {
s.value1 = 1;
s.value2 = 2;
});
section_2.update(|s| {
s.value1 = 3;
s.value2 = 4;
});
let id_1 = section_1.get_identifier();
state_vector.access_section(&id_1, |s: &TestType| {
assert_eq!(1, s.value1);
assert_eq!(2, s.value2);
});
let id_2 = section_2.get_identifier();
state_vector.access_section(&id_2, |s: &TestType| {
assert_eq!(3, s.value1);
assert_eq!(4, s.value2);
});
Ok(())
}
}

View File

@@ -1,3 +1,4 @@
#![allow(dead_code)]
use std::mem::ManuallyDrop;
use std::ops::{Deref, DerefMut};
use std::panic;