Skip to content

Commit

Permalink
feat: lock free feature resolution
Browse files Browse the repository at this point in the history
Redesign the way data flows through Edge. Previously, we had thread locks on our data sources, which was impacting the response time of the application. This moves everything to be in memory cached with lazy persistence in the background and reloading the state on application startup. This means the hot path is now lock free.

---------

Co-authored-by: sighphyre <[email protected]>
  • Loading branch information
Christopher Kolstad and sighphyre authored Mar 7, 2023
1 parent dd2a9ec commit a263dca
Show file tree
Hide file tree
Showing 33 changed files with 1,755 additions and 1,585 deletions.
266 changes: 147 additions & 119 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions benchmarks/clientfeaturesendpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,5 @@ export const options = {
};

export default function () {
http.get('http://edge:3063/api/client/features', { 'headers': { 'Authorization': `${__ENV.TOKEN}` } });
sleep(0.2);
http.get('http://127.0.0.1:3063/api/client/features', { 'headers': { 'Authorization': `${__ENV.TOKEN}` } });
}
23 changes: 23 additions & 0 deletions benchmarks/proxyHtmlReport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { htmlReport } from "https://raw.githubusercontent.com/benc-uk/k6-reporter/main/dist/bundle.js";
import http from 'k6/http';

import { sleep } from 'k6';

export const options = {
duration: '10s',
vus: 50,
thresholds: {
http_req_failed: ['rate<0.01'],
http_req_duration: ['p(95)<10'] // (95th percentile should be < 10 ms)
}
};

export function handleSummary(data) {
return {
"summary.html": htmlReport(data),
};
}

export default function () {
http.get('http://127.0.0.1:3063/api/proxy', { 'headers': { 'Authorization': `${__ENV.TOKEN}` } });
}
3 changes: 1 addition & 2 deletions benchmarks/proxyendpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,5 @@ export const options = {
};

export default function () {
http.get('http://edge:3063/api/proxy', { 'headers': { 'Authorization': `${__ENV.TOKEN}` } });
sleep(0.2);
http.get('http://127.0.0.1:3063/api/proxy', { 'headers': { 'Authorization': `${__ENV.TOKEN}` } });
}
10 changes: 5 additions & 5 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ rustls-pemfile = "1.0.2"
serde = {version = "1.0.152", features = ["derive"]}
serde_json = "1.0.93"
shadow-rs = "0.21.0"
tokio = {version = "1.25.0", features = ["macros", "rt-multi-thread", "tracing"]}
tokio = {version = "1.25.0", features = ["macros", "rt-multi-thread", "tracing", "fs"]}
tracing = {version = "0.1.37", features = ["log"]}
tracing-subscriber = {version = "0.3.16", features = ["json", "env-filter"]}
ulid = "1.0.0"
unleash-types = {version = "0.8.3", features = ["openapi", "hashes"]}
unleash-yggdrasil = "0.4.5"
utoipa = { version = "3", features = ["actix_extras", "chrono"]}
utoipa-swagger-ui = { version = "3", features = ["actix-web"] }
unleash-types = {version = "0.9.0", features = ["openapi", "hashes"]}
unleash-yggdrasil = "0.5.3"
utoipa = {version = "3", features = ["actix_extras", "chrono"]}
utoipa-swagger-ui = {version = "3", features = ["actix-web"]}
[dev-dependencies]
actix-http = "3.3.0"
actix-http-test = "3.1.0"
Expand Down
91 changes: 38 additions & 53 deletions server/src/auth/token_validator.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::error::EdgeError;
use crate::http::unleash_client::UnleashClient;
use crate::types::{EdgeResult, EdgeSink, EdgeSource, EdgeToken, ValidateTokensRequest};
use crate::persistence::EdgePersistence;
use crate::types::{EdgeResult, EdgeToken, ValidateTokensRequest};
use std::sync::Arc;
use unleash_types::Merge;

use dashmap::DashMap;
use unleash_types::Upsert;
#[derive(Clone)]
pub struct TokenValidator {
pub unleash_client: Arc<UnleashClient>,
pub edge_source: Arc<dyn EdgeSource>,
pub edge_sink: Arc<dyn EdgeSink>,
pub token_cache: Arc<DashMap<String, EdgeToken>>,
pub persistence: Option<Arc<dyn EdgePersistence>>,
}

