diff --git a/Cargo.lock b/Cargo.lock
index 5bdee84c16c..7e00a60ddcd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2680,7 +2680,9 @@ dependencies = [
 name = "influxdb3_id"
 version = "0.1.0"
 dependencies = [
+ "hashbrown 0.14.5",
  "serde",
+ "serde_json",
 ]
 
 [[package]]
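Reviewer note on the catalog change below: `InnerCatalog.databases` moves from a `HashMap` serialized through the hand-written `DatabasesAsArray` converter to the new `SerdeVecHashMap`, which encodes every map entry as a `[key, value]` pair. A minimal sketch of the before/after JSON shape (illustrative values only, rendered with `serde_json`; not part of the patch):

```rust
// Sketch only: the on-disk encoding before and after this diff.
// Field sets are trimmed for brevity.
fn main() {
    let db = serde_json::json!({"id": 0, "name": "db1", "tables": []});

    // Old format (DatabasesAsArray): the key is implicit in each object.
    let old_format = serde_json::json!([db.clone()]);

    // New format (SerdeVecHashMap): each entry is an explicit [key, value] pair.
    let new_format = serde_json::json!([[0, db]]);

    println!("old: {old_format}");
    println!("new: {new_format}");
}
```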
diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs
index a5290eedbbe..d3a7123a933 100644
--- a/influxdb3_catalog/src/catalog.rs
+++ b/influxdb3_catalog/src/catalog.rs
@@ -3,7 +3,7 @@ use crate::catalog::Error::TableNotFound;
 use arrow::datatypes::SchemaRef;
 use bimap::BiHashMap;
-use influxdb3_id::{ColumnId, DbId, TableId};
+use influxdb3_id::{ColumnId, DbId, SerdeVecHashMap, TableId};
 use influxdb3_wal::{
     CatalogBatch, CatalogOp, FieldAdditions, LastCacheDefinition, LastCacheDelete,
 };
@@ -12,7 +12,7 @@ use observability_deps::tracing::info;
 use parking_lot::RwLock;
 use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder};
 use serde::{Deserialize, Serialize, Serializer};
-use std::collections::{BTreeMap, HashMap};
+use std::collections::BTreeMap;
 use std::sync::Arc;
 use thiserror::Error;
 
@@ -284,8 +284,7 @@ impl Catalog {
 #[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
 pub struct InnerCatalog {
     /// The catalog is a map of databases with their table schemas
-    #[serde_as(as = "DatabasesAsArray")]
-    databases: HashMap<DbId, Arc<DatabaseSchema>>,
+    databases: SerdeVecHashMap<DbId, Arc<DatabaseSchema>>,
     sequence: SequenceNumber,
     /// The host_id is the prefix that is passed in when starting up (`host_identifier_prefix`)
     host_id: Arc<str>,
@@ -351,55 +350,10 @@ serde_with::serde_conv!(
     }
 );
 
-serde_with::serde_conv!(
-    DatabasesAsArray,
-    HashMap<DbId, Arc<DatabaseSchema>>,
-    |map: &HashMap<DbId, Arc<DatabaseSchema>>| {
-        map.values().fold(Vec::new(), |mut acc, db| {
-            acc.push(DatabasesSerialized {
-                id: db.id,
-                name: Arc::clone(&db.name),
-                tables: db.tables.values().cloned().collect(),
-            });
-            acc
-        })
-    },
-    |vec: Vec<DatabasesSerialized>| -> Result<_, String> {
-        vec.into_iter().fold(Ok(HashMap::new()), |acc, db| {
-            let mut acc = acc?;
-            let mut table_map = BiHashMap::new();
-            if let Some(_) = acc.insert(db.id, Arc::new(DatabaseSchema {
-                id: db.id,
-                name: Arc::clone(&db.name),
-                tables: db.tables.into_iter().fold(Ok(BTreeMap::new()), |acc, table| {
-                    let mut acc = acc?;
-                    let table_name = Arc::clone(&table.table_name);
-                    table_map.insert(table.table_id, Arc::clone(&table_name));
-                    if let Some(_) = acc.insert(table.table_id, table) {
-                        return Err(format!("found duplicate table: {}", table_name));
-                    }
-                    Ok(acc)
-                })?,
-                table_map
-            })) {
-                return Err(format!("found duplicate db: {}", db.name));
-            }
-            Ok(acc)
-        })
-    }
-);
-
-#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone, Default)]
-struct DatabasesSerialized {
-    pub id: DbId,
-    pub name: Arc<str>,
-    pub tables: Vec<TableDefinition>,
-}
-
 impl InnerCatalog {
     pub(crate) fn new(host_id: Arc<str>, instance_id: Arc<str>) -> Self {
         Self {
-            databases: HashMap::new(),
+            databases: SerdeVecHashMap::new(),
             sequence: SequenceNumber::new(0),
             host_id,
             instance_id,
@@ -466,7 +420,7 @@ pub struct DatabaseSchema {
     pub id: DbId,
     pub name: Arc<str>,
     /// The database is a map of tables
-    pub tables: BTreeMap<TableId, TableDefinition>,
+    pub tables: SerdeVecHashMap<TableId, TableDefinition>,
     #[serde_as(as = "TableMapAsArray")]
     pub table_map: BiHashMap<TableId, Arc<str>>,
 }
@@ -476,7 +430,7 @@ impl DatabaseSchema {
         Self {
             id,
             name,
-            tables: BTreeMap::new(),
+            tables: Default::default(),
             table_map: BiHashMap::new(),
         }
     }
@@ -485,7 +439,7 @@ impl DatabaseSchema {
     /// everything is compatible and there are no updates to the existing schema, None will be
     /// returned, otherwise a new `DatabaseSchema` will be returned with the updates applied.
     pub fn new_if_updated_from_batch(&self, catalog_batch: &CatalogBatch) -> Result<Option<Self>> {
-        let mut updated_or_new_tables = BTreeMap::new();
+        let mut updated_or_new_tables = SerdeVecHashMap::new();
 
         for catalog_op in &catalog_batch.ops {
             match catalog_op {
@@ -1031,7 +985,6 @@ pub fn influx_column_type_from_field_value(fv: &FieldValue<'_>) -> InfluxColumnT
 
 #[cfg(test)]
 mod tests {
-    use insta::assert_json_snapshot;
     use pretty_assertions::assert_eq;
     use test_helpers::assert_contains;
 
@@ -1048,7 +1001,7 @@ mod tests {
         let mut database = DatabaseSchema {
             id: DbId::from(0),
             name: "test_db".into(),
-            tables: BTreeMap::new(),
+            tables: SerdeVecHashMap::new(),
             table_map: {
                 let mut map = BiHashMap::new();
                 map.insert(TableId::from(1), "test_table_1".into());
@@ -1104,10 +1057,6 @@ mod tests {
             .databases
             .insert(database.id, Arc::new(database));
 
-        // Perform a snapshot test to check that the JSON serialized catalog does not change in an
-        // undesired way when introducing features etc.
-        assert_json_snapshot!(catalog);
-
         // Serialize/deserialize to ensure roundtrip to/from JSON
        let serialized = serde_json::to_string(&catalog).unwrap();
         let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
@@ -1122,85 +1071,116 @@ mod tests {
         {
             let json = r#"{
                 "databases": [
-                    {
-                        "id": 0,
-                        "name": "db1",
-                        "tables": []
-                    },
-                    {
-                        "id": 0,
-                        "name": "db1",
-                        "tables": []
-                    }
-                ]
+                    [
+                        0,
+                        {
+                            "id": 0,
+                            "name": "db1",
+                            "tables": [],
+                            "table_map": []
+                        }
+                    ],
+                    [
+                        0,
+                        {
+                            "id": 0,
+                            "name": "db1",
+                            "tables": [],
+                            "table_map": []
+                        }
+                    ]
+                ],
+                "sequence": 0,
+                "host_id": "test",
+                "instance_id": "test",
+                "db_map": []
             }"#;
             let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
-            assert_contains!(err.to_string(), "found duplicate db: db1");
+            assert_contains!(err.to_string(), "duplicate key found");
         }
         // Duplicate tables
         {
             let json = r#"{
                 "databases": [
-                    {
-                        "id": 0,
-                        "name": "db1",
-                        "tables": [
-                            {
-                                "table_id": 0,
-                                "table_name": "tbl1",
-                                "cols": {},
-                                "column_map": [],
-                                "next_column_id": 0
-                            },
-                            {
-                                "table_id": 0,
-                                "table_name": "tbl1",
-                                "cols": {},
-                                "column_map": [],
-                                "next_column_id": 0
-                            }
-                        ]
-                    }
-                ]
+                    [
+                        0,
+                        {
+                            "id": 0,
+                            "name": "db1",
+                            "tables": [
+                                [
+                                    0,
+                                    {
+                                        "table_id": 0,
+                                        "table_name": "tbl1",
+                                        "cols": {},
+                                        "column_map": [],
+                                        "next_column_id": 0
+                                    }
+                                ],
+                                [
+                                    0,
+                                    {
+                                        "table_id": 0,
+                                        "table_name": "tbl1",
+                                        "cols": {},
+                                        "column_map": [],
+                                        "next_column_id": 0
+                                    }
+                                ]
+                            ]
+                        }
+                    ]
+                ],
+                "sequence": 0,
+                "host_id": "test",
+                "instance_id": "test",
+                "db_map": []
             }"#;
             let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
-            assert_contains!(err.to_string(), "found duplicate table: tbl1");
+            assert_contains!(err.to_string(), "duplicate key found");
         }
         // Duplicate columns
         {
             let json = r#"{
                 "databases": [
-                    {
-                        "id": 0,
-                        "name": "db1",
-                        "tables": [
-                            {
-                                "table_id": 0,
-                                "table_name": "tbl1",
-                                "cols": {
-                                    "col1": {
-                                        "column_id": 0,
-                                        "type": "i64",
-                                        "influx_type": "field",
-                                        "nullable": true
-                                    },
-                                    "col1": {
-                                        "column_id": 0,
-                                        "type": "u64",
-                                        "influx_type": "field",
-                                        "nullable": true
-                                    }
-                                },
-                                "column_map": [
-                                    {
-                                        "column_id": 0,
-                                        "name": "col1"
-                                    }
-                                ],
-                                "next_column_id": 1
-                            }
-                        ]
-                    }
-                ]
+                    [
+                        0,
+                        {
+                            "id": 0,
+                            "name": "db1",
+                            "tables": [
+                                [
+                                    0,
+                                    {
+                                        "table_id": 0,
+                                        "table_name": "tbl1",
+                                        "cols": {
+                                            "col1": {
+                                                "column_id": 0,
+                                                "type": "i64",
+                                                "influx_type": "field",
+                                                "nullable": true
+                                            },
+                                            "col1": {
+                                                "column_id": 0,
+                                                "type": "u64",
+                                                "influx_type": "field",
+                                                "nullable": true
+                                            }
+                                        },
+                                        "column_map": [
+                                            {
+                                                "column_id": 0,
+                                                "name": "col1"
+                                            }
+                                        ],
+                                        "next_column_id": 1
+                                    }
+                                ]
+                            ]
+                        }
+                    ]
                 ]
             }"#;
             let err = serde_json::from_str::<InnerCatalog>(json).unwrap_err();
@@ -1213,7 +1193,7 @@ mod tests {
         let mut database = DatabaseSchema {
             id: DbId::from(0),
             name: "test".into(),
-            tables: BTreeMap::new(),
+            tables: SerdeVecHashMap::new(),
             table_map: BiHashMap::new(),
         };
         database.tables.insert(
@@ -1256,7 +1236,7 @@ mod tests {
         let mut database = DatabaseSchema {
             id: DbId::from(0),
             name: "test_db".into(),
-            tables: BTreeMap::new(),
+            tables: SerdeVecHashMap::new(),
             table_map: {
                 let mut map = BiHashMap::new();
                 map.insert(TableId::from(1), "test_table_1".into());
@@ -1291,8 +1271,6 @@ mod tests {
             .databases
             .insert(database.id, Arc::new(database));
 
-        assert_json_snapshot!(catalog);
-
         let serialized = serde_json::to_string(&catalog).unwrap();
         let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
         let deserialized = Catalog::from_inner(deserialized_inner);
@@ -1307,7 +1285,7 @@ mod tests {
         let mut database = DatabaseSchema {
             id: DbId::from(0),
             name: "test_db".into(),
-            tables: BTreeMap::new(),
+            tables: SerdeVecHashMap::new(),
             table_map: {
                 let mut map = BiHashMap::new();
                 map.insert(TableId::from(0), "test".into());
@@ -1348,8 +1326,6 @@ mod tests {
             .databases
             .insert(database.id, Arc::new(database));
 
-        assert_json_snapshot!(catalog);
-
         let serialized = serde_json::to_string(&catalog).unwrap();
         let deserialized_inner: InnerCatalog = serde_json::from_str(&serialized).unwrap();
         let deserialized = Catalog::from_inner(deserialized_inner);
diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap
deleted file mode 100644
index 0d2887a4f6d..00000000000
--- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap
+++ /dev/null
@@ -1,256 +0,0 @@
----
-source: influxdb3_catalog/src/catalog.rs
-expression: catalog
----
-{
-  "databases": [
-    {
-      "id": 0,
-      "name": "test_db",
-      "tables": [
-        {
-          "table_id": 1,
-          "table_name": "test_table_1",
-          "cols": {
-            "bool_field": {
-              "column_id": 0,
-              "type": "bool",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "f64_field": {
-              "column_id": 1,
-              "type": "f64",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "i64_field": {
-              "column_id": 2,
-              "type": "i64",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "string_field": {
-              "column_id": 3,
-              "type": "str",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "tag_1": {
-              "column_id": 4,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": true
-            },
-            "tag_2": {
-              "column_id": 5,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": true
-            },
-            "tag_3": {
-              "column_id": 6,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": true
-            },
-            "time": {
-              "column_id": 7,
-              "type": {
-                "time": [
-                  "ns",
-                  null
-                ]
-              },
-              "influx_type": "time",
-              "nullable": false
-            },
-            "u64_field": {
-              "column_id": 8,
-              "type": "u64",
-              "influx_type": "field",
-              "nullable": true
-            }
-          },
-          "column_map": [
-            {
-              "column_id": 0,
-              "name": "bool_field"
-            },
-            {
-              "column_id": 1,
-              "name": "f64_field"
-            },
-            {
-              "column_id": 2,
-              "name": "i64_field"
-            },
-            {
-              "column_id": 3,
-              "name": "string_field"
-            },
-            {
-              "column_id": 4,
-              "name": "tag_1"
-            },
-            {
-              "column_id": 5,
-              "name": "tag_2"
-            },
-            {
-              "column_id": 6,
-              "name": "tag_3"
-            },
-            {
-              "column_id": 7,
-              "name": "time"
-            },
-            {
-              "column_id": 8,
-              "name": "u64_field"
-            }
-          ],
-          "next_column_id": 9
-        },
-        {
-          "table_id": 2,
-          "table_name": "test_table_2",
-          "cols": {
-            "bool_field": {
-              "column_id": 0,
-              "type": "bool",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "f64_field": {
-              "column_id": 1,
-              "type": "f64",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "i64_field": {
-              "column_id": 2,
-              "type": "i64",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "string_field": {
-              "column_id": 3,
-              "type": "str",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "tag_1": {
-              "column_id": 4,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": true
-            },
-            "tag_2": {
-              "column_id": 5,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": true
-            },
-            "tag_3": {
-              "column_id": 6,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": true
-            },
-            "time": {
-              "column_id": 7,
-              "type": {
-                "time": [
-                  "ns",
-                  null
-                ]
-              },
-              "influx_type": "time",
-              "nullable": false
-            },
-            "u64_field": {
-              "column_id": 8,
-              "type": "u64",
-              "influx_type": "field",
-              "nullable": true
-            }
-          },
-          "column_map": [
-            {
-              "column_id": 0,
-              "name": "bool_field"
-            },
-            {
-              "column_id": 1,
-              "name": "f64_field"
-            },
-            {
-              "column_id": 2,
-              "name": "i64_field"
-            },
-            {
-              "column_id": 3,
-              "name": "string_field"
-            },
-            {
-              "column_id": 4,
-              "name": "tag_1"
-            },
-            {
-              "column_id": 5,
-              "name": "tag_2"
-            },
-            {
-              "column_id": 6,
-              "name": "tag_3"
-            },
-            {
-              "column_id": 7,
-              "name": "time"
-            },
-            {
-              "column_id": 8,
-              "name": "u64_field"
-            }
-          ],
-          "next_column_id": 9
-        }
-      ]
-    }
-  ],
-  "sequence": 0,
-  "host_id": "sample-host-id",
-  "instance_id": "instance-id",
-  "db_map": []
-}
diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap
deleted file mode 100644
index c99f4daec98..00000000000
--- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap
+++ /dev/null
@@ -1,113 +0,0 @@
----
-source: influxdb3_catalog/src/catalog.rs
-expression: catalog
----
-{
-  "databases": [
-    {
-      "id": 0,
-      "name": "test_db",
-      "tables": [
-        {
-          "table_id": 0,
-          "table_name": "test",
-          "cols": {
-            "field": {
-              "column_id": 0,
-              "type": "str",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "tag_1": {
-              "column_id": 1,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": true
-            },
-            "tag_2": {
-              "column_id": 2,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": true
-            },
-            "tag_3": {
-              "column_id": 3,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": true
-            },
-            "time": {
-              "column_id": 4,
-              "type": {
-                "time": [
-                  "ns",
-                  null
-                ]
-              },
-              "influx_type": "time",
-              "nullable": false
-            }
-          },
-          "last_caches": [
-            {
-              "table_id": 0,
-              "table": "test",
-              "name": "test_table_last_cache",
-              "keys": [
-                "tag_2",
-                "tag_3"
-              ],
-              "vals": [
-                "field"
-              ],
-              "n": 1,
-              "ttl": 600
-            }
-          ],
-          "column_map": [
-            {
-              "column_id": 0,
-              "name": "field"
-            },
-            {
-              "column_id": 1,
-              "name": "tag_1"
-            },
-            {
-              "column_id": 2,
-              "name": "tag_2"
-            },
-            {
-              "column_id": 3,
-              "name": "tag_3"
-            },
-            {
-              "column_id": 4,
-              "name": "time"
-            }
-          ],
-          "next_column_id": 5
-        }
-      ]
-    }
-  ],
-  "sequence": 0,
-  "host_id": "sample-host-id",
-  "instance_id": "instance-id",
-  "db_map": []
-}
diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap
deleted file mode 100644
index a8a8004bc33..00000000000
--- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap
+++ /dev/null
@@ -1,102 +0,0 @@
----
-source: influxdb3_catalog/src/catalog.rs
-expression: catalog
----
-{
-  "databases": [
-    {
-      "id": 0,
-      "name": "test_db",
-      "tables": [
-        {
-          "table_id": 1,
-          "table_name": "test_table_1",
-          "key": [
-            "tag_1",
-            "tag_2",
-            "tag_3"
-          ],
-          "cols": {
-            "field": {
-              "column_id": 0,
-              "type": "str",
-              "influx_type": "field",
-              "nullable": true
-            },
-            "tag_1": {
-              "column_id": 1,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": false
-            },
-            "tag_2": {
-              "column_id": 2,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": false
-            },
-            "tag_3": {
-              "column_id": 3,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              },
-              "influx_type": "tag",
-              "nullable": false
-            },
-            "time": {
-              "column_id": 4,
-              "type": {
-                "time": [
-                  "ns",
-                  null
-                ]
-              },
-              "influx_type": "time",
-              "nullable": false
-            }
-          },
-          "column_map": [
-            {
-              "column_id": 0,
-              "name": "field"
-            },
-            {
-              "column_id": 1,
-              "name": "tag_1"
-            },
-            {
-              "column_id": 2,
-              "name": "tag_2"
-            },
-            {
-              "column_id": 3,
-              "name": "tag_3"
-            },
-            {
-              "column_id": 4,
-              "name": "time"
-            }
-          ],
-          "next_column_id": 5
-        }
-      ]
-    }
-  ],
-  "sequence": 0,
-  "host_id": "sample-host-id",
-  "instance_id": "instance-id",
-  "db_map": []
-}
diff --git a/influxdb3_id/Cargo.toml b/influxdb3_id/Cargo.toml
index 0d408e55797..4cf2c9cf2fa 100644
--- a/influxdb3_id/Cargo.toml
+++ b/influxdb3_id/Cargo.toml
@@ -6,7 +6,11 @@ edition.workspace = true
 license.workspace = true
 
 [dependencies]
+hashbrown.workspace = true
 serde.workspace = true
 
+[dev-dependencies]
+serde_json.workspace = true
+
 [lints]
 workspace = true
diff --git a/influxdb3_id/src/lib.rs b/influxdb3_id/src/lib.rs
index aefa812d588..bc9defd816c 100644
--- a/influxdb3_id/src/lib.rs
+++ b/influxdb3_id/src/lib.rs
@@ -5,6 +5,9 @@ use std::sync::atomic::AtomicU32;
 use std::sync::atomic::AtomicU64;
 use std::sync::atomic::Ordering;
 
+mod serialize;
+pub use serialize::SerdeVecHashMap;
+
 #[derive(Debug, Copy, Clone, Eq, PartialOrd, Ord, PartialEq, Serialize, Deserialize, Hash)]
 pub struct DbId(u32);
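The new `influxdb3_id/src/serialize.rs` below defines the newtype. Because it derefs to `hashbrown::HashMap`, call sites keep the ordinary map API, and the blanket `From` impl powers the `.into()` conversions used in the WAL changes later in this diff. A short usage sketch against the API added below (toy key/value types):

```rust
use hashbrown::HashMap;
use influxdb3_id::SerdeVecHashMap;

fn main() {
    // `new`/`insert`/`get` come through the Deref/DerefMut impls to
    // hashbrown::HashMap, so existing call sites need no changes.
    let mut map: SerdeVecHashMap<u32, &str> = SerdeVecHashMap::new();
    map.insert(1, "one");
    assert_eq!(map.get(&1), Some(&"one"));

    // `From<T: Into<HashMap<K, V>>>` enables `.into()` at construction sites.
    let plain: HashMap<u32, &str> = HashMap::from_iter([(2, "two")]);
    let converted: SerdeVecHashMap<u32, &str> = plain.into();
    assert_eq!(converted.len(), 1);
}
```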
diff --git a/influxdb3_id/src/serialize.rs b/influxdb3_id/src/serialize.rs
new file mode 100644
index 00000000000..bdaf522f2b9
--- /dev/null
+++ b/influxdb3_id/src/serialize.rs
@@ -0,0 +1,207 @@
+use std::{
+    marker::PhantomData,
+    ops::{Deref, DerefMut},
+};
+
+use hashbrown::{
+    hash_map::{IntoIter, Iter, IterMut},
+    HashMap,
+};
+use serde::{
+    de::{self, SeqAccess, Visitor},
+    ser::SerializeSeq,
+    Deserialize, Deserializer, Serialize, Serializer,
+};
+
+/// A new-type around a `HashMap` that provides special serialization and deserialization behaviour.
+///
+/// Specifically, it will be serialized as a vector of tuples, each tuple containing a key-value
+/// pair from the map. Deserialization assumes said serialization, and deserializes from the vector
+/// of tuples back into the map. Traits like `Deref`, `From`, etc. are implemented on this type such
+/// that it can be used as a `HashMap`.
+///
+/// During deserialization, there are no duplicate keys allowed. If duplicates are found, an error
+/// will be thrown.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SerdeVecHashMap<K, V>(HashMap<K, V>);
+
+impl<K, V> SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    pub fn new() -> Self {
+        Self::default()
+    }
+}
+
+impl<K, V> Default for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    fn default() -> Self {
+        Self(Default::default())
+    }
+}
+
+impl<K, V, T> From<T> for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+    T: Into<HashMap<K, V>>,
+{
+    fn from(value: T) -> Self {
+        Self(value.into())
+    }
+}
+
+impl<K, V> IntoIterator for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    type Item = (K, V);
+
+    type IntoIter = IntoIter<K, V>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.into_iter()
+    }
+}
+
+impl<'a, K, V> IntoIterator for &'a SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    type Item = (&'a K, &'a V);
+
+    type IntoIter = Iter<'a, K, V>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.iter()
+    }
+}
+
+impl<'a, K, V> IntoIterator for &'a mut SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    type Item = (&'a K, &'a mut V);
+
+    type IntoIter = IterMut<'a, K, V>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.0.iter_mut()
+    }
+}
+
+impl<K, V> Deref for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    type Target = HashMap<K, V>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.0
+    }
+}
+
+impl<K, V> DerefMut for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash,
+{
+    fn deref_mut(&mut self) -> &mut Self::Target {
+        &mut self.0
+    }
+}
+
+impl<K, V> Serialize for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash + Serialize,
+    V: Serialize,
+{
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        let mut seq = serializer.serialize_seq(Some(self.len()))?;
+        for ele in self.iter() {
+            seq.serialize_element(&ele)?;
+        }
+        seq.end()
+    }
+}
+
+impl<'de, K, V> Deserialize<'de> for SerdeVecHashMap<K, V>
+where
+    K: Eq + std::hash::Hash + Deserialize<'de>,
+    V: Deserialize<'de>,
+{
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: Deserializer<'de>,
+    {
+        let v = deserializer.deserialize_seq(VecVisitor::new())?;
+        let mut map = HashMap::with_capacity(v.len());
+        for (k, v) in v.into_iter() {
+            if map.insert(k, v).is_some() {
+                return Err(de::Error::custom("duplicate key found"));
+            }
+        }
+        Ok(Self(map))
+    }
+}
+
+type Output<K, V> = fn() -> Vec<(K, V)>;
+
+struct VecVisitor<K, V> {
+    marker: PhantomData<Output<K, V>>,
+}
+
+impl<K, V> VecVisitor<K, V> {
+    fn new() -> Self {
+        Self {
+            marker: PhantomData,
+        }
+    }
+}
+
+impl<'de, K, V> Visitor<'de> for VecVisitor<K, V>
+where
+    K: Deserialize<'de>,
+    V: Deserialize<'de>,
+{
+    type Value = Vec<(K, V)>;
+
+    fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        formatter.write_str("a vector of key value pairs")
+    }
+
+    fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
+    where
+        A: SeqAccess<'de>,
+    {
+        let mut v = Vec::with_capacity(seq.size_hint().unwrap_or(0));
+        while let Some(ele) = seq.next_element()? {
+            v.push(ele);
+        }
+        Ok(v)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use hashbrown::HashMap;
+
+    use super::SerdeVecHashMap;
+
+    #[test]
+    fn serde_vec_map_with_json() {
+        let map = HashMap::<u32, &str>::from_iter([(0, "foo"), (1, "bar"), (2, "baz")]);
+        let serde_vec_map = SerdeVecHashMap::from(map);
+        // test round-trip to JSON:
+        let s = serde_json::to_string(&serde_vec_map).unwrap();
+        // with using a hashmap the order changes so asserting on the JSON itself is flaky, so if
+        // you want to see it working use --nocapture on the test...
+        println!("{s}");
+        let d: SerdeVecHashMap<u32, &str> = serde_json::from_str(&s).unwrap();
+        assert_eq!(d, serde_vec_map);
+    }
+}
diff --git a/influxdb3_telemetry/src/stats.rs b/influxdb3_telemetry/src/stats.rs
index 9e5c751f52e..b7131cebd40 100644
--- a/influxdb3_telemetry/src/stats.rs
+++ b/influxdb3_telemetry/src/stats.rs
@@ -21,10 +21,6 @@ pub(crate) struct RollingStats<T> {
 }
 
 impl<T> RollingStats<T> {
-    pub fn new() -> RollingStats<T> {
-        RollingStats::default()
-    }
-
     /// Update the rolling stats [`Self::min`]/[`Self::max`]/[`Self::avg`] using
     /// reference to an higher precision stats that is passed in. This is usually a
     /// per minute interval stats. One thing to note here is the [`Self::num_samples`]
@@ -70,10 +66,6 @@ pub(crate) struct Stats<T> {
 }
 
 impl<T> Stats<T> {
-    pub fn new() -> Stats<T> {
-        Stats::default()
-    }
-
     /// Update the [`Self::min`]/[`Self::max`]/[`Self::avg`] from a
     /// new value that is sampled.
     pub fn update(&mut self, new_val: T) -> Option<()> {
diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs
index 6873074d27e..ed4599e2bc0 100644
--- a/influxdb3_wal/src/lib.rs
+++ b/influxdb3_wal/src/lib.rs
@@ -11,7 +11,7 @@ use crate::snapshot_tracker::SnapshotInfo;
 use async_trait::async_trait;
 use data_types::Timestamp;
 use hashbrown::HashMap;
-use influxdb3_id::{DbId, TableId};
+use influxdb3_id::{DbId, SerdeVecHashMap, TableId};
 use influxdb_line_protocol::v3::SeriesValue;
 use influxdb_line_protocol::FieldValue;
 use iox_time::Time;
@@ -442,46 +442,11 @@ pub struct LastCacheDelete {
 pub struct WriteBatch {
     pub database_id: DbId,
     pub database_name: Arc<str>,
-    #[serde_as(as = "TableChunksMapAsVec")]
-    pub table_chunks: HashMap<TableId, TableChunks>,
+    pub table_chunks: SerdeVecHashMap<TableId, TableChunks>,
     pub min_time_ns: i64,
     pub max_time_ns: i64,
 }
 
-#[derive(Debug, Serialize, Deserialize)]
-pub struct TableChunksMap {
-    table_id: TableId,
-    min_time: i64,
-    max_time: i64,
-    chunk_time_to_chunk: HashMap<i64, TableChunk>,
-}
-
-serde_with::serde_conv!(
-    TableChunksMapAsVec,
-    HashMap<TableId, TableChunks>,
-    |map: &HashMap<TableId, TableChunks>|
-        map.iter()
-            .map(|(table_id, chunk)| {
-                TableChunksMap {
-                    table_id: *table_id,
-                    min_time: chunk.min_time,
-                    max_time: chunk.max_time,
-                    chunk_time_to_chunk: chunk.chunk_time_to_chunk.clone()
-                }
-            })
-            .collect::<Vec<TableChunksMap>>(),
-    |vec: Vec<TableChunksMap>| -> Result<_, std::convert::Infallible> {
-        Ok(vec.into_iter().fold(HashMap::new(), |mut acc, chunk| {
-            acc.insert(chunk.table_id, TableChunks {
-                min_time: chunk.min_time,
-                max_time: chunk.max_time,
-                chunk_time_to_chunk: chunk.chunk_time_to_chunk
-            });
-            acc
-        }))
-    }
-);
-
 impl WriteBatch {
     pub fn new(
         database_id: DbId,
@@ -502,7 +467,7 @@ impl WriteBatch {
         Self {
             database_id,
             database_name,
-            table_chunks,
+            table_chunks: table_chunks.into(),
             min_time_ns,
             max_time_ns,
         }
@@ -510,7 +475,7 @@ impl WriteBatch {
 
     pub fn add_write_batch(
         &mut self,
-        new_table_chunks: HashMap<TableId, TableChunks>,
+        new_table_chunks: SerdeVecHashMap<TableId, TableChunks>,
         min_time_ns: i64,
         max_time_ns: i64,
     )
diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs
index eeb2d005fa3..14dbe4c02f3 100644
--- a/influxdb3_wal/src/object_store.rs
+++ b/influxdb3_wal/src/object_store.rs
@@ -681,7 +681,8 @@ mod tests {
                     },
                 )]),
             },
-        )]),
+        )])
+        .into(),
         min_time_ns: 1,
         max_time_ns: 3,
     });
@@ -714,7 +715,8 @@ mod tests {
                     },
                 )]),
             },
-        )]),
+        )])
+        .into(),
         min_time_ns: 62_000000000,
         max_time_ns: 62_000000000,
     });
@@ -782,7 +784,8 @@ mod tests {
                     },
                 )]),
             },
-        )]),
+        )])
+        .into(),
         min_time_ns: 1,
         max_time_ns: 62_000000000,
     })],
@@ -824,7 +827,8 @@ mod tests {
                     },
                 )]),
             },
-        )]),
+        )])
+        .into(),
         min_time_ns: 62_000000000,
         max_time_ns: 62_000000000,
     })],
@@ -896,7 +900,8 @@ mod tests {
                     },
                 )]),
             },
-        )]),
+        )])
+        .into(),
         min_time_ns: 128_000000000,
         max_time_ns: 128_000000000,
     });
@@ -956,7 +961,8 @@ mod tests {
                     },
                 )]),
             },
-        )]),
+        )])
+        .into(),
         min_time_ns: 128_000000000,
         max_time_ns: 128_000000000,
     })],
diff --git a/influxdb3_wal/src/serialize.rs b/influxdb3_wal/src/serialize.rs
index da52c132737..1b614393833 100644
--- a/influxdb3_wal/src/serialize.rs
+++ b/influxdb3_wal/src/serialize.rs
@@ -91,8 +91,7 @@ mod tests {
     use crate::{
         Field, FieldData, Row, TableChunk, TableChunks, WalFileSequenceNumber, WalOp, WriteBatch,
     };
-    use hashbrown::HashMap;
-    use influxdb3_id::{DbId, TableId};
+    use influxdb3_id::{DbId, SerdeVecHashMap, TableId};
 
     #[test]
     fn test_serialize_deserialize() {
@@ -117,7 +116,7 @@ mod tests {
             chunk_time_to_chunk: [(1, chunk)].iter().cloned().collect(),
         };
         let table_id = TableId::from(2);
-        let mut table_chunks = HashMap::new();
+        let mut table_chunks = SerdeVecHashMap::new();
         table_chunks.insert(table_id, chunks);
 
         let contents = WalContents {
diff --git a/influxdb3_write/src/last_cache/mod.rs b/influxdb3_write/src/last_cache/mod.rs
index e26cef2f187..b60c6c2583f 100644
--- a/influxdb3_write/src/last_cache/mod.rs
+++ b/influxdb3_write/src/last_cache/mod.rs
@@ -1586,7 +1586,7 @@ fn data_type_from_buffer_field(field: &Field) -> DataType {
 
 #[cfg(test)]
 mod tests {
-    use std::{cmp::Ordering, collections::BTreeMap, sync::Arc, time::Duration};
+    use std::{cmp::Ordering, sync::Arc, time::Duration};
 
     use crate::{
         last_cache::{KeyValue, LastCacheProvider, Predicate, DEFAULT_CACHE_TTL},
@@ -1600,7 +1600,7 @@ mod tests {
     use bimap::BiHashMap;
     use data_types::NamespaceName;
     use influxdb3_catalog::catalog::{Catalog, DatabaseSchema, TableDefinition};
-    use influxdb3_id::{DbId, TableId};
+    use influxdb3_id::{DbId, SerdeVecHashMap, TableId};
     use influxdb3_wal::{LastCacheDefinition, WalConfig};
     use insta::assert_json_snapshot;
     use iox_time::{MockProvider, Time, TimeProvider};
@@ -3114,7 +3114,7 @@ mod tests {
         let mut database = DatabaseSchema {
             id: DbId::from(0),
             name: db_name.into(),
-            tables: BTreeMap::new(),
+            tables: SerdeVecHashMap::new(),
             table_map: {
                 let mut map = BiHashMap::new();
                 map.insert(TableId::from(0), "test_table_1".into());
diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap
index b57528e66a1..fcacd3e0fc4 100644
--- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap
+++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap
@@ -4,84 +4,96 @@ expression: catalog_json
 ---
 {
   "databases": [
-    {
-      "id": 0,
-      "name": "db",
-      "tables": [
-        {
-          "cols": {
-            "f1": {
-              "column_id": 0,
-              "influx_type": "field",
-              "nullable": true,
-              "type": "bool"
-            },
-            "f2": {
-              "column_id": 3,
-              "influx_type": "field",
-              "nullable": true,
-              "type": "i64"
-            },
-            "t1": {
-              "column_id": 1,
-              "influx_type": "tag",
-              "nullable": true,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              }
-            },
-            "time": {
-              "column_id": 2,
-              "influx_type": "time",
-              "nullable": false,
-              "type": {
-                "time": [
-                  "ns",
-                  null
-                ]
-              }
-            }
-          },
-          "column_map": [
-            {
-              "column_id": 0,
-              "name": "f1"
-            },
-            {
-              "column_id": 1,
-              "name": "t1"
-            },
-            {
-              "column_id": 2,
-              "name": "time"
-            },
-            {
-              "column_id": 3,
-              "name": "f2"
-            }
-          ],
-          "last_caches": [
-            {
-              "keys": [
-                "t1"
-              ],
-              "n": 1,
-              "name": "cache",
-              "table": "table",
-              "table_id": 0,
-              "ttl": 14400,
-              "vals": null
-            }
-          ],
-          "next_column_id": 4,
-          "table_id": 0,
-          "table_name": "table"
-        }
-      ]
-    }
+    [
+      0,
+      {
+        "id": 0,
+        "name": "db",
+        "table_map": [
+          {
+            "name": "table",
+            "table_id": 0
+          }
+        ],
+        "tables": [
+          [
+            0,
+            {
+              "cols": {
+                "f1": {
+                  "column_id": 0,
+                  "influx_type": "field",
+                  "nullable": true,
+                  "type": "bool"
+                },
+                "f2": {
+                  "column_id": 3,
+                  "influx_type": "field",
+                  "nullable": true,
+                  "type": "i64"
+                },
+                "t1": {
+                  "column_id": 1,
+                  "influx_type": "tag",
+                  "nullable": true,
+                  "type": {
+                    "dict": [
+                      "i32",
+                      "str"
+                    ]
+                  }
+                },
+                "time": {
+                  "column_id": 2,
+                  "influx_type": "time",
+                  "nullable": false,
+                  "type": {
+                    "time": [
+                      "ns",
+                      null
+                    ]
+                  }
+                }
+              },
+              "column_map": [
+                {
+                  "column_id": 0,
+                  "name": "f1"
+                },
+                {
+                  "column_id": 1,
+                  "name": "t1"
+                },
+                {
+                  "column_id": 2,
+                  "name": "time"
+                },
+                {
+                  "column_id": 3,
+                  "name": "f2"
+                }
+              ],
+              "last_caches": [
+                {
+                  "keys": [
+                    "t1"
+                  ],
+                  "n": 1,
+                  "name": "cache",
+                  "table": "table",
+                  "table_id": 0,
+                  "ttl": 14400,
+                  "vals": null
+                }
+              ],
+              "next_column_id": 4,
+              "table_id": 0,
+              "table_name": "table"
+            }
+          ]
+        ]
+      }
+    ]
   ],
   "db_map": [
     {
diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap
index 8c1c4c05db1..0512ccd036d 100644
--- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap
+++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap
@@ -4,74 +4,86 @@ expression: catalog_json
 ---
 {
   "databases": [
-    {
-      "id": 0,
-      "name": "db",
-      "tables": [
-        {
-          "cols": {
-            "f1": {
-              "column_id": 0,
-              "influx_type": "field",
-              "nullable": true,
-              "type": "bool"
-            },
-            "t1": {
-              "column_id": 1,
-              "influx_type": "tag",
-              "nullable": true,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              }
-            },
-            "time": {
-              "column_id": 2,
-              "influx_type": "time",
-              "nullable": false,
-              "type": {
-                "time": [
-                  "ns",
-                  null
-                ]
-              }
-            }
-          },
-          "column_map": [
-            {
-              "column_id": 0,
-              "name": "f1"
-            },
-            {
-              "column_id": 1,
-              "name": "t1"
-            },
-            {
-              "column_id": 2,
-              "name": "time"
-            }
-          ],
-          "last_caches": [
-            {
-              "keys": [
-                "t1"
-              ],
-              "n": 1,
-              "name": "cache",
-              "table": "table",
-              "table_id": 0,
-              "ttl": 14400,
-              "vals": null
-            }
-          ],
-          "next_column_id": 3,
-          "table_id": 0,
-          "table_name": "table"
-        }
-      ]
-    }
+    [
+      0,
+      {
+        "id": 0,
+        "name": "db",
+        "table_map": [
+          {
+            "name": "table",
+            "table_id": 0
+          }
+        ],
+        "tables": [
+          [
+            0,
+            {
+              "cols": {
+                "f1": {
+                  "column_id": 0,
+                  "influx_type": "field",
+                  "nullable": true,
+                  "type": "bool"
+                },
+                "t1": {
+                  "column_id": 1,
+                  "influx_type": "tag",
+                  "nullable": true,
+                  "type": {
+                    "dict": [
+                      "i32",
+                      "str"
+                    ]
+                  }
+                },
+                "time": {
+                  "column_id": 2,
+                  "influx_type": "time",
+                  "nullable": false,
+                  "type": {
+                    "time": [
+                      "ns",
+                      null
+                    ]
+                  }
+                }
+              },
+              "column_map": [
+                {
+                  "column_id": 0,
+                  "name": "f1"
+                },
+                {
+                  "column_id": 1,
+                  "name": "t1"
+                },
+                {
+                  "column_id": 2,
+                  "name": "time"
+                }
+              ],
+              "last_caches": [
+                {
+                  "keys": [
+                    "t1"
+                  ],
+                  "n": 1,
+                  "name": "cache",
+                  "table": "table",
+                  "table_id": 0,
+                  "ttl": 14400,
+                  "vals": null
+                }
+              ],
+              "next_column_id": 3,
+              "table_id": 0,
+              "table_name": "table"
+            }
+          ]
+        ]
+      }
+    ]
   ],
   "db_map": [
     {
diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap
index 4704183b763..9925b2f06ab 100644
--- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap
+++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap
@@ -4,71 +4,83 @@ expression: catalog_json
 ---
 {
   "databases": [
-    {
-      "id": 0,
-      "name": "db",
-      "tables": [
-        {
-          "cols": {
-            "f1": {
-              "column_id": 0,
-              "influx_type": "field",
-              "nullable": true,
-              "type": "bool"
-            },
-            "f2": {
-              "column_id": 3,
-              "influx_type": "field",
-              "nullable": true,
-              "type": "i64"
-            },
-            "t1": {
-              "column_id": 1,
-              "influx_type": "tag",
-              "nullable": true,
-              "type": {
-                "dict": [
-                  "i32",
-                  "str"
-                ]
-              }
-            },
-            "time": {
-              "column_id": 2,
-              "influx_type": "time",
-              "nullable": false,
-              "type": {
-                "time": [
-                  "ns",
-                  null
-                ]
-              }
-            }
-          },
-          "column_map": [
-            {
-              "column_id": 0,
-              "name": "f1"
-            },
-            {
-              "column_id": 1,
-              "name": "t1"
-            },
-            {
-              "column_id": 2,
-              "name": "time"
-            },
-            {
-              "column_id": 3,
-              "name": "f2"
-            }
-          ],
-          "next_column_id": 4,
-          "table_id": 0,
-          "table_name": "table"
-        }
-      ]
-    }
+    [
+      0,
+      {
+        "id": 0,
+        "name": "db",
+        "table_map": [
+          {
+            "name": "table",
+            "table_id": 0
+          }
+        ],
+        "tables": [
+          [
+            0,
+            {
+              "cols": {
+                "f1": {
+                  "column_id": 0,
+                  "influx_type": "field",
+                  "nullable": true,
+                  "type": "bool"
+                },
+                "f2": {
+                  "column_id": 3,
+                  "influx_type": "field",
+                  "nullable": true,
+                  "type": "i64"
+                },
+                "t1": {
+                  "column_id": 1,
+                  "influx_type": "tag",
+                  "nullable": true,
+                  "type": {
+                    "dict": [
+                      "i32",
+                      "str"
+                    ]
+                  }
+                },
+                "time": {
+                  "column_id": 2,
+                  "influx_type": "time",
+                  "nullable": false,
+                  "type": {
+                    "time": [
+                      "ns",
+                      null
+                    ]
+                  }
+                }
+              },
+              "column_map": [
+                {
+                  "column_id": 0,
+                  "name": "f1"
+                },
+                {
+                  "column_id": 1,
+                  "name": "t1"
+                },
+                {
+                  "column_id": 2,
+                  "name": "time"
+                },
+                {
+                  "column_id": 3,
+                  "name": "f2"
+                }
+              ],
+              "next_column_id": 4,
+              "table_id": 0,
+              "table_name": "table"
+            }
+          ]
+        ]
+      }
+    ]
   ],
   "db_map": [
     {
diff --git a/rust-toolchain.toml b/rust-toolchain.toml
index dc8d20d3611..b7a0050e19b 100644
--- a/rust-toolchain.toml
+++ b/rust-toolchain.toml
@@ -1,3 +1,3 @@
 [toolchain]
-channel = "1.80.0"
+channel = "1.82.0"
 components = ["rustfmt", "clippy", "rust-analyzer"]