diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index c4c8bd4a..a05c1d1c 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -30,10 +30,10 @@ repos: - id: check-symlinks - id: check-case-conflict - id: check-merge-conflict - - repo: https://github.com/jumanjihouse/pre-commit-hooks - rev: 1.11.0 - hooks: - - id: shellcheck + # - repo: https://github.com/jumanjihouse/pre-commit-hooks + # rev: 1.11.0 + # hooks: + # - id: shellcheck - repo: https://github.com/wangkuiyi/google-style-precommit-hook rev: v0.1.1 hooks: diff --git a/bench/nexmark/main.rs b/bench/nexmark/main.rs index 7c6d884a..d7541369 100644 --- a/bench/nexmark/main.rs +++ b/bench/nexmark/main.rs @@ -172,7 +172,7 @@ async fn benchmark(opt: NexmarkBenchmarkOpt) -> Result<()> { let nexmark_conf = create_nexmark_source(&opt); let mut ctx = register_nexmark_tables().await?; - let plan = physical_plan(&mut ctx, &nexmark_query(opt.query_number)[0])?; + let plan = physical_plan(&mut ctx, &nexmark_query(opt.query_number))?; create_nexmark_functions(opt.clone(), nexmark_conf.clone(), plan).await?; let tasks = (0..opt.generators) @@ -287,26 +287,21 @@ async fn create_lambda_function(ctx: &ExecutionContext) -> Result { } /// Returns Nextmark query strings based on the query number. -fn nexmark_query(query_number: usize) -> Vec { +fn nexmark_query(query_number: usize) -> String { match query_number { - 0 => vec![include_str!("query/q0.sql")], - 1 => vec![include_str!("query/q1.sql")], - 2 => vec![include_str!("query/q2.sql")], - 3 => vec![include_str!("query/q3.sql")], - 4 => vec![include_str!("query/q4.sql")], - 5 => vec![include_str!("query/q5.sql")], - 6 => include_str!("query/q6.sql") - .split(";") - .map(|s| s.trim()) - .collect(), - 7 => vec![include_str!("query/q7.sql")], - 8 => vec![include_str!("query/q8.sql")], - 9 => vec![include_str!("query/q9.sql")], + 0 => include_str!("query/q0.sql"), + 1 => include_str!("query/q1.sql"), + 2 => include_str!("query/q2.sql"), + 3 => include_str!("query/q3.sql"), + 4 => include_str!("query/q4.sql"), + 5 => include_str!("query/q5.sql"), + 6 => include_str!("query/q6.sql"), + 7 => include_str!("query/q7.sql"), + 8 => include_str!("query/q8.sql"), + 9 => include_str!("query/q9.sql"), _ => unreachable!(), } - .into_iter() - .map(String::from) - .collect() + .to_string() } #[cfg(test)] @@ -339,7 +334,7 @@ mod tests { ]; let mut ctx = register_nexmark_tables().await?; for sql in sqls { - let plan = physical_plan(&mut ctx, &sql[0])?; + let plan = physical_plan(&mut ctx, &sql)?; let mut flock_ctx = ExecutionContext { plan, ..Default::default() diff --git a/bench/nexmark/query/q6.sql b/bench/nexmark/query/q6.sql index e67d3406..0194a6a8 100644 --- a/bench/nexmark/query/q6.sql +++ b/bench/nexmark/query/q6.sql @@ -1,22 +1,19 @@ -SELECT Count(DISTINCT seller) -FROM auction - INNER JOIN bid - ON a_id = auction -WHERE b_date_time BETWEEN a_date_time AND expires; - - -SELECT seller, - Max(price) AS final -FROM auction - INNER JOIN bid - ON a_id = auction -WHERE b_date_time BETWEEN a_date_time AND expires -GROUP BY a_id, - seller -ORDER BY seller; - - SELECT seller, Avg(final) -FROM q +FROM (SELECT ROW_NUMBER() + OVER ( + PARTITION BY seller + ORDER BY date_time DESC) AS row, + seller, + final + FROM (SELECT seller, + Max(price) AS final, + Max(b_date_time) AS date_time + FROM auction + INNER JOIN bid + ON a_id = auction + WHERE b_date_time BETWEEN a_date_time AND expires + GROUP BY a_id, + seller) AS Q) AS R +WHERE row <= 10 GROUP BY seller; diff --git a/nexmark.sh b/nexmark.sh index 2db8a626..505a84ff 100755 --- a/nexmark.sh +++ b/nexmark.sh @@ -21,16 +21,16 @@ Help() { # Display Help echo $(echogreen "Nexmark Benchmark Script for Flock") echo - echo "Syntax: nexmark [-g|-h|-c|-r -q ] [-s ] [-e ] [-p ]" + echo "Syntax: nexmark [-g|-h|-c|-r [-q ] [-s ] [-e ] [-p ]]" echo "options:" echo "g Print the GPL license notification." echo "h Print this Help." echo "c Compile and deploy the benchmark." echo "r Run the benchmark." - echo "q NexMark Query Number [0-9]." - echo "p Number of NexMark Generators." - echo "s Seconds to run the benchmark." - echo "e Number of events per second." + echo "q NexMark Query Number [0-9]. Default: 5" + echo "p Number of NexMark Generators. Default: 1" + echo "s Seconds to run the benchmark. Default: 10" + echo "e Number of events per second. Default: 1000" echo } @@ -165,7 +165,7 @@ if [ "$run" = "true" ]; then echo # delete the old nexmark_datasource function to make sure we have a clean slate - aws lambda delete-function --function-name nexmark_datasource + aws lambda delete-function --function-name nexmark_datasource || true # dry run to warm up the lambda functions. echo $(echogreen "[1] Warming up the lambda functions") diff --git a/src/runtime/Cargo.toml b/src/runtime/Cargo.toml index 2f096abe..b9c145bf 100644 --- a/src/runtime/Cargo.toml +++ b/src/runtime/Cargo.toml @@ -18,6 +18,8 @@ bytes = "1.0.1" dashmap = "4.0.2" datafusion = { git = "https://github.com/flock-lab/arrow-datafusion", branch = "flock" } futures = "0.3.12" +indoc = "1.0.3" + json = "0.12.4" lazy_static = "1.4" log = "0.4.14" diff --git a/src/runtime/src/datasource/nexmark/event.rs b/src/runtime/src/datasource/nexmark/event.rs index 00720dce..b5e80c9c 100644 --- a/src/runtime/src/datasource/nexmark/event.rs +++ b/src/runtime/src/datasource/nexmark/event.rs @@ -14,7 +14,7 @@ //! The NexMark events: `Person`, `Auction`, and `Bid`. use crate::datasource::nexmark::config::NEXMarkConfig; -use arrow::datatypes::{DataType, Field, Schema}; +use arrow::datatypes::{DataType, Field, Schema, TimeUnit}; use rand::rngs::SmallRng; use rand::seq::SliceRandom; use rand::{Rng, SeedableRng}; @@ -173,7 +173,11 @@ impl Person { Field::new("credit_card", DataType::Utf8, false), Field::new("city", DataType::Utf8, false), Field::new("state", DataType::Utf8, false), - Field::new("p_date_time", DataType::Date64, false), + Field::new( + "p_date_time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), ], metadata, ) @@ -258,8 +262,16 @@ impl Auction { Field::new("description", DataType::Utf8, false), Field::new("initial_bid", DataType::Int32, false), Field::new("reserve", DataType::Int32, false), - Field::new("a_date_time", DataType::Date64, false), - Field::new("expires", DataType::Date64, false), + Field::new( + "a_date_time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), + Field::new( + "expires", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), Field::new("seller", DataType::Int32, false), Field::new("category", DataType::Int32, false), ], @@ -364,7 +376,11 @@ impl Bid { Field::new("auction", DataType::Int32, false), Field::new("bidder", DataType::Int32, false), Field::new("price", DataType::Int32, false), - Field::new("b_date_time", DataType::Date64, false), + Field::new( + "b_date_time", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + ), ], metadata, ) diff --git a/src/runtime/src/datasource/nexmark/queries/mod.rs b/src/runtime/src/datasource/nexmark/queries/mod.rs index 57950dc5..8385f629 100644 --- a/src/runtime/src/datasource/nexmark/queries/mod.rs +++ b/src/runtime/src/datasource/nexmark/queries/mod.rs @@ -18,6 +18,7 @@ mod q3; mod q4; mod q5; mod q6; +mod q6_v2; mod q7; mod q8; mod q9; diff --git a/src/runtime/src/datasource/nexmark/queries/q3.rs b/src/runtime/src/datasource/nexmark/queries/q3.rs index 5713f8fe..47a1a872 100644 --- a/src/runtime/src/datasource/nexmark/queries/q3.rs +++ b/src/runtime/src/datasource/nexmark/queries/q3.rs @@ -23,6 +23,7 @@ mod tests { use crate::query::StreamWindow; use datafusion::datasource::MemTable; use datafusion::physical_plan::collect; + use indoc::indoc; use std::sync::Arc; #[tokio::test] @@ -41,14 +42,19 @@ mod tests { // data source generation let events = nex.generate_data()?; - let sql = concat!( - "SELECT ", - " name, city, state, a_id ", - "FROM ", - " auction INNER JOIN person on seller = p_id ", - "WHERE ", - " category = 10 and (state = 'or' OR state = 'id' OR state = 'ca');" - ); + let sql = indoc! {" + SELECT name, + city, + state, + a_id + FROM auction + INNER JOIN person + ON seller = p_id + WHERE category = 10 + AND ( state = 'or' + OR state = 'id' + OR state = 'ca' ); + "}; let auction_schema = Arc::new(Auction::schema()); let person_schema = Arc::new(Person::schema()); diff --git a/src/runtime/src/datasource/nexmark/queries/q4.rs b/src/runtime/src/datasource/nexmark/queries/q4.rs index 7681f13e..8c306b61 100644 --- a/src/runtime/src/datasource/nexmark/queries/q4.rs +++ b/src/runtime/src/datasource/nexmark/queries/q4.rs @@ -23,6 +23,7 @@ mod tests { use crate::query::StreamWindow; use datafusion::datasource::MemTable; use datafusion::physical_plan::collect; + use indoc::indoc; use std::sync::Arc; #[tokio::test] @@ -41,18 +42,19 @@ mod tests { // data source generation let events = nex.generate_data()?; - let sql = concat!( - "SELECT ", - " category, ", - " AVG(final) ", - "FROM ( ", - " SELECT MAX(price) AS final, category ", - " FROM auction INNER JOIN bid on a_id = auction ", - " WHERE b_date_time BETWEEN a_date_time AND expires ", - " GROUP BY a_id, category ", - ") as Q ", - "GROUP BY category;" - ); + let sql = indoc! {" + SELECT category, + Avg(final) + FROM (SELECT Max(price) AS final, + category + FROM auction + INNER JOIN bid + ON a_id = auction + WHERE b_date_time BETWEEN a_date_time AND expires + GROUP BY a_id, + category) AS Q + GROUP BY category; + "}; let auction_schema = Arc::new(Auction::schema()); let bid_schema = Arc::new(Bid::schema()); diff --git a/src/runtime/src/datasource/nexmark/queries/q5.rs b/src/runtime/src/datasource/nexmark/queries/q5.rs index 0a9b0e0f..490b6f11 100644 --- a/src/runtime/src/datasource/nexmark/queries/q5.rs +++ b/src/runtime/src/datasource/nexmark/queries/q5.rs @@ -24,6 +24,7 @@ mod tests { use crate::query::StreamWindow; use datafusion::datasource::MemTable; use datafusion::physical_plan::collect; + use indoc::indoc; use std::sync::Arc; #[tokio::test] @@ -46,29 +47,20 @@ mod tests { // data source generation let events = nex.generate_data()?; - let sql = concat!( - "SELECT auction, num ", - "FROM ( ", - " SELECT ", - " auction, ", - " count(*) AS num ", - " FROM bid ", - " GROUP BY auction ", - ") AS AuctionBids ", - "INNER JOIN ( ", - " SELECT ", - " max(num) AS maxn ", - " FROM ( ", - " SELECT ", - " auction, ", - " count(*) AS num ", - " FROM bid ", - " GROUP BY ", - " auction ", - " ) AS CountBids ", - ") AS MaxBids ", - "ON num = maxn;" - ); + let sql = indoc! {" + SELECT auction, + num + FROM (SELECT auction, + Count(*) AS num + FROM bid + GROUP BY auction) AS AuctionBids + INNER JOIN (SELECT Max(num) AS maxn + FROM (SELECT auction, + Count(*) AS num + FROM bid + GROUP BY auction) AS CountBids) AS MaxBids + ON num = maxn; + "}; // let _sql = concat!( // "SELECT auction, count(*) ", diff --git a/src/runtime/src/datasource/nexmark/queries/q6_v2.rs b/src/runtime/src/datasource/nexmark/queries/q6_v2.rs new file mode 100644 index 00000000..268c14fc --- /dev/null +++ b/src/runtime/src/datasource/nexmark/queries/q6_v2.rs @@ -0,0 +1,100 @@ +// Copyright (c) 2020-present, UMD Database Group. +// +// This program is free software: you can use, redistribute, and/or modify +// it under the terms of the GNU Affero General Public License, version 3 +// or later ("AGPL"), as published by the Free Software Foundation. +// +// This program is distributed in the hope that it will be useful, but WITHOUT +// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or +// FITNESS FOR A PARTICULAR PURPOSE. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#[allow(dead_code)] +fn main() {} + +#[cfg(test)] +mod tests { + use crate::datasource::nexmark::event::{Auction, Bid, Date}; + use crate::datasource::nexmark::NexMarkSource; + use crate::error::Result; + use crate::executor::plan::physical_plan; + use crate::query::StreamWindow; + use datafusion::datasource::MemTable; + use datafusion::physical_plan::collect; + use indoc::indoc; + use std::sync::Arc; + + #[tokio::test] + async fn local_query_6_v2() -> Result<()> { + // benchmark configuration + let seconds = 2; + let threads = 1; + let event_per_second = 1000; + let nex = NexMarkSource::new( + seconds, + threads, + event_per_second, + StreamWindow::ElementWise, + ); + + // data source generation + let events = nex.generate_data()?; + + let sql = indoc! {" + SELECT seller, + Avg(final) + FROM (SELECT ROW_NUMBER() + OVER ( + PARTITION BY seller + ORDER BY date_time DESC) AS row, + seller, + final + FROM (SELECT seller, + Max(price) AS final, + Max(b_date_time) AS date_time + FROM auction + INNER JOIN bid + ON a_id = auction + WHERE b_date_time BETWEEN a_date_time AND expires + GROUP BY a_id, + seller) AS Q) AS R + WHERE row <= 10 + GROUP BY seller; + "}; + + let auction_schema = Arc::new(Auction::schema()); + let bid_schema = Arc::new(Bid::schema()); + + // sequential processing + for i in 0..seconds { + // events to record batches + let am = events.auctions.get(&Date::new(i)).unwrap(); + let (auctions, _) = am.get(&0).unwrap(); + let auctions_batches = NexMarkSource::to_batch(&auctions, auction_schema.clone()); + + let bm = events.bids.get(&Date::new(i)).unwrap(); + let (bids, _) = bm.get(&0).unwrap(); + let bids_batches = NexMarkSource::to_batch(&bids, bid_schema.clone()); + + // register memory tables + let mut ctx = datafusion::execution::context::ExecutionContext::new(); + let auction_table = MemTable::try_new(auction_schema.clone(), vec![auctions_batches])?; + ctx.register_table("auction", Arc::new(auction_table))?; + + let bid_table = MemTable::try_new(bid_schema.clone(), vec![bids_batches])?; + ctx.register_table("bid", Arc::new(bid_table))?; + + // optimize query plan and execute it + let plan = physical_plan(&mut ctx, &sql)?; + let output_partitions = collect(plan).await?; + + // show output + let formatted = arrow::util::pretty::pretty_format_batches(&output_partitions).unwrap(); + println!("{}", formatted); + } + + Ok(()) + } +} diff --git a/src/runtime/src/datasource/nexmark/queries/q7.rs b/src/runtime/src/datasource/nexmark/queries/q7.rs index 0a976023..94de6b0e 100644 --- a/src/runtime/src/datasource/nexmark/queries/q7.rs +++ b/src/runtime/src/datasource/nexmark/queries/q7.rs @@ -23,6 +23,7 @@ mod tests { use crate::query::{Schedule, StreamWindow}; use datafusion::datasource::MemTable; use datafusion::physical_plan::collect; + use indoc::indoc; use std::sync::Arc; #[tokio::test] @@ -41,15 +42,16 @@ mod tests { // data source generation let events = nex.generate_data()?; - let sql = concat!( - "SELECT auction, price, bidder, b_date_time ", - "FROM bid ", - "JOIN ( ", - " SELECT MAX(price) AS maxprice ", - " FROM bid ", - ") AS B1 ", - "ON price = maxprice;" - ); + let sql = indoc! {" + SELECT auction, + price, + bidder, + b_date_time + FROM bid + JOIN (SELECT Max(price) AS maxprice + FROM bid) AS B1 + ON price = maxprice; + "}; let schema = Arc::new(Bid::schema()); let window_size = match nex.window { diff --git a/src/runtime/src/datasource/nexmark/queries/q8.rs b/src/runtime/src/datasource/nexmark/queries/q8.rs index a82002d2..48c2b49b 100644 --- a/src/runtime/src/datasource/nexmark/queries/q8.rs +++ b/src/runtime/src/datasource/nexmark/queries/q8.rs @@ -23,6 +23,7 @@ mod tests { use crate::query::{Schedule, StreamWindow}; use datafusion::datasource::MemTable; use datafusion::physical_plan::collect; + use indoc::indoc; use std::sync::Arc; #[tokio::test] @@ -41,18 +42,19 @@ mod tests { // data source generation let events = nex.generate_data()?; - let sql = concat!( - "SELECT p_id, name ", - "FROM ( ", - " SELECT p_id, name FROM person ", - " GROUP BY p_id, name ", - ") AS P ", - "JOIN ( ", - " SELECT seller FROM auction ", - " GROUP BY seller ", - ") AS A ", - "ON p_id = seller; " - ); + let sql = indoc! {" + SELECT p_id, + name + FROM (SELECT p_id, + name + FROM person + GROUP BY p_id, + name) AS P + JOIN (SELECT seller + FROM auction + GROUP BY seller) AS A + ON p_id = seller; + "}; let auction_schema = Arc::new(Auction::schema()); let person_schema = Arc::new(Person::schema()); diff --git a/src/runtime/src/datasource/nexmark/queries/q9.rs b/src/runtime/src/datasource/nexmark/queries/q9.rs index 217c953d..3dd254ba 100644 --- a/src/runtime/src/datasource/nexmark/queries/q9.rs +++ b/src/runtime/src/datasource/nexmark/queries/q9.rs @@ -23,6 +23,7 @@ mod tests { use crate::query::StreamWindow; use datafusion::datasource::MemTable; use datafusion::physical_plan::collect; + use indoc::indoc; use std::sync::Arc; #[tokio::test] @@ -41,16 +42,22 @@ mod tests { // data source generation let events = nex.generate_data()?; - let sql = concat!( - "SELECT auction, bidder, price, b_date_time ", - "FROM bid ", - "JOIN ( ", - " SELECT a_id as id, MAX(price) AS final ", - " FROM auction INNER JOIN bid on a_id = auction ", - " WHERE b_date_time BETWEEN a_date_time AND expires ", - " GROUP BY a_id ", - ") ON auction = id and price = final;" - ); + let sql = indoc! {" + SELECT auction, + bidder, + price, + b_date_time + FROM bid + JOIN (SELECT a_id AS id, + Max(price) AS final + FROM auction + INNER JOIN bid + ON a_id = auction + WHERE b_date_time BETWEEN a_date_time AND expires + GROUP BY a_id) + ON auction = id + AND price = final; + "}; let auction_schema = Arc::new(Auction::schema()); let bid_schema = Arc::new(Bid::schema());