From c9b964eac7498b2822de6689a5b256e7112b4d1f Mon Sep 17 00:00:00 2001 From: Gang Liao Date: Sat, 1 Jan 2022 16:45:15 -0500 Subject: [PATCH] feat: support nexmark q13 on cloud functions (#405) * feat: support nexmark q13 on cloud functions * fix: we cannot infer schema for side input This is because inferred schema's field type could be different. For example, Int32 and Int64 are different. * fix: add side_input_schema() * fix: unused variable: `ctx` * fix: use base64 to encode schema --- benchmarks/Cargo.toml | 2 + benchmarks/src/nexmark/main.rs | 143 +++++++++++------- benchmarks/src/nexmark/query/q13.sql | 8 ++ benchmarks/src/s3/main.rs | 2 +- flock-cli/src/nexmark.rs | 5 +- flock-function/Cargo.toml | 1 + flock-function/src/aws/actor.rs | 144 +++++++++++++++---- flock-function/src/aws/window.rs | 34 +++-- flock/Cargo.toml | 1 - flock/src/config.toml | 1 + flock/src/datasource/nexmark/event.rs | 13 ++ flock/src/datasource/nexmark/nexmark.rs | 21 +-- flock/src/datasource/nexmark/queries/q13.sql | 3 + flock/src/services.rs | 36 ++++- flock_bench.sh | 2 +- 15 files changed, 304 insertions(+), 112 deletions(-) create mode 100644 benchmarks/src/nexmark/query/q13.sql diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml index 8e292a24..2971e714 100644 --- a/benchmarks/Cargo.toml +++ b/benchmarks/Cargo.toml @@ -9,6 +9,7 @@ publish = false [dependencies] arrow = { git = "https://github.com/flock-lab/arrow-rs", branch = "flock", features = [ "simd" ] } +base64 = "0.13.0" chrono = "0.4.19" datafusion = { git = "https://github.com/flock-lab/arrow-datafusion", branch = "flock" } env_logger = "^0.9" @@ -18,6 +19,7 @@ humantime = "2.1.0" itertools = "0.10.0" lazy_static = "1.4" log = "0.4.14" +reqwest = "0.11.7" rusoto_core = "0.47.0" rusoto_lambda = "0.47.0" rusoto_logs = "0.47.0" diff --git a/benchmarks/src/nexmark/main.rs b/benchmarks/src/nexmark/main.rs index ac93661b..ad869d2a 100644 --- a/benchmarks/src/nexmark/main.rs +++ b/benchmarks/src/nexmark/main.rs @@ -24,17 +24,24 @@ use flock::prelude::*; use humantime::parse_duration; use lazy_static::lazy_static; use log::info; -use nexmark::event::{Auction, Bid, Person}; +use nexmark::event::{side_input_schema, Auction, Bid, Person}; use nexmark::NEXMarkSource; use rainbow::{rainbow_println, rainbow_string}; -use rusoto_core::{ByteStream, Region}; +use rusoto_core::Region; use rusoto_lambda::InvocationResponse; -use rusoto_s3::{ListObjectsV2Request, PutObjectRequest, S3Client, S3}; +use rusoto_s3::S3Client; use std::collections::HashMap; use std::sync::Arc; use structopt::StructOpt; use tokio::task::JoinHandle; +static SIDE_INPUT_DOWNLOAD_URL: &str = concat!( + "https://gist.githubusercontent.com/gangliao/", + "de6f544b8a93f26081036e0a7f8c1715/raw/", + "586c88ad6f89d12c9f1753622eddf4788f6f0f9d/", + "nexmark_q13_side_input.csv" +); + lazy_static! { pub static ref FLOCK_S3_KEY: String = FLOCK_CONF["flock"]["s3_key"].to_string(); pub static ref FLOCK_S3_BUCKET: String = FLOCK_CONF["flock"]["s3_bucket"].to_string(); @@ -51,6 +58,7 @@ lazy_static! 
 {
     pub static ref NEXMARK_SOURCE_LOG_GROUP: String = "/aws/lambda/flock_datasource".to_string();
     pub static ref NEXMARK_Q4_S3_KEY: String = FLOCK_CONF["nexmark"]["q4_s3_key"].to_string();
     pub static ref NEXMARK_Q6_S3_KEY: String = FLOCK_CONF["nexmark"]["q6_s3_key"].to_string();
+    pub static ref NEXMARK_Q13_S3_SIDE_INPUT_KEY: String = FLOCK_CONF["nexmark"]["q13_s3_side_input_key"].to_string();
 }
 
 #[derive(Default, Clone, Debug, StructOpt)]
@@ -115,10 +123,18 @@ pub async fn register_nexmark_tables() -> Result {
     )?;
     ctx.register_table("bid", Arc::new(bid_table))?;
 
+    // For NEXMark Q13
+    let side_input_schema = Arc::new(side_input_schema());
+    let side_input_table = MemTable::try_new(
+        side_input_schema.clone(),
+        vec![vec![RecordBatch::new_empty(side_input_schema)]],
+    )?;
+    ctx.register_table("side_input", Arc::new(side_input_table))?;
+
     Ok(ctx)
 }
 
-pub fn create_nexmark_source(opt: &mut NexmarkBenchmarkOpt) -> NEXMarkSource {
+pub async fn create_nexmark_source(opt: &mut NexmarkBenchmarkOpt) -> Result<NEXMarkSource> {
     let window = match opt.query_number {
         0..=4 | 6 | 9 | 10 | 13 => StreamWindow::ElementWise,
         5 => StreamWindow::HoppingWindow((10, 5)),
@@ -131,7 +147,28 @@ pub fn create_nexmark_source(opt: &mut NexmarkBenchmarkOpt) -> NEXMarkSource {
     if opt.query_number == 10 {
         opt.data_sink_type = "s3".to_string();
     }
-    NEXMarkSource::new(opt.seconds, opt.generators, opt.events_per_second, window)
+
+    if opt.query_number == 13 {
+        let data = reqwest::get(SIDE_INPUT_DOWNLOAD_URL)
+            .await
+            .map_err(|_| "Failed to download side input data")?
+            .text_with_charset("utf-8")
+            .await
+            .map_err(|_| "Failed to read side input data")?;
+        put_object_to_s3_if_missing(
+            FLOCK_S3_BUCKET.clone(),
+            NEXMARK_Q13_S3_SIDE_INPUT_KEY.clone(),
+            data.as_bytes().to_vec(),
+        )
+        .await?;
+    }
+
+    Ok(NEXMarkSource::new(
+        opt.seconds,
+        opt.generators,
+        opt.events_per_second,
+        window,
+    ))
 }
 
 pub async fn plan_placement(
@@ -145,27 +182,12 @@
         6 => (FLOCK_S3_BUCKET.clone(), NEXMARK_Q6_S3_KEY.clone()),
         _ => unreachable!(),
     };
-    if let Some(0) = FLOCK_S3_CLIENT
-        .list_objects_v2(ListObjectsV2Request {
-            bucket: s3_bucket.clone(),
-            prefix: Some(s3_key.clone()),
-            max_keys: Some(1),
-            ..Default::default()
-        })
-        .await
-        .map_err(|e| FlockError::Internal(e.to_string()))?
-        .key_count
-    {
-        FLOCK_S3_CLIENT
-            .put_object(PutObjectRequest {
-                bucket: s3_bucket.clone(),
-                key: s3_key.clone(),
-                body: Some(ByteStream::from(serde_json::to_vec(&physcial_plan)?)),
-                ..Default::default()
-            })
-            .await
-            .map_err(|e| FlockError::Internal(e.to_string()))?;
-    }
+    put_object_to_s3_if_missing(
+        s3_bucket.clone(),
+        s3_key.clone(),
+        serde_json::to_vec(&physcial_plan)?,
+    )
+    .await?;
     Ok((FLOCK_EMPTY_PLAN.clone(), Some((s3_bucket, s3_key))))
 }
 _ => Ok((physcial_plan, None)),
 }
@@ -318,6 +340,48 @@ pub async fn create_physical_plans(
     Ok(plans)
 }
 
+pub async fn add_extra_metadata(
+    opt: &NexmarkBenchmarkOpt,
+    metadata: &mut HashMap<String, String>,
+) -> Result<()> {
+    metadata.insert(
+        "invocation_type".to_string(),
+        if opt.async_type {
+            "async".to_string()
+        } else {
+            "sync".to_string()
+        },
+    );
+
+    if opt.query_number == 12 {
+        metadata.insert(
+            "add_process_time_query".to_string(),
+            nexmark_query(opt.query_number)[0].clone(),
+        );
+    }
+
+    if opt.query_number == 11 || opt.query_number == 12 {
+        metadata.insert("session_key".to_string(), "bidder".to_string());
+        metadata.insert("session_name".to_string(), "bid".to_string());
+    }
+
+    if opt.query_number == 13 {
+        metadata.insert(
+            "side_input_s3_key".to_string(),
+            NEXMARK_Q13_S3_SIDE_INPUT_KEY.clone(),
+        );
+        metadata.insert("side_input_format".to_string(), "csv".to_string());
+
+        let side_input_schema = Arc::new(side_input_schema());
+        metadata.insert(
+            "side_input_schema".to_string(),
+            base64::encode(schema_to_bytes(side_input_schema)),
+        );
+    }
+
+    Ok(())
+}
+
 pub async fn nexmark_benchmark(opt: &mut NexmarkBenchmarkOpt) -> Result<()> {
     rainbow_println("================================================================");
     rainbow_println("                         Running the benchmark                  ");
@@ -326,7 +390,7 @@ pub async fn nexmark_benchmark(opt: &mut NexmarkBenchmarkOpt) -> Result<()> {
     rainbow_println(format!("{:#?}\n", opt));
 
     let query_number = opt.query_number;
-    let nexmark_conf = create_nexmark_source(opt);
+    let nexmark_conf = create_nexmark_source(opt).await?;
 
     let mut ctx = register_nexmark_tables().await?;
     let plans = create_physical_plans(&mut ctx, query_number).await?;
@@ -343,26 +407,7 @@ pub async fn nexmark_benchmark(opt: &mut NexmarkBenchmarkOpt) -> Result<()> {
     // *delete* and **recreate** the source function every time we change the query.
     let mut metadata = HashMap::new();
     metadata.insert("workers".to_string(), serde_json::to_string(&worker)?);
-    metadata.insert(
-        "invocation_type".to_string(),
-        if opt.async_type {
-            "async".to_string()
-        } else {
-            "sync".to_string()
-        },
-    );
-
-    if query_number == 12 {
-        metadata.insert(
-            "add_process_time_query".to_string(),
-            nexmark_query(query_number)[0].clone(),
-        );
-    }
-
-    if query_number == 11 || query_number == 12 {
-        metadata.insert("session_key".to_string(), "bidder".to_string());
-        metadata.insert("session_name".to_string(), "bid".to_string());
-    }
+    add_extra_metadata(opt, &mut metadata).await?;
 
     let tasks = (0..opt.generators)
         .into_iter()
@@ -432,6 +477,7 @@ pub fn nexmark_query(query_number: usize) -> Vec<String> {
         10 => vec![include_str!("query/q10.sql")],
         11 => vec![include_str!("query/q11.sql")],
         12 => include_str!("query/q12.sql").split(';').collect(),
+        13 => vec![include_str!("query/q13.sql")],
         _ => unreachable!(),
     }
     .into_iter()
@@ -453,7 +499,7 @@ mod tests {
         events_per_second: 1000,
         ..Default::default()
     };
-    let conf = create_nexmark_source(&mut opt);
+    let conf = create_nexmark_source(&mut opt).await?;
     let (event, _) = Arc::new(conf.generate_data()?)
.select(1, 0) .expect("Failed to select event."); @@ -470,6 +516,7 @@ mod tests { nexmark_query(10), nexmark_query(11), nexmark_query(12), + nexmark_query(13), ]; let ctx = register_nexmark_tables().await?; for sql in sqls { diff --git a/benchmarks/src/nexmark/query/q13.sql b/benchmarks/src/nexmark/query/q13.sql new file mode 100644 index 00000000..fbc18630 --- /dev/null +++ b/benchmarks/src/nexmark/query/q13.sql @@ -0,0 +1,8 @@ +SELECT auction, + bidder, + price, + b_date_time, + value +FROM bid + JOIN side_input + ON auction = key; diff --git a/benchmarks/src/s3/main.rs b/benchmarks/src/s3/main.rs index ddea1d27..938727af 100644 --- a/benchmarks/src/s3/main.rs +++ b/benchmarks/src/s3/main.rs @@ -49,7 +49,7 @@ async fn benchmark(opt: &mut NexmarkBenchmarkOpt) -> Result<()> { "Running the NEXMark benchmark [S3] with the following options: {:?}", opt ); - let nexmark_conf = create_nexmark_source(opt); + let nexmark_conf = create_nexmark_source(opt).await?; let query_number = opt.query_number; let mut ctx = register_nexmark_tables().await?; diff --git a/flock-cli/src/nexmark.rs b/flock-cli/src/nexmark.rs index e8eeebdc..d92d546b 100644 --- a/flock-cli/src/nexmark.rs +++ b/flock-cli/src/nexmark.rs @@ -50,8 +50,11 @@ fn run_args() -> App<'static> { Arg::new("query number") .short('q') .long("query") - .help("Sets the NEXMark benchmark query number [0-12]") + .help("Sets the NEXMark benchmark query number") .takes_value(true) + .possible_values(&[ + "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", "13", + ]) .default_value("3"), ) .arg( diff --git a/flock-function/Cargo.toml b/flock-function/Cargo.toml index e53cfd7b..923a73db 100644 --- a/flock-function/Cargo.toml +++ b/flock-function/Cargo.toml @@ -13,6 +13,7 @@ snmalloc = [ "snmalloc-rs" ] [dependencies] arrow = { git = "https://github.com/flock-lab/arrow-rs", branch = "flock", features = [ "simd" ] } aws_lambda_events = "0.5" +base64 = "0.13.0" bytes = "1.1.0" chrono = "0.4.19" datafusion = { git = "https://github.com/flock-lab/arrow-datafusion", branch = "flock" } diff --git a/flock-function/src/aws/actor.rs b/flock-function/src/aws/actor.rs index e7d01270..63ced5a7 100644 --- a/flock-function/src/aws/actor.rs +++ b/flock-function/src/aws/actor.rs @@ -11,6 +11,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use arrow::csv::reader::ReaderBuilder; use arrow::record_batch::RecordBatch; use datafusion::physical_plan::Partitioning::RoundRobinBatch; use flock::prelude::*; @@ -25,7 +26,7 @@ use rusoto_s3::{GetObjectRequest, S3}; use serde_json::json; use serde_json::Value; use std::collections::HashMap; -use std::io::Read; +use std::io::{Cursor, Read}; use std::sync::{Arc, Mutex}; lazy_static! { @@ -147,39 +148,47 @@ pub async fn handler( info!("Receiving a data packet: {:?}", event.uuid); let tid = event.uuid.tid.clone(); - let input_partitions = { - if let Some((bucket, key)) = infer_s3_mode(&event.metadata) { - info!("Reading payload from S3..."); - let payload = read_payload_from_s3(bucket, key).await?; - info!("[OK] Received payload from S3."); - - info!("Parsing payload to input partitions..."); - let (r1, r2, _) = payload.to_record_batch(); - info!("[OK] Parsed payload."); - - vec![vec![r1], vec![r2]] - } else if match &ctx.next { - CloudFunction::Sink(..) | CloudFunction::Lambda(..) => true, - CloudFunction::Group(..) => false, - } { - // ressemble data packets to a single window. 
-            let (ready, uuid) = arena.reassemble(event);
-            if ready {
-                info!("Received all data packets for the window: {:?}", uuid.tid);
-                arena.batches(uuid.tid)
-            } else {
-                let response = "Window data collection has not been completed.".to_string();
-                info!("{}", response);
-                return Ok(json!({ "response": response }));
-            }
+    let mut input = vec![];
+    if let Ok(batch) = infer_side_input(&event.metadata).await {
+        input.push(vec![batch]);
+    }
+
+    if let Some((bucket, key)) = infer_s3_mode(&event.metadata) {
+        info!("Reading payload from S3...");
+        let payload = read_payload_from_s3(bucket, key).await?;
+        info!("[OK] Received payload from S3.");
+
+        info!("Parsing payload to input partitions...");
+        let (r1, r2, _) = payload.to_record_batch();
+        info!("[OK] Parsed payload.");
+
+        input.push(vec![r1]);
+        input.push(vec![r2]);
+    } else if match &ctx.next {
+        CloudFunction::Sink(..) | CloudFunction::Lambda(..) => true,
+        CloudFunction::Group(..) => false,
+    } {
+        // reassemble data packets to a single window.
+        let (ready, uuid) = arena.reassemble(event);
+        if ready {
+            info!("Received all data packets for the window: {:?}", uuid.tid);
+            arena
+                .batches(uuid.tid)
+                .into_iter()
+                .for_each(|b| input.push(b));
         } else {
-            // data packet is an individual event for the current function.
-            let (r1, r2, _) = event.to_record_batch();
-            vec![vec![r1], vec![r2]]
+            let response = "Window data collection has not been completed.".to_string();
+            info!("{}", response);
+            return Ok(json!({ "response": response }));
         }
-    };
+    } else {
+        // data packet is an individual event for the current function.
+        let (r1, r2, _) = event.to_record_batch();
+        input.push(vec![r1]);
+        input.push(vec![r2]);
+    }
 
-    let output = collect(ctx, input_partitions).await?;
+    let output = collect(ctx, input).await?;
 
     match &ctx.next {
         CloudFunction::Sink(sink_type) => {
@@ -271,6 +280,79 @@ pub fn infer_s3_mode(metadata: &Option<HashMap<String, String>>) -> Option<(Stri
     None
 }
 
+pub async fn infer_side_input(
+    metadata: &Option<HashMap<String, String>>,
+) -> Result<Vec<RecordBatch>> {
+    if let Some(metadata) = metadata {
+        if let Some(key) = metadata.get("side_input_s3_key") {
+            let body = FLOCK_S3_CLIENT
+                .get_object(GetObjectRequest {
+                    bucket: FLOCK_S3_BUCKET.clone(),
+                    key: key.parse::<String>().unwrap(),
+                    ..Default::default()
+                })
+                .await
+                .map_err(|e| FlockError::AWS(e.to_string()))?
+                .body
+                .take()
+                .expect("body is empty");
+
+            let bytes: Vec<u8> = tokio::task::spawn_blocking(move || {
+                let mut buf = Vec::new();
+                body.into_blocking_read().read_to_end(&mut buf).unwrap();
+                buf
+            })
+            .await
+            .expect("failed to load side input from S3");
+
+            let format = metadata
+                .get("side_input_format")
+                .expect("side_input_format is missing")
+                .as_str();
+
+            let schema = schema_from_bytes(&base64::decode(
+                metadata
+                    .get("side_input_schema")
+                    .expect("side_input_schema is missing")
+                    .as_str(),
+            )?)?;
+
+            let mut batches = vec![];
+            match format {
+                "csv" => {
+                    let mut batch_reader = ReaderBuilder::new()
+                        .with_schema(schema)
+                        .has_header(true)
+                        .with_delimiter(b',')
+                        .with_batch_size(1024)
+                        .build(Cursor::new(bytes))?;
+                    loop {
+                        match batch_reader.next() {
+                            Some(Ok(batch)) => {
+                                batches.push(batch);
+                            }
+                            None => {
+                                break;
+                            }
+                            Some(Err(e)) => {
+                                return Err(FlockError::Execution(format!(
+                                    "Error reading batch from side input: {}",
+                                    e
+                                )));
+                            }
+                        }
+                    }
+                }
+                _ => unimplemented!(),
+            }
+            return Ok(batches);
+        }
+    }
+    Err(FlockError::AWS(
+        "Side Input's S3 key is not specified".to_string(),
+    ))
+}
+
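// ------------------------------------------------------------------------------------------------
// [Editor's note, not part of the patch] A minimal sketch of the decoding path implemented by
// `infer_side_input` above, runnable outside the Lambda handler. It reuses the exact arrow CSV
// `ReaderBuilder` calls from the patch; the inline CSV literal and the hard-coded schema are
// illustrative assumptions. Shipping an explicit, base64-encoded schema in the metadata exists
// precisely because CSV type inference could pick Int64 where the plan expects Int32 (see the
// commit message).
//
//     use arrow::csv::reader::ReaderBuilder;
//     use arrow::datatypes::{DataType, Field, Schema};
//     use std::io::Cursor;
//     use std::sync::Arc;
//
//     fn decode_side_input_sketch() -> arrow::error::Result<()> {
//         let bytes = b"key,value\n1001,42\n1002,7\n".to_vec();
//         let schema = Arc::new(Schema::new(vec![
//             Field::new("key", DataType::Int32, false),
//             Field::new("value", DataType::Int32, false),
//         ]));
//         let mut batch_reader = ReaderBuilder::new()
//             .with_schema(schema)
//             .has_header(true)
//             .with_delimiter(b',')
//             .with_batch_size(1024)
//             .build(Cursor::new(bytes))?;
//         while let Some(batch) = batch_reader.next() {
//             // Each batch honors the explicit schema instead of inferred types.
//             assert_eq!(batch?.schema().field(0).data_type(), &DataType::Int32);
//         }
//         Ok(())
//     }
// ------------------------------------------------------------------------------------------------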
 /// Infer group keys for session windows (used in NEXMark Q11 and Q12).
 pub fn infer_session_keys(metadata: &Option<HashMap<String, String>>) -> Result<(String, String)> {
     if let Some(metadata) = metadata {
diff --git a/flock-function/src/aws/window.rs b/flock-function/src/aws/window.rs
index e0b4a0cb..2bbd67bd 100644
--- a/flock-function/src/aws/window.rs
+++ b/flock-function/src/aws/window.rs
@@ -852,8 +852,10 @@ pub async fn elementwise_tasks(
     stream: Arc<dyn DataStream + Send + Sync>,
     seconds: usize,
 ) -> Result<()> {
-    let (mut ring, group_name) = infer_actor_info(&payload.metadata)?;
-    let sync = infer_invocation_type(&payload.metadata)?;
+    let query_number = payload.query_number;
+    let metadata = payload.metadata;
+    let (mut ring, group_name) = infer_actor_info(&metadata)?;
+    let sync = infer_invocation_type(&metadata)?;
     let invocation_type = if sync {
         FLOCK_LAMBDA_SYNC_CALL.to_string()
     } else {
@@ -868,15 +870,11 @@ pub async fn elementwise_tasks(
             let function_name = group_name.clone();
             let uuid =
                 UuidBuilder::new_with_ts(&function_name, Utc::now().timestamp(), 1).next_uuid();
-            let payload = serde_json::to_vec(&events.select_event_to_payload(
-                epoch,
-                0,
-                payload.query_number,
-                uuid,
-                sync,
-            )?)?;
-            info!("[OK] function payload bytes: {}", payload.len());
-            invoke_lambda_function(function_name, Some(payload.into()), invocation_type.clone())
+            let mut payload = events.select_event_to_payload(epoch, 0, query_number, uuid, sync)?;
+            payload.metadata = metadata.clone();
+            let bytes = serde_json::to_vec(&payload)?;
+            info!("[OK] function payload bytes: {}", bytes.len());
+            invoke_lambda_function(function_name, Some(bytes.into()), invocation_type.clone())
                 .await?;
         } else {
             // Calculate the total data packets to be sent.
@@ -906,20 +904,20 @@ pub async fn elementwise_tasks(
             let empty = vec![];
 
             for i in 0..size {
-                let payload = serde_json::to_vec(&to_payload(
+                let mut payload = to_payload(
                     if i < a.len() { &a[i] } else { &empty },
                     if i < b.len() { &b[i] } else { &empty },
                     uuid_builder.next_uuid(),
                     sync,
-                ))?;
-                info!(
-                    "[OK] Event {} - function payload bytes: {}",
-                    i,
-                    payload.len()
                 );
+                payload.query_number = query_number;
+                payload.metadata = metadata.clone();
+
+                let bytes = serde_json::to_vec(&payload)?;
+                info!("[OK] Event {} - function payload bytes: {}", i, bytes.len());
                 invoke_lambda_function(
                     function_name.clone(),
-                    Some(payload.into()),
+                    Some(bytes.into()),
                     invocation_type.clone(),
                 )
                 .await?;
diff --git a/flock/Cargo.toml b/flock/Cargo.toml
index 91ad1b2c..086a696d 100644
--- a/flock/Cargo.toml
+++ b/flock/Cargo.toml
@@ -69,7 +69,6 @@ zstd = "0.9.0+zstd.1.5.0"
 
 [dev-dependencies]
 cargo_toml = "0.10.1"
-
 reqwest = "0.11.7"
 
 [lib]
diff --git a/flock/src/config.toml b/flock/src/config.toml
index 4b397d21..c98b9304 100644
--- a/flock/src/config.toml
+++ b/flock/src/config.toml
@@ -64,6 +64,7 @@ permissions = "0777"
 
 [nexmark]
+q13_s3_side_input_key = "nexmark_q13_side_input"
 q4_s3_key = "nexmark_q4_plan"
 q6_s3_key = "nexmark_q6_plan"
 s3_key = "nexmark"
diff --git a/flock/src/datasource/nexmark/event.rs b/flock/src/datasource/nexmark/event.rs
index 5bfb52c3..d3d3d4be 100644
--- a/flock/src/datasource/nexmark/event.rs
+++ b/flock/src/datasource/nexmark/event.rs
@@ -371,6 +371,19 @@ impl Bid {
     }
 }
 
+/// Returns the side input schema for the NEXMark benchmark (Q13).
+pub fn side_input_schema() -> Schema { + let mut metadata = HashMap::new(); + metadata.insert("name".to_string(), "side_input".to_string()); + Schema::new_with_metadata( + vec![ + Field::new("key", DataType::Int32, false), + Field::new("value", DataType::Int32, false), + ], + metadata, + ) +} + #[cfg(test)] mod tests { use super::*; diff --git a/flock/src/datasource/nexmark/nexmark.rs b/flock/src/datasource/nexmark/nexmark.rs index 50948e65..fd36fcbe 100644 --- a/flock/src/datasource/nexmark/nexmark.rs +++ b/flock/src/datasource/nexmark/nexmark.rs @@ -178,7 +178,7 @@ impl DataStream for NEXMarkStream { *FLOCK_ASYNC_GRANULE_SIZE }; let (r1, r2) = match query_number.expect("Query number is not set.") { - 0 | 1 | 2 | 5 | 7 | 10 | 11 | 12 => ( + 0 | 1 | 2 | 5 | 7 | 10..=13 => ( event_bytes_to_batch(&event.bids, NEXMARK_BID.clone(), granule_size * 2), vec![], ), @@ -249,27 +249,30 @@ impl DataStream for NEXMarkStream { ); let batch_size = *FLOCK_SYNC_GRANULE_SIZE; - match query_number.expect("Query number is not set.") { - 0 | 1 | 2 | 5 | 7 | 10 | 11 | 12 => Ok(to_payload( + let mut payload = match query_number.expect("Query number is not set.") { + 0 | 1 | 2 | 5 | 7 | 10..=13 => to_payload( &event_bytes_to_batch(&event.bids, NEXMARK_BID.clone(), batch_size), &[], uuid, sync, - )), - 3 | 8 => Ok(to_payload( + ), + 3 | 8 => to_payload( &event_bytes_to_batch(&event.persons, NEXMARK_PERSON.clone(), batch_size), &event_bytes_to_batch(&event.auctions, NEXMARK_AUCTION.clone(), batch_size), uuid, sync, - )), - 4 | 6 | 9 => Ok(to_payload( + ), + 4 | 6 | 9 => to_payload( &event_bytes_to_batch(&event.auctions, NEXMARK_AUCTION.clone(), batch_size), &event_bytes_to_batch(&event.bids, NEXMARK_BID.clone(), batch_size), uuid, sync, - )), + ), _ => unimplemented!(), - } + }; + payload.query_number = query_number; + + Ok(payload) } } diff --git a/flock/src/datasource/nexmark/queries/q13.sql b/flock/src/datasource/nexmark/queries/q13.sql index c9f507a3..2949cddc 100644 --- a/flock/src/datasource/nexmark/queries/q13.sql +++ b/flock/src/datasource/nexmark/queries/q13.sql @@ -4,6 +4,9 @@ -- Joins a stream to a bounded side input, modeling basic stream enrichment. 
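-- [Editor's note] Worked example with hypothetical rows: if side_input contains key = 1001 with
-- value 'v', a bid on auction 1001 is emitted as (auction, bidder, price, dateTime, 'v'); bids
-- whose auction matches no key produce no output (inner join).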
 -- -------------------------------------------------------------------------------------------------
 
+-- https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/streaming/temporal_tables.html
+-- https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/table/streaming/joins.html#join-with-a-temporal-table
+
 CREATE TABLE side_input (
     key BIGINT,
     `value` VARCHAR
diff --git a/flock/src/services.rs b/flock/src/services.rs
index f2bab803..3f28ecdc 100644
--- a/flock/src/services.rs
+++ b/flock/src/services.rs
@@ -22,7 +22,7 @@ use bytes::Bytes;
 use humantime::parse_duration;
 use lazy_static::lazy_static;
 use log::info;
-use rusoto_core::{Region, RusotoError};
+use rusoto_core::{ByteStream, Region, RusotoError};
 use rusoto_efs::{
     CreateAccessPointError, CreateAccessPointRequest, CreateFileSystemError,
     CreateFileSystemRequest, CreateMountTargetError, CreateMountTargetRequest, CreationInfo,
@@ -34,7 +34,7 @@ use rusoto_lambda::{
     Lambda, LambdaClient, PutFunctionConcurrencyRequest, UpdateFunctionCodeRequest,
 };
 use rusoto_logs::CloudWatchLogsClient;
-use rusoto_s3::S3Client;
+use rusoto_s3::{ListObjectsV2Request, PutObjectRequest, S3Client, S3};
 use rusoto_sqs::SqsClient;
 use std::path::Path;
 use std::time::Duration;
@@ -441,3 +441,35 @@ pub async fn create_mount_target(file_system_id: &str) -> Result {
         Err(e) => Err(FlockError::AWS(e.to_string())),
     }
 }
+
+/// Puts an object to AWS S3 if the object does not exist. If the object exists,
+/// it isn't modified.
+///
+/// # Arguments
+/// * `bucket` - The name of the bucket to put the object in.
+/// * `key` - The key of the object to put.
+/// * `body` - The body of the object to put.
+pub async fn put_object_to_s3_if_missing(bucket: String, key: String, body: Vec<u8>) -> Result<()> {
+    if let Some(0) = FLOCK_S3_CLIENT
+        .list_objects_v2(ListObjectsV2Request {
+            bucket: bucket.clone(),
+            prefix: Some(key.clone()),
+            max_keys: Some(1),
+            ..Default::default()
+        })
+        .await
+        .map_err(|e| FlockError::Internal(e.to_string()))?
+        .key_count
+    {
+        FLOCK_S3_CLIENT
+            .put_object(PutObjectRequest {
+                bucket: bucket.clone(),
+                key: key.clone(),
+                body: Some(ByteStream::from(body)),
+                ..Default::default()
+            })
+            .await
+            .map_err(|e| FlockError::AWS(e.to_string()))?;
+    }
+    Ok(())
+}
diff --git a/flock_bench.sh b/flock_bench.sh
index bd552c9e..3830f68c 100755
--- a/flock_bench.sh
+++ b/flock_bench.sh
@@ -86,7 +86,7 @@ Run() {
     echo $(echogreen "============================================================")
    echo $(echogreen "                   Running the benchmarks                   ")
    echo $(echogreen "============================================================")
-    echo 
+    echo
     echo $(echored "[Error] If you want to run the benchmark, please use \"flock-cli [nexmark|ysb] run\".")
     echo
     echo $(echoblue "$ ./target/x86_64-unknown-linux-gnu/release/flock-cli nexmark run -h")
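[Editor's appendix] Why "fix: use base64 to encode schema": the side-input schema travels in the
payload's string-to-string metadata map, and the Arrow IPC bytes produced by `schema_to_bytes`
are binary, so `add_extra_metadata` base64-encodes them and `infer_side_input` decodes them
again. A minimal round-trip sketch, assuming flock's `schema_to_bytes` / `schema_from_bytes`
helpers and the base64 0.13 crate used by this patch (imports are illustrative):

    use flock::prelude::*;
    use std::sync::Arc;

    fn schema_round_trip() -> Result<()> {
        let schema = Arc::new(side_input_schema());
        let encoded = base64::encode(schema_to_bytes(schema.clone()));
        let decoded = schema_from_bytes(&base64::decode(&encoded)?)?;
        assert_eq!(decoded, schema);
        Ok(())
    }

With the patch applied, Q13 is selectable from the CLI option added above, e.g.:

    $ flock-cli nexmark run -q 13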