refactor: re-introduce catalog snapshot tests, update serde code
The catalog types no longer serialize the id maps, since their info is
intrinsic to the list of schema objects themselves.
hiltontj committed Nov 1, 2024
1 parent 5d20f47 commit f2d4d4f
Showing 8 changed files with 567 additions and 50 deletions.
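As the commit message describes, the name-to-id lookup maps are no longer part of the serialized form; they are rebuilt from the list of schema objects when a snapshot is deserialized. A minimal sketch of that pattern, using simplified stand-in types rather than the crate's actual `DbId`/`TableId`/`TableDefinition` (requires `bimap`, `serde`, and `serde_json`):

```rust
// Sketch only: serialize just the list of tables, and rebuild the
// id <-> name lookup from that list on deserialization.
use bimap::BiHashMap;
use serde::{Deserialize, Serialize};

// Hypothetical, simplified stand-in for TableId.
type TableId = u32;

#[derive(Debug, Serialize, Deserialize)]
struct TableDef {
    name: String,
}

// Only this shape goes over the wire -- there is no separate id-map field.
#[derive(Debug, Serialize, Deserialize)]
struct DbSnapshot {
    id: u32,
    name: String,
    tables: Vec<(TableId, TableDef)>,
}

#[derive(Debug)]
struct DbSchema {
    id: u32,
    name: String,
    tables: Vec<(TableId, TableDef)>,
    // Derived lookup; intentionally absent from the serialized form.
    table_map: BiHashMap<TableId, String>,
}

impl From<DbSnapshot> for DbSchema {
    fn from(snap: DbSnapshot) -> Self {
        // Rebuild the bidirectional id/name map from the table list.
        let mut table_map = BiHashMap::with_capacity(snap.tables.len());
        for (id, table) in &snap.tables {
            table_map.insert(*id, table.name.clone());
        }
        Self {
            id: snap.id,
            name: snap.name,
            tables: snap.tables,
            table_map,
        }
    }
}

fn main() {
    let json = r#"{"id":1,"name":"db","tables":[[10,{"name":"cpu"}]]}"#;
    let snapshot: DbSnapshot = serde_json::from_str(json).expect("valid snapshot JSON");
    let schema = DbSchema::from(snapshot);
    // The lookup map exists again even though it was never serialized.
    assert_eq!(schema.table_map.get_by_left(&10), Some(&"cpu".to_string()));
}
```

Keeping the derived map out of the persisted format leaves a single source of truth for the id/name pairing, so the map and the table list cannot drift apart in a stored catalog.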
25 changes: 22 additions & 3 deletions influxdb3_catalog/src/catalog.rs
@@ -428,14 +428,12 @@ impl InnerCatalog {
}
}

#[serde_with::serde_as]
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)]
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct DatabaseSchema {
pub id: DbId,
pub name: Arc<str>,
/// The database is a map of tables
pub tables: SerdeVecMap<TableId, Arc<TableDefinition>>,
#[serde_as(as = "TableMapAsArray")]
pub table_map: BiHashMap<TableId, Arc<str>>,
}

@@ -1091,6 +1089,13 @@ mod tests {
.databases
.insert(database.id, Arc::new(database));

insta::with_settings!({
sort_maps => true,
description => "catalog serialization to help catch breaking changes"
}, {
insta::assert_json_snapshot!(catalog);
});

// Serialize/deserialize to ensure roundtrip to/from JSON
let serialized = serde_json::to_string(&catalog).unwrap();
let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
@@ -1315,6 +1320,13 @@ mod tests {
.databases
.insert(database.id, Arc::new(database));

insta::with_settings!({
sort_maps => true,
description => "catalog serialization to help catch breaking changes"
}, {
insta::assert_json_snapshot!(catalog);
});

let serialized = serde_json::to_string(&catalog).unwrap();
let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
let deserialized = Catalog::from_inner(deserialized_inner);
@@ -1372,6 +1384,13 @@ mod tests {
.databases
.insert(database.id, Arc::new(database));

insta::with_settings!({
sort_maps => true,
description => "catalog serialization to help catch breaking changes"
}, {
insta::assert_json_snapshot!(catalog);
});

let serialized = serde_json::to_string(&catalog).unwrap();
let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
let deserialized = Catalog::from_inner(deserialized_inner);
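The test changes above pin the catalog's JSON form with `insta` snapshots. A self-contained sketch of the same pattern on a toy type (the type and field names here are illustrative, not from the commit; assumes `insta` with its `json` feature plus `serde`):

```rust
// Minimal sketch of an insta JSON snapshot test on a toy type.
use serde::Serialize;

#[derive(Serialize)]
struct Config {
    name: &'static str,
    retention_days: u32,
}

#[test]
fn config_serialization_snapshot() {
    let config = Config { name: "example", retention_days: 7 };
    insta::with_settings!({
        sort_maps => true,
        description => "catch accidental changes to the serialized form"
    }, {
        // Serializes `config` to JSON and compares it to the stored snapshot.
        insta::assert_json_snapshot!(config);
    });
}
```

On the first run, `cargo insta review` (or `cargo insta accept`) records the generated `.snap` file; subsequent runs fail whenever the serialized output changes, which is how breaking changes to the catalog format would be caught.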
91 changes: 62 additions & 29 deletions influxdb3_catalog/src/serialize.rs
@@ -1,8 +1,10 @@
use crate::catalog::ColumnDefinition;
use crate::catalog::DatabaseSchema;
use crate::catalog::TableDefinition;
use arrow::datatypes::DataType as ArrowDataType;
use bimap::BiHashMap;
use influxdb3_id::ColumnId;
use influxdb3_id::DbId;
use influxdb3_id::SerdeVecMap;
use influxdb3_id::TableId;
use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef};
@@ -12,6 +14,66 @@ use schema::TIME_DATA_TIMEZONE;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

impl Serialize for DatabaseSchema {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let snapshot = DatabaseSnapshot::from(self);
snapshot.serialize(serializer)
}
}

impl<'de> Deserialize<'de> for DatabaseSchema {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
DatabaseSnapshot::deserialize(deserializer).map(Into::into)
}
}

#[derive(Debug, Serialize, Deserialize)]
struct DatabaseSnapshot {
id: DbId,
name: Arc<str>,
tables: SerdeVecMap<TableId, TableSnapshot>,
}

impl From<&DatabaseSchema> for DatabaseSnapshot {
fn from(db: &DatabaseSchema) -> Self {
Self {
id: db.id,
name: Arc::clone(&db.name),
tables: db
.tables
.iter()
.map(|(table_id, table_def)| (*table_id, table_def.as_ref().into()))
.collect(),
}
}
}

impl From<DatabaseSnapshot> for DatabaseSchema {
fn from(snap: DatabaseSnapshot) -> Self {
let mut table_map = BiHashMap::with_capacity(snap.tables.len());
let tables = snap
.tables
.into_iter()
.map(|(id, table)| {
table_map.insert(id, Arc::clone(&table.table_name));
(id, Arc::new(table.into()))
})
.collect();
Self {
id: snap.id,
name: snap.name,
tables,
table_map,
}
}
}

impl Serialize for TableDefinition {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
@@ -48,35 +110,6 @@ struct TableSnapshot {
last_caches: Vec<LastCacheSnapshot>,
}

serde_with::serde_conv!(
ColumnMapAsArray,
BiHashMap<ColumnId, Arc<str>>,
|map: &BiHashMap<ColumnId, Arc<str>>| {
let mut vec = map.iter().fold(Vec::new(), |mut acc, (id, name)| {
acc.push(ColumnMap {
column_id: *id,
name: Arc::clone(&name)
});
acc
});

vec.sort_by_key(|col| col.column_id);
vec
},
|vec: Vec<ColumnMap>| -> Result<_, std::convert::Infallible> {
Ok(vec.into_iter().fold(BiHashMap::new(), |mut acc, column| {
acc.insert(column.column_id, column.name);
acc
}))
}
);

#[derive(Debug, Serialize, Deserialize)]
struct ColumnMap {
column_id: ColumnId,
name: Arc<str>,
}

/// Representation of Arrow's `DataType` for table snapshots.
///
/// Uses `#[non_exhaustive]` with the assumption that variants will be added as we support