Merge pull request #63 from drdo/sqlite-union-limit
Add support for more than 500 snapshots in a repository
drdo authored Aug 5, 2024
2 parents 761e8e3 + 33e91d6 commit 1292f41
Showing 4 changed files with 411 additions and 305 deletions.
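Why this change: redu stores each snapshot's entries in its own entries_<hash> table and previously collated them with a single compound SELECT, one UNION ALL term per snapshot. SQLite rejects compound SELECTs with more than SQLITE_MAX_COMPOUND_SELECT terms (500 by default), so repositories with more than 500 snapshots could not be handled. This commit drops the compound query in favor of one query per table, merging the results in Rust. Below is a minimal sketch of the limit being worked around; it is not part of the diff and assumes a stock SQLite build with the default limit:

    use rusqlite::Connection;

    fn main() -> rusqlite::Result<()> {
        let conn = Connection::open_in_memory()?;
        // One SELECT per "snapshot", chained into a single compound statement.
        let query = (0..501)
            .map(|i| format!("SELECT {i}"))
            .collect::<Vec<_>>()
            .join(" UNION ALL ");
        // A stock build rejects this at prepare time with
        // "too many terms in compound SELECT".
        match conn.prepare(&query) {
            Ok(_) => println!("prepared: this build raises the limit"),
            Err(e) => println!("rejected: {e}"),
        }
        Ok(())
    }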
81 changes: 66 additions & 15 deletions benches/cache.rs
@@ -1,7 +1,10 @@
use std::cell::Cell;

use criterion::{black_box, criterion_group, criterion_main, Criterion};
use redu::{cache::tests::*, restic::Snapshot};
use redu::{
cache::{tests::*, Migrator},
restic::Snapshot,
};

pub fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("merge sizetree", |b| {
@@ -12,10 +15,47 @@ pub fn criterion_benchmark(c: &mut Criterion) {
b.iter(move || sizetree0.take().merge(black_box(sizetree1.take())));
});

c.bench_function("create and save snapshot", |b| {
with_cache_open(|mut cache| {
let foo = Snapshot {
id: "foo".to_string(),
c.bench_function("save snapshot", |b| {
let foo = Snapshot {
id: "foo".to_string(),
time: mk_datetime(2024, 4, 12, 12, 00, 00),
parent: Some("bar".to_string()),
tree: "sometree".to_string(),
paths: vec![
"/home/user".to_string(),
"/etc".to_string(),
"/var".to_string(),
],
hostname: Some("foo.com".to_string()),
username: Some("user".to_string()),
uid: Some(123),
gid: Some(456),
excludes: vec![
".cache".to_string(),
"Cache".to_string(),
"/home/user/Downloads".to_string(),
],
tags: vec!["foo_machine".to_string(), "rewrite".to_string()],
original_id: Some("fefwfwew".to_string()),
program_version: Some("restic 0.16.0".to_string()),
};
b.iter_with_setup(
|| {
let tempfile = Tempfile::new();
let cache =
Migrator::open(&tempfile.0).unwrap().migrate().unwrap();
(tempfile, cache, generate_sizetree(6, 12))
},
|(_tempfile, mut cache, tree)| {
cache.save_snapshot(&foo, tree).unwrap()
},
);
});

c.bench_function("save lots of small snapshots", |b| {
fn mk_snapshot(id: String) -> Snapshot {
Snapshot {
id,
time: mk_datetime(2024, 4, 12, 12, 00, 00),
parent: Some("bar".to_string()),
tree: "sometree".to_string(),
@@ -36,16 +76,27 @@ pub fn criterion_benchmark(c: &mut Criterion) {
tags: vec!["foo_machine".to_string(), "rewrite".to_string()],
original_id: Some("fefwfwew".to_string()),
program_version: Some("restic 0.16.0".to_string()),
};
b.iter(move || {
cache
.save_snapshot(
&foo,
generate_sizetree(black_box(6), black_box(12)),
)
.unwrap();
});
})
}
}

b.iter_with_setup(
|| {
let tempfile = Tempfile::new();
let cache =
Migrator::open(&tempfile.0).unwrap().migrate().unwrap();
(tempfile, cache, generate_sizetree(1, 0))
},
|(_tempfile, mut cache, tree)| {
for i in 0..10_000 {
cache
.save_snapshot(
&mk_snapshot(i.to_string()),
tree.clone(),
)
.unwrap();
}
},
);
});
}

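The benchmark is split into "save snapshot" (one large snapshot) and "save lots of small snapshots" (10,000 tiny ones, exercising the many-snapshots path). Both use Criterion's iter_with_setup so that creating the temporary database via Migrator::open(..).migrate() and building the size tree happen outside the timed region; the Tempfile is passed through to the timed closure so the backing file outlives the measured save_snapshot call. A toy illustration of the pattern, with names that are illustrative rather than from the repo:

    use criterion::{criterion_group, criterion_main, Criterion};

    fn bench_sort(c: &mut Criterion) {
        c.bench_function("sort", |b| {
            b.iter_with_setup(
                // Setup runs before every iteration and is not measured.
                || (0..1_000u32).rev().collect::<Vec<u32>>(),
                // Only this closure is timed; it consumes the setup value.
                |mut v| v.sort(),
            );
        });
    }

    criterion_group!(benches, bench_sort);
    criterion_main!(benches);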
211 changes: 110 additions & 101 deletions src/cache/mod.rs
@@ -1,11 +1,15 @@
use std::{collections::HashSet, path::Path};
use std::{
cmp::{max, Reverse},
collections::{HashMap, HashSet},
path::Path,
};

use camino::{Utf8Path, Utf8PathBuf};
use chrono::{DateTime, Utc};
use log::trace;
use rusqlite::{
functions::FunctionFlags, params, types::FromSqlError, Connection,
OptionalExtension, Row,
OptionalExtension,
};
use thiserror::Error;

@@ -117,119 +121,124 @@ impl Cache {
Ok(path_id)
}

fn entries_tables(
&self,
) -> Result<impl Iterator<Item = String>, rusqlite::Error> {
Ok(get_tables(&self.conn)?
.into_iter()
.filter(|name| name.starts_with("entries_")))
}

/// Returns the child files/directories of the given path.
/// Each entry's size is the largest size of that file/directory across
/// all snapshots.
pub fn get_entries(
&self,
path_id: Option<PathId>,
) -> Result<Vec<Entry>, rusqlite::Error> {
let aux = |row: &Row| {
Ok(Entry {
path_id: PathId(row.get("path_id")?),
component: row.get("component")?,
size: row.get("size")?,
is_dir: row.get("is_dir")?,
})
};
let raw_path_id = o_path_id_to_raw_u64(path_id);
let cte_stmt_string = get_tables(&self.conn)?
.into_iter()
.filter(|name| name.starts_with("entries_"))
.map(|table| {
format!(
"SELECT \
path_id, \
component, \
size, \
is_dir \
FROM \"{table}\" JOIN paths ON path_id = paths.id \
WHERE parent_id = {raw_path_id}\n"
)
})
.intersperse(String::from(" UNION ALL "))
.collect::<String>();
if cte_stmt_string.is_empty() {
return Ok(vec![]);
let mut entries: Vec<Entry> = Vec::new();
let mut index: HashMap<PathId, usize> = HashMap::new();
for table in self.entries_tables()? {
let stmt_str = format!(
"SELECT \
path_id, \
component, \
size, \
is_dir \
FROM \"{table}\" JOIN paths ON path_id = paths.id \
WHERE parent_id = {raw_path_id}\n",
);
let mut stmt = self.conn.prepare(&stmt_str)?;
let rows = stmt.query_map([], |row| {
Ok(Entry {
path_id: PathId(row.get("path_id")?),
component: row.get("component")?,
size: row.get("size")?,
is_dir: row.get("is_dir")?,
})
})?;
for row in rows {
let row = row?;
let path_id = row.path_id;
match index.get(&path_id) {
None => {
entries.push(row);
index.insert(path_id, entries.len() - 1);
}
Some(i) => {
let entry = &mut entries[*i];
entry.size = max(entry.size, row.size);
entry.is_dir = entry.is_dir || row.is_dir;
}
}
}
}
let mut stmt = self.conn.prepare(&format!(
"WITH rich_entries AS ({cte_stmt_string}) \
SELECT \
path_id, \
component, \
max(size) as size, \
max(is_dir) as is_dir \
FROM rich_entries \
GROUP BY path_id \
ORDER BY size DESC",
))?;
let rows = stmt.query_map([], aux)?;
rows.collect()
entries.sort_by_key(|e| Reverse(e.size));
Ok(entries)
}

pub fn get_entry_details(
&self,
path_id: PathId,
) -> Result<EntryDetails, Error> {
let aux = |row: &Row| -> Result<EntryDetails, Error> {
Ok(EntryDetails {
max_size: row.get("max_size")?,
max_size_snapshot_hash: row.get("max_size_snapshot_hash")?,
first_seen: timestamp_to_datetime(row.get("first_seen")?)?,
first_seen_snapshot_hash: row
.get("first_seen_snapshot_hash")?,
last_seen: timestamp_to_datetime(row.get("last_seen")?)?,
last_seen_snapshot_hash: row.get("last_seen_snapshot_hash")?,
})
};
) -> Result<Option<EntryDetails>, Error> {
let raw_path_id = path_id.0;
let rich_entries_cte = get_tables(&self.conn)?
.iter()
.filter_map(|name| name.strip_prefix("entries_"))
.map(|snapshot_hash| {
format!(
let run_query =
|table: &str| -> Result<(String, usize, DateTime<Utc>), Error> {
let snapshot_hash = table.strip_prefix("entries_").unwrap();
let stmt_str = format!(
"SELECT \
hash, \
size, \
time \
FROM \"entries_{snapshot_hash}\" \
JOIN paths ON path_id = paths.id \
JOIN snapshots ON hash = '{snapshot_hash}' \
WHERE path_id = {raw_path_id}\n"
)
})
.intersperse(String::from(" UNION ALL "))
.collect::<String>();
let query = format!(
"WITH \
rich_entries AS ({rich_entries_cte}), \
first_seen AS (
SELECT hash, time
FROM rich_entries
ORDER BY time ASC
LIMIT 1), \
last_seen AS (
SELECT hash, time
FROM rich_entries
ORDER BY time DESC
LIMIT 1), \
max_size AS (
SELECT hash, size
FROM rich_entries
ORDER BY size DESC, time DESC
LIMIT 1) \
SELECT \
max_size.size AS max_size, \
max_size.hash AS max_size_snapshot_hash, \
first_seen.time AS first_seen, \
first_seen.hash as first_seen_snapshot_hash, \
last_seen.time AS last_seen, \
last_seen.hash as last_seen_snapshot_hash \
FROM max_size
JOIN first_seen ON 1=1
JOIN last_seen ON 1=1"
);
self.conn.query_row_and_then(&query, [], aux)
hash, \
size, \
time \
FROM \"{table}\" \
JOIN paths ON path_id = paths.id \
JOIN snapshots ON hash = '{snapshot_hash}' \
WHERE path_id = {raw_path_id}\n"
);
let mut stmt = self.conn.prepare(&stmt_str)?;
let (hash, size, timestamp) = stmt.query_row([], |row| {
Ok((row.get("hash")?, row.get("size")?, row.get("time")?))
})?;
let time = timestamp_to_datetime(timestamp)?;
Ok((hash, size, time))
};

let mut entries_tables = self.entries_tables()?;
let mut details = match entries_tables.next() {
None => return Ok(None),
Some(table) => {
let (hash, size, time) = run_query(&table)?;
EntryDetails {
max_size: size,
max_size_snapshot_hash: hash.clone(),
first_seen: time,
first_seen_snapshot_hash: hash.clone(),
last_seen: time,
last_seen_snapshot_hash: hash,
}
}
};
let mut max_size_time = details.first_seen; // Time of the max_size snapshot
for table in entries_tables {
let (hash, size, time) = run_query(&table)?;
if size > details.max_size
|| (size == details.max_size && time > max_size_time)
{
details.max_size = size;
details.max_size_snapshot_hash = hash.clone();
max_size_time = time;
}
if time < details.first_seen {
details.first_seen = time;
details.first_seen_snapshot_hash = hash.clone();
}
if time > details.last_seen {
details.last_seen = time;
details.last_seen_snapshot_hash = hash;
}
}
Ok(Some(details))
}

pub fn save_snapshot(
@@ -383,7 +392,7 @@ impl Cache {

// A PathId should never be 0.
// This is reserved for the absolute root and should match None
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
#[repr(transparent)]
pub struct PathId(u64);

@@ -407,7 +416,7 @@ pub struct Entry {
pub is_dir: bool,
}

#[derive(Clone, Debug)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct EntryDetails {
pub max_size: usize,
pub max_size_snapshot_hash: String,
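The heart of the fix is in get_entries: instead of splicing every entries_* table into one UNION ALL CTE and letting SQLite compute GROUP BY path_id with max(size) and max(is_dir), it now issues one SELECT per table and merges rows in memory, which is why PathId gains a Hash derive (it keys the merge map). The merge logic, reduced to a standalone sketch with simplified types:

    use std::collections::HashMap;

    #[derive(Clone, Copy, PartialEq, Eq, Hash)]
    struct PathId(u64);

    struct Entry {
        path_id: PathId,
        size: usize,
        is_dir: bool,
    }

    // One Entry per PathId: the maximum size seen across snapshots,
    // marked as a directory if any snapshot saw it as one.
    fn merge(rows: impl Iterator<Item = Entry>) -> Vec<Entry> {
        let mut entries: Vec<Entry> = Vec::new();
        let mut index: HashMap<PathId, usize> = HashMap::new();
        for row in rows {
            match index.get(&row.path_id) {
                None => {
                    index.insert(row.path_id, entries.len());
                    entries.push(row);
                }
                Some(&i) => {
                    entries[i].size = entries[i].size.max(row.size);
                    entries[i].is_dir |= row.is_dir;
                }
            }
        }
        // Largest first, matching the old ORDER BY size DESC.
        entries.sort_by_key(|e| std::cmp::Reverse(e.size));
        entries
    }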
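get_entry_details follows the same pattern: one (hash, size, time) query per entries_* table, folded into an EntryDetails, with the signature widened to Result<Option<EntryDetails>, Error> so a repository with no snapshot tables yields Ok(None) instead of a SQL error. On equal sizes the later snapshot wins, mirroring the old ORDER BY size DESC, time DESC LIMIT 1. That tie-break, as a standalone sketch with simplified types:

    use chrono::{DateTime, Utc};

    // One per-snapshot observation of a path (simplified).
    struct Obs {
        hash: String,
        size: usize,
        time: DateTime<Utc>,
    }

    // Larger size wins; equal sizes fall back to the later time, matching
    // the old SQL's ORDER BY size DESC, time DESC LIMIT 1.
    fn max_size_obs(observations: Vec<Obs>) -> Option<Obs> {
        observations.into_iter().max_by_key(|o| (o.size, o.time))
    }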
