diff --git a/common/src/telemetry/mod.rs b/common/src/telemetry/mod.rs index bc442e0..a1d6b16 100644 --- a/common/src/telemetry/mod.rs +++ b/common/src/telemetry/mod.rs @@ -3,6 +3,22 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] -pub enum Telemetry { - Timestamp(#[serde(with = "ts_nanoseconds")] DateTime) +pub struct Telemetry { + #[serde(with = "ts_nanoseconds")] + pub timestamp: DateTime, + pub message: TelemetryMessage, +} + +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +pub enum SwitchBank { + A, + B, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub enum TelemetryMessage { + SwitchState { + bank: SwitchBank, + switches: [bool; 16], + } } diff --git a/flight/src/comms/mod.rs b/flight/src/comms/mod.rs index f115dc1..2328b91 100644 --- a/flight/src/comms/mod.rs +++ b/flight/src/comms/mod.rs @@ -1,8 +1,8 @@ -use crate::scheduler::CyclicTask; +use crate::scheduler::{CyclicTask, TaskHandle}; use anyhow::Result; use log::{error, trace}; use nautilus_common::command::Command; -use nautilus_common::telemetry::Telemetry; +use nautilus_common::telemetry::{Telemetry, TelemetryMessage}; use nautilus_common::udp::{UdpRecvCborError, UdpSocketExt}; use std::fmt::Debug; use std::io::Cursor; @@ -12,6 +12,18 @@ use std::sync::mpsc::Receiver; use std::sync::Arc; use std::time::Instant; +pub type TelemetrySender = TaskHandle; + +impl TelemetrySender { + pub fn send(&self, telemetry_message: TelemetryMessage) { + // Ignore failure + let _ = self.sender.send(Telemetry { + timestamp: chrono::Utc::now(), + message: telemetry_message, + }); + } +} + #[derive(Debug)] pub struct CommsTask { udp: UdpSocket, @@ -39,9 +51,9 @@ impl CommsTask { } impl CyclicTask for CommsTask { - type Message = (); + type Message = Telemetry; - fn step(&mut self, _receiver: &Receiver, _step_time: Instant) { + fn step(&mut self, receiver: &Receiver, _step_time: Instant) { let mut buffer = Cursor::new([0u8; 512]); match self.udp.recv_cbor::(&mut buffer) { @@ -56,9 +68,11 @@ impl CyclicTask for CommsTask { } } - let tlm = Telemetry::Timestamp(chrono::Utc::now()); - if let Err(err) = self.udp.send_cbor(&tlm, &mut buffer, &self.ground_address) { - error!("Tx Error: {err}"); + // Intentionally ignore Err case + while let Ok(tlm) = receiver.try_recv() { + if let Err(err) = self.udp.send_cbor(&tlm, &mut buffer, &self.ground_address) { + error!("Tx Error: {err}"); + } } } } diff --git a/flight/src/hardware/mcp23017/mod.rs b/flight/src/hardware/mcp23017/mod.rs index 3b26865..d0406b2 100644 --- a/flight/src/hardware/mcp23017/mod.rs +++ b/flight/src/hardware/mcp23017/mod.rs @@ -13,4 +13,5 @@ pub trait Mcp23017 { } pub use driver::Mcp23017Driver; +pub use task::Mcp23017State; pub use task::Mcp23017Task; diff --git a/flight/src/hardware/mcp23017/task.rs b/flight/src/hardware/mcp23017/task.rs index 6de9511..f92952b 100644 --- a/flight/src/hardware/mcp23017/task.rs +++ b/flight/src/hardware/mcp23017/task.rs @@ -1,6 +1,7 @@ use crate::hardware::mcp23017::Mcp23017; use crate::hardware::pin::PinDevice; use crate::scheduler::{CyclicTask, TaskHandle}; +use crate::state_vector::{SectionIdentifier, SectionWriter, StateVector}; use embedded_hal::digital::PinState; use log::trace; use std::fmt::{Debug, Formatter}; @@ -31,12 +32,18 @@ impl PinDevice for TaskHandle { } } -pub struct Mcp23017Task { - mcp23017: M, - pins: AllPins, +#[derive(Default)] +pub struct Mcp23017State { + pub pins: [bool; 16], } -impl Debug for Mcp23017Task { +pub struct Mcp23017Task<'a, M: Mcp23017> { + mcp23017: M, + pins: AllPins, + state: SectionWriter<'a, Mcp23017State>, +} + +impl Debug for Mcp23017Task<'_, M> { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "Mcp23017Task {{ mcp23017: {:?} }}", self.mcp23017) } @@ -64,6 +71,7 @@ struct PinData { next_priority: u8, default: PinState, changed: bool, + value: PinState, } impl PinData { @@ -77,16 +85,18 @@ impl PinData { next_priority: 0, default: PinState::Low, changed: false, + value: PinState::Low, } } - fn get(&mut self, now: Instant) -> PinState { + fn evaluate(&mut self, now: Instant) { // Do this twice to check both the current and the current next // If the current is currently invalid, we'd upgrade the next to current for _ in 0..2 { let is_current_valid = self.valid_until.map(|current| current >= now).unwrap_or(false); if is_current_valid { - return self.state; + self.value = self.state; + return; } else { if self.valid_until.is_some() { self.changed = true; @@ -100,7 +110,7 @@ impl PinData { } } - self.default + self.value = self.default; } fn set(&mut self, value: PinState, valid_until: Instant, priority: u8) { @@ -123,6 +133,7 @@ impl PinData { self.valid_until = Some(valid_until); self.priority = priority; self.changed = true; + self.value = value; } else { // This is not the highest priority thing if self.priority >= self.next_priority { @@ -138,17 +149,22 @@ impl PinData { } } -impl Mcp23017Task { - pub fn new(mcp23017: M) -> Self { +impl<'a, M: Mcp23017 + Debug> Mcp23017Task<'a, M> { + pub fn new(mcp23017: M, state_vector: &'a StateVector) -> Self { trace!("Mcp23017Task::new(mcp23017: {mcp23017:?})"); Self { mcp23017, pins: AllPins::new(), + state: state_vector.create_section(Mcp23017State::default()), } } + + pub fn get_state(&self) -> SectionIdentifier { + self.state.get_identifier() + } } -impl CyclicTask for Mcp23017Task { +impl CyclicTask for Mcp23017Task<'_, M> { type Message = Mcp23017Message; fn step( @@ -158,6 +174,10 @@ impl CyclicTask for Mcp23017Task { ) { let mut changed = false; + for pin in 0u8..16u8 { + self.pins.pins[pin as usize].evaluate(step_time); + } + while let Ok(recv) = receiver.try_recv() { match recv { Mcp23017Message::SetPin { pin, value, valid_until, priority } => { @@ -169,17 +189,20 @@ impl CyclicTask for Mcp23017Task { } for pin in 0u8..16u8 { - // This shouldn't be able to fail - // TODO: handle error case - let state = self.pins.pins[pin as usize].get(step_time); + let state = self.pins.pins[pin as usize].value; if self.pins.pins[pin as usize].changed { self.pins.pins[pin as usize].changed = false; + // This shouldn't be able to fail + // TODO: handle error case let _ = self.mcp23017.set_pin(pin, state); changed = true; } } if changed { let _ = self.mcp23017.flush(); + self.state.update(|s| { + s.pins = self.pins.pins.map(|pin| pin.value == PinState::High); + }); } } } diff --git a/flight/src/lib.rs b/flight/src/lib.rs index 7ce4a0e..114d9cd 100644 --- a/flight/src/lib.rs +++ b/flight/src/lib.rs @@ -1,16 +1,18 @@ use crate::comms::CommsTask; use crate::hardware::channelization::{LED_A, LED_B}; use crate::hardware::initialize; -use crate::hardware::mcp23017::{Mcp23017, Mcp23017Task}; +use crate::hardware::mcp23017::{Mcp23017, Mcp23017State, Mcp23017Task}; use crate::hardware::mct8316a::Mct8316a; use crate::hardware::pin::Pin; use crate::hardware::Hardware; use crate::scheduler::Scheduler; +use crate::state_vector::StateVector; use anyhow::Result; use embedded_hal::digital::PinState; use embedded_hal::pwm::SetDutyCycle; use log::{debug, info}; use nautilus_common::add_ctrlc_handler; +use nautilus_common::telemetry::{SwitchBank, TelemetryMessage}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::thread::sleep; @@ -27,6 +29,8 @@ pub fn run() -> Result<()> { let running = Arc::new(AtomicBool::new(true)); add_ctrlc_handler(running.clone())?; + let state_vector = StateVector::new(); + let hal = initialize()?; let mut mcp23017_a = hal.new_mcp23017_a()?; @@ -43,10 +47,31 @@ pub fn run() -> Result<()> { mct8316.init()?; Scheduler::new(running.clone(), |s| { - let task_a = s.run_cyclic("mcp23017-a", Mcp23017Task::new(mcp23017_a), 10)?; - let task_b = s.run_cyclic("mcp23017-b", Mcp23017Task::new(mcp23017_b), 10)?; + let task_a = Mcp23017Task::new(mcp23017_a, &state_vector); + let a_id = task_a.get_state(); + let task_a = s.run_cyclic("mcp23017-a", task_a, 10)?; - let _comms = s.run_cyclic("comms", CommsTask::new(15000, "192.168.50.157:14000", running.clone())?, 1)?; + let task_b = Mcp23017Task::new(mcp23017_b, &state_vector); + let b_id = task_b.get_state(); + let task_b = s.run_cyclic("mcp23017-b", task_b, 10)?; + + let comms = s.run_cyclic("comms", CommsTask::new(15000, "192.168.50.157:14000", running.clone())?, 10)?; + + let sv = &state_vector; + s.run_cyclic("telemetry-producer", move || { + sv.access_section(&a_id, |state: &Mcp23017State| { + comms.send(TelemetryMessage::SwitchState { + bank: SwitchBank::A, + switches: state.pins.clone(), + }) + }); + sv.access_section(&b_id, |state: &Mcp23017State| { + comms.send(TelemetryMessage::SwitchState { + bank: SwitchBank::B, + switches: state.pins.clone(), + }) + }); + }, 1)?; let mut led_pin_a = LED_A.new(&task_a, &task_b)?; let mut led_pin_b = LED_B.new(&task_a, &task_b)?; @@ -84,6 +109,8 @@ pub fn run() -> Result<()> { drop(hal); + drop(state_vector); + Ok(()) } @@ -94,3 +121,4 @@ mod on_drop; mod rcs; mod comms; mod scheduler; +mod state_vector; diff --git a/flight/src/scheduler/mod.rs b/flight/src/scheduler/mod.rs index 7c65f61..ba31aed 100644 --- a/flight/src/scheduler/mod.rs +++ b/flight/src/scheduler/mod.rs @@ -29,6 +29,17 @@ pub trait CyclicTask { fn step(&mut self, receiver: &Receiver, step_time: Instant); } +impl CyclicTask for F +where + F: Fn() -> (), +{ + type Message = (); + + fn step(&mut self, _receiver: &Receiver, _step_time: Instant) { + self(); + } +} + pub struct Scheduler<'s, 'e> { scope: &'s Scope<'s, 'e>, @@ -84,11 +95,11 @@ impl<'s, 'e> Scheduler<'s, 'e> { mut task: T, frequency: u64, ) -> Result> where - T: CyclicTask + Send + Debug + 's, + T: CyclicTask + Send + 's, T::Message: Send, { let name = name.into(); - trace!("Scheduler::run_cyclic(name: {name}, task: {task:?}, frequency: {frequency})"); + trace!("Scheduler::run_cyclic(name: {name}, task, frequency: {frequency})"); let running = self.running.clone(); let (sender, receiver) = channel::(); let _ = thread::Builder::new() diff --git a/flight/src/state_vector/mod.rs b/flight/src/state_vector/mod.rs new file mode 100644 index 0000000..083ca3e --- /dev/null +++ b/flight/src/state_vector/mod.rs @@ -0,0 +1,122 @@ +use std::any::Any; +use std::collections::HashMap; +use std::marker::PhantomData; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::RwLock; + +pub struct StateVector { + next_section: AtomicUsize, + sections: RwLock>>>, +} + +#[derive(Clone, Eq, PartialEq, Hash)] +pub struct SectionIdentifier(usize); + +pub struct SectionWriter<'a, T> { + id: SectionIdentifier, + state_vector: &'a StateVector, + _phantom_data: PhantomData, +} + +impl<'a, T: 'static> SectionWriter<'a, T> { + pub fn get_identifier(&self) -> SectionIdentifier { + self.id.clone() + } + + pub fn update(&self, f: F) -> R + where + F: FnOnce(&mut T) -> R, + { + self.state_vector.sections.clear_poison(); + let sections = self.state_vector.sections.read().unwrap(); + let section = sections.get(&self.id).unwrap(); + let mut data = section.write().unwrap(); + let result = data.downcast_mut::().unwrap(); + f(result) + } +} + +impl StateVector { + pub fn new() -> Self { + Self { + next_section: AtomicUsize::new(0usize), + sections: RwLock::new(HashMap::new()), + } + } + + pub fn create_section(&self, initial_value: T) -> SectionWriter<'_, T> + where + T: Send + Sync + 'static, + { + let id = SectionIdentifier(self.next_section.fetch_add(1usize, Ordering::SeqCst)); + let lock = Box::new(RwLock::new(initial_value)); + + self.sections.clear_poison(); + self.sections.write().unwrap().insert(id.clone(), lock); + + SectionWriter { + id, + state_vector: &self, + _phantom_data: PhantomData, + } + } + + pub fn access_section(&self, id: &SectionIdentifier, f: F) -> Option + where + T: 'static, + F: FnOnce(&T) -> R, + { + self.sections.clear_poison(); + let Ok(sections) = self.sections.read() else { return None; }; + let Some(section) = sections.get(id) else { return None; }; + section.clear_poison(); + let Ok(data) = section.read() else { return None; }; + let Some(inner) = data.downcast_ref::() else { return None; }; + Some(f(inner)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use anyhow::Result; + + #[derive(Default)] + struct TestType { + value1: i32, + value2: i32, + } + + #[test] + fn test_two_sections() -> Result<()> { + let state_vector = StateVector::new(); + + let section_1 = state_vector.create_section(TestType::default()); + let section_2 = state_vector.create_section(TestType::default()); + + section_1.update(|s| { + s.value1 = 1; + s.value2 = 2; + }); + section_2.update(|s| { + s.value1 = 3; + s.value2 = 4; + }); + + let id_1 = section_1.get_identifier(); + + state_vector.access_section(&id_1, |s: &TestType| { + assert_eq!(1, s.value1); + assert_eq!(2, s.value2); + }); + + let id_2 = section_2.get_identifier(); + + state_vector.access_section(&id_2, |s: &TestType| { + assert_eq!(3, s.value1); + assert_eq!(4, s.value2); + }); + + Ok(()) + } +} diff --git a/flight/src/test_utils.rs b/flight/src/test_utils.rs index 1ceec9e..6cad53d 100644 --- a/flight/src/test_utils.rs +++ b/flight/src/test_utils.rs @@ -1,3 +1,4 @@ +#![allow(dead_code)] use std::mem::ManuallyDrop; use std::ops::{Deref, DerefMut}; use std::panic;