Merge pull request #1 from asu-cactus/revision
Revision
lixi-zhou authored May 30, 2023
2 parents 3c71ac6 + 2efa5ab commit fa6c90e
Showing 6 changed files with 274 additions and 30 deletions.
65 changes: 53 additions & 12 deletions DeepMapping/DeepMapping/deepmapping.py
@@ -26,6 +26,9 @@
shared_utils.create_fetures.argtypes = [ND_POINTER_1, ND_POINTER_2, ctypes.c_long, ctypes.c_int]
shared_utils.create_fetures_mutlt_thread_mgr.argtypes = [ND_POINTER_1, ND_POINTER_2, ctypes.c_long, ctypes.c_int32, ctypes.c_int32]

shared_utils.aux_look_up_bin.argtypes = [ND_POINTER_2, ctypes.c_int, ctypes.c_long]
shared_utils.aux_look_up_bin.restype = ctypes.c_long
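The C implementation of aux_look_up_bin is not part of this diff; based on how it is called later in this file (a sorted int key column, a query key, and the array length, returning a row index or -1), a minimal pure-Python sketch of the assumed behavior is:

import numpy as np

def aux_look_up_bin_py(sorted_keys, query_key, length):
    # Assumed semantics of shared_utils.aux_look_up_bin: binary search over a
    # sorted key column, returning the matching index or -1 if the key is absent.
    lo, hi = 0, length - 1
    while lo <= hi:
        mid = (lo + hi) // 2
        if sorted_keys[mid] == query_key:
            return mid
        elif sorted_keys[mid] < query_key:
            lo = mid + 1
        else:
            hi = mid - 1
    return -1

keys = np.array([3, 7, 10, 42], dtype=np.int32)
print(aux_look_up_bin_py(keys, 10, len(keys)))  # 2
print(aux_look_up_bin_py(keys, 5, len(keys)))   # -1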

def encode_label(arr):
label_encoder = preprocessing.LabelEncoder().fit(arr)
arr_encode = label_encoder.transform(arr)
@@ -173,7 +176,10 @@ def compress_data(df, model_sturcture, batch_size=1024, num_epochs=500, train_ve

def measure_latency_any(df, data_ori, task_name, sample_size,
generate_file=True, memory_optimized=True, latency_optimized=True,
num_loop=10, num_query=5, search_algo='binary', path_to_model=None):
num_loop=10, num_query=5, search_algo='binary', path_to_model=None,
block_size=1024*1024):
# TODO add support of hash to run-time memory optimized strategy
# TODO add support of binary_c to run-time memory optimized strategy
"""Measure the end-end latency of data query
Args:
@@ -210,12 +216,13 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
df_key = [df.columns[0]]
list_y_encoded = []
list_y_encoder = []

size_encoder = 0
for col in df.columns:
if col not in df_key:
encoded_val, encoder = encode_label(df[col])
list_y_encoded.append(encoded_val)
list_y_encoder.append(encoder)
size_encoder += encoder.classes_.nbytes
num_tasks = len(list_y_encoded)

for encoder in list_y_encoder:
Expand Down Expand Up @@ -295,7 +302,8 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
if len(misclassified_data) == 0:
misclassified_data = np.zeros((1,2))
record_size = misclassified_data[0].nbytes
block_size = 1024 * 1024
# block_size = 1024 * 1024
# block_size = 1024 * 512
num_record_per_part = np.floor(block_size / record_size)

x_start = np.min(misclassified_data[:,0])
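For a sense of the partitioning arithmetic above, a small worked example (the record layout here is hypothetical):

import numpy as np

# Hypothetical auxiliary record: one int32 key plus two int32 task columns,
# i.e. 3 * 4 = 12 bytes per record.
record_size = 12
block_size = 1024 * 1024                          # default block_size (1 MiB)
num_record_per_part = np.floor(block_size / record_size)
print(num_record_per_part)                        # 87381.0 records per block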
@@ -320,7 +328,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
ndb_utils.save_byte_to_disk(file_name, data_zstd_comp)

data_ori_size = data_ori.nbytes/1024/1024
data_comp_size = [comp_zstd_size, model.count_params()*4/1024/1024, sys.getsizeof(zstd.compress(exist_bit_arr.tobytes()))/1024/1024]
data_comp_size = [size_encoder, comp_zstd_size, model.count_params()*4/1024/1024, sys.getsizeof(zstd.compress(exist_bit_arr.tobytes()))/1024/1024]
print('Ori Size: {}, Curr Size: {}'.format(data_ori.nbytes/1024/1024, data_comp_size))
np.save(os.path.join(comp_data_dir, 'num_record_per_part'), num_record_per_part)
else:
@@ -334,7 +342,6 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
shared_utils.create_fetures_mutlt_thread_mgr.argtypes = [ND_POINTER_1, ND_POINTER_2, ctypes.c_long, ctypes.c_int32, ctypes.c_int32]
shared_utils.create_fetures_mutlt_thread_mgr.restype = ctypes.POINTER(ctypes.c_bool * (sample_size * max_len * 10))
list_sample_index = ndb_utils.generate_query(x_start, x_end, num_query=num_query, sample_size=sample_size)

# Measure latency for run-time memory optimized strategy
if memory_optimized:
timer_creatfeatures = ndb_utils.Timer()
@@ -461,6 +468,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
timer_exist_lookup = ndb_utils.Timer()
timer_remap = ndb_utils.Timer()
timer_sort = ndb_utils.Timer()
timer_build_index = ndb_utils.Timer()
t_remap = 0
t_decomp = 0
t_createfeatures = 0
@@ -470,6 +478,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
t_total = 0
t_sort = 0
t_locate_part = 0
t_build_index = 0
block_bytes_size = 0
timer_total.tic()
for _ in tqdm(range(num_loop)):
@@ -479,6 +488,9 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
peak_memory = 0
cache_block_memory = 0

# build hash table
if search_algo == 'hash':
data_hash = dict()
for query_idx in range(num_query):
sample_index = list_sample_index[query_idx]
timer_total.tic()
@@ -494,6 +506,8 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
x_features_arr, sample_index, sample_size, max_len, num_threads)
sampled_features = np.frombuffer(
x_features_arr_ptr.contents, dtype=bool).reshape(sample_size, -1)
# sampled_features = ndb_utils.create_features(sample_index, max_len)[0]

t_createfeatures += timer_creatfeatures.toc()
timer_nn.tic()
y_nn_pred = model(sampled_features)
@@ -506,7 +520,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
col_name = data_ori.dtype.names[i+1]
result[col_name] = np.argmax(y_nn_pred[i], axis=1)
t_nn += timer_nn.toc()
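For context, each task's prediction is decoded with an argmax over its softmax output and can later be mapped back to the original column values with the stored LabelEncoder; a small self-contained sketch (the class labels and scores below are made up):

import numpy as np
from sklearn import preprocessing

# Two hypothetical tasks (categorical columns) and two queried keys.
encoders = [preprocessing.LabelEncoder().fit(["A", "B", "C"]),
            preprocessing.LabelEncoder().fit(["X", "Y"])]
y_nn_pred = [np.array([[0.1, 0.7, 0.2], [0.8, 0.1, 0.1]]),   # task 0: 3 classes
             np.array([[0.4, 0.6], [0.9, 0.1]])]             # task 1: 2 classes

for pred, encoder in zip(y_nn_pred, encoders):
    encoded = np.argmax(pred, axis=1)                # predicted class ids
    print(encoder.inverse_transform(encoded))        # back to original labels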

for idx, val in enumerate(sample_index):
# ------ non exist look up
timer_exist_lookup.tic()
@@ -536,17 +550,44 @@ def measure_latency_any(df, data_ori, task_name, sample_size,
zstd.decompress(block_zstd_comp), dtype=np.int32).reshape(-1, num_tasks+1).copy(order='F')
decomp_aux_block[part_idx] = data_uncomp
num_decomp += 1
cache_block_memory += data_uncomp.nbytes
block_bytes_size = sys.getsizeof(block_zstd_comp)
prev_part_idx = part_idx

# TODO add size computation for hash approach
if search_algo == 'hash':
t_decomp += timer_decomp.toc()
timer_build_index.tic()
for block_data_idx in range(len(data_uncomp)):
data_entry_key = data_uncomp[block_data_idx, 0]
# print(data_entry_key)
data_entry_val = data_uncomp[block_data_idx]
data_hash[data_entry_key] = data_entry_val
cache_block_memory = sys.getsizeof(data_hash)
t_build_index += timer_build_index.toc()
timer_decomp.tic()
else:
cache_block_memory += data_uncomp.nbytes
else:
data_uncomp = decomp_aux_block[part_idx]
t_decomp += timer_decomp.toc()
timer_aux_lookup.tic()
data_idx = ndb_utils.binary_search(data_uncomp[:,0], query_key, len(data_uncomp))

if data_idx != -1:
result[query_key_index_in_old] = tuple(data_uncomp[data_idx])
if search_algo == 'binary':
# TODO code can be optimized at revision stage
data_idx = ndb_utils.binary_search(data_uncomp[:,0], query_key, len(data_uncomp))
if data_idx != -1:
result[query_key_index_in_old] = tuple(data_uncomp[data_idx])
else:
count_nonexist += 1
elif search_algo == 'binary_c':
data_idx = shared_utils.aux_look_up_bin(data_uncomp[:,0], query_key, len(data_uncomp))
if data_idx != -1:
result[query_key_index_in_old] = tuple(data_uncomp[data_idx])
else:
count_nonexist += 1
elif search_algo == 'hash':
if query_key in data_hash.keys():
result[query_key_index_in_old] = tuple(data_hash[query_key])

t_aux_lookup += timer_aux_lookup.toc()
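A minimal standalone sketch of the three search_algo options handled above, using np.searchsorted in place of the C helpers (which are not shown in this diff):

import numpy as np

block = np.array([[3, 0], [7, 1], [10, 2], [42, 3]], dtype=np.int32)  # sorted by key column
query_key = 10

# 'binary' / 'binary_c': binary search over the sorted key column.
pos = np.searchsorted(block[:, 0], query_key)
if pos < len(block) and block[pos, 0] == query_key:
    print(tuple(block[pos]))             # prints the matching row

# 'hash': build a dict index once per decompressed block, then probe in O(1).
data_hash = {int(row[0]): row for row in block}
if query_key in data_hash:
    print(tuple(data_hash[query_key]))   # prints the matching row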

if cache_block_memory + block_bytes_size > peak_memory:
@@ -562,7 +603,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size,

peak_memory += exist_bit_arr.nbytes
latency_optimized_result = result.copy()
latency_optimized_latency = np.array((data_ori_size, np.sum(data_comp_size), sample_size, 1, peak_memory/1024/1024, t_sort / num_loop, t_createfeatures / num_loop, t_nn / num_loop, t_locate_part / num_loop, t_decomp / num_loop,
latency_optimized_latency = np.array((data_ori_size, np.sum(data_comp_size), sample_size, 1, peak_memory/1024/1024, t_sort / num_loop, t_createfeatures / num_loop, t_nn / num_loop, t_locate_part / num_loop, t_decomp / num_loop, t_build_index / num_loop,
t_aux_lookup / num_loop, t_exist_lookup / num_loop, t_remap / num_loop, t_total / num_loop, num_decomp, count_nonexist, exist_bit_arr.nbytes/1024/1024, model.count_params()*4/1024/1024)).T

return_latency = None
1 change: 1 addition & 0 deletions DeepMapping/DeepMapping/dgpe_compression.py
@@ -304,6 +304,7 @@ def measure_latency(df, data_ori, task_name, sample_size,
t_sort += timer_sort.toc()
result = np.recarray((sample_size,), dtype=data_ori.dtype)
result_idx = 0
current_memory = 0

for idx in range(sample_size):
timer_locate_part.tic()
94 changes: 92 additions & 2 deletions DeepMapping/DeepMapping/ndb_utils.py
@@ -1,7 +1,9 @@
import numpy as np
import pandas as pd
import time
import os
import shutil
import pandas as pd
from tqdm.auto import tqdm

def df_preprocess(df, benchmark = None):
@@ -43,7 +45,6 @@ def df_preprocess(df, benchmark = None):
data_ori = df.to_records(index=False)
return df, data_ori


def data_manipulation(df, ops='None'):
# default use 90% data
if ops == 'None':
@@ -85,13 +86,102 @@ def data_manipulation(df, ops='None'):
data_ori = df.to_records(index=False)
return df, data_ori

def process_df_for_synthetic_data(df):
for col in df.columns:
if df[col].dtypes == np.int64:
df[col] = df[col].astype(np.int32)
return df

def generate_synthetic_data(df_tpch_order, df_tpch_lineitem, df_tpcds_customer_demographics, size=100):
"""This function describes how we generate the synthetic data for our data manipulation experiments
that cover the four types of data: single column low(high) correlation, multiple columns low(high)
correlation. We use the three tables: TPC-H order table (for low correlation), lineitem table and
TPC-DS customer demographics table (for high correlation).
To generate the low correlation tables, you can generate the order and lineitem tables throughs
TPC-H dbgen with specified o -s scale_factor, like 100, 1000 to generate these two tables and send
to the script. The script will automatically scale it to the target size (in MB).
To generate the high correlation tables, you are required to generate the customer demographics table
through TPC-DS dsdgen. Since the customer demographics table does not scale with the given scale factor,
we used the following script to automatically scale it up to the target size by following its pattern.
Args:
size (int):
target size in MB.
"""
size = size*1024*1024
# generate single column low correlation
# single column low correlation is generated based on the TPC-H order table
df = df_tpch_order.copy()
df.reset_index(inplace=True)
df = df[['index', 'ORDERSTATUS']]
df = process_df_for_synthetic_data(df)
data_in_recarray = df.to_records(index=False)
df_single_column_low_correlation = df[:int(np.floor(size/data_in_recarray[0].nbytes))]

# generate multi column low correlation
# multi column low correlation is generated based on the TPC-H lineitem table
df = df_tpch_lineitem.copy()
df.reset_index(inplace=True)
df = df[['index', 'LINENUMBER', 'DISCOUNT', 'TAX', 'RETURNFLAG', 'LINESTATUS', 'SHIPINSTRUCT', 'SHIPMODE']]
df = process_df_for_synthetic_data(df)
data_in_recarray = df.to_records(index=False)
df_multi_column_low_correlation = df[:int(np.floor(size/data_in_recarray[0].nbytes))]

# generate single column high correlation
# single column high correlation is generated based on the TPC-DS customer demographics table
df = df_tpcds_customer_demographics[['CD_DEMO_SK', 'CD_EDUCATION_STATUS']].copy()
df = process_df_for_synthetic_data(df)
df_new = df.copy()
data_in_recarray = df.to_records(index=False)
data_size = data_in_recarray.nbytes
repeat_time = int(np.floor(size/data_size))
last_id = np.max(df.iloc[:, 0])
for append_copy_idx in range(1,repeat_time+1):
append_copy = df.copy()
cd_demo_sk_copy= np.arange(last_id + (append_copy_idx-1)*len(append_copy), last_id + append_copy_idx*len(append_copy))
append_copy.loc[:, 'CD_DEMO_SK'] = cd_demo_sk_copy
df_new = pd.concat((df_new, append_copy))
df_single_column_high_correlation = df_new[:int(np.floor(size//data_in_recarray[0].nbytes))]

# generate multiple columns high correlation
# multiple columns high correlation is generated based on the TPC-DS customer demographics table
df = df_tpcds_customer_demographics.copy()
df_new = df.copy()
data_in_recarray = df.to_records(index=False)
data_size = data_in_recarray.nbytes
repeat_time = int(np.floor(size/data_size))
last_id = np.max(df.iloc[:, 0])
for append_copy_idx in range(1,repeat_time+1):
append_copy = df[df['cd_dep_college_count'] == 6].copy()
cd_demo_sk_copy= np.arange(last_id + (append_copy_idx-1)*len(append_copy), last_id + append_copy_idx*len(append_copy))
append_copy.loc[:, 'cd_demo_sk'] = cd_demo_sk_copy
append_copy.loc[:, 'cd_dep_college_count'] += append_copy_idx
df_new = pd.concat((df_new, append_copy))
df_multi_column_high_correlation = df_new[:int(np.floor(size//data_in_recarray[0].nbytes))]

return df_single_column_low_correlation, df_single_column_high_correlation, df_multi_column_low_correlation, df_multi_column_high_correlation
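A hedged usage sketch of the function above; the CSV paths and loading code are hypothetical, and the real inputs come from TPC-H dbgen and TPC-DS dsdgen as described in the docstring:

import pandas as pd
from DeepMapping.ndb_utils import generate_synthetic_data  # assumed import path

df_order = pd.read_csv('orders.csv')                       # hypothetical file names
df_lineitem = pd.read_csv('lineitem.csv')
df_customer_demographics = pd.read_csv('customer_demographics.csv')

single_low, single_high, multi_low, multi_high = generate_synthetic_data(
    df_order, df_lineitem, df_customer_demographics, size=100)  # target ~100 MB per table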

def create_features(x, max_len=None):
if max_len is None:
max_len = len(str(np.max(x)))
# one-hot encoding for each digit
x_features = np.zeros((len(x), max_len*10))
for idx in range(len(x)):
digit_idx = max_len - 1
for digit in str(x[idx])[::-1]:
x_features[idx, digit_idx*10 + int(digit)] = 1
digit_idx -= 1
return x_features, max_len
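create_features one-hot encodes each decimal digit of a key into its own group of ten slots, most significant digit first; a quick check of the layout (import path assumed from the repository structure):

import numpy as np
from DeepMapping.ndb_utils import create_features  # assumed import path

x_features, max_len = create_features(np.array([7, 42]))
print(max_len)                      # 2 -> each key becomes 2 * 10 = 20 features
print(np.where(x_features[1])[0])   # [ 4 12]: tens digit 4, ones digit 2 for key 42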

def generate_query(x_start, x_end, num_query=5, sample_size=1000):
list_sample_index = []
for query_idx in tqdm(range(num_query)):
np.random.seed(query_idx)
sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=False).astype(np.int32)
try:
sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=False).astype(np.int32)
except:
print("[WARN] Sample size to big, sample with replace instead")
sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=True).astype(np.int32)
list_sample_index.append(sample_index)
return list_sample_index
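Because each query fixes np.random.seed(query_idx), the sampled key sets are reproducible across runs; a quick sketch (import path assumed as above):

from DeepMapping.ndb_utils import generate_query  # assumed import path

list_sample_index = generate_query(x_start=0, x_end=99999, num_query=2, sample_size=5)
print(list_sample_index[0])   # same five keys on every run for query 0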

