From 247eb0f87c17690830e2255c3a19725c22a61d2c Mon Sep 17 00:00:00 2001 From: Fedor Gogolev Date: Mon, 23 Sep 2019 20:33:56 +0300 Subject: [PATCH] refactor: stream Duration instead of f64 --- Cargo.toml | 1 - examples/simple.rs | 2 +- src/lib.rs | 3 +-- src/ping.rs | 19 +++++++++---------- 4 files changed, 11 insertions(+), 14 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8d7e582..8dbc078 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,7 +16,6 @@ libc = "0.2" mio = "0.6" rand = "0.4" socket2 = "0.3" -time = "0.1" tokio-executor="0.1.2" tokio-reactor = "0.1.1" tokio-timer = "0.2.3" diff --git a/examples/simple.rs b/examples/simple.rs index 0077375..fbc767d 100644 --- a/examples/simple.rs +++ b/examples/simple.rs @@ -13,7 +13,7 @@ fn main() { let future = stream.and_then(|stream| { stream.take(3).for_each(|mb_time| { match mb_time { - Some(time) => println!("time={}", time), + Some(time) => println!("time={:?}", time), None => println!("timeout"), } Ok(()) diff --git a/src/lib.rs b/src/lib.rs index f8a8f31..6a45c72 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,7 @@ //! let future = stream.and_then(|stream| { //! stream.take(3).for_each(|mb_time| { //! match mb_time { -//! Some(time) => println!("time={}", time), +//! Some(time) => println!("time={:?}", time), //! None => println!("timeout"), //! } //! Ok(()) @@ -41,7 +41,6 @@ extern crate libc; extern crate mio; extern crate rand; extern crate socket2; -extern crate time; extern crate parking_lot; extern crate tokio_executor; extern crate tokio_reactor; diff --git a/src/ping.rs b/src/ping.rs index ff1d584..99419db 100644 --- a/src/ping.rs +++ b/src/ping.rs @@ -10,7 +10,6 @@ use futures::sync::oneshot; use rand::random; use parking_lot::Mutex; use socket2::{Domain, Protocol, Type}; -use time::precise_time_s; use tokio_executor::spawn; use tokio_reactor::Handle; @@ -29,7 +28,7 @@ type EchoRequestBuffer = [u8; ECHO_REQUEST_BUFFER_SIZE]; #[derive(Clone)] struct PingState { - inner: Arc>>>, + inner: Arc>>>, } impl PingState { @@ -39,11 +38,11 @@ impl PingState { } } - fn insert(&self, key: Token, value: oneshot::Sender) { + fn insert(&self, key: Token, value: oneshot::Sender) { self.inner.lock().insert(key, value); } - fn remove(&self, key: &[u8]) -> Option> { + fn remove(&self, key: &[u8]) -> Option> { self.inner.lock().remove(key) } } @@ -61,16 +60,16 @@ enum PingFutureKind { } struct NormalPingFutureKind { - start_time: f64, + start_time: Instant, state: PingState, token: Token, delay: Delay, send: Option>, - receiver: oneshot::Receiver, + receiver: oneshot::Receiver, } impl Future for PingFuture { - type Item = Option; + type Item = Option; type Error = Error; fn poll(&mut self) -> Poll { @@ -221,7 +220,7 @@ impl PingChainStream { } impl Stream for PingChainStream { - type Item = Option; + type Item = Option; type Error = Error; fn poll(&mut self) -> Poll, Self::Error> { @@ -398,7 +397,7 @@ impl Pinger { PingFuture { inner: PingFutureKind::Normal(NormalPingFutureKind { - start_time: precise_time_s(), + start_time: Instant::now(), state: self.inner.state.clone(), token: token, delay: Delay::new(deadline), @@ -463,7 +462,7 @@ impl Future for Receiver { match self.socket.recv(&mut self.buffer) { Ok(Async::Ready(bytes)) => { if let Some(payload) = Message::reply_payload(&self.buffer[..bytes]) { - let now = precise_time_s(); + let now = Instant::now(); if let Some(sender) = self.state.remove(payload) { sender.send(now).unwrap_or_default() }