Add Generic Command Registration #11
@@ -1,6 +1,6 @@
|
|||||||
use crate::client::Client;
|
use crate::client::Client;
|
||||||
use crate::messages::command::CommandResponse;
|
use crate::messages::command::CommandResponse;
|
||||||
use api_core::command::{CommandHeader, IntoCommandDefinition};
|
use api_core::command::{Command, CommandDefinition, CommandHeader, IntoCommandDefinition};
|
||||||
use std::fmt::Display;
|
use std::fmt::Display;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
@@ -15,13 +15,13 @@ impl CommandRegistry {
|
|||||||
Self { client }
|
Self { client }
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn register_handler<C: IntoCommandDefinition, F, E: Display>(
|
pub fn register_raw_handler<F, E: Display>(
|
||||||
&self,
|
&self,
|
||||||
command_name: impl Into<String>,
|
command_definition: CommandDefinition,
|
||||||
mut callback: F,
|
mut callback: F,
|
||||||
) -> CommandHandle
|
) -> CommandHandle
|
||||||
where
|
where
|
||||||
F: FnMut(CommandHeader, C) -> Result<String, E> + Send + 'static,
|
F: FnMut(Command) -> Result<String, E> + Send + 'static,
|
||||||
{
|
{
|
||||||
let cancellation_token = CancellationToken::new();
|
let cancellation_token = CancellationToken::new();
|
||||||
let result = CommandHandle {
|
let result = CommandHandle {
|
||||||
@@ -29,8 +29,6 @@ impl CommandRegistry {
|
|||||||
};
|
};
|
||||||
let client = self.client.clone();
|
let client = self.client.clone();
|
||||||
|
|
||||||
let command_definition = C::create(command_name.into());
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while !cancellation_token.is_cancelled() {
|
while !cancellation_token.is_cancelled() {
|
||||||
// This would only fail if the sender closed while trying to insert data
|
// This would only fail if the sender closed while trying to insert data
|
||||||
@@ -47,17 +45,10 @@ impl CommandRegistry {
|
|||||||
select!(
|
select!(
|
||||||
rx_value = rx.recv() => {
|
rx_value = rx.recv() => {
|
||||||
if let Some((cmd, responder)) = rx_value {
|
if let Some((cmd, responder)) = rx_value {
|
||||||
let header = cmd.header.clone();
|
let response = match callback(cmd) {
|
||||||
let response = match C::parse(cmd) {
|
Ok(response) => CommandResponse {
|
||||||
Ok(cmd) => match callback(header, cmd) {
|
success: true,
|
||||||
Ok(response) => CommandResponse {
|
response,
|
||||||
success: true,
|
|
||||||
response,
|
|
||||||
},
|
|
||||||
Err(err) => CommandResponse {
|
|
||||||
success: false,
|
|
||||||
response: err.to_string(),
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
Err(err) => CommandResponse {
|
Err(err) => CommandResponse {
|
||||||
success: false,
|
success: false,
|
||||||
@@ -78,6 +69,23 @@ impl CommandRegistry {
|
|||||||
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn register_handler<C: IntoCommandDefinition, F, E>(
|
||||||
|
&self,
|
||||||
|
command_name: impl Into<String>,
|
||||||
|
mut callback: F,
|
||||||
|
) -> CommandHandle
|
||||||
|
where
|
||||||
|
F: FnMut(CommandHeader, C) -> Result<String, E> + Send + 'static,
|
||||||
|
E: Display,
|
||||||
|
{
|
||||||
|
self.register_raw_handler(C::create(command_name.into()), move |command: Command| {
|
||||||
|
let header = command.header.clone();
|
||||||
|
C::parse(command)
|
||||||
|
.map_err(|e| e.to_string())
|
||||||
|
.and_then(|cmd| callback(header, cmd).map_err(|err| err.to_string()))
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CommandHandle {
|
pub struct CommandHandle {
|
||||||
|
|||||||
@@ -362,10 +362,7 @@ mod tests {
|
|||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap();
|
.unwrap();
|
||||||
// This should block as there should not be space in the queue
|
// This should block as there should not be space in the queue
|
||||||
assert!(tlm_handle
|
assert!(tlm_handle.publish_now(false).now_or_never().is_none());
|
||||||
.publish_now(false)
|
|
||||||
.now_or_never()
|
|
||||||
.is_none());
|
|
||||||
|
|
||||||
let tlm_msg = timeout(Duration::from_secs(1), rx.recv())
|
let tlm_msg = timeout(Duration::from_secs(1), rx.recv())
|
||||||
.await
|
.await
|
||||||
|
|||||||
Reference in New Issue
Block a user