add telemetry producer to api

This commit is contained in:
2025-12-30 19:15:49 -05:00
parent a3aeff1d6f
commit 6fdbb868b7
4 changed files with 547 additions and 491 deletions

106
api/src/client/telemetry.rs Normal file
View File

@@ -0,0 +1,106 @@
use crate::client::error::MessageError;
use crate::client::Client;
use crate::data_type::DataType;
use crate::data_value::DataValue;
use crate::messages::telemetry_definition::TelemetryDefinitionRequest;
use crate::messages::telemetry_entry::TelemetryEntry;
use chrono::{DateTime, Utc};
use std::sync::Arc;
use tokio::sync::{oneshot, RwLock};
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
pub struct Telemetry;
impl Telemetry {
pub async fn register(
client: Arc<Client>,
name: String,
data_type: DataType,
) -> TelemetryHandle {
let cancellation_token = CancellationToken::new();
let cancel_token = cancellation_token.clone();
let stored_client = client.clone();
let response_uuid = Arc::new(RwLock::new(Uuid::nil()));
let response_uuid_inner = response_uuid.clone();
let (tx, rx) = oneshot::channel();
tokio::spawn(async move {
let mut write_lock = Some(response_uuid_inner.write().await);
let _ = tx.send(());
while !cancel_token.is_cancelled() {
if let Ok(response) = client
.send_request(TelemetryDefinitionRequest {
name: name.clone(),
data_type,
})
.await
{
let mut lock = match write_lock {
None => response_uuid_inner.write().await,
Some(lock) => lock,
};
// Update the value in the lock
*lock = response.uuid;
// Set this value so the loop works
write_lock = None;
}
client.wait_disconnected().await;
}
});
// Wait until the write lock is acquired
let _ = rx.await;
// Wait until the write lock is released for the first time
drop(response_uuid.read().await);
TelemetryHandle {
cancellation_token,
uuid: response_uuid,
client: stored_client,
}
}
}
pub struct TelemetryHandle {
cancellation_token: CancellationToken,
uuid: Arc<RwLock<Uuid>>,
client: Arc<Client>,
}
impl TelemetryHandle {
pub async fn publish(
&self,
value: DataValue,
timestamp: DateTime<Utc>,
) -> Result<(), MessageError> {
let Ok(lock) = self.uuid.try_read() else {
return Ok(());
};
let uuid = *lock;
drop(lock);
self.client
.send_message_if_connected(TelemetryEntry {
uuid,
value,
timestamp,
})
.await
.or_else(|e| match e {
MessageError::TokioLockError(_) => Ok(()),
e => Err(e),
})?;
Ok(())
}
}
impl Drop for TelemetryHandle {
fn drop(&mut self) {
self.cancellation_token.cancel();
}
}