diff --git a/Cargo.lock b/Cargo.lock index 8804c45..7a6c778 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20,7 +20,7 @@ checksum = "a23eb6b1614318a8071c9b2521f36b424b2c83db5eb3a0fead4a6c0809af6e61" [[package]] name = "api" version = "0.1.0" -source = "git+https://gitea.sergeysav.com/sergeysav/telemetry_visualization.git?rev=458c94c2ad362b1a6c31c2b1ee606a9f40605e06#458c94c2ad362b1a6c31c2b1ee606a9f40605e06" +source = "git+https://gitea.sergeysav.com/sergeysav/telemetry_visualization.git?rev=44862f65d2388e19b70a03409f1c16195e8f9342#44862f65d2388e19b70a03409f1c16195e8f9342" dependencies = [ "api-core", "api-proc-macro", @@ -40,7 +40,7 @@ dependencies = [ [[package]] name = "api-core" version = "0.1.0" -source = "git+https://gitea.sergeysav.com/sergeysav/telemetry_visualization.git?rev=458c94c2ad362b1a6c31c2b1ee606a9f40605e06#458c94c2ad362b1a6c31c2b1ee606a9f40605e06" +source = "git+https://gitea.sergeysav.com/sergeysav/telemetry_visualization.git?rev=44862f65d2388e19b70a03409f1c16195e8f9342#44862f65d2388e19b70a03409f1c16195e8f9342" dependencies = [ "chrono", "derive_more", @@ -51,7 +51,7 @@ dependencies = [ [[package]] name = "api-proc-macro" version = "0.1.0" -source = "git+https://gitea.sergeysav.com/sergeysav/telemetry_visualization.git?rev=458c94c2ad362b1a6c31c2b1ee606a9f40605e06#458c94c2ad362b1a6c31c2b1ee606a9f40605e06" +source = "git+https://gitea.sergeysav.com/sergeysav/telemetry_visualization.git?rev=44862f65d2388e19b70a03409f1c16195e8f9342#44862f65d2388e19b70a03409f1c16195e8f9342" dependencies = [ "api-core", "proc-macro-error", @@ -80,6 +80,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block2" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdeb9d870516001442e364c5220d3574d2da8dc765554b4a617230d33fa58ef5" +dependencies = [ + "objc2", +] + [[package]] name = "bumpalo" version = "3.17.0" @@ -213,11 +222,11 @@ dependencies = [ [[package]] name = "ctrlc" -version = "3.5.0" +version = "3.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "881c5d0a13b2f1498e2306e82cbada78390e152d4b1378fb28a84f4dcd0dc4f3" +checksum = "73736a89c4aff73035ba2ed2e565061954da00d4970fc9ac25dcc85a2a20d790" dependencies = [ - "dispatch", + "dispatch2", "nix", "windows-sys 0.61.1", ] @@ -262,10 +271,16 @@ dependencies = [ ] [[package]] -name = "dispatch" -version = "0.2.0" +name = "dispatch2" +version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd0c93bb4b0c6d9b77f4435b0ae98c24d17f1c45b2ff844c6151a07256ca923b" +checksum = "89a09f22a6c6069a18470eb92d2298acf25463f14256d24778e1230d789a2aec" +dependencies = [ + "bitflags", + "block2", + "libc", + "objc2", +] [[package]] name = "either" @@ -714,6 +729,21 @@ dependencies = [ "autocfg", ] +[[package]] +name = "objc2" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c2599ce0ec54857b29ce62166b0ed9b4f6f1a70ccc9a71165b6154caca8c05" +dependencies = [ + "objc2-encode", +] + +[[package]] +name = "objc2-encode" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ef25abbcd74fb2609453eb695bd2f860d389e457f67dc17cafc8b8cbc89d0c33" + [[package]] name = "once_cell" version = "1.21.3" diff --git a/Cargo.toml b/Cargo.toml index 6076e01..9e66d1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,10 @@ members = ["common", "ground", "flight"] [workspace.dependencies] anyhow = "1.0.100" -api = { git = "https://gitea.sergeysav.com/sergeysav/telemetry_visualization.git", rev = "458c94c2ad362b1a6c31c2b1ee606a9f40605e06" } +api = { git = "https://gitea.sergeysav.com/sergeysav/telemetry_visualization.git", rev = "44862f65d2388e19b70a03409f1c16195e8f9342" } chrono = { version = "0.4.42", features = ["serde"] } crc = "3.4.0" -ctrlc = "3.5.0" +ctrlc = "3.5.1" derive_more = "2.1.1" embedded-hal = "1.0.0" embedded-hal-bus = "0.3.0" diff --git a/common/Cargo.toml b/common/Cargo.toml index 9b3dbd6..fabb439 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -7,7 +7,7 @@ edition = "2024" anyhow = { workspace = true } chrono = { workspace = true } ctrlc = { workspace = true } -derive_more = {workspace = true, features = ["display"]} +derive_more = {workspace = true, features = ["display", "from"]} fern = { workspace = true } log = { workspace = true } postcard = { workspace = true } diff --git a/common/src/telemetry/mod.rs b/common/src/telemetry/mod.rs index 554c860..d47c86d 100644 --- a/common/src/telemetry/mod.rs +++ b/common/src/telemetry/mod.rs @@ -1,6 +1,6 @@ use chrono::serde::ts_nanoseconds; use chrono::{DateTime, Utc}; -use derive_more::Display; +use derive_more::{Display, From}; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Serialize, Deserialize)] @@ -16,10 +16,21 @@ pub enum SwitchBank { B, } -#[derive(Clone, Debug, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize, From)] pub enum TelemetryMessage { SwitchState { bank: SwitchBank, switches: [bool; 16], }, + CommsState(CommsState), +} + +#[derive(Clone, Debug, Default, Serialize, Deserialize)] +pub struct CommsState { + pub tx_packets: u32, + pub rx_packets: u32, + pub tx_bytes: u32, + pub rx_bytes: u32, + pub tx_errors: u32, + pub rx_errors: u32, } diff --git a/common/src/udp.rs b/common/src/udp.rs index 1236a9b..1dc8aaf 100644 --- a/common/src/udp.rs +++ b/common/src/udp.rs @@ -8,11 +8,11 @@ use thiserror::Error; #[derive(Error, Debug)] pub enum UdpRecvPostcardError { - #[error("IO Error")] + #[error("IO Error: {0}")] Io(#[from] std::io::Error), - #[error("Deserialization Error")] + #[error("Deserialization Error: {0}")] Deserialization(#[from] postcard::Error), - #[error("Extra Data")] + #[error("Extra Data: {amount}")] ExtraData { amount: usize }, #[error("No Data")] NoData, @@ -20,11 +20,11 @@ pub enum UdpRecvPostcardError { #[derive(Error, Debug)] pub enum UdpSendPostcardError { - #[error("IO Error")] + #[error("IO Error: {0}")] Io(#[from] std::io::Error), - #[error("Serialization Error")] + #[error("Serialization Error: {0}")] Serialization(#[from] postcard::Error), - #[error("Length Mismatch")] + #[error("Length Mismatch: {expected} expected. {actual} actual")] LengthMismatch { expected: usize, actual: usize }, } @@ -37,7 +37,7 @@ pub trait UdpSocketExt { fn recv_postcard<'de, T: Deserialize<'de>>( &self, buffer: &'de mut [u8], - ) -> Result<(T, SocketAddr), UdpRecvPostcardError>; + ) -> Result<(T, SocketAddr, usize), UdpRecvPostcardError>; /// Send a CBOR encoded message to an address using this socket /// @@ -48,7 +48,7 @@ pub trait UdpSocketExt { data: &T, buffer: &mut [u8], addr: A, - ) -> Result<(), UdpSendPostcardError>; + ) -> Result; /// Send a command message to an address using this socket /// @@ -59,20 +59,20 @@ pub trait UdpSocketExt { name: &str, data: &T, addr: A, - ) -> Result<(), UdpSendPostcardError>; + ) -> Result; } fn recv_postcard_inner<'de, T: Deserialize<'de>>( result: std::io::Result<(usize, SocketAddr)>, buffer: &'de mut [u8], -) -> Result<(T, SocketAddr), UdpRecvPostcardError> { +) -> Result<(T, SocketAddr, usize), UdpRecvPostcardError> { match result { Ok((size, addr)) => match postcard::take_from_bytes::(&buffer[..size]) { Ok((res, rem)) => { if !rem.is_empty() { return Err(ExtraData { amount: rem.len() }); } - Ok((res, addr)) + Ok((res, addr, size)) } Err(err) => Err(err.into()), }, @@ -86,7 +86,7 @@ fn recv_postcard_inner<'de, T: Deserialize<'de>>( fn send_inner( send_result: Result, expected_size: usize, -) -> Result<(), UdpSendPostcardError> { +) -> Result { match send_result { Ok(size_sent) => { if expected_size != size_sent { @@ -95,7 +95,7 @@ fn send_inner( actual: size_sent, }); } - Ok(()) + Ok(size_sent) } Err(e) => Err(e.into()), } @@ -105,7 +105,7 @@ impl UdpSocketExt for UdpSocket { fn recv_postcard<'de, T: Deserialize<'de>>( &self, buffer: &'de mut [u8], - ) -> Result<(T, SocketAddr), UdpRecvPostcardError> { + ) -> Result<(T, SocketAddr, usize), UdpRecvPostcardError> { recv_postcard_inner(self.recv_from(buffer), buffer) } @@ -114,7 +114,7 @@ impl UdpSocketExt for UdpSocket { data: &T, buffer: &mut [u8], addr: A, - ) -> Result<(), UdpSendPostcardError> { + ) -> Result { let result = postcard::to_slice(data, buffer)?; let size_encoded = result.len(); send_inner(self.send_to(result, addr), size_encoded) @@ -125,7 +125,7 @@ impl UdpSocketExt for UdpSocket { name: &str, data: &T, addr: A, - ) -> Result<(), UdpSendPostcardError> { + ) -> Result { let mut inner_buffer = [0u8; 512]; let inner_buffer = postcard::to_slice(data, &mut inner_buffer)?; let mut buffer = [0u8; 512]; @@ -158,7 +158,7 @@ pub mod tokio { fn recv_postcard<'de, T: Deserialize<'de>>( &self, buffer: &'de mut [u8], - ) -> impl Future>; + ) -> impl Future>; /// Send a CBOR encoded message to an address using this socket /// @@ -169,7 +169,7 @@ pub mod tokio { data: &T, buffer: &mut [u8], addr: A, - ) -> impl Future>; + ) -> impl Future>; /// Send a command message to an address using this socket /// @@ -180,14 +180,14 @@ pub mod tokio { name: &str, data: &T, addr: A, - ) -> impl Future>; + ) -> impl Future>; } impl AsyncUdpSocketExt for UdpSocket { async fn recv_postcard<'de, T: Deserialize<'de>>( &self, buffer: &'de mut [u8], - ) -> Result<(T, SocketAddr), UdpRecvPostcardError> { + ) -> Result<(T, SocketAddr, usize), UdpRecvPostcardError> { recv_postcard_inner(self.recv_from(buffer).await, buffer) } @@ -196,7 +196,7 @@ pub mod tokio { data: &T, buffer: &mut [u8], addr: A, - ) -> Result<(), UdpSendPostcardError> { + ) -> Result { let result = postcard::to_slice(data, buffer)?; let size_encoded = result.len(); send_inner(self.send_to(result, addr).await, size_encoded) @@ -207,7 +207,7 @@ pub mod tokio { name: &str, data: &T, addr: A, - ) -> Result<(), UdpSendPostcardError> { + ) -> Result { let mut inner_buffer = [0u8; 512]; let inner_buffer = postcard::to_slice(data, &mut inner_buffer)?; let mut buffer = [0u8; 512]; diff --git a/flight/src/comms/mod.rs b/flight/src/comms/mod.rs index 83c46f4..18a8cf9 100644 --- a/flight/src/comms/mod.rs +++ b/flight/src/comms/mod.rs @@ -1,8 +1,9 @@ use crate::scheduler::{CyclicTask, TaskHandle}; +use crate::state_vector::{SectionIdentifier, SectionWriter, StateVector}; use anyhow::{Result, ensure}; use log::{error, trace, warn}; use nautilus_common::command::{Command, CommandHeader}; -use nautilus_common::telemetry::{Telemetry, TelemetryMessage}; +pub(crate) use nautilus_common::telemetry::{CommsState, Telemetry, TelemetryMessage}; use nautilus_common::udp::{UdpRecvPostcardError, UdpSocketExt}; use std::any::type_name; use std::collections::HashMap; @@ -11,7 +12,7 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr, ToSocketAddrs, UdpSocket}; use std::sync::mpsc::Receiver; use std::time::Instant; -pub type TelemetrySender = TaskHandle; +pub type TelemetrySender = TaskHandle; impl TelemetrySender { pub fn send(&self, telemetry_message: TelemetryMessage) { @@ -33,6 +34,7 @@ where udp: UdpSocket, ground_address: A, command_callbacks: HashMap>, + section_writer: SectionWriter<'a, CommsState>, } impl Debug for CommsTask<'_, A> @@ -52,7 +54,7 @@ impl<'a, A> CommsTask<'a, A> where A: ToSocketAddrs + Debug, { - pub fn new(local_port: u16, ground_address: A) -> Result { + pub fn new(local_port: u16, ground_address: A, state_vector: &'a StateVector) -> Result { trace!( "CommsTask::new(local_port: {local_port}, ground_address: {ground_address:?})", type_name::() @@ -61,10 +63,13 @@ where // let bind_addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), local_port); let udp = UdpSocket::bind(bind_addr)?; udp.set_nonblocking(true)?; + + let section_writer = state_vector.create_section(CommsState::default()); Ok(Self { udp, ground_address, command_callbacks: HashMap::new(), + section_writer, }) } @@ -100,13 +105,15 @@ where A: ToSocketAddrs + Debug, { type Message = Telemetry; - type Data = (); + type Data = SectionIdentifier; fn get_data(&self) -> Self::Data { trace!( "CommsTask::get_data(self: {self:?})", type_name::() ); + + self.section_writer.get_identifier() } fn step(&mut self, receiver: &Receiver, step_time: Instant) { @@ -117,29 +124,51 @@ where let mut buffer = [0u8; 512]; match self.udp.recv_postcard::(&mut buffer) { - Ok((cmd, _)) => match self.command_callbacks.get(cmd.name) { - Some(handler) => { - if let Err(e) = handler(cmd.data) { - error!("Command Error: {e}"); + Ok((cmd, _, size)) => { + self.section_writer.update(|state| { + state.rx_packets = state.rx_packets.wrapping_add(1); + state.rx_bytes = state + .rx_bytes + .wrapping_add(u32::try_from(size % (u32::MAX as usize)).unwrap_or(0)); + }); + match self.command_callbacks.get(cmd.name) { + Some(handler) => { + if let Err(e) = handler(cmd.data) { + error!("Command Error: {e}"); + } + } + None => { + warn!("Unknown Command: {}", cmd.name); } } - None => { - warn!("Unknown Command: {}", cmd.name); - } - }, + } Err(UdpRecvPostcardError::NoData) => {} Err(err) => { error!("Rx error: {err}"); + self.section_writer.update(|state| { + state.rx_errors = state.rx_errors.wrapping_add(1); + }); } } // Intentionally ignore Err case while let Ok(tlm) = receiver.try_recv() { - if let Err(err) = self + match self .udp .send_postcard(&tlm, &mut buffer, &self.ground_address) { - error!("Tx Error: {err}"); + Ok(bytes_sent) => self.section_writer.update(|state| { + state.tx_packets = state.tx_packets.wrapping_add(1); + state.tx_bytes = state + .tx_bytes + .wrapping_add(u32::try_from(bytes_sent % (u32::MAX as usize)).unwrap_or(0)); + }), + Err(err) => { + error!("Tx Error: {err}"); + self.section_writer.update(|state| { + state.tx_errors = state.tx_errors.wrapping_add(1); + }); + } } } } diff --git a/flight/src/hardware/mcp23017/task.rs b/flight/src/hardware/mcp23017/task.rs index 4db46b1..1de3f29 100644 --- a/flight/src/hardware/mcp23017/task.rs +++ b/flight/src/hardware/mcp23017/task.rs @@ -44,7 +44,7 @@ pub struct Mcp23017Data { impl Mcp23017Data { pub fn get_id(&self) -> SectionIdentifier { - self.id.clone() + self.id } } @@ -216,9 +216,10 @@ impl CyclicTask for Mcp23017Task<'_, M> { } for pin in 0u8..16u8 { - let state = self.pins.pins[pin as usize].value; - if self.pins.pins[pin as usize].changed { - self.pins.pins[pin as usize].changed = false; + let current_pin = &mut self.pins.pins[pin as usize]; + let state = current_pin.value; + if current_pin.changed { + current_pin.changed = false; // This shouldn't be able to fail // TODO: handle error case let _ = self.mcp23017.set_pin(pin, state); diff --git a/flight/src/lib.rs b/flight/src/lib.rs index 1030681..bb158d6 100644 --- a/flight/src/lib.rs +++ b/flight/src/lib.rs @@ -1,5 +1,6 @@ #![warn(clippy::all, clippy::pedantic)] -use crate::comms::CommsTask; + +use crate::comms::{CommsState, CommsTask}; use crate::hardware::Hardware; use crate::hardware::initialize; use crate::hardware::mcp23017::{Mcp23017, Mcp23017State, Mcp23017Task}; @@ -10,12 +11,12 @@ use crate::state_vector::StateVector; use anyhow::Result; use embedded_hal::pwm::SetDutyCycle; use log::info; +use nautilus_common::add_ctrlc_handler_arc; use nautilus_common::telemetry::{SwitchBank, TelemetryMessage}; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; use std::thread::sleep; use std::time::Duration; -use nautilus_common::add_ctrlc_handler_arc; mod hardware; @@ -69,30 +70,34 @@ pub fn run() -> Result<()> { )?; let b_id = task_b.get_id(); - let mut comms = CommsTask::new(15000, "nautilus-ground:14000")?; + let mut comms = CommsTask::new(15000, "nautilus-ground:14000", &state_vector)?; comms.add_command_handler("/shutdown", new_shutdown_handler(&running))?; comms.add_command_handler("/mcp23017a/set", task_a.new_set_pin_callback())?; comms.add_command_handler("/mcp23017b/set", task_b.new_set_pin_callback())?; let comms = s.run_cyclic("comms-task", comms, 10)?; + let comms_id = *comms; let sv = &state_vector; s.run_cyclic( "telemetry-producer", move || { - sv.access_section(&a_id, |state: &Mcp23017State| { + sv.access_section(a_id, |state: &Mcp23017State| { comms.send(TelemetryMessage::SwitchState { bank: SwitchBank::A, switches: state.pins, }); }); - sv.access_section(&b_id, |state: &Mcp23017State| { + sv.access_section(b_id, |state: &Mcp23017State| { comms.send(TelemetryMessage::SwitchState { bank: SwitchBank::B, switches: state.pins, }); }); + sv.access_section(comms_id, |state: &CommsState| { + comms.send(state.clone().into()); + }); }, - 10, + 1, )?; info!("Starting Main Loop"); diff --git a/flight/src/state_vector/mod.rs b/flight/src/state_vector/mod.rs index 82bc678..80d129c 100644 --- a/flight/src/state_vector/mod.rs +++ b/flight/src/state_vector/mod.rs @@ -12,7 +12,7 @@ pub struct StateVector { sections: RwLock>>>, } -#[derive(Clone, Eq, PartialEq, Hash, Debug)] +#[derive(Clone, Copy, Eq, PartialEq, Hash, Debug)] pub struct SectionIdentifier(usize); pub struct SectionWriter<'a, T> { @@ -39,7 +39,7 @@ impl SectionWriter<'_, T> { "SectionWriter::get_identifier(self: {self:?})", type_name::() ); - self.id.clone() + self.id } pub fn update(&self, f: F) -> R @@ -81,9 +81,7 @@ impl StateVector { self.sections.clear_poison(); let mut sections = self.sections.write().unwrap(); - if !sections.contains_key(&id) { - sections.insert(id.clone(), lock); - } + sections.entry(id).or_insert(lock); drop(sections); @@ -94,7 +92,7 @@ impl StateVector { } } - pub fn access_section(&self, id: &SectionIdentifier, f: F) -> Option + pub fn access_section(&self, id: SectionIdentifier, f: F) -> Option where T: 'static, F: FnOnce(&T) -> R, @@ -109,7 +107,7 @@ impl StateVector { let Ok(sections) = self.sections.read() else { return None; }; - let section = sections.get(id)?; + let section = sections.get(&id)?; section.clear_poison(); let Ok(data) = section.read() else { return None; @@ -148,14 +146,14 @@ mod tests { let id_1 = section_1.get_identifier(); - state_vector.access_section(&id_1, |s: &TestType| { + state_vector.access_section(id_1, |s: &TestType| { assert_eq!(1, s.value1); assert_eq!(2, s.value2); }); let id_2 = section_2.get_identifier(); - state_vector.access_section(&id_2, |s: &TestType| { + state_vector.access_section(id_2, |s: &TestType| { assert_eq!(3, s.value1); assert_eq!(4, s.value2); }); diff --git a/ground/src/command.rs b/ground/src/command.rs index a28eec5..d9edad2 100644 --- a/ground/src/command.rs +++ b/ground/src/command.rs @@ -1,8 +1,8 @@ +use anyhow::bail; use api::client::Client; use api::client::command::CommandRegistry; use api::macros::IntoCommandDefinition; -use chrono::TimeDelta; -use itertools::Itertools; +use chrono::{DateTime, Utc}; use log::{error, trace}; use nautilus_common::command::{Command, OwnedCommandHeader, SetPin, ValidPriorityCommand}; use nautilus_common::udp::tokio::AsyncUdpSocketExt; @@ -15,6 +15,8 @@ use tokio::sync::RwLock; use tokio::sync::mpsc::{Sender, channel}; use tokio_util::sync::CancellationToken; +const MAX_DATETIME: DateTime = DateTime::from_timestamp_nanos(i64::MAX); + pub struct CommandHandler<'a> { cmd: CommandRegistry, flight_addr: &'a RwLock, @@ -24,6 +26,7 @@ pub struct CommandHandler<'a> { #[derive(IntoCommandDefinition)] struct SetPinCommand { + index: u8, state: bool, } @@ -50,23 +53,27 @@ impl<'a> CommandHandler<'a> { runtime.block_on(async move { let (outgoing_commands_tx, mut outgoing_commands_rx) = channel::(16); - let commands = ["a", "b"].iter().cartesian_product(0..16) - .map(|(bank, i)| { + let mut commands = ["a", "b"].iter() + .map(|bank| { let command_name = format!("/mcp23017{bank}/set"); let outgoing_commands_tx = outgoing_commands_tx.clone(); self.cmd.register_handler( - format!("switch.{bank}.{i}.set"), - move |header, cmd: SetPinCommand| -> anyhow::Result<_> { - trace!("Setting Switch {bank} {i}"); + format!("switch.{bank}.set"), + move |_, cmd: SetPinCommand| -> anyhow::Result<_> { + trace!("Setting Switch {bank} {}", cmd.index); + + if cmd.index >= 16 { + bail!("Invalid Pin Number: {}", cmd.index) + } outgoing_commands_tx.try_send_command( &command_name, &ValidPriorityCommand { inner: SetPin { - pin: i, + pin: cmd.index, value: cmd.state, }, - valid_until: header.timestamp + TimeDelta::seconds(5), + valid_until: MAX_DATETIME, // header.timestamp + TimeDelta::seconds(5), priority: 0, } )?; @@ -77,6 +84,14 @@ impl<'a> CommandHandler<'a> { }) .collect::>(); + commands.push(self.cmd.register_handler("shutdown", move |_, ()| -> anyhow::Result<_> { + trace!("Shutting Down Flight"); + + outgoing_commands_tx.try_send_command("/shutdown", &())?; + + Ok("Command Executed Successfully".to_string()) + })); + let mut buffer = [0u8; 512]; while !self.cancel.is_cancelled() { select! { @@ -86,7 +101,7 @@ impl<'a> CommandHandler<'a> { None => break, Some(outgoing) => { match self.udp.send_postcard(&outgoing, &mut buffer, *self.flight_addr.read().await).await { - Ok(()) => {}, + Ok(_) => {}, Err(err) => error!("Failed to Send Outgoing {err}"), } } diff --git a/ground/src/lib.rs b/ground/src/lib.rs index f9331fd..aeb2c13 100644 --- a/ground/src/lib.rs +++ b/ground/src/lib.rs @@ -1,6 +1,5 @@ #![warn(clippy::all, clippy::pedantic)] -use nautilus_common::udp::tokio::AsyncUdpSocketExt; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; mod command; mod telemetry; @@ -94,11 +93,7 @@ pub fn run() -> Result<()> { Ok(()) as Result<()> })?; - info!("Sending Shutdown Command"); - - if let Ok(flight_addr) = flight_addr.try_read() { - block_on(async { udp.send_command("/shutdown", &(), *flight_addr).await })?; - } + info!("Shutting Down"); udp_shutdown.cancel(); udp_thread.join().map_err(|e| anyhow!("{e:?}"))??; diff --git a/ground/src/telemetry.rs b/ground/src/telemetry.rs index 86b756a..a24d516 100644 --- a/ground/src/telemetry.rs +++ b/ground/src/telemetry.rs @@ -4,7 +4,7 @@ use futures::TryFutureExt; use futures::future::join_all; use log::{error, trace}; use nautilus_common::on_drop::on_drop; -use nautilus_common::telemetry::{SwitchBank, Telemetry, TelemetryMessage}; +use nautilus_common::telemetry::{CommsState, SwitchBank, Telemetry, TelemetryMessage}; use nautilus_common::udp::UdpRecvPostcardError; use nautilus_common::udp::tokio::AsyncUdpSocketExt; use std::net::SocketAddr; @@ -12,7 +12,7 @@ use std::sync::Arc; use tokio::net::UdpSocket; use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio::task::yield_now; -use tokio::{join, pin, select}; +use tokio::{join, pin, select, try_join}; use tokio_util::sync::CancellationToken; pub struct TelemetryHandler<'a> { @@ -61,7 +61,7 @@ impl<'a> TelemetryHandler<'a> { () = self.cancel.cancelled() => {}, incoming = self.udp.recv_postcard::(&mut buffer) => { match incoming { - Ok((tlm, addr)) => { + Ok((tlm, addr, _)) => { trace!("{tlm:?}"); let flight_addr_update = async { @@ -107,16 +107,38 @@ impl<'a> TelemetryHandler<'a> { struct TelemetryContext { bank_a: Vec>, bank_b: Vec>, + tx_packets: TelemetryHandle, + rx_packets: TelemetryHandle, + tx_bytes: TelemetryHandle, + rx_bytes: TelemetryHandle, + tx_errors: TelemetryHandle, + rx_errors: TelemetryHandle, } impl TelemetryContext { async fn new(tlm: &TelemetryHandler<'_>) -> Self { let bank_a = - join_all((0..16).map(|i| tlm.tlm.register::(format!("switch.bank_a.{i}")))).await; + join_all((0..16).map(|i| tlm.tlm.register(format!("switch.bank_a.{i}")))).await; let bank_b = - join_all((0..16).map(|i| tlm.tlm.register::(format!("switch.bank_b.{i}")))).await; + join_all((0..16).map(|i| tlm.tlm.register(format!("switch.bank_b.{i}")))).await; - Self { bank_a, bank_b } + let tx_packets = tlm.tlm.register("comms.tx_packets").await; + let rx_packets = tlm.tlm.register("comms.rx_packets").await; + let tx_bytes = tlm.tlm.register("comms.tx_bytes").await; + let rx_bytes = tlm.tlm.register("comms.rx_bytes").await; + let tx_errors = tlm.tlm.register("comms.tx_errors").await; + let rx_errors = tlm.tlm.register("comms.rx_errors").await; + + Self { + bank_a, + bank_b, + tx_packets, + rx_packets, + tx_bytes, + rx_bytes, + tx_errors, + rx_errors, + } } async fn step(&mut self, tlm: Telemetry) { @@ -137,6 +159,25 @@ impl TelemetryContext { })) .await; } + TelemetryMessage::CommsState(CommsState { + tx_packets, + rx_packets, + tx_bytes, + rx_bytes, + tx_errors, + rx_errors, + }) => { + if let Err(e) = try_join!( + self.tx_packets.publish(tx_packets, tlm.timestamp), + self.rx_packets.publish(rx_packets, tlm.timestamp), + self.tx_bytes.publish(tx_bytes, tlm.timestamp), + self.rx_bytes.publish(rx_bytes, tlm.timestamp), + self.tx_errors.publish(tx_errors, tlm.timestamp), + self.rx_errors.publish(rx_errors, tlm.timestamp), + ) { + error!("Failed to publish comms telemetry: {e}"); + } + } } } }