diff --git a/Cargo.lock b/Cargo.lock index 2fe87a3614c..0954407f05d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "addr2line" @@ -76,6 +76,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.18" @@ -752,6 +758,12 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "catalog_cache" version = "0.1.0" @@ -853,6 +865,33 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "clap" version = "4.5.20" @@ -1091,6 +1130,42 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "croaring" version = "2.1.1" @@ -2269,6 +2344,12 @@ version = "0.3.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" +[[package]] +name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + [[package]] name = "hex" version = "0.4.3" @@ -2945,12 +3026,14 @@ dependencies = [ "arrow-array", "async-trait", "chrono", + "criterion", "dashmap", "datafusion", "iox_system_tables", "iox_time", "observability_deps", "proptest", + "rand", "test-log", ] @@ -3313,6 +3396,17 @@ version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" +[[package]] +name = "is-terminal" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" +dependencies = [ + "hermit-abi 0.4.0", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -3655,7 +3749,7 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", "wasi", "windows-sys 0.52.0", @@ -3892,7 +3986,7 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" dependencies = [ - "hermit-abi", + "hermit-abi 0.3.9", "libc", ] @@ -3969,6 +4063,12 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + [[package]] name = "openssl-probe" version = "0.1.5" @@ -4322,6 +4422,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -6046,6 +6174,16 @@ dependencies = [ "zerovec", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.8.0" diff --git a/Cargo.toml b/Cargo.toml index 38ea96b3c07..2f1a2ee56a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -56,6 +56,7 @@ chrono = "0.4" clap = { version = "4", features = ["derive", "env", "string"] } clru = "0.6.2" crc32fast = "1.2.0" +criterion = { version = "0.5", features = ["html_reports"] } crossbeam-channel = "0.5.11" csv = "1.3.0" datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "c27d5f2356a21ee6224c149ee971c89c2cc13e18" } diff --git a/influxdb3_sys_events/Cargo.toml b/influxdb3_sys_events/Cargo.toml index a25e6b7da36..9cb4f240888 100644 --- a/influxdb3_sys_events/Cargo.toml +++ b/influxdb3_sys_events/Cargo.toml @@ -21,5 +21,11 @@ dashmap.workspace = true datafusion.workspace = true [dev-dependencies] +criterion.workspace = true test-log.workspace = true proptest.workspace = true +rand.workspace = true + +[[bench]] +name = "store_benchmark" +harness = false diff --git a/influxdb3_sys_events/benches/store_benchmark.rs b/influxdb3_sys_events/benches/store_benchmark.rs new file mode 100644 index 00000000000..1a4480b9d39 --- /dev/null +++ b/influxdb3_sys_events/benches/store_benchmark.rs @@ -0,0 +1,385 @@ +use std::{sync::Arc, thread}; + +use arrow::{ + array::{StringViewBuilder, StructBuilder, UInt64Builder}, + datatypes::{DataType, Field, Fields, Schema}, + error::ArrowError, +}; +use arrow_array::{ArrayRef, RecordBatch}; +use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion}; +use influxdb3_sys_events::{Event, RingBuffer, SysEventStore, ToRecordBatch}; +use iox_time::{SystemProvider, TimeProvider}; +use observability_deps::tracing::debug; +use rand::Rng; + +const MAX_WRITE_ITERATIONS: u32 = 5000; + +#[allow(dead_code)] +#[derive(Debug)] +struct SampleSysEvent { + pub data_fetched_1: u64, + pub data_fetched_2: u64, + pub data_fetched_3: u64, + pub data_fetched_4: u64, + pub data_fetched_5: u64, + pub data_fetched_6: u64, + pub data_fetched_7: u64, + pub data_fetched_8: u64, + pub data_fetched_9: u64, + pub data_fetched_10: u64, + pub data_fetched_11: u64, + pub data_fetched_12: u64, + pub data_fetched_13: u64, + pub data_fetched_14: u64, + pub data_fetched_15: u64, + pub data_fetched_16: u64, + pub data_fetched_17: u64, + pub data_fetched_18: u64, + pub data_fetched_19: u64, + pub data_fetched_20: u64, + pub string_field: String, +} + +impl ToRecordBatch for SampleSysEvent { + fn to_record_batch( + items: Option<&RingBuffer>>, + ) -> Option> { + items.map(|buf| { + let iter = buf.in_order(); + let mut event_time_arr = StringViewBuilder::with_capacity(1000); + let mut struct_builder = StructBuilder::from_fields( + vec![ + Field::new("time_taken", DataType::UInt64, false), + Field::new("total_fetched", DataType::UInt64, false), + ], + 1000, + ); + for event in iter { + event_time_arr.append_value("2024-12-01T23:59:59.000Z"); + let time_taken_builder = struct_builder.field_builder::(0).unwrap(); + time_taken_builder.append_value(event.data.data_fetched_1); + + let num_files_fetched_builder = + struct_builder.field_builder::(1).unwrap(); + num_files_fetched_builder.append_value(event.data.data_fetched_10); + + struct_builder.append(true); + } + + let columns: Vec = vec![ + Arc::new(event_time_arr.finish()), + Arc::new(struct_builder.finish()), + ]; + RecordBatch::try_new(Arc::new(Self::schema()), columns) + }) + } + + fn schema() -> Schema { + let columns = vec![ + Field::new("event_time", DataType::Utf8View, false), + Field::new( + "event_data", + DataType::Struct(Fields::from(vec![ + Field::new("time_taken", DataType::UInt64, false), + Field::new("total_fetched", DataType::UInt64, false), + ])), + false, + ), + ]; + Schema::new(columns) + } +} + +impl SampleSysEvent { + pub fn new() -> Self { + let rand_start_range = 0..100_000_000; + let start = rand::thread_rng().gen_range(rand_start_range); + SampleSysEvent { + data_fetched_1: start + 1, + data_fetched_2: start + 2, + data_fetched_3: start + 3, + data_fetched_4: start + 4, + data_fetched_5: start + 5, + data_fetched_6: start + 6, + data_fetched_7: start + 7, + data_fetched_8: start + 8, + data_fetched_9: start + 9, + data_fetched_10: start + 10, + data_fetched_11: start + 11, + data_fetched_12: start + 12, + data_fetched_13: start + 13, + data_fetched_14: start + 14, + data_fetched_15: start + 15, + data_fetched_16: start + 16, + data_fetched_17: start + 17, + data_fetched_18: start + 18, + data_fetched_19: start + 19, + data_fetched_20: start + 20, + string_field: format!("str-{start}"), + } + } +} + +#[allow(dead_code)] +#[derive(Debug)] +struct SampleSysEvent2 { + pub data_fetched_1: u64, + pub data_fetched_2: u64, + pub data_fetched_3: u64, + pub data_fetched_4: u64, + pub data_fetched_5: u64, + pub data_fetched_6: u64, + pub data_fetched_7: u64, + pub data_fetched_8: u64, + pub data_fetched_9: u64, + pub data_fetched_10: u64, + pub data_fetched_11: u64, + pub data_fetched_12: u64, + pub data_fetched_13: u64, + pub data_fetched_14: u64, + pub data_fetched_15: u64, + pub data_fetched_16: u64, + pub data_fetched_17: u64, + pub data_fetched_18: u64, + pub data_fetched_19: u64, + pub data_fetched_20: u64, + pub string_field: String, +} + +impl ToRecordBatch for SampleSysEvent2 { + fn to_record_batch( + items: Option<&RingBuffer>>, + ) -> Option> { + items.map(|buf| { + let iter = buf.in_order(); + let mut event_time_arr = StringViewBuilder::with_capacity(1000); + let mut struct_builder = StructBuilder::from_fields( + vec![ + Field::new("time_taken", DataType::UInt64, false), + Field::new("total_fetched", DataType::UInt64, false), + ], + 1000, + ); + for event in iter { + event_time_arr.append_value("2024-12-01T23:59:59.000Z"); + let time_taken_builder = struct_builder.field_builder::(0).unwrap(); + time_taken_builder.append_value(event.data.data_fetched_1); + + let num_files_fetched_builder = + struct_builder.field_builder::(1).unwrap(); + num_files_fetched_builder.append_value(event.data.data_fetched_10); + + struct_builder.append(true); + } + + let columns: Vec = vec![ + Arc::new(event_time_arr.finish()), + Arc::new(struct_builder.finish()), + ]; + RecordBatch::try_new(Arc::new(Self::schema()), columns) + }) + } + + fn schema() -> Schema { + let columns = vec![ + Field::new("event_time", DataType::Utf8View, false), + Field::new( + "event_data", + DataType::Struct(Fields::from(vec![ + Field::new("time_taken", DataType::UInt64, false), + Field::new("total_fetched", DataType::UInt64, false), + ])), + false, + ), + ]; + Schema::new(columns) + } +} + +impl SampleSysEvent2 { + pub fn new() -> Self { + let rand_start_range = 0..100_000_000; + let start = rand::thread_rng().gen_range(rand_start_range); + SampleSysEvent2 { + data_fetched_1: start + 1, + data_fetched_2: start + 2, + data_fetched_3: start + 3, + data_fetched_4: start + 4, + data_fetched_5: start + 5, + data_fetched_6: start + 6, + data_fetched_7: start + 7, + data_fetched_8: start + 8, + data_fetched_9: start + 9, + data_fetched_10: start + 10, + data_fetched_11: start + 11, + data_fetched_12: start + 12, + data_fetched_13: start + 13, + data_fetched_14: start + 14, + data_fetched_15: start + 15, + data_fetched_16: start + 16, + data_fetched_17: start + 17, + data_fetched_18: start + 18, + data_fetched_19: start + 19, + data_fetched_20: start + 20, + string_field: format!("str-{start}"), + } + } +} + +#[allow(dead_code)] +#[derive(Debug)] +struct SampleSysEvent3 { + pub data_fetched_1: u64, + pub data_fetched_2: u64, + pub data_fetched_3: u64, + pub data_fetched_4: u64, + pub data_fetched_5: u64, + pub data_fetched_6: u64, + pub data_fetched_7: u64, + pub data_fetched_8: u64, + pub data_fetched_9: u64, + pub data_fetched_10: u64, + pub string_field: String, +} + +impl ToRecordBatch for SampleSysEvent3 { + fn to_record_batch( + items: Option<&RingBuffer>>, + ) -> Option> { + items.map(|buf| { + let iter = buf.in_order(); + let mut event_time_arr = StringViewBuilder::with_capacity(1000); + let mut struct_builder = StructBuilder::from_fields( + vec![ + Field::new("time_taken", DataType::UInt64, false), + Field::new("total_fetched", DataType::UInt64, false), + ], + 1000, + ); + for event in iter { + event_time_arr.append_value("2024-12-01T23:59:59.000Z"); + let time_taken_builder = struct_builder.field_builder::(0).unwrap(); + time_taken_builder.append_value(event.data.data_fetched_1); + + let num_files_fetched_builder = + struct_builder.field_builder::(1).unwrap(); + num_files_fetched_builder.append_value(event.data.data_fetched_10); + + struct_builder.append(true); + } + + let columns: Vec = vec![ + Arc::new(event_time_arr.finish()), + Arc::new(struct_builder.finish()), + ]; + RecordBatch::try_new(Arc::new(Self::schema()), columns) + }) + } + + fn schema() -> Schema { + let columns = vec![ + Field::new("event_time", DataType::Utf8View, false), + Field::new( + "event_data", + DataType::Struct(Fields::from(vec![ + Field::new("time_taken", DataType::UInt64, false), + Field::new("total_fetched", DataType::UInt64, false), + ])), + false, + ), + ]; + Schema::new(columns) + } +} + +impl SampleSysEvent3 { + pub fn new() -> Self { + let rand_start_range = 0..100_000_000; + let start = rand::thread_rng().gen_range(rand_start_range); + SampleSysEvent3 { + data_fetched_1: start + 1, + data_fetched_2: start + 2, + data_fetched_3: start + 3, + data_fetched_4: start + 4, + data_fetched_5: start + 5, + data_fetched_6: start + 6, + data_fetched_7: start + 7, + data_fetched_8: start + 8, + data_fetched_9: start + 9, + data_fetched_10: start + 10, + string_field: format!("str-{start}"), + } + } +} + +pub fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("syseventstore_writes_reads_t6"); + group.bench_function(BenchmarkId::new("array", 0), |b| { + b.iter(|| { + array_sys_event_store(); + }) + }); + // group.bench_function(BenchmarkId::new("vec", 0), |b| b.iter(|| { + // vec_sys_event_store(); + // })); + group.finish(); +} + +fn array_sys_event_store() { + let time_provider: Arc = Arc::new(SystemProvider::new()); + let sys_event_store = Arc::new(SysEventStore::new(Arc::clone(&time_provider))); + let store_clone_1 = Arc::clone(&sys_event_store); + let store_clone_2 = Arc::clone(&sys_event_store); + let store_clone_3 = Arc::clone(&sys_event_store); + let store_clone_4 = Arc::clone(&sys_event_store); + let store_clone_5 = Arc::clone(&sys_event_store); + let store_clone_6 = Arc::clone(&sys_event_store); + let t1 = thread::spawn(move || { + for _ in 0..MAX_WRITE_ITERATIONS { + store_clone_1.record(SampleSysEvent::new()); + } + }); + + let t2 = thread::spawn(move || { + for _ in 0..MAX_WRITE_ITERATIONS { + store_clone_2.record(SampleSysEvent2::new()); + } + }); + + let t3 = thread::spawn(move || { + for _ in 0..MAX_WRITE_ITERATIONS { + store_clone_3.record(SampleSysEvent3::new()); + } + }); + + let t4 = thread::spawn(move || { + for _ in 0..MAX_WRITE_ITERATIONS { + let vals = store_clone_4.as_record_batch::(); + debug!("result {:?}", vals.is_some()); + } + }); + + let t5 = thread::spawn(move || { + for _ in 0..MAX_WRITE_ITERATIONS { + let vals = store_clone_5.as_record_batch::(); + debug!("result {:?}", vals.is_some()); + } + }); + + let t6 = thread::spawn(move || { + for _ in 0..MAX_WRITE_ITERATIONS { + let vals = store_clone_6.as_record_batch::(); + debug!("result {:?}", vals.is_some()); + } + }); + + let _ = t1.join(); + let _ = t2.join(); + let _ = t3.join(); + let _ = t4.join(); + let _ = t5.join(); + let _ = t6.join(); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/influxdb3_sys_events/src/lib.rs b/influxdb3_sys_events/src/lib.rs index bb8459fd4d4..9ac936f9fdc 100644 --- a/influxdb3_sys_events/src/lib.rs +++ b/influxdb3_sys_events/src/lib.rs @@ -94,7 +94,7 @@ impl SysEventStore { /// [`ToRecordBatch`] trait pub fn as_record_batch(&self) -> Option> where - E: 'static + Clone + Debug + Sync + Send + ToRecordBatch, + E: 'static + Debug + Sync + Send + ToRecordBatch, { let map_ref = self.events.get(&TypeId::of::>>()); let buf_ref = map_ref @@ -105,37 +105,34 @@ impl SysEventStore { } } -pub struct RingBuffer { - buf: Vec, +pub type RingBuffer = RingBufferArray; + +pub struct RingBufferArray { + buf: [Option; N], max: usize, write_index: usize, } -impl RingBuffer { +impl RingBufferArray { fn new(capacity: usize) -> Self { + let buf_array: [Option; N] = [const { None }; N]; Self { - buf: Vec::with_capacity(capacity), + buf: buf_array, max: capacity, write_index: 0, } } fn push(&mut self, val: T) { - if !self.reached_max() { - self.buf.push(val); - } else { - let _ = replace(&mut self.buf[self.write_index], val); - } + let _ = replace(&mut self.buf[self.write_index], Some(val)); self.write_index = (self.write_index + 1) % self.max; } - pub fn in_order(&self) -> impl Iterator { + pub fn in_order(&self) -> impl Iterator { let (head, tail) = self.buf.split_at(self.write_index); - tail.iter().chain(head.iter()) - } - - fn reached_max(&mut self) -> bool { - self.buf.len() >= self.max + tail.iter() + .chain(head.iter()) + .filter_map(|item| item.as_ref()) } } @@ -144,7 +141,7 @@ impl RingBuffer { #[derive(Default, Clone, Debug)] pub struct Event { time: i64, - data: D, + pub data: D, } impl Event {