From 0e814f5d52509dafd382522f8c9dc846967e6f2b Mon Sep 17 00:00:00 2001
From: Trevor Hilton
Date: Fri, 25 Oct 2024 13:49:02 -0400
Subject: [PATCH] feat: SerdeVecMap type for serializing ID maps (#25492)

This PR introduces a new type, `SerdeVecHashMap`, for use in places where we need a `HashMap` with the following properties:

1. When serialized, it is serialized as a list of key-value pairs instead of a map
2. When deserialized, it assumes the serialization format from (1.) and deserializes from a list of key-value pairs to a map
3. Does not allow duplicate keys on deserialization

This is useful in places where we need map types that are keyed by an identifier (integer) and whose contents must be serialized, for example: in the WAL when serializing write batches, and in the catalog when serializing the database/table schema.

This PR refactors the code in `influxdb3_wal` and `influxdb3_catalog` to use the new type for maps keyed by `DbId` and `TableId`. Follow-on work can give the same treatment to `ColumnId`-based maps once that is fully worked out.

## Explanation

If we have a `HashMap<u32, String>`, `serde_json` will serialize it in the following way:

```json
{"0": "foo", "1": "bar"}
```

i.e., the integer keys are serialized as strings, since JSON map keys must be strings. A `SerdeVecHashMap<u32, String>` will instead be serialized by `serde_json` in the following way:

```json
[[0, "foo"], [1, "bar"]]
```

and will deserialize from that vector-based structure back to the map. This lets serialization/deserialization run directly off of the `HashMap`'s `Iterator`/`FromIterator` implementations (see the usage sketch below).

## The Controversial Part

One thing I also did in this PR was switch the catalog from using a `BTreeMap` for tables to the new `HashMap`-backed type. This breaks the deterministic ordering of the database schema's `tables` map and therefore wrecks the snapshot tests we were using. I had to comment those parts of the respective tests out, because I am not aware of an easy way to give the underlying hashmap a deterministic ordering only in tests.

If we decide that a `BTreeMap` is preferable to a `HashMap` in the catalog, it would be straightforward to roll a similar `SerdeVecBTreeMap` type specifically for the catalog. Coincidentally, this may actually be a good use case for [`indexmap`](https://docs.rs/indexmap/latest/indexmap/): it reportedly offers lookup performance similar to `HashMap` while preserving insertion order and _iterating faster_, which could be a win for WAL serialization speed. It also accepts different hashing algorithms, so FNV could be swapped in just as with `HashMap`.
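## Example

To illustrate the round-trip behaviour described above, here is a minimal usage sketch. It is illustrative only and not part of the patch; it assumes a crate with `influxdb3_id` and `serde_json` as dependencies, and the `u32`/`String` type parameters are arbitrary stand-ins for ID/value types:

```rust
use influxdb3_id::SerdeVecHashMap;

fn main() {
    // The type derefs to the underlying hashbrown `HashMap`, so the usual
    // map API (`insert`, `get`, `len`, ...) is available directly.
    let mut map: SerdeVecHashMap<u32, String> = SerdeVecHashMap::new();
    map.insert(0, "foo".to_string());
    map.insert(1, "bar".to_string());

    // Serializes as a list of key-value pairs, e.g. [[0,"foo"],[1,"bar"]].
    // Pair order is not deterministic, since the backing store is a hash map.
    let json = serde_json::to_string(&map).unwrap();

    // Deserialization rebuilds the map from the list of pairs.
    let back: SerdeVecHashMap<u32, String> = serde_json::from_str(&json).unwrap();
    assert_eq!(map, back);

    // A duplicated key in the input is a hard error rather than a silent overwrite.
    let err = serde_json::from_str::<SerdeVecHashMap<u32, String>>(r#"[[0,"a"],[0,"b"]]"#)
        .unwrap_err();
    assert!(err.to_string().contains("duplicate key found"));
}
```

Because the type implements `Deref`/`DerefMut` to the inner map, existing call sites largely keep working after swapping it in for a plain `HashMap`.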
## Follow-up work

Use the `SerdeVecHashMap` for column data in the WAL, following https://github.com/influxdata/influxdb/issues/25461
---
 Cargo.lock | 2 +
 influxdb3_catalog/src/catalog.rs | 234 +++++++---------
 ...catalog__tests__catalog_serialization.snap | 256 ------------------
 ..._catalog__tests__serialize_last_cache.snap | 113 --------
 ...catalog__tests__serialize_series_keys.snap | 102 -------
 influxdb3_id/Cargo.toml | 4 +
 influxdb3_id/src/lib.rs | 3 +
 influxdb3_id/src/serialize.rs | 207 ++++++++++++++
 influxdb3_telemetry/src/stats.rs | 8 -
 influxdb3_wal/src/lib.rs | 43 +--
 influxdb3_wal/src/object_store.rs | 18 +-
 influxdb3_wal/src/serialize.rs | 5 +-
 influxdb3_write/src/last_cache/mod.rs | 6 +-
 ...after-last-cache-create-and-new-field.snap | 160 ++++++-----
 ...g-immediately-after-last-cache-create.snap | 140 +++++-----
 ...g-immediately-after-last-cache-delete.snap | 138 +++++-----
 rust-toolchain.toml | 2 +-
 17 files changed, 580 insertions(+), 861 deletions(-)
 delete mode 100644 influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap
 delete mode 100644 influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap
 delete mode 100644 influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap
 create mode 100644 influxdb3_id/src/serialize.rs

diff --git a/Cargo.lock b/Cargo.lock
index 5bdee84c16c..7e00a60ddcd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2680,7 +2680,9 @@ dependencies = [
 name = "influxdb3_id"
 version = "0.1.0"
 dependencies = [
+ "hashbrown 0.14.5",
  "serde",
+ "serde_json",
 ]

 [[package]]
diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index a5290eedbbe..d3a7123a933 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -3,7 +3,7 @@ use crate::catalog::Error::TableNotFound;
 use arrow::datatypes::SchemaRef;
 use bimap::BiHashMap;
-use influxdb3_id::{ColumnId, DbId, TableId};
+use influxdb3_id::{ColumnId, DbId, SerdeVecHashMap, TableId};
 use influxdb3_wal::{
     CatalogBatch, CatalogOp, FieldAdditions, LastCacheDefinition, LastCacheDelete,
 };
@@ -12,7 +12,7 @@ use observability_deps::tracing::info;
 use parking_lot::RwLock;
 use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
 use serde::{Deserialize, Serialize, Serializer};
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
 use std::sync::Arc;
 use thiserror::Error;

@@ -284,8 +284,7 @@
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
 pub struct InnerCatalog {
     /// The catalog is a map of databases with their table schemas
-    #[serde_as(as = "DatabasesAsArray")]
-    databases: HashMap<DbId, Arc<DatabaseSchema>>,
+    databases: SerdeVecHashMap<DbId, Arc<DatabaseSchema>>,
     sequence: SequenceNumber,
     /// The host_id is the prefix that is passed in when starting up (`host_identifier_prefix`)
     host_id: Arc<str>,
@@ -351,55 +350,10 @@
     }
 );

-serde_with::serde_conv!(
-    DatabasesAsArray,
-    HashMap<DbId, Arc<DatabaseSchema>>,
-    |map: &HashMap<DbId, Arc<DatabaseSchema>>| {
-        map.values().fold(Vec::new(), |mut acc, db| {
-            acc.push(DatabasesSerialized {
-                id: db.id,
-                name: Arc::clone(&db.name),
-                tables: db.tables.values().cloned().collect(),
-            });
-            acc
-        })
-    },
-    |vec: Vec<DatabasesSerialized>| -> Result<_, String> {
-        vec.into_iter().fold(Ok(HashMap::new()), |acc, db| {
-            let mut acc = acc?;
-            let mut table_map = BiHashMap::new();
-            if let Some(_) = acc.insert(db.id, Arc::new(DatabaseSchema {
-                id: db.id,
-                name: Arc::clone(&db.name),
-                tables: db.tables.into_iter().fold(Ok(BTreeMap::new()), |acc, table| {
-                    let mut acc = acc?;
-                    let table_name = Arc::clone(&table.table_name);
-                    table_map.insert(table.table_id, Arc::clone(&table_name));
-                    if let Some(_) = acc.insert(table.table_id, table) {
-                        return Err(format!("found duplicate table: {}", table_name));
-                    }
-                    Ok(acc)
-                })?,
-                table_map
-            })) {
-                return Err(format!("found duplicate db: {}", db.name));
-            }
-            Ok(acc)
-        })
-    }
-);
-
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
-struct DatabasesSerialized {
-    pub id: DbId,
-    pub name: Arc<str>,
-    pub tables: Vec<TableDefinition>,
-}
-
 impl InnerCatalog {
     pub(crate) fn new(host_id: Arc<str>, instance_id: Arc<str>) -> Self {
         Self {
-            databases: HashMap::new(),
+            databases: SerdeVecHashMap::new(),
             sequence: SequenceNumber::new(0),
             host_id,
             instance_id,
@@ -466,7 +420,7 @@ pub struct DatabaseSchema {
     pub id: DbId,
     pub name: Arc<str>,
     /// The database is a map of tables
-    pub tables: BTreeMap<TableId, TableDefinition>,
+    pub tables: SerdeVecHashMap<TableId, TableDefinition>,
     #[serde_as(as = "TableMapAsArray")]
     pub table_map: BiHashMap<TableId, Arc<str>>,
 }
@@ -476,7 +430,7 @@ impl DatabaseSchema {
         Self {
             id,
             name,
-            tables: BTreeMap::new(),
+            tables: Default::default(),
             table_map: BiHashMap::new(),
         }
     }
@@ -485,7 +439,7 @@
     /// everything is compatible and there are no updates to the existing schema, None will be
     /// returned, otherwise a new `DatabaseSchema` will be returned with the updates applied.
     pub fn new_if_updated_from_batch(&self, catalog_batch: &CatalogBatch) -> Result<Option<Self>> {
-        let mut updated_or_new_tables = BTreeMap::new();
+        let mut updated_or_new_tables = SerdeVecHashMap::new();

         for catalog_op in &catalog_batch.ops {
             match catalog_op {
@@ -1031,7 +985,6 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnType {
 #[cfg(test)]
 mod tests {
-    use insta::assert_json_snapshot;
     use pretty_assertions::assert_eq;
     use test_helpers::assert_contains;
@@ -1048,7 +1001,7 @@
         let mut database = DatabaseSchema {
             id: DbId::from(0),
             name: "test_db".into(),
-            tables: BTreeMap::new(),
+            tables: SerdeVecHashMap::new(),
             table_map: {
                 let mut map = BiHashMap::new();
                 map.insert(TableId::from(1), "test_table_1".into());
@@ -1104,10 +1057,6 @@
             .databases
             .insert(database.id, Arc::new(database));

-        // Perform a snapshot test to check that the JSON serialized catalog does not change in an
-        // undesired way when introducing features etc.
-        assert_json_snapshot!(catalog);
-
         // Serialize/deserialize to ensure roundtrip to/from JSON
         let serialized = serde_json::to_string(&catalog).unwrap();
         let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
@@ -1122,85 +1071,116 @@
     {
         let json = r#"{
             "databases": [
-                {
-                    "id": 0,
-                    "name": "db1",
-                    "tables": []
-                },
-                {
-                    "id": 0,
-                    "name": "db1",
-                    "tables": []
-                }
-            ]
+                [
+                    0,
+                    {
+                        "id": 0,
+                        "name": "db1",
+                        "tables": [],
+                        "table_map": []
+                    }
+                ],
+                [
+                    0,
+                    {
+                        "id": 0,
+                        "name": "db1",
+                        "tables": [],
+                        "table_map": []
+                    }
+                ]
+            ],
+            "sequence": 0,
+            "host_id": "test",
+            "instance_id": "test",
+            "db_map": []
         }"#;
         let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
-        assert_contains!(err.to_string(), "found duplicate db: db1");
+        assert_contains!(err.to_string(), "duplicate key found");
     }

     // Duplicate tables
     {
         let json = r#"{
             "databases": [
-                {
-                    "id": 0,
-                    "name": "db1",
-                    "tables": [
-                        {
-                            "table_id": 0,
-                            "table_name": "tbl1",
-                            "cols": {},
-                            "column_map": [],
-                            "next_column_id": 0
-                        },
-                        {
-                            "table_id": 0,
-                            "table_name": "tbl1",
-                            "cols": {},
-                            "column_map": [],
-                            "next_column_id": 0
-                        }
-                    ]
-                }
-            ]
+                [
+                    0,
+                    {
+                        "id": 0,
+                        "name": "db1",
+                        "tables": [
+                            [
+                                0,
+                                {
+                                    "table_id": 0,
+                                    "table_name": "tbl1",
+                                    "cols": {},
+                                    "column_map": [],
+                                    "next_column_id": 0
+                                }
+                            ],
+                            [
+                                0,
+                                {
+                                    "table_id": 0,
+                                    "table_name": "tbl1",
+                                    "cols": {},
+                                    "column_map": [],
+                                    "next_column_id": 0
+                                }
+                            ]
+                        ]
+                    }
+                ]
+            ],
+            "sequence": 0,
+            "host_id": "test",
+            "instance_id": "test",
+            "db_map": []
         }"#;
         let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
-        assert_contains!(err.to_string(), "found duplicate table: tbl1");
+        assert_contains!(err.to_string(), "duplicate key found");
     }

     // Duplicate columns
     {
         let json = r#"{
             "databases": [
-                {
-                    "id": 0,
-                    "name": "db1",
-                    "tables": [
-                        {
-                            "table_id": 0,
-                            "table_name": "tbl1",
-                            "cols": {
-                                "col1": {
-                                    "column_id": 0,
-                                    "type": "i64",
-                                    "influx_type": "field",
-                                    "nullable": true
-                                },
-                                "col1": {
-                                    "column_id": 0,
-                                    "type": "u64",
-                                    "influx_type": "field",
-                                    "nullable": true
-                                }
-                            },
-                            "column_map": [
-                                {
-                                    "column_id": 0,
-                                    "name": "col1"
-                                }
-                            ],
-                            "next_column_id": 1
-                        }
-                    ]
-                }
+                [
+                    0,
+                    {
+                        "id": 0,
+                        "name": "db1",
+                        "tables": [
+                            [
+                                0,
+                                {
+                                    "table_id": 0,
+                                    "table_name": "tbl1",
+                                    "cols": {
+                                        "col1": {
+                                            "column_id": 0,
+                                            "type": "i64",
+                                            "influx_type": "field",
+                                            "nullable": true
+                                        },
+                                        "col1": {
+                                            "column_id": 0,
+                                            "type": "u64",
+                                            "influx_type": "field",
+                                            "nullable": true
+                                        }
+                                    },
+                                    "column_map": [
+                                        {
+                                            "column_id": 0,
+                                            "name": "col1"
+                                        }
+                                    ],
+                                    "next_column_id": 1
+                                }
+                            ]
+                        ]
+                    }
+                ]
             ]
         }"#;
         let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
@@ -1213,7 +1193,7 @@
         let mut database = DatabaseSchema {
             id: DbId::from(0),
             name: "test".into(),
-            tables: BTreeMap::new(),
+            tables: SerdeVecHashMap::new(),
             table_map: BiHashMap::new(),
         };
         database.tables.insert(
@@ -1256,7 +1236,7 @@
         let mut database = DatabaseSchema {
             id: DbId::from(0),
             name: "test_db".into(),
-            tables: BTreeMap::new(),
+            tables: SerdeVecHashMap::new(),
             table_map: {
                 let mut map = BiHashMap::new();
                 map.insert(TableId::from(1), "test_table_1".into());
@@ -1291,8 +1271,6 @@
             .databases
             .insert(database.id, Arc::new(database));

-        assert_json_snapshot!(catalog);
-
         let serialized = serde_json::to_string(&catalog).unwrap();
         let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
         let deserialized = Catalog::from_inner(deserialized_inner);
@@ -1307,7 +1285,7 @@
         let mut database =
DatabaseSchema { id: DbId::from(0), name: "test_db".into(), - tables: BTreeMap::new(), + tables: SerdeVecHashMap::new(), table_map: { let mut map = BiHashMap::new(); map.insert(TableId::from(0), "test".into()); @@ -1348,8 +1326,6 @@ mod tests { .databases .insert(database.id, Arc::new(database)); - assert_json_snapshot!(catalog); - let serialized = serde_json::to_string(&catalog).unwrap(); let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap(); let deserialized = Catalog::from_inner(deserialized_inner); diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap deleted file mode 100644 index 0d2887a4f6d..00000000000 --- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap +++ /dev/null @@ -1,256 +0,0 @@ ---- -source: influxdb3_catalog/src/catalog.rs -expression: catalog ---- -{ - "databases": [ - { - "id": 0, - "name": "test_db", - "tables": [ - { - "table_id": 1, - "table_name": "test_table_1", - "cols": { - "bool_field": { - "column_id": 0, - "type": "bool", - "influx_type": "field", - "nullable": true - }, - "f64_field": { - "column_id": 1, - "type": "f64", - "influx_type": "field", - "nullable": true - }, - "i64_field": { - "column_id": 2, - "type": "i64", - "influx_type": "field", - "nullable": true - }, - "string_field": { - "column_id": 3, - "type": "str", - "influx_type": "field", - "nullable": true - }, - "tag_1": { - "column_id": 4, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": true - }, - "tag_2": { - "column_id": 5, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": true - }, - "tag_3": { - "column_id": 6, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": true - }, - "time": { - "column_id": 7, - "type": { - "time": [ - "ns", - null - ] - }, - "influx_type": "time", - "nullable": false - }, - "u64_field": { - "column_id": 8, - "type": "u64", - "influx_type": "field", - "nullable": true - } - }, - "column_map": [ - { - "column_id": 0, - "name": "bool_field" - }, - { - "column_id": 1, - "name": "f64_field" - }, - { - "column_id": 2, - "name": "i64_field" - }, - { - "column_id": 3, - "name": "string_field" - }, - { - "column_id": 4, - "name": "tag_1" - }, - { - "column_id": 5, - "name": "tag_2" - }, - { - "column_id": 6, - "name": "tag_3" - }, - { - "column_id": 7, - "name": "time" - }, - { - "column_id": 8, - "name": "u64_field" - } - ], - "next_column_id": 9 - }, - { - "table_id": 2, - "table_name": "test_table_2", - "cols": { - "bool_field": { - "column_id": 0, - "type": "bool", - "influx_type": "field", - "nullable": true - }, - "f64_field": { - "column_id": 1, - "type": "f64", - "influx_type": "field", - "nullable": true - }, - "i64_field": { - "column_id": 2, - "type": "i64", - "influx_type": "field", - "nullable": true - }, - "string_field": { - "column_id": 3, - "type": "str", - "influx_type": "field", - "nullable": true - }, - "tag_1": { - "column_id": 4, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": true - }, - "tag_2": { - "column_id": 5, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": true - }, - "tag_3": { - "column_id": 6, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": true - }, - "time": { - "column_id": 7, - "type": { - 
"time": [ - "ns", - null - ] - }, - "influx_type": "time", - "nullable": false - }, - "u64_field": { - "column_id": 8, - "type": "u64", - "influx_type": "field", - "nullable": true - } - }, - "column_map": [ - { - "column_id": 0, - "name": "bool_field" - }, - { - "column_id": 1, - "name": "f64_field" - }, - { - "column_id": 2, - "name": "i64_field" - }, - { - "column_id": 3, - "name": "string_field" - }, - { - "column_id": 4, - "name": "tag_1" - }, - { - "column_id": 5, - "name": "tag_2" - }, - { - "column_id": 6, - "name": "tag_3" - }, - { - "column_id": 7, - "name": "time" - }, - { - "column_id": 8, - "name": "u64_field" - } - ], - "next_column_id": 9 - } - ] - } - ], - "sequence": 0, - "host_id": "sample-host-id", - "instance_id": "instance-id", - "db_map": [] -} diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap deleted file mode 100644 index c99f4daec98..00000000000 --- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap +++ /dev/null @@ -1,113 +0,0 @@ ---- -source: influxdb3_catalog/src/catalog.rs -expression: catalog ---- -{ - "databases": [ - { - "id": 0, - "name": "test_db", - "tables": [ - { - "table_id": 0, - "table_name": "test", - "cols": { - "field": { - "column_id": 0, - "type": "str", - "influx_type": "field", - "nullable": true - }, - "tag_1": { - "column_id": 1, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": true - }, - "tag_2": { - "column_id": 2, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": true - }, - "tag_3": { - "column_id": 3, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": true - }, - "time": { - "column_id": 4, - "type": { - "time": [ - "ns", - null - ] - }, - "influx_type": "time", - "nullable": false - } - }, - "last_caches": [ - { - "table_id": 0, - "table": "test", - "name": "test_table_last_cache", - "keys": [ - "tag_2", - "tag_3" - ], - "vals": [ - "field" - ], - "n": 1, - "ttl": 600 - } - ], - "column_map": [ - { - "column_id": 0, - "name": "field" - }, - { - "column_id": 1, - "name": "tag_1" - }, - { - "column_id": 2, - "name": "tag_2" - }, - { - "column_id": 3, - "name": "tag_3" - }, - { - "column_id": 4, - "name": "time" - } - ], - "next_column_id": 5 - } - ] - } - ], - "sequence": 0, - "host_id": "sample-host-id", - "instance_id": "instance-id", - "db_map": [] -} diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap deleted file mode 100644 index a8a8004bc33..00000000000 --- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap +++ /dev/null @@ -1,102 +0,0 @@ ---- -source: influxdb3_catalog/src/catalog.rs -expression: catalog ---- -{ - "databases": [ - { - "id": 0, - "name": "test_db", - "tables": [ - { - "table_id": 1, - "table_name": "test_table_1", - "key": [ - "tag_1", - "tag_2", - "tag_3" - ], - "cols": { - "field": { - "column_id": 0, - "type": "str", - "influx_type": "field", - "nullable": true - }, - "tag_1": { - "column_id": 1, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": false - }, - "tag_2": { - "column_id": 2, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": 
false - }, - "tag_3": { - "column_id": 3, - "type": { - "dict": [ - "i32", - "str" - ] - }, - "influx_type": "tag", - "nullable": false - }, - "time": { - "column_id": 4, - "type": { - "time": [ - "ns", - null - ] - }, - "influx_type": "time", - "nullable": false - } - }, - "column_map": [ - { - "column_id": 0, - "name": "field" - }, - { - "column_id": 1, - "name": "tag_1" - }, - { - "column_id": 2, - "name": "tag_2" - }, - { - "column_id": 3, - "name": "tag_3" - }, - { - "column_id": 4, - "name": "time" - } - ], - "next_column_id": 5 - } - ] - } - ], - "sequence": 0, - "host_id": "sample-host-id", - "instance_id": "instance-id", - "db_map": [] -} diff --git a/influxdb3_id/Cargo.toml b/influxdb3_id/Cargo.toml index 0d408e55797..4cf2c9cf2fa 100644 --- a/influxdb3_id/Cargo.toml +++ b/influxdb3_id/Cargo.toml @@ -6,7 +6,11 @@ edition.workspace = true license.workspace = true [dependencies] +hashbrown.workspace = true serde.workspace = true +[dev-dependencies] +serde_json.workspace = true + [lints] workspace = true diff --git a/influxdb3_id/src/lib.rs b/influxdb3_id/src/lib.rs index aefa812d588..bc9defd816c 100644 --- a/influxdb3_id/src/lib.rs +++ b/influxdb3_id/src/lib.rs @@ -5,6 +5,9 @@ use std::sync::atomic::AtomicU32; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; +mod serialize; +pub use serialize::SerdeVecHashMap; + #[derive(Debug, Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Serialize, Deserialize, Hash)] pub struct DbId(u32); diff --git a/influxdb3_id/src/serialize.rs b/influxdb3_id/src/serialize.rs new file mode 100644 index 00000000000..bdaf522f2b9 --- /dev/null +++ b/influxdb3_id/src/serialize.rs @@ -0,0 +1,207 @@ +use std::{ + marker::PhantomData, + ops::{Deref, DerefMut}, +}; + +use hashbrown::{ + hash_map::{IntoIter, Iter, IterMut}, + HashMap, +}; +use serde::{ + de::{self, SeqAccess, Visitor}, + ser::SerializeSeq, + Deserialize, Deserializer, Serialize, Serializer, +}; + +/// A new-type around a `HashMap` that provides special serialization and deserialization behaviour. +/// +/// Specifically, it will be serialized as a vector of tuples, each tuple containing a key-value +/// pair from the map. Deserialization assumes said serialization, and deserializes from the vector +/// of tuples back into the map. Traits like `Deref`, `From`, etc. are implemented on this type such +/// that it can be used as a `HashMap`. +/// +/// During deserialization, there are no duplicate keys allowed. If duplicates are found, an error +/// will be thrown. 
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SerdeVecHashMap<K, V>(HashMap<K, V>);
+
+impl<K, V> SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl<K, V> Default for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    fn default() -> Self {
+        Self(Default::default())
+    }
+}
+
+impl<K, V, T> From<T> for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+    T: Into<HashMap<K, V>>,
+{
+    fn from(value: T) -> Self {
+        Self(value.into())
+    }
+}
+
+impl<K, V> IntoIterator for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    type Item = (K, V);
+
+    type IntoIter = IntoIter<K, V>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.into_iter()
+    }
+}
+
+impl<'a, K, V> IntoIterator for &'a SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    type Item = (&'a K, &'a V);
+
+    type IntoIter = Iter<'a, K, V>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.iter()
+    }
+}
+
+impl<'a, K, V> IntoIterator for &'a mut SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    type Item = (&'a K, &'a mut V);
+
+    type IntoIter = IterMut<'a, K, V>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.iter_mut()
+    }
+}
+
+impl<K, V> Deref for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    type Target = HashMap<K, V>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<K, V> DerefMut for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+impl<K, V> Serialize for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash + Serialize,
+    V: Serialize,
+{
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut seq = serializer.serialize_seq(Some(self.len()))?;
+        for ele in self.iter() {
+            seq.serialize_element(&ele)?;
+        }
+        seq.end()
+    }
+}
+
+impl<'de, K, V> Deserialize<'de> for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash + Deserialize<'de>,
+    V: Deserialize<'de>,
+{
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let v = deserializer.deserialize_seq(VecVisitor::new())?;
+        let mut map = HashMap::with_capacity(v.len());
+        for (k, v) in v.into_iter() {
+            if map.insert(k, v).is_some() {
+                return Err(de::Error::custom("duplicate key found"));
+            }
+        }
+        Ok(Self(map))
+    }
+}
+
+type Output<K, V> = fn() -> Vec<(K, V)>;
+
+struct VecVisitor<K, V> {
+    marker: PhantomData<Output<K, V>>,
+}
+
+impl<K, V> VecVisitor<K, V> {
+    fn new() -> Self {
+        Self {
+            marker: PhantomData,
+        }
+    }
+}
+
+impl<'de, K, V> Visitor<'de> for VecVisitor<K, V>
+where
+    K: Deserialize<'de>,
+    V: Deserialize<'de>,
+{
+    type Value = Vec<(K, V)>;
+
+    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        formatter.write_str("a vector of key value pairs")
+    }
+
+    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+    where
+        A: SeqAccess<'de>,
+    {
+        let mut v = Vec::with_capacity(seq.size_hint().unwrap_or(0));
+        while let Some(ele) = seq.next_element()? {
+            v.push(ele);
+        }
+        Ok(v)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use hashbrown::HashMap;
+
+    use super::SerdeVecHashMap;
+
+    #[test]
+    fn serde_vec_map_with_json() {
+        let map = HashMap::<u32, &str>::from_iter([(0, "foo"), (1, "bar"), (2, "baz")]);
+        let serde_vec_map = SerdeVecHashMap::from(map);
+        // test round-trip to JSON:
+        let s = serde_json::to_string(&serde_vec_map).unwrap();
+        // with using a hashmap the order changes so asserting on the JSON itself is flaky, so if
+        // you want to see it working use --nocapture on the test...
+        println!("{s}");
+        let d: SerdeVecHashMap<u32, &str> = serde_json::from_str(&s).unwrap();
+        assert_eq!(d, serde_vec_map);
+    }
+}
diff --git a/influxdb3_telemetry/src/stats.rs b/influxdb3_telemetry/src/stats.rs
index 9e5c751f52e..b7131cebd40 100644
--- a/influxdb3_telemetry/src/stats.rs
+++ b/influxdb3_telemetry/src/stats.rs
@@ -21,10 +21,6 @@ pub(crate) struct RollingStats {
 }

 impl RollingStats {
-    pub fn new() -> RollingStats {
-        RollingStats::default()
-    }
-
     /// Update the rolling stats [`Self::min`]/[`Self::max`]/[`Self::avg`] using
     /// reference to an higher precision stats that is passed in. This is usually a
     /// per minute interval stats. One thing to note here is the [`Self::num_samples`]
@@ -70,10 +66,6 @@ pub(crate) struct Stats {
 }

 impl Stats {
-    pub fn new() -> Stats {
-        Stats::default()
-    }
-
     /// Update the [`Self::min`]/[`Self::max`]/[`Self::avg`] from a
     /// new value that is sampled.
     pub fn update(&mut self, new_val: T) -> Option<()> {
diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs
index 6873074d27e..ed4599e2bc0 100644
--- a/influxdb3_wal/src/lib.rs
+++ b/influxdb3_wal/src/lib.rs
@@ -11,7 +11,7 @@ use crate::snapshot_tracker::SnapshotInfo;
 use async_trait::async_trait;
 use data_types::Timestamp;
 use hashbrown::HashMap;
-use influxdb3_id::{DbId, TableId};
+use influxdb3_id::{DbId, SerdeVecHashMap, TableId};
 use influxdb_line_protocol::v3::SeriesValue;
 use influxdb_line_protocol::FieldValue;
 use iox_time::Time;
@@ -442,46 +442,11 @@ pub struct LastCacheDelete {
 pub struct WriteBatch {
     pub database_id: DbId,
     pub database_name: Arc<str>,
-    #[serde_as(as = "TableChunksMapAsVec")]
-    pub table_chunks: HashMap<TableId, TableChunks>,
+    pub table_chunks: SerdeVecHashMap<TableId, TableChunks>,
     pub min_time_ns: i64,
     pub max_time_ns: i64,
 }

-#[derive(Debug, Serialize, Deserialize)]
-pub struct TableChunksMap {
-    table_id: TableId,
-    min_time: i64,
-    max_time: i64,
-    chunk_time_to_chunk: HashMap<i64, TableChunk>,
-}
-
-serde_with::serde_conv!(
-    TableChunksMapAsVec,
-    HashMap<TableId, TableChunks>,
-    |map: &HashMap<TableId, TableChunks>|
-        map.iter()
-            .map(|(table_id, chunk)| {
-                TableChunksMap {
-                    table_id: *table_id,
-                    min_time: chunk.min_time,
-                    max_time: chunk.max_time,
-                    chunk_time_to_chunk: chunk.chunk_time_to_chunk.clone()
-                }
-            })
-            .collect::<Vec<TableChunksMap>>(),
-    |vec: Vec<TableChunksMap>| -> Result<_, std::convert::Infallible> {
-        Ok(vec.into_iter().fold(HashMap::new(), |mut acc, chunk| {
-            acc.insert(chunk.table_id, TableChunks{
-                min_time: chunk.min_time,
-                max_time: chunk.max_time,
-                chunk_time_to_chunk: chunk.chunk_time_to_chunk
-            });
-            acc
-        }))
-    }
-);
-
 impl WriteBatch {
     pub fn new(
         database_id: DbId,
@@ -502,7 +467,7 @@
         Self {
             database_id,
             database_name,
-            table_chunks,
+            table_chunks: table_chunks.into(),
             min_time_ns,
             max_time_ns,
         }
@@ -510,7 +475,7 @@
     pub fn add_write_batch(
         &mut self,
-        new_table_chunks: HashMap<TableId, TableChunks>,
+        new_table_chunks: SerdeVecHashMap<TableId, TableChunks>,
         min_time_ns: i64,
         max_time_ns: i64,
     ) {
diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs
index eeb2d005fa3..14dbe4c02f3 100644
--- a/influxdb3_wal/src/object_store.rs
+++ b/influxdb3_wal/src/object_store.rs
@@ -681,7 +681,8 @@ mod tests {
                     },
                 )]),
             },
-        )]),
+        )])
+        .into(),
         min_time_ns: 1,
         max_time_ns: 3,
     });
@@ -714,7 +715,8 @@ mod tests {
                     },
                 )]),
             },
-        )]),
+        )])
+        .into(),
         min_time_ns: 62_000000000,
         max_time_ns: 62_000000000,
     });
@@ -782,7 +784,8 @@ mod tests {
                         },
                     )]),
                 },
-            )]),
+            )])
+            .into(),
             min_time_ns: 1,
             max_time_ns: 62_000000000,
         })],
@@ -824,7 +827,8 @@ mod tests {
                         },
                     )]),
                 },
-            )]),
+            )])
+            .into(),
             min_time_ns: 62_000000000,
             max_time_ns: 62_000000000,
         })],
@@ -896,7 +900,8
@@ mod tests { }, )]), }, - )]), + )]) + .into(), min_time_ns: 128_000000000, max_time_ns: 128_000000000, }); @@ -956,7 +961,8 @@ mod tests { }, )]), }, - )]), + )]) + .into(), min_time_ns: 128_000000000, max_time_ns: 128_000000000, })], diff --git a/influxdb3_wal/src/serialize.rs b/influxdb3_wal/src/serialize.rs index da52c132737..1b614393833 100644 --- a/influxdb3_wal/src/serialize.rs +++ b/influxdb3_wal/src/serialize.rs @@ -91,8 +91,7 @@ mod tests { use crate::{ Field, FieldData, Row, TableChunk, TableChunks, WalFileSequenceNumber, WalOp, WriteBatch, }; - use hashbrown::HashMap; - use influxdb3_id::{DbId, TableId}; + use influxdb3_id::{DbId, SerdeVecHashMap, TableId}; #[test] fn test_serialize_deserialize() { @@ -117,7 +116,7 @@ mod tests { chunk_time_to_chunk: [(1, chunk)].iter().cloned().collect(), }; let table_id = TableId::from(2); - let mut table_chunks = HashMap::new(); + let mut table_chunks = SerdeVecHashMap::new(); table_chunks.insert(table_id, chunks); let contents = WalContents { diff --git a/influxdb3_write/src/last_cache/mod.rs b/influxdb3_write/src/last_cache/mod.rs index e26cef2f187..b60c6c2583f 100644 --- a/influxdb3_write/src/last_cache/mod.rs +++ b/influxdb3_write/src/last_cache/mod.rs @@ -1586,7 +1586,7 @@ fn data_type_from_buffer_field(field: &Field) -> DataType { #[cfg(test)] mod tests { - use std::{cmp::Ordering, collections::BTreeMap, sync::Arc, time::Duration}; + use std::{cmp::Ordering, sync::Arc, time::Duration}; use crate::{ last_cache::{KeyValue, LastCacheProvider, Predicate, DEFAULT_CACHE_TTL}, @@ -1600,7 +1600,7 @@ mod tests { use bimap::BiHashMap; use data_types::NamespaceName; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition}; - use influxdb3_id::{DbId, TableId}; + use influxdb3_id::{DbId, SerdeVecHashMap, TableId}; use influxdb3_wal::{LastCacheDefinition, WalConfig}; use insta::assert_json_snapshot; use iox_time::{MockProvider, Time, TimeProvider}; @@ -3114,7 +3114,7 @@ mod tests { let mut database = DatabaseSchema { id: DbId::from(0), name: db_name.into(), - tables: BTreeMap::new(), + tables: SerdeVecHashMap::new(), table_map: { let mut map = BiHashMap::new(); map.insert(TableId::from(0), "test_table_1".into()); diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap index b57528e66a1..fcacd3e0fc4 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap @@ -4,84 +4,96 @@ expression: catalog_json --- { "databases": [ - { - "id": 0, - "name": "db", - "tables": [ - { - "cols": { - "f1": { - "column_id": 0, - "influx_type": "field", - "nullable": true, - "type": "bool" - }, - "f2": { - "column_id": 3, - "influx_type": "field", - "nullable": true, - "type": "i64" - }, - "t1": { - "column_id": 1, - "influx_type": "tag", - "nullable": true, - "type": { - "dict": [ - "i32", - "str" - ] - } - }, - "time": { - "column_id": 2, - "influx_type": "time", - "nullable": false, - "type": { - "time": [ - "ns", - null - ] - } - } - }, - "column_map": [ - { - "column_id": 0, - "name": "f1" - }, - { - "column_id": 1, - "name": "t1" - }, - { - "column_id": 2, - "name": "time" - }, + [ + 0, + { + "id": 0, + 
"name": "db", + "table_map": [ + { + "name": "table", + "table_id": 0 + } + ], + "tables": [ + [ + 0, { - "column_id": 3, - "name": "f2" - } - ], - "last_caches": [ - { - "keys": [ - "t1" + "cols": { + "f1": { + "column_id": 0, + "influx_type": "field", + "nullable": true, + "type": "bool" + }, + "f2": { + "column_id": 3, + "influx_type": "field", + "nullable": true, + "type": "i64" + }, + "t1": { + "column_id": 1, + "influx_type": "tag", + "nullable": true, + "type": { + "dict": [ + "i32", + "str" + ] + } + }, + "time": { + "column_id": 2, + "influx_type": "time", + "nullable": false, + "type": { + "time": [ + "ns", + null + ] + } + } + }, + "column_map": [ + { + "column_id": 0, + "name": "f1" + }, + { + "column_id": 1, + "name": "t1" + }, + { + "column_id": 2, + "name": "time" + }, + { + "column_id": 3, + "name": "f2" + } ], - "n": 1, - "name": "cache", - "table": "table", + "last_caches": [ + { + "keys": [ + "t1" + ], + "n": 1, + "name": "cache", + "table": "table", + "table_id": 0, + "ttl": 14400, + "vals": null + } + ], + "next_column_id": 4, "table_id": 0, - "ttl": 14400, - "vals": null + "table_name": "table" } - ], - "next_column_id": 4, - "table_id": 0, - "table_name": "table" - } - ] - } + ] + ] + } + ] ], "db_map": [ { diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap index 8c1c4c05db1..0512ccd036d 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap @@ -4,74 +4,86 @@ expression: catalog_json --- { "databases": [ - { - "id": 0, - "name": "db", - "tables": [ - { - "cols": { - "f1": { - "column_id": 0, - "influx_type": "field", - "nullable": true, - "type": "bool" - }, - "t1": { - "column_id": 1, - "influx_type": "tag", - "nullable": true, - "type": { - "dict": [ - "i32", - "str" - ] - } - }, - "time": { - "column_id": 2, - "influx_type": "time", - "nullable": false, - "type": { - "time": [ - "ns", - null - ] - } - } - }, - "column_map": [ - { - "column_id": 0, - "name": "f1" - }, - { - "column_id": 1, - "name": "t1" - }, + [ + 0, + { + "id": 0, + "name": "db", + "table_map": [ + { + "name": "table", + "table_id": 0 + } + ], + "tables": [ + [ + 0, { - "column_id": 2, - "name": "time" - } - ], - "last_caches": [ - { - "keys": [ - "t1" + "cols": { + "f1": { + "column_id": 0, + "influx_type": "field", + "nullable": true, + "type": "bool" + }, + "t1": { + "column_id": 1, + "influx_type": "tag", + "nullable": true, + "type": { + "dict": [ + "i32", + "str" + ] + } + }, + "time": { + "column_id": 2, + "influx_type": "time", + "nullable": false, + "type": { + "time": [ + "ns", + null + ] + } + } + }, + "column_map": [ + { + "column_id": 0, + "name": "f1" + }, + { + "column_id": 1, + "name": "t1" + }, + { + "column_id": 2, + "name": "time" + } ], - "n": 1, - "name": "cache", - "table": "table", + "last_caches": [ + { + "keys": [ + "t1" + ], + "n": 1, + "name": "cache", + "table": "table", + "table_id": 0, + "ttl": 14400, + "vals": null + } + ], + "next_column_id": 3, "table_id": 0, - "ttl": 14400, - "vals": null + "table_name": "table" } - ], - "next_column_id": 3, - "table_id": 0, - "table_name": "table" - } - ] - } + ] + ] + } + ] ], "db_map": [ 
{ diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap index 4704183b763..9925b2f06ab 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap @@ -4,71 +4,83 @@ expression: catalog_json --- { "databases": [ - { - "id": 0, - "name": "db", - "tables": [ - { - "cols": { - "f1": { - "column_id": 0, - "influx_type": "field", - "nullable": true, - "type": "bool" - }, - "f2": { - "column_id": 3, - "influx_type": "field", - "nullable": true, - "type": "i64" - }, - "t1": { - "column_id": 1, - "influx_type": "tag", - "nullable": true, - "type": { - "dict": [ - "i32", - "str" - ] - } - }, - "time": { - "column_id": 2, - "influx_type": "time", - "nullable": false, - "type": { - "time": [ - "ns", - null - ] - } - } - }, - "column_map": [ - { - "column_id": 0, - "name": "f1" - }, + [ + 0, + { + "id": 0, + "name": "db", + "table_map": [ + { + "name": "table", + "table_id": 0 + } + ], + "tables": [ + [ + 0, { - "column_id": 1, - "name": "t1" - }, - { - "column_id": 2, - "name": "time" - }, - { - "column_id": 3, - "name": "f2" + "cols": { + "f1": { + "column_id": 0, + "influx_type": "field", + "nullable": true, + "type": "bool" + }, + "f2": { + "column_id": 3, + "influx_type": "field", + "nullable": true, + "type": "i64" + }, + "t1": { + "column_id": 1, + "influx_type": "tag", + "nullable": true, + "type": { + "dict": [ + "i32", + "str" + ] + } + }, + "time": { + "column_id": 2, + "influx_type": "time", + "nullable": false, + "type": { + "time": [ + "ns", + null + ] + } + } + }, + "column_map": [ + { + "column_id": 0, + "name": "f1" + }, + { + "column_id": 1, + "name": "t1" + }, + { + "column_id": 2, + "name": "time" + }, + { + "column_id": 3, + "name": "f2" + } + ], + "next_column_id": 4, + "table_id": 0, + "table_name": "table" } - ], - "next_column_id": 4, - "table_id": 0, - "table_name": "table" - } - ] - } + ] + ] + } + ] ], "db_map": [ { diff --git a/rust-toolchain.toml b/rust-toolchain.toml index dc8d20d3611..b7a0050e19b 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.80.0" +channel = "1.82.0" components = ["rustfmt", "clippy", "rust-analyzer"]