Skip to content

Commit

Permalink
Replace the UDP replication protocol with gRPC for the distributed st…
Browse files Browse the repository at this point in the history
…ore..

Signed-off-by: Hiram Chirino <[email protected]>
  • Loading branch information
chirino committed May 21, 2024
1 parent 5368e72 commit 1587550
Show file tree
Hide file tree
Showing 10 changed files with 1,000 additions and 119 deletions.
17 changes: 14 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,8 @@ pub struct InMemoryStorageConfiguration {
pub struct DistributedStorageConfiguration {
pub name: String,
pub cache_size: Option<u64>,
pub local: String,
pub broadcast: String,
pub listen_address: String,
pub peer_urls: Vec<String>,
}

#[derive(PartialEq, Eq, Debug)]
Expand Down
28 changes: 18 additions & 10 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::{env, process};

#[cfg(feature = "distributed_storage")]
use clap::parser::ValuesRef;

use sysinfo::{MemoryRefreshKind, RefreshKind, System};
use thiserror::Error;
use tokio::runtime::Handle;
Expand Down Expand Up @@ -165,8 +169,8 @@ impl Limiter {
let storage = DistributedInMemoryStorage::new(
cfg.name,
cfg.cache_size.or_else(guess_cache_size).unwrap(),
cfg.local,
Some(cfg.broadcast),
cfg.listen_address,
cfg.peer_urls,
);
let rate_limiter_builder =
RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage)));
Expand Down Expand Up @@ -604,18 +608,18 @@ fn create_config() -> (Configuration, &'static str) {
.help("Unique name to identify this Limitador instance"),
)
.arg(
Arg::new("LOCAL")
Arg::new("LISTEN_ADDRESS")
.action(ArgAction::Set)
.required(true)
.display_order(2)
.help("Local IP:PORT to send datagrams from"),
.help("Local IP:PORT to listen on for replication"),
)
.arg(
Arg::new("BROADCAST")
.action(ArgAction::Set)
.required(true)
Arg::new("PEER_URLS")
.action(ArgAction::Append)
.required(false)
.display_order(3)
.help("Broadcast IP:PORT to send datagrams to"),
.help("A replication peer url that this instance will connect to"),
)
.arg(
Arg::new("CACHE_SIZE")
Expand Down Expand Up @@ -697,8 +701,12 @@ fn create_config() -> (Configuration, &'static str) {
Some(("distributed", sub)) => {
StorageConfiguration::Distributed(DistributedStorageConfiguration {
name: sub.get_one::<String>("NAME").unwrap().to_owned(),
local: sub.get_one::<String>("LOCAL").unwrap().to_owned(),
broadcast: sub.get_one::<String>("BROADCAST").unwrap().to_owned(),
listen_address: sub.get_one::<String>("LISTEN_ADDRESS").unwrap().to_owned(),
peer_urls: sub
.get_many::<String>("PEER_URLS")
.unwrap_or(ValuesRef::default())
.map(|x| x.to_owned())
.collect(),
cache_size: sub.get_one::<u64>("CACHE_SIZE").copied(),
})
}
Expand Down
15 changes: 14 additions & 1 deletion limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ edition = "2021"
[features]
default = ["disk_storage", "redis_storage"]
disk_storage = ["rocksdb"]
distributed_storage = []
distributed_storage = ["tokio", "tokio-stream", "h2", "base64", "uuid", "tonic", "tonic-reflection", "prost", "prost-types"]
redis_storage = ["redis", "r2d2", "tokio"]
lenient_conditions = []

Expand Down Expand Up @@ -49,6 +49,16 @@ tokio = { version = "1", optional = true, features = [
"time",
] }

base64 = { version = "0.22", optional = true }
tokio-stream = { version = "0.1", optional = true }
h2 = { version = "0.3", optional = true }
uuid = { version = "1.8.0", features = ["v4", "fast-rng"], optional = true }
tonic = { version = "0.11", optional = true }
tonic-reflection = { version = "0.11", optional = true }
prost = { version = "0.12", optional = true }
prost-types = { version = "0.12", optional = true }
time = "0.3.36"

[dev-dependencies]
serial_test = "3.0"
criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] }
Expand All @@ -62,6 +72,9 @@ tokio = { version = "1", features = [
"time",
] }

[build-dependencies]
tonic-build = "0.11"

[[bench]]
name = "bench"
path = "benches/bench.rs"
Expand Down
6 changes: 3 additions & 3 deletions limitador/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ fn bench_distributed(c: &mut Criterion) {
"test_node".to_owned(),
10_000,
"127.0.0.1:0".to_owned(),
None,
vec![],
));
bench_is_rate_limited(b, test_scenario, storage);
})
Expand All @@ -162,7 +162,7 @@ fn bench_distributed(c: &mut Criterion) {
"test_node".to_owned(),
10_000,
"127.0.0.1:0".to_owned(),
None,
vec![],
));
bench_update_counters(b, test_scenario, storage);
})
Expand All @@ -177,7 +177,7 @@ fn bench_distributed(c: &mut Criterion) {
"test_node".to_owned(),
10_000,
"127.0.0.1:0".to_owned(),
None,
vec![],
));
bench_check_rate_limited_and_update(b, test_scenario, storage);
})
Expand Down
20 changes: 20 additions & 0 deletions limitador/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use std::error::Error;
use std::path::Path;

fn main() -> Result<(), Box<dyn Error>> {
generate_protobuf()
}

fn generate_protobuf() -> Result<(), Box<dyn Error>> {
let proto_path: &Path = "proto/distributed.proto".as_ref();

let proto_dir = proto_path
.parent()
.expect("proto file should reside in a directory");

tonic_build::configure()
.protoc_arg("--experimental_allow_proto3_optional")
.compile(&[proto_path], &[proto_dir])?;

Ok(())
}
61 changes: 61 additions & 0 deletions limitador/proto/distributed.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
syntax = "proto3";

package limitador.service.distributed.v1;

// A packet defines all the types of messages that can be sent between replication peers.
message Packet {
oneof message {
// the Hello message is used to introduce a peer to another peer. It is the first message sent by a peer.
Hello hello = 1;
// the MembershipUpdate message is used to gossip about the other peers in the cluster:
// 1) sent after the first Hello message
// 2) sent when the membership state changes
MembershipUpdate membership_update = 2;
// the Ping message is used to request a pong from the other peer.
Ping ping = 3;
// the Pong message is used to respond to a ping.
Pong pong = 4;
// the CounterUpdate message is used to send counter updates.
CounterUpdate counter_update = 5;
}
}

// this is the first packet sent by a peer to another peer.
message Hello {
// the peer id of the sending peer
string sender_peer_id = 1;
// urls that the sending peer thinks it can be reached at.
repeated string sender_urls = 2;
// url the session initiator used to connect to the receiver peer.
optional string receiver_url = 3;
}

// A request to a peer to respond with a Pong message.
message Ping {}

// Pong is the response to a Ping and Hello message.
message Pong {
// the current time at of the peer in milliseconds of UTC time since Unix epoch 1970-01-01T00:00:00Z.
uint64 current_time = 3;
}

message MembershipUpdate {
repeated Peer peers = 1;
}

message Peer {
string peer_id = 1;
uint32 latency = 2; // the round trip latency to the peer in milliseconds.
repeated string urls = 3; // url that can be used to connect to the peer.
}

message CounterUpdate {
bytes key = 1;
map<string, uint64> values = 2;
uint64 expires_at = 3;
}

// Replication is the limitador replication service.
service Replication {
rpc Stream(stream Packet) returns (stream Packet) {}
}
Loading

0 comments on commit 1587550

Please sign in to comment.