Skip to content

Commit

Permalink
one of those painful rebases: part I
Browse files Browse the repository at this point in the history
  • Loading branch information
psarna committed Dec 15, 2023
1 parent 4d913f9 commit 78cc9da
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 50 deletions.
6 changes: 4 additions & 2 deletions libsql-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,8 @@ where
self.db_config.snapshot_at_shutdown,
meta_store_path,
self.max_active_namespaces,
);
)
.await?;

// eagerly load the default namespace when namespaces are disabled
if self.disable_namespaces {
Expand Down Expand Up @@ -638,7 +639,8 @@ impl<C: Connector> Replica<C> {
false,
meta_store_path,
self.max_active_namespaces,
);
)
.await?;
let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone());
let proxy_service = ReplicaProxyService::new(channel, uri, self.auth.clone());

Expand Down
7 changes: 7 additions & 0 deletions libsql-server/src/namespace/meta_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,13 @@ impl MetaStore {
inner: HandleState::External(change_tx, rx),
}
}

// TODO: we need to either make sure that the metastore is restored
// before we start accepting connections or we need to contact bottomless
// here to check if a namespace exists. Preferably the former.
pub fn exists(&self, namespace: &NamespaceName) -> bool {
self.inner.lock().configs.contains_key(namespace)
}
}

impl MetaStoreHandle {
Expand Down
62 changes: 14 additions & 48 deletions libsql-server/src/namespace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ mod fork;
pub mod meta_store;
pub mod replication_wal;

use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -175,8 +173,6 @@ pub trait MakeNamespace: Sync + Send + 'static {
timestamp: Option<NaiveDateTime>,
meta_store: &MetaStore,
) -> crate::Result<Namespace<Self::Database>>;

async fn exists(&self, namespace: &NamespaceName) -> bool;
}

/// Creates new primary `Namespace`
Expand Down Expand Up @@ -292,34 +288,6 @@ impl MakeNamespace for PrimaryNamespaceMaker {
let ns = fork_task.fork().await?;
Ok(ns)
}

async fn exists(&self, namespace: &NamespaceName) -> bool {
let ns_path = self.config.base_path.join("dbs").join(namespace.as_str());
if let Ok(true) = ns_path.try_exists() {
return true;
}

if let Some(replication_options) = self.config.bottomless_replication.as_ref() {
tracing::info!("Bottomless: {:?}", replication_options);
match bottomless::replicator::Replicator::has_backup_of(namespace, replication_options)
.await
{
Ok(true) => {
tracing::debug!("Bottomless: Backup found");
return true;
}
Ok(false) => {
tracing::debug!("Bottomless: No backup found");
}
Err(err) => {
tracing::debug!("Bottomless: Error checking backup: {}", err);
}
}
} else {
tracing::debug!("Bottomless: No backup configured");
}
false
}
}

/// Creates new replica `Namespace`
Expand Down Expand Up @@ -375,11 +343,6 @@ impl MakeNamespace for ReplicaNamespaceMaker {
) -> crate::Result<Namespace<Self::Database>> {
return Err(ForkError::ForkReplica.into());
}

async fn exists(&self, namespace: &NamespaceName) -> bool {
let ns_path = self.config.base_path.join("dbs").join(namespace.as_str());
ns_path.try_exists().unwrap_or(false)
}
}

type NamespaceEntry<T> = Arc<RwLock<Option<Namespace<T>>>>;
Expand Down Expand Up @@ -408,13 +371,14 @@ struct NamespaceStoreInner<M: MakeNamespace> {
}

impl<M: MakeNamespace> NamespaceStore<M> {
pub fn new(
pub async fn new(
make_namespace: M,
allow_lazy_creation: bool,
snapshot_at_shutdown: bool,
meta_store_path: impl AsRef<Path>,
max_active_namespaces: usize,
) -> Self {
) -> crate::Result<Self> {
let metadata = MetaStore::new(meta_store_path).await?;
tracing::trace!("Max active namespaces: {max_active_namespaces}");
let store = Cache::<NamespaceName, NamespaceEntry<M::Database>>::builder()
.async_eviction_listener(move |name, ns, cause| {
Expand All @@ -437,7 +401,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
.max_capacity(max_active_namespaces as u64)
.time_to_idle(Duration::from_secs(86400))
.build();
Self {
Ok(Self {
inner: Arc::new(NamespaceStoreInner {
store,
metadata,
Expand All @@ -446,7 +410,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
has_shutdown: AtomicBool::new(false),
snapshot_at_shutdown,
}),
}
})
}

pub async fn destroy(&self, namespace: NamespaceName) -> crate::Result<()> {
Expand Down Expand Up @@ -559,6 +523,11 @@ impl<M: MakeNamespace> NamespaceStore<M> {
return Err(Error::NamespaceStoreShutdown);
}

// check that the source namespace exists
if !self.inner.metadata.exists(&from) {
return Err(crate::error::Error::NamespaceDoesntExist(from.to_string()));
}

let to_entry = self
.inner
.store
Expand All @@ -569,11 +538,6 @@ impl<M: MakeNamespace> NamespaceStore<M> {
return Err(crate::error::Error::NamespaceAlreadyExist(to.to_string()));
}

// check that the source namespace exists
if !self.inner.make_namespace.exists(&from).await {
return Err(crate::error::Error::NamespaceDoesntExist(from.to_string()));
}

let from_entry = self
.inner
.store
Expand Down Expand Up @@ -637,7 +601,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
let namespace = namespace.clone();
async move {
if namespace != NamespaceName::default()
&& !self.inner.make_namespace.exists(&namespace).await
&& !self.inner.metadata.exists(&namespace)
&& !self.inner.allow_lazy_creation
{
return Err(Error::NamespaceDoesntExist(namespace.to_string()));
Expand All @@ -650,6 +614,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
RestoreOption::Latest,
NamespaceBottomlessDbId::NotProvided,
self.make_reset_cb(),
&self.inner.metadata,
)
.await?;
tracing::info!("loaded namespace: `{namespace}`");
Expand Down Expand Up @@ -707,7 +672,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
// otherwise it's an error.
if self.inner.allow_lazy_creation || namespace == NamespaceName::default() {
tracing::trace!("auto-creating the namespace");
} else if self.inner.make_namespace.exists(&namespace).await {
} else if self.inner.metadata.exists(&namespace) {
return Err(Error::NamespaceAlreadyExist(namespace.to_string()));
}

Expand All @@ -722,6 +687,7 @@ impl<M: MakeNamespace> NamespaceStore<M> {
restore_option,
bottomless_db_id_for_init,
self.make_reset_cb(),
&self.inner.metadata,
)
.await;
match ns {
Expand Down

0 comments on commit 78cc9da

Please sign in to comment.