This commit is contained in:
2024-10-19 13:15:09 -07:00
commit 17b9e50f1f
12 changed files with 1623 additions and 0 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
/target
.idea/

1319
Cargo.lock generated Normal file

File diff suppressed because it is too large Load Diff

4
Cargo.toml Normal file
View File

@@ -0,0 +1,4 @@
[workspace]
members = ["server", "examples/simple_producer"]
resolver = "2"

37
docs/architecture.md Normal file
View File

@@ -0,0 +1,37 @@
# Telemetry Visualization Architecture
## Data Gateway
A data gateway is a standalone application which connects to a live source of
data and a consumer of commands.
While this can be the original producer and end target of the commands, it can
also be a proxy or middleman application responsible for connecting the end
application to the telemetry and commanding backend.
This talks to the Backend's gateway API.
## Backend
The backend is responsible for acting as a bridge between data gateways and
the frontend.
In addition the backend is responsible for recording and providing access to
historical data.
### Gateway API
The backend provides an API for gateways to connect and send telemetry and
receive commands.
The gateway is responsible for registering the types of commands and
telemetry which it expects to interface with.
The gateway API is a Websocket API.
### Frontend API
The backend provides an API for frontends to receive telemetry and send
commands through the gateway.
Additionally, the frontend api provides access to save configurations
for various panels and procedures.
The frontend API is an HTTP and Websocket API.
## Frontend
The frontend is a web-based tool for displaying telemetry and sending
commands to end devices.

View File

@@ -0,0 +1,9 @@
[package]
name = "simple_producer"
edition = "2021"
[dependencies]
server = { path = "../../server" }
tonic = "0.12.3"
tokio = { version = "1.40.0", features = ["rt-multi-thread"] }

View File

@@ -0,0 +1,64 @@
use server::core::telemetry_service_client::TelemetryServiceClient;
use server::core::telemetry_value::Value;
use server::core::{TelemetryDataType, TelemetryDefinitionRequest, TelemetryItem, TelemetryValue, Timestamp};
use std::f32::consts::TAU;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc;
use tokio::time::Instant;
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::codegen::tokio_stream::StreamExt;
use tonic::Request;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = TelemetryServiceClient::connect("http://[::1]:50051").await?;
let request = Request::new(TelemetryDefinitionRequest {
name: "simple_producer/sin".into(),
data_type: TelemetryDataType::Float32.into(),
});
let response = client.new_telemetry(request).await?.into_inner();
println!("Response={:?}", response);
let Some(uuid) = response.uuid else { return Err("UUID Missing".into()); };
let (tx, rx) = mpsc::channel(128);
let mut response_stream = client.insert_telemetry(ReceiverStream::new(rx)).await?.into_inner();
let mut next_time = Instant::now();
let mut index = 0;
loop {
next_time += Duration::from_millis(100);
index += 10;
tokio::time::sleep_until(next_time).await;
let now = SystemTime::now().duration_since(UNIX_EPOCH)?;
tx.send(TelemetryItem {
uuid: Some(uuid.clone()),
value: Some(TelemetryValue {
value: Some(Value::Float32(
(TAU * (index as f32) / (1000.0_f32)).sin()
)),
}),
timestamp: Some(Timestamp {
secs: now.as_secs(),
nanos: now.subsec_nanos(),
}),
}).await?;
if let Some(response) = response_stream.next().await {
match response {
Ok(_) => {
}
Err(_) => {
println!("Error");
}
}
} else {
println!("Stream closed");
break;
}
}
Ok(())
}

18
server/Cargo.toml Normal file
View File

@@ -0,0 +1,18 @@
[package]
name = "server"
edition = "2021"
version = "0.1.0"
authors = ["Sergey <me@sergeysav.com>"]
[dependencies]
fern = "0.6.2"
log = "0.4.22"
prost = "0.13.3"
rand = "0.8.5"
tonic = { version = "0.12.3" }
tokio = { version = "1.40.0", features = ["rt-multi-thread"] }
chrono = "0.4.38"
[build-dependencies]
tonic-build = "0.12.3"

5
server/build.rs Normal file
View File

@@ -0,0 +1,5 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
tonic_build::compile_protos("proto/core.proto")?;
Ok(())
}

48
server/proto/core.proto Normal file
View File

