clean up producer

This commit is contained in:
2024-10-19 15:21:12 -07:00
parent 17b9e50f1f
commit c6b32312d8
4 changed files with 140 additions and 47 deletions

3
Cargo.lock generated
View File

@@ -886,8 +886,11 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
name = "simple_producer"
version = "0.0.0"
dependencies = [
"chrono",
"num-traits",
"server",
"tokio",
"tokio-util",
"tonic",
]

View File

@@ -7,3 +7,6 @@ edition = "2021"
server = { path = "../../server" }
tonic = "0.12.3"
tokio = { version = "1.40.0", features = ["rt-multi-thread"] }
chrono = "0.4.38"
tokio-util = "0.7.12"
num-traits = "0.2.19"

View File

@@ -1,63 +1,150 @@
use chrono::DateTime;
use server::core::telemetry_service_client::TelemetryServiceClient;
use server::core::telemetry_value::Value;
use server::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp};
use std::f32::consts::TAU;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use server::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp, Uuid};
use std::error::Error;
use std::future::IntoFuture;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tokio::task::JoinHandle;
use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::StreamExt;
use tonic::codegen::StdError;
use tonic::transport::Channel;
use tonic::Request;
use num_traits::float::FloatConst;
struct Telemetry {
client: TelemetryServiceClient<Channel>,
tx: Sender<TelemetryItem>,
handle: Option<JoinHandle<()>>,
cancel: CancellationToken,
}
struct TelemetryItemHandle {
uuid: Vec<u8>,
tx: Sender<TelemetryItem>,
}
impl Telemetry {
pub async fn new<D>(dst: D) -> Result<Self, Box<dyn Error>>
where
D: TryInto<tonic::transport::Endpoint>,
D::Error: Into<StdError>,
{
let mut client = TelemetryServiceClient::connect(dst).await?;
let mut client_stored = client.clone();
let cancel = CancellationToken::new();
let cancel_stored = cancel.clone();
let (local_tx, mut local_rx) = mpsc::channel(128);
let handle: JoinHandle<()> = tokio::spawn(async move {
while !cancel.is_cancelled() {
let (server_tx, server_rx) = mpsc::channel(1);
let response_stream = client.insert_telemetry(ReceiverStream::new(server_rx)).await;
if let Ok(response_stream) = response_stream {
let mut response_stream = response_stream.into_inner();
while let Some(item) = local_rx.recv().await {
match server_tx.send(item).await {
Ok(_) => {},
Err(_) => {
break
},
}
if let Some(response) = response_stream.next().await {
match response {
Ok(_) => {},
Err(_) => {
break;
},
}
} else {
break;
}
}
} else {
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
()
});
Ok(Self {
client: client_stored,
tx: local_tx,
handle: Some(handle),
cancel: cancel_stored,
})
}
pub async fn register(&mut self, name: String, data_type: TelemetryDataType) -> Result<TelemetryItemHandle, Box<dyn Error>> {
let response = self.client.new_telemetry(Request::new(TelemetryDefinitionRequest {
name,
data_type: data_type.into(),
})).await?.into_inner();
let Some(uuid) = response.uuid else { return Err("UUID Missing".into()); };
Ok(TelemetryItemHandle {
uuid: uuid.value,
tx: self.tx.clone(),
})
}
}
impl Drop for Telemetry {
fn drop(&mut self) {
self.cancel.cancel();
}
}
impl TelemetryItemHandle {
pub async fn publish(&self, value: Value, timestamp: DateTime<chrono::Utc>) -> Result<(), Box<dyn Error>> {
let offset_from_unix_epoch = timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch");
self.tx.send(TelemetryItem {
uuid: Some(Uuid {
value: self.uuid.clone()
}),
value: Some(TelemetryValue {
value: Some(value)
}),
timestamp: Some(Timestamp {
secs: offset_from_unix_epoch.num_seconds(),
nanos: offset_from_unix_epoch.subsec_nanos(),
}),
}).await?;
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = TelemetryServiceClient::connect("http://[::1]:50051").await?;
async fn main() -> Result<(), Box<dyn Error>> {
let mut tlm = Telemetry::new("http://[::1]:50051").await?;
let request = Request::new(TelemetryDefinitionRequest {
name: "simple_producer/sin".into(),
data_type: TelemetryDataType::Float32.into(),
});
let response = client.new_telemetry(request).await?.into_inner();
println!("Response={:?}", response);
let Some(uuid) = response.uuid else { return Err("UUID Missing".into()); };
let (tx, rx) = mpsc::channel(128);
let mut response_stream = client.insert_telemetry(ReceiverStream::new(rx)).await?.into_inner();
let sin_tlm_handle = tlm.register("simple_producer/sin".into(), TelemetryDataType::Float32).await?;
let cos_tlm_handle = tlm.register("simple_producer/cos".into(), TelemetryDataType::Float64).await?;
let mut next_time = Instant::now();
let mut index = 0;
loop {
for _ in 0..100 {
next_time += Duration::from_millis(100);
index += 10;
tokio::time::sleep_until(next_time).await;
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
tx.send(TelemetryItem {
uuid: Some(uuid.clone()),
value: Some(TelemetryValue {
value: Some(Value::Float32(
(TAU * (index as f32) / (1000.0_f32)).sin()
)),
}),
timestamp: Some(Timestamp {
secs: now.as_secs(),
nanos: now.subsec_nanos(),
}),
}).await?;
if let Some(response) = response_stream.next().await {
match response {
Ok(_) => {
}
Err(_) => {
println!("Error");
}
}
} else {
println!("Stream closed");
break;
}
sin_tlm_handle.publish(
Value::Float32(
(f32::TAU() * (index as f32) / (1000.0_f32)).sin()
),
chrono::Utc::now(),
).await?;
cos_tlm_handle.publish(
Value::Float64(
(f64::TAU() * (index as f64) / (1000.0_f64)).cos()
),
chrono::Utc::now(),
).await?;
}
Ok(())

View File

@@ -10,7 +10,7 @@ enum TelemetryDataType {
message TelemetryValue {
oneof value {
float float_32 = 1;
float float_64 = 2;
double float_64 = 2;
}
}
@@ -20,8 +20,8 @@ message UUID {
// UTC since UNIX
message Timestamp {
fixed64 secs = 1;
fixed32 nanos = 2;
sfixed64 secs = 1;
sfixed32 nanos = 2;
}
message TelemetryDefinitionRequest {