From 3b71e6e33dc66b92380047eb6a4a7a3e3f18babd Mon Sep 17 00:00:00 2001 From: Stefan Machmeier Date: Tue, 27 Feb 2024 11:48:27 +0100 Subject: [PATCH] Update rust --- heidpi-rust/src/main.rs | 1 + heidpi-rust/src/process.rs | 2 +- heidpi-rust/src/stream.rs | 61 ++++++++++++++++---------------------- 3 files changed, 27 insertions(+), 37 deletions(-) diff --git a/heidpi-rust/src/main.rs b/heidpi-rust/src/main.rs index 90a48cd..f8dcbc7 100644 --- a/heidpi-rust/src/main.rs +++ b/heidpi-rust/src/main.rs @@ -1,6 +1,7 @@ pub mod cli; pub mod logging; pub mod stream; +pub mod process; use log::error; use cli::Cli; diff --git a/heidpi-rust/src/process.rs b/heidpi-rust/src/process.rs index ec8d002..8810a9c 100644 --- a/heidpi-rust/src/process.rs +++ b/heidpi-rust/src/process.rs @@ -1,6 +1,6 @@ use crate::geoip::get_geoip; use std::net::IpAddr; -pub fn process(json: &IpAddr) { +pub fn process(json: Value) { get_geoip(""); } \ No newline at end of file diff --git a/heidpi-rust/src/stream.rs b/heidpi-rust/src/stream.rs index cbb297e..c947809 100644 --- a/heidpi-rust/src/stream.rs +++ b/heidpi-rust/src/stream.rs @@ -1,17 +1,13 @@ use log::{info, trace, warn}; use serde_json::Value; -use tokio::sync::Mutex; -use std::future::Future; -use std::io::{self, Read}; -use std::rc::Rc; -// use std::net::TcpStream; +use std::io::{self}; use std::str; -use std::sync::Arc; use std::time::Duration; use std::{thread, time}; -use tokio::task; use tokio::net::TcpStream; +use crate::process::process; + const NETWORK_BUFFER_LENGTH_DIGITS: usize = 5; const NETWORK_BUFFER_MAX_SIZE: usize = 33792; const EOL: &str = "\n"; @@ -43,7 +39,6 @@ pub async fn connect(connection: &str) -> anyhow::Result<()> { continue; } Ok(std_stream) => { - info!("Connected"); match std_stream.set_nonblocking(true) { @@ -57,8 +52,6 @@ pub async fn connect(connection: &str) -> anyhow::Result<()> { }; let stream = TcpStream::from_std(std_stream)?; - // let stream = Arc::new(Mutex::new(stream)); - let mut buf = vec![0u8; NETWORK_BUFFER_MAX_SIZE]; loop { @@ -69,37 +62,33 @@ pub async fn connect(connection: &str) -> anyhow::Result<()> { Ok(json_str) => { match str::from_utf8(json_str.as_bytes()) { Ok(v) => { - let s = Rc::new(v); - // let g = v.clone(); - // tokio::spawn(async move { - // info!("{:?}",&v); - // }); - info!("{:?}", s); - }, + info!("{:?}", v); + v + } Err(e) => { warn!("Invalid UTF-8 sequence: '{}'.", e); break; } }; - // let d = s.clone(); - - // for s_plit_n in s.split(EOL).into_iter() { - // if s_plit_n.len() > NETWORK_BUFFER_LENGTH_DIGITS { - // // TODO Call processing and save to file - // let v: Value = match serde_json::from_str( - // &s_plit_n[NETWORK_BUFFER_LENGTH_DIGITS..], - // ) { - // Ok(json) => { - // trace!("Converted result: {}", json); - // json - // } - // Err(_e) => { - // warn!("Invalid JSON object: '{}'.", _e); - // serde_json::Value::Null - // } - // }; - // }; - // } + for s_plit_n in json_str.split(EOL).into_iter() { + if s_plit_n.len() > NETWORK_BUFFER_LENGTH_DIGITS { + let v: Value = match serde_json::from_str( + &s_plit_n[NETWORK_BUFFER_LENGTH_DIGITS..], + ) { + Ok(json) => { + trace!("Converted result: {}", json); + json + } + Err(_e) => { + warn!("Invalid JSON object: '{}'.", _e); + serde_json::Value::Null + } + }; + // TODO Multithreading? + // TODO Call processing and save to file + process(v); + }; + } } Err(_) => { warn!("BUG: Invalid UTF-8 in buffer");