@@ -0,0 +1,48 @@
syntax = "proto3";
package core;
enum TelemetryDataType {
FLOAT_32 = 0;
FLOAT_64 = 1;
}
message TelemetryValue {
oneof value {
float float_32 = 1;
float float_64 = 2;
}
}
message UUID {
bytes value = 1;
}
// UTC since UNIX
message Timestamp {
fixed64 secs = 1;
fixed32 nanos = 2;
}
message TelemetryDefinitionRequest {
string name = 1;
TelemetryDataType data_type = 2;
}
message TelemetryDefinitionResponse {
UUID uuid = 1;
}
message TelemetryItem {
UUID uuid = 1;
TelemetryValue value = 2;
Timestamp timestamp = 3;
}
message TelemetryInsertResponse {
}
service TelemetryService {
rpc NewTelemetry (TelemetryDefinitionRequest) returns (TelemetryDefinitionResponse);
rpc InsertTelemetry (stream TelemetryItem) returns (stream TelemetryInsertResponse);
}

74
server/src/lib.rs Normal file
View File

@@ -0,0 +1,74 @@
mod uuid;
pub mod core {
tonic::include_proto!("core");
}
use std::error::Error;
use std::pin::Pin;
use log::{trace};
use tokio::sync::mpsc;
use tonic::{Request, Response, Status, Streaming};
use tonic::codegen::tokio_stream::{Stream, StreamExt};
use tonic::codegen::tokio_stream::wrappers::ReceiverStream;
use tonic::transport::Server;
use core::telemetry_service_server::TelemetryService;
use crate::core::{TelemetryDefinitionRequest, TelemetryDefinitionResponse, TelemetryInsertResponse, TelemetryItem, Uuid};
use crate::core::telemetry_service_server::TelemetryServiceServer;
#[derive(Debug, Default)]
pub struct CoreTelemetryService {}
#[tonic::async_trait]
impl TelemetryService for CoreTelemetryService {
async fn new_telemetry(&self, request: Request<TelemetryDefinitionRequest>) -> Result<Response<TelemetryDefinitionResponse>, Status> {
trace!("TelemetryService::new_telemetry {:?}", request);
let reply = TelemetryDefinitionResponse {
uuid: Some(Uuid::random()),
};
Ok(Response::new(reply))
}
type InsertTelemetryStream = Pin<Box<dyn Stream<Item = Result<TelemetryInsertResponse, Status>> + Send>>;
async fn insert_telemetry(&self, request: Request<Streaming<TelemetryItem>>) -> Result<Response<Self::InsertTelemetryStream>, Status> {
trace!("TelemetryService::insert_telemetry {:?}", request);
let mut in_stream = request.into_inner();
let (tx, rx) = mpsc::channel(128);
tokio::spawn(async move {
while let Some(message) = in_stream.next().await {
match message {
Ok(tlm_item) => {
trace!("tlm_item {:?}", tlm_item);
tx
.send(Ok(TelemetryInsertResponse {}))
.await
.expect("working rx");
}
Err(err) => {
let _ = tx.send(Err(err)).await;
}
}
}
});
Ok(Response::new(Box::pin(ReceiverStream::new(rx))))
}
}
pub async fn setup() -> Result<(), Box<dyn Error>> {
let addr = "[::1]:50051".parse()?;
let tlm_service = CoreTelemetryService::default();
Server::builder()
.add_service(TelemetryServiceServer::new(tlm_service))
.serve(addr)
.await?;
Ok(())
}

30
server/src/main.rs Normal file
View File

@@ -0,0 +1,30 @@
use std::env;
use std::str::FromStr;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let log_file = env::var("LOG_FILE");
let log_level = match env::var("LOG_LEVEL") {
Ok(log_level) => log::LevelFilter::from_str(&log_level)
.unwrap_or(log::LevelFilter::Info),
Err(_) => log::LevelFilter::Info
};
let mut log_config = fern::Dispatch::new()
.format(|out, message, record| {
out.finish(format_args!(
"[{}][{}][{}] {}",
chrono::Utc::now().format("%Y-%m-%d %H:%M:%S"),
record.target(),
record.level(),
message
))
})
.level(log_level)
.chain(std::io::stdout());
if let Ok(log_file) = log_file {
log_config = log_config.chain(fern::log_file(log_file)?)
}
log_config.apply()?;
server::setup().await
}

12
server/src/uuid.rs Normal file
View File

@@ -0,0 +1,12 @@
use rand::RngCore;
use crate::core::{Uuid};
impl Uuid {
pub fn random() -> Self {
let mut uuid = vec![0u8; 16];
rand::thread_rng().fill_bytes(&mut uuid);
Self {
value: uuid,
}
}
}