Skip to content

Commit

Permalink
finish persistence
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Chi <[email protected]>
  • Loading branch information
skyzh committed Jan 19, 2024
1 parent 13ae8fe commit 6b24d6d
Show file tree
Hide file tree
Showing 9 changed files with 218 additions and 49 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@
.vscode/
sync-tmp/
mini-lsm.db/
lsm.db/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ We are working on a new version of the mini-lsm tutorial that is split into 3 we
| 2.4 | Compaction Strategy - Leveled || | |
| 2.5 | Manifest | 🚧 | | |
| 2.6 | Write-Ahead Log | 🚧 | | |
| 2.7 | Batch Write (and preparations for MVCC) | | | |
| 2.7 | Batch Write + Checksum | | | |
| 3.1 | Timestamp Encoding + Prefix Bloom Filter | | | |
| 3.2 | Snapshot Read | | | |
| 3.3 | Watermark and Garbage Collection | | | |
Expand Down
5 changes: 2 additions & 3 deletions mini-lsm/src/bin/mini_lsm_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::path::PathBuf;
use anyhow::Result;
use bytes::Bytes;
use clap::{Parser, ValueEnum};

use mini_lsm::compact::{
CompactionOptions, LeveledCompactionOptions, SimpleLeveledCompactionOptions,
TieredCompactionOptions,
Expand All @@ -21,11 +20,11 @@ enum CompactionStrategy {
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
#[arg(long, default_value = "mini-lsm.db")]
#[arg(long, default_value = "lsm.db")]
path: PathBuf,
#[arg(long, default_value = "leveled")]
compaction: CompactionStrategy,
#[arg(long)]
#[arg(long, default_value = "true")]
enable_wal: bool,
}

Expand Down
8 changes: 5 additions & 3 deletions mini-lsm/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) enum CompactionController {
}

