improved integration with telem viz

This commit is contained in:
2026-01-03 21:26:46 -05:00
parent 275cb07c4c
commit 252db5993d
12 changed files with 216 additions and 91 deletions

View File

@@ -1,8 +1,8 @@
use anyhow::bail;
use api::client::Client;
use api::client::command::CommandRegistry;
use api::macros::IntoCommandDefinition;
use chrono::TimeDelta;
use itertools::Itertools;
use chrono::{DateTime, Utc};
use log::{error, trace};
use nautilus_common::command::{Command, OwnedCommandHeader, SetPin, ValidPriorityCommand};
use nautilus_common::udp::tokio::AsyncUdpSocketExt;
@@ -15,6 +15,8 @@ use tokio::sync::RwLock;
use tokio::sync::mpsc::{Sender, channel};
use tokio_util::sync::CancellationToken;
const MAX_DATETIME: DateTime<Utc> = DateTime::from_timestamp_nanos(i64::MAX);
pub struct CommandHandler<'a> {
cmd: CommandRegistry,
flight_addr: &'a RwLock<SocketAddr>,
@@ -24,6 +26,7 @@ pub struct CommandHandler<'a> {
#[derive(IntoCommandDefinition)]
struct SetPinCommand {
index: u8,
state: bool,
}
@@ -50,23 +53,27 @@ impl<'a> CommandHandler<'a> {
runtime.block_on(async move {
let (outgoing_commands_tx, mut outgoing_commands_rx) = channel::<OwnedCommandHeader>(16);
let commands = ["a", "b"].iter().cartesian_product(0..16)
.map(|(bank, i)| {
let mut commands = ["a", "b"].iter()
.map(|bank| {
let command_name = format!("/mcp23017{bank}/set");
let outgoing_commands_tx = outgoing_commands_tx.clone();
self.cmd.register_handler(
format!("switch.{bank}.{i}.set"),
move |header, cmd: SetPinCommand| -> anyhow::Result<_> {
trace!("Setting Switch {bank} {i}");
format!("switch.{bank}.set"),
move |_, cmd: SetPinCommand| -> anyhow::Result<_> {
trace!("Setting Switch {bank} {}", cmd.index);
if cmd.index >= 16 {
bail!("Invalid Pin Number: {}", cmd.index)
}
outgoing_commands_tx.try_send_command(
&command_name,
&ValidPriorityCommand {
inner: SetPin {
pin: i,
pin: cmd.index,
value: cmd.state,
},
valid_until: header.timestamp + TimeDelta::seconds(5),
valid_until: MAX_DATETIME, // header.timestamp + TimeDelta::seconds(5),
priority: 0,
}
)?;
@@ -77,6 +84,14 @@ impl<'a> CommandHandler<'a> {
})
.collect::<Vec<_>>();
commands.push(self.cmd.register_handler("shutdown", move |_, ()| -> anyhow::Result<_> {
trace!("Shutting Down Flight");
outgoing_commands_tx.try_send_command("/shutdown", &())?;
Ok("Command Executed Successfully".to_string())
}));
let mut buffer = [0u8; 512];
while !self.cancel.is_cancelled() {
select! {
@@ -86,7 +101,7 @@ impl<'a> CommandHandler<'a> {
None => break,
Some(outgoing) => {
match self.udp.send_postcard(&outgoing, &mut buffer, *self.flight_addr.read().await).await {
Ok(()) => {},
Ok(_) => {},
Err(err) => error!("Failed to Send Outgoing {err}"),
}
}

View File

@@ -1,6 +1,5 @@
#![warn(clippy::all, clippy::pedantic)]
use nautilus_common::udp::tokio::AsyncUdpSocketExt;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
mod command;
mod telemetry;
@@ -94,11 +93,7 @@ pub fn run() -> Result<()> {
Ok(()) as Result<()>
})?;
info!("Sending Shutdown Command");
if let Ok(flight_addr) = flight_addr.try_read() {
block_on(async { udp.send_command("/shutdown", &(), *flight_addr).await })?;
}
info!("Shutting Down");
udp_shutdown.cancel();
udp_thread.join().map_err(|e| anyhow!("{e:?}"))??;

View File

@@ -4,7 +4,7 @@ use futures::TryFutureExt;
use futures::future::join_all;
use log::{error, trace};
use nautilus_common::on_drop::on_drop;
use nautilus_common::telemetry::{SwitchBank, Telemetry, TelemetryMessage};
use nautilus_common::telemetry::{CommsState, SwitchBank, Telemetry, TelemetryMessage};
use nautilus_common::udp::UdpRecvPostcardError;
use nautilus_common::udp::tokio::AsyncUdpSocketExt;
use std::net::SocketAddr;
@@ -12,7 +12,7 @@ use std::sync::Arc;
use tokio::net::UdpSocket;
use tokio::sync::{RwLock, RwLockWriteGuard};
use tokio::task::yield_now;
use tokio::{join, pin, select};
use tokio::{join, pin, select, try_join};
use tokio_util::sync::CancellationToken;
pub struct TelemetryHandler<'a> {
@@ -61,7 +61,7 @@ impl<'a> TelemetryHandler<'a> {
() = self.cancel.cancelled() => {},
incoming = self.udp.recv_postcard::<Telemetry>(&mut buffer) => {
match incoming {
Ok((tlm, addr)) => {
Ok((tlm, addr, _)) => {
trace!("{tlm:?}");
let flight_addr_update = async {
@@ -107,16 +107,38 @@ impl<'a> TelemetryHandler<'a> {
struct TelemetryContext {
bank_a: Vec<TelemetryHandle<bool>>,
bank_b: Vec<TelemetryHandle<bool>>,
tx_packets: TelemetryHandle<u32>,
rx_packets: TelemetryHandle<u32>,
tx_bytes: TelemetryHandle<u32>,
rx_bytes: TelemetryHandle<u32>,
tx_errors: TelemetryHandle<u32>,
rx_errors: TelemetryHandle<u32>,
}
impl TelemetryContext {
async fn new(tlm: &TelemetryHandler<'_>) -> Self {
let bank_a =
join_all((0..16).map(|i| tlm.tlm.register::<bool>(format!("switch.bank_a.{i}")))).await;
join_all((0..16).map(|i| tlm.tlm.register(format!("switch.bank_a.{i}")))).await;
let bank_b =
join_all((0..16).map(|i| tlm.tlm.register::<bool>(format!("switch.bank_b.{i}")))).await;
join_all((0..16).map(|i| tlm.tlm.register(format!("switch.bank_b.{i}")))).await;
Self { bank_a, bank_b }
let tx_packets = tlm.tlm.register("comms.tx_packets").await;
let rx_packets = tlm.tlm.register("comms.rx_packets").await;
let tx_bytes = tlm.tlm.register("comms.tx_bytes").await;
let rx_bytes = tlm.tlm.register("comms.rx_bytes").await;
let tx_errors = tlm.tlm.register("comms.tx_errors").await;
let rx_errors = tlm.tlm.register("comms.rx_errors").await;
Self {
bank_a,
bank_b,
tx_packets,
rx_packets,
tx_bytes,
rx_bytes,
tx_errors,
rx_errors,
}
}
async fn step(&mut self, tlm: Telemetry) {
@@ -137,6 +159,25 @@ impl TelemetryContext {
}))
.await;
}
TelemetryMessage::CommsState(CommsState {
tx_packets,
rx_packets,
tx_bytes,
rx_bytes,
tx_errors,
rx_errors,
}) => {
if let Err(e) = try_join!(
self.tx_packets.publish(tx_packets, tlm.timestamp),
self.rx_packets.publish(rx_packets, tlm.timestamp),
self.tx_bytes.publish(tx_bytes, tlm.timestamp),
self.rx_bytes.publish(rx_bytes, tlm.timestamp),
self.tx_errors.publish(tx_errors, tlm.timestamp),
self.rx_errors.publish(rx_errors, tlm.timestamp),
) {
error!("Failed to publish comms telemetry: {e}");
}
}
}
}
}