diff --git a/bottomless/src/replicator.rs b/bottomless/src/replicator.rs index 864cdfb0f9..1be2a43fe6 100644 --- a/bottomless/src/replicator.rs +++ b/bottomless/src/replicator.rs @@ -376,6 +376,33 @@ impl Replicator { }) } + /// Checks if there exists any backup of given database + pub async fn has_backup_of(db_name: impl AsRef, options: &Options) -> Result { + let prefix = match &options.db_id { + Some(db_id) => format!("{db_id}-"), + None => format!("ns-:{}-", db_name.as_ref()), + }; + let config = options.client_config().await?; + let client = Client::from_conf(config); + let bucket = options.bucket_name.clone(); + + match client.head_bucket().bucket(&bucket).send().await { + Ok(_) => tracing::trace!("Bucket {bucket} exists and is accessible"), + Err(e) => { + tracing::trace!("Bucket checking error: {e}"); + return Err(e.into()); + } + } + + let mut last_frame = 0; + let list_objects = client.list_objects().bucket(&bucket).prefix(&prefix); + let response = list_objects.send().await?; + let _ = Self::try_get_last_frame_no(response, &mut last_frame); + tracing::trace!("Last frame of {prefix}: {last_frame}"); + + Ok(last_frame > 0) + } + pub async fn shutdown_gracefully(&mut self) -> Result<()> { tracing::info!("bottomless replicator: shutting down..."); // 1. wait for all committed WAL frames to be committed locally diff --git a/libsql-server/Cargo.toml b/libsql-server/Cargo.toml index af7ca172e0..8c40429455 100644 --- a/libsql-server/Cargo.toml +++ b/libsql-server/Cargo.toml @@ -41,6 +41,7 @@ metrics = "0.21.1" metrics-util = "0.15" metrics-exporter-prometheus = "0.12.2" mimalloc = { version = "0.1.36", default-features = false } +moka = { version = "0.12.1", features = ["future"] } nix = { version = "0.26.2", features = ["fs"] } once_cell = "1.17.0" parking_lot = "0.12.1" diff --git a/libsql-server/src/error.rs b/libsql-server/src/error.rs index 87696c9397..e15461214a 100644 --- a/libsql-server/src/error.rs +++ b/libsql-server/src/error.rs @@ -96,6 +96,9 @@ pub enum Error { NamespaceStoreShutdown, #[error("Unable to update metastore: {0}")] MetaStoreUpdateFailure(Box), + // This is for errors returned by moka + #[error(transparent)] + Ref(#[from] std::sync::Arc), } trait ResponseError: std::error::Error { @@ -109,6 +112,12 @@ trait ResponseError: std::error::Error { impl ResponseError for Error {} impl IntoResponse for Error { + fn into_response(self) -> axum::response::Response { + (&self).into_response() + } +} + +impl IntoResponse for &Error { fn into_response(self) -> axum::response::Response { use Error::*; @@ -156,6 +165,7 @@ impl IntoResponse for Error { UrlParseError(_) => self.format_err(StatusCode::BAD_REQUEST), NamespaceStoreShutdown => self.format_err(StatusCode::SERVICE_UNAVAILABLE), MetaStoreUpdateFailure(_) => self.format_err(StatusCode::INTERNAL_SERVER_ERROR), + Ref(this) => this.as_ref().into_response(), } } } @@ -230,7 +240,7 @@ pub enum LoadDumpError { impl ResponseError for LoadDumpError {} -impl IntoResponse for LoadDumpError { +impl IntoResponse for &LoadDumpError { fn into_response(self) -> axum::response::Response { use LoadDumpError::*; @@ -250,7 +260,7 @@ impl IntoResponse for LoadDumpError { impl ResponseError for ForkError {} -impl IntoResponse for ForkError { +impl IntoResponse for &ForkError { fn into_response(self) -> axum::response::Response { match self { ForkError::Internal(_) diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index 846f28a76b..387f1110c1 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -100,6 +100,7 @@ pub struct Server, pub disable_namespaces: bool, pub shutdown: Arc, + pub max_active_namespaces: usize, } impl Default for Server { @@ -117,6 +118,7 @@ impl Default for Server { heartbeat_config: Default::default(), disable_namespaces: true, shutdown: Default::default(), + max_active_namespaces: 100, } } } @@ -384,6 +386,7 @@ where db_config: self.db_config.clone(), base_path: self.path.clone(), auth: auth.clone(), + max_active_namespaces: self.max_active_namespaces, }; let (namespaces, proxy_service, replication_service) = replica.configure().await?; self.rpc_client_config = None; @@ -422,6 +425,7 @@ where extensions, base_path: self.path.clone(), disable_namespaces: self.disable_namespaces, + max_active_namespaces: self.max_active_namespaces, join_set: &mut join_set, auth: auth.clone(), }; @@ -487,6 +491,7 @@ struct Primary<'a, A> { extensions: Arc<[PathBuf]>, base_path: Arc, disable_namespaces: bool, + max_active_namespaces: usize, auth: Arc, join_set: &'a mut JoinSet>, } @@ -520,12 +525,12 @@ where let meta_store_path = conf.base_path.join("metastore"); let factory = PrimaryNamespaceMaker::new(conf); - let namespaces = NamespaceStore::new( factory, false, self.db_config.snapshot_at_shutdown, meta_store_path, + self.max_active_namespaces, ) .await?; @@ -602,6 +607,7 @@ struct Replica { db_config: DbConfig, base_path: Arc, auth: Arc, + max_active_namespaces: usize, } impl Replica { @@ -627,7 +633,14 @@ impl Replica { let meta_store_path = conf.base_path.join("metastore"); let factory = ReplicaNamespaceMaker::new(conf); - let namespaces = NamespaceStore::new(factory, true, false, meta_store_path).await?; + let namespaces = NamespaceStore::new( + factory, + true, + false, + meta_store_path, + self.max_active_namespaces, + ) + .await?; let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone()); let proxy_service = ReplicaProxyService::new(channel, uri, self.auth.clone()); diff --git a/libsql-server/src/main.rs b/libsql-server/src/main.rs index 57c557324a..4f850ff6b6 100644 --- a/libsql-server/src/main.rs +++ b/libsql-server/src/main.rs @@ -195,6 +195,10 @@ struct Cli { /// Enable snapshot at shutdown #[clap(long)] snapshot_at_shutdown: bool, + + /// Max active namespaces kept in-memory + #[clap(long, env = "SQLD_MAX_ACTIVE_NAMESPACES", default_value = "100")] + max_active_namespaces: usize, } #[derive(clap::Subcommand, Debug)] @@ -506,6 +510,7 @@ async fn build_server(config: &Cli) -> anyhow::Result { disable_default_namespace: config.disable_default_namespace, disable_namespaces: !config.enable_namespaces, shutdown, + max_active_namespaces: config.max_active_namespaces, }) } diff --git a/libsql-server/src/namespace/fork.rs b/libsql-server/src/namespace/fork.rs index 8373c4d4ae..28efd7f62c 100644 --- a/libsql-server/src/namespace/fork.rs +++ b/libsql-server/src/namespace/fork.rs @@ -109,7 +109,6 @@ impl ForkTask<'_> { self.dest_namespace.clone(), RestoreOption::Latest, self.bottomless_db_id, - true, // Forking works only on primary and // PrimaryNamespaceMaker::create ignores // reset_cb param diff --git a/libsql-server/src/namespace/meta_store.rs b/libsql-server/src/namespace/meta_store.rs index feffff1281..c2cbb7f3e9 100644 --- a/libsql-server/src/namespace/meta_store.rs +++ b/libsql-server/src/namespace/meta_store.rs @@ -177,6 +177,13 @@ impl MetaStore { inner: HandleState::External(change_tx, rx), } } + + // TODO: we need to either make sure that the metastore is restored + // before we start accepting connections or we need to contact bottomless + // here to check if a namespace exists. Preferably the former. + pub fn exists(&self, namespace: &NamespaceName) -> bool { + self.inner.lock().configs.contains_key(namespace) + } } impl MetaStoreHandle { diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index e1d74068be..4af96543a4 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -2,24 +2,24 @@ mod fork; pub mod meta_store; pub mod replication_wal; -use std::collections::hash_map::Entry; -use std::collections::HashMap; use std::fmt; use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Weak}; use anyhow::Context as _; -use async_lock::{RwLock, RwLockUpgradableReadGuard}; +use async_lock::RwLock; use bottomless::bottomless_wal::CreateBottomlessWal; use bottomless::replicator::Options; use bytes::Bytes; use chrono::NaiveDateTime; use enclose::enclose; -use futures_core::Stream; +use futures::TryFutureExt; +use futures_core::{Future, Stream}; use hyper::Uri; use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient; use libsql_sys::wal::{Sqlite3WalManager, WalManager}; +use moka::future::Cache; use parking_lot::Mutex; use rusqlite::ErrorCode; use serde::de::Visitor; @@ -30,7 +30,6 @@ use tokio::task::JoinSet; use tokio::time::{Duration, Instant}; use tokio_util::io::StreamReader; use tonic::transport::Channel; -use tracing::trace; use uuid::Uuid; use crate::auth::Authenticated; @@ -75,6 +74,12 @@ impl Default for NamespaceName { } } +impl AsRef for NamespaceName { + fn as_ref(&self) -> &str { + self.as_str() + } +} + impl NamespaceName { pub fn from_string(s: String) -> crate::Result { Self::validate(&s)?; @@ -174,7 +179,6 @@ pub trait MakeNamespace: Sync + Send + 'static { name: NamespaceName, restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, reset: ResetCb, meta_store: &MetaStore, ) -> crate::Result>; @@ -220,7 +224,6 @@ impl MakeNamespace for PrimaryNamespaceMaker { name: NamespaceName, restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, _reset: ResetCb, meta_store: &MetaStore, ) -> crate::Result> { @@ -229,7 +232,6 @@ impl MakeNamespace for PrimaryNamespaceMaker { name.clone(), restore_option, bottomless_db_id, - allow_creation, meta_store.handle(name), ) .await @@ -337,7 +339,6 @@ impl MakeNamespace for ReplicaNamespaceMaker { name: NamespaceName, restore_option: RestoreOption, _bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, reset: ResetCb, meta_store: &MetaStore, ) -> crate::Result> { @@ -346,14 +347,7 @@ impl MakeNamespace for ReplicaNamespaceMaker { _ => Err(LoadDumpError::ReplicaLoadDump)?, } - Namespace::new_replica( - &self.config, - name.clone(), - allow_creation, - reset, - meta_store.handle(name), - ) - .await + Namespace::new_replica(&self.config, name.clone(), reset, meta_store.handle(name)).await } async fn destroy( @@ -379,6 +373,8 @@ impl MakeNamespace for ReplicaNamespaceMaker { } } +type NamespaceEntry = Arc>>>; + /// Stores and manage a set of namespaces. pub struct NamespaceStore { inner: Arc>, @@ -393,7 +389,7 @@ impl Clone for NamespaceStore { } struct NamespaceStoreInner { - store: RwLock>>, + store: Cache>, metadata: MetaStore, /// The namespace factory, to create new namespaces. make_namespace: M, @@ -408,12 +404,34 @@ impl NamespaceStore { allow_lazy_creation: bool, snapshot_at_shutdown: bool, meta_store_path: impl AsRef, + max_active_namespaces: usize, ) -> crate::Result { let metadata = MetaStore::new(meta_store_path).await?; - + tracing::trace!("Max active namespaces: {max_active_namespaces}"); + let store = Cache::>::builder() + .async_eviction_listener(move |name, ns, cause| { + tracing::debug!("evicting namespace `{name}` asynchronously: {cause:?}"); + // TODO(sarna): not clear if we should snapshot-on-evict... + // On the one hand, better to do so, because we have no idea + // for how long we're evicting a namespace. + // On the other, if there's lots of cache pressure, snapshotting + // very often will kill the machine's I/O. + Box::pin(async move { + tracing::info!("namespace `{name}` deallocated"); + // shutdown namespace + if let Some(ns) = ns.write().await.take() { + if let Err(e) = ns.shutdown(snapshot_at_shutdown).await { + tracing::error!("error deallocating `{name}`: {e}") + } + } + }) + }) + .max_capacity(max_active_namespaces as u64) + .time_to_idle(Duration::from_secs(86400)) + .build(); Ok(Self { inner: Arc::new(NamespaceStoreInner { - store: Default::default(), + store, metadata, make_namespace, allow_lazy_creation, @@ -427,18 +445,15 @@ impl NamespaceStore { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); } - let mut lock = self.inner.store.write().await; let mut bottomless_db_id_init = NamespaceBottomlessDbIdInit::FetchFromConfig; - if let Some(ns) = lock.remove(&namespace) { - bottomless_db_id_init = NamespaceBottomlessDbIdInit::Provided( - NamespaceBottomlessDbId::from_config(&ns.db_config_store.get()), - ); - // FIXME: when destroying, we are waiting for all the tasks associated with the - // allocation to finnish, which create a lot of contention on the lock. Need to use a - // conccurent hashmap to deal with this issue. - + if let Some(ns) = self.inner.store.remove(&namespace).await { // deallocate in-memory resources - ns.destroy().await?; + if let Some(ns) = ns.write().await.take() { + bottomless_db_id_init = NamespaceBottomlessDbIdInit::Provided( + NamespaceBottomlessDbId::from_config(&ns.db_config_store.get()), + ); + ns.destroy().await?; + } } // destroy on-disk database and backups @@ -457,24 +472,25 @@ impl NamespaceStore { Ok(()) } - async fn reset( + pub async fn reset( &self, namespace: NamespaceName, restore_option: RestoreOption, - ) -> crate::Result<()> { - if self.inner.has_shutdown.load(Ordering::Relaxed) { - return Err(Error::NamespaceStoreShutdown); - } - let mut lock = self.inner.store.write().await; - if let Some(ns) = lock.remove(&namespace) { - // FIXME: when destroying, we are waiting for all the tasks associated with the - // allocation to finnish, which create a lot of contention on the lock. Need to use a - // conccurent hashmap to deal with this issue. - - // deallocate in-memory resources + ) -> anyhow::Result<()> { + // The process for reseting is as follow: + // - get a lock on the namespace entry, if the entry exists, then it's a lock on the entry, + // if it doesn't exist, insert an empty entry and take a lock on it + // - destroy the old namespace + // - create a new namespace and insert it in the held lock + let entry = self + .inner + .store + .get_with(namespace.clone(), async { Default::default() }) + .await; + let mut lock = entry.write().await; + if let Some(ns) = lock.take() { ns.destroy().await?; } - // destroy on-disk database self.inner .make_namespace @@ -492,12 +508,12 @@ impl NamespaceStore { namespace.clone(), restore_option, NamespaceBottomlessDbId::NotProvided, - true, self.make_reset_cb(), &self.inner.metadata, ) .await?; - lock.insert(namespace, ns); + + lock.replace(ns); Ok(()) } @@ -534,18 +550,26 @@ impl NamespaceStore { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); } - let mut lock = self.inner.store.write().await; - if lock.contains_key(&to) { - return Err(crate::error::Error::NamespaceAlreadyExist( - to.as_str().to_string(), - )); - } // check that the source namespace exists - let from_ns = match lock.entry(from.clone()) { - Entry::Occupied(e) => e.into_mut(), - Entry::Vacant(e) => { - // we just want to load the namespace into memory, so we refuse creation. + if !self.inner.metadata.exists(&from) { + return Err(crate::error::Error::NamespaceDoesntExist(from.to_string())); + } + + let to_entry = self + .inner + .store + .get_with(to.clone(), async { Default::default() }) + .await; + let mut to_lock = to_entry.write().await; + if to_lock.is_some() { + return Err(crate::error::Error::NamespaceAlreadyExist(to.to_string())); + } + + let from_entry = self + .inner + .store + .try_get_with(from.clone(), async { let ns = self .inner .make_namespace @@ -553,21 +577,27 @@ impl NamespaceStore { from.clone(), RestoreOption::Latest, NamespaceBottomlessDbId::NotProvided, - false, self.make_reset_cb(), &self.inner.metadata, ) .await?; - e.insert(ns) - } + tracing::info!("loaded namespace: `{to}`"); + Ok::<_, crate::error::Error>(Arc::new(RwLock::new(Some(ns)))) + }) + .await?; + + let from_lock = from_entry.read().await; + let Some(from_ns) = &*from_lock else { + return Err(crate::error::Error::NamespaceDoesntExist(to.to_string())); }; - let forked = self + let to_ns = self .inner .make_namespace .fork(from_ns, to.clone(), timestamp, &self.inner.metadata) .await?; - lock.insert(to.clone(), forked); + + to_lock.replace(to_ns); Ok(()) } @@ -579,7 +609,7 @@ impl NamespaceStore { f: Fun, ) -> crate::Result where - Fun: FnOnce(&Namespace) -> R, + Fun: FnOnce(&Namespace) -> R + 'static, { if self.inner.has_shutdown.load(Ordering::Relaxed) { return Err(Error::NamespaceStoreShutdown); @@ -593,82 +623,126 @@ impl NamespaceStore { pub async fn with(&self, namespace: NamespaceName, f: Fun) -> crate::Result where - Fun: FnOnce(&Namespace) -> R, + Fun: FnOnce(&Namespace) -> R + 'static, { - if self.inner.has_shutdown.load(Ordering::Relaxed) { - return Err(Error::NamespaceStoreShutdown); - } - let before_load = Instant::now(); - let lock = self.inner.store.upgradable_read().await; - if let Some(ns) = lock.get(&namespace) { - Ok(f(ns)) - } else { - let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await; - let ns = self - .inner - .make_namespace - .create( - namespace.clone(), - RestoreOption::Latest, - NamespaceBottomlessDbId::NotProvided, - self.inner.allow_lazy_creation, - self.make_reset_cb(), - &self.inner.metadata, - ) - .await?; - let ret = f(&ns); - tracing::info!("loaded namespace: `{namespace}`"); - lock.insert(namespace, ns); + let init = { + let namespace = namespace.clone(); + async move { + if namespace != NamespaceName::default() + && !self.inner.metadata.exists(&namespace) + && !self.inner.allow_lazy_creation + { + return Err(Error::NamespaceDoesntExist(namespace.to_string())); + } + let ns = self + .inner + .make_namespace + .create( + namespace.clone(), + RestoreOption::Latest, + NamespaceBottomlessDbId::NotProvided, + self.make_reset_cb(), + &self.inner.metadata, + ) + .await?; + tracing::info!("loaded namespace: `{namespace}`"); - NAMESPACE_LOAD_LATENCY.record(before_load.elapsed()); + Ok(Some(ns)) + } + }; - Ok(ret) - } + let f = { + let name = namespace.clone(); + move |ns: NamespaceEntry| async move { + let lock = ns.read().await; + match &*lock { + Some(ns) => Ok(f(ns)), + // the namespace was taken out of the entry + None => Err(Error::NamespaceDoesntExist(name.to_string())), + } + } + }; + + self.with_lock_or_init(namespace, f, init).await? } - pub async fn create( + async fn with_lock_or_init( &self, namespace: NamespaceName, - restore_option: RestoreOption, - bottomless_db_id: NamespaceBottomlessDbId, - ) -> crate::Result<()> { - if self.inner.has_shutdown.load(Ordering::Relaxed) { - return Err(Error::NamespaceStoreShutdown); - } - let lock = self.inner.store.upgradable_read().await; - if lock.contains_key(&namespace) { - return Err(crate::error::Error::NamespaceAlreadyExist( - namespace.as_str().to_owned(), - )); - } - + f: Fun, + init: Init, + ) -> crate::Result + where + Fun: FnOnce(NamespaceEntry) -> Fut, + Fut: Future, + Init: Future>>>, + { + let before_load = Instant::now(); let ns = self .inner - .make_namespace - .create( + .store + .try_get_with( namespace.clone(), - restore_option, - bottomless_db_id, - true, - self.make_reset_cb(), - &self.inner.metadata, + init.map_ok(|ns| Arc::new(RwLock::new(ns))), ) .await?; + NAMESPACE_LOAD_LATENCY.record(before_load.elapsed()); + Ok(f(ns).await) + } - let mut lock = RwLockUpgradableReadGuard::upgrade(lock).await; - tracing::info!("loaded namespace: `{namespace}`"); - lock.insert(namespace, ns); + pub async fn create( + &self, + namespace: NamespaceName, + restore_option: RestoreOption, + bottomless_db_id: NamespaceBottomlessDbId, + ) -> crate::Result<()> { + // With namespaces disabled, the default namespace can be auto-created, + // otherwise it's an error. + if self.inner.allow_lazy_creation || namespace == NamespaceName::default() { + tracing::trace!("auto-creating the namespace"); + } else if self.inner.metadata.exists(&namespace) { + return Err(Error::NamespaceAlreadyExist(namespace.to_string())); + } - Ok(()) + let name = namespace.clone(); + let bottomless_db_id_for_init = bottomless_db_id.clone(); + let init = async { + let ns = self + .inner + .make_namespace + .create( + name.clone(), + restore_option, + bottomless_db_id_for_init, + self.make_reset_cb(), + &self.inner.metadata, + ) + .await; + match ns { + Ok(ns) => { + tracing::info!("loaded namespace: `{name}`"); + Ok(Some(ns)) + } + // return an empty slot to put the new namespace in + Err(Error::NamespaceDoesntExist(_)) => Ok(None), + Err(e) => Err(e), + } + }; + + self.with_lock_or_init(namespace, |_| async { Ok(()) }, init) + .await? } pub async fn shutdown(self) -> crate::Result<()> { self.inner.has_shutdown.store(true, Ordering::Relaxed); - let mut lock = self.inner.store.write().await; - for (name, ns) in lock.drain() { - ns.shutdown(self.inner.snapshot_at_shutdown).await?; - trace!("shutdown namespace: `{}`", name); + for (_name, entry) in self.inner.store.iter() { + let mut lock = entry.write().await; + if let Some(ns) = lock.take() { + ns.shutdown(self.inner.snapshot_at_shutdown).await?; + } } + self.inner.store.invalidate_all(); + self.inner.store.run_pending_tasks().await; Ok(()) } @@ -742,20 +816,12 @@ impl Namespace { async fn new_replica( config: &ReplicaNamespaceConfig, name: NamespaceName, - allow_creation: bool, reset: ResetCb, meta_store_handle: MetaStoreHandle, ) -> crate::Result { tracing::debug!("creating replica namespace"); let db_path = config.base_path.join("dbs").join(name.as_str()); - // there isn't a database folder for this database, and we're not allowed to create it. - if !allow_creation && !db_path.exists() { - return Err(crate::error::Error::NamespaceDoesntExist( - name.as_str().to_owned(), - )); - } - let rpc_client = ReplicationLogClient::with_origin(config.channel.clone(), config.uri.clone()); let client = @@ -912,7 +978,6 @@ impl Namespace { name: NamespaceName, restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, meta_store_handle: MetaStoreHandle, ) -> crate::Result { // FIXME: make that truly atomic. explore the idea of using temp directories, and it's implications @@ -921,7 +986,6 @@ impl Namespace { name.clone(), restore_option, bottomless_db_id, - allow_creation, meta_store_handle, ) .await @@ -942,21 +1006,11 @@ impl Namespace { name: NamespaceName, restore_option: RestoreOption, bottomless_db_id: NamespaceBottomlessDbId, - allow_creation: bool, meta_store_handle: MetaStoreHandle, ) -> crate::Result { - // if namespaces are disabled, then we allow creation for the default namespace. - let allow_creation = - allow_creation || (config.disable_namespace && name == NamespaceName::default()); - let mut join_set = JoinSet::new(); let db_path = config.base_path.join("dbs").join(name.as_str()); - // The database folder doesn't exist, bottomless replication is disabled (no db to recover) - // and we're not allowed to create a new database, return an error. - if !allow_creation && config.bottomless_replication.is_none() && !db_path.try_exists()? { - return Err(crate::error::Error::NamespaceDoesntExist(name.to_string())); - } let mut is_dirty = config.db_is_dirty; tokio::fs::create_dir_all(&db_path).await?; @@ -993,17 +1047,6 @@ impl Namespace { let options = make_bottomless_options(options, bottomless_db_id, name.clone()); let (replicator, did_recover) = init_bottomless_replicator(db_path.join("data"), options, &restore_option).await?; - - // There wasn't any database to recover from bottomless, and we are not allowed to - // create a new database - if !did_recover && !allow_creation && !db_path.try_exists()? { - // clean stale directory - // FIXME: this is not atomic, we could be left with a stale directory. Maybe do - // setup in a temp directory and then atomically rename it? - let _ = tokio::fs::remove_dir_all(&db_path).await; - return Err(crate::error::Error::NamespaceDoesntExist(name.to_string())); - } - is_dirty |= did_recover; Some(replicator) } else { diff --git a/libsql-server/src/test/bottomless.rs b/libsql-server/src/test/bottomless.rs index e8f1523c83..dc0a39e5ad 100644 --- a/libsql-server/src/test/bottomless.rs +++ b/libsql-server/src/test/bottomless.rs @@ -105,6 +105,7 @@ async fn configure_server( }, path: path.into().into(), disable_default_namespace: false, + max_active_namespaces: 100, heartbeat_config: None, idle_shutdown_timeout: None, initial_idle_shutdown_timeout: None,