From 311db08051d1af6ee6f1437765bd13da5860575f Mon Sep 17 00:00:00 2001 From: Tamo Date: Wed, 25 Oct 2023 11:40:26 +0200 Subject: [PATCH] add support for tcp keepalive --- examples/leader_latch.rs | 2 +- examples/persistent_watches.rs | 36 ++-- examples/queue_consume.rs | 11 +- examples/queue_producer.rs | 6 +- examples/zookeeper_example.rs | 21 ++- src/io.rs | 17 +- src/zookeeper.rs | 226 ++++++++++++----------- tests/test_cache.rs | 42 ++++- tests/test_leader.rs | 6 +- tests/test_persistent_recursive_watch.rs | 98 +++++++--- tests/test_recursive.rs | 23 ++- tests/test_zk.rs | 54 +++--- 12 files changed, 323 insertions(+), 219 deletions(-) diff --git a/examples/leader_latch.rs b/examples/leader_latch.rs index a3f63d4bb..5a65f18af 100644 --- a/examples/leader_latch.rs +++ b/examples/leader_latch.rs @@ -28,7 +28,7 @@ fn main() { let zk_urls = zk_server_urls(); log::info!("connecting to {}", zk_urls); - let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap(); + let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher, None).unwrap(); let id = Uuid::new_v4().to_string(); log::info!("starting host with id: {:?}", id); diff --git a/examples/persistent_watches.rs b/examples/persistent_watches.rs index 00496041f..25581542d 100644 --- a/examples/persistent_watches.rs +++ b/examples/persistent_watches.rs @@ -34,11 +34,11 @@ fn zk_example() { let root = format!("/example-{}", uuid::Uuid::new_v4()); let modifying_zk = - ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap(); + ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher, None).unwrap(); let recursive_watch_zk = - ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap(); + ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher, None).unwrap(); let persistent_watch_zk = - ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap(); + ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher, None).unwrap(); // Creating separate clients to show the example where modifications to the nodes // take place in a different session than our own. @@ -47,8 +47,10 @@ fn zk_example() { // Also separate clients per type of watch as there is a bug when creating multiple type watchers in the same // path in the same session // https://issues.apache.org/jira/browse/ZOOKEEPER-4466 - recursive_watch_zk.add_listener(|zk_state| println!("New recursive watch ZkState is {:?}", zk_state)); - persistent_watch_zk.add_listener(|zk_state| println!("New peristent watch ZkState is {:?}", zk_state)); + recursive_watch_zk + .add_listener(|zk_state| println!("New recursive watch ZkState is {:?}", zk_state)); + persistent_watch_zk + .add_listener(|zk_state| println!("New peristent watch ZkState is {:?}", zk_state)); modifying_zk.ensure_path(&root).unwrap(); @@ -64,7 +66,9 @@ fn zk_example() { }) .unwrap(); - println!("press c to add and modify child, e to edit the watched node, anything else to proceed"); + println!( + "press c to add and modify child, e to edit the watched node, anything else to proceed" + ); let stdin = io::stdin(); let inputs = stdin.lock().lines(); let mut incr = 0; @@ -82,28 +86,18 @@ fn zk_example() { ) .unwrap(); modifying_zk - .set_data( - &child_path, - b"new-data".to_vec(), - None, - ) - .unwrap(); - modifying_zk - .delete(&child_path, None) + .set_data(&child_path, b"new-data".to_vec(), None) .unwrap(); - }, + modifying_zk.delete(&child_path, None).unwrap(); + } "e" => { modifying_zk - .set_data( - &root, - format!("new-data-{incr}").into_bytes(), - None, - ) + .set_data(&root, format!("new-data-{incr}").into_bytes(), None) .unwrap(); } other => { println!("received {other}"); - break + break; } } } diff --git a/examples/queue_consume.rs b/examples/queue_consume.rs index b459250d2..dbfbeff36 100644 --- a/examples/queue_consume.rs +++ b/examples/queue_consume.rs @@ -26,17 +26,18 @@ fn main() { let zk_urls = zk_server_urls(); log::info!("connecting to {}", zk_urls); - let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap(); + let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher, None).unwrap(); let queue = ZkQueue::new(Arc::new(zk), "/testing2".to_string()).unwrap(); println!("waiting for a message"); let msg = queue.take(); if msg.is_err() { - eprint!("unable to listen for message. error: {}", msg.err().unwrap().to_string()) + eprint!( + "unable to listen for message. error: {}", + msg.err().unwrap().to_string() + ) } else { println!("got {:?}", String::from_utf8(msg.unwrap()).unwrap()); - } - -} \ No newline at end of file +} diff --git a/examples/queue_producer.rs b/examples/queue_producer.rs index 43e73aaf0..526338c64 100644 --- a/examples/queue_producer.rs +++ b/examples/queue_producer.rs @@ -26,13 +26,11 @@ fn main() { let zk_urls = zk_server_urls(); log::info!("connecting to {}", zk_urls); - let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher).unwrap(); + let zk = ZooKeeper::connect(&*zk_urls, Duration::from_millis(2500), NoopWatcher, None).unwrap(); let queue = ZkQueue::new(Arc::new(zk), "/testing2".to_string()).unwrap(); - let message = "Hello World"; let op = queue.offer(Vec::from(message.as_bytes())); println!("{:?}", op); - -} \ No newline at end of file +} diff --git a/examples/zookeeper_example.rs b/examples/zookeeper_example.rs index dfbefbe53..b03c75e45 100644 --- a/examples/zookeeper_example.rs +++ b/examples/zookeeper_example.rs @@ -4,14 +4,14 @@ extern crate zookeeper; extern crate log; extern crate env_logger; +use std::env; use std::io; +use std::sync::mpsc; use std::sync::Arc; -use std::time::Duration; use std::thread; -use std::env; -use std::sync::mpsc; -use zookeeper::{Acl, CreateMode, Watcher, WatchedEvent, ZooKeeper}; +use std::time::Duration; use zookeeper::recipes::cache::PathChildrenCache; +use zookeeper::{Acl, CreateMode, WatchedEvent, Watcher, ZooKeeper}; struct LoggingWatcher; impl Watcher for LoggingWatcher { @@ -28,12 +28,11 @@ fn zk_server_urls() -> String { } } - fn zk_example() { let zk_urls = zk_server_urls(); println!("connecting to {}", zk_urls); - let zk = ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher).unwrap(); + let zk = ZooKeeper::connect(&*zk_urls, Duration::from_secs(15), LoggingWatcher, None).unwrap(); zk.add_listener(|zk_state| println!("New ZkState is {:?}", zk_state)); @@ -43,10 +42,12 @@ fn zk_example() { println!("authenticated -> {:?}", auth); - let path = zk.create("/test", - vec![1, 2], - Acl::open_unsafe().clone(), - CreateMode::Ephemeral); + let path = zk.create( + "/test", + vec![1, 2], + Acl::open_unsafe().clone(), + CreateMode::Ephemeral, + ); println!("created -> {:?}", path); diff --git a/src/io.rs b/src/io.rs index 9a6c4bbe0..d6dde1374 100644 --- a/src/io.rs +++ b/src/io.rs @@ -83,6 +83,7 @@ pub struct ZkIo { timeout_ms: u64, ping_timeout_duration: Duration, conn_timeout_duration: Duration, + keepalive: Option, watch_sender: mpsc::Sender, conn_resp: ConnectResponse, zxid: i64, @@ -100,14 +101,18 @@ impl ZkIo { ping_timeout_duration: Duration, watch_sender: mpsc::Sender, state_listeners: ListenerSet, + keepalive: Option, ) -> ZkIo { trace!("ZkIo::new"); let timeout_ms = ping_timeout_duration.as_secs() * 1000 + ping_timeout_duration.subsec_nanos() as u64 / 1000000; let (tx, rx) = channel(); + let sock = TcpStream::connect(&addrs[0]).unwrap(); // TODO I need a socket here, sorry. + sock.set_keepalive(keepalive).unwrap(); + let mut zkio = ZkIo { - sock: TcpStream::connect(&addrs[0]).unwrap(), // TODO I need a socket here, sorry. + sock, state: ZkState::Connecting, hosts: Hosts::new(addrs), buffer: VecDeque::new(), @@ -119,6 +124,7 @@ impl ZkIo { conn_timeout: None, ping_timeout_duration: ping_timeout_duration, conn_timeout_duration: Duration::from_secs(2), + keepalive, timeout_ms: timeout_ms, watch_sender: watch_sender, conn_resp: ConnectResponse::initial(timeout_ms), @@ -273,7 +279,7 @@ impl ZkIo { .send(WatchMessage::RemoveWatch(w.path, w.watcher_type)) .unwrap(), (_, Some(w)) => self.watch_sender.send(WatchMessage::Watch(w)).unwrap(), - (_, None) => {}, + (_, None) => {} } } @@ -335,7 +341,12 @@ impl ZkIo { let host = self.hosts.get(); info!("Connecting to new server {:?}", host); self.sock = match TcpStream::connect(host) { - Ok(sock) => sock, + Ok(sock) => { + if let Err(e) = sock.set_keepalive(self.keepalive) { + error!("Failed to set tcp-keepalive: {e}"); + } + sock + } Err(e) => { error!("Failed to connect {:?}: {:?}", host, e); continue; diff --git a/src/zookeeper.rs b/src/zookeeper.rs index a23ad6b76..50f1dbffe 100644 --- a/src/zookeeper.rs +++ b/src/zookeeper.rs @@ -1,11 +1,10 @@ use acl::*; use consts::*; use data::*; -use proto::*; use io::ZkIo; use listeners::{ListenerSet, Subscription}; use mio_extras::channel::Sender as MioSender; -use watch::{Watch, Watcher, ZkWatch}; +use proto::*; use std::convert::From; use std::net::{SocketAddr, ToSocketAddrs}; use std::result; @@ -13,9 +12,9 @@ use std::string::ToString; use std::sync::atomic::{AtomicIsize, Ordering}; use std::sync::mpsc::{sync_channel, SyncSender}; use std::sync::Mutex; -use std::time::Duration; use std::thread; - +use std::time::Duration; +use watch::{Watch, Watcher, ZkWatch}; /// Value returned from potentially-error operations. pub type ZkResult = result::Result; @@ -42,7 +41,8 @@ pub struct ZooKeeper { impl ZooKeeper { fn zk_thread(name: &str, task: F) -> ZkResult> - where F: FnOnce() + Send + 'static + where + F: FnOnce() + Send + 'static, { thread::Builder::new() .name(name.to_owned()) @@ -61,10 +61,15 @@ impl ZooKeeper { /// - `timeout`: session timeout -- how long should a client go without receiving communication /// from a server before considering it connection loss? /// - `watcher`: a watcher object to be notified of connection state changes. - pub fn connect(connect_string: &str, timeout: Duration, watcher: W) -> ZkResult - where W: Watcher + 'static + pub fn connect( + connect_string: &str, + timeout: Duration, + watcher: W, + keepalive: Option, + ) -> ZkResult + where + W: Watcher + 'static, { - let (addrs, chroot) = try!(Self::parse_connect_string(connect_string)); debug!("Initiating connection to {}", connect_string); @@ -72,7 +77,7 @@ impl ZooKeeper { let (watch, watch_sender) = ZkWatch::new(watcher, chroot.clone()); let listeners = ListenerSet::::new(); let listeners1 = listeners.clone(); - let io = ZkIo::new(addrs.clone(), timeout, watch_sender, listeners1); + let io = ZkIo::new(addrs.clone(), timeout, watch_sender, listeners1, keepalive); let sender = io.sender(); try!(Self::zk_thread("event", move || watch.run().unwrap())); @@ -90,24 +95,20 @@ impl ZooKeeper { fn parse_connect_string(connect_string: &str) -> ZkResult<(Vec, Option)> { let (chroot, end) = match connect_string.find('/') { - Some(start) => { - match &connect_string[start..connect_string.len()] { - "" | "/" => (None, start), - chroot => (Some(try!(Self::validate_path(chroot)).to_owned()), start), - } - } + Some(start) => match &connect_string[start..connect_string.len()] { + "" | "/" => (None, start), + chroot => (Some(try!(Self::validate_path(chroot)).to_owned()), start), + }, None => (None, connect_string.len()), }; let mut addrs = Vec::new(); for addr_str in connect_string[..end].split(',') { let addr = match addr_str.trim().to_socket_addrs() { - Ok(mut addrs) => { - match addrs.nth(0) { - Some(addr) => addr, - None => return Err(ZkError::BadArguments), - } - } + Ok(mut addrs) => match addrs.nth(0) { + Some(addr) => addr, + None => return Err(ZkError::BadArguments), + }, Err(_) => return Err(ZkError::BadArguments), }; addrs.push(addr); @@ -120,12 +121,13 @@ impl ZooKeeper { self.xid.fetch_add(1, Ordering::Relaxed) as i32 } - fn request(&self, - opcode: OpCode, - xid: i32, - req: Req, - watch: Option) - -> ZkResult { + fn request( + &self, + opcode: OpCode, + xid: i32, + req: Req, + watch: Option, + ) -> ZkResult { trace!("request opcode={:?} xid={:?}", opcode, xid); let rh = RequestHeader { xid: xid, @@ -156,11 +158,10 @@ impl ZooKeeper { })); match response.header.err { - 0 => { - Ok(try!(ReadFrom::read_from(&mut response.data) - .map_err(|_| ZkError::MarshallingError))) - } - e => Err(ZkError::from(e)) + 0 => Ok(try!( + ReadFrom::read_from(&mut response.data).map_err(|_| ZkError::MarshallingError) + )), + e => Err(ZkError::from(e)), } } @@ -179,12 +180,10 @@ impl ZooKeeper { fn path(&self, path: &str) -> ZkResult { match self.chroot { - Some(ref chroot) => { - match path { - "/" => Ok(chroot.clone()), - path => Ok(chroot.clone() + try!(Self::validate_path(path))), - } - } + Some(ref chroot) => match path { + "/" => Ok(chroot.clone()), + path => Ok(chroot.clone() + try!(Self::validate_path(path))), + }, None => Ok(try!(Self::validate_path(path)).to_owned()), } } @@ -236,12 +235,13 @@ impl ZooKeeper { /// /// The maximum allowable size of the data array is 1 MiB (1,048,576 bytes). Arrays larger than /// this will return `Err(ZkError::BadArguments)`. - pub fn create(&self, - path: &str, - data: Vec, - acl: Vec, - mode: CreateMode) - -> ZkResult { + pub fn create( + &self, + path: &str, + data: Vec, + acl: Vec, + mode: CreateMode, + ) -> ZkResult { trace!("ZooKeeper::create"); let req = CreateRequest { path: self.path(path)?, @@ -305,10 +305,7 @@ impl ZooKeeper { /// /// Similar to `exists`, but sets an explicit `Watcher` instead of relying on the client's base /// `Watcher`. - pub fn exists_w(&self, - path: &str, - watcher: W) - -> ZkResult> { + pub fn exists_w(&self, path: &str, watcher: W) -> ZkResult> { trace!("ZooKeeper::exists_w"); let req = ExistsRequest { path: try!(self.path(path)), @@ -321,10 +318,12 @@ impl ZooKeeper { watcher: Box::new(watcher), }; - match self.request::(OpCode::Exists, - self.xid(), - req, - Some(watch)) { + match self.request::( + OpCode::Exists, + self.xid(), + req, + Some(watch), + ) { Ok(response) => Ok(Some(response.stat)), Err(ZkError::NoNode) => Ok(None), Err(e) => Err(e), @@ -337,7 +336,9 @@ impl ZooKeeper { /// If no node with the given path exists, `Err(ZkError::NoNode)` will be returned. pub fn get_acl(&self, path: &str) -> ZkResult<(Vec, Stat)> { trace!("ZooKeeper::get_acl"); - let req = GetAclRequest { path: try!(self.path(path)) }; + let req = GetAclRequest { + path: try!(self.path(path)), + }; let response: GetAclResponse = try!(self.request(OpCode::GetAcl, self.xid(), req, None)); @@ -385,10 +386,8 @@ impl ZooKeeper { watch: watch, }; - let response: GetChildrenResponse = try!(self.request(OpCode::GetChildren, - self.xid(), - req, - None)); + let response: GetChildrenResponse = + try!(self.request(OpCode::GetChildren, self.xid(), req, None)); Ok(response.children) } @@ -397,10 +396,11 @@ impl ZooKeeper { /// /// Similar to `get_children`, but sets an explicit `Watcher` instead of relying on the client's /// base `Watcher`. - pub fn get_children_w(&self, - path: &str, - watcher: W) - -> ZkResult> { + pub fn get_children_w( + &self, + path: &str, + watcher: W, + ) -> ZkResult> { trace!("ZooKeeper::get_children_w"); let req = GetChildrenRequest { path: try!(self.path(path)), @@ -413,10 +413,8 @@ impl ZooKeeper { watcher: Box::new(watcher), }; - let response: GetChildrenResponse = try!(self.request(OpCode::GetChildren, - self.xid(), - req, - Some(watch))); + let response: GetChildrenResponse = + try!(self.request(OpCode::GetChildren, self.xid(), req, Some(watch))); Ok(response.children) } @@ -445,10 +443,11 @@ impl ZooKeeper { /// /// Similar to `get_data`, but sets an explicit `Watcher` instead of relying on the client's /// base `Watcher`. - pub fn get_data_w(&self, - path: &str, - watcher: W) - -> ZkResult<(Vec, Stat)> { + pub fn get_data_w( + &self, + path: &str, + watcher: W, + ) -> ZkResult<(Vec, Stat)> { trace!("ZooKeeper::get_data_w"); let req = GetDataRequest { path: try!(self.path(path)), @@ -461,10 +460,8 @@ impl ZooKeeper { watcher: Box::new(watcher), }; - let response: GetDataResponse = try!(self.request(OpCode::GetData, - self.xid(), - req, - Some(watch))); + let response: GetDataResponse = + try!(self.request(OpCode::GetData, self.xid(), req, Some(watch))); Ok(response.data_stat) } @@ -501,12 +498,12 @@ impl ZooKeeper { /// watch types can be set with this method. Only the modes available /// in WatchMode can be set with this method. /// Requires Zookeeper 3.6.0 - pub fn add_watch(&self, - path: &str, - mode: AddWatchMode, - watcher: W) - -> ZkResult<()> { - + pub fn add_watch( + &self, + path: &str, + mode: AddWatchMode, + watcher: W, + ) -> ZkResult<()> { let req = AddWatchRequest { path: try!(self.path(path)), mode: mode, @@ -523,11 +520,7 @@ impl ZooKeeper { } /// Remove watches of a given type for a path. - pub fn remove_watches(&self, - path: &str, - watcher_type: WatcherType) - -> ZkResult<()> { - + pub fn remove_watches(&self, path: &str, watcher_type: WatcherType) -> ZkResult<()> { let req = RemoveWatchesRequest { path: try!(self.path(path)), watcher_type: watcher_type, @@ -546,9 +539,10 @@ impl ZooKeeper { /// Adds a state change `Listener`, which will be notified of changes to the client's `ZkState`. /// A unique identifier is returned, which is used in `remove_listener` to un-subscribe. - pub fn add_listener(&self, - listener: Listener) - -> Subscription { + pub fn add_listener( + &self, + listener: Listener, + ) -> Subscription { trace!("ZooKeeper::add_listener"); self.listeners.subscribe(listener) } @@ -589,30 +583,48 @@ mod tests { use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; let (addrs, chroot) = ZooKeeper::parse_connect_string("127.0.0.1:2181,::1:2181/mesos") - .ok() - .expect("Parse 1"); - assert_eq!(addrs, - vec![SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 2181)), - SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), - 2181, - 0, - 0))]); + .ok() + .expect("Parse 1"); + assert_eq!( + addrs, + vec![ + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), 2181)), + SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), + 2181, + 0, + 0 + )) + ] + ); assert_eq!(chroot, Some("/mesos".to_owned())); - let (addrs, chroot) = ZooKeeper::parse_connect_string("::1:2181").ok().expect("Parse 2"); - assert_eq!(addrs, - vec![SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), - 2181, - 0, - 0))]); + let (addrs, chroot) = ZooKeeper::parse_connect_string("::1:2181") + .ok() + .expect("Parse 2"); + assert_eq!( + addrs, + vec![SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), + 2181, + 0, + 0 + ))] + ); assert_eq!(chroot, None); - let (addrs, chroot) = ZooKeeper::parse_connect_string("::1:2181/").ok().expect("Parse 3"); - assert_eq!(addrs, - vec![SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), - 2181, - 0, - 0))]); + let (addrs, chroot) = ZooKeeper::parse_connect_string("::1:2181/") + .ok() + .expect("Parse 3"); + assert_eq!( + addrs, + vec![SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1), + 2181, + 0, + 0 + ))] + ); assert_eq!(chroot, None); } diff --git a/tests/test_cache.rs b/tests/test_cache.rs index 5a7b1608e..63fdbd582 100644 --- a/tests/test_cache.rs +++ b/tests/test_cache.rs @@ -1,13 +1,12 @@ +use zookeeper::recipes::cache::PathChildrenCache; use zookeeper::CreateMode::*; use zookeeper::{Acl, WatchedEvent, ZooKeeper, ZooKeeperExt}; -use zookeeper::recipes::cache::PathChildrenCache; use ZkCluster; +use env_logger; use std::sync::Arc; use std::time::Duration; -use env_logger; - #[test] fn path_children_cache_test() { @@ -17,15 +16,38 @@ fn path_children_cache_test() { let cluster = ZkCluster::start(1); // Connect to the test cluster - let zk = Arc::new(ZooKeeper::connect(&cluster.connect_string, - Duration::from_secs(30), - |event: WatchedEvent| info!("{:?}", event)) - .unwrap()); + let zk = Arc::new( + ZooKeeper::connect( + &cluster.connect_string, + Duration::from_secs(30), + |event: WatchedEvent| info!("{:?}", event), + None, + ) + .unwrap(), + ); zk.ensure_path("/cache").unwrap(); - zk.create("/cache/a", vec![1, 4], Acl::open_unsafe().clone(), Ephemeral).unwrap(); - zk.create("/cache/b", vec![2, 4], Acl::open_unsafe().clone(), Ephemeral).unwrap(); - zk.create("/cache/c", vec![3, 4], Acl::open_unsafe().clone(), Ephemeral).unwrap(); + zk.create( + "/cache/a", + vec![1, 4], + Acl::open_unsafe().clone(), + Ephemeral, + ) + .unwrap(); + zk.create( + "/cache/b", + vec![2, 4], + Acl::open_unsafe().clone(), + Ephemeral, + ) + .unwrap(); + zk.create( + "/cache/c", + vec![3, 4], + Acl::open_unsafe().clone(), + Ephemeral, + ) + .unwrap(); let path_children_cache = Arc::new(PathChildrenCache::new(zk, "/cache").unwrap()); diff --git a/tests/test_leader.rs b/tests/test_leader.rs index 56d2a936e..17eba133a 100644 --- a/tests/test_leader.rs +++ b/tests/test_leader.rs @@ -1,7 +1,7 @@ +use env_logger; +use std::{sync::Arc, thread, time::Duration}; use uuid::Uuid; use zookeeper::{recipes::leader::LeaderLatch, ZkResult, ZooKeeper}; -use env_logger; -use std::{sync::Arc, time::Duration, thread}; use ZkCluster; #[test] @@ -14,6 +14,7 @@ fn leader_latch_test() -> ZkResult<()> { &cluster.connect_string, Duration::from_secs(30), |_ev| {}, + None, )?); let id1 = Uuid::new_v4().to_string(); @@ -58,6 +59,7 @@ fn leader_latch_test_disconnect() -> ZkResult<()> { &cluster.connect_string, Duration::from_secs(30), |_ev| {}, + None, )?); let id = Uuid::new_v4().to_string(); diff --git a/tests/test_persistent_recursive_watch.rs b/tests/test_persistent_recursive_watch.rs index 02c8857b8..30826e0a5 100644 --- a/tests/test_persistent_recursive_watch.rs +++ b/tests/test_persistent_recursive_watch.rs @@ -1,4 +1,4 @@ -use zookeeper::{AddWatchMode, Acl, CreateMode, WatcherType}; +use zookeeper::{Acl, AddWatchMode, CreateMode, WatcherType}; use zookeeper::{WatchedEvent, ZooKeeper, ZooKeeperExt}; use ZkCluster; @@ -18,6 +18,7 @@ fn persistent_watch_receives_more_than_one_message_on_modifications() { &cluster.connect_string, Duration::from_secs(30), |event: WatchedEvent| info!("{:?}", event), + None, ) .unwrap(); @@ -25,18 +26,25 @@ fn persistent_watch_receives_more_than_one_message_on_modifications() { &cluster.connect_string, Duration::from_secs(30), |event: WatchedEvent| info!("{:?}", event), + None, ) .unwrap(); zk_modifier.ensure_path("/base").unwrap(); let (snd, rcv) = mpsc::channel::<()>(); - zk_watcher.add_watch("/base", AddWatchMode::Persistent, move |_| { - snd.send(()).unwrap(); - }).unwrap(); - zk_modifier.set_data("/base", b"hello1".to_vec(), None).unwrap(); + zk_watcher + .add_watch("/base", AddWatchMode::Persistent, move |_| { + snd.send(()).unwrap(); + }) + .unwrap(); + zk_modifier + .set_data("/base", b"hello1".to_vec(), None) + .unwrap(); rcv.recv_timeout(Duration::from_millis(100)).unwrap(); - zk_modifier.set_data("/base", b"hello2".to_vec(), None).unwrap(); + zk_modifier + .set_data("/base", b"hello2".to_vec(), None) + .unwrap(); rcv.recv_timeout(Duration::from_millis(100)).unwrap(); } @@ -51,6 +59,7 @@ fn persistent_watch_does_not_receive_children_changes() { &cluster.connect_string, Duration::from_secs(30), |event: WatchedEvent| info!("{:?}", event), + None, ) .unwrap(); @@ -58,6 +67,7 @@ fn persistent_watch_does_not_receive_children_changes() { &cluster.connect_string, Duration::from_secs(30), |event: WatchedEvent| info!("{:?}", event), + None, ) .unwrap(); @@ -65,12 +75,18 @@ fn persistent_watch_does_not_receive_children_changes() { zk_modifier.ensure_path("/base/child").unwrap(); let (snd, rcv) = mpsc::channel::<()>(); - zk_watcher.add_watch("/base", AddWatchMode::Persistent, move |_| { - snd.send(()).unwrap(); - }).unwrap(); - zk_modifier.set_data("/base", b"hello1".to_vec(), None).unwrap(); + zk_watcher + .add_watch("/base", AddWatchMode::Persistent, move |_| { + snd.send(()).unwrap(); + }) + .unwrap(); + zk_modifier + .set_data("/base", b"hello1".to_vec(), None) + .unwrap(); rcv.recv_timeout(Duration::from_millis(100)).unwrap(); - zk_modifier.set_data("/base/child", b"hello2".to_vec(), None).unwrap(); + zk_modifier + .set_data("/base/child", b"hello2".to_vec(), None) + .unwrap(); if rcv.recv_timeout(Duration::from_millis(100)).is_ok() { panic!("received unexpected event for child"); } @@ -87,6 +103,7 @@ fn persistent_recursive_watch_stops_receiving_updates_when_removed() { &cluster.connect_string, Duration::from_secs(30), |event: WatchedEvent| info!("{:?}", event), + None, ) .unwrap(); @@ -94,19 +111,38 @@ fn persistent_recursive_watch_stops_receiving_updates_when_removed() { &cluster.connect_string, Duration::from_secs(30), |event: WatchedEvent| info!("{:?}", event), + None, ) .unwrap(); zk_modifier.ensure_path("/base").unwrap(); let (snd, rcv) = mpsc::channel::<()>(); - zk_watcher.add_watch("/base", AddWatchMode::PersistentRecursive, move |_| { - snd.send(()).unwrap(); - }).unwrap(); - zk_modifier.create("/base/child1", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap(); + zk_watcher + .add_watch("/base", AddWatchMode::PersistentRecursive, move |_| { + snd.send(()).unwrap(); + }) + .unwrap(); + zk_modifier + .create( + "/base/child1", + b"hello2".to_vec(), + Acl::open_unsafe().clone(), + CreateMode::Persistent, + ) + .unwrap(); rcv.recv_timeout(Duration::from_millis(100)).unwrap(); - zk_watcher.remove_watches("/base", WatcherType::Any).unwrap(); - zk_modifier.create("/base/child2", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap(); + zk_watcher + .remove_watches("/base", WatcherType::Any) + .unwrap(); + zk_modifier + .create( + "/base/child2", + b"hello2".to_vec(), + Acl::open_unsafe().clone(), + CreateMode::Persistent, + ) + .unwrap(); if rcv.recv_timeout(Duration::from_millis(100)).is_ok() { panic!("received unexpected event for child"); } @@ -123,6 +159,7 @@ fn persistent_recursive_watch_receive_children_changes() { &cluster.connect_string, Duration::from_secs(30), |event: WatchedEvent| info!("{:?}", event), + None, ) .unwrap(); @@ -130,17 +167,34 @@ fn persistent_recursive_watch_receive_children_changes() { &cluster.connect_string, Duration::from_secs(30), |event: WatchedEvent| info!("{:?}", event), + None, ) .unwrap(); zk_modifier.ensure_path("/base").unwrap(); let (snd, rcv) = mpsc::channel::<()>(); - zk_watcher.add_watch("/base", AddWatchMode::PersistentRecursive, move |_| { - snd.send(()).unwrap(); - }).unwrap(); - zk_modifier.create("/base/child1", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap(); + zk_watcher + .add_watch("/base", AddWatchMode::PersistentRecursive, move |_| { + snd.send(()).unwrap(); + }) + .unwrap(); + zk_modifier + .create( + "/base/child1", + b"hello2".to_vec(), + Acl::open_unsafe().clone(), + CreateMode::Persistent, + ) + .unwrap(); rcv.recv_timeout(Duration::from_millis(100)).unwrap(); - zk_modifier.create("/base/child2", b"hello2".to_vec(), Acl::open_unsafe().clone(), CreateMode::Persistent).unwrap(); + zk_modifier + .create( + "/base/child2", + b"hello2".to_vec(), + Acl::open_unsafe().clone(), + CreateMode::Persistent, + ) + .unwrap(); rcv.recv_timeout(Duration::from_millis(100)).unwrap(); } diff --git a/tests/test_recursive.rs b/tests/test_recursive.rs index e7b22c4e8..b859e55c9 100644 --- a/tests/test_recursive.rs +++ b/tests/test_recursive.rs @@ -15,7 +15,9 @@ fn get_children_recursive_test() { &cluster.connect_string, Duration::from_secs(30), |_: WatchedEvent| {}, - ).unwrap(); + None, + ) + .unwrap(); let tree = vec![ "/root/a/1", @@ -35,7 +37,8 @@ fn get_children_recursive_test() { let children = zk.get_children_recursive("/root").unwrap(); for path in tree { - for (i, _) in path.chars() + for (i, _) in path + .chars() .chain(once('/')) .enumerate() .skip(1) @@ -56,7 +59,9 @@ fn get_children_recursive_invalid_path_test() { &cluster.connect_string, Duration::from_secs(30), |_: WatchedEvent| {}, - ).unwrap(); + None, + ) + .unwrap(); let result = zk.get_children_recursive("/bad"); assert_eq!(result, Err(ZkError::NoNode)) @@ -72,7 +77,9 @@ fn get_children_recursive_only_root_test() { &cluster.connect_string, Duration::from_secs(30), |_: WatchedEvent| {}, - ).unwrap(); + None, + ) + .unwrap(); let root = "/root"; zk.ensure_path(root).unwrap(); @@ -90,7 +97,9 @@ fn delete_recursive_test() { &cluster.connect_string, Duration::from_secs(30), |_: WatchedEvent| {}, - ).unwrap(); + None, + ) + .unwrap(); let tree = vec![ "/root/a/1", @@ -123,7 +132,9 @@ fn delete_recursive_invalid_path_test() { &cluster.connect_string, Duration::from_secs(30), |_: WatchedEvent| {}, - ).unwrap(); + None, + ) + .unwrap(); let result = zk.delete_recursive("/bad"); assert_eq!(result, Err(ZkError::NoNode)) diff --git a/tests/test_zk.rs b/tests/test_zk.rs index da09c7c3c..def526904 100644 --- a/tests/test_zk.rs +++ b/tests/test_zk.rs @@ -1,14 +1,13 @@ -use zookeeper::{Acl, CreateMode, WatchedEvent, ZooKeeper}; use zookeeper::KeeperState; +use zookeeper::{Acl, CreateMode, WatchedEvent, ZooKeeper}; use ZkCluster; -use std::sync::Arc; +use env_logger; use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Duration; +use std::sync::Arc; use std::thread; -use env_logger; - +use std::time::Duration; #[test] fn zk_test() { @@ -21,33 +20,34 @@ fn zk_test() { let disconnects_watcher = disconnects.clone(); // Connect to the test cluster - let zk = ZooKeeper::connect(&cluster.connect_string, - Duration::from_secs(5), - move |event: WatchedEvent| { - info!("{:?}", event); - if event.keeper_state == KeeperState::Disconnected { - disconnects_watcher.fetch_add(1, Ordering::Relaxed); - } - }) - .unwrap(); - + let zk = ZooKeeper::connect( + &cluster.connect_string, + Duration::from_secs(5), + move |event: WatchedEvent| { + info!("{:?}", event); + if event.keeper_state == KeeperState::Disconnected { + disconnects_watcher.fetch_add(1, Ordering::Relaxed); + } + }, + None, + ) + .unwrap(); // Do the tests - let create = zk.create("/test", - vec![8, 8], - Acl::open_unsafe().clone(), - CreateMode::Ephemeral); + let create = zk.create( + "/test", + vec![8, 8], + Acl::open_unsafe().clone(), + CreateMode::Ephemeral, + ); assert_eq!(create.ok(), Some("/test".to_owned())); - let exists = zk.exists("/test", true); assert!(exists.is_ok()); - // Check that during inactivity, pinging keeps alive the connection thread::sleep(Duration::from_secs(8)); - // Set/Get Big-Data(tm) let data = vec![7; 1024 * 1000]; let set_data = zk.set_data("/test", data.clone(), None); @@ -64,12 +64,12 @@ fn zk_test() { let delete = zk.delete("/test", None); assert!(delete.is_ok()); - let mut sorted_children = children.unwrap(); sorted_children.sort(); - assert_eq!(sorted_children, - vec!["test".to_owned(), "zookeeper".to_owned()]); - + assert_eq!( + sorted_children, + vec!["test".to_owned(), "zookeeper".to_owned()] + ); // Let's see what happens when the connected server goes down assert_eq!(disconnects.load(Ordering::Relaxed), 0); @@ -81,14 +81,12 @@ fn zk_test() { // TODO once `manual` events are possible // assert_eq!(disconnects.load(Ordering::Relaxed), 1); - // After closing the client all operations return Err zk.close().unwrap(); let exists = zk.exists("/test", true); assert!(exists.is_err()); - // Close the whole cluster cluster.shutdown(); }