feat(si-layer-cache): adds an experimental layer cache
This adds an experimental layer cache to the new engine, which will
allow us to start rebuilding the underlying write/read path for the new
engine.
adamhjk committed Mar 5, 2024
1 parent 013f1f6 commit 36ca0df
Showing 20 changed files with 2,271 additions and 33 deletions.
450 changes: 431 additions & 19 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Cargo.toml
@@ -41,6 +41,7 @@ members = [
"lib/si-data-nats",
"lib/si-data-pg",
"lib/si-hash",
"lib/si-layer-cache",
"lib/si-pkg",
"lib/si-posthog-rs",
"lib/si-settings",
@@ -81,6 +82,7 @@ comfy-table = { version = "7.0.1", features = [
config = { version = "0.13.4", default-features = false, features = ["toml"] }
console = "0.15.7"
convert_case = "0.6.0"
criterion = { version = "0.3", features = [ "async_tokio" ] }
crossbeam-channel = "0.5.8"
deadpool = { version = "0.10.0", features = ["rt_tokio_1"] }
deadpool-postgres = "0.12.1"
@@ -114,6 +116,7 @@ jwt-simple = { version = "0.12.6", default-features = false, features = [
"pure-rust",
] }
lazy_static = "1.4.0"
moka = { version = "0.12.5", features = [ "future" ] }
names = { version = "0.14.0", default-features = false }
nix = { version = "0.27.1", features = ["process", "signal"] }
nkeys = "0.4.0"
@@ -164,6 +167,7 @@ serde_json = { version = "1.0.96", features = ["preserve_order"] }
serde_url_params = "0.2.1"
serde_with = "3.0.0"
serde_yaml = "0.9.21"
sled = "0.34.7"
sodiumoxide = "0.2.7"
stream-cancel = "0.8.1"
strum = { version = "0.25.0", features = ["derive"] }
32 changes: 32 additions & 0 deletions lib/si-layer-cache/BUCK
@@ -0,0 +1,32 @@
load("@prelude-si//:macros.bzl", "rust_library")

rust_library(
name = "si-layer-cache",
deps = [
"//third-party/rust:serde",
"//third-party/rust:thiserror",
"//third-party/rust:moka",
"//third-party/rust:sled",
"//third-party/rust:lazy_static",
],
srcs = glob([
"src/**/*.rs",
]),
)

rust_test(
name = "test-integration",
deps = [
":si-layer-cache",
"//third-party/rust:tokio",
"//third-party/rust:tempfile",
"//third-party/rust:criterion",
],
srcs = glob([
"tests/**/*.rs",
]),
crate_root = "tests/integration.rs",
env = {
"CARGO_PKG_NAME": "integration",
},
)
22 changes: 22 additions & 0 deletions lib/si-layer-cache/Cargo.toml
@@ -0,0 +1,22 @@
[package]
name = "si-layer-cache"
version = "0.1.0"
edition = "2021"
rust-version = "1.64"
publish = false

[dependencies]
serde = { workspace = true }
thiserror = { workspace = true }
moka = { workspace = true }
sled = { workspace = true }
lazy_static = { workspace = true }

[dev-dependencies]
tokio = { workspace = true }
tempfile = { workspace = true }
criterion = { workspace = true }

[[bench]]
name = "insert_speed"
harness = false
88 changes: 88 additions & 0 deletions lib/si-layer-cache/benches/insert_speed.rs
@@ -0,0 +1,88 @@
use criterion::{black_box, criterion_group, criterion_main, Criterion};
use si_layer_cache::{CacheType, LayerCache};

use tokio::runtime;

const ASCII_LOWER: [u8; 26] = [
b'a', b'b', b'c', b'd', b'e', b'f', b'g', b'h', b'i', b'j', b'k', b'l', b'm', b'n', b'o', b'p',
b'q', b'r', b's', b't', b'u', b'v', b'w', b'x', b'y', b'z',
];

const ONE_MB: usize = 1_000_000;

pub async fn fresh_cache_count(objects: &[Vec<u8>], count: usize) {
let tempdir = tempfile::TempDir::new_in("/home/adam/benches").expect("cannot create tempdir");
let layer_cache = LayerCache::new(tempdir).expect("cannot create layer cache");
for i in 0..count {
layer_cache
.insert(&CacheType::Object, [ASCII_LOWER[i]], objects[i].clone())
.await
.expect("cannot insert into cache");
}
}

pub fn insert_speed_1_mb_object(c: &mut Criterion) {
let rt = runtime::Builder::new_multi_thread()
.build()
.expect("cannot make tokio runtime");
let mut objects: Vec<Vec<u8>> = Vec::with_capacity(ASCII_LOWER.len());
for letter in ASCII_LOWER.iter() {
let object = vec![*letter; ONE_MB];
objects.push(object);
}

c.bench_function("Cold Cache insert speed 1 1mb object", |b| {
b.to_async(&rt)
.iter(|| fresh_cache_count(black_box(&objects[..]), 1))
});

c.bench_function("Cold Cache insert speed 26 1mb objects", |b| {
b.to_async(&rt)
.iter(|| fresh_cache_count(black_box(&objects[..]), ASCII_LOWER.len()))
});
}

pub fn hot_read_1_mb_object(c: &mut Criterion) {
let layer_cache = LayerCache::new("/home/adam/benches/.hot_read_1_mb_object")
.expect("cannot create layer cache");
let rt = runtime::Builder::new_multi_thread()
.build()
.expect("cannot make tokio runtime");
let object = vec![b'a'; ONE_MB];
let _r = rt.block_on(layer_cache.insert(&CacheType::Object, b"a", object));

c.bench_function("Hot Cache speed get one 1mb object", |b| {
b.to_async(&rt)
.iter(|| layer_cache.get(&CacheType::Object, [b'a']))
});
}

pub async fn do_cold_memory_hot_disk(key: &[u8], layer_cache: &LayerCache) {
let _r = layer_cache.get(&CacheType::Object, key).await;
layer_cache.memory_cache.object_cache.remove(key).await;
}

pub fn hot_disk_cold_memory_read_1_mb_object(c: &mut Criterion) {
let layer_cache = LayerCache::new("/home/adam/benches/.disk_cache_no_memory_1_mb_object")
.expect("cannot create layer cache");
let rt = runtime::Builder::new_multi_thread()
.build()
.expect("cannot make tokio runtime");
let letter = b'a';
let object = vec![letter; ONE_MB];
let _r = rt.block_on(layer_cache.insert(&CacheType::Object, b"a", object));
let key = [letter];

c.bench_function("Hot Disk cold Memory cache speed get one 1mb object", |b| {
b.to_async(&rt)
.iter(|| do_cold_memory_hot_disk(black_box(&key), black_box(&layer_cache)))
});
}

criterion_group!(
benches,
insert_speed_1_mb_object,
hot_read_1_mb_object,
hot_disk_cold_memory_read_1_mb_object
);
criterion_main!(benches);
65 changes: 65 additions & 0 deletions lib/si-layer-cache/src/disk_cache.rs
@@ -0,0 +1,65 @@
use std::path::Path;

use sled::{self, IVec};

use crate::{error::LayerCacheResult, CacheType};

#[derive(Debug)]
pub struct DiskCache {
pub db: sled::Db,
pub object_tree: sled::Tree,
pub graph_tree: sled::Tree,
}

impl DiskCache {
pub fn new(path: impl AsRef<Path>) -> LayerCacheResult<DiskCache> {
let db = sled::open(path)?;
let object_tree = db.open_tree([CacheType::Object as u8])?;
let graph_tree = db.open_tree([CacheType::Graph as u8])?;
Ok(DiskCache {
db,
object_tree,
graph_tree,
})
}

fn get_tree(&self, cache_type: &CacheType) -> &sled::Tree {
match cache_type {
CacheType::Graph => &self.graph_tree,
CacheType::Object => &self.object_tree,
}
}

pub fn get(
&self,
cache_type: &CacheType,
key: impl AsRef<[u8]>,
) -> LayerCacheResult<Option<IVec>> {
let tree = self.get_tree(cache_type);
let result = tree.get(key)?;
Ok(result)
}

pub fn contains_key(
&self,
cache_type: &CacheType,
key: impl AsRef<[u8]>,
) -> LayerCacheResult<bool> {
let tree = self.get_tree(cache_type);
let key = key.as_ref();
let result = tree.contains_key(key)?;
Ok(result)
}

pub fn insert(
&self,
cache_type: &CacheType,
key: impl AsRef<[u8]>,
value: impl Into<Vec<u8>>,
) -> LayerCacheResult<()> {
let tree = self.get_tree(cache_type);
let key = key.as_ref();
let _result = tree.insert(key, value.into())?;
Ok(())
}
}
9 changes: 9 additions & 0 deletions lib/si-layer-cache/src/error.rs
@@ -0,0 +1,9 @@
use thiserror::Error;

#[derive(Error, Debug)]
pub enum LayerCacheError {
#[error(transparent)]
SledError(#[from] sled::Error),
}

pub type LayerCacheResult<T> = Result<T, LayerCacheError>;
134 changes: 134 additions & 0 deletions lib/si-layer-cache/src/lib.rs
@@ -0,0 +1,134 @@
//! A fast in-memory, network-aware, layered write-through cache for System Initiative.
//!
//! It should have three layers of caching:
//!
//! * Moka, an in-memory LRU-style cache.
//! * Sled, an on-disk memory-mapped cache, to keep more data locally than can be held in memory.
//! * Postgres, our final persistent storage layer.
//!
//! When a write is requested, the following happens:
//!
//! * The data is written first to a Moka cache
//! * Then written to Sled for persistent storage
//! * The data is then published to a NATS topic, layer-cache.workspaceId
//! * Any remote si-layer-cache instances listen to this topic, and populate their local caches
//! * Postgres gets written to eventually by a 'persister' process that writes to PG from the write
//! stream
//!
//! When a read is requested, the following happens:
//!
//! * The data is read from the Moka cache
//! * On a miss, the data is read from Sled, inserted into Moka, and returned to the user
//! * On a miss, the data is read from Postgres, inserted into Sled, inserted into Moka, and
//! returned to the user
//!
//! The Postgres bits remain unimplemented! :)
pub mod disk_cache;
pub mod error;
pub mod memory_cache;

use std::fmt;
use std::path::Path;

use memory_cache::{CacheKey, CacheValueRaw};

use crate::disk_cache::DiskCache;
use crate::error::LayerCacheResult;
use crate::memory_cache::{CacheKeyRef, CacheValue, MemoryCache};

#[derive(Eq, PartialEq, PartialOrd, Ord, Hash, Debug)]
pub enum CacheType {
Object = 1,
Graph,
}

impl fmt::Display for CacheType {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
CacheType::Object => write!(f, "object"),
CacheType::Graph => write!(f, "graph"),
}
}
}

pub struct LayerCache {
pub memory_cache: MemoryCache,
pub disk_cache: DiskCache,
}

impl LayerCache {
pub fn new(path: impl AsRef<Path>) -> LayerCacheResult<LayerCache> {
let memory_cache = MemoryCache::new();
let disk_cache = DiskCache::new(path)?;
Ok(LayerCache {
memory_cache,
disk_cache,
})
}

#[inline]
pub async fn get(
&self,
cache_type: &CacheType,
key: impl AsRef<CacheKeyRef>,
) -> LayerCacheResult<Option<CacheValue>> {
let key = key.as_ref();
let memory_value = self.memory_cache.get(cache_type, key).await;
if memory_value.is_some() {
Ok(memory_value)
} else {
let maybe_value = self.disk_cache.get(cache_type, key)?;
match maybe_value {
Some(value) => {
let d: Vec<u8> = value.as_ref().into();
self.memory_cache
.insert(cache_type, Vec::from(key), d)
.await;
Ok(self.memory_cache.get(cache_type, key).await)
}
None => Ok(None),
}
}
}

#[inline]
pub async fn insert(
&self,
cache_type: &CacheType,
key: impl Into<CacheKey>,
value: impl Into<CacheValueRaw>,
) -> LayerCacheResult<()> {
let key = key.into();
let in_memory = self.memory_cache.contains_key(cache_type, &key);
let on_disk = self.disk_cache.contains_key(cache_type, &key)?;

match (in_memory, on_disk) {
// In memory and on disk
(true, true) => Ok(()),
// Neither in memory nor on disk
(false, false) => {
let value = value.into();
self.memory_cache
.insert(cache_type, key.clone(), value.clone())
.await;
self.disk_cache.insert(cache_type, key, value)?;
Ok(())
}
// Not in memory, but on disk - we can write, because objects are immutable
(false, true) => {
let value = value.into();
self.memory_cache
.insert(cache_type, key.clone(), value)
.await;
Ok(())
}
// In memory, but not on disk
(true, false) => {
let value = value.into();
self.disk_cache.insert(cache_type, key, value)?;
Ok(())
}
}
}
}
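
For orientation, here is a minimal usage sketch of the API added above. It is not part of the commit: it mirrors the calls in benches/insert_speed.rs (an explicit tokio runtime, a tempfile-backed directory, byte-string keys, and Vec<u8> values), and the key and value bytes are illustrative only.

use si_layer_cache::{CacheType, LayerCache};

fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .build()
        .expect("cannot make tokio runtime");

    // Back the sled layer with a throwaway directory.
    let tempdir = tempfile::TempDir::new().expect("cannot create tempdir");
    let layer_cache = LayerCache::new(tempdir.path()).expect("cannot create layer cache");

    rt.block_on(async {
        // Write-through: the value is stored in both the Moka and sled layers.
        layer_cache
            .insert(&CacheType::Object, b"some-key", b"some-value".to_vec())
            .await
            .expect("cannot insert into cache");

        // Read path: served from Moka first, falling back to sled on a miss.
        let value = layer_cache
            .get(&CacheType::Object, b"some-key")
            .await
            .expect("cannot read from cache");
        assert!(value.is_some());
    });
}

Because the key is new, the insert exercises the (false, false) branch of LayerCache::insert, writing through to both layers before the subsequent get is served from memory.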