first cleanish working version
This commit is contained in:
commit
6611cb0dd4
|
@ -0,0 +1,20 @@
|
|||
[package]
|
||||
name = "newdasher"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
|
||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||
|
||||
[dependencies]
|
||||
axum = "0.7.4"
|
||||
clap = { version = "4.5.1", features = ["derive"] }
|
||||
serde = { version = "1.0.196", features = ["derive"] }
|
||||
serde_json = "1.0.113"
|
||||
tokio = { version = "1.36.0", features = ["net", "rt-multi-thread"] }
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
sqlx = { version = "0.7", features = [ "runtime-tokio", "tls-rustls", "sqlite" ] }
|
||||
thiserror = "1.0.57"
|
||||
|
||||
[profile.dev.package.sqlx-macros]
|
||||
opt-level = 3
|
|
@ -0,0 +1,72 @@
|
|||
use axum::{extract::State, http::StatusCode, routing::post, Json, Router};
|
||||
use thiserror::Error;
|
||||
|
||||
mod types;
|
||||
use types::*;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum HttpServError {
|
||||
#[error("OS Error")]
|
||||
IoError(#[from] std::io::Error),
|
||||
}
|
||||
|
||||
use crate::db::DB;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct AppState {
|
||||
db: DB,
|
||||
}
|
||||
|
||||
impl AppState {
|
||||
pub fn new(db: DB) -> Self {
|
||||
AppState { db }
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run_web_api(state: AppState) -> Result<(), HttpServError> {
|
||||
let app = Router::new()
|
||||
.route("/last", post(query_last))
|
||||
.route("/lastn", post(query_last_n))
|
||||
.with_state(state);
|
||||
|
||||
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn query_last(
|
||||
State(state): State<AppState>,
|
||||
Json(payload): Json<Query>,
|
||||
) -> (StatusCode, Json<Entry>) {
|
||||
if let Ok((val, date)) = state.db.get_last(&payload.name).await {
|
||||
let response = Entry { val, date };
|
||||
|
||||
(StatusCode::OK, Json(response))
|
||||
} else {
|
||||
(StatusCode::NOT_FOUND, Json(Entry::default()))
|
||||
}
|
||||
}
|
||||
|
||||
async fn query_last_n(
|
||||
State(state): State<AppState>,
|
||||
Json(payload): Json<Query>,
|
||||
) -> (StatusCode, Json<Vec<Entry>>) {
|
||||
let n = payload.n.unwrap_or(1);
|
||||
|
||||
if let Ok(answers) = state.db.get_last_n(&payload.name, n).await {
|
||||
(
|
||||
StatusCode::OK,
|
||||
Json(
|
||||
answers
|
||||
.iter()
|
||||
.map(|x| Entry {
|
||||
val: x.0.clone(),
|
||||
date: x.1,
|
||||
})
|
||||
.collect(),
|
||||
),
|
||||
)
|
||||
} else {
|
||||
(StatusCode::NOT_FOUND, Json(Vec::new()))
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct Query {
|
||||
pub name: String,
|
||||
pub n: Option<usize>,
|
||||
}
|
||||
|
||||
// the output to our `create_user` handler
|
||||
#[derive(Default, Serialize)]
|
||||
pub struct Entry {
|
||||
pub val: String,
|
||||
pub date: i64,
|
||||
}
|
|
@ -0,0 +1,122 @@
|
|||
use sqlx::{
|
||||
sqlite::{SqliteConnectOptions, SqlitePoolOptions},
|
||||
Pool, Sqlite,
|
||||
};
|
||||
use std::str::FromStr;
|
||||
|
||||
use thiserror::Error;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DB(Pool<Sqlite>);
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum DBError {
|
||||
#[error("Sql fuckup.")]
|
||||
SqlError(#[from] sqlx::error::Error),
|
||||
#[error("Nothing found")]
|
||||
NoResults,
|
||||
}
|
||||
|
||||
impl DB {
|
||||
/// Creates a new database, currently Sqlite3.
|
||||
pub async fn new(db_url: &str) -> Result<Self, DBError> {
|
||||
let conops = SqliteConnectOptions::from_str(db_url)?.create_if_missing(true);
|
||||
|
||||
let pool = SqlitePoolOptions::new()
|
||||
.max_connections(10)
|
||||
.connect_with(conops)
|
||||
.await?;
|
||||
|
||||
let db = DB(pool);
|
||||
db.create_tables().await?;
|
||||
Ok(db)
|
||||
}
|
||||
|
||||
/// Creates, if they don't exist, two database tables.
|
||||
///
|
||||
/// The first table stores name of data sets, togheter with an id. (dataset_id : dataset_name)
|
||||
/// The second table stores every message received. (dataset_id : value : date)
|
||||
///
|
||||
// TODO: This is not immensely efficient.
|
||||
async fn create_tables(&self) -> Result<(), DBError> {
|
||||
sqlx::query!(
|
||||
"CREATE TABLE IF NOT EXISTS data_tables (
|
||||
data_id INTEGER PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
unique(name)
|
||||
)",
|
||||
)
|
||||
.execute(&(self.0))
|
||||
.await?;
|
||||
|
||||
sqlx::query!(
|
||||
"CREATE TABLE IF NOT EXISTS data (
|
||||
data_id INTEGER NOT NULL,
|
||||
value TEXT NOT NULL,
|
||||
date INTEGER NOT NULL,
|
||||
foreign key (data_id) references data_tables (data_id)
|
||||
)",
|
||||
)
|
||||
.execute(&(self.0))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Adds a certain value to the dataset <name>.
|
||||
pub async fn write_entry(&self, name: &str, val: &str) -> Result<(), DBError> {
|
||||
let name_id = sqlx::query!(
|
||||
"INSERT OR IGNORE INTO data_tables ( name ) VALUES ( ?1 )",
|
||||
name
|
||||
)
|
||||
.execute(&(self.0))
|
||||
.await?
|
||||
.last_insert_rowid();
|
||||
|
||||
sqlx::query!(
|
||||
"INSERT INTO data ( data_id, value, date) VALUES ( ?1, ?2, strftime('%s','now'))",
|
||||
name_id,
|
||||
val
|
||||
)
|
||||
.execute(&(self.0))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Gets last value from database.
|
||||
pub async fn get_last(&self, name: &str) -> Result<(String, i64), DBError> {
|
||||
let results = (self.get_last_n(name, 1).await?);
|
||||
|
||||
return results
|
||||
.get(0)
|
||||
.map(|x| (*x).clone())
|
||||
.ok_or(DBError::NoResults);
|
||||
}
|
||||
|
||||
/// Retrives the lastest <n> values from the dataset <name>.
|
||||
pub async fn get_last_n(&self, name: &str, n: usize) -> Result<Vec<(String, i64)>, DBError> {
|
||||
struct LastData {
|
||||
value: String,
|
||||
date: i64,
|
||||
}
|
||||
|
||||
let n_i = n as i64;
|
||||
|
||||
let last_vals = sqlx::query_as!(
|
||||
LastData,
|
||||
"SELECT value, date FROM data WHERE
|
||||
data_id = (SELECT data_id FROM data_tables WHERE name = ?1)
|
||||
ORDER BY date ASC
|
||||
LIMIT ?2",
|
||||
name,
|
||||
n_i
|
||||
)
|
||||
.fetch_all(&(self.0))
|
||||
.await?;
|
||||
|
||||
Ok(last_vals
|
||||
.iter()
|
||||
.map(|x| (x.value.clone(), x.date))
|
||||
.collect::<Vec<(String, i64)>>())
|
||||
}
|
||||
}
|
|
@ -0,0 +1,3 @@
|
|||
mod api;
|
||||
mod db;
|
||||
mod proto;
|
|
@ -0,0 +1,25 @@
|
|||
mod api;
|
||||
use api::{run_web_api, AppState};
|
||||
|
||||
mod proto;
|
||||
use proto::listen_api;
|
||||
use tokio::task;
|
||||
mod db;
|
||||
|
||||
use db::DB;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
tracing_subscriber::fmt::init();
|
||||
|
||||
let db = DB::new("sqlite:test.db").await.unwrap();
|
||||
|
||||
let _apitask = task::spawn_local(listen_api(db.clone()));
|
||||
|
||||
let state = AppState::new(db);
|
||||
|
||||
match run_web_api(state).await {
|
||||
Ok(_) => todo!(),
|
||||
_ => todo!(),
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
use thiserror::Error;
|
||||
use tokio::{
|
||||
io::{AsyncBufReadExt, BufReader},
|
||||
net::{TcpListener, TcpStream},
|
||||
task,
|
||||
};
|
||||
|
||||
use std::io;
|
||||
|
||||
use crate::db::DB;
|
||||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum ProtoError {
|
||||
#[error("Sql fuckup.")]
|
||||
DBError(#[from] crate::db::DBError),
|
||||
}
|
||||
|
||||
async fn process_socket(socket: TcpStream, dbobj: DB) -> Result<(), ProtoError> {
|
||||
let reader = BufReader::new(socket);
|
||||
|
||||
let mut lines = reader.lines();
|
||||
|
||||
if let Ok(Some(line)) = lines.next_line().await {
|
||||
println!("name: {}", line);
|
||||
let service_name = line;
|
||||
|
||||
let mut n = 0;
|
||||
|
||||
while let Ok(Some(line)) = lines.next_line().await {
|
||||
println!("data line: n: {}, {}", n, line);
|
||||
n = n + 1;
|
||||
dbobj.write_entry(&service_name, &line).await?;
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn listen_api(dbobj: DB) -> io::Result<()> {
|
||||
let listener = TcpListener::bind("0.0.0.0:8085").await?;
|
||||
|
||||
loop {
|
||||
let (socket, _) = listener.accept().await?;
|
||||
let dbtmp = dbobj.clone();
|
||||
task::spawn(async move { process_socket(socket, dbtmp).await });
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue