Skip to content

Commit

Permalink
libsql: Improve WAL sync logic
Browse files Browse the repository at this point in the history
This improves the WAL sync to follow this logic:

- If there are no local writes, attempt to pull. We know there are no
  local writes if the cached remote max frame number is greater than or
  equal to local max frame number.

- If there are local writes, attempt to push. We know there are local
  writes if the cached remote max number is less than lcoal max frame
  number.

This also removes the `offline_writes_pull.rs` example and augments
`offline_writes.rs` to sync before attempting to write.
  • Loading branch information
penberg committed Dec 13, 2024
1 parent 6ed1690 commit 19e7800
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 80 deletions.
3 changes: 3 additions & 0 deletions libsql/examples/offline_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ async fn main() {

let conn = db.connect().unwrap();

println!("Syncing database from remote...");
db.sync().await.unwrap();

conn.execute(
r#"
CREATE TABLE IF NOT EXISTS guest_book_entries (
Expand Down
55 changes: 0 additions & 55 deletions libsql/examples/offline_writes_pull.rs

This file was deleted.

54 changes: 29 additions & 25 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,29 +391,36 @@ impl Database {
use crate::sync::SyncError;
use crate::Error;

match self.try_push().await {
Ok(rep) => Ok(rep),
Err(Error::Sync(err)) => {
// Retry the sync because we are ahead of the server and we need to push some older
// frames.
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) =
err.downcast_ref::<SyncError>()
{
tracing::debug!("got InvalidPushFrameNo, retrying push");
self.try_push().await
} else {
Err(Error::Sync(err))
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let conn = self.connect()?;

let durable_frame_no = sync_ctx.durable_frame_num();
let max_frame_no = conn.wal_frame_count();

if max_frame_no > durable_frame_no {
match self.try_push(&mut sync_ctx, &conn).await {
Ok(rep) => Ok(rep),
Err(Error::Sync(err)) => {
// Retry the sync because we are ahead of the server and we need to push some older
// frames.
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) =
err.downcast_ref::<SyncError>()
{
tracing::debug!("got InvalidPushFrameNo, retrying push");
self.try_push(&mut sync_ctx, &conn).await
} else {
Err(Error::Sync(err))
}
}
Err(e) => Err(e),
}
Err(e) => Err(e),
} else {
self.try_pull(&mut sync_ctx, &conn).await
}
}

#[cfg(feature = "sync")]
async fn try_push(&self) -> Result<crate::database::Replicated> {
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let conn = self.connect()?;

async fn try_push(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
let page_size = {
let rows = conn
.query("PRAGMA page_size", crate::params::Params::None)?
Expand All @@ -424,9 +431,11 @@ impl Database {
};

let max_frame_no = conn.wal_frame_count();

if max_frame_no == 0 {
return self.try_pull(&mut sync_ctx).await;
return Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 0,
});
}

let generation = sync_ctx.generation(); // TODO: Probe from WAL.
Expand All @@ -452,10 +461,6 @@ impl Database {

sync_ctx.write_metadata().await?;

if start_frame_no > end_frame_no {
return self.try_pull(&mut sync_ctx).await;
}

// TODO(lucio): this can underflow if the server previously returned a higher max_frame_no
// than what we have stored here.
let frame_count = end_frame_no - start_frame_no + 1;
Expand All @@ -466,10 +471,9 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn try_pull(&self, sync_ctx: &mut SyncContext) -> Result<crate::database::Replicated> {
async fn try_pull(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
let generation = sync_ctx.generation();
let mut frame_no = sync_ctx.durable_frame_num() + 1;
let conn = self.connect()?;
conn.wal_insert_begin()?;

let mut err = None;
Expand Down

0 comments on commit 19e7800

Please sign in to comment.