Skip to content

Commit

Permalink
refactor: stream Duration instead of f64
Browse files Browse the repository at this point in the history
  • Loading branch information
knsd committed Sep 23, 2019
1 parent 608f938 commit 247eb0f
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 14 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ libc = "0.2"
mio = "0.6"
rand = "0.4"
socket2 = "0.3"
time = "0.1"
tokio-executor="0.1.2"
tokio-reactor = "0.1.1"
tokio-timer = "0.2.3"
Expand Down
2 changes: 1 addition & 1 deletion examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ fn main() {
let future = stream.and_then(|stream| {
stream.take(3).for_each(|mb_time| {
match mb_time {
Some(time) => println!("time={}", time),
Some(time) => println!("time={:?}", time),
None => println!("timeout"),
}
Ok(())
Expand Down
3 changes: 1 addition & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! let future = stream.and_then(|stream| {
//! stream.take(3).for_each(|mb_time| {
//! match mb_time {
//! Some(time) => println!("time={}", time),
//! Some(time) => println!("time={:?}", time),
//! None => println!("timeout"),
//! }
//! Ok(())
Expand All @@ -41,7 +41,6 @@ extern crate libc;
extern crate mio;
extern crate rand;
extern crate socket2;
extern crate time;
extern crate parking_lot;
extern crate tokio_executor;
extern crate tokio_reactor;
Expand Down
19 changes: 9 additions & 10 deletions src/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use futures::sync::oneshot;
use rand::random;
use parking_lot::Mutex;
use socket2::{Domain, Protocol, Type};
use time::precise_time_s;

use tokio_executor::spawn;
use tokio_reactor::Handle;
Expand All @@ -29,7 +28,7 @@ type EchoRequestBuffer = [u8; ECHO_REQUEST_BUFFER_SIZE];

#[derive(Clone)]
struct PingState {
inner: Arc<Mutex<HashMap<Token, oneshot::Sender<f64>>>>,
inner: Arc<Mutex<HashMap<Token, oneshot::Sender<Instant>>>>,
}

impl PingState {
Expand All @@ -39,11 +38,11 @@ impl PingState {
}
}

fn insert(&self, key: Token, value: oneshot::Sender<f64>) {
fn insert(&self, key: Token, value: oneshot::Sender<Instant>) {
self.inner.lock().insert(key, value);
}

fn remove(&self, key: &[u8]) -> Option<oneshot::Sender<f64>> {
fn remove(&self, key: &[u8]) -> Option<oneshot::Sender<Instant>> {
self.inner.lock().remove(key)
}
}
Expand All @@ -61,16 +60,16 @@ enum PingFutureKind {
}

struct NormalPingFutureKind {
start_time: f64,
start_time: Instant,
state: PingState,
token: Token,
delay: Delay,
send: Option<Send<EchoRequestBuffer>>,
receiver: oneshot::Receiver<f64>,
receiver: oneshot::Receiver<Instant>,
}

impl Future for PingFuture {
type Item = Option<f64>;
type Item = Option<Duration>;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Expand Down Expand Up @@ -221,7 +220,7 @@ impl PingChainStream {
}

impl Stream for PingChainStream {
type Item = Option<f64>;
type Item = Option<Duration>;
type Error = Error;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
Expand Down Expand Up @@ -398,7 +397,7 @@ impl Pinger {

PingFuture {
inner: PingFutureKind::Normal(NormalPingFutureKind {
start_time: precise_time_s(),
start_time: Instant::now(),
state: self.inner.state.clone(),
token: token,
delay: Delay::new(deadline),
Expand Down Expand Up @@ -463,7 +462,7 @@ impl<Message: ParseReply> Future for Receiver<Message> {
match self.socket.recv(&mut self.buffer) {
Ok(Async::Ready(bytes)) => {
if let Some(payload) = Message::reply_payload(&self.buffer[..bytes]) {
let now = precise_time_s();
let now = Instant::now();
if let Some(sender) = self.state.remove(payload) {
sender.send(now).unwrap_or_default()
}
Expand Down

0 comments on commit 247eb0f

Please sign in to comment.