From 2efa5ab98eda2ba0525dd143fd5b18a1cafabd86 Mon Sep 17 00:00:00 2001
From: lixizhou
Date: Tue, 30 May 2023 06:48:39 -0700
Subject: [PATCH] add code for C-backed binary search, hash table, and some results

---
 DeepMapping/DeepMapping/deepmapping.py      | 11 +--
 DeepMapping/DeepMapping/ndb_utils.py        | 79 ++++++++++++++++++++-
 DeepMapping/DeepMapping/uncompress.py       |  6 +-
 DeepMapping/DeepMapping/zstd_compression.py |  6 +-
 README.md                                   | 21 +++++-
 5 files changed, 109 insertions(+), 14 deletions(-)

diff --git a/DeepMapping/DeepMapping/deepmapping.py b/DeepMapping/DeepMapping/deepmapping.py
index d2cc215..e7fa1c4 100644
--- a/DeepMapping/DeepMapping/deepmapping.py
+++ b/DeepMapping/DeepMapping/deepmapping.py
@@ -178,8 +178,8 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
                         generate_file=True, memory_optimized=True, latency_optimized=True,
                         num_loop=10, num_query=5, search_algo='binary', path_to_model=None,
                         block_size=1024*1024):
-    # TODO add support of hash to run-time memory optimized
-    # TODO add support of binary_c to run-time memory optimized
+    # TODO add support of hash to the run-time memory-optimized strategy
+    # TODO add support of binary_c to the run-time memory-optimized strategy
     """Measure the end-end latency of data query

     Args:
@@ -216,12 +216,13 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
     df_key = [df.columns[0]]
     list_y_encoded = []
     list_y_encoder = []
-
+    size_encoder = 0
     for col in df.columns:
         if col not in df_key:
             encoded_val, encoder = encode_label(df[col])
             list_y_encoded.append(encoded_val)
             list_y_encoder.append(encoder)
+            size_encoder += encoder.classes_.nbytes

     num_tasks = len(list_y_encoded)
     for encoder in list_y_encoder:
@@ -327,7 +328,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
             ndb_utils.save_byte_to_disk(file_name, data_zstd_comp)

         data_ori_size = data_ori.nbytes/1024/1024
-        data_comp_size = [comp_zstd_size, model.count_params()*4/1024/1024, sys.getsizeof(zstd.compress(exist_bit_arr.tobytes()))/1024/1024]
+        data_comp_size = [size_encoder, comp_zstd_size, model.count_params()*4/1024/1024, sys.getsizeof(zstd.compress(exist_bit_arr.tobytes()))/1024/1024]
         print('Ori Size: {}, Curr Size: {}'.format(data_ori.nbytes/1024/1024, data_comp_size))
         np.save(os.path.join(comp_data_dir, 'num_record_per_part'), num_record_per_part)
     else:
@@ -489,7 +490,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size,

     # build hash table
     if search_algo == 'hash':
-        data_hash = dict() 
+        data_hash = dict()
     for query_idx in range(num_query):
         sample_index = list_sample_index[query_idx]
         timer_total.tic()
diff --git a/DeepMapping/DeepMapping/ndb_utils.py b/DeepMapping/DeepMapping/ndb_utils.py
index 16a83f1..5f86b6a 100644
--- a/DeepMapping/DeepMapping/ndb_utils.py
+++ b/DeepMapping/DeepMapping/ndb_utils.py
@@ -1,7 +1,8 @@
 import numpy as np
+import pandas as pd
 import time
 import os
 import shutil
 from tqdm.auto import tqdm

 def df_preprocess(df, benchmark = None):
@@ -43,7 +44,6 @@ def df_preprocess(df, benchmark = None):
         data_ori = df.to_records(index=False)
     return df, data_ori

-
 def data_manipulation(df, ops='None'):
     # default use 90% data
     if ops == 'None':
@@ -85,6 +85,81 @@ def data_manipulation(df, ops='None'):
     data_ori = df.to_records(index=False)
     return df, data_ori

+def process_df_for_synthetic_data(df):
+    for col in df.columns:
+        if df[col].dtypes == np.int64:
+            df[col] = df[col].astype(np.int32)
+    return df
+
+def generate_synthetic_data(df_tpch_order, df_tpch_lineitem, df_tpcds_customer_demographics, size=100):
+    """Generate the synthetic data used in our data manipulation experiments.
+    The data covers four cases: single-column low (high) correlation and multi-column low (high)
+    correlation. We use three tables: the TPC-H order table (for low correlation), the TPC-H
+    lineitem table, and the TPC-DS customer demographics table (for high correlation).
+    To generate the low correlation tables, first generate the order and lineitem tables through
+    TPC-H dbgen with a chosen scale factor (-s), e.g. 100 or 1000, and pass them to this function;
+    it automatically scales them to the target size (in MB).
+    To generate the high correlation tables, generate the customer demographics table through
+    TPC-DS dsdgen. Since that table does not scale with the scale factor, this function scales it
+    up to the target size by following its pattern.
+
+    Args:
+        size (int):
+            target size in MB.
+    """
+    size = size*1024*1024
+    # generate single column low correlation
+    # single column low correlation is generated based on the TPC-H order table
+    df = df_tpch_order.copy()
+    df.reset_index(inplace=True)
+    df = df[['index', 'ORDERSTATUS']]
+    df = process_df_for_synthetic_data(df)
+    data_in_recarray = df.to_records(index=False)
+    df_single_column_low_correlation = df[:int(np.floor(size/data_in_recarray[0].nbytes))]
+
+    # generate multi column low correlation
+    # multi column low correlation is generated based on the TPC-H lineitem table
+    df = df_tpch_lineitem.copy()
+    df.reset_index(inplace=True)
+    df = df[['index', 'LINENUMBER', 'DISCOUNT', 'TAX', 'RETURNFLAG', 'LINESTATUS', 'SHIPINSTRUCT', 'SHIPMODE']]
+    df = process_df_for_synthetic_data(df)
+    data_in_recarray = df.to_records(index=False)
+    df_multi_column_low_correlation = df[:int(np.floor(size/data_in_recarray[0].nbytes))]
+
+    # generate single column high correlation
+    # single column high correlation is generated based on the TPC-DS customer demographics table
+    df = df_tpcds_customer_demographics[['CD_DEMO_SK', 'CD_EDUCATION_STATUS']].copy()
+    df = process_df_for_synthetic_data(df)
+    df_new = df.copy()
+    data_in_recarray = df.to_records(index=False)
+    data_size = data_in_recarray.nbytes
+    repeat_time = int(np.floor(size/data_size))
+    last_id = np.max(df.iloc[:, 0])
+    for append_copy_idx in range(1, repeat_time+1):
+        append_copy = df.copy()
+        cd_demo_sk_copy = np.arange(last_id + (append_copy_idx-1)*len(append_copy), last_id + append_copy_idx*len(append_copy))
+        append_copy.loc[:, 'CD_DEMO_SK'] = cd_demo_sk_copy
+        df_new = pd.concat((df_new, append_copy))
+    df_single_column_high_correlation = df_new[:int(np.floor(size/data_in_recarray[0].nbytes))]
+
+    # generate multi column high correlation
+    # multi column high correlation is generated based on the TPC-DS customer demographics table
+    df = df_tpcds_customer_demographics.copy()
+    df_new = df.copy()
+    data_in_recarray = df.to_records(index=False)
+    data_size = data_in_recarray.nbytes
+    repeat_time = int(np.floor(size/data_size))
+    last_id = np.max(df.iloc[:, 0])
+    for append_copy_idx in range(1, repeat_time+1):
+        append_copy = df[df['cd_dep_college_count'] == 6].copy()
+        cd_demo_sk_copy = np.arange(last_id + (append_copy_idx-1)*len(append_copy), last_id + append_copy_idx*len(append_copy))
+        append_copy.loc[:, 'cd_demo_sk'] = cd_demo_sk_copy
+        append_copy.loc[:, 'cd_dep_college_count'] += append_copy_idx
+        df_new = pd.concat((df_new, append_copy))
+    df_multi_column_high_correlation = df_new[:int(np.floor(size/data_in_recarray[0].nbytes))]
+
+    return df_single_column_low_correlation, df_single_column_high_correlation, df_multi_column_low_correlation, df_multi_column_high_correlation
+
 def create_features(x, max_len=None):
     if max_len is None:
         max_len = len(str(np.max(x)))
@@ -103,8 +178,8 @@ def generate_query(x_start, x_end, num_query=5, sample_size=1000):
         np.random.seed(query_idx)
         try:
             sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=False).astype(np.int32)
-            raise Warning("Sample size to big, sample with replace instead")
         except:
+            print("[WARN] Sample size too big, sampling with replacement instead")
             sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=True).astype(np.int32)
         list_sample_index.append(sample_index)
     return list_sample_index
diff --git a/DeepMapping/DeepMapping/uncompress.py b/DeepMapping/DeepMapping/uncompress.py
index d93092e..e792fd0 100644
--- a/DeepMapping/DeepMapping/uncompress.py
+++ b/DeepMapping/DeepMapping/uncompress.py
@@ -23,8 +23,8 @@ def measure_latency(df, data_ori, task_name, sample_size,
                     generate_file=True, memory_optimized=True, latency_optimized=True,
                     num_loop=10, num_query=5, search_algo='binary',
                     block_size=1024*1024):
-    # TODO use_has to memory-optimized strategy
-    # TODO add support of binary_c to run-time memory optimized
+    # TODO add support of hash to the run-time memory-optimized strategy
+    # TODO add support of binary_c to the run-time memory-optimized strategy
     """Measure the end-end latency of data query

     Args:
@@ -88,6 +88,7 @@ def measure_latency(df, data_ori, task_name, sample_size,
             data_idx = np.logical_and(x >= val_start, x < val_end)
             data_part = data_ori[data_idx]
             if search_algo == 'binary_c':
+                # FIXME temporary workaround to avoid the overhead of converting to a contiguous array
                 dict_contigous_key[block_idx] = np.array(data_part[key], order='F').astype(np.int32)

             if len(data_part) == 0:
@@ -227,7 +228,6 @@ def measure_latency(df, data_ori, task_name, sample_size,
                 num_decomp += 1
                 block_bytes_size = sys.getsizeof(block_bytes)

-                # TODO add size computation for hash approach
                 if search_algo == 'hash':
                     t_decomp += timer_decomp.toc()
                     timer_build_index.tic()
diff --git a/DeepMapping/DeepMapping/zstd_compression.py b/DeepMapping/DeepMapping/zstd_compression.py
index 9806fcd..117ea92 100644
--- a/DeepMapping/DeepMapping/zstd_compression.py
+++ b/DeepMapping/DeepMapping/zstd_compression.py
@@ -21,8 +21,8 @@ def measure_latency(df, data_ori, task_name, sample_size,
                     generate_file=True, memory_optimized=True, latency_optimized=True,
                     num_loop=10, num_query=5, search_algo='binary',
                     block_size=1024*1024):
-    # TODO add support for memory-optimized strategy
-    # TODO add support of binary_c to run-time memory optimized
+    # TODO add support for a run-time memory-optimized strategy
+    # TODO add support of binary_c to the run-time memory-optimized strategy
     """Measure the end-end latency of data query

     Args:
@@ -86,6 +86,7 @@ def measure_latency(df, data_ori, task_name, sample_size,

             data_part = data_ori[data_idx]
             if search_algo == 'binary_c':
+                # FIXME temporary workaround to avoid the overhead of converting to a contiguous array
                 dict_contigous_key[block_idx] = np.array(data_part[key], order='F').astype(np.int32)

             if len(data_part) == 0:
@@ -225,7 +226,6 @@ def measure_latency(df, data_ori, task_name, sample_size,
                 num_decomp += 1
                 block_bytes_size = sys.getsizeof(block_bytes)

-                # TODO add size computation for hash approach
                 if search_algo == 'hash':
                     t_decomp += timer_decomp.toc()
                     timer_build_index.tic()
diff --git a/README.md b/README.md
index 65e3fb8..e8fdcb8 100644
--- a/README.md
+++ b/README.md
@@ -11,6 +11,8 @@ Resources for SIGMOD 2024 Submission
 - [Benchmark](#benchmark)
   - [Task: Data Query](#task-data-query)
   - [Task: Data Manipulation](#task-data-manipulation)
+- [Supplementary Material](#supplementary-material)
+  - [Comparison of end-to-end latency for running the NN model on CPU/GPU](#comparison-of-end-to-end-latency-for-running-the-nn-model-on-cpugpu)



@@ -70,4 +72,21 @@ Run `python run_benchmark_data_query.py` to benchmark. To benchmark with differe

 ### Task: Data Manipulation

-These experiments measured overall storage overhead and end-end query latency for synthetic dataset with data manipulation, i.e. INSERT/UPDATE/DELETE. Run `python run_benchmark_data_manipulation.py` to benchmark it. To benchmark with different dataset, you should modify the file correspondingly by following the instructions provided in the python file.
\ No newline at end of file
+These experiments measure the overall storage overhead and end-to-end query latency on the synthetic datasets under data manipulation, i.e. INSERT/UPDATE/DELETE. Run `python run_benchmark_data_manipulation.py` to run the benchmark. To benchmark a different dataset, modify the file accordingly by following the instructions provided in the Python file.
+
+## Supplementary Material
+
+### Comparison of end-to-end latency for running the NN model on CPU/GPU
+
+| Tables            | Number of Query Data (B) | Run Model On | NN Inference Time | Total Time |
+|:-----------------:|:------------------------:|:------------:|:-----------------:|:----------:|
+| TPCH-S10/customer | 100000                   | GPU          | 110.84            | 5,454.34   |
+| TPCH-S10/customer | 100000                   | CPU          | 663.30            | 6,058.63   |
+| TPCH-S10/lineitem | 100000                   | GPU          | 294.86            | 10,511.73  |
+| TPCH-S10/lineitem | 100000                   | CPU          | 2,909.44          | 12,791.68  |
+| TPCH-S10/orders   | 100000                   | GPU          | 81.38             | 2,351.74   |
+| TPCH-S10/orders   | 100000                   | CPU          | 403.41            | 2,707.00   |
+| TPCH-S10/part     | 100000                   | GPU          | 238.95            | 5,686.15   |
+| TPCH-S10/part     | 100000                   | CPU          | 310.98            | 5,804.16   |
+| TPCH-S10/supplier | 100000                   | GPU          | 56.59             | 5,097.88   |
+| TPCH-S10/supplier | 100000                   | CPU          | 75.77             | 5,153.22   |
\ No newline at end of file
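
For context, the new `generate_synthetic_data` helper added to `ndb_utils.py` by this patch can be exercised as in the minimal sketch below. The CSV paths and loading code are hypothetical (not part of the patch), as is the `from DeepMapping import ndb_utils` import path inferred from the repository layout; the only assumption is that the TPC-H order/lineitem and TPC-DS customer demographics exports carry the column names referenced inside the function.

```python
# Minimal usage sketch for ndb_utils.generate_synthetic_data (added in this patch).
# Assumptions: the CSV paths below are hypothetical, and each file contains the
# column names referenced in the function body (e.g. ORDERSTATUS, LINENUMBER, CD_DEMO_SK).
import pandas as pd
from DeepMapping import ndb_utils

df_order = pd.read_csv('data/tpch/orders.csv')
df_lineitem = pd.read_csv('data/tpch/lineitem.csv')
df_customer_demographics = pd.read_csv('data/tpcds/customer_demographics.csv')

# Scale each of the four synthetic tables to roughly 100 MB.
(df_single_low, df_single_high,
 df_multi_low, df_multi_high) = ndb_utils.generate_synthetic_data(
    df_order, df_lineitem, df_customer_demographics, size=100)

# Quick sanity check on the generated tables.
for name, table in [('single_col_low_corr', df_single_low), ('single_col_high_corr', df_single_high),
                    ('multi_col_low_corr', df_multi_low), ('multi_col_high_corr', df_multi_high)]:
    print(name, table.shape)
```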