// File: telemetry_visualization/server/src/command/service.rs
// 220 lines · 7.6 KiB · Rust

use crate::command::command_handle::CommandHandle;
use crate::command::error::Error as CmdError;
use crate::command::error::Error::{
CommandFailure, CommandNotFound, FailedToReceiveResponse, FailedToSend,
IncorrectParameterCount, MisingParameter, NoCommandReceiver, WrongParameterType,
};
use anyhow::bail;
use api::data_type::DataType;
use api::data_value::DataValue;
use api::messages::command::{Command, CommandDefinition, CommandHeader, CommandResponse};
use api::messages::ResponseMessage;
use chrono::Utc;
use log::error;
use num_traits::FromPrimitive;
use papaya::HashMap;
use std::collections::HashMap as StdHashMap;
use tokio::sync::oneshot;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
/// A command that has been registered with the `CommandManagementService`,
/// together with the channel used to deliver invocations to whoever registered it.
#[derive(Clone)]
pub(super) struct RegisteredCommand {
/// Name the command is registered under (copied from the definition's name at registration).
pub(super) name: String,
/// The definition supplied at registration; its parameter list is what
/// `send_command` validates incoming parameters against.
pub(super) definition: CommandDefinition,
/// Uuid supplied at registration. It is placed in the `response` field of every
/// outgoing command message and is compared by `unregister` so that only the
/// matching registration is removed.
response_uuid: Uuid,
/// Channel on which command messages for this registration are sent.
tx: mpsc::Sender<ResponseMessage>,
}
/// Keeps track of registered commands and of command invocations that are
/// still waiting for a response.
pub struct CommandManagementService {
/// Registered commands keyed by command name (concurrent papaya map).
registered_commands: HashMap<String, RegisteredCommand>,
/// One-shot senders keyed by the uuid of the command message that was sent;
/// `handle_command_response` removes the entry and uses it to hand the
/// response back to the waiting `send_command` call.
outstanding_responses: RwLock<StdHashMap<Uuid, oneshot::Sender<CommandResponse>>>,
}
impl CommandManagementService {
pub fn new() -> Self {
Self {
registered_commands: HashMap::new(),
outstanding_responses: RwLock::new(StdHashMap::new()),
}
}
pub fn get_commands(&self) -> anyhow::Result<Vec<CommandDefinition>> {
let mut result = vec![];
let registered_commands = self.registered_commands.pin();
for registration in registered_commands.values() {
result.push(registration.clone().into());
}
Ok(result)
}
pub fn get_command_definition(&self, name: &String) -> Option<CommandDefinition> {
self.registered_commands
.pin()
.get(name)
.map(|registration| registration.clone().into())
}
pub fn register_command(
&self,
uuid: Uuid,
command: CommandDefinition,
tx: mpsc::Sender<ResponseMessage>,
) -> anyhow::Result<CommandHandle> {
let registered_commands = self.registered_commands.pin();
// We don't care about the previously registered command
let name = command.name.clone();
let _ = registered_commands.insert(
name.clone(),
RegisteredCommand {
response_uuid: uuid,
name: name.clone(),
definition: command,
tx,
},
);
Ok(CommandHandle::new(name, uuid))
}
pub async fn send_command(
&self,
name: impl Into<String>,
parameters: serde_json::Map<String, serde_json::Value>,
) -> Result<String, CmdError> {
let timestamp = Utc::now();
let name = name.into();
let registered_commands = self.registered_commands.pin();
let Some(registration) = registered_commands.get(&name) else {
return Err(CommandNotFound(name));
};
if parameters.len() != registration.definition.parameters.len() {
return Err(IncorrectParameterCount {
expected: registration.definition.parameters.len(),
actual: parameters.len(),
});
}
let mut result_parameters = StdHashMap::new();
for parameter in &registration.definition.parameters {
let Some(param_value) = parameters.get(&parameter.name) else {
return Err(MisingParameter(parameter.name.clone()));
};
let Some(Some(param_value)) = (match parameter.data_type {
DataType::Float32 => param_value
.as_f64()
.map(|v| Some(DataValue::Float32(v as f32))),
DataType::Float64 => param_value.as_f64().map(DataValue::Float64).map(Some),
DataType::Boolean => param_value.as_bool().map(DataValue::Boolean).map(Some),
DataType::Int8 => param_value
.as_i64()
.map(|v| Some(DataValue::Int8(i8::from_i64(v)?))),
DataType::Int16 => param_value
.as_i64()
.map(|v| Some(DataValue::Int16(i16::from_i64(v)?))),
DataType::Int32 => param_value
.as_i64()
.map(|v| Some(DataValue::Int32(i32::from_i64(v)?))),
DataType::Int64 => param_value.as_i64().map(|v| Some(DataValue::Int64(v))),
DataType::Unsigned8 => param_value
.as_u64()
.map(|v| Some(DataValue::Unsigned8(u8::from_u64(v)?))),
DataType::Unsigned16 => param_value
.as_u64()
.map(|v| Some(DataValue::Unsigned16(u16::from_u64(v)?))),
DataType::Unsigned32 => param_value
.as_u64()
.map(|v| Some(DataValue::Unsigned32(u32::from_u64(v)?))),
DataType::Unsigned64 => {
param_value.as_u64().map(|v| Some(DataValue::Unsigned64(v)))
}
}) else {
return Err(WrongParameterType {
name: parameter.name.clone(),
expected_type: parameter.data_type,
});
};
result_parameters.insert(parameter.name.clone(), param_value);
}
// Clone & Drop lets us use a standard pin instead of an owned pin
let response_uuid = registration.response_uuid;
let tx = registration.tx.clone();
drop(registered_commands);
if tx.is_closed() {
return Err(NoCommandReceiver);
}
let uuid = Uuid::new_v4();
let (response_tx, response_rx) = oneshot::channel();
{
let mut outstanding_responses = self.outstanding_responses.write().await;
outstanding_responses.insert(uuid, response_tx);
}
if let Err(e) = tx
.send(ResponseMessage {
uuid,
response: Some(response_uuid),
payload: Command {
header: CommandHeader { timestamp },
parameters: result_parameters,
}
.into(),
})
.await
{
error!("Failed to Send Command {e}");
return Err(FailedToSend);
}
response_rx
.await
.map_err(|e| {
error!("Failed to Receive Command Response: {e}");
FailedToReceiveResponse
})
.and_then(|response| {
if response.success {
Ok(response.response)
} else {
Err(CommandFailure(response.response))
}
})
}
pub async fn handle_command_response(
&self,
uuid: Uuid,
response: CommandResponse,
) -> anyhow::Result<()> {
let responder = {
let mut outstanding_responses = self.outstanding_responses.write().await;
outstanding_responses.remove(&uuid)
};
match responder {
None => bail!("Unexpected Command Response for Command {uuid}"),
Some(response_tx) => {
if let Err(e) = response_tx.send(response) {
bail!("Failed to send Command Response {e:?}");
}
}
};
Ok(())
}
pub fn unregister(&self, command_handle: CommandHandle) {
let registered_commands = self.registered_commands.pin();
// We don't care if this succeeded
let _ = registered_commands.remove_if(command_handle.name(), |_, registration| {
registration.response_uuid == *command_handle.uuid()
});
}
}