Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

libsql: Improve WAL frame push logic #1817

Merged
merged 1 commit into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion libsql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ fallible-iterator = { version = "0.3", optional = true }

libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true }
async-stream = { version = "0.3.5", optional = true }
reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls" ], optional = true }
reqwest = { version = "0.12.9", default-features = false, features = [ "rustls-tls", "json" ], optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] }
Expand Down Expand Up @@ -106,6 +106,7 @@ sync = [
"dep:tokio",
"dep:futures",
"dep:reqwest",
"dep:serde_json",
]
hrana = [
"parser",
Expand Down
24 changes: 17 additions & 7 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,8 +397,16 @@ impl Database {
let start_frame_no = sync_ctx.durable_frame_num + 1;
let end_frame_no = max_frame_no;

for frame_no in start_frame_no..end_frame_no+1 {
self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
let mut frame_no = start_frame_no;
while frame_no <= end_frame_no {
// The server returns its maximum frame number. To avoid resending
// frames the server already knows about, we need to update the
// frame number to the one returned by the server.
let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
if max_frame_no > frame_no {
frame_no = max_frame_no;
}
frame_no += 1;
}

let frame_count = end_frame_no - start_frame_no + 1;
Expand All @@ -409,7 +417,7 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<()> {
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<u32> {
let frame_size: usize = 24+page_size as usize;
let frame = vec![0; frame_size];
let rc = unsafe {
Expand All @@ -419,12 +427,12 @@ impl Database {
return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no)));
}
let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1);
self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
Ok(())
let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
Ok(max_frame_no)
}

#[cfg(feature = "sync")]
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<()> {
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<u32> {
let mut nr_retries = 0;
loop {
let client = reqwest::Client::new();
Expand All @@ -437,7 +445,9 @@ impl Database {
}
let res = builder.body(frame.to_vec()).send().await.unwrap();
if res.status().is_success() {
return Ok(());
let resp = res.json::<serde_json::Value>().await.unwrap();
let max_frame_no = resp.get("max_frame_no").unwrap().as_u64().unwrap();
return Ok(max_frame_no as u32);
}
if nr_retries > max_retries {
return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status())));
Expand Down
Loading