diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index 6cf1d230723..6d3ec713b35 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -79,9 +79,9 @@ pub const TIME_COLUMN_NAME: &str = "time";
 #[derive(
     Debug, Default, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize,
 )]
-pub struct SequenceNumber(u32);
+pub struct CatalogSequenceNumber(u32);
 
-impl SequenceNumber {
+impl CatalogSequenceNumber {
     pub fn new(id: u32) -> Self {
         Self(id)
     }
@@ -202,7 +202,7 @@ impl Catalog {
         self.inner.read().databases.values().cloned().collect()
     }
 
-    pub fn sequence_number(&self) -> SequenceNumber {
+    pub fn sequence_number(&self) -> CatalogSequenceNumber {
         self.inner.read().sequence
     }
 
@@ -280,7 +280,7 @@ impl Catalog {
    /// After the catalog has been persisted, mark it as not updated, if the sequence number
    /// matches. If it doesn't then the catalog was updated while persistence was running and
    /// will need to be persisted on the next snapshot.
-    pub fn set_updated_false_if_sequence_matches(&self, sequence_number: SequenceNumber) {
+    pub fn set_updated_false_if_sequence_matches(&self, sequence_number: CatalogSequenceNumber) {
         let mut inner = self.inner.write();
         if inner.sequence == sequence_number {
             inner.updated = false;
@@ -297,7 +297,7 @@ pub struct InnerCatalog {
     /// The catalog is a map of databases with their table schemas
     databases: SerdeVecMap<DbId, Arc<DatabaseSchema>>,
-    sequence: SequenceNumber,
+    sequence: CatalogSequenceNumber,
     /// The host_id is the prefix that is passed in when starting up (`host_identifier_prefix`)
     host_id: Arc<str>,
     /// The instance_id uniquely identifies the instance that generated the catalog
     instance_id: Arc<str>,
@@ -366,7 +366,7 @@ impl InnerCatalog {
     pub(crate) fn new(host_id: Arc<str>, instance_id: Arc<str>) -> Self {
         Self {
             databases: SerdeVecMap::new(),
-            sequence: SequenceNumber::new(0),
+            sequence: CatalogSequenceNumber::new(0),
             host_id,
             instance_id,
             updated: false,
@@ -374,7 +374,7 @@
         }
     }
 
-    pub fn sequence_number(&self) -> SequenceNumber {
+    pub fn sequence_number(&self) -> CatalogSequenceNumber {
         self.sequence
     }
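
// Editorial aside, not part of the diff: the rename is mechanical, but the new
// name states which counter this is. Catalog sequences are u32 while WAL file
// numbers are u64, and after this change CatalogFilePath::new (paths.rs below)
// accepts only the former, so the two cannot be conflated at a call site.
// A minimal sketch using only constructors visible in this diff:
let cat = CatalogSequenceNumber::new(0);
let _wal = WalFileSequenceNumber::new(0);
let _path = CatalogFilePath::new("my_host", cat); // fine
// let _path = CatalogFilePath::new("my_host", _wal); // no longer type-checks
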
diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs
index b8b844a5934..f7a05d26b1f 100644
--- a/influxdb3_write/src/lib.rs
+++ b/influxdb3_write/src/lib.rs
@@ -17,7 +17,7 @@ use datafusion::catalog::Session;
 use datafusion::error::DataFusionError;
 use datafusion::prelude::Expr;
 use influxdb3_catalog::catalog::Catalog;
-use influxdb3_catalog::catalog::{self, SequenceNumber};
+use influxdb3_catalog::catalog::CatalogSequenceNumber;
 use influxdb3_id::ParquetFileId;
 use influxdb3_id::TableId;
 use influxdb3_id::{ColumnId, DbId};
@@ -151,15 +151,6 @@ pub struct BufferedWriteRequest {
     pub index_count: usize,
 }
 
-/// A persisted Catalog that contains the database, table, and column schemas.
-#[derive(Debug, Serialize, Deserialize, Default)]
-pub struct PersistedCatalog {
-    /// The wal file number that triggered the snapshot to persist this catalog
-    pub wal_file_sequence_number: WalFileSequenceNumber,
-    /// The catalog that was persisted.
-    pub catalog: catalog::InnerCatalog,
-}
-
 /// The collection of Parquet files that were persisted in a snapshot
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
 pub struct PersistedSnapshot {
@@ -178,7 +169,7 @@ pub struct PersistedSnapshot {
     /// The wal file sequence number that triggered this snapshot
     pub wal_file_sequence_number: WalFileSequenceNumber,
     /// The catalog sequence number associated with this snapshot
-    pub catalog_sequence_number: SequenceNumber,
+    pub catalog_sequence_number: CatalogSequenceNumber,
     /// The size of the snapshot parquet files in bytes.
     pub parquet_size_bytes: u64,
     /// The number of rows across all parquet files in the snapshot.
@@ -197,7 +188,7 @@ impl PersistedSnapshot {
         host_id: String,
         snapshot_sequence_number: SnapshotSequenceNumber,
         wal_file_sequence_number: WalFileSequenceNumber,
-        catalog_sequence_number: SequenceNumber,
+        catalog_sequence_number: CatalogSequenceNumber,
     ) -> Self {
         Self {
             host_id,
@@ -326,10 +317,10 @@ pub(crate) fn guess_precision(timestamp: i64) -> Precision {
 
 #[cfg(test)]
 mod test_helpers {
-    use crate::catalog::Catalog;
     use crate::write_buffer::validator::WriteValidator;
     use crate::Precision;
     use data_types::NamespaceName;
+    use influxdb3_catalog::catalog::Catalog;
     use influxdb3_wal::{Gen1Duration, WriteBatch};
     use iox_time::Time;
     use std::sync::Arc;
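
// Editorial aside, not part of the diff: deleting PersistedCatalog removes the
// wrapper that paired an InnerCatalog with the WAL file number that triggered
// its persistence; the persisted JSON is now just the catalog itself, which
// already carries its own sequence. A sketch of the round trip implied by
// persister.rs below, assuming a `catalog: Catalog` in scope:
let bytes = serde_json::to_vec_pretty(&catalog)?;          // persist_catalog side
let inner: InnerCatalog = serde_json::from_slice(&bytes)?; // load_catalog side
assert_eq!(inner.sequence_number(), catalog.sequence_number());
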
diff --git a/influxdb3_write/src/paths.rs b/influxdb3_write/src/paths.rs
index 477f9bc10af..b6c88d2a404 100644
--- a/influxdb3_write/src/paths.rs
+++ b/influxdb3_write/src/paths.rs
@@ -1,4 +1,5 @@
 use chrono::prelude::*;
+use influxdb3_catalog::catalog::CatalogSequenceNumber;
 use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
 use object_store::path::Path as ObjPath;
 use std::ops::Deref;
@@ -20,11 +21,11 @@ fn object_store_file_stem(n: u64) -> u64 {
 pub struct CatalogFilePath(ObjPath);
 
 impl CatalogFilePath {
-    pub fn new(host_prefix: &str, wal_file_sequence_number: WalFileSequenceNumber) -> Self {
+    pub fn new(host_prefix: &str, catalog_sequence_number: CatalogSequenceNumber) -> Self {
+        let num = u64::MAX - catalog_sequence_number.as_u32() as u64;
         let path = ObjPath::from(format!(
             "{host_prefix}/catalogs/{:020}.{}",
-            object_store_file_stem(wal_file_sequence_number.as_u64()),
-            CATALOG_FILE_EXTENSION
+            num, CATALOG_FILE_EXTENSION
         ));
         Self(path)
     }
@@ -123,7 +124,7 @@ impl AsRef<ObjPath> for SnapshotInfoFilePath {
 #[test]
 fn catalog_file_path_new() {
     assert_eq!(
-        *CatalogFilePath::new("my_host", WalFileSequenceNumber::new(0)),
+        *CatalogFilePath::new("my_host", CatalogSequenceNumber::new(0)),
         ObjPath::from("my_host/catalogs/18446744073709551615.json")
     );
 }
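
// Editorial aside, not part of the diff: the stem is inverted (u64::MAX minus
// the sequence) so that a newer catalog sorts lexicographically earlier in an
// object store listing, letting load_catalog take the first entry it sees.
// Worked through with the zero-padded {:020} format:
//   sequence 0 -> "18446744073709551615.json"
//   sequence 1 -> "18446744073709551614.json" (lists before sequence 0)
//   sequence 2 -> "18446744073709551613.json" (lists first of the three)
let seq = CatalogSequenceNumber::new(1);
let stem = u64::MAX - seq.as_u32() as u64;
assert_eq!(format!("{:020}", stem), "18446744073709551614");
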
diff --git a/influxdb3_write/src/persister.rs b/influxdb3_write/src/persister.rs
index c07fead3d14..c8cdee48016 100644
--- a/influxdb3_write/src/persister.rs
+++ b/influxdb3_write/src/persister.rs
@@ -5,7 +5,6 @@ use crate::last_cache;
 use crate::paths::CatalogFilePath;
 use crate::paths::ParquetFilePath;
 use crate::paths::SnapshotInfoFilePath;
-use crate::PersistedCatalog;
 use crate::PersistedSnapshot;
 use arrow::datatypes::SchemaRef;
 use arrow::record_batch::RecordBatch;
@@ -22,7 +21,6 @@ use futures_util::stream::StreamExt;
 use futures_util::stream::TryStreamExt;
 use influxdb3_catalog::catalog::Catalog;
 use influxdb3_catalog::catalog::InnerCatalog;
-use influxdb3_wal::WalFileSequenceNumber;
 use object_store::path::Path as ObjPath;
 use object_store::ObjectStore;
 use observability_deps::tracing::info;
@@ -124,15 +122,14 @@ impl Persister {
     /// instance id and create a new catalog and persist it immediately
     pub async fn load_or_create_catalog(&self) -> Result<Catalog> {
         let catalog = match self.load_catalog().await? {
-            Some(c) => Catalog::from_inner(c.catalog),
+            Some(c) => Catalog::from_inner(c),
             None => {
                 let uuid = Uuid::new_v4().to_string();
                 let instance_id = Arc::from(uuid.as_str());
                 info!(instance_id = ?instance_id, "Catalog not found, creating new instance id");
                 let new_catalog =
                     Catalog::new(Arc::from(self.host_identifier_prefix.as_str()), instance_id);
-                self.persist_catalog(WalFileSequenceNumber::new(0), &new_catalog)
-                    .await?;
+                self.persist_catalog(&new_catalog).await?;
                 new_catalog
             }
         };
@@ -142,7 +139,7 @@ impl Persister {
     /// Loads the most recently persisted catalog from object storage.
     ///
     /// This is used on server start.
-    pub async fn load_catalog(&self) -> Result<Option<PersistedCatalog>> {
+    pub async fn load_catalog(&self) -> Result<Option<InnerCatalog>> {
         let mut list = self
             .object_store
             .list(Some(&CatalogFilePath::dir(&self.host_identifier_prefix)));
@@ -181,18 +178,7 @@ impl Persister {
             Some(path) => {
                 let bytes = self.object_store.get(&path).await?.bytes().await?;
                 let catalog: InnerCatalog = serde_json::from_slice(&bytes)?;
-                let file_name = path
-                    .filename()
-                    // NOTE: this holds so long as CatalogFilePath is used
-                    // from crate::paths
-                    .expect("catalog file paths are guaranteed to have a filename");
-                let parsed_number = file_name
-                    .trim_end_matches(format!(".{}", crate::paths::CATALOG_FILE_EXTENSION).as_str())
-                    .parse::<u64>()?;
-                Ok(Some(PersistedCatalog {
-                    wal_file_sequence_number: WalFileSequenceNumber::new(u64::MAX - parsed_number),
-                    catalog,
-                }))
+                Ok(Some(catalog))
             }
         }
     }
@@ -268,14 +254,10 @@ impl Persister {
     /// Persists the catalog with the given `WalFileSequenceNumber`. If this is the highest ID, it will
     /// be the catalog that is returned the next time `load_catalog` is called.
-    pub async fn persist_catalog(
-        &self,
-        wal_file_sequence_number: WalFileSequenceNumber,
-        catalog: &Catalog,
-    ) -> Result<()> {
+    pub async fn persist_catalog(&self, catalog: &Catalog) -> Result<()> {
         let catalog_path = CatalogFilePath::new(
             self.host_identifier_prefix.as_str(),
-            wal_file_sequence_number,
+            catalog.sequence_number(),
         );
         let json = serde_json::to_vec_pretty(&catalog)?;
         self.object_store
@@ -415,9 +397,9 @@ impl TrackedMemoryArrowWriter {
 mod tests {
     use super::*;
     use crate::ParquetFileId;
-    use influxdb3_catalog::catalog::SequenceNumber;
+    use influxdb3_catalog::catalog::CatalogSequenceNumber;
     use influxdb3_id::{ColumnId, DbId, TableId};
-    use influxdb3_wal::SnapshotSequenceNumber;
+    use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
     use object_store::memory::InMemory;
     use observability_deps::tracing::info;
     use pretty_assertions::assert_eq;
@@ -438,10 +420,7 @@ mod tests {
         let catalog = Catalog::new(host_id, instance_id);
         let _ = catalog.db_or_create("my_db");
 
-        persister
-            .persist_catalog(WalFileSequenceNumber::new(0), &catalog)
-            .await
-            .unwrap();
+        persister.persist_catalog(&catalog).await.unwrap();
     }
 
     #[tokio::test]
@@ -454,18 +433,12 @@ mod tests {
         let catalog = Catalog::new(host_id.clone(), instance_id.clone());
         let _ = catalog.db_or_create("my_db");
 
-        persister
-            .persist_catalog(WalFileSequenceNumber::new(0), &catalog)
-            .await
-            .unwrap();
+        persister.persist_catalog(&catalog).await.unwrap();
 
         let catalog = Catalog::new(host_id.clone(), instance_id.clone());
         let _ = catalog.db_or_create("my_second_db");
 
-        persister
-            .persist_catalog(WalFileSequenceNumber::new(1), &catalog)
-            .await
-            .unwrap();
+        persister.persist_catalog(&catalog).await.unwrap();
 
         let catalog = persister
             .load_catalog()
             .await
             .expect("loading the catalog did not cause an error")
             .expect("there was a catalog to load");
 
-        assert_eq!(
-            catalog.wal_file_sequence_number,
-            WalFileSequenceNumber::new(1)
-        );
         // my_second_db
-        assert!(catalog.catalog.db_exists(DbId::from(1)));
+        assert!(catalog.db_exists(DbId::from(1)));
         // my_db
-        assert!(!catalog.catalog.db_exists(DbId::from(0)));
+        assert!(!catalog.db_exists(DbId::from(0)));
     }
 
     #[tokio::test]
@@ -496,7 +465,7 @@ mod tests {
             next_column_id: ColumnId::from(1),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
             wal_file_sequence_number: WalFileSequenceNumber::new(0),
-            catalog_sequence_number: SequenceNumber::new(0),
+            catalog_sequence_number: CatalogSequenceNumber::new(0),
             databases: HashMap::new(),
             min_time: 0,
             max_time: 1,
@@ -520,7 +489,7 @@ mod tests {
             next_column_id: ColumnId::from(1),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
             wal_file_sequence_number: WalFileSequenceNumber::new(0),
-            catalog_sequence_number: SequenceNumber::default(),
+            catalog_sequence_number: CatalogSequenceNumber::default(),
             databases: HashMap::new(),
             min_time: 0,
             max_time: 1,
@@ -535,7 +504,7 @@ mod tests {
             next_column_id: ColumnId::from(1),
             snapshot_sequence_number: SnapshotSequenceNumber::new(1),
             wal_file_sequence_number: WalFileSequenceNumber::new(1),
-            catalog_sequence_number: SequenceNumber::default(),
+            catalog_sequence_number: CatalogSequenceNumber::default(),
             databases: HashMap::new(),
             max_time: 1,
             min_time: 0,
@@ -550,7 +519,7 @@ mod tests {
             next_column_id: ColumnId::from(1),
             snapshot_sequence_number: SnapshotSequenceNumber::new(2),
             wal_file_sequence_number: WalFileSequenceNumber::new(2),
-            catalog_sequence_number: SequenceNumber::default(),
+            catalog_sequence_number: CatalogSequenceNumber::default(),
             databases: HashMap::new(),
             min_time: 0,
             max_time: 1,
@@ -586,7 +555,7 @@ mod tests {
             next_column_id: ColumnId::from(1),
             snapshot_sequence_number: SnapshotSequenceNumber::new(0),
             wal_file_sequence_number: WalFileSequenceNumber::new(0),
-            catalog_sequence_number: SequenceNumber::default(),
+            catalog_sequence_number: CatalogSequenceNumber::default(),
             databases: HashMap::new(),
             min_time: 0,
             max_time: 1,
@@ -615,7 +584,7 @@ mod tests {
             next_column_id: ColumnId::from(1),
             snapshot_sequence_number: SnapshotSequenceNumber::new(id),
             wal_file_sequence_number: WalFileSequenceNumber::new(id),
-            catalog_sequence_number: SequenceNumber::new(id as u32),
+            catalog_sequence_number: CatalogSequenceNumber::new(id as u32),
             databases: HashMap::new(),
             min_time: 0,
             max_time: 1,
@@ -644,7 +613,7 @@ mod tests {
             "test_host".to_string(),
             SnapshotSequenceNumber::new(0),
             WalFileSequenceNumber::new(0),
-            SequenceNumber::new(0),
+            CatalogSequenceNumber::new(0),
         );
 
         for _ in 0..=9875 {
@@ -761,11 +730,9 @@ mod tests {
         info!(local_disk = ?local_disk, "Using local disk");
         let persister = Persister::new(Arc::new(local_disk), "test_host");
         let _ = persister.load_or_create_catalog().await.unwrap();
-        let persisted_catalog = persister.load_catalog().await.unwrap().unwrap();
-        assert_eq!(
-            persisted_catalog.wal_file_sequence_number,
-            WalFileSequenceNumber::new(0)
-        );
+        let persisted_catalog = persister.load_catalog().await.unwrap();
+
+        assert!(persisted_catalog.is_some());
     }
 
 #[test_log::test(tokio::test)]
@@ -785,7 +752,7 @@ mod tests {
             LocalFileSystem::new_with_prefix(test_helpers::tmp_dir().unwrap()).unwrap();
         info!(local_disk = ?local_disk, "Using local disk");
 
-        let catalog_path = CatalogFilePath::new("test_host", WalFileSequenceNumber::new(0));
+        let catalog_path = CatalogFilePath::new("test_host", CatalogSequenceNumber::new(0));
         let _ = local_disk
             .put(&catalog_path, catalog_json.into())
             .await
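
// Editorial aside, not part of the diff: a hypothetical end-to-end use of the
// new signatures, mirroring the tests above (the in-memory store and host name
// are illustrative only):
let persister = Persister::new(Arc::new(InMemory::new()), "my_host");
let catalog = persister.load_or_create_catalog().await?; // persists on first run
let _ = catalog.db_or_create("my_db"); // schema change advances the sequence
persister.persist_catalog(&catalog).await?; // file name derives from that sequence
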
diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs
index e1b5fed8a66..0aa50f02057 100644
--- a/influxdb3_write/src/write_buffer/mod.rs
+++ b/influxdb3_write/src/write_buffer/mod.rs
@@ -541,7 +541,7 @@ mod tests {
     use bytes::Bytes;
     use datafusion_util::config::register_iox_object_store;
     use futures_util::StreamExt;
-    use influxdb3_catalog::catalog::SequenceNumber;
+    use influxdb3_catalog::catalog::CatalogSequenceNumber;
     use influxdb3_id::{DbId, ParquetFileId};
     use influxdb3_test_helpers::object_store::RequestCountedObjectStore;
     use influxdb3_wal::{Gen1Duration, SnapshotSequenceNumber, WalFileSequenceNumber};
@@ -1137,7 +1137,7 @@ mod tests {
             "test_host".to_string(),
             prev_snapshot_seq,
             WalFileSequenceNumber::new(0),
-            SequenceNumber::new(0),
+            CatalogSequenceNumber::new(0),
         );
         let snapshot_json = serde_json::to_vec(&prev_snapshot).unwrap();
         // set ParquetFileId to be 0 so that we can make sure when it's loaded from the
@@ -1231,7 +1231,7 @@ mod tests {
         // If we manage to make it so that scenario only increments the catalog sequence once, then
         // this assertion may fail:
         assert_eq!(
-            SequenceNumber::new(2),
+            CatalogSequenceNumber::new(2),
             persisted_snapshot.catalog_sequence_number
         );
     }
@@ -1247,7 +1247,7 @@ mod tests {
             "test_host".to_string(),
             prev_snapshot_seq,
             WalFileSequenceNumber::new(0),
-            SequenceNumber::new(0),
+            CatalogSequenceNumber::new(0),
         );
         assert_eq!(prev_snapshot.next_file_id.as_u64(), 0);
diff --git a/influxdb3_write/src/write_buffer/persisted_files.rs b/influxdb3_write/src/write_buffer/persisted_files.rs
index 56002712121..39b2adf9b13 100644
--- a/influxdb3_write/src/write_buffer/persisted_files.rs
+++ b/influxdb3_write/src/write_buffer/persisted_files.rs
@@ -159,7 +159,7 @@ fn update_persisted_files_with_snapshot(
 #[cfg(test)]
 mod tests {
-    use influxdb3_catalog::catalog::SequenceNumber;
+    use influxdb3_catalog::catalog::CatalogSequenceNumber;
     use influxdb3_wal::{SnapshotSequenceNumber, WalFileSequenceNumber};
     use observability_deps::tracing::info;
     use pretty_assertions::assert_eq;
@@ -257,7 +257,7 @@ mod tests {
     ) -> PersistedSnapshot {
         let snap1 = SnapshotSequenceNumber::new(snapshot_id);
         let wal1 = WalFileSequenceNumber::new(wal_id);
-        let cat1 = SequenceNumber::new(catalog_id);
+        let cat1 = CatalogSequenceNumber::new(catalog_id);
         let mut new_snapshot =
             PersistedSnapshot::new("sample-host-id".to_owned(), snap1, wal1, cat1);
         parquet_files.into_iter().for_each(|file| {
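
// Editorial aside, not part of the diff: since every PersistedSnapshot records
// the catalog sequence it was written under, the matching catalog object can be
// located from the snapshot alone. A hypothetical helper built only from items
// appearing in this diff:
fn catalog_path_for(host_prefix: &str, snapshot: &PersistedSnapshot) -> CatalogFilePath {
    CatalogFilePath::new(host_prefix, snapshot.catalog_sequence_number)
}
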
100644
--- a/influxdb3_write/src/write_buffer/persisted_files.rs
+++ b/influxdb3_write/src/write_buffer/persisted_files.rs
diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs
index a70aab58e64..8a1dbbd74e2 100644
--- a/influxdb3_write/src/write_buffer/queryable_buffer.rs
+++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs
@@ -215,7 +215,7 @@ impl QueryableBuffer {
         let sequence_number = inner_catalog.sequence_number();
         match persister
-            .persist_catalog(wal_file_number, &Catalog::from_inner(inner_catalog))
+            .persist_catalog(&Catalog::from_inner(inner_catalog))
             .await
         {
             Ok(_) => {
diff --git a/influxdb3_write/src/write_buffer/validator.rs b/influxdb3_write/src/write_buffer/validator.rs
index 507487eaf72..fccbaa87927 100644
--- a/influxdb3_write/src/write_buffer/validator.rs
+++ b/influxdb3_write/src/write_buffer/validator.rs
@@ -806,14 +806,14 @@ fn apply_precision_to_timestamp(precision: Precision, ts: i64) -> i64 {
 mod tests {
     use std::sync::Arc;
 
-    use crate::{catalog::Catalog, write_buffer::Error, Precision};
+    use super::WriteValidator;
+    use crate::{write_buffer::Error, Precision};
     use data_types::NamespaceName;
+    use influxdb3_catalog::catalog::Catalog;
     use influxdb3_id::TableId;
     use influxdb3_wal::Gen1Duration;
     use iox_time::Time;
 
-    use super::WriteValidator;
-
     #[test]
     fn write_validator_v1() -> Result<(), Error> {
         let host_id = Arc::from("sample-host-id");
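
// Editorial aside, not part of the diff: the queryable_buffer.rs change pairs
// with Catalog::set_updated_false_if_sequence_matches from catalog.rs at the
// top of this diff. A sketch of the presumed snapshot flow, assuming a catalog
// handle alongside the persister:
let sequence_number = catalog.sequence_number(); // capture before persisting
persister.persist_catalog(&catalog).await?;      // may race with new writes
catalog.set_updated_false_if_sequence_matches(sequence_number); // clear only if unchanged
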