add code of c-backed binary search, hash table, some results
lixi-zhou committed May 30, 2023
1 parent b38c99c commit 2efa5ab
Showing 5 changed files with 110 additions and 14 deletions.
11 changes: 6 additions & 5 deletions DeepMapping/DeepMapping/deepmapping.py
@@ -178,8 +178,8 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
generate_file=True, memory_optimized=True, latency_optimized=True,
num_loop=10, num_query=5, search_algo='binary', path_to_model=None,
block_size=1024*1024):
# TODO add support of hash to run-time memory optimized
# TODO add support of binary_c to run-time memory optimized
# TODO add support of hash to run-time memory optimized strategy
# TODO add support of binary_c to run-time memory optimized strategy
"""Measure the end-end latency of data query
Args:
@@ -216,12 +216,13 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
df_key = [df.columns[0]]
list_y_encoded = []
list_y_encoder = []

size_encoder = 0
for col in df.columns:
if col not in df_key:
encoded_val, encoder = encode_label(df[col])
list_y_encoded.append(encoded_val)
list_y_encoder.append(encoder)
size_encoder += encoder.classes_.nbytes
num_tasks = len(list_y_encoded)

for encoder in list_y_encoder:
@@ -327,7 +328,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
ndb_utils.save_byte_to_disk(file_name, data_zstd_comp)

data_ori_size = data_ori.nbytes/1024/1024
data_comp_size = [comp_zstd_size, model.count_params()*4/1024/1024, sys.getsizeof(zstd.compress(exist_bit_arr.tobytes()))/1024/1024]
data_comp_size = [size_encoder, comp_zstd_size, model.count_params()*4/1024/1024, sys.getsizeof(zstd.compress(exist_bit_arr.tobytes()))/1024/1024]
print('Ori Size: {}, Curr Size: {}'.format(data_ori.nbytes/1024/1024, data_comp_size))
np.save(os.path.join(comp_data_dir, 'num_record_per_part'), num_record_per_part)
else:
@@ -489,7 +490,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size,

# build hash table
if search_algo == 'hash':
data_hash = dict()
data_hash = dict()
for query_idx in range(num_query):
sample_index = list_sample_index[query_idx]
timer_total.tic()
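The hunk above builds an in-memory hash table (`data_hash = dict()`) when `search_algo == 'hash'`. As a rough illustration of that idea — not code from this commit, and with illustrative names such as `build_hash_index` — a per-block key-to-row-position dictionary can replace binary search for point lookups:

```python
# Illustrative sketch only: a per-block hash index over the key column,
# probed with the sampled query keys. Names here are not from the repository.
import numpy as np

def build_hash_index(data_part, key):
    # Map each key value to its row position within the (decompressed) block.
    return {int(k): i for i, k in enumerate(data_part[key])}

def probe(data_part, index, sample_keys):
    # Return the matching records; keys absent from the block are skipped.
    hits = [index[int(k)] for k in sample_keys if int(k) in index]
    return data_part[hits]

block = np.array([(1, 10.0), (2, 20.0), (5, 50.0)],
                 dtype=[('id', np.int32), ('val', np.float64)])
idx = build_hash_index(block, 'id')
print(probe(block, idx, np.array([2, 5, 7], dtype=np.int32)))
```

Building the dictionary is an O(n) pass per block, after which each probe is O(1) on average; the trade-off is the extra run-time memory the index occupies, which is presumably why the TODOs above defer hash support for the memory-optimized strategy.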
80 changes: 78 additions & 2 deletions DeepMapping/DeepMapping/ndb_utils.py
@@ -1,7 +1,9 @@
import numpy as np
import pandas as pd
import time
import os
import shutil
import pandas as pd
from tqdm.auto import tqdm

def df_preprocess(df, benchmark = None):
@@ -43,7 +45,6 @@ def df_preprocess(df, benchmark = None):
data_ori = df.to_records(index=False)
return df, data_ori


def data_manipulation(df, ops='None'):
# default use 90% data
if ops == 'None':
@@ -85,6 +86,81 @@ def data_manipulation(df, ops='None'):
data_ori = df.to_records(index=False)
return df, data_ori

def process_df_for_synthetic_data(df):
for col in df.columns:
if df[col].dtypes == np.int64:
df[col] = df[col].astype(np.int32)
return df

def generate_synthetic_data(df_tpch_order, df_tpch_lineitem, df_tpcds_customer_demographics, size=100):
"""This function describes how we generate the synthetic data for our data manipulation experiments
that cover the four types of data: single column low(high) correlation, multiple columns low(high)
correlation. We use the three tables: TPC-H order table (for low correlation), lineitem table and
TPC-DS customer demographics table (for high correlation).
To generate the low correlation tables, you can generate the order and lineitem tables throughs
TPC-H dbgen with specified o -s scale_factor, like 100, 1000 to generate these two tables and send
to the script. The script will automatically scale it to the target size (in MB).
To generate the high correlation tables, you are required to generate the customer demographics table
through TPC-DS dsdgen. Since the customer demographics table does not scale with the given scale factor,
we used the following script to automatically scale it up to the target size by following its pattern.
Args:
size (int):
target size in MB.
"""
size = size*1024*1024
# generate single column low correlation
# single column low correlation is generated based on the TPC-H order table
df = df_tpch_order.copy()
df.reset_index(inplace=True)
df = df[['index', 'ORDERSTATUS']]
df = process_df_for_synthetic_data(df)
data_in_recarray = df.to_records(index=False)
df_single_column_low_correlation = df[:int(np.floor(size/data_in_recarray[0].nbytes))]

# generate multi column low correlation
# multi column low correlation is generated based on the TPC-H lineitem table
df = df_tpch_lineitem.copy()
df.reset_index(inplace=True)
df = df[['index', 'LINENUMBER', 'DISCOUNT', 'TAX', 'RETURNFLAG', 'LINESTATUS', 'SHIPINSTRUCT', 'SHIPMODE']]
df = process_df_for_synthetic_data(df)
data_in_recarray = df.to_records(index=False)
df_multi_column_low_correlation = df[:int(np.floor(size/data_in_recarray[0].nbytes))]

# generate single column high correlation
# single column high correlation is generated based on the TPC-DS customer demographics table
df = df_tpcds_customer_demographics[['CD_DEMO_SK', 'CD_EDUCATION_STATUS']].copy()
df = process_df_for_synthetic_data(df)
df_new = df.copy()
data_in_recarray = df.to_records(index=False)
data_size = data_in_recarray.nbytes
repeat_time = int(np.floor(size/data_size))
last_id = np.max(df.iloc[:, 0])
for append_copy_idx in range(1,repeat_time+1):
append_copy = df.copy()
cd_demo_sk_copy= np.arange(last_id + (append_copy_idx-1)*len(append_copy), last_id + append_copy_idx*len(append_copy))
append_copy.loc[:, 'CD_DEMO_SK'] = cd_demo_sk_copy
df_new = pd.concat((df_new, append_copy))
df_single_column_high_correlation = df_new[:int(np.floor(size//data_in_recarray[0].nbytes))]

# generate multiple columns high correlation
# multiple columns high correlation is generated based on the TPC-DS customer demographics table
df = df_tpcds_customer_demographics.copy()
df_new = df.copy()
data_in_recarray = df.to_records(index=False)
data_size = data_in_recarray.nbytes
repeat_time = int(np.floor(size/data_size))
last_id = np.max(df.iloc[:, 0])
for append_copy_idx in range(1,repeat_time+1):
append_copy = df[df['cd_dep_college_count'] == 6].copy()
cd_demo_sk_copy= np.arange(last_id + (append_copy_idx-1)*len(append_copy), last_id + append_copy_idx*len(append_copy))
append_copy.loc[:, 'cd_demo_sk'] = cd_demo_sk_copy
append_copy.loc[:, 'cd_dep_college_count'] += append_copy_idx
df_new = pd.concat((df_new, append_copy))
df_multi_column_high_correlation = df_new[:int(np.floor(size//data_in_recarray[0].nbytes))]

return df_single_column_low_correlation, df_single_column_high_correlation, df_multi_column_low_correlation, df_multi_column_high_correlation

def create_features(x, max_len=None):
if max_len is None:
max_len = len(str(np.max(x)))
@@ -103,8 +179,8 @@ def generate_query(x_start, x_end, num_query=5, sample_size=1000):
np.random.seed(query_idx)
try:
sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=False).astype(np.int32)
raise Warning("Sample size to big, sample with replace instead")
except:
print("[WARN] Sample size to big, sample with replace instead")
sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=True).astype(np.int32)
list_sample_index.append(sample_index)
return list_sample_index
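The docstring of `generate_synthetic_data` above describes the generation procedure in prose; below is a minimal usage sketch under stated assumptions — the CSV paths, the column casing of the loaded tables, and the `from DeepMapping import ndb_utils` import path (inferred from the file layout) are illustrative, not taken from this commit:

```python
# Usage sketch (assumed paths and import path): load the TPC-H / TPC-DS base tables,
# then generate ~100 MB variants of the four synthetic datasets.
import pandas as pd
from DeepMapping import ndb_utils

df_order = pd.read_csv('orders.csv')               # TPC-H dbgen output
df_lineitem = pd.read_csv('lineitem.csv')          # TPC-H dbgen output
df_cd = pd.read_csv('customer_demographics.csv')   # TPC-DS dsdgen output

(df_single_low, df_single_high,
 df_multi_low, df_multi_high) = ndb_utils.generate_synthetic_data(
    df_order, df_lineitem, df_cd, size=100)        # target size in MB

print(len(df_single_low), len(df_single_high), len(df_multi_low), len(df_multi_high))
```

Note that the function expects the column names used inside it (e.g. `ORDERSTATUS`, `CD_DEMO_SK`, `cd_dep_college_count`), so the loaded frames may need renaming to match.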
6 changes: 3 additions & 3 deletions DeepMapping/DeepMapping/uncompress.py
@@ -23,8 +23,8 @@
def measure_latency(df, data_ori, task_name, sample_size,
generate_file=True, memory_optimized=True, latency_optimized=True,
num_loop=10, num_query=5, search_algo='binary', block_size=1024*1024):
# TODO use_has to memory-optimized strategy
# TODO add support of binary_c to run-time memory optimized
# TODO add support of hash to run-time memory optimized strategy
# TODO add support of binary_c to run-time memory optimized strategy
"""Measure the end-end latency of data query
Args:
@@ -88,6 +88,7 @@ def measure_latency(df, data_ori, task_name, sample_size,
data_idx = np.logical_and(x >= val_start, x < val_end)
data_part = data_ori[data_idx]
if search_algo == 'binary_c':
# FIXME temporary workaround to avoid the overhead of converting to contiguous array
dict_contigous_key[block_idx] = np.array(data_part[key], order='F').astype(np.int32)

if len(data_part) == 0:
@@ -227,7 +228,6 @@ def measure_latency(df, data_ori, task_name, sample_size,
num_decomp += 1
block_bytes_size = sys.getsizeof(block_bytes)

# TODO add size computation for hash approach
if search_algo == 'hash':
t_decomp += timer_decomp.toc()
timer_build_index.tic()
6 changes: 3 additions & 3 deletions DeepMapping/DeepMapping/zstd_compression.py
@@ -21,8 +21,8 @@
def measure_latency(df, data_ori, task_name, sample_size,
generate_file=True, memory_optimized=True, latency_optimized=True,
num_loop=10, num_query=5, search_algo='binary', block_size=1024*1024):
# TODO add support for memory-optimized strategy
# TODO add support of binary_c to run-time memory optimized
# TODO add support for run-time memory optimized strategy
# TODO add support of binary_c to run-time memory optimized strategy
"""Measure the end-end latency of data query
Args:
@@ -86,6 +86,7 @@ def measure_latency(df, data_ori, task_name, sample_size,
data_part = data_ori[data_idx]

if search_algo == 'binary_c':
# FIXME temporary workaround to avoid the overhead of converting to contiguous array
dict_contigous_key[block_idx] = np.array(data_part[key], order='F').astype(np.int32)

if len(data_part) == 0:
@@ -225,7 +226,6 @@ def measure_latency(df, data_ori, task_name, sample_size,
num_decomp += 1
block_bytes_size = sys.getsizeof(block_bytes)

# TODO add size computation for hash approach
if search_algo == 'hash':
t_decomp += timer_decomp.toc()
timer_build_index.tic()
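Both `measure_latency` variants above pre-convert each block's key column into a contiguous `int32` array (`dict_contigous_key`) so that the `binary_c` search can operate directly on the raw buffer. The C routine itself is not part of this diff; the sketch below uses `np.searchsorted` as a stand-in for that C-backed binary search, purely to illustrate the lookup it enables:

```python
# Sketch only: np.searchsorted stands in for the C-backed binary search added by
# this commit (the actual C code is elsewhere in the repository, not in this diff).
import numpy as np

def lookup_in_block(contiguous_keys, data_part, query_keys):
    # contiguous_keys must be sorted; searchsorted performs the binary search.
    pos = np.searchsorted(contiguous_keys, query_keys)
    pos = np.clip(pos, 0, len(contiguous_keys) - 1)   # guard out-of-range positions
    found = contiguous_keys[pos] == query_keys        # drop keys that do not exist
    return data_part[pos[found]]

block = np.array([(1, 'a'), (3, 'b'), (7, 'c')],
                 dtype=[('id', np.int32), ('v', 'U1')])
keys = np.ascontiguousarray(block['id'], dtype=np.int32)  # role of dict_contigous_key
print(lookup_in_block(keys, block, np.array([3, 5, 7], dtype=np.int32)))
```

Keeping the keys contiguous and fixed-width is what makes it cheap to hand the buffer to C; the FIXME comments above flag that pre-converting each block this way is a temporary workaround to avoid paying that conversion cost at query time.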
21 changes: 20 additions & 1 deletion README.md
@@ -11,6 +11,8 @@ Resources for SIGMOD 2024 Submission
- [Benchmark](#benchmark)
- [Task: Data Query](#task-data-query)
- [Task: Data Manipulation](#task-data-manipulation)
- [Supplementary Material](#supplementary-material)
- [Comparison of end-to-end latency for running the NN model on CPU/GPU](#comparison-of-end-to-end-latency-for-running-the-nn-model-on-cpugpu)

<!-- /TOC -->

@@ -70,4 +72,21 @@ Run `python run_benchmark_data_query.py` to benchmark. To benchmark with differe
### Task: Data Manipulation
These experiments measure the overall storage overhead and end-to-end query latency for the synthetic datasets under data manipulation, i.e., INSERT/UPDATE/DELETE. Run `python run_benchmark_data_manipulation.py` to benchmark. To benchmark with a different dataset, modify the file accordingly by following the instructions provided in the Python file.
## Supplementary Material
### Comparison of end-to-end latency for running the NN model on CPU/GPU
| Tables | Number of Queried Records (B) | Run Model On | NN Inference Time | Total Time |
|:-----------------:|:------------------------:|:------------:|:-----------------:|:----------:|
| TPCH-S10/customer | 100000 | GPU | 110.84 | 5,454.34 |
| TPCH-S10/customer | 100000 | CPU | 663.30 | 6,058.63 |
| TPCH-S10/lineitem | 100000 | GPU | 294.86 | 10,511.73 |
| TPCH-S10/lineitem | 100000 | CPU | 2,909.44 | 12,791.68 |
| TPCH-S10/orders | 100000 | GPU | 81.38 | 2,351.74 |
| TPCH-S10/orders | 100000 | CPU | 403.41 | 2,707.00 |
| TPCH-S10/part | 100000 | GPU | 238.95 | 5,686.15 |
| TPCH-S10/part | 100000 | CPU | 310.98 | 5,804.16 |
| TPCH-S10/supplier | 100000 | GPU | 56.59 | 5,097.88 |
| TPCH-S10/supplier | 100000 | CPU | 75.77 | 5,153.22 |
