diff --git a/Cargo.lock b/Cargo.lock index 29b213b86e0..a19bfdb7ecc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3005,6 +3005,12 @@ dependencies = [ "serde", ] +[[package]] +name = "indoc" +version = "2.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b248f5224d1d606005e02c97f5aa4e88eeb230488bcc03bc9ca4d7991399f2b5" + [[package]] name = "influxdb-line-protocol" version = "1.0.0" @@ -3177,6 +3183,7 @@ dependencies = [ "clap", "csv", "dotenvy", + "futures", "humantime", "influxdb3_client", "influxdb3_process", @@ -3206,6 +3213,16 @@ dependencies = [ "uuid", ] +[[package]] +name = "influxdb3_py_api" +version = "0.1.0" +dependencies = [ + "influxdb3_catalog", + "influxdb3_wal", + "pyo3", + "schema", +] + [[package]] name = "influxdb3_server" version = "0.1.0" @@ -3388,6 +3405,7 @@ dependencies = [ "influxdb3_cache", "influxdb3_catalog", "influxdb3_id", + "influxdb3_py_api", "influxdb3_telemetry", "influxdb3_test_helpers", "influxdb3_wal", @@ -4702,6 +4720,12 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "portable-atomic" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "280dc24453071f1b63954171985a0b0d30058d287960968b9b2aca264c8d4ee6" + [[package]] name = "powerfmt" version = "0.2.0" @@ -4945,6 +4969,69 @@ dependencies = [ "prost 0.12.6", ] +[[package]] +name = "pyo3" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e484fd2c8b4cb67ab05a318f1fd6fa8f199fcc30819f08f07d200809dba26c15" +dependencies = [ + "cfg-if", + "indoc", + "libc", + "memoffset", + "once_cell", + "portable-atomic", + "pyo3-build-config", + "pyo3-ffi", + "pyo3-macros", + "unindent", +] + +[[package]] +name = "pyo3-build-config" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc0e0469a84f208e20044b98965e1561028180219e35352a2afaf2b942beff3b" +dependencies = [ + "once_cell", + "target-lexicon", +] + +[[package]] +name = "pyo3-ffi" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb1547a7f9966f6f1a0f0227564a9945fe36b90da5a93b3933fc3dc03fae372d" +dependencies = [ + "libc", + "pyo3-build-config", +] + +[[package]] +name = "pyo3-macros" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb6da8ec6fa5cedd1626c886fc8749bdcbb09424a86461eb8cdf096b7c33257" +dependencies = [ + "proc-macro2", + "pyo3-macros-backend", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "pyo3-macros-backend" +version = "0.23.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38a385202ff5a92791168b1136afae5059d3ac118457bb7bc304c197c2d33e7d" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "pyo3-build-config", + "quote", + "syn 2.0.87", +] + [[package]] name = "query_functions" version = "0.1.0" @@ -6215,6 +6302,12 @@ dependencies = [ "libc", ] +[[package]] +name = "target-lexicon" +version = "0.12.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61c41af27dd6d1e27b1b16b489db798443478cef1f06a660c96db617ba5de3b1" + [[package]] name = "tempfile" version = "3.14.0" @@ -7008,6 +7101,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "unindent" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"c7de7d73e1754487cb58364ee906a499937a0dfabd86bcb980fa99ec8c8fa2ce" + [[package]] name = "untrusted" version = "0.9.0" diff --git a/Cargo.toml b/Cargo.toml index 968a5cf33b1..7ae4d7812b2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ members = [ "influxdb3_id", "influxdb3_load_generator", "influxdb3_process", + "influxdb3_py_api", "influxdb3_server", "influxdb3_telemetry", "influxdb3_test_helpers", diff --git a/influxdb3/Cargo.toml b/influxdb3/Cargo.toml index 8f760100656..068ce902442 100644 --- a/influxdb3/Cargo.toml +++ b/influxdb3/Cargo.toml @@ -73,6 +73,7 @@ tokio_console = ["console-subscriber", "tokio/tracing", "observability_deps/rele # Use jemalloc as the default allocator. jemalloc_replacing_malloc = ["influxdb3_process/jemalloc_replacing_malloc"] +system-py = ["influxdb3_write/system-py"] [dev-dependencies] # Core Crates diff --git a/influxdb3/tests/server/flight.rs b/influxdb3/tests/server/flight.rs index 7f63b870007..6d9a8e45d11 100644 --- a/influxdb3/tests/server/flight.rs +++ b/influxdb3/tests/server/flight.rs @@ -108,20 +108,22 @@ async fn flight() -> Result<(), influxdb3_client::Error> { assert_batches_sorted_eq!( [ - "+--------------+--------------------+---------------+------------+", - "| catalog_name | db_schema_name | table_name | table_type |", - "+--------------+--------------------+---------------+------------+", - "| public | information_schema | columns | VIEW |", - "| public | information_schema | df_settings | VIEW |", - "| public | information_schema | schemata | VIEW |", - "| public | information_schema | tables | VIEW |", - "| public | information_schema | views | VIEW |", - "| public | iox | cpu | BASE TABLE |", - "| public | system | last_caches | BASE TABLE |", - "| public | system | meta_caches | BASE TABLE |", - "| public | system | parquet_files | BASE TABLE |", - "| public | system | queries | BASE TABLE |", - "+--------------+--------------------+---------------+------------+", + "+--------------+--------------------+----------------------------+------------+", + "| catalog_name | db_schema_name | table_name | table_type |", + "+--------------+--------------------+----------------------------+------------+", + "| public | information_schema | columns | VIEW |", + "| public | information_schema | df_settings | VIEW |", + "| public | information_schema | schemata | VIEW |", + "| public | information_schema | tables | VIEW |", + "| public | information_schema | views | VIEW |", + "| public | iox | cpu | BASE TABLE |", + "| public | system | last_caches | BASE TABLE |", + "| public | system | meta_caches | BASE TABLE |", + "| public | system | parquet_files | BASE TABLE |", + "| public | system | processing_engine_plugins | BASE TABLE |", + "| public | system | processing_engine_triggers | BASE TABLE |", + "| public | system | queries | BASE TABLE |", + "+--------------+--------------------+----------------------------+------------+", ], &batches ); diff --git a/influxdb3_cache/src/last_cache/mod.rs b/influxdb3_cache/src/last_cache/mod.rs index ddeb3ceac0a..72bd291626b 100644 --- a/influxdb3_cache/src/last_cache/mod.rs +++ b/influxdb3_cache/src/last_cache/mod.rs @@ -1267,6 +1267,8 @@ mod tests { map.insert(TableId::from(1), "test_table_2".into()); map }, + processing_engine_plugins: Default::default(), + processing_engine_triggers: Default::default(), deleted: false, }; let table_id = TableId::from(0); diff --git a/influxdb3_catalog/src/catalog.rs b/influxdb3_catalog/src/catalog.rs index d4b8f9b89d4..11505e838a8 100644 --- 
a/influxdb3_catalog/src/catalog.rs +++ b/influxdb3_catalog/src/catalog.rs @@ -1,6 +1,8 @@ //! Implementation of the Catalog that sits entirely in memory. -use crate::catalog::Error::TableNotFound; +use crate::catalog::Error::{ + ProcessingEngineCallExists, ProcessingEngineTriggerExists, TableNotFound, +}; use bimap::BiHashMap; use hashbrown::HashMap; use indexmap::IndexMap; @@ -8,6 +10,7 @@ use influxdb3_id::{ColumnId, DbId, SerdeVecMap, TableId}; use influxdb3_wal::{ CatalogBatch, CatalogOp, DeleteDatabaseDefinition, DeleteTableDefinition, FieldAdditions, FieldDefinition, LastCacheDefinition, LastCacheDelete, MetaCacheDefinition, MetaCacheDelete, + PluginDefinition, TriggerDefinition, }; use influxdb_line_protocol::FieldValue; use iox_time::Time; @@ -74,6 +77,37 @@ pub enum Error { table_name: String, existing: String, }, + + #[error( + "Cannot overwrite Processing Engine Call {} in Database {}", + call_name, + database_name + )] + ProcessingEngineCallExists { + database_name: String, + call_name: String, + }, + #[error( + "Cannot overwrite Processing Engine Trigger {} in Database {}", + trigger_name, + database_name + )] + ProcessingEngineTriggerExists { + database_name: String, + trigger_name: String, + }, + + #[error( + "Processing Engine Plugin {} not in DB schema for {}", + plugin_name, + database_name + )] + ProcessingEnginePluginNotFound { + plugin_name: String, + database_name: String, + }, + #[error("Processing Engine Unimplemented: {}", feature_description)] + ProcessingEngineUnimplemented { feature_description: String }, } pub type Result = std::result::Result; @@ -481,6 +515,9 @@ pub struct DatabaseSchema { /// The database is a map of tables pub tables: SerdeVecMap>, pub table_map: BiHashMap>, + pub processing_engine_plugins: HashMap, + // TODO: care about performance of triggers + pub processing_engine_triggers: HashMap, pub deleted: bool, } @@ -491,6 +528,8 @@ impl DatabaseSchema { name, tables: Default::default(), table_map: BiHashMap::new(), + processing_engine_plugins: HashMap::new(), + processing_engine_triggers: HashMap::new(), deleted: false, } } @@ -642,6 +681,8 @@ impl UpdateDatabaseSchema for CatalogOp { } CatalogOp::DeleteDatabase(delete_database) => delete_database.update_schema(schema), CatalogOp::DeleteTable(delete_table) => delete_table.update_schema(schema), + CatalogOp::CreatePlugin(create_plugin) => create_plugin.update_schema(schema), + CatalogOp::CreateTrigger(create_trigger) => create_trigger.update_schema(schema), } } } @@ -708,6 +749,53 @@ impl UpdateDatabaseSchema for DeleteTableDefinition { } } +impl UpdateDatabaseSchema for PluginDefinition { + fn update_schema<'a>( + &self, + mut schema: Cow<'a, DatabaseSchema>, + ) -> Result> { + match schema.processing_engine_plugins.get(&self.plugin_name) { + Some(current) if self.eq(current) => {} + Some(_) => { + return Err(ProcessingEngineCallExists { + database_name: schema.name.to_string(), + call_name: self.plugin_name.to_string(), + }) + } + None => { + schema + .to_mut() + .processing_engine_plugins + .insert(self.plugin_name.to_string(), self.clone()); + } + } + + Ok(schema) + } +} + +impl UpdateDatabaseSchema for TriggerDefinition { + fn update_schema<'a>( + &self, + mut schema: Cow<'a, DatabaseSchema>, + ) -> Result> { + if let Some(current) = schema.processing_engine_triggers.get(&self.trigger_name) { + if current == self { + return Ok(schema); + } + return Err(ProcessingEngineTriggerExists { + database_name: schema.name.to_string(), + trigger_name: self.trigger_name.to_string(), + }); + } + schema + 
.to_mut() + .processing_engine_triggers + .insert(self.trigger_name.to_string(), self.clone()); + Ok(schema) + } +} + fn make_new_name_using_deleted_time(name: &str, deletion_time: Time) -> Arc { Arc::from(format!( "{}-{}", @@ -1187,6 +1275,8 @@ mod tests { map.insert(TableId::from(2), "test_table_2".into()); map }, + processing_engine_plugins: Default::default(), + processing_engine_triggers: Default::default(), deleted: false, }; use InfluxColumnType::*; @@ -1396,6 +1486,8 @@ mod tests { name: "test".into(), tables: SerdeVecMap::new(), table_map: BiHashMap::new(), + processing_engine_plugins: Default::default(), + processing_engine_triggers: Default::default(), deleted: false, }; database.tables.insert( @@ -1453,6 +1545,8 @@ mod tests { map.insert(TableId::from(1), "test_table_1".into()); map }, + processing_engine_plugins: Default::default(), + processing_engine_triggers: Default::default(), deleted: false, }; use InfluxColumnType::*; @@ -1508,6 +1602,8 @@ mod tests { map.insert(TableId::from(0), "test".into()); map }, + processing_engine_plugins: Default::default(), + processing_engine_triggers: Default::default(), deleted: false, }; use InfluxColumnType::*; @@ -1607,6 +1703,8 @@ mod tests { name: "test".into(), tables: SerdeVecMap::new(), table_map: BiHashMap::new(), + processing_engine_plugins: Default::default(), + processing_engine_triggers: Default::default(), deleted: false, }; let deleted_table_id = TableId::new(); diff --git a/influxdb3_catalog/src/serialize.rs b/influxdb3_catalog/src/serialize.rs index da784937baa..4c0b263b346 100644 --- a/influxdb3_catalog/src/serialize.rs +++ b/influxdb3_catalog/src/serialize.rs @@ -3,11 +3,14 @@ use crate::catalog::DatabaseSchema; use crate::catalog::TableDefinition; use arrow::datatypes::DataType as ArrowDataType; use bimap::BiHashMap; +use hashbrown::HashMap; use influxdb3_id::ColumnId; use influxdb3_id::DbId; use influxdb3_id::SerdeVecMap; use influxdb3_id::TableId; -use influxdb3_wal::{LastCacheDefinition, LastCacheValueColumnsDef}; +use influxdb3_wal::{ + LastCacheDefinition, LastCacheValueColumnsDef, PluginDefinition, TriggerDefinition, +}; use schema::InfluxColumnType; use schema::InfluxFieldType; use schema::TIME_DATA_TIMEZONE; @@ -38,6 +41,10 @@ struct DatabaseSnapshot { id: DbId, name: Arc, tables: SerdeVecMap, + #[serde(default)] + processing_engine_plugins: SerdeVecMap, + #[serde(default)] + processing_engine_triggers: SerdeVecMap, deleted: bool, } @@ -51,6 +58,16 @@ impl From<&DatabaseSchema> for DatabaseSnapshot { .iter() .map(|(table_id, table_def)| (*table_id, table_def.as_ref().into())) .collect(), + processing_engine_plugins: db + .processing_engine_plugins + .iter() + .map(|(name, call)| (name.clone(), call.into())) + .collect(), + processing_engine_triggers: db + .processing_engine_triggers + .iter() + .map(|(name, trigger)| (name.clone(), trigger.into())) + .collect(), deleted: db.deleted, } } @@ -67,11 +84,39 @@ impl From for DatabaseSchema { (id, Arc::new(table.into())) }) .collect(); + let processing_engine_plugins: HashMap<_, _> = snap + .processing_engine_plugins + .into_iter() + .map(|(name, call)| (name, call.into())) + .collect(); + let processing_engine_triggers = snap + .processing_engine_triggers + .into_iter() + .map(|(name, trigger)| { + // TODO: Decide whether to handle errors + let plugin: PluginDefinition = processing_engine_plugins + .get(&trigger.plugin_name) + .cloned() + .expect("should have plugin"); + ( + name, + TriggerDefinition { + trigger_name: trigger.trigger_name, + plugin_name: 
plugin.plugin_name.to_string(), + plugin, + trigger: serde_json::from_str(&trigger.trigger_specification).unwrap(), + }, + ) + }) + .collect(); + Self { id: snap.id, name: snap.name, tables, table_map, + processing_engine_plugins, + processing_engine_triggers, deleted: snap.deleted, } } } @@ -113,6 +158,21 @@ struct TableSnapshot { deleted: bool, } +#[derive(Debug, Serialize, Deserialize)] +struct ProcessingEnginePluginSnapshot { + pub plugin_name: String, + pub code: String, + pub function_name: String, + pub plugin_type: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct ProcessingEngineTriggerSnapshot { + pub trigger_name: String, + pub plugin_name: String, + pub trigger_specification: String, +} + /// Representation of Arrow's `DataType` for table snapshots. /// /// Uses `#[non_exhaustive]` with the assumption that variants will be added as we support @@ -345,6 +405,39 @@ impl From<TableSnapshot> for TableDefinition { } } +impl From<&PluginDefinition> for ProcessingEnginePluginSnapshot { + fn from(plugin: &PluginDefinition) -> Self { + Self { + plugin_name: plugin.plugin_name.to_string(), + code: plugin.code.to_string(), + function_name: plugin.function_name.to_string(), + plugin_type: serde_json::to_string(&plugin.plugin_type).unwrap(), + } + } +} + +impl From<ProcessingEnginePluginSnapshot> for PluginDefinition { + fn from(plugin: ProcessingEnginePluginSnapshot) -> Self { + Self { + plugin_name: plugin.plugin_name.to_string(), + code: plugin.code.to_string(), + function_name: plugin.function_name.to_string(), + plugin_type: serde_json::from_str(&plugin.plugin_type).expect("serialized plugin type"), + } + } +} + +impl From<&TriggerDefinition> for ProcessingEngineTriggerSnapshot { + fn from(trigger: &TriggerDefinition) -> Self { + ProcessingEngineTriggerSnapshot { + trigger_name: trigger.trigger_name.to_string(), + plugin_name: trigger.plugin_name.to_string(), + trigger_specification: serde_json::to_string(&trigger.trigger) + .expect("should be able to serialize trigger specification"), + } + } +} + // NOTE: Ideally, we will remove the need for the InfluxFieldType, and be able // to use Arrow's DataType directly.
If that happens, this conversion will need // to support the entirety of Arrow's DataType enum, which is why [`DataType`] diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap index 343040df544..8007db91c78 100644 --- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap +++ b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__catalog_serialization.snap @@ -263,6 +263,8 @@ snapshot_kind: text } ] ], + "processing_engine_plugins": [], + "processing_engine_triggers": [], "deleted": false } ] diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap index 40501c84775..f461d190ffe 100644 --- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap +++ b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_last_cache.snap @@ -114,6 +114,8 @@ snapshot_kind: text } ] ], + "processing_engine_plugins": [], + "processing_engine_triggers": [], "deleted": false } ] diff --git a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap index 729d475eb81..5600f91c220 100644 --- a/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap +++ b/influxdb3_catalog/src/snapshots/influxdb3_catalog__catalog__tests__serialize_series_keys.snap @@ -2,6 +2,7 @@ source: influxdb3_catalog/src/catalog.rs description: catalog serialization to help catch breaking changes expression: catalog +snapshot_kind: text --- { "databases": [ @@ -97,6 +98,8 @@ expression: catalog } ] ], + "processing_engine_plugins": [], + "processing_engine_triggers": [], "deleted": false } ] diff --git a/influxdb3_load_generator/Cargo.toml b/influxdb3_load_generator/Cargo.toml index 3f71d86b5bd..7dba8ff24d1 100644 --- a/influxdb3_load_generator/Cargo.toml +++ b/influxdb3_load_generator/Cargo.toml @@ -31,6 +31,7 @@ sysinfo.workspace = true tokio.workspace = true thiserror.workspace = true url.workspace = true +futures = "0.3" [lints] workspace = true diff --git a/influxdb3_load_generator/src/commands/write.rs b/influxdb3_load_generator/src/commands/write.rs index 76a4c8d18a4..b3f62990395 100644 --- a/influxdb3_load_generator/src/commands/write.rs +++ b/influxdb3_load_generator/src/commands/write.rs @@ -4,6 +4,8 @@ use crate::specification::DataSpec; use anyhow::Context; use chrono::{DateTime, Local}; use clap::Parser; +use futures::stream::FuturesUnordered; +use futures::StreamExt; use influxdb3_client::{Client, Precision}; use std::ops::Add; use std::path::PathBuf; @@ -185,7 +187,7 @@ pub(crate) async fn run_write_load( }; // spawn tokio tasks for each writer - let mut tasks = Vec::new(); + let mut tasks = FuturesUnordered::new(); for generator in generators { let reporter = Arc::clone(&reporter); let sampling_interval = sampling_interval.into(); @@ -202,8 +204,8 @@ pub(crate) async fn run_write_load( } // wait for all tasks to complete - for task in tasks { - task.await?; + while let Some(result) = tasks.next().await { + result?; } println!("all writers finished"); diff --git a/influxdb3_py_api/Cargo.toml b/influxdb3_py_api/Cargo.toml new file mode 100644 index 
00000000000..2af87b5de78 --- /dev/null +++ b/influxdb3_py_api/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "influxdb3_py_api" +version.workspace = true +authors.workspace = true +edition.workspace = true +license.workspace = true + + +[features] +system-py = ["pyo3"] +[dependencies] +influxdb3_wal = { path = "../influxdb3_wal" } +influxdb3_catalog = {path = "../influxdb3_catalog"} +schema = { workspace = true } + +[dependencies.pyo3] +version = "0.23.3" +# this is necessary to automatically initialize the Python interpreter +features = ["auto-initialize"] +optional = true + + +[lints] +workspace = true diff --git a/influxdb3_py_api/src/lib.rs b/influxdb3_py_api/src/lib.rs new file mode 100644 index 00000000000..efb3333444b --- /dev/null +++ b/influxdb3_py_api/src/lib.rs @@ -0,0 +1,2 @@ +#[cfg(feature = "system-py")] +pub mod system_py; diff --git a/influxdb3_py_api/src/system_py.rs b/influxdb3_py_api/src/system_py.rs new file mode 100644 index 00000000000..e58a95a078d --- /dev/null +++ b/influxdb3_py_api/src/system_py.rs @@ -0,0 +1,163 @@ +use influxdb3_catalog::catalog::{DatabaseSchema, TableDefinition}; +use influxdb3_wal::{FieldData, Row, WriteBatch}; +use pyo3::exceptions::PyValueError; +use pyo3::prelude::{PyAnyMethods, PyModule, PyModuleMethods}; +use pyo3::{pyclass, pymethods, pymodule, Bound, IntoPyObject, PyErr, PyObject, PyResult, Python}; +use schema::InfluxColumnType; +use std::ffi::CString; +use std::sync::Arc; + +#[pyclass] +#[derive(Debug)] +pub struct PyWriteBatchIterator { + table_definition: Arc, + rows: Vec, + current_index: usize, +} + +#[pymethods] +impl PyWriteBatchIterator { + fn next_point(&mut self) -> PyResult> { + if self.current_index >= self.rows.len() { + return Ok(None); + } + + Python::with_gil(|py| { + let row = &self.rows[self.current_index]; + self.current_index += 1; + + // Import Point class + let point_class = py + .import("influxdb_client_3.write_client.client.write.point")? + .getattr("Point") + .unwrap(); + + // Create new Point instance with measurement name (table name) + let point = point_class.call1((self.table_definition.table_name.as_ref(),))?; + + // Set timestamp + point.call_method1("time", (row.time,))?; + + // Add fields based on column definitions and field data + for field in &row.fields { + if let Some(col_def) = self.table_definition.columns.get(&field.id) { + let field_name = col_def.name.as_ref(); + + match col_def.data_type { + InfluxColumnType::Tag => { + let FieldData::Tag(tag) = &field.value else { + // error out because we expect a tag + return Err(PyValueError::new_err(format!( + "expect FieldData:Tag for tagged columns, not ${:?}", + field + ))); + }; + point.call_method1("tag", (field_name, tag.as_str()))?; + } + InfluxColumnType::Timestamp => {} + InfluxColumnType::Field(_) => { + match &field.value { + FieldData::String(s) => { + point.call_method1("field", (field_name, s.as_str()))? + } + FieldData::Integer(i) => { + point.call_method1("field", (field_name, *i))? + } + FieldData::UInteger(u) => { + point.call_method1("field", (field_name, *u))? + } + FieldData::Float(f) => { + point.call_method1("field", (field_name, *f))? + } + FieldData::Boolean(b) => { + point.call_method1("field", (field_name, *b))? + } + FieldData::Tag(t) => { + point.call_method1("field", (field_name, t.as_str()))? + } + FieldData::Key(k) => { + point.call_method1("field", (field_name, k.as_str()))? + } + FieldData::Timestamp(ts) => { + point.call_method1("field", (field_name, *ts))? 
+ } + }; + } + } + } + } + + Ok(Some(point.into_pyobject(py)?.unbind())) + }) + } +} + +#[pyclass] +#[derive(Debug)] +pub struct PyWriteBatch { + pub write_batch: WriteBatch, + pub schema: Arc, +} + +#[pymethods] +impl PyWriteBatch { + fn get_iterator_for_table(&self, table_name: &str) -> PyResult { + // Find table ID from name + let table_id = self + .schema + .table_map + .get_by_right(&Arc::from(table_name)) + .ok_or_else(|| { + PyErr::new::(format!("Table '{}' not found", table_name)) + })?; + + // Get table chunks + let chunks = self.write_batch.table_chunks.get(table_id).ok_or_else(|| { + PyErr::new::(format!("No data for table '{}'", table_name)) + })?; + + // Get table definition + let table_def = self.schema.tables.get(table_id).ok_or_else(|| { + PyErr::new::(format!( + "Table definition not found for '{}'", + table_name + )) + })?; + + Ok(PyWriteBatchIterator { + table_definition: Arc::clone(table_def), + // TODO: avoid copying all the data at once. + rows: chunks + .chunk_time_to_chunk + .values() + .flat_map(|chunk| chunk.rows.clone()) + .collect(), + current_index: 0, + }) + } +} + +impl PyWriteBatch { + pub fn call_against_table( + &self, + table_name: &str, + setup_code: &str, + call_site: &str, + ) -> PyResult<()> { + let iterator = self.get_iterator_for_table(table_name)?; + Python::with_gil(|py| { + py.run(&CString::new(setup_code)?, None, None)?; + let py_func = py.eval(&CString::new(call_site)?, None, None)?; + py_func.call1((iterator,))?; + Ok::<(), PyErr>(()) + }) + } +} + +// Module initialization +#[pymodule] +fn influxdb3_py_api(m: &Bound<'_, PyModule>) -> PyResult<()> { + m.add_class::()?; + m.add_class::()?; + Ok(()) +} diff --git a/influxdb3_server/src/http.rs b/influxdb3_server/src/http.rs index 87187b95a3e..01da6f34a72 100644 --- a/influxdb3_server/src/http.rs +++ b/influxdb3_server/src/http.rs @@ -24,6 +24,7 @@ use influxdb3_cache::last_cache; use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MaxAge, MaxCardinality}; use influxdb3_catalog::catalog::Error as CatalogError; use influxdb3_process::{INFLUXDB3_GIT_HASH_SHORT, INFLUXDB3_VERSION}; +use influxdb3_wal::{PluginType, TriggerSpecificationDefinition}; use influxdb3_write::persister::TrackedMemoryArrowWriter; use influxdb3_write::write_buffer::Error as WriteBufferError; use influxdb3_write::BufferedWriteRequest; @@ -202,6 +203,9 @@ pub enum Error { #[error("v1 query API error: {0}")] V1Query(#[from] v1::QueryError), + + #[error(transparent)] + Catalog(#[from] CatalogError), } #[derive(Debug, Error)] @@ -545,8 +549,7 @@ where let body = serde_json::to_string(&PingResponse { version: &INFLUXDB3_VERSION, revision: INFLUXDB3_GIT_HASH_SHORT, - }) - .unwrap(); + })?; Ok(Response::new(Body::from(body))) } @@ -937,8 +940,58 @@ where Ok(Response::builder() .status(StatusCode::OK) - .body(Body::empty()) - .unwrap()) + .body(Body::empty())?) + } + + async fn configure_processing_engine_plugin( + &self, + req: Request, + ) -> Result> { + let ProcessingEnginePluginCreateRequest { + db, + plugin_name, + code, + function_name, + plugin_type, + } = if let Some(query) = req.uri().query() { + serde_urlencoded::from_str(query)? + } else { + self.read_body_json(req).await? + }; + self.write_buffer + .insert_plugin(&db, plugin_name, code, function_name, plugin_type) + .await?; + + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) 
+ } + + async fn configure_processing_engine_trigger( + &self, + req: Request, + ) -> Result> { + let ProcessEngineTriggerCreateRequest { + db, + plugin_name, + trigger_name, + trigger_specification, + } = if let Some(query) = req.uri().query() { + serde_urlencoded::from_str(query)? + } else { + self.read_body_json(req).await? + }; + self.write_buffer + .insert_trigger( + db.as_str(), + trigger_name, + plugin_name, + trigger_specification, + ) + .await?; + Ok(Response::builder() + .status(StatusCode::OK) + .body(Body::empty())?) } async fn delete_database(&self, req: Request) -> Result> { @@ -1304,6 +1357,25 @@ struct LastCacheDeleteRequest { name: String, } +/// Request definition for `POST /api/v3/configure/processing_engine_plugin` API +#[derive(Debug, Deserialize)] +struct ProcessingEnginePluginCreateRequest { + db: String, + plugin_name: String, + code: String, + function_name: String, + plugin_type: PluginType, +} + +/// Request definition for `POST /api/v3/configure/processing_engine_trigger` API +#[derive(Debug, Deserialize)] +struct ProcessEngineTriggerCreateRequest { + db: String, + plugin_name: String, + trigger_name: String, + trigger_specification: TriggerSpecificationDefinition, +} + #[derive(Debug, Deserialize)] struct DeleteDatabaseRequest { db: String, @@ -1396,6 +1468,12 @@ pub(crate) async fn route_request( (Method::DELETE, "/api/v3/configure/last_cache") => { http_server.configure_last_cache_delete(req).await } + (Method::POST, "/api/v3/configure/processing_engine_plugin") => { + http_server.configure_processing_engine_plugin(req).await + } + (Method::POST, "/api/v3/configure/processing_engine_trigger") => { + http_server.configure_processing_engine_trigger(req).await + } (Method::DELETE, "/api/v3/configure/database") => http_server.delete_database(req).await, // TODO: make table delete to use path param (DELETE db/foodb/table/bar) (Method::DELETE, "/api/v3/configure/table") => http_server.delete_table(req).await, diff --git a/influxdb3_server/src/system_tables/mod.rs b/influxdb3_server/src/system_tables/mod.rs index 0d6057e75bc..bd8a33724a8 100644 --- a/influxdb3_server/src/system_tables/mod.rs +++ b/influxdb3_server/src/system_tables/mod.rs @@ -15,8 +15,13 @@ use self::{last_caches::LastCachesTable, queries::QueriesTable}; mod last_caches; mod meta_caches; mod parquet_files; +use crate::system_tables::python_call::{ + ProcessingEnginePluginTable, ProcessingEngineTriggerTable, +}; #[cfg(test)] pub(crate) use parquet_files::table_name_predicate_error; + +mod python_call; mod queries; pub const SYSTEM_SCHEMA_NAME: &str = "system"; @@ -26,6 +31,10 @@ const META_CACHES_TABLE_NAME: &str = "meta_caches"; const PARQUET_FILES_TABLE_NAME: &str = "parquet_files"; const QUERIES_TABLE_NAME: &str = "queries"; +const PROCESSING_ENGINE_PLUGINS_TABLE_NAME: &str = "processing_engine_plugins"; + +const PROCESSING_ENGINE_TRIGGERS_TABLE_NAME: &str = "processing_engine_triggers"; + pub(crate) struct SystemSchemaProvider { tables: HashMap<&'static str, Arc>, } @@ -67,6 +76,30 @@ impl SystemSchemaProvider { db_schema.id, buffer, )))); + tables.insert( + PROCESSING_ENGINE_PLUGINS_TABLE_NAME, + Arc::new(SystemTableProvider::new(Arc::new( + ProcessingEnginePluginTable::new( + db_schema + .processing_engine_plugins + .iter() + .map(|(_name, call)| call.clone()) + .collect(), + ), + ))), + ); + tables.insert( + PROCESSING_ENGINE_TRIGGERS_TABLE_NAME, + Arc::new(SystemTableProvider::new(Arc::new( + ProcessingEngineTriggerTable::new( + db_schema + .processing_engine_triggers + .iter() + 
.map(|(_name, trigger)| trigger.clone()) + .collect(), + ), + ))), + ); tables.insert(PARQUET_FILES_TABLE_NAME, parquet_files); Self { tables } } diff --git a/influxdb3_server/src/system_tables/python_call.rs b/influxdb3_server/src/system_tables/python_call.rs new file mode 100644 index 00000000000..d56bea42e7e --- /dev/null +++ b/influxdb3_server/src/system_tables/python_call.rs @@ -0,0 +1,134 @@ +use arrow_array::{ArrayRef, RecordBatch, StringArray}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use async_trait::async_trait; +use datafusion::common::Result; +use datafusion::error::DataFusionError; +use datafusion::logical_expr::Expr; +use influxdb3_wal::{PluginDefinition, TriggerDefinition}; +use iox_system_tables::IoxSystemTable; +use std::sync::Arc; + +#[derive(Debug)] +pub(super) struct ProcessingEnginePluginTable { + schema: SchemaRef, + plugins: Vec, +} + +fn plugin_schema() -> SchemaRef { + let columns = vec![ + Field::new("plugin_name", DataType::Utf8, false), + Field::new("function_name", DataType::Utf8, false), + Field::new("code", DataType::Utf8, false), + Field::new("plugin_type", DataType::Utf8, false), + ]; + Schema::new(columns).into() +} + +impl ProcessingEnginePluginTable { + pub fn new(python_calls: Vec) -> Self { + Self { + schema: plugin_schema(), + plugins: python_calls, + } + } +} + +#[async_trait] +impl IoxSystemTable for ProcessingEnginePluginTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + async fn scan( + &self, + _filters: Option>, + _limit: Option, + ) -> Result { + let schema = self.schema(); + let columns: Vec = vec![ + Arc::new( + self.plugins + .iter() + .map(|call| Some(call.plugin_name.clone())) + .collect::(), + ), + Arc::new( + self.plugins + .iter() + .map(|p| Some(p.function_name.clone())) + .collect::(), + ), + Arc::new( + self.plugins + .iter() + .map(|p| Some(p.code.clone())) + .collect::(), + ), + Arc::new( + self.plugins + .iter() + .map(|p| serde_json::to_string(&p.plugin_type).ok()) + .collect::(), + ), + ]; + Ok(RecordBatch::try_new(schema, columns)?) + } +} + +#[derive(Debug)] +pub(super) struct ProcessingEngineTriggerTable { + schema: SchemaRef, + triggers: Vec, +} + +impl ProcessingEngineTriggerTable { + pub fn new(triggers: Vec) -> Self { + Self { + schema: trigger_schema(), + triggers, + } + } +} + +fn trigger_schema() -> SchemaRef { + let columns = vec![ + Field::new("trigger_name", DataType::Utf8, false), + Field::new("plugin_name", DataType::Utf8, false), + Field::new("trigger_specification", DataType::Utf8, false), + ]; + Schema::new(columns).into() +} + +#[async_trait] +impl IoxSystemTable for ProcessingEngineTriggerTable { + fn schema(&self) -> SchemaRef { + Arc::clone(&self.schema) + } + + async fn scan( + &self, + _filters: Option>, + _limit: Option, + ) -> Result { + let trigger_column = self + .triggers + .iter() + .map(|trigger| Some(trigger.trigger_name.clone())) + .collect::(); + let plugin_column = self + .triggers + .iter() + .map(|trigger| Some(trigger.plugin.plugin_name.clone())) + .collect::(); + let specification_column = self + .triggers + .iter() + .map(|trigger| serde_json::to_string(&trigger.trigger).ok()) + .collect::(); + let columns: Vec = vec![ + Arc::new(trigger_column), + Arc::new(plugin_column), + Arc::new(specification_column), + ]; + Ok(RecordBatch::try_new(Arc::clone(&self.schema), columns)?) 
+ } +} diff --git a/influxdb3_wal/src/lib.rs b/influxdb3_wal/src/lib.rs index e2de1d8b801..73c416fc4cf 100644 --- a/influxdb3_wal/src/lib.rs +++ b/influxdb3_wal/src/lib.rs @@ -248,6 +248,8 @@ pub enum CatalogOp { DeleteLastCache(LastCacheDelete), DeleteDatabase(DeleteDatabaseDefinition), DeleteTable(DeleteTableDefinition), + CreatePlugin(PluginDefinition), + CreateTrigger(TriggerDefinition), } #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] @@ -525,6 +527,36 @@ pub struct MetaCacheDelete { pub cache_name: Arc, } +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +pub struct PluginDefinition { + pub plugin_name: String, + pub code: String, + pub function_name: String, + pub plugin_type: PluginType, +} + +#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize, Copy)] +#[serde(rename_all = "snake_case")] +pub enum PluginType { + WalRows, +} + +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +pub struct TriggerDefinition { + pub trigger_name: String, + pub plugin_name: String, + pub trigger: TriggerSpecificationDefinition, + // TODO: decide whether this should be populated from a reference rather than stored on its own. + pub plugin: PluginDefinition, +} + +#[derive(Debug, Serialize, Deserialize, Eq, PartialEq, Clone)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum TriggerSpecificationDefinition { + SingleTableWalWrite { table_name: String }, + AllTablesWalWrite, +} + #[serde_as] #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] pub struct WriteBatch { @@ -816,7 +848,7 @@ pub fn background_wal_flush( let snapshot_wal = Arc::clone(&wal); tokio::spawn(async move { let snapshot_details = snapshot_complete.await.expect("snapshot failed"); - assert!(snapshot_info.snapshot_details == snapshot_details); + assert_eq!(snapshot_info.snapshot_details, snapshot_details); snapshot_wal .cleanup_snapshot(snapshot_info, snapshot_permit) diff --git a/influxdb3_wal/src/object_store.rs b/influxdb3_wal/src/object_store.rs index 65d080e83aa..631a690355a 100644 --- a/influxdb3_wal/src/object_store.rs +++ b/influxdb3_wal/src/object_store.rs @@ -441,7 +441,7 @@ impl FlushBuffer { Vec>, Option<(SnapshotInfo, OwnedSemaphorePermit)>, ) { - // convert into wal contents and resopnses and capture if a snapshot should be taken + // convert into wal contents and responses and capture if a snapshot should be taken let (mut wal_contents, responses) = self.flush_buffer_with_responses(); self.snapshot_tracker.add_wal_period(WalPeriod { wal_file_number: wal_contents.wal_file_number, diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml index e92933ee09c..fb330af045e 100644 --- a/influxdb3_write/Cargo.toml +++ b/influxdb3_write/Cargo.toml @@ -5,6 +5,9 @@ authors.workspace = true edition.workspace = true license.workspace = true +[features] +"system-py" = ["influxdb3_py_api/system-py"] + [dependencies] # Core Crates data_types.workspace = true @@ -25,6 +28,7 @@ influxdb3_id = { path = "../influxdb3_id" } influxdb3_test_helpers = { path = "../influxdb3_test_helpers" } influxdb3_wal = { path = "../influxdb3_wal" } influxdb3_telemetry = { path = "../influxdb3_telemetry" } +influxdb3_py_api = {path = "../influxdb3_py_api"} # crates.io dependencies anyhow.workspace = true diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs index a8b15b1a554..2da430c0e4d 100644 --- a/influxdb3_write/src/lib.rs +++ b/influxdb3_write/src/lib.rs @@ -24,8 +24,8 @@ use influxdb3_id::ParquetFileId; use influxdb3_id::SerdeVecMap; use 
influxdb3_id::TableId; use influxdb3_id::{ColumnId, DbId}; -use influxdb3_wal::MetaCacheDefinition; use influxdb3_wal::{LastCacheDefinition, SnapshotSequenceNumber, WalFileSequenceNumber}; +use influxdb3_wal::{MetaCacheDefinition, PluginType, TriggerSpecificationDefinition}; use iox_query::QueryChunk; use iox_time::Time; use serde::{Deserialize, Serialize}; @@ -49,7 +49,12 @@ pub enum Error { pub type Result = std::result::Result; pub trait WriteBuffer: - Bufferer + ChunkContainer + MetaCacheManager + LastCacheManager + DatabaseManager + Bufferer + + ChunkContainer + + MetaCacheManager + + LastCacheManager + + DatabaseManager + + ProcessingEngineManager { } @@ -166,6 +171,30 @@ pub trait LastCacheManager: Debug + Send + Sync + 'static { ) -> Result<(), write_buffer::Error>; } +/// `[ProcessingEngineManager]` is used to interact with the processing engine, +/// in particular plugins and triggers. +/// +#[async_trait::async_trait] +pub trait ProcessingEngineManager: Debug + Send + Sync + 'static { + /// Inserts a plugin + async fn insert_plugin( + &self, + db: &str, + plugin_name: String, + code: String, + function_name: String, + plugin_type: PluginType, + ) -> Result<(), write_buffer::Error>; + + async fn insert_trigger( + &self, + db_name: &str, + trigger_name: String, + plugin_name: String, + trigger_specification: TriggerSpecificationDefinition, + ) -> Result<(), write_buffer::Error>; +} + /// A single write request can have many lines in it. A writer can request to accept all lines that are valid, while /// returning an error for any invalid lines. This is the error information for a single invalid line. #[derive(Debug, Serialize)] diff --git a/influxdb3_write/src/write_buffer/mod.rs b/influxdb3_write/src/write_buffer/mod.rs index 081f8a0614c..9df9e548109 100644 --- a/influxdb3_write/src/write_buffer/mod.rs +++ b/influxdb3_write/src/write_buffer/mod.rs @@ -9,7 +9,7 @@ use crate::persister::Persister; use crate::write_buffer::persisted_files::PersistedFiles; use crate::write_buffer::queryable_buffer::QueryableBuffer; use crate::write_buffer::validator::WriteValidator; -use crate::{chunk::ParquetChunk, DatabaseManager}; +use crate::{chunk::ParquetChunk, DatabaseManager, ProcessingEngineManager}; use crate::{ BufferedWriteRequest, Bufferer, ChunkContainer, LastCacheManager, MetaCacheManager, ParquetFile, PersistedSnapshot, Precision, WriteBuffer, WriteLineError, @@ -26,9 +26,13 @@ use datafusion::logical_expr::Expr; use influxdb3_cache::last_cache::{self, LastCacheProvider}; use influxdb3_cache::meta_cache::{self, CreateMetaCacheArgs, MetaCacheProvider}; use influxdb3_cache::parquet_cache::ParquetCacheOracle; +use influxdb3_catalog::catalog; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::{ColumnId, DbId, TableId}; -use influxdb3_wal::{object_store::WalObjectStore, DeleteDatabaseDefinition}; +use influxdb3_wal::{ + object_store::WalObjectStore, DeleteDatabaseDefinition, PluginDefinition, PluginType, + TriggerDefinition, TriggerSpecificationDefinition, +}; use influxdb3_wal::{ CatalogBatch, CatalogOp, LastCacheDefinition, LastCacheDelete, LastCacheSize, MetaCacheDefinition, MetaCacheDelete, Wal, WalConfig, WalFileNotifier, WalOp, @@ -693,6 +697,82 @@ impl DatabaseManager for WriteBufferImpl { } } +#[async_trait::async_trait] +impl ProcessingEngineManager for WriteBufferImpl { + async fn insert_plugin( + &self, + db: &str, + plugin_name: String, + code: String, + function_name: String, + plugin_type: PluginType, + ) -> crate::Result<(), Error> { + let (db_id, 
db_schema) = + self.catalog + .db_id_and_schema(db) + .ok_or_else(|| self::Error::DatabaseNotFound { + db_name: db.to_owned(), + })?; + + let catalog_op = CatalogOp::CreatePlugin(PluginDefinition { + plugin_name, + code, + function_name, + plugin_type, + }); + + let creation_time = self.time_provider.now(); + let catalog_batch = CatalogBatch { + time_ns: creation_time.timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + self.catalog.apply_catalog_batch(&catalog_batch)?; + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + Ok(()) + } + + async fn insert_trigger( + &self, + db_name: &str, + trigger_name: String, + plugin_name: String, + trigger_specification: TriggerSpecificationDefinition, + ) -> crate::Result<(), Error> { + let Some((db_id, db_schema)) = self.catalog.db_id_and_schema(db_name) else { + return Err(Error::DatabaseNotFound { + db_name: db_name.to_owned(), + }); + }; + let plugin = db_schema + .processing_engine_plugins + .get(&plugin_name) + .ok_or_else(|| catalog::Error::ProcessingEnginePluginNotFound { + plugin_name: plugin_name.to_string(), + database_name: db_schema.name.to_string(), + })?; + let catalog_op = CatalogOp::CreateTrigger(TriggerDefinition { + trigger_name, + plugin_name, + plugin: plugin.clone(), + trigger: trigger_specification, + }); + let creation_time = self.time_provider.now(); + let catalog_batch = CatalogBatch { + time_ns: creation_time.timestamp_nanos(), + database_id: db_id, + database_name: Arc::clone(&db_schema.name), + ops: vec![catalog_op], + }; + self.catalog.apply_catalog_batch(&catalog_batch)?; + let wal_op = WalOp::Catalog(catalog_batch); + self.wal.write_ops(vec![wal_op]).await?; + Ok(()) + } +} + impl WriteBuffer for WriteBufferImpl {} #[cfg(test)] diff --git a/influxdb3_write/src/write_buffer/queryable_buffer.rs b/influxdb3_write/src/write_buffer/queryable_buffer.rs index 7adc4fc0db0..81695ab3b5a 100644 --- a/influxdb3_write/src/write_buffer/queryable_buffer.rs +++ b/influxdb3_write/src/write_buffer/queryable_buffer.rs @@ -20,6 +20,8 @@ use influxdb3_cache::meta_cache::MetaCacheProvider; use influxdb3_cache::parquet_cache::{CacheRequest, ParquetCacheOracle}; use influxdb3_catalog::catalog::{Catalog, DatabaseSchema}; use influxdb3_id::{DbId, TableId}; +#[cfg(feature = "system-py")] +use influxdb3_py_api::system_py::PyWriteBatch; use influxdb3_wal::{CatalogOp, SnapshotDetails, WalContents, WalFileNotifier, WalOp, WriteBatch}; use iox_query::chunk_statistics::{create_chunk_statistics, NoColumnRanges}; use iox_query::exec::Executor; @@ -491,6 +493,8 @@ impl BufferState { table_buffer_map.remove(&table_definition.table_id); } } + CatalogOp::CreatePlugin(_) => {} + CatalogOp::CreateTrigger(_) => {} } } } @@ -503,6 +507,66 @@ impl BufferState { .catalog .db_schema_by_id(&write_batch.database_id) .expect("database should exist"); + + // TODO: factor this out + #[cfg(feature = "system-py")] + { + use influxdb3_wal::TriggerSpecificationDefinition; + use influxdb3_wal::TriggerSpecificationDefinition::SingleTableWalWrite; + let write_tables: hashbrown::HashSet<_> = write_batch + .table_chunks + .keys() + .map(|key| { + let table_name = db_schema.table_map.get_by_left(key).unwrap(); + table_name.to_string() + }) + .collect(); + let triggers: Vec<_> = db_schema + .processing_engine_triggers + .values() + .filter_map(|trigger| match &trigger.trigger { + SingleTableWalWrite { table_name } => { + if write_tables.contains(table_name.as_str()) { + 
Some((trigger, vec![table_name.clone()])) + } else { + None + } + } + TriggerSpecificationDefinition::AllTablesWalWrite => { + if !write_tables.is_empty() { + Some(( + trigger, + write_tables.iter().map(ToString::to_string).collect(), + )) + } else { + None + } + } + }) + .collect(); + if !triggers.is_empty() { + // Create PyWriteBatch instance + let py_write_batch = PyWriteBatch { + write_batch: write_batch.clone(), + schema: db_schema.clone(), + }; + for (trigger, write_tables) in triggers { + for table in &write_tables { + if let Err(err) = py_write_batch.call_against_table( + table, + trigger.plugin.code.as_str(), + trigger.plugin.function_name.as_str(), + ) { + error!( + "failed to call trigger {} with error {}", + trigger.trigger_name, err + ) + } + } + } + } + } + let database_buffer = self.db_to_table.entry(write_batch.database_id).or_default(); for (table_id, table_chunks) in write_batch.table_chunks { diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap index 2b3aea212ca..9cceaf121c2 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-after-last-cache-create-and-new-field.snap @@ -11,6 +11,8 @@ snapshot_kind: text "deleted": false, "id": 0, "name": "db", + "processing_engine_plugins": [], + "processing_engine_triggers": [], "tables": [ [ 0, diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap index cdad6a1d852..9704b561c6b 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-create.snap @@ -11,6 +11,8 @@ snapshot_kind: text "deleted": false, "id": 0, "name": "db", + "processing_engine_plugins": [], + "processing_engine_triggers": [], "tables": [ [ 0, diff --git a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap index c08ce32e83d..82f1da254a2 100644 --- a/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap +++ b/influxdb3_write/src/write_buffer/snapshots/influxdb3_write__write_buffer__tests__catalog-immediately-after-last-cache-delete.snap @@ -11,6 +11,8 @@ snapshot_kind: text "deleted": false, "id": 0, "name": "db", + "processing_engine_plugins": [], + "processing_engine_triggers": [], "tables": [ [ 0, diff --git a/influxdb3_write/src/write_buffer/table_buffer.rs b/influxdb3_write/src/write_buffer/table_buffer.rs index 660e066e5e3..e681ddae3be 100644 --- a/influxdb3_write/src/write_buffer/table_buffer.rs +++ b/influxdb3_write/src/write_buffer/table_buffer.rs @@ -13,7 +13,7 @@ use hashbrown::HashMap; use influxdb3_catalog::catalog::TableDefinition; use 
influxdb3_id::ColumnId; use influxdb3_wal::{FieldData, Row}; -use observability_deps::tracing::{debug, error, info}; +use observability_deps::tracing::{debug, error}; use schema::sort::SortKey; use schema::{InfluxColumnType, InfluxFieldType, Schema, SchemaBuilder}; use std::collections::btree_map::Entry; @@ -166,8 +166,8 @@ impl TableBuffer { let mut size = size_of::(); for c in self.chunk_time_to_chunks.values() { - for biulder in c.data.values() { - size += size_of::() + size_of::() + biulder.size(); + for builder in c.data.values() { + size += size_of::() + size_of::() + builder.size(); } size += c.index.size(); @@ -181,7 +181,6 @@ impl TableBuffer { table_def: Arc, older_than_chunk_time: i64, ) -> Vec { - info!(%older_than_chunk_time, "Snapshotting table buffer"); let keys_to_remove = self .chunk_time_to_chunks .keys() @@ -269,10 +268,7 @@ impl MutableTableChunk { debug!("Creating new timestamp builder"); let mut time_builder = TimestampNanosecondBuilder::new(); // append nulls for all previous rows - for _ in 0..(row_index + self.row_count) { - debug!("Appending null for timestamp"); - time_builder.append_null(); - } + time_builder.append_nulls(row_index + self.row_count); Builder::Time(time_builder) }); if let Builder::Time(b) = b { @@ -333,9 +329,7 @@ impl MutableTableChunk { let b = self.data.entry(f.id).or_insert_with(|| { let mut int_builder = Int64Builder::new(); // append nulls for all previous rows - for _ in 0..(row_index + self.row_count) { - int_builder.append_null(); - } + int_builder.append_nulls(row_index + self.row_count); Builder::I64(int_builder) }); if let Builder::I64(b) = b { @@ -348,9 +342,7 @@ impl MutableTableChunk { let b = self.data.entry(f.id).or_insert_with(|| { let mut uint_builder = UInt64Builder::new(); // append nulls for all previous rows - for _ in 0..(row_index + self.row_count) { - uint_builder.append_null(); - } + uint_builder.append_nulls(row_index + self.row_count); Builder::U64(uint_builder) }); if let Builder::U64(b) = b { @@ -363,9 +355,7 @@ impl MutableTableChunk { let b = self.data.entry(f.id).or_insert_with(|| { let mut float_builder = Float64Builder::new(); // append nulls for all previous rows - for _ in 0..(row_index + self.row_count) { - float_builder.append_null(); - } + float_builder.append_nulls(row_index + self.row_count); Builder::F64(float_builder) }); if let Builder::F64(b) = b { @@ -378,9 +368,7 @@ impl MutableTableChunk { let b = self.data.entry(f.id).or_insert_with(|| { let mut bool_builder = BooleanBuilder::new(); // append nulls for all previous rows - for _ in 0..(row_index + self.row_count) { - bool_builder.append_null(); - } + bool_builder.append_nulls(row_index + self.row_count); Builder::Bool(bool_builder) }); if let Builder::Bool(b) = b { @@ -1028,7 +1016,7 @@ mod tests { table_buffer.buffer_chunk(0, rows); let size = table_buffer.computed_size(); - assert_eq!(size, 18119); + assert_eq!(size, 18120); } #[test]
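For reference, the sketch below is not part of the patch; it illustrates how the new processing-engine API added in this diff might be driven from Rust. It assumes an already-existing database named "my_db" containing a "cpu" table and a write-buffer handle taken as &dyn ProcessingEngineManager; the plugin name, trigger name, and Python function body are illustrative, while ProcessingEngineManager, PluginType, and TriggerSpecificationDefinition are the items introduced above. The same operations are exposed over HTTP through the new POST /api/v3/configure/processing_engine_plugin and POST /api/v3/configure/processing_engine_trigger endpoints.

// Hypothetical registration of a Python plugin and a WAL-write trigger via the
// ProcessingEngineManager trait defined in influxdb3_write/src/lib.rs.
use influxdb3_wal::{PluginType, TriggerSpecificationDefinition};
use influxdb3_write::{write_buffer, ProcessingEngineManager};

async fn register_example_plugin(
    engine: &dyn ProcessingEngineManager,
) -> Result<(), write_buffer::Error> {
    // The plugin body is Python source. `PyWriteBatch::call_against_table` runs this
    // code as setup, evaluates `function_name`, and calls the result with a
    // `PyWriteBatchIterator`, so the function takes a single iterator argument whose
    // `next_point()` yields influxdb_client_3 `Point` objects and `None` when exhausted.
    let code = r#"
def process_writes(iterator):
    point = iterator.next_point()
    while point is not None:
        print(point)
        point = iterator.next_point()
"#
    .to_string();

    // The database must already exist; otherwise insert_plugin returns DatabaseNotFound.
    engine
        .insert_plugin(
            "my_db",
            "example_plugin".to_string(),
            code,
            "process_writes".to_string(),
            PluginType::WalRows,
        )
        .await?;

    // The trigger references the plugin by name and fires on WAL writes to `cpu`.
    engine
        .insert_trigger(
            "my_db",
            "example_trigger".to_string(),
            "example_plugin".to_string(),
            TriggerSpecificationDefinition::SingleTableWalWrite {
                table_name: "cpu".to_string(),
            },
        )
        .await?;

    Ok(())
}

When a buffered write touches a table matched by the trigger specification, the system-py code path in queryable_buffer.rs builds a PyWriteBatch for the batch and invokes the plugin's function_name with a per-table iterator, which is why the Python function above accepts exactly one argument.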