adds command registration

This commit is contained in:
2025-11-30 08:50:35 -08:00
parent d53d78434c
commit ea56b9865e
5 changed files with 76 additions and 92 deletions

View File

@@ -1,4 +1,5 @@
use crate::command::{Command, CommandHeader}; use crate::command::{Command, CommandHeader};
use crate::udp::UdpRecvPostcardError::ExtraData;
use crate::udp::UdpSendPostcardError::LengthMismatch; use crate::udp::UdpSendPostcardError::LengthMismatch;
use log::error; use log::error;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -12,6 +13,8 @@ pub enum UdpRecvPostcardError {
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error("Deserialization Error")] #[error("Deserialization Error")]
Deserialization(#[from] postcard::Error), Deserialization(#[from] postcard::Error),
#[error("Extra Data")]
ExtraData { amount: usize },
#[error("No Data")] #[error("No Data")]
NoData, NoData,
} }
@@ -66,8 +69,13 @@ impl UdpSocketExt for UdpSocket {
buffer: &'de mut [u8], buffer: &'de mut [u8],
) -> Result<(T, SocketAddr), UdpRecvPostcardError> { ) -> Result<(T, SocketAddr), UdpRecvPostcardError> {
match self.recv_from(buffer) { match self.recv_from(buffer) {
Ok((size, addr)) => match postcard::from_bytes::<T>(&buffer[..size]) { Ok((size, addr)) => match postcard::take_from_bytes::<T>(&buffer[..size]) {
Ok(res) => Ok((res, addr)), Ok((res, rem)) => {
if !rem.is_empty() {
return Err(ExtraData { amount: rem.len() });
}
Ok((res, addr))
}
Err(err) => Err(err.into()), Err(err) => Err(err.into()),
}, },
Err(err) => match err.kind() { Err(err) => match err.kind() {

View File

@@ -1,14 +1,13 @@
use crate::scheduler::{CyclicTask, TaskHandle}; use crate::scheduler::{CyclicTask, TaskHandle};
use anyhow::Result; use anyhow::{Result, ensure};
use log::{error, trace, warn}; use log::{error, trace, warn};
use nautilus_common::command::{CommandHeader, SetPin, ValidPriorityCommand}; use nautilus_common::command::{Command, CommandHeader};
use nautilus_common::telemetry::{Telemetry, TelemetryMessage}; use nautilus_common::telemetry::{Telemetry, TelemetryMessage};
use nautilus_common::udp::{UdpRecvPostcardError, UdpSocketExt}; use nautilus_common::udp::{UdpRecvPostcardError, UdpSocketExt};
use std::any::type_name; use std::any::type_name;
use std::collections::HashMap;
use std::fmt::{Debug, Formatter}; use std::fmt::{Debug, Formatter};
use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}; use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::Receiver; use std::sync::mpsc::Receiver;
use std::time::Instant; use std::time::Instant;
@@ -25,47 +24,35 @@ impl TelemetrySender {
} }
} }
pub struct CommsTask<A, SetPinA, SetPinB> type CommandCallback<'a> = Box<dyn Fn(&[u8]) -> Result<()> + Send + 'a>;
pub struct CommsTask<'a, A>
where where
A: ToSocketAddrs + Debug, A: ToSocketAddrs + Debug,
SetPinA: Fn(ValidPriorityCommand<SetPin>) + Send + Sync,
SetPinB: Fn(ValidPriorityCommand<SetPin>) + Send + Sync,
{ {
udp: UdpSocket, udp: UdpSocket,
ground_address: A, ground_address: A,
running: Arc<AtomicBool>, command_callbacks: HashMap<String, CommandCallback<'a>>,
set_pin_a: SetPinA,
set_pin_b: SetPinB,
} }
impl<A, SetPinA, SetPinB> Debug for CommsTask<A, SetPinA, SetPinB> impl<A> Debug for CommsTask<'_, A>
where where
A: ToSocketAddrs + Debug, A: ToSocketAddrs + Debug,
SetPinA: Fn(ValidPriorityCommand<SetPin>) + Send + Sync,
SetPinB: Fn(ValidPriorityCommand<SetPin>) + Send + Sync,
{ {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!( write!(
f, f,
"CommsTask {{ udp: {:?}, ground_address: {:?}, running: {:?} }}", "CommsTask {{ udp: {:?}, ground_address: {:?} }}",
self.udp, self.ground_address, self.running self.udp, self.ground_address
) )
} }
} }
impl<A, SetPinA, SetPinB> CommsTask<A, SetPinA, SetPinB> impl<'a, A> CommsTask<'a, A>
where where
A: ToSocketAddrs + Debug, A: ToSocketAddrs + Debug,
SetPinA: Fn(ValidPriorityCommand<SetPin>) + Send + Sync,
SetPinB: Fn(ValidPriorityCommand<SetPin>) + Send + Sync,
{ {
pub fn new( pub fn new(local_port: u16, ground_address: A) -> Result<Self> {
local_port: u16,
ground_address: A,
running: Arc<AtomicBool>,
set_pin_a: SetPinA,
set_pin_b: SetPinB,
) -> Result<Self> {
trace!( trace!(
"CommsTask::new<A={}>(local_port: {local_port}, ground_address: {ground_address:?})", "CommsTask::new<A={}>(local_port: {local_port}, ground_address: {ground_address:?})",
type_name::<A>() type_name::<A>()
@@ -77,18 +64,40 @@ where
Ok(Self { Ok(Self {
udp, udp,
ground_address, ground_address,
running, command_callbacks: HashMap::new(),
set_pin_a,
set_pin_b,
}) })
} }
pub fn add_command_handler<T: Command>(
&mut self,
command: impl Into<String>,
handler: impl Fn(T) + Send + 'a,
) -> Result<()> {
let command = command.into();
ensure!(
!self.command_callbacks.contains_key(&command),
"Already Contains Command {command}"
);
self.command_callbacks.insert(
command.clone(),
Box::new(move |cmd_buf| {
let (cmd, remainder) = postcard::take_from_bytes::<T>(cmd_buf)?;
ensure!(
remainder.is_empty(),
"{command} received {} extra bytes",
remainder.len()
);
handler(cmd);
Ok(())
}),
);
Ok(())
}
} }
impl<A, SetPinA, SetPinB> CyclicTask for CommsTask<A, SetPinA, SetPinB> impl<A> CyclicTask for CommsTask<'_, A>
where where
A: ToSocketAddrs + Debug, A: ToSocketAddrs + Debug,
SetPinA: Fn(ValidPriorityCommand<SetPin>) + Send + Sync,
SetPinB: Fn(ValidPriorityCommand<SetPin>) + Send + Sync,
{ {
type Message = Telemetry; type Message = Telemetry;
type Data = (); type Data = ();
@@ -108,48 +117,13 @@ where
let mut buffer = [0u8; 512]; let mut buffer = [0u8; 512];
match self.udp.recv_postcard::<CommandHeader>(&mut buffer) { match self.udp.recv_postcard::<CommandHeader>(&mut buffer) {
Ok((cmd, _)) => match cmd.name { Ok((cmd, _)) => match self.command_callbacks.get(cmd.name) {
"/shutdown" => match postcard::take_from_bytes::<()>(cmd.data) { Some(handler) => {
Ok(((), remainder)) => { if let Err(e) = handler(cmd.data) {
if remainder.is_empty() { error!("Command Error: {e}");
self.running.store(false, Ordering::Relaxed);
} else {
error!("shutdown has extra data");
}
}
Err(e) => {
error!("Failed to parse ShutdownCommand {e}");
}
},
"/mcp23017a/set" => {
match postcard::take_from_bytes::<ValidPriorityCommand<SetPin>>(cmd.data) {
Ok((set_pin, remainder)) => {
if remainder.is_empty() {
(self.set_pin_a)(set_pin);
} else {
error!("set pin has extra data");
}
}
Err(e) => {
error!("Failed to parse SetPin {e}");
}
} }
} }
"/mcp23017b/set" => { None => {
match postcard::take_from_bytes::<ValidPriorityCommand<SetPin>>(cmd.data) {
Ok((set_pin, remainder)) => {
if remainder.is_empty() {
(self.set_pin_b)(set_pin);
} else {
error!("set pin has extra data");
}
}
Err(e) => {
error!("Failed to parse SetPin {e}");
}
}
}
_ => {
warn!("Unknown Command: {}", cmd.name); warn!("Unknown Command: {}", cmd.name);
} }
}, },

View File

@@ -5,17 +5,18 @@ use std::time::Instant;
pub trait PinDevice { pub trait PinDevice {
fn set_pin(&self, pin: u8, value: PinState, valid_until: Instant, priority: u8); fn set_pin(&self, pin: u8, value: PinState, valid_until: Instant, priority: u8);
fn new_pinset_callback(self) -> impl Fn(ValidPriorityCommand<SetPin>) fn new_pinset_callback<'a>(&self) -> impl Fn(ValidPriorityCommand<SetPin>) + 'a
where where
Self: Sized, Self: Sized + Clone + 'a,
{ {
let this = self.clone();
move |cmd| { move |cmd| {
self.set_pin( this.set_pin(
cmd.pin, cmd.pin,
cmd.value.into(), cmd.value.into(),
cmd.get_valid_until_instant(), cmd.get_valid_until_instant(),
cmd.priority, cmd.priority,
) );
} }
} }
} }

View File

@@ -19,6 +19,11 @@ use std::time::Duration;
mod hardware; mod hardware;
fn new_shutdown_handler(running: &Arc<AtomicBool>) -> impl Fn(()) {
let running = running.clone();
move |()| running.store(false, Ordering::Relaxed)
}
/// Run the flight software /// Run the flight software
/// ///
/// # Errors /// # Errors
@@ -64,17 +69,11 @@ pub fn run() -> Result<()> {
)?; )?;
let b_id = task_b.get_id(); let b_id = task_b.get_id();
let comms = s.run_cyclic( let mut comms = CommsTask::new(15000, "nautilus-ground:14000")?;
"comms-task", comms.add_command_handler("/shutdown", new_shutdown_handler(&running))?;
CommsTask::new( comms.add_command_handler("/mcp23017a/set", task_a.new_pinset_callback())?;
15000, comms.add_command_handler("/mcp23017b/set", task_b.new_pinset_callback())?;
"nautilus-ground:14000", let comms = s.run_cyclic("comms-task", comms, 10)?;
running.clone(),
task_a.clone().new_pinset_callback(),
task_b.clone().new_pinset_callback(),
)?,
10,
)?;
let sv = &state_vector; let sv = &state_vector;
s.run_cyclic( s.run_cyclic(

View File

@@ -84,14 +84,15 @@ impl<'s> Scheduler<'s, '_> {
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn run<T>( pub fn run<'t, T>(
&self, &self,
name: impl Into<String>, name: impl Into<String>,
task: T, task: T,
) -> Result<TaskHandle<T::Message, T::Data>> ) -> Result<TaskHandle<T::Message, T::Data>>
where where
T: Task + Send + Debug + 's, T: Task + Send + Debug + 't,
T::Message: Send, T::Message: Send,
't: 's,
{ {
let name = name.into(); let name = name.into();
trace!( trace!(
@@ -109,15 +110,16 @@ impl<'s> Scheduler<'s, '_> {
Ok(TaskHandle { name, sender, data }) Ok(TaskHandle { name, sender, data })
} }
pub fn run_cyclic<T>( pub fn run_cyclic<'t, T>(
&self, &self,
name: impl Into<String>, name: impl Into<String>,
mut task: T, mut task: T,
frequency: u64, frequency: u64,
) -> Result<TaskHandle<T::Message, T::Data>> ) -> Result<TaskHandle<T::Message, T::Data>>
where where
T: CyclicTask + Send + 's, T: CyclicTask + Send + 't,
T::Message: Send, T::Message: Send,
't: 's,
{ {
let name = name.into(); let name = name.into();
trace!( trace!(