Skip to content

Commit

Permalink
Support providing an optional id to limits/counters
Browse files Browse the repository at this point in the history
* add a key_for_counters_v2 function that uses the id as the key if set, otherwise uses the previous key encoding strategy.
* updated the distributed store to use key_for_counters_v2.  Since we can’t decode a partial counter from id based keys, we now also keep in memory the Counter in a counter field of the limits map.

Signed-off-by: Hiram Chirino <[email protected]>
  • Loading branch information
chirino committed Jun 12, 2024
1 parent 72c2b12 commit 312369b
Show file tree
Hide file tree
Showing 13 changed files with 343 additions and 166 deletions.
3 changes: 3 additions & 0 deletions limitador-server/src/http_api/request_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct CheckAndReportInfo {

#[derive(Debug, Eq, PartialEq, Serialize, Deserialize, Apiv2Schema)]
pub struct Limit {
id: Option<String>,
namespace: String,
max_value: u64,
seconds: u64,
Expand All @@ -29,6 +30,7 @@ pub struct Limit {
impl From<&LimitadorLimit> for Limit {
fn from(ll: &LimitadorLimit) -> Self {
Self {
id: ll.id().clone(),
namespace: ll.namespace().as_ref().to_string(),
max_value: ll.max_value(),
seconds: ll.seconds(),
Expand All @@ -42,6 +44,7 @@ impl From<&LimitadorLimit> for Limit {
impl From<Limit> for LimitadorLimit {
fn from(limit: Limit) -> Self {
let mut limitador_limit = Self::new(
limit.id,
limit.namespace,
limit.max_value,
limit.seconds,
Expand Down
2 changes: 1 addition & 1 deletion limitador/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ lenient_conditions = []
moka = { version = "0.12", features = ["sync"] }
dashmap = "5.5.3"
getrandom = { version = "0.2", features = ["js"] }
serde = { version = "1", features = ["derive"] }
serde = { version = "1", features = ["derive", "rc"] }
postcard = { version = "1.0.4", features = ["use-std"] }
serde_json = "1"
rmp-serde = "1.1.0"
Expand Down
1 change: 1 addition & 0 deletions limitador/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,7 @@ fn generate_test_limits(scenario: &TestScenario) -> (Vec<Limit>, Vec<TestCallPar

for limit_idx in 0..scenario.n_limits_per_ns {
test_limits.push(Limit::new(
None,
namespace.clone(),
u64::MAX,
((limit_idx * 60) + 10) as u64,
Expand Down
5 changes: 5 additions & 0 deletions limitador/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub struct Counter {
set_variables: HashMap<String, String>,

remaining: Option<u64>,

expires_in: Option<Duration>,
}

Expand Down Expand Up @@ -72,6 +73,10 @@ impl Counter {
Duration::from_secs(self.limit.seconds())
}

pub fn id(&self) -> &Option<String> {
self.limit.id()
}

pub fn namespace(&self) -> &Namespace {
self.limit.namespace()
}
Expand Down
92 changes: 80 additions & 12 deletions limitador/src/limit.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use crate::limit::conditions::{ErrorType, Literal, SyntaxError, Token, TokenType};
use serde::{Deserialize, Serialize, Serializer};
use std::cmp::Ordering;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::error::Error;
use std::fmt::{Debug, Display, Formatter};
use std::hash::{Hash, Hasher};

use serde::{Deserialize, Serialize, Serializer};

#[cfg(feature = "lenient_conditions")]
pub use deprecated::check_deprecated_syntax_usages_and_reset;

use crate::limit::conditions::{ErrorType, Literal, SyntaxError, Token, TokenType};

#[cfg(feature = "lenient_conditions")]
mod deprecated {
use std::sync::atomic::{AtomicBool, Ordering};
Expand All @@ -25,9 +30,6 @@ mod deprecated {
}
}

#[cfg(feature = "lenient_conditions")]
pub use deprecated::check_deprecated_syntax_usages_and_reset;

#[derive(Debug, Hash, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub struct Namespace(String);

Expand All @@ -51,6 +53,8 @@ impl From<String> for Namespace {

#[derive(Eq, Debug, Clone, Serialize, Deserialize)]
pub struct Limit {
#[serde(skip_serializing, default)]
id: Option<String>,
namespace: Namespace,
#[serde(skip_serializing, default)]
max_value: u64,
Expand Down Expand Up @@ -307,6 +311,7 @@ where

impl Limit {
pub fn new<N: Into<Namespace>, T: TryInto<Condition>>(
id: Option<String>,
namespace: N,
max_value: u64,
seconds: u64,
Expand All @@ -319,6 +324,7 @@ impl Limit {
{
// the above where-clause is needed in order to call unwrap().
Self {
id: id,

Check failure on line 327 in limitador/src/limit.rs

View workflow job for this annotation

GitHub Actions / Clippy

redundant field names in struct initialization
namespace: namespace.into(),
max_value,
seconds,
Expand All @@ -335,6 +341,10 @@ impl Limit {
&self.namespace
}

pub fn id(&self) -> &Option<String> {
&self.id
}

pub fn max_value(&self) -> u64 {
self.max_value
}
Expand Down Expand Up @@ -821,7 +831,14 @@ mod tests {

#[test]
fn limit_can_have_an_optional_name() {
let mut limit = Limit::new("test_namespace", 10, 60, vec!["x == \"5\""], vec!["y"]);
let mut limit = Limit::new(
None,
"test_namespace",
10,
60,
vec!["x == \"5\""],
vec!["y"],
);
assert!(limit.name.is_none());

let name = "Test Limit";
Expand All @@ -831,7 +848,14 @@ mod tests {

#[test]
fn limit_applies() {
let limit = Limit::new("test_namespace", 10, 60, vec!["x == \"5\""], vec!["y"]);
let limit = Limit::new(
None,
"test_namespace",
10,
60,
vec!["x == \"5\""],
vec!["y"],
);

let mut values: HashMap<String, String> = HashMap::new();
values.insert("x".into(), "5".into());
Expand All @@ -842,7 +866,14 @@ mod tests {

#[test]
fn limit_does_not_apply_when_cond_is_false() {
let limit = Limit::new("test_namespace", 10, 60, vec!["x == \"5\""], vec!["y"]);
let limit = Limit::new(
None,
"test_namespace",
10,
60,
vec!["x == \"5\""],
vec!["y"],
);

let mut values: HashMap<String, String> = HashMap::new();
values.insert("x".into(), "1".into());
Expand All @@ -854,7 +885,7 @@ mod tests {
#[test]
#[cfg(feature = "lenient_conditions")]
fn limit_does_not_apply_when_cond_is_false_deprecated_style() {
let limit = Limit::new("test_namespace", 10, 60, vec!["x == 5"], vec!["y"]);
let limit = Limit::new(None, "test_namespace", 10, 60, vec!["x == 5"], vec!["y"]);

let mut values: HashMap<String, String> = HashMap::new();
values.insert("x".into(), "1".into());
Expand All @@ -864,7 +895,14 @@ mod tests {
assert!(check_deprecated_syntax_usages_and_reset());
assert!(!check_deprecated_syntax_usages_and_reset());

let limit = Limit::new("test_namespace", 10, 60, vec!["x == foobar"], vec!["y"]);
let limit = Limit::new(
None,
"test_namespace",
10,
60,
vec!["x == foobar"],
vec!["y"],
);

let mut values: HashMap<String, String> = HashMap::new();
values.insert("x".into(), "foobar".into());
Expand All @@ -877,7 +915,14 @@ mod tests {

#[test]
fn limit_does_not_apply_when_cond_var_is_not_set() {
let limit = Limit::new("test_namespace", 10, 60, vec!["x == \"5\""], vec!["y"]);
let limit = Limit::new(
None,
"test_namespace",
10,
60,
vec!["x == \"5\""],
vec!["y"],
);

// Notice that "x" is not set
let mut values: HashMap<String, String> = HashMap::new();
Expand All @@ -889,7 +934,14 @@ mod tests {

#[test]
fn limit_does_not_apply_when_var_not_set() {
let limit = Limit::new("test_namespace", 10, 60, vec!["x == \"5\""], vec!["y"]);
let limit = Limit::new(
None,
"test_namespace",
10,
60,
vec!["x == \"5\""],
vec!["y"],
);

// Notice that "y" is not set
let mut values: HashMap<String, String> = HashMap::new();
Expand All @@ -901,6 +953,7 @@ mod tests {
#[test]
fn limit_applies_when_all_its_conditions_apply() {
let limit = Limit::new(
None,
"test_namespace",
10,
60,
Expand All @@ -919,6 +972,7 @@ mod tests {
#[test]
fn limit_does_not_apply_if_one_cond_doesnt() {
let limit = Limit::new(
None,
"test_namespace",
10,
60,
Expand Down Expand Up @@ -998,4 +1052,18 @@ mod tests {
let result = serde_json::to_string(&condition).expect("Should serialize");
assert_eq!(result, r#""foobar == \"ok\"""#.to_string());
}

#[test]
fn limit_id() {
let limit = Limit::new(
Some("test_id".to_string()),
"test_namespace",
10,
60,
vec!["req.method == 'GET'"],
vec!["app_id"],
);

assert_eq!(limit.id().clone(), Some("test_id".to_string()))
}
}
9 changes: 8 additions & 1 deletion limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,14 @@ mod tests {
#[test]
fn opens_db_on_disk() {
let namespace = "test_namespace";
let limit = Limit::new(namespace, 1, 2, vec!["req.method == 'GET'"], vec!["app_id"]);
let limit = Limit::new(
None,
namespace,
1,
2,
vec!["req.method == 'GET'"],
vec!["app_id"],
);
let counter = Counter::new(limit, HashMap::default());

let tmp = TempDir::new().expect("We should have a dir!");
Expand Down
27 changes: 17 additions & 10 deletions limitador/src/storage/distributed/grpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::{error::Error, io::ErrorKind, pin::Pin};

use crate::counter::Counter;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::mpsc::{Permit, Sender};
use tokio::sync::{broadcast, mpsc, Notify, RwLock};
Expand Down Expand Up @@ -156,9 +157,10 @@ impl Session {
update = udpates_to_send.recv() => {
let update = update.map_err(|_| Status::unknown("broadcast error"))?;
// Multiple updates collapse into a single update for the same key
if !tx_updates_by_key.contains_key(&update.key) {
tx_updates_by_key.insert(update.key.clone(), update.value);
tx_updates_order.push(update.key);
let key = &update.key.clone();
if !tx_updates_by_key.contains_key(key) {
tx_updates_by_key.insert(key.clone(), update);
tx_updates_order.push(key.clone());
notifier.notify_one();
}
}
Expand All @@ -174,7 +176,7 @@ impl Session {

let key = tx_updates_order.remove(0);
let cr_counter_value = tx_updates_by_key.remove(&key).unwrap().clone();
let (expiry, values) = (*cr_counter_value).clone().into_inner();
let (expiry, values) = (*cr_counter_value).value.clone().into_inner();

Check failure on line 179 in limitador/src/storage/distributed/grpc/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

deref which would be done by auto-deref

// only send the update if it has not expired.
if expiry > SystemTime::now() {
Expand Down Expand Up @@ -437,19 +439,24 @@ type CounterUpdateFn = Pin<Box<dyn Fn(CounterUpdate) + Sync + Send>>;
#[derive(Clone, Debug)]
pub struct CounterEntry {
pub key: Vec<u8>,
pub value: Arc<CrCounterValue<String>>,
pub counter: Counter,
pub value: CrCounterValue<String>,
}

impl CounterEntry {
pub fn new(key: Vec<u8>, value: Arc<CrCounterValue<String>>) -> Self {
Self { key, value }
pub fn new(key: Vec<u8>, counter: Counter, value: CrCounterValue<String>) -> Self {
Self {
key,
counter,
value,
}
}
}

#[derive(Clone)]
struct BrokerState {
id: String,
publisher: broadcast::Sender<CounterEntry>,
publisher: broadcast::Sender<Arc<CounterEntry>>,
on_counter_update: Arc<CounterUpdateFn>,
on_re_sync: Arc<Sender<Sender<Option<CounterUpdate>>>>,
}
Expand All @@ -471,7 +478,7 @@ impl Broker {
on_re_sync: Sender<Sender<Option<CounterUpdate>>>,
) -> Broker {
let (tx, _) = broadcast::channel(16);
let publisher: broadcast::Sender<CounterEntry> = tx;
let publisher: broadcast::Sender<Arc<CounterEntry>> = tx;

Broker {
listen_address,
Expand All @@ -489,7 +496,7 @@ impl Broker {
}
}

pub fn publish(&self, counter_update: CounterEntry) {
pub fn publish(&self, counter_update: Arc<CounterEntry>) {
// ignore the send error, it just means there are no active subscribers
_ = self.broker_state.publisher.send(counter_update);
}
Expand Down
Loading

0 comments on commit 312369b

Please sign in to comment.