Skip to content

Commit

Permalink
merge: #3382
Browse files Browse the repository at this point in the history
3382: feat(si-layer-cache): adds an experimental layer cache r=adamhjk a=adamhjk

This adds an experimental layer cache to the new engine, which will allow us to start rebuilding the underlying write/read path for the new engine.

Co-authored-by: Adam Jacob <[email protected]>
  • Loading branch information
si-bors-ng[bot] and adamhjk authored Mar 5, 2024
2 parents 08b8bc3 + 36ca0df commit 57b3990
Show file tree
Hide file tree
Showing 20 changed files with 2,271 additions and 33 deletions.
450 changes: 431 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ members = [
"lib/si-data-nats",
"lib/si-data-pg",
"lib/si-hash",
"lib/si-layer-cache",
"lib/si-pkg",
"lib/si-posthog-rs",
"lib/si-settings",
Expand Down Expand Up @@ -81,6 +82,7 @@ comfy-table = { version = "7.0.1", features = [
config = { version = "0.13.4", default-features = false, features = ["toml"] }
console = "0.15.7"
convert_case = "0.6.0"
criterion = { version = "0.3", features = [ "async_tokio" ] }
crossbeam-channel = "0.5.8"
deadpool = { version = "0.10.0", features = ["rt_tokio_1"] }
deadpool-postgres = "0.12.1"
Expand Down Expand Up @@ -114,6 +116,7 @@ jwt-simple = { version = "0.12.6", default-features = false, features = [
"pure-rust",
] }
lazy_static = "1.4.0"
moka = { version = "0.12.5", features = [ "future" ] }
names = { version = "0.14.0", default-features = false }
nix = { version = "0.27.1", features = ["process", "signal"] }
nkeys = "0.4.0"
Expand Down Expand Up @@ -164,6 +167,7 @@ serde_json = { version = "1.0.96", features = ["preserve_order"] }
serde_url_params = "0.2.1"
serde_with = "3.0.0"
serde_yaml = "0.9.21"
sled = "0.34.7"
sodiumoxide = "0.2.7"
stream-cancel = "0.8.1"
strum = { version = "0.25.0", features = ["derive"] }
Expand Down
32 changes: 32 additions & 0 deletions lib/si-layer-cache/BUCK
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Buck targets for the si-layer-cache crate.
# Fix: `rust_test` was invoked below but never imported by the load(),
# which fails Starlark evaluation with an undefined-symbol error.
load("@prelude-si//:macros.bzl", "rust_library", "rust_test")

rust_library(
    name = "si-layer-cache",
    deps = [
        "//third-party/rust:serde",
        "//third-party/rust:thiserror",
        "//third-party/rust:moka",
        "//third-party/rust:sled",
        "//third-party/rust:lazy_static",
    ],
    srcs = glob([
        "src/**/*.rs",
    ]),
)

rust_test(
    name = "test-integration",
    deps = [
        ":si-layer-cache",
        "//third-party/rust:tokio",
        "//third-party/rust:tempfile",
        "//third-party/rust:criterion",
    ],
    srcs = glob([
        "tests/**/*.rs",
    ]),
    crate_root = "tests/integration.rs",
    env = {
        "CARGO_PKG_NAME": "integration",
    },
)
22 changes: 22 additions & 0 deletions lib/si-layer-cache/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
[package]
name = "si-layer-cache"
version = "0.1.0"
edition = "2021"
rust-version = "1.64"
publish = false

[dependencies]
serde = { workspace = true }
thiserror = { workspace = true }
moka = { workspace = true }
sled = { workspace = true }
lazy_static = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
tempfile = { workspace = true }
criterion = { workspace = true }

[[bench]]
name = "insert_speed"
harness = false
88 changes: 88 additions & 0 deletions lib/si-layer-cache/benches/insert_speed.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use si_layer_cache::{CacheType, LayerCache};

use tokio::runtime;

const ASCII_LOWER: [u8; 26] = [
b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l', b'm', b'n', b'o', b'p',
b'q', b'r', b's', b't', b'u', b'v', b'w', b'x', b'y', b'z',
];

const ONE_MB: usize = 1_000_000;

/// Builds a brand-new `LayerCache` in a temporary directory and inserts
/// `count` objects into it, one per lowercase ASCII letter key, to measure
/// cold-cache insert cost.
///
/// `count` must be <= `objects.len()` and <= 26 (keys are single letters);
/// otherwise the indexing below panics.
pub async fn fresh_cache_count(objects: &[Vec<u8>], count: usize) {
    // Use the system temp directory so the bench runs on any machine; the
    // previous hard-coded `/home/adam/benches` path (and the "cannotc reate"
    // typo) only worked on one workstation.
    let tempdir = tempfile::TempDir::new().expect("cannot create tempdir");
    let layer_cache = LayerCache::new(tempdir).expect("cannot create layer cache");
    for i in 0..count {
        layer_cache
            .insert(&CacheType::Object, [ASCII_LOWER[i]], objects[i].clone())
            .await
            .expect("cannot insert into cache");
    }
}

/// Criterion benchmark: cold-cache insert throughput for 1 MB objects,
/// measured for a single insert and for a full alphabet of inserts.
pub fn insert_speed_1_mb_object(c: &mut Criterion) {
    let rt = runtime::Builder::new_multi_thread()
        .build()
        .expect("cannot make tokio runtime");
    // One 1 MB buffer per letter of the alphabet, each filled with that letter.
    let objects: Vec<Vec<u8>> = ASCII_LOWER
        .iter()
        .map(|letter| vec![*letter; ONE_MB])
        .collect();

    c.bench_function("Cold Cache insert speed 1 1mb object", |b| {
        b.to_async(&rt)
            .iter(|| fresh_cache_count(black_box(&objects[..]), 1))
    });

    c.bench_function("Cold Cache insert speed 26 1mb objects", |b| {
        b.to_async(&rt)
            .iter(|| fresh_cache_count(black_box(&objects[..]), ASCII_LOWER.len()))
    });
}

pub fn hot_read_1_mb_object(c: &mut Criterion) {
let layer_cache = LayerCache::new("/home/adam/benches/.hot_read_1_mb_object")
.expect("cannot create layer cache");
let rt = runtime::Builder::new_multi_thread()
.build()
.expect("cannot make tokio runtime");
let object = vec![b'a';ONE_MB];
let _r = rt.block_on(layer_cache.insert(&CacheType::Object, b"a", object));

c.bench_function("Hot Cache speed get one 1mb object", |b| {
b.to_async(&rt)
.iter(|| layer_cache.get(&CacheType::Object, [b'a']))
});
}

/// Reads `key` through the layer cache, then evicts it from the in-memory
/// layer so the next read must be served from disk again.
pub async fn do_cold_memory_hot_disk(key: &[u8], layer_cache: &LayerCache) {
    // The fetched value is discarded; this helper only exercises the read path.
    let _ = layer_cache.get(&CacheType::Object, key).await;
    layer_cache.memory_cache.object_cache.remove(key).await;
}

pub fn hot_disk_cold_memory_read_1_mb_object(c: &mut Criterion) {
let layer_cache = LayerCache::new("/home/adam/benches/.disk_cache_no_memory_1_mb_object")
.expect("cannot create layer cache");
let rt = runtime::Builder::new_multi_thread()
.build()
.expect("cannot make tokio runtime");
let letter = b'a';
let object = vec![letter;ONE_MB];
let _r = rt.block_on(layer_cache.insert(&CacheType::Object, b"a", object));
let key = [letter];

c.bench_function("Hot Disk cold Memory cache speed get one 1mb object", |b| {
b.to_async(&rt)
.iter(|| do_cold_memory_hot_disk(black_box(&key), black_box(&layer_cache)))
});
}

// Register every benchmark with criterion and generate the harness entry
// point (this file is built with `harness = false` in Cargo.toml).
criterion_group!(
    benches,
    insert_speed_1_mb_object,
    hot_read_1_mb_object,
    hot_disk_cold_memory_read_1_mb_object
);
criterion_main!(benches);
65 changes: 65 additions & 0 deletions lib/si-layer-cache/src/disk_cache.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use std::path::Path;

use sled::{self, IVec};

use crate::{error::LayerCacheResult, CacheType};

/// The on-disk layer of the cache: a single `sled` database holding one
/// tree per [`CacheType`].
#[derive(Debug)]
pub struct DiskCache {
    pub db: sled::Db,
    pub object_tree: sled::Tree,
    pub graph_tree: sled::Tree,
}

impl DiskCache {
    /// Opens (or creates) the sled database at `path` together with the
    /// per-cache-type trees.
    pub fn new(path: impl AsRef<Path>) -> LayerCacheResult<DiskCache> {
        let db = sled::open(path)?;
        Ok(DiskCache {
            object_tree: db.open_tree([CacheType::Object as u8])?,
            graph_tree: db.open_tree([CacheType::Graph as u8])?,
            db,
        })
    }

    /// Maps a cache type to the sled tree that stores its entries.
    fn get_tree(&self, cache_type: &CacheType) -> &sled::Tree {
        match *cache_type {
            CacheType::Graph => &self.graph_tree,
            CacheType::Object => &self.object_tree,
        }
    }

    /// Looks up `key` in the tree for `cache_type`, returning the raw bytes
    /// if present.
    pub fn get(
        &self,
        cache_type: &CacheType,
        key: impl AsRef<[u8]>,
    ) -> LayerCacheResult<Option<IVec>> {
        Ok(self.get_tree(cache_type).get(key)?)
    }

    /// Reports whether `key` exists in the tree for `cache_type`.
    pub fn contains_key(
        &self,
        cache_type: &CacheType,
        key: impl AsRef<[u8]>,
    ) -> LayerCacheResult<bool> {
        Ok(self.get_tree(cache_type).contains_key(key.as_ref())?)
    }

    /// Stores `value` under `key` in the tree for `cache_type`.
    pub fn insert(
        &self,
        cache_type: &CacheType,
        key: impl AsRef<[u8]>,
        value: impl Into<Vec<u8>>,
    ) -> LayerCacheResult<()> {
        // sled returns the previous value on insert; this cache discards it.
        self.get_tree(cache_type).insert(key.as_ref(), value.into())?;
        Ok(())
    }
}
9 changes: 9 additions & 0 deletions lib/si-layer-cache/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
use thiserror::Error;

/// Errors that can occur while reading from or writing to the layer cache.
#[derive(Error, Debug)]
pub enum LayerCacheError {
    /// A failure surfaced by the on-disk `sled` database layer.
    #[error(transparent)]
    SledError(#[from] sled::Error),
}

/// Convenience result alias used throughout the layer cache.
pub type LayerCacheResult<T> = Result<T, LayerCacheError>;
134 changes: 134 additions & 0 deletions lib/si-layer-cache/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
//! A fast in-memory, network aware, layered write-through cache for System Initiative.
//!
//! It should have 3 layers of caching:
//!
//! * Moka, an in-memory LRU style cache.
//! * Sled, an on-disk memory-mapped cache, to keep more data locally than can be held in memory
//! * Postgres, our final persistent storage layer.
//!
//! When a write is requested, the following happens:
//!
//! * The data is written first to a Moka cache
//! * Then written to Sled for persistent storage
//! * The data is then published to a nats topic layer-cache.workspaceId
//! * Any remote si-layer-cache instances listen to this topic, and populate their local caches
//! * Postgres gets written to eventually by a 'persister' process that writes to PG from the write
//! stream
//!
//! When a read is requested, the following happens:
//!
//! * The data is read from the moka cache
//! * On a miss, the data is read from sled, inserted into Moka, and returned to the user
//! * On a miss, the data is read from Postgres, inserted into sled, inserted into moka, and
//! returned to the user
//!
//! The postgres bits remain unimplemented! :)
pub mod disk_cache;
pub mod error;
pub mod memory_cache;

use std::fmt;
use std::path::Path;

use memory_cache::{CacheKey, CacheValueRaw};

use crate::disk_cache::DiskCache;
use crate::error::LayerCacheResult;
use crate::memory_cache::{CacheKeyRef, CacheValue, MemoryCache};

/// The kind of data a cache entry holds; each variant gets its own sled
/// tree on disk and its own moka cache in memory. Discriminants start at 1
/// and are used as single-byte tree names.
#[derive(Eq, PartialEq, PartialOrd, Ord, Hash, Debug)]
pub enum CacheType {
    Object = 1,
    Graph,
}

impl fmt::Display for CacheType {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Lowercase labels, matching the variant names.
        let label = match self {
            CacheType::Object => "object",
            CacheType::Graph => "graph",
        };
        f.write_str(label)
    }
}

/// A layered write-through cache: an in-memory [`MemoryCache`] in front of
/// an on-disk [`DiskCache`]. Reads fall through memory to disk; writes
/// populate whichever layers are missing the value.
pub struct LayerCache {
    /// Fast in-memory (moka) layer, consulted first on reads.
    pub memory_cache: MemoryCache,
    /// Persistent on-disk (sled) layer, consulted on memory misses.
    pub disk_cache: DiskCache,
}

impl LayerCache {
    /// Creates a layer cache whose disk layer lives at `path`.
    pub fn new(path: impl AsRef<Path>) -> LayerCacheResult<LayerCache> {
        Ok(LayerCache {
            memory_cache: MemoryCache::new(),
            disk_cache: DiskCache::new(path)?,
        })
    }

    /// Fetches `key`, checking the memory layer first and falling back to
    /// disk. A disk hit is promoted into the memory layer before returning.
    #[inline]
    pub async fn get(
        &self,
        cache_type: &CacheType,
        key: impl AsRef<CacheKeyRef>,
    ) -> LayerCacheResult<Option<CacheValue>> {
        let key = key.as_ref();
        // Fast path: value already resident in memory.
        if let Some(found) = self.memory_cache.get(cache_type, key).await {
            return Ok(Some(found));
        }
        // Slow path: check disk; on a hit, copy the bytes into the memory
        // layer and hand back the freshly cached value.
        match self.disk_cache.get(cache_type, key)? {
            Some(value) => {
                let bytes: Vec<u8> = value.as_ref().into();
                self.memory_cache
                    .insert(cache_type, Vec::from(key), bytes)
                    .await;
                Ok(self.memory_cache.get(cache_type, key).await)
            }
            None => Ok(None),
        }
    }

    /// Writes `key`/`value` through to whichever layers do not already hold
    /// the key. Values are treated as immutable, so a layer that already has
    /// the key is never rewritten.
    #[inline]
    pub async fn insert(
        &self,
        cache_type: &CacheType,
        key: impl Into<CacheKey>,
        value: impl Into<CacheValueRaw>,
    ) -> LayerCacheResult<()> {
        let key = key.into();
        let in_memory = self.memory_cache.contains_key(cache_type, &key);
        let on_disk = self.disk_cache.contains_key(cache_type, &key)?;

        if !in_memory && !on_disk {
            // Missing everywhere: write through both layers.
            let value = value.into();
            self.memory_cache
                .insert(cache_type, key.clone(), value.clone())
                .await;
            self.disk_cache.insert(cache_type, key, value)?;
        } else if !in_memory {
            // On disk but evicted from memory: repopulate the fast layer.
            // Safe because objects are immutable.
            self.memory_cache
                .insert(cache_type, key, value.into())
                .await;
        } else if !on_disk {
            // In memory but never persisted: write it to disk.
            self.disk_cache.insert(cache_type, key, value.into())?;
        }
        // Present in both layers: nothing to do.
        Ok(())
    }
}
Loading

0 comments on commit 57b3990

Please sign in to comment.