Implement Commanding #6
14
Cargo.lock
generated
14
Cargo.lock
generated
@@ -2128,6 +2128,20 @@ dependencies = [
|
||||
"rand_core 0.6.4",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "simple_command"
|
||||
version = "0.0.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
"log",
|
||||
"num-traits",
|
||||
"server",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
"tonic",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "simple_producer"
|
||||
version = "0.0.0"
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
[workspace]
|
||||
members = ["server", "examples/simple_producer"]
|
||||
members = ["server", "examples/simple_producer", "examples/simple_command"]
|
||||
resolver = "2"
|
||||
|
||||
[profile.dev.package.sqlx-macros]
|
||||
|
||||
14
examples/simple_command/Cargo.toml
Normal file
14
examples/simple_command/Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
||||
|
||||
[package]
|
||||
name = "simple_command"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
server = { path = "../../server" }
|
||||
tonic = "0.12.3"
|
||||
tokio = { version = "1.43.0", features = ["rt-multi-thread", "signal"] }
|
||||
chrono = "0.4.39"
|
||||
tokio-util = "0.7.13"
|
||||
num-traits = "0.2.19"
|
||||
log = "0.4.29"
|
||||
anyhow = "1.0.100"
|
||||
141
examples/simple_command/src/main.rs
Normal file
141
examples/simple_command/src/main.rs
Normal file
@@ -0,0 +1,141 @@
|
||||
use chrono::DateTime;
|
||||
use server::core::client_side_command::Inner;
|
||||
use server::core::command_service_client::CommandServiceClient;
|
||||
use server::core::telemetry_value::Value;
|
||||
use server::core::{
|
||||
ClientSideCommand, Command, CommandDefinitionRequest, CommandParameterDefinition,
|
||||
CommandResponse, TelemetryDataType,
|
||||
};
|
||||
use std::error::Error;
|
||||
use tokio::select;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::codegen::tokio_stream::StreamExt;
|
||||
use tonic::transport::Channel;
|
||||
|
||||
struct CommandHandler {
|
||||
handle: JoinHandle<()>,
|
||||
}
|
||||
|
||||
impl CommandHandler {
|
||||
pub async fn new<F: Fn(Command) -> CommandResponse + Send + 'static>(
|
||||
cancellation_token: CancellationToken,
|
||||
client: &mut CommandServiceClient<Channel>,
|
||||
command_definition_request: CommandDefinitionRequest,
|
||||
handler: F,
|
||||
) -> anyhow::Result<Self> {
|
||||
let (tx, rx) = mpsc::channel(4);
|
||||
|
||||
// The buffer size of 4 means this is safe to send immediately
|
||||
tx.send(ClientSideCommand {
|
||||
inner: Some(Inner::Request(command_definition_request)),
|
||||
})
|
||||
.await?;
|
||||
let response = client.new_command(ReceiverStream::new(rx)).await?;
|
||||
let mut cmd_stream = response.into_inner();
|
||||
|
||||
let handle = tokio::spawn(async move {
|
||||
loop {
|
||||
select! {
|
||||
_ = cancellation_token.cancelled() => break,
|
||||
Some(msg) = cmd_stream.next() => {
|
||||
match msg {
|
||||
Ok(cmd) => {
|
||||
let uuid = cmd.uuid.clone();
|
||||
let mut response = handler(cmd);
|
||||
response.uuid = uuid;
|
||||
match tx.send(ClientSideCommand {
|
||||
inner: Some(Inner::Response(response))
|
||||
}).await {
|
||||
Ok(()) => {},
|
||||
Err(e) => {
|
||||
println!("SendError: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
println!("Error: {e}");
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
else => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Self { handle })
|
||||
}
|
||||
|
||||
pub async fn join(self) -> anyhow::Result<()> {
|
||||
Ok(self.handle.await?)
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
let cancellation_token = CancellationToken::new();
|
||||
{
|
||||
let cancellation_token = cancellation_token.clone();
|
||||
tokio::spawn(async move {
|
||||
let _ = tokio::signal::ctrl_c().await;
|
||||
cancellation_token.cancel();
|
||||
});
|
||||
}
|
||||
|
||||
let mut client = CommandServiceClient::connect("http://[::1]:50051").await?;
|
||||
|
||||
let cmd_handler = CommandHandler::new(
|
||||
cancellation_token,
|
||||
&mut client,
|
||||
CommandDefinitionRequest {
|
||||
name: "simple_command/a".to_string(),
|
||||
parameters: vec![
|
||||
CommandParameterDefinition {
|
||||
name: "a".to_string(),
|
||||
data_type: TelemetryDataType::Float32.into(),
|
||||
},
|
||||
CommandParameterDefinition {
|
||||
name: "b".to_string(),
|
||||
data_type: TelemetryDataType::Float64.into(),
|
||||
},
|
||||
CommandParameterDefinition {
|
||||
name: "c".to_string(),
|
||||
data_type: TelemetryDataType::Boolean.into(),
|
||||
},
|
||||
],
|
||||
},
|
||||
|command| {
|
||||
let timestamp = command.timestamp.expect("Missing Timestamp");
|
||||
let timestamp = DateTime::from_timestamp(timestamp.secs, timestamp.nanos as u32)
|
||||
.expect("Could not construct date time");
|
||||
let Value::Float32(a) = command.parameters["a"].value.expect("Missing Value a") else {
|
||||
panic!("Wrong Type a");
|
||||
};
|
||||
let Value::Float64(b) = command.parameters["b"].value.expect("Missing Value b") else {
|
||||
panic!("Wrong Type b");
|
||||
};
|
||||
let Value::Boolean(c) = command.parameters["c"].value.expect("Missing Value c") else {
|
||||
panic!("Wrong Type c");
|
||||
};
|
||||
|
||||
println!("Command Received:\n timestamp: {timestamp}\n a: {a}\n b: {b}\n c: {c}");
|
||||
|
||||
CommandResponse {
|
||||
uuid: command.uuid.clone(),
|
||||
success: true,
|
||||
response: format!(
|
||||
"Successfully Received Command! timestamp: {timestamp} a: {a} b: {b} c: {c}"
|
||||
),
|
||||
}
|
||||
},
|
||||
)
|
||||
.await?;
|
||||
|
||||
cmd_handler.join().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
40
frontend/src/components/CommandInput.vue
Normal file
40
frontend/src/components/CommandInput.vue
Normal file
@@ -0,0 +1,40 @@
|
||||
<script setup lang="ts">
|
||||
import { computed, onMounted } from 'vue';
|
||||
import {
|
||||
type AnyTypeId,
|
||||
type DynamicDataType,
|
||||
isBooleanType,
|
||||
isNumericType,
|
||||
} from '@/composables/dynamic.ts';
|
||||
|
||||
const props = defineProps<{
|
||||
type: AnyTypeId;
|
||||
}>();
|
||||
|
||||
const model = defineModel<DynamicDataType>();
|
||||
|
||||
const is_numeric = computed(() => {
|
||||
return isNumericType(props.type);
|
||||
});
|
||||
|
||||
const is_boolean = computed(() => {
|
||||
return isBooleanType(props.type);
|
||||
});
|
||||
|
||||
// Initialize the parameter to some value:
|
||||
onMounted(() => {
|
||||
if (is_numeric.value) {
|
||||
model.value = 0.0;
|
||||
} else if (is_boolean.value) {
|
||||
model.value = false;
|
||||
}
|
||||
});
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<input v-if="is_numeric" type="number" v-model="model" />
|
||||
<input v-else-if="is_boolean" type="checkbox" v-model="model" />
|
||||
<span v-else>UNKNOWN INPUT</span>
|
||||
</template>
|
||||
|
||||
<style scoped lang="scss"></style>
|
||||
70
frontend/src/components/CommandList.vue
Normal file
70
frontend/src/components/CommandList.vue
Normal file
@@ -0,0 +1,70 @@
|
||||
<script setup lang="ts">
|
||||
import { type CommandDefinition, useAllCommands } from '@/composables/command';
|
||||
import { computed } from 'vue';
|
||||
|
||||
const props = defineProps<{
|
||||
search?: string;
|
||||
}>();
|
||||
|
||||
const selected = defineModel<CommandDefinition | null>();
|
||||
|
||||
const search_value = computed(() => (props.search || '').toLowerCase());
|
||||
|
||||
const { data: command_data } = useAllCommands();
|
||||
|
||||
const sorted_cmd_data = computed(() => {
|
||||
const cmd_data = command_data.value;
|
||||
if (cmd_data != null) {
|
||||
return cmd_data
|
||||
.filter((entry) =>
|
||||
entry.name.toLowerCase().includes(search_value.value),
|
||||
)
|
||||
.sort((a, b) => a.name.localeCompare(b.name));
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
function onClick(cmd_entry: CommandDefinition) {
|
||||
selected.value = cmd_entry;
|
||||
}
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<template v-if="sorted_cmd_data.length > 0">
|
||||
<div
|
||||
v-for="cmd_entry in sorted_cmd_data"
|
||||
:class="`row data ${selected?.name == cmd_entry.name ? 'selected' : ''}`"
|
||||
:key="cmd_entry.name"
|
||||
@click="() => onClick(cmd_entry)"
|
||||
>
|
||||
<span>
|
||||
{{ cmd_entry.name }}
|
||||
</span>
|
||||
</div>
|
||||
</template>
|
||||
<template v-else>
|
||||
<div class="row">
|
||||
<span> No Matches Found </span>
|
||||
</div>
|
||||
</template>
|
||||
</template>
|
||||
|
||||
<style scoped lang="scss">
|
||||
@use '@/assets/variables';
|
||||
div {
|
||||
padding: 0.3em;
|
||||
border: 0;
|
||||
border-bottom: variables.$gray-3 solid 1px;
|
||||
border-top: 0;
|
||||
}
|
||||
|
||||
.data.selected:has(~ .data:hover),
|
||||
.data:hover ~ .data.selected {
|
||||
background-color: variables.$light-background-color;
|
||||
}
|
||||
|
||||
.data.selected,
|
||||
.data:hover {
|
||||
background-color: variables.$light2-background-color;
|
||||
}
|
||||
</style>
|
||||
23
frontend/src/components/CommandParameter.vue
Normal file
23
frontend/src/components/CommandParameter.vue
Normal file
@@ -0,0 +1,23 @@
|
||||
<script setup lang="ts">
|
||||
import type { CommandParameterDefinition } from '@/composables/command.ts';
|
||||
import { type DynamicDataType } from '@/composables/dynamic.ts';
|
||||
import CommandInput from '@/components/CommandInput.vue';
|
||||
|
||||
defineProps<{
|
||||
parameter: CommandParameterDefinition;
|
||||
}>();
|
||||
|
||||
const model = defineModel<DynamicDataType>();
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<div class="row">
|
||||
<label> {{ parameter.name }} </label>
|
||||
<CommandInput
|
||||
:type="parameter.data_type"
|
||||
v-model="model"
|
||||
></CommandInput>
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<style scoped lang="scss"></style>
|
||||
29
frontend/src/components/CommandParameterDataConfigurator.vue
Normal file
29
frontend/src/components/CommandParameterDataConfigurator.vue
Normal file
@@ -0,0 +1,29 @@
|
||||
<script setup lang="ts">
|
||||
import { type CommandParameterData } from '@/composables/dynamic.ts';
|
||||
import { type CommandParameterDefinition } from '@/composables/command.ts';
|
||||
import CommandInput from '@/components/CommandInput.vue';
|
||||
|
||||
defineProps<{
|
||||
param: CommandParameterDefinition;
|
||||
}>();
|
||||
|
||||
const model = defineModel<CommandParameterData>({
|
||||
required: true,
|
||||
});
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<div v-if="model.type == 'constant'" class="row">
|
||||
<label>Value:</label>
|
||||
<CommandInput
|
||||
:type="param.data_type"
|
||||
v-model="model.value"
|
||||
></CommandInput>
|
||||
</div>
|
||||
<div v-if="model.type == 'input'" class="row">
|
||||
<label>ID:</label>
|
||||
<input type="text" v-model="model.id" />
|
||||
</div>
|
||||
</template>
|
||||
|
||||
<style scoped lang="scss"></style>
|
||||
99
frontend/src/components/CommandParameterListConfigurator.vue
Normal file
99
frontend/src/components/CommandParameterListConfigurator.vue
Normal file
@@ -0,0 +1,99 @@
|
||||
<script setup lang="ts">
|
||||
import {
|
||||
type CommandParameterData,
|
||||
type DynamicDataType,
|
||||
isBooleanType,
|
||||
isNumericType,
|
||||
} from '@/composables/dynamic.ts';
|
||||
import { useCommand } from '@/composables/command.ts';
|
||||
import { watch } from 'vue';
|
||||
import FlexDivider from '@/components/FlexDivider.vue';
|
||||
import CommandParameterDataConfigurator from '@/components/CommandParameterDataConfigurator.vue';
|
||||
|
||||
const props = defineProps<{
|
||||
command_name: string;
|
||||
}>();
|
||||
|
||||
const model = defineModel<{ [key: string]: CommandParameterData }>({
|
||||
required: true,
|
||||
});
|
||||
|
||||
const { data: command_info } = useCommand(props.command_name);
|
||||
|
||||
watch([command_info], ([cmd_info]) => {
|
||||
if (cmd_info == null) {
|
||||
return;
|
||||
}
|
||||
const model_value = model.value;
|
||||
for (const key in model_value) {
|
||||
const is_valid_param = cmd_info.parameters.some(
|
||||
(param) => param.name == key,
|
||||
);
|
||||
if (!is_valid_param) {
|
||||
delete model_value[key];
|
||||
}
|
||||
}
|
||||
for (const param of cmd_info.parameters) {
|
||||
let model_param_value: CommandParameterData | undefined =
|
||||
model_value[param.name];
|
||||
if (model_param_value) {
|
||||
switch (model_param_value.type) {
|
||||
case 'constant':
|
||||
if (
|
||||
typeof model_param_value.value == 'number' &&
|
||||
!isNumericType(param.data_type)
|
||||
) {
|
||||
model_param_value = undefined;
|
||||
} else if (
|
||||
typeof model_param_value.value == 'boolean' &&
|
||||
!isBooleanType(param.data_type)
|
||||
) {
|
||||
model_param_value = undefined;
|
||||
}
|
||||
break;
|
||||
case 'input':
|
||||
// Nothing to do
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!model_param_value) {
|
||||
let default_value: DynamicDataType = 0;
|
||||
if (isNumericType(param.data_type)) {
|
||||
default_value = 0;
|
||||
} else if (isBooleanType(param.data_type)) {
|
||||
default_value = false;
|
||||
}
|
||||
model_param_value = {
|
||||
type: 'constant',
|
||||
value: default_value,
|
||||
};
|
||||
}
|
||||
model_value[param.name] = model_param_value;
|
||||
}
|
||||
model.value = model_value;
|
||||
});
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<template v-if="command_info">
|
||||
<template v-for="param in command_info.parameters" :key="param.name">
|
||||
<FlexDivider></FlexDivider>
|
||||
<div class="row">
|
||||
<label>{{ param.name }}</label>
|
||||
<select v-model="model[param.name].type">
|
||||
<option value="constant">Constant</option>
|
||||
<option value="input">Input</option>
|
||||
</select>
|
||||
</div>
|
||||
<CommandParameterDataConfigurator
|
||||
:param="param"
|
||||
v-model="model[param.name]"
|
||||
></CommandParameterDataConfigurator>
|
||||
</template>
|
||||
</template>
|
||||
<template v-else>
|
||||
<span> Loading... </span>
|
||||
</template>
|
||||
</template>
|
||||
|
||||
<style scoped lang="scss"></style>
|
||||
68
frontend/src/components/CommandSender.vue
Normal file
68
frontend/src/components/CommandSender.vue
Normal file
@@ -0,0 +1,68 @@
|
||||
<script setup lang="ts">
|
||||
import type { CommandDefinition } from '@/composables/command.ts';
|
||||
import { ref } from 'vue';
|
||||
import CommandParameter from '@/components/CommandParameter.vue';
|
||||
import FlexDivider from '@/components/FlexDivider.vue';
|
||||
import type { DynamicDataType } from '@/composables/dynamic.ts';
|
||||
|
||||
const props = defineProps<{
|
||||
command: CommandDefinition | null;
|
||||
}>();
|
||||
|
||||
const parameters = ref<{ [key: string]: DynamicDataType }>({});
|
||||
const busy = ref(false);
|
||||
const result = ref('');
|
||||
|
||||
async function sendCommand() {
|
||||
const command = props.command;
|
||||
const params = parameters.value;
|
||||
if (!command) {
|
||||
return;
|
||||
}
|
||||
busy.value = true;
|
||||
result.value = 'Loading...';
|
||||
|
||||
const response = await fetch(`/api/cmd/${command.name}`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: JSON.stringify(params),
|
||||
});
|
||||
if (response.ok) {
|
||||
result.value = await response.json();
|
||||
} else {
|
||||
result.value = await response.text();
|
||||
}
|
||||
busy.value = false;
|
||||
}
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<div class="row" v-if="!command">
|
||||
<span> No Command Selected </span>
|
||||
</div>
|
||||
<template v-else>
|
||||
<div class="row">
|
||||
<span>
|
||||
{{ command.name }}
|
||||
</span>
|
||||
</div>
|
||||
<FlexDivider></FlexDivider>
|
||||
<CommandParameter
|
||||
v-for="param in command.parameters"
|
||||
:key="param.name"
|
||||
:parameter="param"
|
||||
v-model="parameters[param.name]"
|
||||
></CommandParameter>
|
||||
<div class="row">
|
||||
<button :disabled="busy" @click.stop.prevent="sendCommand">
|
||||
Send
|
||||
</button>
|
||||
</div>
|
||||
<div class="row shrink grow"></div>
|
||||
<div class="row">{{ result }}</div>
|
||||
</template>
|
||||
</template>
|
||||
|
||||
<style scoped lang="scss"></style>
|
||||
@@ -1,6 +1,12 @@
|
||||
<script setup lang="ts">
|
||||
import type { OptionalDynamicComponentData } from '@/composables/dynamic.ts';
|
||||
import { computed, defineAsyncComponent } from 'vue';
|
||||
import {
|
||||
AnyTypes,
|
||||
type CommandParameterData,
|
||||
type DynamicDataType,
|
||||
type OptionalDynamicComponentData,
|
||||
} from '@/composables/dynamic.ts';
|
||||
import { computed, defineAsyncComponent, inject, type Ref, ref } from 'vue';
|
||||
import CommandParameterListConfigurator from '@/components/CommandParameterListConfigurator.vue';
|
||||
|
||||
const TelemetryValue = defineAsyncComponent(
|
||||
() => import('@/components/TelemetryValue.vue'),
|
||||
@@ -8,6 +14,9 @@ const TelemetryValue = defineAsyncComponent(
|
||||
const GridLayout = defineAsyncComponent(
|
||||
() => import('@/components/layout/GridLayout.vue'),
|
||||
);
|
||||
const CommandInput = defineAsyncComponent(
|
||||
() => import('@/components/CommandInput.vue'),
|
||||
);
|
||||
|
||||
const model = defineModel<OptionalDynamicComponentData>('data', {
|
||||
required: true,
|
||||
@@ -19,13 +28,26 @@ const props = defineProps<{
|
||||
editable: boolean;
|
||||
}>();
|
||||
|
||||
const busy = ref(false);
|
||||
|
||||
// Provide a fallback option
|
||||
const inputs = inject<Ref<{ [id: string]: DynamicDataType }>>(
|
||||
'inputs',
|
||||
ref({}),
|
||||
);
|
||||
|
||||
const thisSymbol = Symbol();
|
||||
|
||||
const isSelected = computed(() => {
|
||||
return selection.value == thisSymbol && props.editable;
|
||||
});
|
||||
|
||||
function selectThis() {
|
||||
function selectThis(e: Event) {
|
||||
if (props.editable) {
|
||||
// Only do this when we are editable
|
||||
e.stopPropagation();
|
||||
e.preventDefault();
|
||||
}
|
||||
selection.value = thisSymbol;
|
||||
}
|
||||
|
||||
@@ -100,6 +122,54 @@ function deleteColumn() {
|
||||
model.value = grid;
|
||||
}
|
||||
}
|
||||
|
||||
function makeInput() {
|
||||
model.value = {
|
||||
type: 'input',
|
||||
id: [...Array(32)]
|
||||
.map(() => Math.floor(Math.random() * 16).toString(16))
|
||||
.join(''),
|
||||
data_type: 'Float32',
|
||||
};
|
||||
}
|
||||
|
||||
function makeCommandButton() {
|
||||
model.value = {
|
||||
type: 'command_button',
|
||||
text: 'Button Text',
|
||||
command_name: '',
|
||||
parameters: {},
|
||||
};
|
||||
}
|
||||
|
||||
async function sendCommand(command: {
|
||||
command_name: string;
|
||||
parameters: { [key: string]: CommandParameterData };
|
||||
}) {
|
||||
busy.value = true;
|
||||
|
||||
const params: { [key: string]: DynamicDataType } = {};
|
||||
for (const param_name in command.parameters) {
|
||||
const parameter = command.parameters[param_name];
|
||||
switch (parameter.type) {
|
||||
case 'constant':
|
||||
params[param_name] = parameter.value;
|
||||
break;
|
||||
case 'input':
|
||||
params[param_name] = inputs.value[parameter.id];
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
await fetch(`/api/cmd/${command.command_name}`, {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
},
|
||||
body: JSON.stringify(params),
|
||||
});
|
||||
busy.value = false;
|
||||
}
|
||||
</script>
|
||||
|
||||
<template>
|
||||
@@ -123,20 +193,30 @@ function deleteColumn() {
|
||||
<button v-if="model.type != 'grid'" @click.stop.prevent="makeGrid">
|
||||
Make Grid
|
||||
</button>
|
||||
<button
|
||||
v-if="model.type != 'input'"
|
||||
@click.stop.prevent="makeInput"
|
||||
>
|
||||
Make Input
|
||||
</button>
|
||||
<button
|
||||
v-if="model.type != 'command_button'"
|
||||
@click.stop.prevent="makeCommandButton"
|
||||
>
|
||||
Make Command Button
|
||||
</button>
|
||||
</div>
|
||||
</Teleport>
|
||||
<template v-if="model.type == 'none'">
|
||||
<!-- Intentionally Left Empty -->
|
||||
<span
|
||||
v-if="editable"
|
||||
:class="`${editable ? 'editable' : ''} ${isSelected ? 'selected' : ''}`"
|
||||
@click.stop.prevent="selectThis"
|
||||
@click="selectThis"
|
||||
></span>
|
||||
</template>
|
||||
<template v-else-if="model.type == 'text'">
|
||||
<span
|
||||
:class="`${model.justify_right ? 'justify-right' : ''} ${editable ? 'editable' : ''} ${isSelected ? 'selected' : ''}`"
|
||||
@click.stop.prevent="selectThis"
|
||||
@click="selectThis"
|
||||
>
|
||||
{{ model.text }}
|
||||
</span>
|
||||
@@ -155,7 +235,7 @@ function deleteColumn() {
|
||||
<span
|
||||
v-if="editable"
|
||||
:class="`${editable ? 'editable' : ''} ${isSelected ? 'selected' : ''}`"
|
||||
@click.stop.prevent="selectThis"
|
||||
@click="selectThis"
|
||||
>
|
||||
{{ '{' }} {{ model.data }} {{ '}' }}
|
||||
</span>
|
||||
@@ -163,7 +243,7 @@ function deleteColumn() {
|
||||
v-else
|
||||
:class="`${editable ? 'editable' : ''} ${isSelected ? 'selected' : ''}`"
|
||||
:data="model.data"
|
||||
@click.stop.prevent="selectThis"
|
||||
@click="selectThis"
|
||||
></TelemetryValue>
|
||||
<Teleport v-if="isSelected" to="#inspector">
|
||||
<label>Telemetry Item: </label>
|
||||
@@ -175,7 +255,7 @@ function deleteColumn() {
|
||||
:class="`${editable ? 'editable' : ''} ${isSelected ? 'selected' : ''}`"
|
||||
:cols="model.columns"
|
||||
:equal_col_width="model.equal_width"
|
||||
@click.stop.prevent="selectThis"
|
||||
@click="selectThis"
|
||||
>
|
||||
<template v-for="x in model.cells.length" :key="x">
|
||||
<template v-for="y in model.columns" :key="y">
|
||||
@@ -210,6 +290,60 @@ function deleteColumn() {
|
||||
</div>
|
||||
</Teleport>
|
||||
</template>
|
||||
<template v-else-if="model.type == 'input'">
|
||||
<span
|
||||
v-if="editable"
|
||||
:class="`${editable ? 'editable' : ''} ${isSelected ? 'selected' : ''}`"
|
||||
@click="selectThis"
|
||||
>
|
||||
{{ '[' }} {{ model.id }} {{ ']' }}
|
||||
</span>
|
||||
<CommandInput
|
||||
v-else
|
||||
:type="model.data_type"
|
||||
v-model="inputs[model.id]"
|
||||
></CommandInput>
|
||||
<Teleport v-if="isSelected" to="#inspector">
|
||||
<div class="row">
|
||||
<label>Input ID: </label>
|
||||
<input v-model="model.id" />
|
||||
</div>
|
||||
<div class="row">
|
||||
<label>Data Type: </label>
|
||||
<select v-model="model.data_type">
|
||||
<option v-for="type in AnyTypes" :key="type" :value="type">
|
||||
{{ type }}
|
||||
</option>
|
||||
</select>
|
||||
</div>
|
||||
</Teleport>
|
||||
</template>
|
||||
<template v-else-if="model.type == 'command_button'">
|
||||
<button
|
||||
:disabled="busy"
|
||||
:class="`${editable ? 'editable' : ''} ${isSelected ? 'selected' : ''}`"
|
||||
@click.stop.prevent="
|
||||
(e) => (editable ? selectThis(e) : sendCommand(model as any))
|
||||
"
|
||||
>
|
||||
{{ model.text }}
|
||||
</button>
|
||||
<Teleport v-if="isSelected" to="#inspector">
|
||||
<div class="row">
|
||||
<label>Button Text: </label>
|
||||
<input v-model="model.text" />
|
||||
</div>
|
||||
<div class="row">
|
||||
<label>Command: </label>
|
||||
<input v-model="model.command_name" />
|
||||
</div>
|
||||
<CommandParameterListConfigurator
|
||||
:key="model.command_name"
|
||||
:command_name="model.command_name"
|
||||
v-model="model.parameters"
|
||||
></CommandParameterListConfigurator>
|
||||
</Teleport>
|
||||
</template>
|
||||
<template v-else> ERROR: Unknown data: {{ model }} </template>
|
||||
</template>
|
||||
|
||||
|
||||
@@ -23,6 +23,11 @@ import { AXIS_DATA, type AxisData } from '@/graph/axis';
|
||||
import ValueLabel from '@/components/ValueLabel.vue';
|
||||
import { type Point, PointLine } from '@/graph/line';
|
||||
import TooltipDialog from '@/components/TooltipDialog.vue';
|
||||
import {
|
||||
type DynamicDataType,
|
||||
isBooleanType,
|
||||
isNumericType,
|
||||
} from '@/composables/dynamic.ts';
|
||||
|
||||
const props = defineProps<{
|
||||
data: string;
|
||||
@@ -98,13 +103,9 @@ watch([value], ([val]) => {
|
||||
if (val_t >= min_x) {
|
||||
const raw_item_val = val.value[telemetry_data.value!.data_type];
|
||||
let item_val = 0;
|
||||
if (
|
||||
['Float32', 'Float64'].some(
|
||||
(e) => e == telemetry_data.value!.data_type,
|
||||
)
|
||||
) {
|
||||
if (isNumericType(telemetry_data.value!.data_type)) {
|
||||
item_val = raw_item_val as number;
|
||||
} else if (telemetry_data.value!.data_type == 'Boolean') {
|
||||
} else if (isBooleanType(telemetry_data.value!.data_type)) {
|
||||
item_val = (raw_item_val as boolean) ? 1 : 0;
|
||||
}
|
||||
const new_item = {
|
||||
@@ -140,15 +141,13 @@ watch(
|
||||
const response = (await res.json()) as TelemetryDataItem[];
|
||||
for (const data_item of response) {
|
||||
const val_t = Date.parse(data_item.timestamp);
|
||||
const raw_item_val = data_item.value[type];
|
||||
const raw_item_val = data_item.value[
|
||||
type
|
||||
] as DynamicDataType;
|
||||
let item_val = 0;
|
||||
if (
|
||||
['Float32', 'Float64'].some(
|
||||
(e) => e == telemetry_data.value!.data_type,
|
||||
)
|
||||
) {
|
||||
if (isNumericType(type)) {
|
||||
item_val = raw_item_val as number;
|
||||
} else if (type == 'Boolean') {
|
||||
} else if (isBooleanType(type)) {
|
||||
item_val = (raw_item_val as boolean) ? 1 : 0;
|
||||
}
|
||||
const new_item = {
|
||||
|
||||
53
frontend/src/composables/command.ts
Normal file
53
frontend/src/composables/command.ts
Normal file
@@ -0,0 +1,53 @@
|
||||
import { ref, toValue, watchEffect } from 'vue';
|
||||
import { type MaybeRefOrGetter } from 'vue';
|
||||
import type { AnyTypeId } from '@/composables/dynamic.ts';
|
||||
|
||||
export interface CommandParameterDefinition {
|
||||
name: string;
|
||||
data_type: AnyTypeId;
|
||||
}
|
||||
|
||||
export interface CommandDefinition {
|
||||
name: string;
|
||||
parameters: CommandParameterDefinition[];
|
||||
}
|
||||
|
||||
export function useAllCommands() {
|
||||
const data = ref<CommandDefinition[] | null>(null);
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const error = ref<any | null>(null);
|
||||
|
||||
watchEffect(async () => {
|
||||
try {
|
||||
const res = await fetch(`/api/cmd`);
|
||||
data.value = await res.json();
|
||||
error.value = null;
|
||||
} catch (e) {
|
||||
data.value = null;
|
||||
error.value = e;
|
||||
}
|
||||
});
|
||||
|
||||
return { data, error };
|
||||
}
|
||||
|
||||
export function useCommand(name: MaybeRefOrGetter<string>) {
|
||||
const data = ref<CommandDefinition | null>(null);
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
const error = ref<any | null>(null);
|
||||
|
||||
watchEffect(async () => {
|
||||
const name_value = toValue(name);
|
||||
|
||||
try {
|
||||
const res = await fetch(`/api/cmd/${name_value}`);
|
||||
data.value = await res.json();
|
||||
error.value = null;
|
||||
} catch (e) {
|
||||
data.value = null;
|
||||
error.value = e;
|
||||
}
|
||||
});
|
||||
|
||||
return { data, error };
|
||||
}
|
||||
@@ -1,3 +1,29 @@
|
||||
export const NumericTypes = ['Float32', 'Float64'] as const;
|
||||
export type NumericTypeId = (typeof NumericTypes)[number];
|
||||
export const BooleanTypes = ['Boolean'] as const;
|
||||
export type BooleanTypeId = (typeof BooleanTypes)[number];
|
||||
export const AnyTypes = [...NumericTypes, ...BooleanTypes] as const;
|
||||
export type AnyTypeId = (typeof AnyTypes)[number];
|
||||
|
||||
export function isNumericType(type: AnyTypeId): type is NumericTypeId {
|
||||
return NumericTypes.some((it) => it == type);
|
||||
}
|
||||
export function isBooleanType(type: AnyTypeId): type is BooleanTypeId {
|
||||
return BooleanTypes.some((it) => it == type);
|
||||
}
|
||||
|
||||
export type DynamicDataType = number | boolean;
|
||||
|
||||
export type CommandParameterData =
|
||||
| {
|
||||
type: 'constant';
|
||||
value: DynamicDataType;
|
||||
}
|
||||
| {
|
||||
type: 'input';
|
||||
id: string;
|
||||
};
|
||||
|
||||
export type DynamicComponentData =
|
||||
| { type: 'text'; text: string; justify_right: boolean }
|
||||
| { type: 'telemetry'; data: string }
|
||||
@@ -6,6 +32,17 @@ export type DynamicComponentData =
|
||||
columns: number;
|
||||
equal_width: boolean;
|
||||
cells: OptionalDynamicComponentData[][];
|
||||
}
|
||||
| {
|
||||
type: 'input';
|
||||
id: string;
|
||||
data_type: AnyTypeId;
|
||||
}
|
||||
| {
|
||||
type: 'command_button';
|
||||
text: string;
|
||||
command_name: string;
|
||||
parameters: { [key: string]: CommandParameterData };
|
||||
};
|
||||
|
||||
export type OptionalDynamicComponentData =
|
||||
|
||||
@@ -1,10 +1,11 @@
|
||||
import { ref, toValue, watchEffect } from 'vue';
|
||||
import { type MaybeRefOrGetter } from 'vue';
|
||||
import type { AnyTypeId } from '@/composables/dynamic.ts';
|
||||
|
||||
export interface TelemetryDefinition {
|
||||
uuid: string;
|
||||
name: string;
|
||||
data_type: string;
|
||||
data_type: AnyTypeId;
|
||||
}
|
||||
|
||||
export function useAllTelemetry() {
|
||||
|
||||
@@ -34,7 +34,12 @@ export function usePanelHeirarchy(): Ref<PanelHeirarchyChildren> {
|
||||
},
|
||||
{
|
||||
name: 'Telemetry Elements',
|
||||
to: { name: 'list' },
|
||||
to: { name: 'tlm' },
|
||||
type: PanelHeirarchyType.LEAF,
|
||||
},
|
||||
{
|
||||
name: 'Commands',
|
||||
to: { name: 'cmd' },
|
||||
type: PanelHeirarchyType.LEAF,
|
||||
},
|
||||
{
|
||||
|
||||
@@ -14,10 +14,15 @@ const router = createRouter({
|
||||
component: () => import('../views/GraphView.vue'),
|
||||
},
|
||||
{
|
||||
path: '/list',
|
||||
name: 'list',
|
||||
path: '/tlm',
|
||||
name: 'tlm',
|
||||
component: () => import('../views/TelemetryListView.vue'),
|
||||
},
|
||||
{
|
||||
path: '/cmd',
|
||||
name: 'cmd',
|
||||
component: () => import('../views/CommandListView.vue'),
|
||||
},
|
||||
{
|
||||
path: '/chart',
|
||||
name: 'chart',
|
||||
|
||||
61
frontend/src/views/CommandListView.vue
Normal file
61
frontend/src/views/CommandListView.vue
Normal file
@@ -0,0 +1,61 @@
|
||||
<script setup lang="ts">
|
||||
import TextInput from '@/components/TextInput.vue';
|
||||
import { ref } from 'vue';
|
||||
import FlexDivider from '@/components/FlexDivider.vue';
|
||||
import ScreenLayout from '@/components/layout/ScreenLayout.vue';
|
||||
import { Direction } from '@/composables/Direction.ts';
|
||||
import { ScreenType } from '@/composables/ScreenType.ts';
|
||||
import LinearLayout from '@/components/layout/LinearLayout.vue';
|
||||
import CommandList from '@/components/CommandList.vue';
|
||||
import type { CommandDefinition } from '@/composables/command.ts';
|
||||
import CommandSender from '@/components/CommandSender.vue';
|
||||
|
||||
const searchValue = ref('');
|
||||
|
||||
const selected = ref<CommandDefinition | null>(null);
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<ScreenLayout :type="ScreenType.Standard" limit>
|
||||
<LinearLayout
|
||||
:direction="Direction.Row"
|
||||
stretch
|
||||
class="grow no-min-height no-basis"
|
||||
>
|
||||
<div class="column grow2 stretch no-min-height no-basis">
|
||||
<div class="row">
|
||||
<TextInput
|
||||
autofocus
|
||||
class="grow"
|
||||
v-model="searchValue"
|
||||
placeholder="Search"
|
||||
></TextInput>
|
||||
</div>
|
||||
|
||||
<div class="row scroll no-min-height">
|
||||
<div class="column grow stretch">
|
||||
<CommandList
|
||||
:search="searchValue"
|
||||
v-model="selected"
|
||||
></CommandList>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<FlexDivider></FlexDivider>
|
||||
<div class="column grow stretch no-basis command-sender">
|
||||
<CommandSender
|
||||
:command="selected"
|
||||
:key="selected?.name || ''"
|
||||
></CommandSender>
|
||||
</div>
|
||||
</LinearLayout>
|
||||
</ScreenLayout>
|
||||
</template>
|
||||
|
||||
<style lang="scss">
|
||||
@use '@/assets/variables';
|
||||
|
||||
.command-sender {
|
||||
row-gap: 4px;
|
||||
}
|
||||
</style>
|
||||
@@ -1,7 +1,10 @@
|
||||
<script setup lang="ts">
|
||||
import DynamicComponent from '@/components/DynamicComponent.vue';
|
||||
import type { OptionalDynamicComponentData } from '@/composables/dynamic.ts';
|
||||
import { computed, ref, watchEffect } from 'vue';
|
||||
import type {
|
||||
DynamicDataType,
|
||||
OptionalDynamicComponentData,
|
||||
} from '@/composables/dynamic.ts';
|
||||
import { computed, provide, ref, watchEffect } from 'vue';
|
||||
import { useRoute } from 'vue-router';
|
||||
|
||||
const route = useRoute();
|
||||
@@ -12,6 +15,10 @@ const panel = ref<OptionalDynamicComponentData>({
|
||||
type: 'none',
|
||||
});
|
||||
|
||||
const inputs = ref<{ [id: string]: DynamicDataType }>({});
|
||||
|
||||
provide('inputs', inputs);
|
||||
|
||||
watchEffect(async () => {
|
||||
const panel_data = await fetch(`/api/panel/${id.value}`);
|
||||
const panel_json_value = await panel_data.json();
|
||||
|
||||
@@ -6,7 +6,6 @@ import type { TelemetryDefinition } from '@/composables/telemetry';
|
||||
import TelemetryInfo from '@/components/TelemetryInfo.vue';
|
||||
import FlexDivider from '@/components/FlexDivider.vue';
|
||||
import ScreenLayout from '@/components/layout/ScreenLayout.vue';
|
||||
import { Direction } from '@/composables/Direction.ts';
|
||||
import { ScreenType } from '@/composables/ScreenType.ts';
|
||||
|
||||
const searchValue = ref('');
|
||||
@@ -16,36 +15,38 @@ const mousedover = ref<TelemetryDefinition | null>(null);
|
||||
</script>
|
||||
|
||||
<template>
|
||||
<ScreenLayout :direction="Direction.Row" :type="ScreenType.Standard" limit>
|
||||
<div class="column grow2 stretch no-min-height no-basis">
|
||||
<div class="row">
|
||||
<TextInput
|
||||
autofocus
|
||||
class="grow"
|
||||
v-model="searchValue"
|
||||
placeholder="Search"
|
||||
></TextInput>
|
||||
</div>
|
||||
<ScreenLayout :type="ScreenType.Standard" limit>
|
||||
<div class="row grow stretch no-min-height no-basis">
|
||||
<div class="column grow2 stretch no-min-height no-basis">
|
||||
<div class="row">
|
||||
<TextInput
|
||||
autofocus
|
||||
class="grow"
|
||||
v-model="searchValue"
|
||||
placeholder="Search"
|
||||
></TextInput>
|
||||
</div>
|
||||
|
||||
<div class="row scroll no-min-height">
|
||||
<div class="column grow stretch">
|
||||
<TelemetryList
|
||||
:search="searchValue"
|
||||
v-model="selected"
|
||||
@mouseover="
|
||||
(mousedover_value) =>
|
||||
(mousedover = mousedover_value)
|
||||
"
|
||||
></TelemetryList>
|
||||
<div class="row scroll no-min-height">
|
||||
<div class="column grow stretch">
|
||||
<TelemetryList
|
||||
:search="searchValue"
|
||||
v-model="selected"
|
||||
@mouseover="
|
||||
(mousedover_value) =>
|
||||
(mousedover = mousedover_value)
|
||||
"
|
||||
></TelemetryList>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<FlexDivider></FlexDivider>
|
||||
<div class="column grow stretch no-basis">
|
||||
<TelemetryInfo
|
||||
:mouseover="mousedover"
|
||||
:selection="selected"
|
||||
></TelemetryInfo>
|
||||
<FlexDivider></FlexDivider>
|
||||
<div class="column grow stretch no-basis">
|
||||
<TelemetryInfo
|
||||
:mouseover="mousedover"
|
||||
:selection="selected"
|
||||
></TelemetryInfo>
|
||||
</div>
|
||||
</div>
|
||||
</ScreenLayout>
|
||||
</template>
|
||||
|
||||
@@ -48,3 +48,36 @@ service TelemetryService {
|
||||
rpc NewTelemetry (TelemetryDefinitionRequest) returns (TelemetryDefinitionResponse);
|
||||
rpc InsertTelemetry (stream TelemetryItem) returns (stream TelemetryInsertResponse);
|
||||
}
|
||||
|
||||
message CommandParameterDefinition {
|
||||
string name = 1;
|
||||
TelemetryDataType data_type = 2;
|
||||
}
|
||||
|
||||
message CommandDefinitionRequest {
|
||||
string name = 1;
|
||||
repeated CommandParameterDefinition parameters = 2;
|
||||
}
|
||||
|
||||
message Command {
|
||||
UUID uuid = 1;
|
||||
Timestamp timestamp = 2;
|
||||
map<string, TelemetryValue> parameters = 3;
|
||||
}
|
||||
|
||||
message CommandResponse {
|
||||
UUID uuid = 1;
|
||||
bool success = 2;
|
||||
string response = 3;
|
||||
}
|
||||
|
||||
message ClientSideCommand {
|
||||
oneof inner {
|
||||
CommandDefinitionRequest request = 1;
|
||||
CommandResponse response = 2;
|
||||
}
|
||||
}
|
||||
|
||||
service CommandService {
|
||||
rpc NewCommand (stream ClientSideCommand) returns (stream Command);
|
||||
}
|
||||
|
||||
36
server/src/command/definition.rs
Normal file
36
server/src/command/definition.rs
Normal file
@@ -0,0 +1,36 @@
|
||||
use crate::command::service::RegisteredCommand;
|
||||
use crate::core::TelemetryDataType;
|
||||
use crate::telemetry::data_type::tlm_data_type_deserializer;
|
||||
use crate::telemetry::data_type::tlm_data_type_serializer;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct CommandParameterDefinition {
|
||||
pub name: String,
|
||||
#[serde(serialize_with = "tlm_data_type_serializer")]
|
||||
#[serde(deserialize_with = "tlm_data_type_deserializer")]
|
||||
pub data_type: TelemetryDataType,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct CommandDefinition {
|
||||
pub name: String,
|
||||
pub parameters: Vec<CommandParameterDefinition>,
|
||||
}
|
||||
|
||||
impl From<RegisteredCommand> for CommandDefinition {
|
||||
fn from(value: RegisteredCommand) -> Self {
|
||||
Self {
|
||||
name: value.name,
|
||||
parameters: value
|
||||
.definition
|
||||
.parameters
|
||||
.into_iter()
|
||||
.map(|param| CommandParameterDefinition {
|
||||
data_type: param.data_type(),
|
||||
name: param.name,
|
||||
})
|
||||
.collect(),
|
||||
}
|
||||
}
|
||||
}
|
||||
42
server/src/command/error.rs
Normal file
42
server/src/command/error.rs
Normal file
@@ -0,0 +1,42 @@
|
||||
use crate::core::TelemetryDataType;
|
||||
use actix_web::http::StatusCode;
|
||||
use actix_web::ResponseError;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum Error {
|
||||
#[error("Command Not Found {0}")]
|
||||
CommandNotFound(String),
|
||||
#[error("Incorrect Number of Parameters Specified. {expected} expected. {actual} found.")]
|
||||
IncorrectParameterCount { expected: usize, actual: usize },
|
||||
#[error("Missing Parameter {0}.")]
|
||||
MisingParameter(String),
|
||||
#[error("Incorrect Parameter Type for {name}. {expected_type:?} expected.")]
|
||||
WrongParameterType {
|
||||
name: String,
|
||||
expected_type: TelemetryDataType,
|
||||
},
|
||||
#[error("No Command Receiver")]
|
||||
NoCommandReceiver,
|
||||
#[error("Failed to Send")]
|
||||
FailedToSend,
|
||||
#[error("Failed to Receive Command Response")]
|
||||
FailedToReceiveResponse,
|
||||
#[error("Command Failure: {0}")]
|
||||
CommandFailure(String),
|
||||
}
|
||||
|
||||
impl ResponseError for Error {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match *self {
|
||||
Error::CommandNotFound(_) => StatusCode::NOT_FOUND,
|
||||
Error::IncorrectParameterCount { .. } => StatusCode::BAD_REQUEST,
|
||||
Error::MisingParameter(_) => StatusCode::BAD_REQUEST,
|
||||
Error::WrongParameterType { .. } => StatusCode::BAD_REQUEST,
|
||||
Error::NoCommandReceiver => StatusCode::SERVICE_UNAVAILABLE,
|
||||
Error::FailedToSend => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Error::FailedToReceiveResponse => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
Error::CommandFailure(_) => StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
}
|
||||
3
server/src/command/mod.rs
Normal file
3
server/src/command/mod.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
mod definition;
|
||||
pub mod error;
|
||||
pub mod service;
|
||||
165
server/src/command/service.rs
Normal file
165
server/src/command/service.rs
Normal file
@@ -0,0 +1,165 @@
|
||||
use crate::command::definition::CommandDefinition;
|
||||
use crate::command::error::Error as CmdError;
|
||||
use crate::command::error::Error::{
|
||||
CommandFailure, CommandNotFound, FailedToReceiveResponse, FailedToSend,
|
||||
IncorrectParameterCount, MisingParameter, NoCommandReceiver, WrongParameterType,
|
||||
};
|
||||
use crate::core::telemetry_value::Value;
|
||||
use crate::core::{
|
||||
Command, CommandDefinitionRequest, CommandResponse, TelemetryDataType, TelemetryValue,
|
||||
Timestamp, Uuid,
|
||||
};
|
||||
use chrono::{DateTime, Utc};
|
||||
use log::error;
|
||||
use papaya::HashMap;
|
||||
use std::collections::HashMap as StdHashMap;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub(super) struct RegisteredCommand {
|
||||
pub(super) name: String,
|
||||
pub(super) definition: CommandDefinitionRequest,
|
||||
tx: mpsc::Sender<Option<(Command, oneshot::Sender<CommandResponse>)>>,
|
||||
}
|
||||
|
||||
pub struct CommandManagementService {
|
||||
registered_commands: HashMap<String, RegisteredCommand>,
|
||||
}
|
||||
|
||||
impl CommandManagementService {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
registered_commands: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_commands(&self) -> anyhow::Result<Vec<CommandDefinition>> {
|
||||
let mut result = vec![];
|
||||
|
||||
let registered_commands = self.registered_commands.pin();
|
||||
for registration in registered_commands.values() {
|
||||
result.push(registration.clone().into());
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub fn get_command_definition(&self, name: &String) -> Option<CommandDefinition> {
|
||||
self.registered_commands
|
||||
.pin()
|
||||
.get(name)
|
||||
.map(|registration| registration.clone().into())
|
||||
}
|
||||
|
||||
pub async fn register_command(
|
||||
&self,
|
||||
command: CommandDefinitionRequest,
|
||||
) -> anyhow::Result<mpsc::Receiver<Option<(Command, oneshot::Sender<CommandResponse>)>>> {
|
||||
let (tx, rx) = mpsc::channel(1);
|
||||
|
||||
let registered_commands = self.registered_commands.pin_owned();
|
||||
if let Some(previous) = registered_commands.insert(
|
||||
command.name.clone(),
|
||||
RegisteredCommand {
|
||||
name: command.name.clone(),
|
||||
definition: command,
|
||||
tx,
|
||||
},
|
||||
) {
|
||||
// If the receiver was already closed, we don't care (ignore error)
|
||||
let _ = previous.tx.send(None).await;
|
||||
}
|
||||
|
||||
Ok(rx)
|
||||
}
|
||||
|
||||
pub async fn send_command(
|
||||
&self,
|
||||
name: impl Into<String>,
|
||||
parameters: serde_json::Map<String, serde_json::Value>,
|
||||
) -> Result<String, CmdError> {
|
||||
let timestamp = Utc::now();
|
||||
let offset_from_unix_epoch =
|
||||
timestamp - DateTime::from_timestamp(0, 0).expect("Could not get Unix epoch");
|
||||
|
||||
let name = name.into();
|
||||
let registered_commands = self.registered_commands.pin();
|
||||
let Some(registration) = registered_commands.get(&name) else {
|
||||
return Err(CommandNotFound(name));
|
||||
};
|
||||
|
||||
if parameters.len() != registration.definition.parameters.len() {
|
||||
return Err(IncorrectParameterCount {
|
||||
expected: registration.definition.parameters.len(),
|
||||
actual: parameters.len(),
|
||||
});
|
||||
}
|
||||
let mut result_parameters = StdHashMap::new();
|
||||
for parameter in ®istration.definition.parameters {
|
||||
let Some(param_value) = parameters.get(¶meter.name) else {
|
||||
return Err(MisingParameter(parameter.name.clone()));
|
||||
};
|
||||
let Some(param_value) = (match parameter.data_type() {
|
||||
TelemetryDataType::Float32 => {
|
||||
param_value.as_f64().map(|v| Value::Float32(v as f32))
|
||||
}
|
||||
TelemetryDataType::Float64 => param_value.as_f64().map(Value::Float64),
|
||||
TelemetryDataType::Boolean => param_value.as_bool().map(Value::Boolean),
|
||||
}) else {
|
||||
return Err(WrongParameterType {
|
||||
name: parameter.name.clone(),
|
||||
expected_type: parameter.data_type(),
|
||||
});
|
||||
};
|
||||
result_parameters.insert(
|
||||
parameter.name.clone(),
|
||||
TelemetryValue {
|
||||
value: Some(param_value),
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
// Clone & Drop lets us use a standard pin instead of an owned pin
|
||||
let tx = registration.tx.clone();
|
||||
drop(registered_commands);
|
||||
|
||||
if tx.is_closed() {
|
||||
return Err(NoCommandReceiver);
|
||||
}
|
||||
|
||||
let uuid = Uuid::random();
|
||||
let (response_tx, response_rx) = oneshot::channel();
|
||||
if let Err(e) = tx
|
||||
.send(Some((
|
||||
Command {
|
||||
uuid: Some(uuid),
|
||||
timestamp: Some(Timestamp {
|
||||
secs: offset_from_unix_epoch.num_seconds(),
|
||||
nanos: offset_from_unix_epoch.subsec_nanos(),
|
||||
}),
|
||||
parameters: result_parameters,
|
||||
},
|
||||
response_tx,
|
||||
)))
|
||||
.await
|
||||
{
|
||||
error!("Failed to Send Command: {e}");
|
||||
return Err(FailedToSend);
|
||||
}
|
||||
|
||||
response_rx
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Failed to Receive Command Response: {e}");
|
||||
FailedToReceiveResponse
|
||||
})
|
||||
.and_then(|response| {
|
||||
if response.success {
|
||||
Ok(response.response)
|
||||
} else {
|
||||
Err(CommandFailure(response.response))
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
150
server/src/grpc/cmd.rs
Normal file
150
server/src/grpc/cmd.rs
Normal file
@@ -0,0 +1,150 @@
|
||||
use crate::command::service::CommandManagementService;
|
||||
use crate::core::client_side_command::Inner;
|
||||
use crate::core::command_service_server::CommandService;
|
||||
use crate::core::{ClientSideCommand, Command, CommandResponse, Uuid};
|
||||
use log::{error, trace};
|
||||
use std::collections::HashMap;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use tokio::select;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::codegen::tokio_stream::{Stream, StreamExt};
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
|
||||
pub struct CoreCommandService {
|
||||
pub command_service: Arc<CommandManagementService>,
|
||||
pub cancellation_token: CancellationToken,
|
||||
}
|
||||
|
||||
#[tonic::async_trait]
|
||||
impl CommandService for CoreCommandService {
|
||||
type NewCommandStream = Pin<Box<dyn Stream<Item = Result<Command, Status>> + Send>>;
|
||||
|
||||
async fn new_command(
|
||||
&self,
|
||||
request: Request<Streaming<ClientSideCommand>>,
|
||||
) -> Result<Response<Self::NewCommandStream>, Status> {
|
||||
trace!("CoreCommandService::new_command");
|
||||
|
||||
let cancel_token = self.cancellation_token.clone();
|
||||
let mut in_stream = request.into_inner();
|
||||
|
||||
let cmd_request = select! {
|
||||
_ = cancel_token.cancelled() => return Err(Status::internal("Shutting Down")),
|
||||
Some(message) = in_stream.next() => {
|
||||
match message {
|
||||
Ok(ClientSideCommand {
|
||||
inner: Some(Inner::Request(cmd_request))
|
||||
}) => cmd_request,
|
||||
Err(err) => {
|
||||
error!("Error in Stream: {err}");
|
||||
return Err(Status::cancelled("Error in Stream"));
|
||||
},
|
||||
_ => {
|
||||
return Err(Status::invalid_argument("First message must be request"));
|
||||
},
|
||||
}
|
||||
},
|
||||
else => return Err(Status::internal("Shutting Down")),
|
||||
};
|
||||
|
||||
let mut cmd_rx = match self.command_service.register_command(cmd_request).await {
|
||||
Ok(rx) => rx,
|
||||
Err(e) => {
|
||||
error!("Failed to register command: {e}");
|
||||
return Err(Status::internal("Failed to register command"));
|
||||
}
|
||||
};
|
||||
|
||||
let (tx, rx) = mpsc::channel(128);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut result = Status::resource_exhausted("End of Command Stream");
|
||||
let mut in_progress = HashMap::<String, oneshot::Sender<CommandResponse>>::new();
|
||||
loop {
|
||||
select! {
|
||||
_ = cancel_token.cancelled() => break,
|
||||
_ = tx.closed() => break,
|
||||
Some(message) = cmd_rx.recv() => {
|
||||
match message {
|
||||
None => break,
|
||||
Some(message) => {
|
||||
let key = message.0.uuid.clone().unwrap().value;
|
||||
in_progress.insert(key.clone(), message.1);
|
||||
match tx.send(Ok(message.0)).await {
|
||||
Ok(()) => {},
|
||||
Err(e) => {
|
||||
error!("Failed to send command data: {e}");
|
||||
if in_progress.remove(&key).unwrap().send(CommandResponse {
|
||||
uuid: Some(Uuid::from(key)),
|
||||
success: false,
|
||||
response: "Failed to send command data.".to_string(),
|
||||
}).is_err() {
|
||||
error!("Failed to send command response on failure to send command data");
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
Some(message) = in_stream.next() => {
|
||||
match message {
|
||||
Ok(message) => {
|
||||
match message.inner {
|
||||
Some(Inner::Response(response)) => {
|
||||
if let Some(uuid) = &response.uuid {
|
||||
match in_progress.remove(&uuid.value) {
|
||||
Some(sender) => {
|
||||
if sender.send(response).is_err() {
|
||||
error!("Failed to send command response on success")
|
||||
}
|
||||
}
|
||||
None => {
|
||||
result = Status::invalid_argument("Invalid Command UUID");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
result = Status::invalid_argument("Subsequent Message Must Be Command Responses");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Received error from command handler {e}");
|
||||
break
|
||||
},
|
||||
}
|
||||
}
|
||||
else => break,
|
||||
}
|
||||
}
|
||||
cmd_rx.close();
|
||||
if !tx.is_closed() {
|
||||
match tx.send(Err(result)).await {
|
||||
Ok(()) => {}
|
||||
Err(e) => {
|
||||
error!("Failed to close old command sender {e}");
|
||||
}
|
||||
}
|
||||
}
|
||||
for (key, sender) in in_progress.drain() {
|
||||
if sender.send(CommandResponse {
|
||||
uuid: Some(Uuid::from(key)),
|
||||
success: false,
|
||||
response: "Command Handler Shut Down".to_string(),
|
||||
}).is_err() {
|
||||
error!("Failed to send command response on shutdown");
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
|
||||
}
|
||||
}
|
||||
44
server/src/grpc/mod.rs
Normal file
44
server/src/grpc/mod.rs
Normal file
@@ -0,0 +1,44 @@
|
||||
mod cmd;
|
||||
mod tlm;
|
||||
|
||||
use crate::command::service::CommandManagementService;
|
||||
use crate::core::command_service_server::CommandServiceServer;
|
||||
use crate::core::telemetry_service_server::TelemetryServiceServer;
|
||||
use crate::grpc::cmd::CoreCommandService;
|
||||
use crate::grpc::tlm::CoreTelemetryService;
|
||||
use crate::telemetry::management_service::TelemetryManagementService;
|
||||
use log::{error, info};
|
||||
use std::sync::Arc;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::transport::Server;
|
||||
|
||||
pub fn setup(
|
||||
token: CancellationToken,
|
||||
telemetry_management_service: Arc<TelemetryManagementService>,
|
||||
command_service: Arc<CommandManagementService>,
|
||||
) -> anyhow::Result<JoinHandle<()>> {
|
||||
let addr = "[::1]:50051".parse()?;
|
||||
Ok(tokio::spawn(async move {
|
||||
let tlm_service = CoreTelemetryService {
|
||||
tlm_management: telemetry_management_service,
|
||||
cancellation_token: token.clone(),
|
||||
};
|
||||
|
||||
let cmd_service = CoreCommandService {
|
||||
command_service,
|
||||
cancellation_token: token.clone(),
|
||||
};
|
||||
|
||||
info!("Starting gRPC Server");
|
||||
let result = Server::builder()
|
||||
.add_service(TelemetryServiceServer::new(tlm_service))
|
||||
.add_service(CommandServiceServer::new(cmd_service))
|
||||
.serve_with_shutdown(addr, token.cancelled_owned())
|
||||
.await;
|
||||
|
||||
if let Err(err) = result {
|
||||
error!("gRPC Server Encountered An Error: {err}");
|
||||
}
|
||||
}))
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
use crate::core::telemetry_service_server::{TelemetryService, TelemetryServiceServer};
|
||||
use crate::core::telemetry_service_server::TelemetryService;
|
||||
use crate::core::telemetry_value::Value;
|
||||
use crate::core::{
|
||||
TelemetryDataType, TelemetryDefinitionRequest, TelemetryDefinitionResponse,
|
||||
@@ -9,16 +9,14 @@ use crate::telemetry::data_value::TelemetryDataValue;
|
||||
use crate::telemetry::history::TelemetryHistory;
|
||||
use crate::telemetry::management_service::TelemetryManagementService;
|
||||
use chrono::{DateTime, SecondsFormat};
|
||||
use log::{error, info, trace};
|
||||
use log::trace;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use tokio::select;
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
|
||||
use tonic::codegen::tokio_stream::{Stream, StreamExt};
|
||||
use tonic::transport::Server;
|
||||
use tonic::{Request, Response, Status, Streaming};
|
||||
|
||||
pub struct CoreTelemetryService {
|
||||
@@ -141,26 +139,3 @@ impl CoreTelemetryService {
|
||||
Ok(TelemetryInsertResponse {})
|
||||
}
|
||||
}
|
||||
|
||||
pub fn setup(
|
||||
token: CancellationToken,
|
||||
telemetry_management_service: Arc<TelemetryManagementService>,
|
||||
) -> anyhow::Result<JoinHandle<()>> {
|
||||
let addr = "[::1]:50051".parse()?;
|
||||
Ok(tokio::spawn(async move {
|
||||
let tlm_service = CoreTelemetryService {
|
||||
tlm_management: telemetry_management_service,
|
||||
cancellation_token: token.clone(),
|
||||
};
|
||||
|
||||
info!("Starting gRPC Server");
|
||||
let result = Server::builder()
|
||||
.add_service(TelemetryServiceServer::new(tlm_service))
|
||||
.serve_with_shutdown(addr, token.cancelled_owned())
|
||||
.await;
|
||||
|
||||
if let Err(err) = result {
|
||||
error!("gRPC Server Encountered An Error: {err}");
|
||||
}
|
||||
}))
|
||||
}
|
||||
34
server/src/http/api/cmd.rs
Normal file
34
server/src/http/api/cmd.rs
Normal file
@@ -0,0 +1,34 @@
|
||||
use crate::command::service::CommandManagementService;
|
||||
use crate::http::error::HttpServerResultError;
|
||||
use actix_web::{get, post, web, Responder};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[post("/cmd/{name:[\\w\\d/_-]+}")]
|
||||
pub(super) async fn send_command(
|
||||
command_service: web::Data<Arc<CommandManagementService>>,
|
||||
name: web::Path<String>,
|
||||
parameters: web::Json<serde_json::Map<String, serde_json::Value>>,
|
||||
) -> Result<impl Responder, HttpServerResultError> {
|
||||
let result = command_service
|
||||
.send_command(name.to_string(), parameters.into_inner())
|
||||
.await?;
|
||||
|
||||
Ok(web::Json(result))
|
||||
}
|
||||
|
||||
#[get("/cmd")]
|
||||
pub(super) async fn get_all(
|
||||
command_service: web::Data<Arc<CommandManagementService>>,
|
||||
) -> Result<impl Responder, HttpServerResultError> {
|
||||
Ok(web::Json(command_service.get_commands()?))
|
||||
}
|
||||
|
||||
#[get("/cmd/{name:[\\w\\d/_-]+}")]
|
||||
pub(super) async fn get_one(
|
||||
command_service: web::Data<Arc<CommandManagementService>>,
|
||||
name: web::Path<String>,
|
||||
) -> Result<impl Responder, HttpServerResultError> {
|
||||
Ok(web::Json(
|
||||
command_service.get_command_definition(&name.to_string()),
|
||||
))
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
mod cmd;
|
||||
mod panels;
|
||||
mod tlm;
|
||||
|
||||
@@ -11,5 +12,8 @@ pub fn setup_api(cfg: &mut web::ServiceConfig) {
|
||||
.service(panels::get_all)
|
||||
.service(panels::get_one)
|
||||
.service(panels::set)
|
||||
.service(panels::delete);
|
||||
.service(panels::delete)
|
||||
.service(cmd::send_command)
|
||||
.service(cmd::get_all)
|
||||
.service(cmd::get_one);
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ use actix_web::error::ResponseError;
|
||||
use actix_web::http::header::ContentType;
|
||||
use actix_web::http::StatusCode;
|
||||
use actix_web::HttpResponse;
|
||||
use anyhow::Error;
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
@@ -16,31 +15,28 @@ pub enum HttpServerResultError {
|
||||
#[error("Timed out")]
|
||||
Timeout,
|
||||
#[error("Internal Error")]
|
||||
InternalError(anyhow::Error),
|
||||
InternalError(#[from] anyhow::Error),
|
||||
#[error("Panel Uuid Not Found: {uuid}")]
|
||||
PanelUuidNotFound { uuid: String },
|
||||
#[error(transparent)]
|
||||
Command(#[from] crate::command::error::Error),
|
||||
}
|
||||
|
||||
impl ResponseError for HttpServerResultError {
|
||||
fn status_code(&self) -> StatusCode {
|
||||
match *self {
|
||||
match self {
|
||||
HttpServerResultError::TlmNameNotFound { .. } => StatusCode::NOT_FOUND,
|
||||
HttpServerResultError::TlmUuidNotFound { .. } => StatusCode::NOT_FOUND,
|
||||
HttpServerResultError::InvalidDateTime { .. } => StatusCode::BAD_REQUEST,
|
||||
HttpServerResultError::Timeout => StatusCode::GATEWAY_TIMEOUT,
|
||||
HttpServerResultError::InternalError { .. } => StatusCode::INTERNAL_SERVER_ERROR,
|
||||
HttpServerResultError::PanelUuidNotFound { .. } => StatusCode::NOT_FOUND,
|
||||
HttpServerResultError::Command(inner) => inner.status_code(),
|
||||
}
|
||||
}
|
||||
fn error_response(&self) -> HttpResponse {
|
||||
HttpResponse::build(self.status_code())
|
||||
.insert_header(ContentType::html())
|
||||
.insert_header(ContentType::plaintext())
|
||||
.body(self.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<anyhow::Error> for HttpServerResultError {
|
||||
fn from(value: Error) -> Self {
|
||||
Self::InternalError(value)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ mod api;
|
||||
mod error;
|
||||
mod websocket;
|
||||
|
||||
use crate::command::service::CommandManagementService;
|
||||
use crate::http::api::setup_api;
|
||||
use crate::http::websocket::setup_websocket;
|
||||
use crate::panels::PanelService;
|
||||
@@ -16,10 +17,12 @@ pub async fn setup(
|
||||
cancellation_token: CancellationToken,
|
||||
telemetry_definitions: Arc<TelemetryManagementService>,
|
||||
panel_service: PanelService,
|
||||
command_service: Arc<CommandManagementService>,
|
||||
) -> anyhow::Result<()> {
|
||||
let data = web::Data::new(telemetry_definitions);
|
||||
let cancel_token = web::Data::new(cancellation_token);
|
||||
let panel_service = web::Data::new(Arc::new(panel_service));
|
||||
let command_service = web::Data::new(command_service);
|
||||
|
||||
info!("Starting HTTP Server");
|
||||
HttpServer::new(move || {
|
||||
@@ -27,6 +30,7 @@ pub async fn setup(
|
||||
.app_data(data.clone())
|
||||
.app_data(cancel_token.clone())
|
||||
.app_data(panel_service.clone())
|
||||
.app_data(command_service.clone())
|
||||
.service(web::scope("/ws").configure(setup_websocket))
|
||||
.service(web::scope("/api").configure(setup_api))
|
||||
.wrap(Logger::default())
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
mod command;
|
||||
mod grpc;
|
||||
mod http;
|
||||
mod panels;
|
||||
@@ -9,6 +10,7 @@ pub mod core {
|
||||
tonic::include_proto!("core");
|
||||
}
|
||||
|
||||
use crate::command::service::CommandManagementService;
|
||||
use crate::panels::PanelService;
|
||||
use crate::telemetry::history::TelemetryHistoryService;
|
||||
use crate::telemetry::management_service::TelemetryManagementService;
|
||||
@@ -49,11 +51,13 @@ pub async fn setup() -> anyhow::Result<()> {
|
||||
TelemetryHistoryService::new(telemetry_folder)?,
|
||||
)?);
|
||||
|
||||
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone())?;
|
||||
let cmd = Arc::new(CommandManagementService::new());
|
||||
|
||||
let grpc_server = grpc::setup(cancellation_token.clone(), tlm.clone(), cmd.clone())?;
|
||||
|
||||
let panel_service = PanelService::new(sqlite.clone());
|
||||
|
||||
let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service).await;
|
||||
let result = http::setup(cancellation_token.clone(), tlm.clone(), panel_service, cmd).await;
|
||||
cancellation_token.cancel();
|
||||
result?; // result is dropped
|
||||
grpc_server.await?; //grpc server is dropped
|
||||
|
||||
Reference in New Issue
Block a user