bottomless: add xz compression option
Transplanted from libsql/sqld#780
psarna committed Oct 18, 2023
1 parent 812e4db commit 0a44f6a
Showing 4 changed files with 94 additions and 32 deletions.
bottomless/Cargo.toml (2 changes: 1 addition & 1 deletion)
@@ -10,7 +10,7 @@ description = "Bottomless replication for libSQL"

[dependencies]
anyhow = "1.0.66"
async-compression = { version = "0.3.15", features = ["tokio", "gzip"] }
async-compression = { version = "0.3.15", features = ["tokio", "gzip", "xz"] }
aws-config = { version = "0.55" }
aws-sdk-s3 = { version = "0.28" }
bytes = "1"
bottomless/src/backup.rs (5 changes: 5 additions & 0 deletions)
@@ -116,6 +116,11 @@ impl WalCopier
wal.copy_frames(&mut gzip, len).await?;
gzip.shutdown().await?;
}
CompressionKind::Xz => {
let mut xz = async_compression::tokio::write::XzEncoder::new(&mut out);
wal.copy_frames(&mut xz, len).await?;
xz.shutdown().await?;
}
}
if tracing::enabled!(tracing::Level::DEBUG) {
let elapsed = Instant::now() - period_start;
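
Note (illustrative, not part of the commit): the new Xz branch in WalCopier follows the same write-side pattern as the existing gzip branch: wrap the output in an encoder, copy the frames through it, and call shutdown() to finalize the stream. A minimal standalone sketch of that pattern, assuming the async-compression "tokio" and "xz" features enabled by the Cargo.toml change above:

use async_compression::tokio::write::XzEncoder;
use tokio::io::AsyncWriteExt;

// Sketch only: compress a payload into an in-memory buffer with the same
// write-side XzEncoder pattern used by WalCopier above.
async fn xz_compress(payload: &[u8]) -> std::io::Result<Vec<u8>> {
    let mut out = Vec::new();
    // Vec<u8> implements tokio's AsyncWrite, so it can stand in for a file.
    let mut xz = XzEncoder::new(&mut out);
    xz.write_all(payload).await?;
    // shutdown() flushes the encoder and writes the xz stream footer;
    // skipping it leaves a truncated archive, which is why the WalCopier
    // branch calls it right after copy_frames().
    xz.shutdown().await?;
    // Drop the encoder to release its borrow of `out` before returning it.
    drop(xz);
    Ok(out)
}
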
bottomless/src/read.rs (6 changes: 5 additions & 1 deletion)
@@ -1,7 +1,7 @@
use crate::replicator::CompressionKind;
use crate::wal::WalFrameHeader;
use anyhow::Result;
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::bufread::{GzipDecoder, XzDecoder};
use aws_sdk_s3::primitives::ByteStream;
use std::io::ErrorKind;
use std::pin::Pin;
@@ -32,6 +32,10 @@ impl BatchReader
let gzip = GzipDecoder::new(reader);
Box::pin(gzip)
}
CompressionKind::Xz => {
let xz = XzDecoder::new(reader);
Box::pin(xz)
}
},
}
}
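
Note (illustrative, not part of the commit): on the read path, BatchReader wraps the buffered S3 body in the bufread XzDecoder, mirroring the GzipDecoder branch. A minimal sketch of that decode pattern under the same feature assumptions:

use async_compression::tokio::bufread::XzDecoder;
use tokio::io::{AsyncReadExt, BufReader};

// Sketch only: decode an xz-compressed buffer the same way BatchReader
// decodes a frame batch fetched from S3.
async fn xz_decompress(compressed: &[u8]) -> std::io::Result<Vec<u8>> {
    // The bufread decoders take an AsyncBufRead, hence the BufReader wrapper,
    // just as the ByteStream body is wrapped in the code above.
    let mut decoder = XzDecoder::new(BufReader::new(compressed));
    let mut plain = Vec::new();
    decoder.read_to_end(&mut plain).await?;
    Ok(plain)
}
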
bottomless/src/replicator.rs (113 changes: 83 additions & 30 deletions)
Expand Up @@ -5,7 +5,7 @@ use crate::uuid_utils::decode_unix_timestamp;
use crate::wal::WalFileReader;
use anyhow::{anyhow, bail};
use arc_swap::ArcSwapOption;
use async_compression::tokio::write::GzipEncoder;
use async_compression::tokio::write::{GzipEncoder, XzEncoder};
use aws_sdk_s3::config::{Credentials, Region};
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::operation::get_object::builders::GetObjectFluentBuilder;
@@ -171,7 +171,7 @@ impl Options {
let secret_access_key = env_var("LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY").ok();
let region = env_var("LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION").ok();
let max_frames_per_batch =
env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 500).parse::<usize>()?;
env_var_or("LIBSQL_BOTTOMLESS_BATCH_MAX_FRAMES", 10000).parse::<usize>()?;
let s3_upload_max_parallelism =
env_var_or("LIBSQL_BOTTOMLESS_S3_PARALLEL_MAX", 32).parse::<usize>()?;
let restore_transaction_page_swap_after =
@@ -653,7 +653,7 @@ impl Replicator {
CompressionKind::None => Ok(ByteStream::from_path(db_path).await?),
CompressionKind::Gzip => {
let mut reader = File::open(db_path).await?;
let gzip_path = Self::db_gzip_path(db_path);
let gzip_path = Self::db_compressed_path(db_path, "gz");
let compressed_file = OpenOptions::new()
.create(true)
.write(true)
@@ -671,13 +671,33 @@
);
Ok(ByteStream::from_path(gzip_path).await?)
}
CompressionKind::Xz => {
let mut reader = File::open(db_path).await?;
let xz_path = Self::db_compressed_path(db_path, "xz");
let compressed_file = OpenOptions::new()
.create(true)
.write(true)
.read(true)
.truncate(true)
.open(&xz_path)
.await?;
let mut writer = XzEncoder::new(compressed_file);
let size = tokio::io::copy(&mut reader, &mut writer).await?;
writer.shutdown().await?;
tracing::debug!(
"Compressed database file ({} bytes) into `{}`",
size,
xz_path.display()
);
Ok(ByteStream::from_path(xz_path).await?)
}
}
}

fn db_gzip_path(db_path: &Path) -> PathBuf {
let mut gzip_path = db_path.to_path_buf();
gzip_path.pop();
gzip_path.join("db.gz")
fn db_compressed_path(db_path: &Path, suffix: &'static str) -> PathBuf {
let mut compressed_path: PathBuf = db_path.to_path_buf();
compressed_path.pop();
compressed_path.join(format!("db.{suffix}"))
}

fn restore_db_path(&self) -> PathBuf {
@@ -816,9 +836,10 @@ impl Replicator {
let _ = snapshot_notifier.send(Ok(Some(generation)));
let elapsed = Instant::now() - start;
tracing::debug!("Snapshot upload finished (took {:?})", elapsed);
// cleanup gzip database snapshot if exists
let gzip_path = Self::db_gzip_path(&db_path);
let _ = tokio::fs::remove_file(gzip_path).await;
// clean up gzip/xz database snapshots if they exist
for suffix in &["gz", "xz"] {
let _ = tokio::fs::remove_file(Self::db_compressed_path(&db_path, suffix)).await;
}
});
let elapsed = Instant::now() - start_ts;
tracing::debug!("Scheduled DB snapshot {} (took {:?})", generation, elapsed);
@@ -1160,31 +1181,58 @@ impl Replicator {
}

async fn restore_from_snapshot(&mut self, generation: &Uuid, db: &mut File) -> Result<bool> {
let main_db_path = match self.use_compression {
CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
let algos_to_try = match self.use_compression {
CompressionKind::None => &[
CompressionKind::None,
CompressionKind::Xz,
CompressionKind::Gzip,
],
CompressionKind::Gzip => &[
CompressionKind::Gzip,
CompressionKind::Xz,
CompressionKind::None,
],
CompressionKind::Xz => &[
CompressionKind::Xz,
CompressionKind::Gzip,
CompressionKind::None,
],
};

if let Ok(db_file) = self.get_object(main_db_path).send().await {
let mut body_reader = db_file.body.into_async_read();
let db_size = match self.use_compression {
CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
CompressionKind::Gzip => {
let mut decompress_reader = async_compression::tokio::bufread::GzipDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
for algo in algos_to_try {
let main_db_path = match algo {
CompressionKind::None => format!("{}-{}/db.db", self.db_name, generation),
CompressionKind::Gzip => format!("{}-{}/db.gz", self.db_name, generation),
CompressionKind::Xz => format!("{}-{}/db.xz", self.db_name, generation),
};
db.flush().await?;
if let Ok(db_file) = self.get_object(main_db_path).send().await {
let mut body_reader = db_file.body.into_async_read();
let db_size = match algo {
CompressionKind::None => tokio::io::copy(&mut body_reader, db).await?,
CompressionKind::Gzip => {
let mut decompress_reader =
async_compression::tokio::bufread::GzipDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
CompressionKind::Xz => {
let mut decompress_reader =
async_compression::tokio::bufread::XzDecoder::new(
tokio::io::BufReader::new(body_reader),
);
tokio::io::copy(&mut decompress_reader, db).await?
}
};
db.flush().await?;

let page_size = Self::read_page_size(db).await?;
self.set_page_size(page_size)?;
tracing::info!("Restored the main database file ({} bytes)", db_size);
Ok(true)
} else {
Ok(false)
let page_size = Self::read_page_size(db).await?;
self.set_page_size(page_size)?;
tracing::info!("Restored the main database file ({} bytes)", db_size);
return Ok(true);
}
}
Ok(false)
}

async fn restore_wal(
@@ -1235,6 +1283,7 @@ impl Replicator {
Some(result) => result,
None => {
if !key.ends_with(".gz")
&& !key.ends_with(".xz")
&& !key.ends_with(".db")
&& !key.ends_with(".meta")
&& !key.ends_with(".dep")
@@ -1423,6 +1472,7 @@ impl Replicator {
let str = fpath.to_str()?;
if str.ends_with(".db")
| str.ends_with(".gz")
| str.ends_with(".xz")
| str.ends_with(".raw")
| str.ends_with(".meta")
| str.ends_with(".dep")
@@ -1670,13 +1720,15 @@ pub enum CompressionKind {
#[default]
None,
Gzip,
Xz,
}

impl CompressionKind {
pub fn parse(kind: &str) -> std::result::Result<Self, &str> {
match kind {
"gz" | "gzip" => Ok(CompressionKind::Gzip),
"raw" | "" => Ok(CompressionKind::None),
"xz" => Ok(CompressionKind::Xz),
other => Err(other),
}
}
@@ -1687,6 +1739,7 @@ impl std::fmt::Display for CompressionKind {
match self {
CompressionKind::None => write!(f, "raw"),
CompressionKind::Gzip => write!(f, "gz"),
CompressionKind::Xz => write!(f, "xz"),
}
}
}
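
Note (illustrative, not part of the commit): the new variant round-trips through parse() and Display as "xz", which is also the object/file suffix used elsewhere in the diff. A small in-crate sketch; the test module name is arbitrary, and how the compression kind is selected at runtime (e.g. from configuration) is outside this diff:

#[cfg(test)]
mod compression_kind_xz_example {
    use crate::replicator::CompressionKind;

    #[test]
    fn xz_round_trips_through_parse_and_display() {
        // "xz" is accepted alongside "gz"/"gzip" and "raw"/"".
        let kind = CompressionKind::parse("xz").expect("xz should be recognized");
        assert!(matches!(kind, CompressionKind::Xz));
        // Display mirrors parse(), so snapshot suffixes come out as "xz".
        assert_eq!(kind.to_string(), "xz");
        // Unknown values are handed back to the caller for error reporting.
        assert!(CompressionKind::parse("bz2").is_err());
    }
}
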
