Skip to content

Commit

Permalink
libsql: Improve WAL sync logic
Browse files Browse the repository at this point in the history
  • Loading branch information
penberg committed Dec 11, 2024
1 parent 692bce8 commit 83327de
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 27 deletions.
3 changes: 3 additions & 0 deletions libsql/examples/offline_writes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ async fn main() {

let conn = db.connect().unwrap();

println!("Syncing database from remote...");
db.sync().await.unwrap();

conn.execute(
r#"
CREATE TABLE IF NOT EXISTS guest_book_entries (
Expand Down
53 changes: 26 additions & 27 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,29 +391,36 @@ impl Database {
use crate::sync::SyncError;
use crate::Error;

match self.try_push().await {
Ok(rep) => Ok(rep),
Err(Error::Sync(err)) => {
// Retry the sync because we are ahead of the server and we need to push some older
// frames.
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) =
err.downcast_ref::<SyncError>()
{
tracing::debug!("got InvalidPushFrameNo, retrying push");
self.try_push().await
} else {
Err(Error::Sync(err))
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let conn = self.connect()?;

let durable_frame_no = sync_ctx.durable_frame_num();
let max_frame_no = conn.wal_frame_count();

if max_frame_no > durable_frame_no {
match self.try_push(&mut sync_ctx, &conn).await {
Ok(rep) => Ok(rep),
Err(Error::Sync(err)) => {
// Retry the sync because we are ahead of the server and we need to push some older
// frames.
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) =
err.downcast_ref::<SyncError>()
{
tracing::debug!("got InvalidPushFrameNo, retrying push");
self.try_push(&mut sync_ctx, &conn).await
} else {
Err(Error::Sync(err))
}
}
Err(e) => Err(e),
}
Err(e) => Err(e),
} else {
self.try_pull(&mut sync_ctx, &conn).await
}
}

#[cfg(feature = "sync")]
async fn try_push(&self) -> Result<crate::database::Replicated> {
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let conn = self.connect()?;

async fn try_push(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
let page_size = {
let rows = conn
.query("PRAGMA page_size", crate::params::Params::None)?
Expand All @@ -424,10 +431,7 @@ impl Database {
};

let max_frame_no = conn.wal_frame_count();

if max_frame_no == 0 {
return self.try_pull(&mut sync_ctx).await;
}
assert!(max_frame_no > 0);

let generation = sync_ctx.generation(); // TODO: Probe from WAL.
let start_frame_no = sync_ctx.durable_frame_num() + 1;
Expand All @@ -452,10 +456,6 @@ impl Database {

sync_ctx.write_metadata().await?;

if start_frame_no > end_frame_no {
return self.try_pull(&mut sync_ctx).await;
}

// TODO(lucio): this can underflow if the server previously returned a higher max_frame_no
// than what we have stored here.
let frame_count = end_frame_no - start_frame_no + 1;
Expand All @@ -466,10 +466,9 @@ impl Database {
}

#[cfg(feature = "sync")]
async fn try_pull(&self, sync_ctx: &mut SyncContext) -> Result<crate::database::Replicated> {
async fn try_pull(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result<crate::database::Replicated> {
let generation = sync_ctx.generation();
let mut frame_no = sync_ctx.durable_frame_num() + 1;
let conn = self.connect()?;
conn.wal_insert_begin()?;

let mut err = None;
Expand Down

0 comments on commit 83327de

Please sign in to comment.