impl CompactionController {
fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option<CompactionTask> {
pub fn generate_compaction_task(&self, snapshot: &LsmStorageState) -> Option<CompactionTask> {
match self {
CompactionController::Leveled(ctrl) => ctrl
.generate_compaction_task(&snapshot)
Expand All @@ -61,7 +61,7 @@ impl CompactionController {
}
}

fn apply_compaction_result(
pub fn apply_compaction_result(
&self,
snapshot: &LsmStorageState,
task: &CompactionTask,
Expand Down Expand Up @@ -247,14 +247,16 @@ impl LsmStorageInner {
assert!(result.is_some());
ssts_to_remove.push(result.unwrap());
}
let mut new_sst_ids = Vec::new();
for file_to_add in sstables {
new_sst_ids.push(file_to_add.sst_id());
let result = snapshot.sstables.insert(file_to_add.sst_id(), file_to_add);
assert!(result.is_none());
}
let mut state = self.state.write();
*state = Arc::new(snapshot);
self.manifest
.add_record(&state_lock, ManifestRecord::Compaction(task))?;
.add_record(&state_lock, ManifestRecord::Compaction(task, new_sst_ids))?;
ssts_to_remove
};
for sst in ssts_to_remove {
Expand Down
128 changes: 99 additions & 29 deletions mini-lsm/src/lsm_storage.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::fs::File;
use std::ops::Bound;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicUsize;
use std::sync::Arc;

use anyhow::Result;
use anyhow::{Context, Result};
use bytes::Bytes;
use parking_lot::{Mutex, RwLock};

Expand All @@ -20,7 +20,7 @@ use crate::iterators::StorageIterator;
use crate::lsm_iterator::{FusedIterator, LsmIterator};
use crate::manifest::{Manifest, ManifestRecord};
use crate::mem_table::{map_bound, MemTable};
use crate::table::{SsTable, SsTableBuilder, SsTableIterator};
use crate::table::{self, FileObject, SsTable, SsTableBuilder, SsTableIterator};

pub type BlockCache = moka::sync::Cache<(usize, usize), Arc<Block>>;

Expand Down Expand Up @@ -103,6 +103,7 @@ impl Drop for MiniLsm {

impl MiniLsm {
pub fn close(&self) -> Result<()> {
self.inner.sync_dir()?;
self.compaction_notifier.send(()).ok();
let mut compaction_thread = self.compaction_thread.lock();
if let Some(compaction_thread) = compaction_thread.take() {
Expand Down Expand Up @@ -157,36 +158,101 @@ impl LsmStorageInner {
}

pub(crate) fn open(path: impl AsRef<Path>, options: LsmStorageOptions) -> Result<Self> {
let mut state = LsmStorageState::create(&options);
let path = path.as_ref();
let mut next_sst_id = 1;
let block_cache = Arc::new(BlockCache::new(1 << 20)); // 4GB block cache,
let manifest;

let compaction_controller = match &options.compaction_options {
CompactionOptions::Leveled(options) => {
CompactionController::Leveled(LeveledCompactionController::new(options.clone()))
}
CompactionOptions::Tiered(options) => {
CompactionController::Tiered(TieredCompactionController::new(options.clone()))
}
CompactionOptions::Simple(options) => CompactionController::Simple(
SimpleLeveledCompactionController::new(options.clone()),
),
CompactionOptions::NoCompaction => CompactionController::NoCompaction,
};

if !path.exists() {
std::fs::create_dir_all(path)?;
}
let mut state = LsmStorageState::create(&options);
if options.enable_wal {
state.memtable = Arc::new(MemTable::create_with_wal(
state.memtable.id(),
Self::path_of_wal_static(path, state.memtable.id()),
)?);
}
std::fs::create_dir_all(path).context("failed to create DB dir")?;
if options.enable_wal {
state.memtable = Arc::new(MemTable::create_with_wal(
state.memtable.id(),
Self::path_of_wal_static(path, state.memtable.id()),
)?);
}
manifest =
Manifest::create(path.join("MANIFEST")).context("failed to create manifest")?;
manifest.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?;
} else {
let (m, records) = Manifest::recover(path.join("MANIFEST"))?;
let mut memtables = BTreeSet::new();
for record in records {
match record {
ManifestRecord::Flush(sst_id) => {
let res = memtables.remove(&sst_id);
assert!(res, "memtable not exist?");
state.l0_sstables.insert(0, sst_id);
}
ManifestRecord::NewMemtable(x) => {
next_sst_id = x + 1;
memtables.insert(x);
}
ManifestRecord::Compaction(task, output) => {
let (new_state, _) =
compaction_controller.apply_compaction_result(&state, &task, &output);
// TODO: apply remove again
state = new_state;
}
}
}
// recover SSTs
for table_id in state
.l0_sstables
.iter()
.chain(state.levels.iter().map(|(_, files)| files).flatten())
{
let table_id = *table_id;
let sst = SsTable::open(
table_id,
Some(block_cache.clone()),
FileObject::open(&Self::path_of_sst_static(path, table_id))
.context("failed to open SST")?,
)?;
state.sstables.insert(table_id, Arc::new(sst));
}
// recover memtables
if options.enable_wal {
for id in memtables.iter() {
let memtable =
MemTable::recover_from_wal(*id, Self::path_of_wal_static(path, *id))?;
state.imm_memtables.insert(0, Arc::new(memtable));
next_sst_id = *id + 1;
}
state.memtable = Arc::new(MemTable::create_with_wal(
next_sst_id,
Self::path_of_wal_static(path, next_sst_id),
)?);
} else {
state.memtable = Arc::new(MemTable::create(next_sst_id));
}
m.add_record_when_init(ManifestRecord::NewMemtable(state.memtable.id()))?;
next_sst_id += 1;
manifest = m;
};

let storage = Self {
state: Arc::new(RwLock::new(Arc::new(state))),
state_lock: Mutex::new(()),
path: path.to_path_buf(),
block_cache: Arc::new(BlockCache::new(1 << 20)), // 4GB block cache,
next_sst_id: AtomicUsize::new(1),
compaction_controller: match &options.compaction_options {
CompactionOptions::Leveled(options) => {
CompactionController::Leveled(LeveledCompactionController::new(options.clone()))
}
CompactionOptions::Tiered(options) => {
CompactionController::Tiered(TieredCompactionController::new(options.clone()))
}
CompactionOptions::Simple(options) => CompactionController::Simple(
SimpleLeveledCompactionController::new(options.clone()),
),
CompactionOptions::NoCompaction => CompactionController::NoCompaction,
},
manifest: Manifest::create(path.join("MANIFEST"))?,
block_cache,
next_sst_id: AtomicUsize::new(next_sst_id),
compaction_controller,
manifest,
options: options.into(),
};
storage.sync_dir()?;
Expand Down Expand Up @@ -259,8 +325,12 @@ impl LsmStorageInner {
Ok(())
}

/// Builds the path of the SST file with the given `id` inside directory `path`.
///
/// The file name is the id zero-padded to at least five digits followed by
/// the `.sst` extension (for example `00007.sst`).
pub(crate) fn path_of_sst_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
    let file_name = format!("{:05}.sst", id);
    let mut sst_path = path.as_ref().to_path_buf();
    sst_path.push(file_name);
    sst_path
}

pub(crate) fn path_of_sst(&self, id: usize) -> PathBuf {
self.path.join(format!("{:05}.sst", id))
Self::path_of_sst_static(&self.path, id)
}

pub(crate) fn path_of_wal_static(path: impl AsRef<Path>, id: usize) -> PathBuf {
Expand Down Expand Up @@ -303,7 +373,7 @@ impl LsmStorageInner {
old_memtable.sync_wal()?;

self.manifest
.add_record(&state_lock, ManifestRecord::NewWal(memtable_id))?;
.add_record(&state_lock, ManifestRecord::NewMemtable(memtable_id))?;

Ok(())
}
Expand Down
52 changes: 45 additions & 7 deletions mini-lsm/src/manifest.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,74 @@
use std::fs::File;
use std::fs::{File, OpenOptions};
use std::io::{Read, Write};
use std::path::Path;
use std::sync::Arc;

use anyhow::Result;
use anyhow::{Context, Result};
use parking_lot::{Mutex, MutexGuard};
use serde::{Deserialize, Serialize};
use serde_json::Deserializer;

use crate::compact::CompactionTask;

pub struct Manifest {}
/// Handle to the manifest file to which `ManifestRecord`s are written.
pub struct Manifest {
/// The open manifest file. It sits behind a mutex, and
/// `add_record_when_init` locks it for each write-and-sync.
file: Arc<Mutex<File>>,
}

#[derive(Serialize, Deserialize)]
pub enum ManifestRecord {
Flush(usize),
NewWal(usize),
Compaction(CompactionTask),
NewMemtable(usize),
Compaction(CompactionTask, Vec<usize>),
}

impl Manifest {
pub fn create(path: impl AsRef<Path>) -> Result<Self> {
Ok(Self {})
Ok(Self {
file: Arc::new(Mutex::new(
OpenOptions::new()
.read(true)
.create_new(true)
.write(true)
.open(path)
.context("failed to create manifest")?,
)),
})
}

pub fn recover(path: impl AsRef<Path>) -> Result<(Self, Vec<ManifestRecord>)> {
Ok((Self {}, Vec::new()))
let mut file = OpenOptions::new()
.read(true)
.append(true)
.open(path)
.context("failed to recover manifest")?;
let mut buf = Vec::new();
file.read_to_end(&mut buf)?;
let mut stream = Deserializer::from_slice(&buf).into_iter::<ManifestRecord>();
let mut records = Vec::new();
while let Some(x) = stream.next() {
records.push(x?);
}
Ok((
Self {
file: Arc::new(Mutex::new(file)),
},
records,
))
}

/// Writes `record` to the manifest by delegating to `add_record_when_init`.
///
/// `_state_lock_observer` is never read in the body.
/// NOTE(review): it presumably exists so callers must prove they hold the
/// storage state lock while recording a change; confirm against call sites.
pub fn add_record(
&self,
_state_lock_observer: &MutexGuard<()>,
record: ManifestRecord,
) -> Result<()> {
// Same write path as initialization; the guard above is only a witness.
self.add_record_when_init(record)
}

/// Serializes `record` to JSON, writes it to the manifest file, and syncs
/// the file to disk before returning.
///
/// This variant does not require a state-lock guard (compare `add_record`).
///
/// # Errors
///
/// Returns an error if JSON serialization fails, if the record cannot be
/// written in full, or if `sync_all` fails.
pub fn add_record_when_init(&self, record: ManifestRecord) -> Result<()> {
    // Serialize first so the file mutex is held only for the actual I/O.
    let buf = serde_json::to_vec(&record)?;
    let mut file = self.file.lock();
    // `Write::write` may accept only a prefix of `buf` and report the count,
    // which was previously ignored; a short write would leave a truncated
    // record in the manifest. `write_all` retries until every byte is written.
    file.write_all(&buf)?;
    file.sync_all()?;
    Ok(())
}
}
8 changes: 7 additions & 1 deletion mini-lsm/src/mem_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,14 @@ impl MemTable {
})
}

/// Create a memtable from WAL
pub fn recover_from_wal(id: usize, path: impl AsRef<Path>) -> Result<Self> {
unimplemented!()
let map = Arc::new(SkipMap::new());
Ok(Self {
id,
wal: Some(Wal::recover(path.as_ref(), &map)?),
map,
})
}

/// Get a value by key.
Expand Down
6 changes: 4 additions & 2 deletions mini-lsm/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,10 @@ impl FileObject {
))
}

pub fn open(_path: &Path) -> Result<Self> {
unimplemented!()
/// Opens the existing file at `path` read-only and wraps it in a `FileObject`
/// together with its length in bytes, taken from the file metadata.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or its metadata cannot be read.
pub fn open(path: &Path) -> Result<Self> {
    // `File::open` is the read-only shorthand for `options().read(true)`.
    let handle = File::open(path)?;
    let len = handle.metadata()?.len();
    Ok(FileObject(Some(handle), len))
}
}

Expand Down
Loading

0 comments on commit 6b24d6d

Please sign in to comment.