forked from AlexPikalov/cdrs
-
Notifications
You must be signed in to change notification settings - Fork 0
/
server_events.rs
49 lines (40 loc) · 1.65 KB
/
server_events.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
extern crate cdrs;
use std::iter::Iterator;
use std::thread;
use cdrs::authenticators::NoneAuthenticator;
use cdrs::cluster::session::new as new_session;
use cdrs::cluster::{ClusterTcpConfig, NodeTcpConfigBuilder};
use cdrs::compression::Compression;
use cdrs::frame::events::{ChangeType, ServerEvent, SimpleServerEvent, Target};
use cdrs::load_balancing::RoundRobin;
/// Address (`host:port`) of the cluster node this example connects to.
///
/// Used both for the session's node configuration and for the event listener,
/// so the two can never point at different hosts.
const _ADDR: &str = "127.0.0.1:9042";

/// Opens a session against the node at `_ADDR`, asks it for `SchemaChange`
/// server events and prints every event that reports a newly created table.
///
/// # Panics
///
/// Panics if the session cannot be created or if `listen` returns an error.
/// A failure of `listener.start` panics inside the spawned thread only.
fn main() {
    let node = NodeTcpConfigBuilder::new(_ADDR, NoneAuthenticator {}).build();
    let cluster_config = ClusterTcpConfig(vec![node]);
    let session =
        new_session(&cluster_config, RoundRobin::new()).expect("session should be created");

    // Request only schema-change notifications; `listen` hands back the
    // listener to be started and the stream the events are read from.
    let (listener, stream) = session
        .listen(
            _ADDR,
            NoneAuthenticator {},
            vec![SimpleServerEvent::SchemaChange],
        )
        .expect("listen error");

    // The listener is started on its own thread so that this thread stays
    // free to consume `stream` below.
    // NOTE(review): presumably `start` blocks for the lifetime of the
    // connection - confirm against the cdrs documentation.
    thread::spawn(move || listener.start(&Compression::None).unwrap());

    let new_tables = stream
        // Log every event that arrives, before any filtering.
        .inspect(|event| println!("inspect event {:?}", event))
        // Keep only events whose type is a schema change.
        .filter(|event| event == &SimpleServerEvent::SchemaChange)
        // Of those, keep only the ones reporting that a table was created.
        .filter(|event| match **event {
            ServerEvent::SchemaChange(ref change) => {
                change.change_type == ChangeType::Created && change.target == Target::Table
            }
            _ => false,
        });

    println!("Start listen for server events");

    // Runs until the event stream ends.
    for change in new_tables {
        println!("server event {:?}", change);
    }
}