Skip to content

Commit

Permalink
feat: simplify nexmark query 6 using one sql stmt (#334)
Browse files Browse the repository at this point in the history
* feat: simplify nexmark query 6 using one sql stmt

* fix: nanosecond type for streaming process

* fix: typo
  • Loading branch information
gangliao authored Dec 8, 2021
1 parent 2a71dd9 commit d522da3
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 127 deletions.
8 changes: 4 additions & 4 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ repos:
- id: check-symlinks
- id: check-case-conflict
- id: check-merge-conflict
- repo: https://github.com/jumanjihouse/pre-commit-hooks
rev: 1.11.0
hooks:
- id: shellcheck
# - repo: https://github.com/jumanjihouse/pre-commit-hooks
# rev: 1.11.0
# hooks:
# - id: shellcheck
- repo: https://github.com/wangkuiyi/google-style-precommit-hook
rev: v0.1.1
hooks:
Expand Down
33 changes: 14 additions & 19 deletions bench/nexmark/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async fn benchmark(opt: NexmarkBenchmarkOpt) -> Result<()> {
let nexmark_conf = create_nexmark_source(&opt);

let mut ctx = register_nexmark_tables().await?;
let plan = physical_plan(&mut ctx, &nexmark_query(opt.query_number)[0])?;
let plan = physical_plan(&mut ctx, &nexmark_query(opt.query_number))?;
create_nexmark_functions(opt.clone(), nexmark_conf.clone(), plan).await?;

let tasks = (0..opt.generators)
Expand Down Expand Up @@ -287,26 +287,21 @@ async fn create_lambda_function(ctx: &ExecutionContext) -> Result<String> {
}

/// Returns Nextmark query strings based on the query number.
fn nexmark_query(query_number: usize) -> Vec<String> {
fn nexmark_query(query_number: usize) -> String {
match query_number {
0 => vec![include_str!("query/q0.sql")],
1 => vec![include_str!("query/q1.sql")],
2 => vec![include_str!("query/q2.sql")],
3 => vec![include_str!("query/q3.sql")],
4 => vec![include_str!("query/q4.sql")],
5 => vec![include_str!("query/q5.sql")],
6 => include_str!("query/q6.sql")
.split(";")
.map(|s| s.trim())
.collect(),
7 => vec![include_str!("query/q7.sql")],
8 => vec![include_str!("query/q8.sql")],
9 => vec![include_str!("query/q9.sql")],
0 => include_str!("query/q0.sql"),
1 => include_str!("query/q1.sql"),
2 => include_str!("query/q2.sql"),
3 => include_str!("query/q3.sql"),
4 => include_str!("query/q4.sql"),
5 => include_str!("query/q5.sql"),
6 => include_str!("query/q6.sql"),
7 => include_str!("query/q7.sql"),
8 => include_str!("query/q8.sql"),
9 => include_str!("query/q9.sql"),
_ => unreachable!(),
}
.into_iter()
.map(String::from)
.collect()
.to_string()
}

#[cfg(test)]
Expand Down Expand Up @@ -339,7 +334,7 @@ mod tests {
];
let mut ctx = register_nexmark_tables().await?;
for sql in sqls {
let plan = physical_plan(&mut ctx, &sql[0])?;
let plan = physical_plan(&mut ctx, &sql)?;
let mut flock_ctx = ExecutionContext {
plan,
..Default::default()
Expand Down
35 changes: 16 additions & 19 deletions bench/nexmark/query/q6.sql
Original file line number Diff line number Diff line change
@@ -1,22 +1,19 @@
SELECT Count(DISTINCT seller)
FROM auction
INNER JOIN bid
ON a_id = auction
WHERE b_date_time BETWEEN a_date_time AND expires;


SELECT seller,
Max(price) AS final
FROM auction
INNER JOIN bid
ON a_id = auction
WHERE b_date_time BETWEEN a_date_time AND expires
GROUP BY a_id,
seller
ORDER BY seller;


SELECT seller,
Avg(final)
FROM q
FROM (SELECT ROW_NUMBER()
OVER (
PARTITION BY seller
ORDER BY date_time DESC) AS row,
seller,
final
FROM (SELECT seller,
Max(price) AS final,
Max(b_date_time) AS date_time
FROM auction
INNER JOIN bid
ON a_id = auction
WHERE b_date_time BETWEEN a_date_time AND expires
GROUP BY a_id,
seller) AS Q) AS R
WHERE row <= 10
GROUP BY seller;
12 changes: 6 additions & 6 deletions nexmark.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ Help() {
# Display Help
echo $(echogreen "Nexmark Benchmark Script for Flock")
echo
echo "Syntax: nexmark [-g|-h|-c|-r -q <query_id>] [-s <number_of_seconds>] [-e <events_per_second>] [-p <number_of_parallel_streams>]"
echo "Syntax: nexmark [-g|-h|-c|-r [-q <query_id>] [-s <number_of_seconds>] [-e <events_per_second>] [-p <number_of_parallel_streams>]]"
echo "options:"
echo "g Print the GPL license notification."
echo "h Print this Help."
echo "c Compile and deploy the benchmark."
echo "r Run the benchmark."
echo "q NexMark Query Number [0-9]."
echo "p Number of NexMark Generators."
echo "s Seconds to run the benchmark."
echo "e Number of events per second."
echo "q NexMark Query Number [0-9]. Default: 5"
echo "p Number of NexMark Generators. Default: 1"
echo "s Seconds to run the benchmark. Default: 10"
echo "e Number of events per second. Default: 1000"
echo
}

Expand Down Expand Up @@ -165,7 +165,7 @@ if [ "$run" = "true" ]; then
echo

# delete the old nexmark_datasource function to make sure we have a clean slate
aws lambda delete-function --function-name nexmark_datasource
aws lambda delete-function --function-name nexmark_datasource || true

# dry run to warm up the lambda functions.
echo $(echogreen "[1] Warming up the lambda functions")
Expand Down
2 changes: 2 additions & 0 deletions src/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ bytes = "1.0.1"
dashmap = "4.0.2"
datafusion = { git = "https://github.com/flock-lab/arrow-datafusion", branch = "flock" }
futures = "0.3.12"
indoc = "1.0.3"

json = "0.12.4"
lazy_static = "1.4"
log = "0.4.14"
Expand Down
26 changes: 21 additions & 5 deletions src/runtime/src/datasource/nexmark/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
//! The NexMark events: `Person`, `Auction`, and `Bid`.
use crate::datasource::nexmark::config::NEXMarkConfig;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use rand::rngs::SmallRng;
use rand::seq::SliceRandom;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -173,7 +173,11 @@ impl Person {
Field::new("credit_card", DataType::Utf8, false),
Field::new("city", DataType::Utf8, false),
Field::new("state", DataType::Utf8, false),
Field::new("p_date_time", DataType::Date64, false),
Field::new(
"p_date_time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
],
metadata,
)
Expand Down Expand Up @@ -258,8 +262,16 @@ impl Auction {
Field::new("description", DataType::Utf8, false),
Field::new("initial_bid", DataType::Int32, false),
Field::new("reserve", DataType::Int32, false),
Field::new("a_date_time", DataType::Date64, false),
Field::new("expires", DataType::Date64, false),
Field::new(
"a_date_time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new(
"expires",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
Field::new("seller", DataType::Int32, false),
Field::new("category", DataType::Int32, false),
],
Expand Down Expand Up @@ -364,7 +376,11 @@ impl Bid {
Field::new("auction", DataType::Int32, false),
Field::new("bidder", DataType::Int32, false),
Field::new("price", DataType::Int32, false),
Field::new("b_date_time", DataType::Date64, false),
Field::new(
"b_date_time",
DataType::Timestamp(TimeUnit::Nanosecond, None),
false,
),
],
metadata,
)
Expand Down
1 change: 1 addition & 0 deletions src/runtime/src/datasource/nexmark/queries/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ mod q3;
mod q4;
mod q5;
mod q6;
mod q6_v2;
mod q7;
mod q8;
mod q9;
22 changes: 14 additions & 8 deletions src/runtime/src/datasource/nexmark/queries/q3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod tests {
use crate::query::StreamWindow;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use indoc::indoc;
use std::sync::Arc;

#[tokio::test]
Expand All @@ -41,14 +42,19 @@ mod tests {
// data source generation
let events = nex.generate_data()?;

let sql = concat!(
"SELECT ",
" name, city, state, a_id ",
"FROM ",
" auction INNER JOIN person on seller = p_id ",
"WHERE ",
" category = 10 and (state = 'or' OR state = 'id' OR state = 'ca');"
);
let sql = indoc! {"
SELECT name,
city,
state,
a_id
FROM auction
INNER JOIN person
ON seller = p_id
WHERE category = 10
AND ( state = 'or'
OR state = 'id'
OR state = 'ca' );
"};

let auction_schema = Arc::new(Auction::schema());
let person_schema = Arc::new(Person::schema());
Expand Down
26 changes: 14 additions & 12 deletions src/runtime/src/datasource/nexmark/queries/q4.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ mod tests {
use crate::query::StreamWindow;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use indoc::indoc;
use std::sync::Arc;

#[tokio::test]
Expand All @@ -41,18 +42,19 @@ mod tests {
// data source generation
let events = nex.generate_data()?;

let sql = concat!(
"SELECT ",
" category, ",
" AVG(final) ",
"FROM ( ",
" SELECT MAX(price) AS final, category ",
" FROM auction INNER JOIN bid on a_id = auction ",
" WHERE b_date_time BETWEEN a_date_time AND expires ",
" GROUP BY a_id, category ",
") as Q ",
"GROUP BY category;"
);
let sql = indoc! {"
SELECT category,
Avg(final)
FROM (SELECT Max(price) AS final,
category
FROM auction
INNER JOIN bid
ON a_id = auction
WHERE b_date_time BETWEEN a_date_time AND expires
GROUP BY a_id,
category) AS Q
GROUP BY category;
"};

let auction_schema = Arc::new(Auction::schema());
let bid_schema = Arc::new(Bid::schema());
Expand Down
38 changes: 15 additions & 23 deletions src/runtime/src/datasource/nexmark/queries/q5.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod tests {
use crate::query::StreamWindow;
use datafusion::datasource::MemTable;
use datafusion::physical_plan::collect;
use indoc::indoc;
use std::sync::Arc;

#[tokio::test]
Expand All @@ -46,29 +47,20 @@ mod tests {
// data source generation
let events = nex.generate_data()?;

let sql = concat!(
"SELECT auction, num ",
"FROM ( ",
" SELECT ",
" auction, ",
" count(*) AS num ",
" FROM bid ",
" GROUP BY auction ",
") AS AuctionBids ",
"INNER JOIN ( ",
" SELECT ",
" max(num) AS maxn ",
" FROM ( ",
" SELECT ",
" auction, ",
" count(*) AS num ",
" FROM bid ",
" GROUP BY ",
" auction ",
" ) AS CountBids ",
") AS MaxBids ",
"ON num = maxn;"
);
let sql = indoc! {"
SELECT auction,
num
FROM (SELECT auction,
Count(*) AS num
FROM bid
GROUP BY auction) AS AuctionBids
INNER JOIN (SELECT Max(num) AS maxn
FROM (SELECT auction,
Count(*) AS num
FROM bid
GROUP BY auction) AS CountBids) AS MaxBids
ON num = maxn;
"};

// let _sql = concat!(
// "SELECT auction, count(*) ",
Expand Down
Loading

0 comments on commit d522da3

Please sign in to comment.