Skip to content

Commit

Permalink
Merge pull request #609 from tursodatabase/replica-restart
Browse files Browse the repository at this point in the history
Fix replica reset on primary log regeneration
  • Loading branch information
LucioFranco authored Nov 13, 2023
2 parents acf5f4f + 675f4d9 commit a531d92
Show file tree
Hide file tree
Showing 7 changed files with 562 additions and 3 deletions.
2 changes: 1 addition & 1 deletion libsql-replication/src/replicator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl<C: ReplicatorClient> Replicator<C> {
self.has_handshake = true;
return Ok(());
}
- Err(e @ Error::Fatal(_)) => return Err(e),
+ Err(e @ (Error::Fatal(_) | Error::Meta(_))) => return Err(e),
Err(e) if !error_printed => {
tracing::error!("error connecting to primary. retrying. error: {e}");
error_printed = true;
Expand Down
4 changes: 4 additions & 0 deletions libsql-server/src/http/admin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use std::io::ErrorKind;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use tokio_util::io::ReaderStream;
use url::Url;

Expand Down Expand Up @@ -60,6 +61,7 @@ pub async fn run<M, A, C>(
namespaces: NamespaceStore<M>,
connector: C,
disable_metrics: bool,
shutdown: Arc<Notify>,
) -> anyhow::Result<()>
where
A: crate::net::Accept,
Expand Down Expand Up @@ -124,8 +126,10 @@ where

hyper::server::Server::builder(acceptor)
.serve(router.into_make_service())
.with_graceful_shutdown(shutdown.notified())
.await
.context("Could not bind admin HTTP API server")?;

Ok(())
}

Expand Down
4 changes: 3 additions & 1 deletion libsql-server/src/http/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use hyper::{header, Body, Request, Response, StatusCode};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Number;
- use tokio::sync::{mpsc, oneshot};
+ use tokio::sync::{mpsc, oneshot, Notify};
use tokio::task::JoinSet;
use tonic::transport::Server;
use tower_http::trace::DefaultOnResponse;
Expand Down Expand Up @@ -237,6 +237,7 @@ pub struct UserApi<M: MakeNamespace, A, P, S> {
pub enable_console: bool,
pub self_url: Option<String>,
pub path: Arc<Path>,
pub shutdown: Arc<Notify>,
}

impl<M, A, P, S> UserApi<M, A, P, S>
Expand Down Expand Up @@ -441,6 +442,7 @@ where
join_set.spawn(async move {
hyper::server::Server::builder(acceptor)
.serve(h2c)
.with_graceful_shutdown(self.shutdown.notified())
.await
.context("http server")?;
Ok(())
Expand Down
6 changes: 6 additions & 0 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ struct Services<M: MakeNamespace, A, P, S, C> {
db_config: DbConfig,
auth: Arc<Auth>,
path: Arc<Path>,
shutdown: Arc<Notify>,
}

impl<M, A, P, S, C> Services<M, A, P, S, C>
Expand All @@ -156,6 +157,7 @@ where
enable_console: self.user_api_config.enable_http_console,
self_url: self.user_api_config.self_url,
path: self.path.clone(),
shutdown: self.shutdown.clone(),
};

let user_http_service = user_http.configure(join_set);
Expand All @@ -166,12 +168,14 @@ where
disable_metrics,
}) = self.admin_api_config
{
let shutdown = self.shutdown.clone();
join_set.spawn(http::admin::run(
acceptor,
user_http_service,
self.namespaces,
connector,
disable_metrics,
shutdown,
));
}
}
Expand Down Expand Up @@ -398,6 +402,7 @@ where
db_config: self.db_config,
auth,
path: self.path.clone(),
shutdown: self.shutdown.clone(),
};

services.configure(&mut join_set);
Expand Down Expand Up @@ -433,6 +438,7 @@ where
db_config: self.db_config,
auth,
path: self.path.clone(),
shutdown: self.shutdown.clone(),
};

services.configure(&mut join_set);
Expand Down
11 changes: 10 additions & 1 deletion libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,16 @@ impl Namespace<ReplicaDatabase> {
.await?;

// force a handshake now, to retrieve the primary's current replication index
- replicator.try_perform_handshake().await?;
+ match replicator.try_perform_handshake().await {
+ Err(libsql_replication::replicator::Error::Meta(
+ libsql_replication::meta::Error::LogIncompatible,
+ )) => {
+ tracing::error!("trying to replicate incompatible logs, reseting replica");
+ (reset)(ResetOp::Reset(name.clone()));
+ }
+ Err(e) => Err(e)?,
+ Ok(_) => (),
+ }
let primary_current_replicatio_index = replicator.client_mut().primary_replication_index;

let mut join_set = JoinSet::new();
Expand Down
2 changes: 2 additions & 0 deletions libsql-server/tests/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector};

use crate::common::{http::Client, net::SimServer, snapshot_metrics};

mod replica_restart;

fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) {
init_tracing();
let tmp = tempdir().unwrap();
Expand Down
Loading

0 comments on commit a531d92

Please sign in to comment.