Skip to content

Commit

Permalink
feat: Update catalog to use sequence number in path (#25526)
Browse files Browse the repository at this point in the history
Updates the catalog to use its own sequence number in the path. This will enable downstream Pro systems that pick up PersistedSnapshots to get the specific catalog that a snapshot is associated with since its sequence number is included.

Also updated the type to be CatalogSequenceNumber to make it more clear & readable when being used.
  • Loading branch information
pauldix authored Nov 8, 2024
1 parent 3bb63b2 commit 35e29d1
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 91 deletions.
14 changes: 7 additions & 7 deletions influxdb3_catalog/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,9 @@ pub const TIME_COLUMN_NAME: &str = "time";
#[derive(
Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
)]
pub struct SequenceNumber(u32);
pub struct CatalogSequenceNumber(u32);

impl SequenceNumber {
impl CatalogSequenceNumber {
pub fn new(id: u32) -> Self {
Self(id)
}
Expand Down Expand Up @@ -202,7 +202,7 @@ impl Catalog {
self.inner.read().databases.values().cloned().collect()
}

pub fn sequence_number(&self) -> SequenceNumber {
pub fn sequence_number(&self) -> CatalogSequenceNumber {
self.inner.read().sequence
}

Expand Down Expand Up @@ -280,7 +280,7 @@ impl Catalog {
/// After the catalog has been persisted, mark it as not updated, if the sequence number
/// matches. If it doesn't then the catalog was updated while persistence was running and
/// will need to be persisted on the next snapshot.
pub fn set_updated_false_if_sequence_matches(&self, sequence_number: SequenceNumber) {
pub fn set_updated_false_if_sequence_matches(&self, sequence_number: CatalogSequenceNumber) {
let mut inner = self.inner.write();
if inner.sequence == sequence_number {
inner.updated = false;
Expand All @@ -297,7 +297,7 @@ impl Catalog {
pub struct InnerCatalog {
/// The catalog is a map of databases with their table schemas
databases: SerdeVecMap<DbId, Arc<DatabaseSchema>>,
sequence: SequenceNumber,
sequence: CatalogSequenceNumber,
/// The host_id is the prefix that is passed in when starting up (`host_identifier_prefix`)
host_id: Arc<str>,
/// The instance_id uniquely identifies the instance that generated the catalog
Expand Down Expand Up @@ -366,15 +366,15 @@ impl InnerCatalog {
pub(crate) fn new(host_id: Arc<str>, instance_id: Arc<str>) -> Self {
Self {
databases: SerdeVecMap::new(),
sequence: SequenceNumber::new(0),
sequence: CatalogSequenceNumber::new(0),
host_id,
instance_id,
updated: false,
db_map: BiHashMap::new(),
}
}

pub fn sequence_number(&self) -> SequenceNumber {
pub fn sequence_number(&self) -> CatalogSequenceNumber {
self.sequence
}

Expand Down
17 changes: 4 additions & 13 deletions influxdb3_write/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use datafusion::catalog::Session;
use datafusion::error::DataFusionError;
use datafusion::prelude::Expr;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::catalog::{self, SequenceNumber};
use influxdb3_catalog::catalog::CatalogSequenceNumber;
use influxdb3_id::ParquetFileId;
use influxdb3_id::TableId;
use influxdb3_id::{ColumnId, DbId};
Expand Down Expand Up @@ -151,15 +151,6 @@ pub struct BufferedWriteRequest {
pub index_count: usize,
}

/// A persisted Catalog that contains the database, table, and column schemas.
#[derive(Debug, Serialize, Deserialize, Default)]
pub struct PersistedCatalog {
/// The wal file number that triggered the snapshot to persisst this catalog
pub wal_file_sequence_number: WalFileSequenceNumber,
/// The catalog that was persisted.
pub catalog: catalog::InnerCatalog,
}

/// The collection of Parquet files that were persisted in a snapshot
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
pub struct PersistedSnapshot {
Expand All @@ -178,7 +169,7 @@ pub struct PersistedSnapshot {
/// The wal file sequence number that triggered this snapshot
pub wal_file_sequence_number: WalFileSequenceNumber,
/// The catalog sequence number associated with this snapshot
pub catalog_sequence_number: SequenceNumber,
pub catalog_sequence_number: CatalogSequenceNumber,
/// The size of the snapshot parquet files in bytes.
pub parquet_size_bytes: u64,
/// The number of rows across all parquet files in the snapshot.
Expand All @@ -197,7 +188,7 @@ impl PersistedSnapshot {
host_id: String,
snapshot_sequence_number: SnapshotSequenceNumber,
wal_file_sequence_number: WalFileSequenceNumber,
catalog_sequence_number: SequenceNumber,
catalog_sequence_number: CatalogSequenceNumber,
) -> Self {
Self {
host_id,
Expand Down Expand Up @@ -326,10 +317,10 @@ pub(crate) fn guess_precision(timestamp: i64) -> Precision {

#[cfg(test)]
mod test_helpers {
use crate::catalog::Catalog;
use crate::write_buffer::validator::WriteValidator;
use crate::Precision;
use data_types::NamespaceName;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_wal::{Gen1Duration, WriteBatch};
use iox_time::Time;
use std::sync::Arc;
Expand Down
9 changes: 5 additions & 4 deletions influxdb3_write/src/paths.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use chrono::prelude::*;
use influxdb3_catalog::catalog::CatalogSequenceNumber;
use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
use object_store::path::Path as ObjPath;
use std::ops::Deref;
Expand All @@ -20,11 +21,11 @@ fn object_store_file_stem(n: u64) -> u64 {
pub struct CatalogFilePath(ObjPath);

impl CatalogFilePath {
pub fn new(host_prefix: &str, wal_file_sequence_number: WalFileSequenceNumber) -> Self {
pub fn new(host_prefix: &str, catalog_sequence_number: CatalogSequenceNumber) -> Self {
let num = u64::MAX - catalog_sequence_number.as_u32() as u64;
let path = ObjPath::from(format!(
"{host_prefix}/catalogs/{:020}.{}",
object_store_file_stem(wal_file_sequence_number.as_u64()),
CATALOG_FILE_EXTENSION
num, CATALOG_FILE_EXTENSION
));
Self(path)
}
Expand Down Expand Up @@ -123,7 +124,7 @@ impl AsRef<ObjPath> for SnapshotInfoFilePath {
#[test]
fn catalog_file_path_new() {
assert_eq!(
*CatalogFilePath::new("my_host", WalFileSequenceNumber::new(0)),
*CatalogFilePath::new("my_host", CatalogSequenceNumber::new(0)),
ObjPath::from("my_host/catalogs/18446744073709551615.json")
);
}
Expand Down
81 changes: 24 additions & 57 deletions influxdb3_write/src/persister.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::last_cache;
use crate::paths::CatalogFilePath;
use crate::paths::ParquetFilePath;
use crate::paths::SnapshotInfoFilePath;
use crate::PersistedCatalog;
use crate::PersistedSnapshot;
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand All @@ -22,7 +21,6 @@ use futures_util::stream::StreamExt;
use futures_util::stream::TryStreamExt;
use influxdb3_catalog::catalog::Catalog;
use influxdb3_catalog::catalog::InnerCatalog;
use influxdb3_wal::WalFileSequenceNumber;
use object_store::path::Path as ObjPath;
use object_store::ObjectStore;
use observability_deps::tracing::info;
Expand Down Expand Up @@ -124,15 +122,14 @@ impl Persister {
/// instance id and create a new catalog and persist it immediately
pub async fn load_or_create_catalog(&self) -> Result<Catalog> {
let catalog = match self.load_catalog().await? {
Some(c) => Catalog::from_inner(c.catalog),
Some(c) => Catalog::from_inner(c),
None => {
let uuid = Uuid::new_v4().to_string();
let instance_id = Arc::from(uuid.as_str());
info!(instance_id = ?instance_id, "Catalog not found, creating new instance id");
let new_catalog =
Catalog::new(Arc::from(self.host_identifier_prefix.as_str()), instance_id);
self.persist_catalog(WalFileSequenceNumber::new(0), &new_catalog)
.await?;
self.persist_catalog(&new_catalog).await?;
new_catalog
}
};
Expand All @@ -142,7 +139,7 @@ impl Persister {
/// Loads the most recently persisted catalog from object storage.
///
/// This is used on server start.
pub async fn load_catalog(&self) -> Result<Option<PersistedCatalog>> {
pub async fn load_catalog(&self) -> Result<Option<InnerCatalog>> {
let mut list = self
.object_store
.list(Some(&CatalogFilePath::dir(&self.host_identifier_prefix)));
Expand Down Expand Up @@ -181,18 +178,7 @@ impl Persister {
Some(path) => {
let bytes = self.object_store.get(&path).await?.bytes().await?;
let catalog: InnerCatalog = serde_json::from_slice(&bytes)?;
let file_name = path
.filename()
// NOTE: this holds so long as CatalogFilePath is used
// from crate::paths
.expect("catalog file paths are guaranteed to have a filename");
let parsed_number = file_name
.trim_end_matches(format!(".{}", crate::paths::CATALOG_FILE_EXTENSION).as_str())
.parse::<u64>()?;
Ok(Some(PersistedCatalog {
wal_file_sequence_number: WalFileSequenceNumber::new(u64::MAX - parsed_number),
catalog,
}))
Ok(Some(catalog))
}
}
}
Expand Down Expand Up @@ -268,14 +254,10 @@ impl Persister {

/// Persists the catalog with the given `WalFileSequenceNumber`. If this is the highest ID, it will
/// be the catalog that is returned the next time `load_catalog` is called.
pub async fn persist_catalog(
&self,
wal_file_sequence_number: WalFileSequenceNumber,
catalog: &Catalog,
) -> Result<()> {
pub async fn persist_catalog(&self, catalog: &Catalog) -> Result<()> {
let catalog_path = CatalogFilePath::new(
self.host_identifier_prefix.as_str(),
wal_file_sequence_number,
catalog.sequence_number(),
);
let json = serde_json::to_vec_pretty(&catalog)?;
self.object_store
Expand Down Expand Up @@ -415,9 +397,9 @@ impl<W: Write + Send> TrackedMemoryArrowWriter<W> {
mod tests {
use super::*;
use crate::ParquetFileId;
use influxdb3_catalog::catalog::SequenceNumber;
use influxdb3_catalog::catalog::CatalogSequenceNumber;
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_wal::SnapshotSequenceNumber;
use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
use object_store::memory::InMemory;
use observability_deps::tracing::info;
use pretty_assertions::assert_eq;
Expand All @@ -438,10 +420,7 @@ mod tests {
let catalog = Catalog::new(host_id, instance_id);
let _ = catalog.db_or_create("my_db");

persister
.persist_catalog(WalFileSequenceNumber::new(0), &catalog)
.await
.unwrap();
persister.persist_catalog(&catalog).await.unwrap();
}

#[tokio::test]
Expand All @@ -454,33 +433,23 @@ mod tests {
let catalog = Catalog::new(host_id.clone(), instance_id.clone());
let _ = catalog.db_or_create("my_db");

persister
.persist_catalog(WalFileSequenceNumber::new(0), &catalog)
.await
.unwrap();
persister.persist_catalog(&catalog).await.unwrap();

let catalog = Catalog::new(host_id.clone(), instance_id.clone());
let _ = catalog.db_or_create("my_second_db");

persister
.persist_catalog(WalFileSequenceNumber::new(1), &catalog)
.await
.unwrap();
persister.persist_catalog(&catalog).await.unwrap();

let catalog = persister
.load_catalog()
.await
.expect("loading the catalog did not cause an error")
.expect("there was a catalog to load");

assert_eq!(
catalog.wal_file_sequence_number,
WalFileSequenceNumber::new(1)
);
// my_second_db
assert!(catalog.catalog.db_exists(DbId::from(1)));
assert!(catalog.db_exists(DbId::from(1)));
// my_db
assert!(!catalog.catalog.db_exists(DbId::from(0)));
assert!(!catalog.db_exists(DbId::from(0)));
}

#[tokio::test]
Expand All @@ -496,7 +465,7 @@ mod tests {
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::new(0),
catalog_sequence_number: CatalogSequenceNumber::new(0),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
Expand All @@ -520,7 +489,7 @@ mod tests {
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::default(),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
Expand All @@ -535,7 +504,7 @@ mod tests {
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(1),
wal_file_sequence_number: WalFileSequenceNumber::new(1),
catalog_sequence_number: SequenceNumber::default(),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: HashMap::new(),
max_time: 1,
min_time: 0,
Expand All @@ -550,7 +519,7 @@ mod tests {
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(2),
wal_file_sequence_number: WalFileSequenceNumber::new(2),
catalog_sequence_number: SequenceNumber::default(),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
Expand Down Expand Up @@ -586,7 +555,7 @@ mod tests {
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(0),
wal_file_sequence_number: WalFileSequenceNumber::new(0),
catalog_sequence_number: SequenceNumber::default(),
catalog_sequence_number: CatalogSequenceNumber::default(),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
Expand Down Expand Up @@ -615,7 +584,7 @@ mod tests {
next_column_id: ColumnId::from(1),
snapshot_sequence_number: SnapshotSequenceNumber::new(id),
wal_file_sequence_number: WalFileSequenceNumber::new(id),
catalog_sequence_number: SequenceNumber::new(id as u32),
catalog_sequence_number: CatalogSequenceNumber::new(id as u32),
databases: HashMap::new(),
min_time: 0,
max_time: 1,
Expand Down Expand Up @@ -644,7 +613,7 @@ mod tests {
"test_host".to_string(),
SnapshotSequenceNumber::new(0),
WalFileSequenceNumber::new(0),
SequenceNumber::new(0),
CatalogSequenceNumber::new(0),
);

for _ in 0..=9875 {
Expand Down Expand Up @@ -761,11 +730,9 @@ mod tests {
info!(local_disk = ?local_disk, "Using local disk");
let persister = Persister::new(Arc::new(local_disk), "test_host");
let _ = persister.load_or_create_catalog().await.unwrap();
let persisted_catalog = persister.load_catalog().await.unwrap().unwrap();
assert_eq!(
persisted_catalog.wal_file_sequence_number,
WalFileSequenceNumber::new(0)
);
let persisted_catalog = persister.load_catalog().await.unwrap();

assert!(persisted_catalog.is_some());
}

#[test_log::test(tokio::test)]
Expand All @@ -785,7 +752,7 @@ mod tests {
LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
info!(local_disk = ?local_disk, "Using local disk");

let catalog_path = CatalogFilePath::new("test_host", WalFileSequenceNumber::new(0));
let catalog_path = CatalogFilePath::new("test_host", CatalogSequenceNumber::new(0));
let _ = local_disk
.put(&catalog_path, catalog_json.into())
.await
Expand Down
Loading

0 comments on commit 35e29d1

Please sign in to comment.