diff --git a/common/src/udp.rs b/common/src/udp.rs index eacab4f..fe1fb11 100644 --- a/common/src/udp.rs +++ b/common/src/udp.rs @@ -1,4 +1,5 @@ use crate::command::{Command, CommandHeader}; +use crate::udp::UdpRecvPostcardError::ExtraData; use crate::udp::UdpSendPostcardError::LengthMismatch; use log::error; use serde::{Deserialize, Serialize}; @@ -12,6 +13,8 @@ pub enum UdpRecvPostcardError { Io(#[from] std::io::Error), #[error("Deserialization Error")] Deserialization(#[from] postcard::Error), + #[error("Extra Data")] + ExtraData { amount: usize }, #[error("No Data")] NoData, } @@ -66,8 +69,13 @@ impl UdpSocketExt for UdpSocket { buffer: &'de mut [u8], ) -> Result<(T, SocketAddr), UdpRecvPostcardError> { match self.recv_from(buffer) { - Ok((size, addr)) => match postcard::from_bytes::(&buffer[..size]) { - Ok(res) => Ok((res, addr)), + Ok((size, addr)) => match postcard::take_from_bytes::(&buffer[..size]) { + Ok((res, rem)) => { + if !rem.is_empty() { + return Err(ExtraData { amount: rem.len() }); + } + Ok((res, addr)) + } Err(err) => Err(err.into()), }, Err(err) => match err.kind() { diff --git a/flight/src/comms/mod.rs b/flight/src/comms/mod.rs index 447ee14..83c46f4 100644 --- a/flight/src/comms/mod.rs +++ b/flight/src/comms/mod.rs @@ -1,14 +1,13 @@ use crate::scheduler::{CyclicTask, TaskHandle}; -use anyhow::Result; +use anyhow::{Result, ensure}; use log::{error, trace, warn}; -use nautilus_common::command::{CommandHeader, SetPin, ValidPriorityCommand}; +use nautilus_common::command::{Command, CommandHeader}; use nautilus_common::telemetry::{Telemetry, TelemetryMessage}; use nautilus_common::udp::{UdpRecvPostcardError, UdpSocketExt}; use std::any::type_name; +use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}; -use std::sync::Arc; -use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc::Receiver; use std::time::Instant; @@ -25,47 +24,35 @@ impl TelemetrySender { } } -pub struct CommsTask +type CommandCallback<'a> = Box Result<()> + Send + 'a>; + +pub struct CommsTask<'a, A> where A: ToSocketAddrs + Debug, - SetPinA: Fn(ValidPriorityCommand) + Send + Sync, - SetPinB: Fn(ValidPriorityCommand) + Send + Sync, { udp: UdpSocket, ground_address: A, - running: Arc, - set_pin_a: SetPinA, - set_pin_b: SetPinB, + command_callbacks: HashMap>, } -impl Debug for CommsTask +impl Debug for CommsTask<'_, A> where A: ToSocketAddrs + Debug, - SetPinA: Fn(ValidPriorityCommand) + Send + Sync, - SetPinB: Fn(ValidPriorityCommand) + Send + Sync, { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!( f, - "CommsTask {{ udp: {:?}, ground_address: {:?}, running: {:?} }}", - self.udp, self.ground_address, self.running + "CommsTask {{ udp: {:?}, ground_address: {:?} }}", + self.udp, self.ground_address ) } } -impl CommsTask +impl<'a, A> CommsTask<'a, A> where A: ToSocketAddrs + Debug, - SetPinA: Fn(ValidPriorityCommand) + Send + Sync, - SetPinB: Fn(ValidPriorityCommand) + Send + Sync, { - pub fn new( - local_port: u16, - ground_address: A, - running: Arc, - set_pin_a: SetPinA, - set_pin_b: SetPinB, - ) -> Result { + pub fn new(local_port: u16, ground_address: A) -> Result { trace!( "CommsTask::new(local_port: {local_port}, ground_address: {ground_address:?})", type_name::() @@ -77,18 +64,40 @@ where Ok(Self { udp, ground_address, - running, - set_pin_a, - set_pin_b, + command_callbacks: HashMap::new(), }) } + + pub fn add_command_handler( + &mut self, + command: impl Into, + handler: impl Fn(T) + Send + 'a, + ) -> Result<()> { + let command = command.into(); + ensure!( + !self.command_callbacks.contains_key(&command), + "Already Contains Command {command}" + ); + self.command_callbacks.insert( + command.clone(), + Box::new(move |cmd_buf| { + let (cmd, remainder) = postcard::take_from_bytes::(cmd_buf)?; + ensure!( + remainder.is_empty(), + "{command} received {} extra bytes", + remainder.len() + ); + handler(cmd); + Ok(()) + }), + ); + Ok(()) + } } -impl CyclicTask for CommsTask +impl CyclicTask for CommsTask<'_, A> where A: ToSocketAddrs + Debug, - SetPinA: Fn(ValidPriorityCommand) + Send + Sync, - SetPinB: Fn(ValidPriorityCommand) + Send + Sync, { type Message = Telemetry; type Data = (); @@ -108,48 +117,13 @@ where let mut buffer = [0u8; 512]; match self.udp.recv_postcard::(&mut buffer) { - Ok((cmd, _)) => match cmd.name { - "/shutdown" => match postcard::take_from_bytes::<()>(cmd.data) { - Ok(((), remainder)) => { - if remainder.is_empty() { - self.running.store(false, Ordering::Relaxed); - } else { - error!("shutdown has extra data"); - } - } - Err(e) => { - error!("Failed to parse ShutdownCommand {e}"); - } - }, - "/mcp23017a/set" => { - match postcard::take_from_bytes::>(cmd.data) { - Ok((set_pin, remainder)) => { - if remainder.is_empty() { - (self.set_pin_a)(set_pin); - } else { - error!("set pin has extra data"); - } - } - Err(e) => { - error!("Failed to parse SetPin {e}"); - } + Ok((cmd, _)) => match self.command_callbacks.get(cmd.name) { + Some(handler) => { + if let Err(e) = handler(cmd.data) { + error!("Command Error: {e}"); } } - "/mcp23017b/set" => { - match postcard::take_from_bytes::>(cmd.data) { - Ok((set_pin, remainder)) => { - if remainder.is_empty() { - (self.set_pin_b)(set_pin); - } else { - error!("set pin has extra data"); - } - } - Err(e) => { - error!("Failed to parse SetPin {e}"); - } - } - } - _ => { + None => { warn!("Unknown Command: {}", cmd.name); } }, diff --git a/flight/src/hardware/pin.rs b/flight/src/hardware/pin.rs index 1ba6545..7a45ad9 100644 --- a/flight/src/hardware/pin.rs +++ b/flight/src/hardware/pin.rs @@ -5,17 +5,18 @@ use std::time::Instant; pub trait PinDevice { fn set_pin(&self, pin: u8, value: PinState, valid_until: Instant, priority: u8); - fn new_pinset_callback(self) -> impl Fn(ValidPriorityCommand) + fn new_pinset_callback<'a>(&self) -> impl Fn(ValidPriorityCommand) + 'a where - Self: Sized, + Self: Sized + Clone + 'a, { + let this = self.clone(); move |cmd| { - self.set_pin( + this.set_pin( cmd.pin, cmd.value.into(), cmd.get_valid_until_instant(), cmd.priority, - ) + ); } } } diff --git a/flight/src/lib.rs b/flight/src/lib.rs index abcd120..9bbfe5a 100644 --- a/flight/src/lib.rs +++ b/flight/src/lib.rs @@ -19,6 +19,11 @@ use std::time::Duration; mod hardware; +fn new_shutdown_handler(running: &Arc) -> impl Fn(()) { + let running = running.clone(); + move |()| running.store(false, Ordering::Relaxed) +} + /// Run the flight software /// /// # Errors @@ -64,17 +69,11 @@ pub fn run() -> Result<()> { )?; let b_id = task_b.get_id(); - let comms = s.run_cyclic( - "comms-task", - CommsTask::new( - 15000, - "nautilus-ground:14000", - running.clone(), - task_a.clone().new_pinset_callback(), - task_b.clone().new_pinset_callback(), - )?, - 10, - )?; + let mut comms = CommsTask::new(15000, "nautilus-ground:14000")?; + comms.add_command_handler("/shutdown", new_shutdown_handler(&running))?; + comms.add_command_handler("/mcp23017a/set", task_a.new_pinset_callback())?; + comms.add_command_handler("/mcp23017b/set", task_b.new_pinset_callback())?; + let comms = s.run_cyclic("comms-task", comms, 10)?; let sv = &state_vector; s.run_cyclic( diff --git a/flight/src/scheduler/mod.rs b/flight/src/scheduler/mod.rs index 6294b4e..ea1ebd7 100644 --- a/flight/src/scheduler/mod.rs +++ b/flight/src/scheduler/mod.rs @@ -84,14 +84,15 @@ impl<'s> Scheduler<'s, '_> { } #[allow(dead_code)] - pub fn run( + pub fn run<'t, T>( &self, name: impl Into, task: T, ) -> Result> where - T: Task + Send + Debug + 's, + T: Task + Send + Debug + 't, T::Message: Send, + 't: 's, { let name = name.into(); trace!( @@ -109,15 +110,16 @@ impl<'s> Scheduler<'s, '_> { Ok(TaskHandle { name, sender, data }) } - pub fn run_cyclic( + pub fn run_cyclic<'t, T>( &self, name: impl Into, mut task: T, frequency: u64, ) -> Result> where - T: CyclicTask + Send + 's, + T: CyclicTask + Send + 't, T::Message: Send, + 't: 's, { let name = name.into(); trace!(