feat: SerdeVecMap type for serializing ID maps (#25492)
This PR introduces a new type `SerdeVecHashMap` that can be used in places where we need a HashMap with the following properties:
1. When serialized, it is serialized as a list of key-value pairs, instead of a map
2. When deserialized, it assumes the serialization format from (1) and deserializes the list of key-value pairs back into a map
3. Does not allow for duplicate keys on deserialization

This is useful in places where we need to create map types that map from an identifier (integer) to some value, and need to serialize that data. For example: in the WAL when serializing write batches, and in the catalog when serializing the database/table schema.

This PR refactors the code in `influxdb3_wal` and `influxdb3_catalog` to use the new type for maps that use `DbId` and `TableId` as the key. Follow on work can give the same treatment to `ColumnId` based maps once that is fully worked out.

## Explanation

If we have a `HashMap<u32, String>`, `serde_json` will serialize it in the following way:
```json
{"0": "foo", "1": "bar"}
```
i.e., the integer keys are serialized as strings, since JSON object keys can only be strings.
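
A minimal demonstration of this behavior, assuming only `serde_json` and the standard library:
```rust
use std::collections::HashMap;

fn main() {
    let map: HashMap<u32, String> =
        HashMap::from([(0, "foo".into()), (1, "bar".into())]);
    // serde_json stringifies the integer keys, because JSON object keys
    // must be strings (entry order may vary, since HashMap is unordered):
    println!("{}", serde_json::to_string(&map).unwrap()); // e.g. {"0":"foo","1":"bar"}
}
```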

`SerdeVecHashMap<u32, String>` will be serialized by `serde_json` in the following way:
```json
[[0, "foo"], [1, "bar"]]
```
and will deserialize from that vector-based structure back to the map. This allows serialization/deserialization to run directly off the `HashMap`'s `Iterator`/`FromIterator` implementations.
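
For illustration, here is a minimal sketch of how such a type can be built on those iterator implementations. This is a simplified stand-in, not this PR's actual implementation; the error message is only modeled on the one asserted in the tests below:
```rust
use std::collections::HashMap;
use std::hash::Hash;

use serde::de::Error as _;
use serde::{Deserialize, Deserializer, Serialize, Serializer};

/// A `HashMap` wrapper that serializes as a list of `[key, value]` pairs.
#[derive(Debug, Default)]
struct SerdeVecHashMap<K: Eq + Hash, V>(HashMap<K, V>);

impl<K, V> Serialize for SerdeVecHashMap<K, V>
where
    K: Eq + Hash + Serialize,
    V: Serialize,
{
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        // Drive serialization off the map's `Iterator` implementation: each
        // `(key, value)` tuple is emitted as a two-element JSON array.
        serializer.collect_seq(self.0.iter())
    }
}

impl<'de, K, V> Deserialize<'de> for SerdeVecHashMap<K, V>
where
    K: Eq + Hash + Deserialize<'de>,
    V: Deserialize<'de>,
{
    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
        // Deserialize the list of pairs, then rebuild the map, rejecting
        // duplicate keys instead of silently overwriting earlier entries.
        let pairs = Vec::<(K, V)>::deserialize(deserializer)?;
        let mut map = HashMap::with_capacity(pairs.len());
        for (k, v) in pairs {
            if map.insert(k, v).is_some() {
                return Err(D::Error::custom("duplicate key found"));
            }
        }
        Ok(Self(map))
    }
}

fn main() {
    // Round trip: vector-based JSON in, map out.
    let map: SerdeVecHashMap<u32, String> =
        serde_json::from_str(r#"[[0, "foo"], [1, "bar"]]"#).unwrap();
    assert_eq!(map.0.get(&0).map(String::as_str), Some("foo"));

    // Duplicate keys are rejected rather than merged.
    let err = serde_json::from_str::<SerdeVecHashMap<u32, String>>(r#"[[0, "a"], [0, "b"]]"#)
        .unwrap_err();
    assert!(err.to_string().contains("duplicate key found"));
}
```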

## The Controversial Part

One thing I also did in this PR was switch the catalog from using a `BTreeMap` for tables to the new `HashMap`-based type. This breaks the deterministic ordering of the database schema's `tables` map and therefore breaks the snapshot tests we were using. I had to comment those parts of the respective tests out, because there isn't an easy way that I'm aware of to give the underlying hashmap a deterministic ordering only in tests.

If we decide the catalog should keep a `BTreeMap` rather than move to a `HashMap`, then I think it would be fine to roll a similar `SerdeVecBTreeMap` type specifically for the catalog. Coincidentally, this may actually be a good use case for [`indexmap`](https://docs.rs/indexmap/latest/indexmap/): it reportedly has lookup performance similar to a hashmap while preserving insertion order and _having faster iteration_, which could be a win for WAL serialization speed. It also accepts different hashing algorithms, so FNV could be swapped in just as with `HashMap` (see the sketch below).
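
A rough sketch of what that could look like, not part of this PR: `OrderedTableMap` is a hypothetical alias, and `u32` stands in for the actual ID types.
```rust
use fnv::FnvBuildHasher;
use indexmap::IndexMap;

// IndexMap preserves insertion order and, like std's HashMap, is generic
// over the hasher, so FNV slots in via the third type parameter.
type OrderedTableMap = IndexMap<u32, String, FnvBuildHasher>;

fn main() {
    let mut tables = OrderedTableMap::default();
    tables.insert(2, "cpu".to_string());
    tables.insert(0, "mem".to_string());
    // Iteration follows insertion order (deterministic, snapshot-friendly),
    // not key order or hash order:
    let names: Vec<&str> = tables.values().map(String::as_str).collect();
    assert_eq!(names, ["cpu", "mem"]);
}
```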

## Follow-up work

Use the `SerdeVecHashMap` for column data in the WAL, following #25461.
hiltontj authored Oct 25, 2024
1 parent cdb1e86 commit 0e814f5
Showing 17 changed files with 580 additions and 861 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock


234 changes: 105 additions & 129 deletions influxdb3_catalog/src/catalog.rs
@@ -3,7 +3,7 @@
use crate::catalog::Error::TableNotFound;
use arrow::datatypes::SchemaRef;
use bimap::BiHashMap;
use influxdb3_id::{ColumnId, DbId, TableId};
use influxdb3_id::{ColumnId, DbId, SerdeVecHashMap, TableId};
use influxdb3_wal::{
CatalogBatch, CatalogOp, FieldAdditions, LastCacheDefinition, LastCacheDelete,
};
@@ -12,7 +12,7 @@ use observability_deps::tracing::info;
use parking_lot::RwLock;
use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
use serde::{Deserialize, Serialize, Serializer};
use std::collections::{BTreeMap, HashMap};
use std::collections::BTreeMap;
use std::sync::Arc;
use thiserror::Error;

@@ -284,8 +284,7 @@ impl Catalog {
#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
pub struct InnerCatalog {
/// The catalog is a map of databases with their table schemas
#[serde_as(as = "DatabasesAsArray")]
databases: HashMap<DbId, Arc<DatabaseSchema>>,
databases: SerdeVecHashMap<DbId, Arc<DatabaseSchema>>,
sequence: SequenceNumber,
/// The host_id is the prefix that is passed in when starting up (`host_identifier_prefix`)
host_id: Arc<str>,
@@ -351,55 +350,10 @@ serde_with::serde_conv!(
}
);

serde_with::serde_conv!(
DatabasesAsArray,
HashMap<DbId, Arc<DatabaseSchema>>,
|map: &HashMap<DbId, Arc<DatabaseSchema>>| {
map.values().fold(Vec::new(), |mut acc, db| {
acc.push(DatabasesSerialized {
id: db.id,
name: Arc::clone(&db.name),
tables: db.tables.values().cloned().collect(),
});
acc
})
},
|vec: Vec<DatabasesSerialized>| -> Result<_, String> {
vec.into_iter().fold(Ok(HashMap::new()), |acc, db| {
let mut acc = acc?;
let mut table_map = BiHashMap::new();
if let Some(_) = acc.insert(db.id, Arc::new(DatabaseSchema {
id: db.id,
name: Arc::clone(&db.name),
tables: db.tables.into_iter().fold(Ok(BTreeMap::new()), |acc, table| {
let mut acc = acc?;
let table_name = Arc::clone(&table.table_name);
table_map.insert(table.table_id, Arc::clone(&table_name));
if let Some(_) = acc.insert(table.table_id, table) {
return Err(format!("found duplicate table: {}", table_name));
}
Ok(acc)
})?,
table_map
})) {
return Err(format!("found duplicate db: {}", db.name));
}
Ok(acc)
})
}
);

#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
struct DatabasesSerialized {
pub id: DbId,
pub name: Arc<str>,
pub tables: Vec<TableDefinition>,
}

impl InnerCatalog {
pub(crate) fn new(host_id: Arc<str>, instance_id: Arc<str>) -> Self {
Self {
databases: HashMap::new(),
databases: SerdeVecHashMap::new(),
sequence: SequenceNumber::new(0),
host_id,
instance_id,
@@ -466,7 +420,7 @@ pub struct DatabaseSchema {
pub id: DbId,
pub name: Arc<str>,
/// The database is a map of tables
pub tables: BTreeMap<TableId, TableDefinition>,
pub tables: SerdeVecHashMap<TableId, TableDefinition>,
#[serde_as(as = "TableMapAsArray")]
pub table_map: BiHashMap<TableId, Arc<str>>,
}
@@ -476,7 +430,7 @@ impl DatabaseSchema {
Self {
id,
name,
tables: BTreeMap::new(),
tables: Default::default(),
table_map: BiHashMap::new(),
}
}
@@ -485,7 +439,7 @@
/// everything is compatible and there are no updates to the existing schema, None will be
/// returned, otherwise a new `DatabaseSchema` will be returned with the updates applied.
pub fn new_if_updated_from_batch(&self, catalog_batch: &CatalogBatch) -> Result<Option<Self>> {
let mut updated_or_new_tables = BTreeMap::new();
let mut updated_or_new_tables = SerdeVecHashMap::new();

for catalog_op in &catalog_batch.ops {
match catalog_op {
@@ -1031,7 +985,6 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnT

#[cfg(test)]
mod tests {
use insta::assert_json_snapshot;
use pretty_assertions::assert_eq;
use test_helpers::assert_contains;

@@ -1048,7 +1001,7 @@ mod tests {
let mut database = DatabaseSchema {
id: DbId::from(0),
name: "test_db".into(),
tables: BTreeMap::new(),
tables: SerdeVecHashMap::new(),
table_map: {
let mut map = BiHashMap::new();
map.insert(TableId::from(1), "test_table_1".into());
Expand Down Expand Up @@ -1104,10 +1057,6 @@ mod tests {
.databases
.insert(database.id, Arc::new(database));

// Perform a snapshot test to check that the JSON serialized catalog does not change in an
// undesired way when introducing features etc.
assert_json_snapshot!(catalog);

// Serialize/deserialize to ensure roundtrip to/from JSON
let serialized = serde_json::to_string(&catalog).unwrap();
let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
@@ -1122,85 +1071,116 @@
{
let json = r#"{
"databases": [
{
"id": 0,
"name": "db1",
"tables": []
},
{
"id": 0,
"name": "db1",
"tables": []
}
]
[
0,
{
"id": 0,
"name": "db1",
"tables": [],
"table_map": []
}
],
[
0,
{
"id": 0,
"name": "db1",
"tables": [],
"table_map": []
}
]
],
"sequence": 0,
"host_id": "test",
"instance_id": "test",
"db_map": []
}"#;
let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
assert_contains!(err.to_string(), "found duplicate db: db1");
assert_contains!(err.to_string(), "duplicate key found");
}
// Duplicate tables
{
let json = r#"{
"databases": [
{
"id": 0,
"name": "db1",
"tables": [
{
"table_id": 0,
"table_name": "tbl1",
"cols": {},
"column_map": [],
"next_column_id": 0
},
{
"table_id": 0,
"table_name": "tbl1",
"cols": {},
"column_map": [],
"next_column_id": 0
}
]
}
]
[
0,
{
"id": 0,
"name": "db1",
"tables": [
[
0,
{
"table_id": 0,
"table_name": "tbl1",
"cols": {},
"column_map": [],
"next_column_id": 0
}
],
[
0,
{
"table_id": 0,
"table_name": "tbl1",
"cols": {},
"column_map": [],
"next_column_id": 0
}
]
]
}
]
],
"sequence": 0,
"host_id": "test",
"instance_id": "test",
"db_map": []
}"#;
let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
assert_contains!(err.to_string(), "found duplicate table: tbl1");
assert_contains!(err.to_string(), "duplicate key found");
}
// Duplicate columns
{
let json = r#"{
"databases": [
{
"id": 0,
"name": "db1",
"tables": [
{
"table_id": 0,
"table_name": "tbl1",
"cols": {
"col1": {
"column_id": 0,
"type": "i64",
"influx_type": "field",
"nullable": true
},
"col1": {
"column_id": 0,
"type": "u64",
"influx_type": "field",
"nullable": true
}
},
"column_map": [
[
0,
{
"id": 0,
"name": "db1",
"tables": [
[
0,
{
"column_id": 0,
"name": "col1"
"table_id": 0,
"table_name": "tbl1",
"cols": {
"col1": {
"column_id": 0,
"type": "i64",
"influx_type": "field",
"nullable": true
},
"col1": {
"column_id": 0,
"type": "u64",
"influx_type": "field",
"nullable": true
}
},
"column_map": [
{
"column_id": 0,
"name": "col1"
}
],
"next_column_id": 1
}
],
"next_column_id": 1
}
]
}
]
]
}
]
]
}"#;
let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
@@ -1213,7 +1193,7 @@ mod tests {
let mut database = DatabaseSchema {
id: DbId::from(0),
name: "test".into(),
tables: BTreeMap::new(),
tables: SerdeVecHashMap::new(),
table_map: BiHashMap::new(),
};
database.tables.insert(
@@ -1256,7 +1236,7 @@ mod tests {
let mut database = DatabaseSchema {
id: DbId::from(0),
name: "test_db".into(),
tables: BTreeMap::new(),
tables: SerdeVecHashMap::new(),
table_map: {
let mut map = BiHashMap::new();
map.insert(TableId::from(1), "test_table_1".into());
@@ -1291,8 +1271,6 @@ mod tests {
.databases
.insert(database.id, Arc::new(database));

assert_json_snapshot!(catalog);

let serialized = serde_json::to_string(&catalog).unwrap();
let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
let deserialized = Catalog::from_inner(deserialized_inner);
Expand All @@ -1307,7 +1285,7 @@ mod tests {
let mut database = DatabaseSchema {
id: DbId::from(0),
name: "test_db".into(),
tables: BTreeMap::new(),
tables: SerdeVecHashMap::new(),
table_map: {
let mut map = BiHashMap::new();
map.insert(TableId::from(0), "test".into());
@@ -1348,8 +1326,6 @@ mod tests {
.databases
.insert(database.id, Arc::new(database));

assert_json_snapshot!(catalog);

let serialized = serde_json::to_string(&catalog).unwrap();
let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
let deserialized = Catalog::from_inner(deserialized_inner);