impl TokenValidator {
Expand All @@ -23,10 +26,14 @@ impl TokenValidator {
if tokens_with_valid_format.is_empty() {
Err(EdgeError::TokenParseError)
} else {
let mut tokens = vec![];
let mut tokens: Vec<EdgeToken> = vec![];
for token in tokens_with_valid_format {
let known_data = self.edge_source.get_token(token.token.clone()).await?;
tokens.push(known_data.unwrap_or(token));
let owned_token = self
.token_cache
.get(&token.token.clone())
.map(|t| t.value().clone())
.unwrap_or_else(|| token.clone());
tokens.push(owned_token);
}
Ok(tokens.into_iter().partition(|t| t.token_type.is_none()))
}
Expand Down Expand Up @@ -75,23 +82,27 @@ impl TokenValidator {
}
})
.collect();
self.edge_sink.sink_tokens(tokens_to_sink.clone()).await?;
Ok(tokens_to_sink.merge(known_tokens))
tokens_to_sink.iter().for_each(|t| {
self.token_cache.insert(t.token.clone(), t.clone());
});
let updated_tokens = tokens_to_sink.upsert(known_tokens);
if let Some(persist) = self.persistence.clone() {
let _ = persist.save_tokens(updated_tokens.clone()).await;
}
Ok(updated_tokens)
}
}
}

#[cfg(test)]
mod tests {
use crate::data_sources::memory_provider::MemoryProvider;
use crate::data_sources::repository::{DataSource, DataSourceFacade};
use crate::types::{EdgeSink, EdgeSource, EdgeToken, TokenType, TokenValidationStatus};
use super::TokenValidator;
use crate::types::{EdgeToken, TokenType, TokenValidationStatus};
use actix_http::HttpService;
use actix_http_test::{test_server, TestServer};
use actix_service::map_config;

use actix_web::{dev::AppConfig, web, App, HttpResponse};
use chrono::Duration;
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

Expand Down Expand Up @@ -131,28 +142,16 @@ mod tests {

#[tokio::test]
pub async fn can_validate_tokens() {
let test_provider = Arc::new(MemoryProvider::default());
let facade = Arc::new(DataSourceFacade {
features_refresh_interval: Some(Duration::seconds(1)),
token_source: test_provider.clone(),
feature_source: test_provider.clone(),
feature_sink: test_provider.clone(),
token_sink: test_provider.clone(),
});

let sink: Arc<dyn EdgeSink> = facade.clone();
let source: Arc<dyn EdgeSource> = facade.clone();

let srv = test_validation_server().await;
let unleash_client =
crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None)
.expect("Couldn't build client");

let validation_holder = super::TokenValidator {
let validation_holder = TokenValidator {
unleash_client: Arc::new(unleash_client),
edge_source: source,
edge_sink: sink,
token_cache: Arc::new(DashMap::default()),
persistence: None,
};

let tokens_to_validate = vec![
"*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f".into(),
"*:production.abcdef1234567890".into(),
Expand All @@ -161,41 +160,27 @@ mod tests {
.register_tokens(tokens_to_validate)
.await
.expect("Couldn't register tokens");
let known_tokens = test_provider
.get_tokens()
.await
.expect("Couldn't get tokens");
assert_eq!(known_tokens.len(), 2);
assert!(known_tokens.iter().any(|t| t.token
assert_eq!(validation_holder.token_cache.len(), 2);
assert!(validation_holder.token_cache.iter().any(|t| t.value().token
== "*:development.1d38eefdd7bf72676122b008dcf330f2f2aa2f3031438e1b7e8f0d1f"
&& t.status == TokenValidationStatus::Validated));
assert!(known_tokens
assert!(validation_holder
.token_cache
.iter()
.any(|t| t.token == "*:production.abcdef1234567890"
&& t.status == TokenValidationStatus::Invalid));
.any(|t| t.value().token == "*:production.abcdef1234567890"
&& t.value().status == TokenValidationStatus::Invalid));
}

#[tokio::test]
pub async fn tokens_with_wrong_format_is_not_included() {
let test_provider = Arc::new(MemoryProvider::default());
let facade = Arc::new(DataSourceFacade {
features_refresh_interval: Some(Duration::seconds(1)),
feature_source: test_provider.clone(),
token_source: test_provider.clone(),
feature_sink: test_provider.clone(),
token_sink: test_provider.clone(),
});
let sink: Arc<dyn EdgeSink> = facade.clone();
let source: Arc<dyn EdgeSource> = facade.clone();

let srv = test_validation_server().await;
let unleash_client =
crate::http::unleash_client::UnleashClient::new(srv.url("/").as_str(), None)
.expect("Couldn't build client");
let validation_holder = super::TokenValidator {
let validation_holder = TokenValidator {
unleash_client: Arc::new(unleash_client),
edge_source: source,
edge_sink: sink,
token_cache: Arc::new(DashMap::default()),
persistence: None,
};
let invalid_tokens = vec!["jamesbond".into(), "invalidtoken".into()];
let validated_tokens = validation_holder.register_tokens(invalid_tokens).await;
Expand Down
Loading

0 comments on commit a263dca

Please sign in to comment.