diff --git a/libsql-server/src/http/admin/mod.rs b/libsql-server/src/http/admin/mod.rs index 75c2443b8e..1dfc1af425 100644 --- a/libsql-server/src/http/admin/mod.rs +++ b/libsql-server/src/http/admin/mod.rs @@ -129,6 +129,7 @@ where .with_graceful_shutdown(shutdown.notified()) .await .context("Could not bind admin HTTP API server")?; + Ok(()) } diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 70eb5b5091..b14ac6d056 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -576,7 +576,9 @@ impl Namespace { // force a handshake now, to retrieve the primary's current replication index match replicator.try_perform_handshake().await { - Err(libsql_replication::replicator::Error::Meta(libsql_replication::meta::Error::LogIncompatible)) => { + Err(libsql_replication::replicator::Error::Meta( + libsql_replication::meta::Error::LogIncompatible, + )) => { tracing::error!("trying to replicate incompatible logs, reseting replica"); (reset)(ResetOp::Reset(name.clone())); } diff --git a/libsql-server/tests/cluster/mod.rs b/libsql-server/tests/cluster/mod.rs index 0b0b55fe9d..02a39e1c16 100644 --- a/libsql-server/tests/cluster/mod.rs +++ b/libsql-server/tests/cluster/mod.rs @@ -14,6 +14,8 @@ use common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector}; use crate::common::{http::Client, net::SimServer, snapshot_metrics}; +mod replica_restart; + fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) { init_tracing(); let tmp = tempdir().unwrap(); diff --git a/libsql-server/tests/cluster/replica_restart.rs b/libsql-server/tests/cluster/replica_restart.rs new file mode 100644 index 0000000000..40b06edf50 --- /dev/null +++ b/libsql-server/tests/cluster/replica_restart.rs @@ -0,0 +1,536 @@ +use std::sync::Arc; +use std::time::Duration; + +use futures::FutureExt; +use libsql::Database; +use sqld::config::{AdminApiConfig, RpcClientConfig, RpcServerConfig, UserApiConfig}; +use tempfile::tempdir; +use tokio::sync::Notify; +use turmoil::Builder; + +use crate::common::{ + http::Client, + net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector}, +}; + +/// In this test, we create a primary and a replica, add some data and sync them. when then shut +/// down and bring back up the replica, and ensure the the replica continue normal mode of +/// operation. +#[test] +fn replica_restart() { + let mut sim = Builder::new().build(); + let tmp = tempdir().unwrap(); + sim.host("primary", move || { + let path = tmp.path().to_path_buf(); + async move { + let server = TestServer { + path: path.into(), + user_api_config: UserApiConfig { + ..Default::default() + }, + admin_api_config: Some(AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?, + connector: TurmoilConnector, + disable_metrics: true, + }), + rpc_server_config: Some(RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?, + tls_config: None, + }), + ..Default::default() + }; + + server.start_sim(8080).await?; + + Ok(()) + } + }); + + let notify = Arc::new(Notify::new()); + let tmp = tempdir().unwrap(); + let notify_clone = notify.clone(); + sim.host("replica", move || { + let path = tmp.path().to_path_buf(); + let notify = notify_clone.clone(); + async move { + let make_server = || { + let path = path.clone(); + async { + TestServer { + path: path.into(), + user_api_config: UserApiConfig { + ..Default::default() + }, + admin_api_config: Some(AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), + connector: TurmoilConnector, + disable_metrics: true, + }), + rpc_client_config: Some(RpcClientConfig { + remote_url: "http://primary:4567".into(), + connector: TurmoilConnector, + tls_config: None, + }), + ..Default::default() + } + } + }; + + let server = make_server().await; + + tokio::select! { + res = server.start_sim(8080) => { + res.unwrap() + } + _ = notify.notified() => (), + } + + let server = make_server().await; + server.start_sim(8080).await.unwrap(); + + Ok(()) + } + }); + + sim.client("client", async move { + let http = Client::new(); + let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?; + let conn = db.connect()?; + + // insert a few valued into the primary + conn.execute("create table test (x)", ()).await.unwrap(); + for _ in 0..50 { + conn.execute("insert into test values (42)", ()) + .await + .unwrap(); + } + + let primary_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + + loop { + let replica_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + if primary_index == replica_index { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + notify.notify_waiters(); + + // make sure that replica is up to date + loop { + let replica_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + if primary_index == replica_index { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + Ok(()) + }); + + sim.run().unwrap(); +} + +/// In this test, we start a primary and a replica. We add some entries to the primary, and wait +/// for the replica to be up to date. Then we stop the primary, remove it's wallog, and restart the +/// primary. This will force the primary to regenerate the log. The replica should catch that, and +/// self heal. During this process the replica is not shutdown. +#[test] +fn primary_regenerate_log_no_replica_restart() { + let mut sim = Builder::new().build(); + let tmp = tempdir().unwrap(); + + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + + init_tracing(); + sim.host("primary", move || { + let notify = notify_clone.clone(); + let path = tmp.path().to_path_buf(); + async move { + let make_server = || async { + TestServer { + path: path.clone().into(), + user_api_config: UserApiConfig { + ..Default::default() + }, + admin_api_config: Some(AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), + connector: TurmoilConnector, + disable_metrics: true, + }), + rpc_server_config: Some(RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(), + tls_config: None, + }), + ..Default::default() + } + }; + let server = make_server().await; + let shutdown = server.shutdown.clone(); + + let fut = async move { server.start_sim(8080).await }; + + tokio::pin!(fut); + + loop { + tokio::select! { + res = &mut fut => { + res.unwrap(); + break + } + _ = notify.notified() => { + shutdown.notify_waiters(); + }, + } + } + // remove the wallog and start again + tokio::fs::remove_file(path.join("dbs/default/wallog")) + .await + .unwrap(); + notify.notify_waiters(); + let server = make_server().await; + server.start_sim(8080).await.unwrap(); + + Ok(()) + } + }); + + let tmp = tempdir().unwrap(); + sim.host("replica", move || { + let path = tmp.path().to_path_buf(); + async move { + let make_server = || { + let path = path.clone(); + async { + TestServer { + path: path.into(), + user_api_config: UserApiConfig { + ..Default::default() + }, + admin_api_config: Some(AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), + connector: TurmoilConnector, + disable_metrics: true, + }), + rpc_client_config: Some(RpcClientConfig { + remote_url: "http://primary:4567".into(), + connector: TurmoilConnector, + tls_config: None, + }), + ..Default::default() + } + } + }; + + let server = make_server().await; + server.start_sim(8080).await.unwrap(); + + Ok(()) + } + }); + + sim.client("client", async move { + let http = Client::new(); + let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?; + let conn = db.connect()?; + + // insert a few valued into the primary + conn.execute("create table test (x)", ()).await.unwrap(); + for _ in 0..50 { + conn.execute("insert into test values (42)", ()) + .await + .unwrap(); + } + + let primary_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + + loop { + let replica_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + if primary_index == replica_index { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + notify.notify_waiters(); + notify.notified().await; + + drop(http); + let http = Client::new(); + // make sure that replica is up to date + let new_primary_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + assert_ne!(primary_index, new_primary_index); + loop { + let replica_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + if new_primary_index == replica_index { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + Ok(()) + }); + + sim.run().unwrap(); +} + +/// This test is very similar to `primary_regenerate_log_no_replica_restart`. The only difference +/// is that the replica is being shutdown before the primary regenerates their log. When the +/// replica is brought back up, it will try to load the namespace from a primary with a new log, +/// and it should self heal. +#[test] +fn primary_regenerate_log_with_replica_restart() { + let mut sim = Builder::new().build(); + let tmp = tempdir().unwrap(); + + let notify = Arc::new(Notify::new()); + let notify_clone = notify.clone(); + + init_tracing(); + sim.host("primary", move || { + let notify = notify_clone.clone(); + let path = tmp.path().to_path_buf(); + async move { + let make_server = || async { + TestServer { + path: path.clone().into(), + user_api_config: UserApiConfig { + ..Default::default() + }, + admin_api_config: Some(AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), + connector: TurmoilConnector, + disable_metrics: true, + }), + rpc_server_config: Some(RpcServerConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(), + tls_config: None, + }), + ..Default::default() + } + }; + let server = make_server().await; + let shutdown = server.shutdown.clone(); + + let fut = async move { server.start_sim(8080).await }; + + tokio::pin!(fut); + + loop { + tokio::select! { + res = &mut fut => { + res.unwrap(); + break + } + _ = notify.notified() => { + shutdown.notify_waiters(); + }, + } + } + // remove the wallog and start again + tokio::fs::remove_file(path.join("dbs/default/wallog")) + .await + .unwrap(); + notify.notify_waiters(); + let server = make_server().await; + server.start_sim(8080).await.unwrap(); + + Ok(()) + } + }); + + let tmp = tempdir().unwrap(); + let notify_clone = notify.clone(); + sim.host("replica", move || { + let path = tmp.path().to_path_buf(); + let notify = notify_clone.clone(); + async move { + let make_server = || { + let path = path.clone(); + async { + TestServer { + path: path.into(), + user_api_config: UserApiConfig { + ..Default::default() + }, + admin_api_config: Some(AdminApiConfig { + acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(), + connector: TurmoilConnector, + disable_metrics: true, + }), + rpc_client_config: Some(RpcClientConfig { + remote_url: "http://primary:4567".into(), + connector: TurmoilConnector, + tls_config: None, + }), + ..Default::default() + } + } + }; + + let server = make_server().await; + let shutdown = server.shutdown.clone(); + let fut = async { + server.start_sim(8080).await.unwrap(); + }; + + tokio::pin!(fut); + let notify_fut = async { + notify.notified().await; + } + .fuse(); + tokio::pin!(notify_fut); + loop { + tokio::select! { + _ = &mut fut => break, + _ = &mut notify_fut => { + shutdown.notify_waiters(); + } + } + } + + // we wait for the server to have restarted + notify.notified().await; + + // and then restart the replica + let server = make_server().await; + server.start_sim(8080).await.unwrap(); + + Ok(()) + } + }); + + sim.client("client", async move { + let http = Client::new(); + let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?; + let conn = db.connect()?; + + // insert a few valued into the primary + conn.execute("create table test (x)", ()).await.unwrap(); + for _ in 0..50 { + conn.execute("insert into test values (42)", ()) + .await + .unwrap(); + } + + let primary_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + + loop { + let replica_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + if primary_index == replica_index { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + notify.notify_waiters(); + notify.notified().await; + + drop(http); + let http = Client::new(); + // make sure that replica is up to date + let new_primary_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + assert_ne!(primary_index, new_primary_index); + loop { + let replica_index = http + .get("http://primary:9090/v1/namespaces/default/stats") + .await + .unwrap() + .json_value() + .await + .unwrap()["replication_index"] + .clone() + .as_i64(); + if new_primary_index == replica_index { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + + Ok(()) + }); + + sim.run().unwrap(); +}