initial integration with telem viz
This commit is contained in:
142
ground/src/telemetry.rs
Normal file
142
ground/src/telemetry.rs
Normal file
@@ -0,0 +1,142 @@
|
||||
use api::client::Client;
|
||||
use api::client::telemetry::{TelemetryHandle, TelemetryRegistry};
|
||||
use futures::TryFutureExt;
|
||||
use futures::future::join_all;
|
||||
use log::{error, trace};
|
||||
use nautilus_common::on_drop::on_drop;
|
||||
use nautilus_common::telemetry::{SwitchBank, Telemetry, TelemetryMessage};
|
||||
use nautilus_common::udp::UdpRecvPostcardError;
|
||||
use nautilus_common::udp::tokio::AsyncUdpSocketExt;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::UdpSocket;
|
||||
use tokio::sync::{RwLock, RwLockWriteGuard};
|
||||
use tokio::task::yield_now;
|
||||
use tokio::{join, pin, select};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
pub struct TelemetryHandler<'a> {
|
||||
tlm: TelemetryRegistry,
|
||||
flight_addr: &'a RwLock<SocketAddr>,
|
||||
lock: Option<RwLockWriteGuard<'a, SocketAddr>>,
|
||||
udp: &'a UdpSocket,
|
||||
cancel: CancellationToken,
|
||||
}
|
||||
|
||||
impl<'a> TelemetryHandler<'a> {
|
||||
pub fn new(
|
||||
client: Arc<Client>,
|
||||
flight_addr: &'a RwLock<SocketAddr>,
|
||||
lock: RwLockWriteGuard<'a, SocketAddr>,
|
||||
udp: &'a UdpSocket,
|
||||
cancel: CancellationToken,
|
||||
) -> Self {
|
||||
Self {
|
||||
tlm: TelemetryRegistry::new(client),
|
||||
flight_addr,
|
||||
lock: Some(lock),
|
||||
udp,
|
||||
cancel,
|
||||
}
|
||||
}
|
||||
|
||||
/// Run this telemetry handler.
|
||||
/// Note: this method is expected to block so should run in its own thread
|
||||
pub fn run(mut self) -> anyhow::Result<()> {
|
||||
let cancel = self.cancel.clone();
|
||||
// Trigger a shutdown if we exit the telemetry process for some reason (including panic)
|
||||
let _shutdown_when_closed = on_drop(move || cancel.cancel());
|
||||
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()?;
|
||||
|
||||
runtime.block_on(async {
|
||||
let mut context = TelemetryContext::new(&self).await;
|
||||
|
||||
let mut buffer = [0u8; 512];
|
||||
|
||||
while !self.cancel.is_cancelled() {
|
||||
select! {
|
||||
() = self.cancel.cancelled() => {},
|
||||
incoming = self.udp.recv_postcard::<Telemetry>(&mut buffer) => {
|
||||
match incoming {
|
||||
Ok((tlm, addr)) => {
|
||||
trace!("{tlm:?}");
|
||||
|
||||
let flight_addr_update = async {
|
||||
// The first time around we will use the lock given to us
|
||||
// Other times we will grab a new lock
|
||||
// self.re
|
||||
let mut lock = match self.lock.take() {
|
||||
None => self.flight_addr.write().await,
|
||||
Some(lock) => lock,
|
||||
};
|
||||
// Update the value in the lock
|
||||
*lock = addr;
|
||||
};
|
||||
pin!(flight_addr_update);
|
||||
|
||||
// We can do these two operations concurrently
|
||||
join!(
|
||||
flight_addr_update,
|
||||
context.step(tlm),
|
||||
);
|
||||
},
|
||||
Err(UdpRecvPostcardError::NoData) => {
|
||||
// This shouldn't be possible when using a tokio socket I don't think
|
||||
// But let's just yield our time anyways
|
||||
yield_now().await;
|
||||
},
|
||||
Err(err) => {
|
||||
error!("Rx error: {err}");
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Explicit Drop
|
||||
drop(self);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
struct TelemetryContext {
|
||||
bank_a: Vec<TelemetryHandle<bool>>,
|
||||
bank_b: Vec<TelemetryHandle<bool>>,
|
||||
}
|
||||
|
||||
impl TelemetryContext {
|
||||
async fn new(tlm: &TelemetryHandler<'_>) -> Self {
|
||||
let bank_a =
|
||||
join_all((0..16).map(|i| tlm.tlm.register::<bool>(format!("switch.bank_a.{i}")))).await;
|
||||
let bank_b =
|
||||
join_all((0..16).map(|i| tlm.tlm.register::<bool>(format!("switch.bank_b.{i}")))).await;
|
||||
|
||||
Self { bank_a, bank_b }
|
||||
}
|
||||
|
||||
async fn step(&mut self, tlm: Telemetry) {
|
||||
match tlm.message {
|
||||
TelemetryMessage::SwitchState { bank, switches } => {
|
||||
let bank_handles = match bank {
|
||||
SwitchBank::A => &self.bank_a,
|
||||
SwitchBank::B => &self.bank_b,
|
||||
};
|
||||
assert!(bank_handles.len() >= switches.iter().len());
|
||||
join_all(switches.into_iter().enumerate().map(|(i, state)| {
|
||||
bank_handles[i]
|
||||
.publish(state, tlm.timestamp)
|
||||
.unwrap_or_else(move |err| {
|
||||
// We don't need to bubble this error up, just report it
|
||||
error!("Failed to publish telemetry for switch {bank} {i}: {err}");
|
||||
})
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user