use api::client::Client; use api::client::telemetry::{TelemetryHandle, TelemetryRegistry}; use futures::future::{Either, join_all, select}; use futures::{TryFutureExt, pin_mut}; use log::{error, trace}; use nautilus_common::on_drop::on_drop; use nautilus_common::telemetry::{CommsState, SwitchBank, Telemetry, TelemetryMessage}; use nautilus_common::udp::UdpRecvPostcardError; use nautilus_common::udp::tokio::AsyncUdpSocketExt; use std::net::SocketAddr; use std::sync::Arc; use tokio::net::UdpSocket; use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio::task::yield_now; use tokio::{join, select, try_join}; use tokio_util::sync::CancellationToken; pub struct TelemetryHandler<'a> { tlm: TelemetryRegistry, flight_addr: &'a RwLock, lock: Option>, udp: &'a UdpSocket, cancel: CancellationToken, } impl<'a> TelemetryHandler<'a> { pub fn new( client: Arc, flight_addr: &'a RwLock, lock: RwLockWriteGuard<'a, SocketAddr>, udp: &'a UdpSocket, cancel: CancellationToken, ) -> Self { Self { tlm: TelemetryRegistry::new(client), flight_addr, lock: Some(lock), udp, cancel, } } /// Run this telemetry handler. /// Note: this method is expected to block so should run in its own thread pub fn run(mut self) -> anyhow::Result<()> { let cancel = self.cancel.clone(); // Trigger a shutdown if we exit the telemetry process for some reason (including panic) let _shutdown_when_closed = on_drop(move || cancel.cancel()); let runtime = tokio::runtime::Builder::new_current_thread() .enable_all() .build()?; runtime.block_on(async { // Allow cancellation while waiting for this to initialize let mut context = { let context = TelemetryContext::new(&self); pin_mut!(context); let cancellation_future = self.cancel.cancelled(); pin_mut!(cancellation_future); let init_or_cancel = select(context, cancellation_future).await; match init_or_cancel { Either::Left((success, _)) => success, Either::Right(_) => return Ok(()), } }; let mut buffer = [0u8; 512]; while !self.cancel.is_cancelled() { select! { () = self.cancel.cancelled() => {}, incoming = self.udp.recv_postcard::(&mut buffer) => { match incoming { Ok((tlm, addr, _)) => { trace!("{tlm:?}"); let flight_addr_update = async { // The first time around we will use the lock given to us // Other times we will grab a new lock // self.re let mut lock = match self.lock.take() { None => self.flight_addr.write().await, Some(lock) => lock, }; // Update the value in the lock *lock = addr; }; tokio::pin!(flight_addr_update); // We can do these two operations concurrently join!( flight_addr_update, context.step(tlm), ); }, Err(UdpRecvPostcardError::NoData) => { // This shouldn't be possible when using a tokio socket I don't think // But let's just yield our time anyways yield_now().await; }, Err(err) => { error!("Rx error: {err}"); }, } } } } // Explicit Drop drop(self); Ok(()) }) } } struct TelemetryContext { bank_a: Vec>, bank_b: Vec>, tx_packets: TelemetryHandle, rx_packets: TelemetryHandle, tx_bytes: TelemetryHandle, rx_bytes: TelemetryHandle, tx_errors: TelemetryHandle, rx_errors: TelemetryHandle, } impl TelemetryContext { async fn new(tlm: &TelemetryHandler<'_>) -> Self { let bank_a = join_all((0..16).map(|i| tlm.tlm.register(format!("switch.bank_a.{i}")))).await; let bank_b = join_all((0..16).map(|i| tlm.tlm.register(format!("switch.bank_b.{i}")))).await; let tx_packets = tlm.tlm.register("comms.tx_packets").await; let rx_packets = tlm.tlm.register("comms.rx_packets").await; let tx_bytes = tlm.tlm.register("comms.tx_bytes").await; let rx_bytes = tlm.tlm.register("comms.rx_bytes").await; let tx_errors = tlm.tlm.register("comms.tx_errors").await; let rx_errors = tlm.tlm.register("comms.rx_errors").await; Self { bank_a, bank_b, tx_packets, rx_packets, tx_bytes, rx_bytes, tx_errors, rx_errors, } } async fn step(&mut self, tlm: Telemetry) { match tlm.message { TelemetryMessage::SwitchState { bank, switches } => { let bank_handles = match bank { SwitchBank::A => &self.bank_a, SwitchBank::B => &self.bank_b, }; assert!(bank_handles.len() >= switches.iter().len()); join_all(switches.into_iter().enumerate().map(|(i, state)| { bank_handles[i] .publish(state, tlm.timestamp) .unwrap_or_else(move |err| { // We don't need to bubble this error up, just report it error!("Failed to publish telemetry for switch {bank} {i}: {err}"); }) })) .await; } TelemetryMessage::CommsState(CommsState { tx_packets, rx_packets, tx_bytes, rx_bytes, tx_errors, rx_errors, }) => { if let Err(e) = try_join!( self.tx_packets.publish(tx_packets, tlm.timestamp), self.rx_packets.publish(rx_packets, tlm.timestamp), self.tx_bytes.publish(tx_bytes, tlm.timestamp), self.rx_bytes.publish(rx_bytes, tlm.timestamp), self.tx_errors.publish(tx_errors, tlm.timestamp), self.rx_errors.publish(rx_errors, tlm.timestamp), ) { error!("Failed to publish comms telemetry: {e}"); } } } } }