Add Generic Command Registration #11

Merged
sergeysav merged 1 commits from sergeysav/generic_commands into main 2026-01-01 13:11:23 -08:00
2 changed files with 26 additions and 21 deletions

View File

@@ -1,6 +1,6 @@
use crate::client::Client; use crate::client::Client;
use crate::messages::command::CommandResponse; use crate::messages::command::CommandResponse;
use api_core::command::{CommandHeader, IntoCommandDefinition}; use api_core::command::{Command, CommandDefinition, CommandHeader, IntoCommandDefinition};
use std::fmt::Display; use std::fmt::Display;
use std::sync::Arc; use std::sync::Arc;
use tokio::select; use tokio::select;
@@ -15,13 +15,13 @@ impl CommandRegistry {
Self { client } Self { client }
} }
pub fn register_handler<C: IntoCommandDefinition, F, E: Display>( pub fn register_raw_handler<F, E: Display>(
&self, &self,
command_name: impl Into<String>, command_definition: CommandDefinition,
mut callback: F, mut callback: F,
) -> CommandHandle ) -> CommandHandle
where where
F: FnMut(CommandHeader, C) -> Result<String, E> + Send + 'static, F: FnMut(Command) -> Result<String, E> + Send + 'static,
{ {
let cancellation_token = CancellationToken::new(); let cancellation_token = CancellationToken::new();
let result = CommandHandle { let result = CommandHandle {
@@ -29,8 +29,6 @@ impl CommandRegistry {
}; };
let client = self.client.clone(); let client = self.client.clone();
let command_definition = C::create(command_name.into());
tokio::spawn(async move { tokio::spawn(async move {
while !cancellation_token.is_cancelled() { while !cancellation_token.is_cancelled() {
// This would only fail if the sender closed while trying to insert data // This would only fail if the sender closed while trying to insert data
@@ -47,9 +45,7 @@ impl CommandRegistry {
select!( select!(
rx_value = rx.recv() => { rx_value = rx.recv() => {
if let Some((cmd, responder)) = rx_value { if let Some((cmd, responder)) = rx_value {
let header = cmd.header.clone(); let response = match callback(cmd) {
let response = match C::parse(cmd) {
Ok(cmd) => match callback(header, cmd) {
Ok(response) => CommandResponse { Ok(response) => CommandResponse {
success: true, success: true,
response, response,
@@ -58,11 +54,6 @@ impl CommandRegistry {
success: false, success: false,
response: err.to_string(), response: err.to_string(),
}, },
},
Err(err) => CommandResponse {
success: false,
response: err.to_string(),
},
}; };
// This should only err if we had an error elsewhere // This should only err if we had an error elsewhere
let _ = responder.send(response); let _ = responder.send(response);
@@ -78,6 +69,23 @@ impl CommandRegistry {
result result
} }
pub fn register_handler<C: IntoCommandDefinition, F, E>(
&self,
command_name: impl Into<String>,
mut callback: F,
) -> CommandHandle
where
F: FnMut(CommandHeader, C) -> Result<String, E> + Send + 'static,
E: Display,
{
self.register_raw_handler(C::create(command_name.into()), move |command: Command| {
let header = command.header.clone();
C::parse(command)
.map_err(|e| e.to_string())
.and_then(|cmd| callback(header, cmd).map_err(|err| err.to_string()))
})
}
} }
pub struct CommandHandle { pub struct CommandHandle {

View File

@@ -362,10 +362,7 @@ mod tests {
.unwrap() .unwrap()
.unwrap(); .unwrap();
// This should block as there should not be space in the queue // This should block as there should not be space in the queue
assert!(tlm_handle assert!(tlm_handle.publish_now(false).now_or_never().is_none());
.publish_now(false)
.now_or_never()
.is_none());
let tlm_msg = timeout(Duration::from_secs(1), rx.recv()) let tlm_msg = timeout(Duration::from_secs(1), rx.recv())
.await .await