Skip to content

Commit

Permalink
Started to integrate tokio
Browse files Browse the repository at this point in the history
  • Loading branch information
stefanDeveloper committed Feb 17, 2024
1 parent f14c41c commit 27373df
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 70 deletions.
6 changes: 3 additions & 3 deletions heidpi-rust/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::{bail, ensure};
use clap::Parser;
use std::path::PathBuf;

use crate::stream::{self, NDpidTcpstream};
use crate::stream;

#[derive(Parser, Debug)]
#[command(author, version, about, long_about)]
Expand Down Expand Up @@ -42,7 +42,7 @@ pub enum Cli {
}

impl Cli {
pub fn run() -> anyhow::Result<()> {
pub async fn run() -> anyhow::Result<()> {
let cli = Self::parse();
use Cli::*;
match cli {
Expand All @@ -68,7 +68,7 @@ impl Cli {
error_events,
flow_events,
} => {
let mut v = NDpidTcpstream::connect("127.0.0.1:7000");
let mut v = stream::connect("127.0.0.1:7000").await;
}
}

Expand Down
2 changes: 1 addition & 1 deletion heidpi-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub async fn main() {
// default to displaying warning and error log messages only
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("trace")).init();

match Cli::run() {
match Cli::run().await {
Ok(_) => {}
Err(e) => {
error!("{e}");
Expand Down
140 changes: 74 additions & 66 deletions heidpi-rust/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,94 +1,103 @@
use log::{debug, info, trace, warn};
use serde_json::{error, Value};
use log::{info, trace, warn};
use serde_json::Value;
use std::io::{self, Read};
use std::net::TcpStream;
// use std::net::TcpStream;
use std::str;
use std::time::Duration;
use std::{thread, time};
use valico::json_schema;
use tokio::io::Interest;
use tokio::net::TcpStream;

const NETWORK_BUFFER_LENGTH_DIGITS: usize = 5;
const NETWORK_BUFFER_MAX_SIZE: usize = 33792;
const EOL: &str = "\n";

#[derive(Debug)]
pub struct NDpidTcpstream {
event_type: NDpidEventType,
pub struct HeiDPITcpstream {
event_type: HeiDPIEventType,
data: Value,
}

#[derive(Debug)]
pub enum NDpidEventType {
pub enum HeiDPIEventType {
PACKET,
FLOW,
DAEMON,
ERROR,
}

pub async fn connect(connection: &str) -> anyhow::Result<()> {
loop {
match std::net::TcpStream::connect(connection) {
Err(_e) => {
warn!("Could not connect to Server");

impl NDpidTcpstream {
pub fn connect(&self, connection: &str) -> anyhow::Result<()> {
loop {
match TcpStream::connect(connection) {
Err(_e) => {
warn!("Could not connect to Server");
// We don't want to hammer the server with reconnection attempts, so we wait for 5 seconds.
let five_seconds = time::Duration::from_millis(5000);
thread::sleep(five_seconds);

// We don't want to hammer the server with reconnection attempts, so we wait for 5 seconds.
let five_seconds = time::Duration::from_millis(5000);
thread::sleep(five_seconds);

continue;
}
Ok(mut stream) => {
info!("Connected");
match stream.set_nonblocking(true) {
Ok(..) => info!("Non-blocking State"),
Err(..) => panic!("Non-blocking State Failed"),
};

match stream.set_read_timeout(Some(Duration::new(60, 0))) {
Ok(..) => info!("Set Read Timeout"),
Err(..) => panic!("Setting Read Timeout Failed"),
};
continue;
}
Ok(std_stream) => {
info!("Connected");
match std_stream.set_nonblocking(true) {
Ok(..) => info!("Non-blocking State"),
Err(..) => panic!("Non-blocking State Failed"),
};

let mut buf = [0u8; NETWORK_BUFFER_MAX_SIZE];
match std_stream.set_read_timeout(Some(Duration::new(60, 0))) {
Ok(..) => info!("Set Read Timeout"),
Err(..) => panic!("Setting Read Timeout Failed"),
};

let stream = TcpStream::from_std(std_stream)?;
let mut buf = vec![0u8; NETWORK_BUFFER_MAX_SIZE];
let ready = stream
.ready(Interest::READABLE | Interest::WRITABLE)
.await?;
if ready.is_readable() {
loop {
match stream.read(&mut buf) {
match stream.try_read(&mut buf) {
Ok(data) => {
match std::str::from_utf8(&buf[..data]) {
Ok(json_str) => {
let s = match str::from_utf8(json_str.as_bytes()) {
Ok(v) => v,
Err(e) => {
warn!("Invalid UTF-8 sequence: '{}'.", e);
break;
}
};
for s_plit_n in s.split(EOL).into_iter() {
if s_plit_n.len() > NETWORK_BUFFER_LENGTH_DIGITS {
let v: Value = match serde_json::from_str(
&s_plit_n[NETWORK_BUFFER_LENGTH_DIGITS..],
) {
Ok(json) => {
trace!("Converted result: {}", json);
json
}
Err(_e) => {
warn!(
"Invalid JSON object: '{}' for string: {}",
_e, s
);
serde_json::Value::Null
}
};
};
}
}
Err(_) => {
warn!("BUG: Invalid UTF-8 in buffer");
}
};
println!("read {} bytes", data);
// match std::str::from_utf8(&buf[..data]) {
// Ok(json_str) => {
// let s = match str::from_utf8(json_str.as_bytes()) {
// Ok(v) => v,
// Err(e) => {
// warn!("Invalid UTF-8 sequence: '{}'.", e);
// break;
// }
// };
// for s_plit_n in s.split(EOL).into_iter() {
// if s_plit_n.len() > NETWORK_BUFFER_LENGTH_DIGITS {
// // TODO Call processing and save to file
// tokio::spawn(async move {
// // process(socket).await;

// let v: Value = match serde_json::from_str(
// &s_plit_n[NETWORK_BUFFER_LENGTH_DIGITS..],
// ) {
// Ok(json) => {
// trace!("Converted result: {}", json);
// json
// }
// Err(_e) => {
// warn!(
// "Invalid JSON object: '{}' for string: {}",
// _e, s
// );
// serde_json::Value::Null
// }
// };
// });
// };
// }
// }
// Err(_) => {
// warn!("BUG: Invalid UTF-8 in buffer");
// }
// };
}
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
continue;
Expand All @@ -104,6 +113,5 @@ impl NDpidTcpstream {
}
}
}
Ok(())
}
}

0 comments on commit 27373df

Please sign in to comment.