Replace gRPC Backend #10

Merged
sergeysav merged 15 commits from ssavelyev/grpc_removal into main 2026-01-01 10:11:53 -08:00
10 changed files with 210 additions and 234 deletions
Showing only changes of commit b8475a12ad - Show all commits

View File

@@ -7,6 +7,6 @@ authors = ["Sergey <me@sergeysav.com>"]
[dependencies] [dependencies]
chrono = { workspace = true, features = ["serde"] } chrono = { workspace = true, features = ["serde"] }
derive_more = { workspace = true, features = ["try_into"] } derive_more = { workspace = true, features = ["from", "try_into"] }
serde = { workspace = true, features = ["derive"] } serde = { workspace = true, features = ["derive"] }
thiserror = { workspace = true } thiserror = { workspace = true }

View File

@@ -1,3 +1,4 @@
use crate::data_value::DataValue;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
@@ -7,7 +8,7 @@ pub enum DataType {
Boolean, Boolean,
} }
pub trait ToDataType { pub trait ToDataType: Into<DataValue> {
const DATA_TYPE: DataType; const DATA_TYPE: DataType;
} }

View File

@@ -1,7 +1,7 @@
use derive_more::TryInto; use derive_more::{From, TryInto};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, Serialize, Deserialize, TryInto)] #[derive(Debug, Clone, Copy, Serialize, Deserialize, From, TryInto)]
pub enum DataValue { pub enum DataValue {
Float32(f32), Float32(f32),
Float64(f64), Float64(f64),

View File

@@ -5,12 +5,18 @@ use std::fmt::Display;
use std::sync::Arc; use std::sync::Arc;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
pub struct Commanding; pub struct CommandRegistry {
client: Arc<Client>,
}
impl CommandRegistry {
pub fn new(client: Arc<Client>) -> Self {
Self { client }
}
impl Commanding {
pub fn register_handler<C: IntoCommandDefinition, F, E: Display>( pub fn register_handler<C: IntoCommandDefinition, F, E: Display>(
client: Arc<Client>, &self,
command_name: String, command_name: impl Into<String>,
mut callback: F, mut callback: F,
) -> CommandHandle ) -> CommandHandle
where where
@@ -20,8 +26,9 @@ impl Commanding {
let result = CommandHandle { let result = CommandHandle {
cancellation_token: cancellation_token.clone(), cancellation_token: cancellation_token.clone(),
}; };
let client = self.client.clone();
let command_definition = C::create(command_name); let command_definition = C::create(command_name.into());
tokio::spawn(async move { tokio::spawn(async move {
while !cancellation_token.is_cancelled() { while !cancellation_token.is_cancelled() {

11
api/src/client/config.rs Normal file
View File

@@ -0,0 +1,11 @@
pub struct ClientConfiguration {
pub send_buffer_size: usize,
}
impl Default for ClientConfiguration {
fn default() -> Self {
Self {
send_buffer_size: 128,
}
}
}

View File

@@ -1,3 +1,4 @@
use crate::client::config::ClientConfiguration;
use crate::client::error::{ConnectError, MessageError}; use crate::client::error::{ConnectError, MessageError};
use crate::client::{Callback, ClientChannel, OutgoingMessage, RegisteredCallback}; use crate::client::{Callback, ClientChannel, OutgoingMessage, RegisteredCallback};
use crate::messages::callback::GenericCallbackError; use crate::messages::callback::GenericCallbackError;
@@ -23,6 +24,7 @@ pub struct ClientContext {
pub cancel: CancellationToken, pub cancel: CancellationToken,
pub request: Request, pub request: Request,
pub connected_state_tx: watch::Sender<bool>, pub connected_state_tx: watch::Sender<bool>,
pub client_configuration: ClientConfiguration,
} }
impl ClientContext { impl ClientContext {
@@ -71,7 +73,7 @@ impl ClientContext {
}; };
info!("Connected to {}", self.request.uri()); info!("Connected to {}", self.request.uri());
let (tx, rx) = mpsc::channel(128); let (tx, rx) = mpsc::channel(self.client_configuration.send_buffer_size);
*write_lock = tx; *write_lock = tx;
drop(write_lock); drop(write_lock);

View File

@@ -1,8 +1,10 @@
pub mod command; pub mod command;
mod config;
mod context; mod context;
pub mod error; pub mod error;
pub mod telemetry; pub mod telemetry;
use crate::client::config::ClientConfiguration;
use crate::client::error::{MessageError, RequestError}; use crate::client::error::{MessageError, RequestError};
use crate::messages::callback::GenericCallbackError; use crate::messages::callback::GenericCallbackError;
use crate::messages::payload::RequestMessagePayload; use crate::messages::payload::RequestMessagePayload;
@@ -41,6 +43,16 @@ pub struct Client {
impl Client { impl Client {
pub fn connect<R>(request: R) -> Result<Self, ConnectError> pub fn connect<R>(request: R) -> Result<Self, ConnectError>
where
R: IntoClientRequest,
{
Self::connect_with_config(request, ClientConfiguration::default())
}
pub fn connect_with_config<R>(
request: R,
config: ClientConfiguration,
) -> Result<Self, ConnectError>
where where
R: IntoClientRequest, R: IntoClientRequest,
{ {
@@ -52,6 +64,7 @@ impl Client {
cancel: cancel.clone(), cancel: cancel.clone(),
request: request.into_client_request()?, request: request.into_client_request()?,
connected_state_tx, connected_state_tx,
client_configuration: config,
}; };
context.start(channel.clone())?; context.start(channel.clone())?;

View File

@@ -1,77 +1,104 @@
use crate::client::error::MessageError; use crate::client::error::MessageError;
use crate::client::Client; use crate::client::Client;
use crate::data_type::DataType;
use crate::data_value::DataValue; use crate::data_value::DataValue;
use crate::messages::telemetry_definition::TelemetryDefinitionRequest; use crate::messages::telemetry_definition::TelemetryDefinitionRequest;
use crate::messages::telemetry_entry::TelemetryEntry; use crate::messages::telemetry_entry::TelemetryEntry;
use api_core::data_type::{DataType, ToDataType};
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{oneshot, RwLock}; use tokio::sync::{oneshot, RwLock};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use uuid::Uuid; use uuid::Uuid;
pub struct Telemetry; pub struct TelemetryRegistry {
client: Arc<Client>,
}
impl Telemetry { impl TelemetryRegistry {
pub async fn register( pub fn new(client: Arc<Client>) -> Self {
client: Arc<Client>, Self { client }
name: String, }
#[inline]
pub async fn register_generic(
&self,
name: impl Into<String>,
data_type: DataType, data_type: DataType,
) -> TelemetryHandle { ) -> GenericTelemetryHandle {
let cancellation_token = CancellationToken::new(); // inner for compilation performance
let cancel_token = cancellation_token.clone(); async fn inner(
let stored_client = client.clone(); client: Arc<Client>,
name: String,
data_type: DataType,
) -> GenericTelemetryHandle {
let cancellation_token = CancellationToken::new();
let cancel_token = cancellation_token.clone();
let stored_client = client.clone();
let response_uuid = Arc::new(RwLock::new(Uuid::nil())); let response_uuid = Arc::new(RwLock::new(Uuid::nil()));
let response_uuid_inner = response_uuid.clone(); let response_uuid_inner = response_uuid.clone();
let (tx, rx) = oneshot::channel(); let (tx, rx) = oneshot::channel();
tokio::spawn(async move { tokio::spawn(async move {
let mut write_lock = Some(response_uuid_inner.write().await); let mut write_lock = Some(response_uuid_inner.write().await);
let _ = tx.send(()); let _ = tx.send(());
while !cancel_token.is_cancelled() { while !cancel_token.is_cancelled() {
if let Ok(response) = client if let Ok(response) = client
.send_request(TelemetryDefinitionRequest { .send_request(TelemetryDefinitionRequest {
name: name.clone(), name: name.clone(),
data_type, data_type,
}) })
.await .await
{ {
let mut lock = match write_lock { let mut lock = match write_lock {
None => response_uuid_inner.write().await, None => response_uuid_inner.write().await,
Some(lock) => lock, Some(lock) => lock,
}; };
// Update the value in the lock // Update the value in the lock
*lock = response.uuid; *lock = response.uuid;
// Set this value so the loop works // Set this value so the loop works
write_lock = None; write_lock = None;
}
client.wait_disconnected().await;
} }
});
client.wait_disconnected().await; // Wait until the write lock is acquired
let _ = rx.await;
// Wait until the write lock is released for the first time
drop(response_uuid.read().await);
GenericTelemetryHandle {
cancellation_token,
uuid: response_uuid,
client: stored_client,
} }
});
// Wait until the write lock is acquired
let _ = rx.await;
// Wait until the write lock is released for the first time
drop(response_uuid.read().await);
TelemetryHandle {
cancellation_token,
uuid: response_uuid,
client: stored_client,
} }
inner(self.client.clone(), name.into(), data_type).await
}
#[inline]
pub async fn register<T: ToDataType>(&self, name: impl Into<String>) -> TelemetryHandle<T> {
self.register_generic(name, T::DATA_TYPE).await.coerce()
} }
} }
pub struct TelemetryHandle { impl Drop for GenericTelemetryHandle {
fn drop(&mut self) {
self.cancellation_token.cancel();
}
}
pub struct GenericTelemetryHandle {
cancellation_token: CancellationToken, cancellation_token: CancellationToken,
uuid: Arc<RwLock<Uuid>>, uuid: Arc<RwLock<Uuid>>,
client: Arc<Client>, client: Arc<Client>,
} }
impl TelemetryHandle { impl GenericTelemetryHandle {
pub async fn publish( pub async fn publish(
&self, &self,
value: DataValue, value: DataValue,
@@ -97,10 +124,42 @@ impl TelemetryHandle {
Ok(()) Ok(())
} }
}
impl Drop for TelemetryHandle { #[inline]
fn drop(&mut self) { pub async fn publish_now(&self, value: DataValue) -> Result<(), MessageError> {
self.cancellation_token.cancel(); self.publish(value, Utc::now()).await
}
fn coerce<T: Into<DataValue>>(self) -> TelemetryHandle<T> {
TelemetryHandle::<T> {
generic_handle: self,
_phantom: PhantomData,
}
}
}
pub struct TelemetryHandle<T> {
generic_handle: GenericTelemetryHandle,
_phantom: PhantomData<T>,
}
impl<T> TelemetryHandle<T> {
pub fn to_generic(self) -> GenericTelemetryHandle {
self.generic_handle
}
pub fn as_generic(&self) -> &GenericTelemetryHandle {
&self.generic_handle
}
}
impl<T: Into<DataValue>> TelemetryHandle<T> {
#[inline]
pub async fn publish(&self, value: T, timestamp: DateTime<Utc>) -> Result<(), MessageError> {
self.as_generic().publish(value.into(), timestamp).await
}
#[inline]
pub async fn publish_now(&self, value: T) -> Result<(), MessageError> {
self.publish(value, Utc::now()).await
} }
} }

View File

@@ -1,4 +1,4 @@
use api::client::command::Commanding; use api::client::command::CommandRegistry;
use api::client::Client; use api::client::Client;
use api::macros::IntoCommandDefinition; use api::macros::IntoCommandDefinition;
use api::messages::command::CommandHeader; use api::messages::command::CommandHeader;
@@ -20,13 +20,14 @@ async fn main() -> Result<(), Box<dyn Error>> {
}); });
} }
let client = Arc::new(Client::connect("ws://[::1]:8080/backend")?); let client = Arc::new(Client::connect("ws://localhost:8080/backend")?);
let cmd = CommandRegistry::new(client);
let handle = let handle = cmd.register_handler("simple_command/a", handle_command);
Commanding::register_handler(client, "simple_command/a".to_string(), handle_command);
cancellation_token.cancelled().await; cancellation_token.cancelled().await;
// This will automatically drop when we return
drop(handle); drop(handle);
Ok(()) Ok(())

View File

@@ -1,10 +1,9 @@
use api::client::telemetry::Telemetry; use api::client::telemetry::TelemetryRegistry;
use api::client::Client; use api::client::Client;
use api::data_type::DataType;
use api::data_value::DataValue;
use chrono::{TimeDelta, Utc}; use chrono::{TimeDelta, Utc};
use futures_util::future::join_all; use futures_util::future::join_all;
use num_traits::FloatConst; use num_traits::FloatConst;
use std::f64;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::time::{sleep_until, Instant}; use tokio::time::{sleep_until, Instant};
@@ -14,111 +13,31 @@ use tokio_util::sync::CancellationToken;
async fn main() -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> {
env_logger::init(); env_logger::init();
let client = Arc::new(Client::connect("ws://[::1]:8080/backend")?); let client = Arc::new(Client::connect("ws://localhost:8080/backend")?);
let tlm = TelemetryRegistry::new(client);
let time_offset = Telemetry::register( let time_offset = tlm.register::<f64>("simple_producer/time_offset").await;
client.clone(),
"simple_producer/time_offset".into(), let publish_offset = tlm.register::<f64>("simple_producer/publish_offset").await;
DataType::Float64,
) let await_offset = tlm.register::<f64>("simple_producer/await_offset").await;
let bool_tlm_handle = tlm.register::<bool>("simple_producer/bool").await;
let sin_handles = join_all((1..=6).map(|i| {
tlm.register::<f32>(format!(
"simple_producer/sin{}",
if i == 1 { "".into() } else { i.to_string() }
))
}))
.await; .await;
let publish_offset = Telemetry::register( let cos_handles = join_all((1..=6).map(|i| {
client.clone(), tlm.register::<f64>(format!(
"simple_producer/publish_offset".into(), "simple_producer/cos{}",
DataType::Float64, if i == 1 { "".into() } else { i.to_string() }
) ))
.await; }))
let await_offset = Telemetry::register(
client.clone(),
"simple_producer/await_offset".into(),
DataType::Float64,
)
.await;
let sin_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/sin".into(),
DataType::Float32,
)
.await;
let cos_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/cos".into(),
DataType::Float64,
)
.await;
let bool_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/bool".into(),
DataType::Boolean,
)
.await;
let sin2_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/sin2".into(),
DataType::Float32,
)
.await;
let cos2_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/cos2".into(),
DataType::Float64,
)
.await;
let sin3_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/sin3".into(),
DataType::Float32,
)
.await;
let cos3_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/cos3".into(),
DataType::Float64,
)
.await;
let sin4_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/sin4".into(),
DataType::Float32,
)
.await;
let cos4_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/cos4".into(),
DataType::Float64,
)
.await;
let sin5_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/sin5".into(),
DataType::Float32,
)
.await;
let cos5_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/cos5".into(),
DataType::Float64,
)
.await;
let sin6_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/sin6".into(),
DataType::Float32,
)
.await;
let cos6_tlm_handle = Telemetry::register(
client.clone(),
"simple_producer/cos6".into(),
DataType::Float64,
)
.await; .await;
let cancellation_token = CancellationToken::new(); let cancellation_token = CancellationToken::new();
@@ -134,81 +53,44 @@ async fn main() -> anyhow::Result<()> {
let start_instant = Instant::now(); let start_instant = Instant::now();
let mut next_time = start_instant; let mut next_time = start_instant;
let mut index = 0; let mut index = 0;
let mut tasks = vec![];
while !cancellation_token.is_cancelled() { while !cancellation_token.is_cancelled() {
next_time += Duration::from_millis(10); next_time += Duration::from_millis(10);
index += 1; index += 1;
sleep_until(next_time).await; sleep_until(next_time).await;
let publish_time = start_time + TimeDelta::from_std(next_time - start_instant).unwrap(); let publish_time = start_time + TimeDelta::from_std(next_time - start_instant).unwrap();
let actual_time = Instant::now(); let actual_time = Instant::now();
tasks.push(time_offset.publish(
DataValue::Float64((actual_time - next_time).as_secs_f64()),
Utc::now(),
));
tasks.push(sin_tlm_handle.publish(
DataValue::Float32((f32::TAU() * (index as f32) / (1000.0_f32)).sin()),
publish_time,
));
tasks.push(cos_tlm_handle.publish(
DataValue::Float64((f64::TAU() * (index as f64) / (1000.0_f64)).cos()),
publish_time,
));
tasks.push(bool_tlm_handle.publish(DataValue::Boolean(index % 1000 > 500), publish_time));
tasks.push(sin2_tlm_handle.publish(
DataValue::Float32((f32::TAU() * (index as f32) / (500.0_f32)).sin()),
publish_time,
));
tasks.push(cos2_tlm_handle.publish(
DataValue::Float64((f64::TAU() * (index as f64) / (500.0_f64)).cos()),
publish_time,
));
tasks.push(sin3_tlm_handle.publish(
DataValue::Float32((f32::TAU() * (index as f32) / (333.0_f32)).sin()),
publish_time,
));
tasks.push(cos3_tlm_handle.publish(
DataValue::Float64((f64::TAU() * (index as f64) / (333.0_f64)).cos()),
publish_time,
));
tasks.push(sin4_tlm_handle.publish(
DataValue::Float32((f32::TAU() * (index as f32) / (250.0_f32)).sin()),
publish_time,
));
tasks.push(cos4_tlm_handle.publish(
DataValue::Float64((f64::TAU() * (index as f64) / (250.0_f64)).cos()),
publish_time,
));
tasks.push(sin5_tlm_handle.publish(
DataValue::Float32((f32::TAU() * (index as f32) / (200.0_f32)).sin()),
publish_time,
));
tasks.push(cos5_tlm_handle.publish(
DataValue::Float64((f64::TAU() * (index as f64) / (200.0_f64)).cos()),
publish_time,
));
tasks.push(sin6_tlm_handle.publish(
DataValue::Float32((f32::TAU() * (index as f32) / (166.0_f32)).sin()),
publish_time,
));
tasks.push(cos6_tlm_handle.publish(
DataValue::Float64((f64::TAU() * (index as f64) / (166.0_f64)).cos()),
publish_time,
));
tasks.push(publish_offset.publish( // Due to how telemetry handles are implemented, unless the send buffer is full awaiting
DataValue::Float64((Instant::now() - actual_time).as_secs_f64()), // these will return immediately
Utc::now(), time_offset
)); .publish_now((actual_time - next_time).as_secs_f64())
.await?;
bool_tlm_handle
.publish(index % 1000 > 500, publish_time)
.await?;
// Join the tasks so they all run in parallel for (i, sin) in sin_handles.iter().enumerate() {
for task in join_all(tasks.drain(..)).await { sin.publish(
task?; (f32::TAU() * (index as f32) / (1000.0_f32 / (i + 1) as f32)).sin(),
publish_time,
)
.await?;
}
for (i, cos) in cos_handles.iter().enumerate() {
cos.publish(
(f64::TAU() * (index as f64) / (1000.0_f64 / (i + 1) as f64)).cos(),
publish_time,
)
.await?;
} }
tasks.push(await_offset.publish( publish_offset
DataValue::Float64((Instant::now() - actual_time).as_secs_f64()), .publish((Instant::now() - actual_time).as_secs_f64(), Utc::now())
Utc::now(), .await?;
));
await_offset
.publish((Instant::now() - actual_time).as_secs_f64(), Utc::now())
.await?;
} }
Ok(()) Ok(())