stores telemetry definitions

This commit is contained in:
2024-10-19 15:51:35 -07:00
parent c6b32312d8
commit f47512010f
2 changed files with 45 additions and 14 deletions

View File

@@ -3,11 +3,9 @@ use server::core::telemetry_service_client::TelemetryServiceClient;
use server::core::telemetry_value::Value;
use server::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp, Uuid};
use std::error::Error;
use std::future::IntoFuture;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
@@ -20,7 +18,6 @@ use num_traits::float::FloatConst;
struct Telemetry {
client: TelemetryServiceClient<Channel>,
tx: Sender<TelemetryItem>,
handle: Option<JoinHandle<()>>,
cancel: CancellationToken,
}
@@ -36,12 +33,12 @@ impl Telemetry {
D::Error: Into<StdError>,
{
let mut client = TelemetryServiceClient::connect(dst).await?;
let mut client_stored = client.clone();
let client_stored = client.clone();
let cancel = CancellationToken::new();
let cancel_stored = cancel.clone();
let (local_tx, mut local_rx) = mpsc::channel(128);
let handle: JoinHandle<()> = tokio::spawn(async move {
tokio::spawn(async move {
while !cancel.is_cancelled() {
let (server_tx, server_rx) = mpsc::channel(1);
let response_stream = client.insert_telemetry(ReceiverStream::new(server_rx)).await;
@@ -75,7 +72,6 @@ impl Telemetry {
Ok(Self {
client: client_stored,
tx: local_tx,
handle: Some(handle),
cancel: cancel_stored,
})
}