From b38c99c0e5b21177b1aab8da9c3b7279dda2cf56 Mon Sep 17 00:00:00 2001 From: lixizhou Date: Mon, 29 May 2023 11:54:51 -0700 Subject: [PATCH 1/2] add c implementation for binary search --- DeepMapping/DeepMapping/deepmapping.py | 60 +++++++++++++++---- DeepMapping/DeepMapping/dgpe_compression.py | 1 + DeepMapping/DeepMapping/ndb_utils.py | 18 +++++- DeepMapping/DeepMapping/uncompress.py | 66 +++++++++++++++++---- DeepMapping/DeepMapping/zstd_compression.py | 57 ++++++++++++++++-- 5 files changed, 175 insertions(+), 27 deletions(-) diff --git a/DeepMapping/DeepMapping/deepmapping.py b/DeepMapping/DeepMapping/deepmapping.py index 2be79d0..d2cc215 100644 --- a/DeepMapping/DeepMapping/deepmapping.py +++ b/DeepMapping/DeepMapping/deepmapping.py @@ -26,6 +26,9 @@ shared_utils.create_fetures.argtypes = [ND_POINTER_1, ND_POINTER_2, ctypes.c_long, ctypes.c_int] shared_utils.create_fetures_mutlt_thread_mgr.argtypes = [ND_POINTER_1, ND_POINTER_2, ctypes.c_long, ctypes.c_int32, ctypes.c_int32] +shared_utils.aux_look_up_bin.argtypes = [ND_POINTER_2, ctypes.c_int, ctypes.c_long] +shared_utils.aux_look_up_bin.restype = ctypes.c_long + def encode_label(arr): label_encoder = preprocessing.LabelEncoder().fit(arr) arr_encode = label_encoder.transform(arr) @@ -173,7 +176,10 @@ def compress_data(df, model_sturcture, batch_size=1024, num_epochs=500, train_ve def measure_latency_any(df, data_ori, task_name, sample_size, generate_file=True, memory_optimized=True, latency_optimized=True, - num_loop=10, num_query=5, search_algo='binary', path_to_model=None): + num_loop=10, num_query=5, search_algo='binary', path_to_model=None, + block_size=1024*1024): + # TODO add support of hash to run-time memory optimized + # TODO add support of binary_c to run-time memory optimized """Measure the end-end latency of data query Args: @@ -295,7 +301,8 @@ def measure_latency_any(df, data_ori, task_name, sample_size, if len(misclassified_data) == 0: misclassified_data = np.zeros((1,2)) record_size = misclassified_data[0].nbytes - block_size = 1024 * 1024 + # block_size = 1024 * 1024 + # block_size = 1024 * 512 num_record_per_part = np.floor(block_size / record_size) x_start = np.min(misclassified_data[:,0]) @@ -334,7 +341,6 @@ def measure_latency_any(df, data_ori, task_name, sample_size, shared_utils.create_fetures_mutlt_thread_mgr.argtypes = [ND_POINTER_1, ND_POINTER_2, ctypes.c_long, ctypes.c_int32, ctypes.c_int32] shared_utils.create_fetures_mutlt_thread_mgr.restype = ctypes.POINTER(ctypes.c_bool * (sample_size * max_len * 10)) list_sample_index = ndb_utils.generate_query(x_start, x_end, num_query=num_query, sample_size=sample_size) - # Measure latency for run-time memory optimzed strategy if memory_optimized: timer_creatfeatures = ndb_utils.Timer() @@ -461,6 +467,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size, timer_exist_lookup = ndb_utils.Timer() timer_remap = ndb_utils.Timer() timer_sort = ndb_utils.Timer() + timer_build_index = ndb_utils.Timer() t_remap = 0 t_decomp = 0 t_createfeatures = 0 @@ -470,6 +477,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size, t_total = 0 t_sort = 0 t_locate_part = 0 + t_build_index = 0 block_bytes_size = 0 timer_total.tic() for _ in tqdm(range(num_loop)): @@ -479,6 +487,9 @@ def measure_latency_any(df, data_ori, task_name, sample_size, peak_memory = 0 cache_block_memory = 0 + # build hash table + if search_algo == 'hash': + data_hash = dict() for query_idx in range(num_query): sample_index = list_sample_index[query_idx] timer_total.tic() @@ -494,6 +505,8 @@ def 
measure_latency_any(df, data_ori, task_name, sample_size, x_features_arr, sample_index, sample_size, max_len, num_threads) sampled_features = np.frombuffer( x_features_arr_ptr.contents, dtype=bool).reshape(sample_size, -1) + # sampled_features = ndb_utils.create_features(sample_index, max_len)[0] + t_createfeatures += timer_creatfeatures.toc() timer_nn.tic() y_nn_pred = model(sampled_features) @@ -506,7 +519,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size, col_name = data_ori.dtype.names[i+1] result[col_name] = np.argmax(y_nn_pred[i], axis=1) t_nn += timer_nn.toc() - + for idx, val in enumerate(sample_index): # ------ non exist look up timer_exist_lookup.tic() @@ -536,17 +549,44 @@ def measure_latency_any(df, data_ori, task_name, sample_size, zstd.decompress(block_zstd_comp), dtype=np.int32).reshape(-1, num_tasks+1).copy(order='F') decomp_aux_block[part_idx] = data_uncomp num_decomp += 1 - cache_block_memory += data_uncomp.nbytes block_bytes_size = sys.getsizeof(block_zstd_comp) prev_part_idx = part_idx + + # TODO add size computation for hash approach + if search_algo == 'hash': + t_decomp += timer_decomp.toc() + timer_build_index.tic() + for block_data_idx in range(len(data_uncomp)): + data_entry_key = data_uncomp[block_data_idx, 0] + # print(data_entry_key) + data_entry_val = data_uncomp[block_data_idx] + data_hash[data_entry_key] = data_entry_val + cache_block_memory = sys.getsizeof(data_hash) + t_build_index += timer_build_index.toc() + timer_decomp.tic() + else: + cache_block_memory += data_uncomp.nbytes else: data_uncomp = decomp_aux_block[part_idx] t_decomp += timer_decomp.toc() timer_aux_lookup.tic() - data_idx = ndb_utils.binary_search(data_uncomp[:,0], query_key, len(data_uncomp)) - - if data_idx != -1: - result[query_key_index_in_old] = tuple(data_uncomp[data_idx]) + if search_algo == 'binary': + # TODO code can be optimized at revision stage + data_idx = ndb_utils.binary_search(data_uncomp[:,0], query_key, len(data_uncomp)) + if data_idx != -1: + result[query_key_index_in_old] = tuple(data_uncomp[data_idx]) + else: + count_nonexist += 1 + elif search_algo == 'binary_c': + data_idx = shared_utils.aux_look_up_bin(data_uncomp[:,0], query_key, len(data_uncomp)) + if data_idx != -1: + result[query_key_index_in_old] = tuple(data_uncomp[data_idx]) + else: + count_nonexist += 1 + elif search_algo == 'hash': + if query_key in data_hash.keys(): + result[query_key_index_in_old] = tuple(data_hash[query_key]) + t_aux_lookup += timer_aux_lookup.toc() if cache_block_memory + block_bytes_size > peak_memory: @@ -562,7 +602,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size, peak_memory += exist_bit_arr.nbytes latency_optimized_result = result.copy() - latency_optimized_latency = np.array((data_ori_size, np.sum(data_comp_size), sample_size, 1, peak_memory/1024/1024, t_sort / num_loop, t_createfeatures / num_loop, t_nn / num_loop, t_locate_part / num_loop, t_decomp / num_loop, + latency_optimized_latency = np.array((data_ori_size, np.sum(data_comp_size), sample_size, 1, peak_memory/1024/1024, t_sort / num_loop, t_createfeatures / num_loop, t_nn / num_loop, t_locate_part / num_loop, t_decomp / num_loop, t_build_index / num_loop, t_aux_lookup / num_loop, t_exist_lookup / num_loop, t_remap / num_loop, t_total / num_loop, num_decomp, count_nonexist, exist_bit_arr.nbytes/1024/1024, model.count_params()*4/1024/1024)).T return_latency = None diff --git a/DeepMapping/DeepMapping/dgpe_compression.py b/DeepMapping/DeepMapping/dgpe_compression.py index b6c924f..6f25507 
100644 --- a/DeepMapping/DeepMapping/dgpe_compression.py +++ b/DeepMapping/DeepMapping/dgpe_compression.py @@ -304,6 +304,7 @@ def measure_latency(df, data_ori, task_name, sample_size, t_sort += timer_sort.toc() result = np.recarray((sample_size,), dtype=data_ori.dtype) result_idx = 0 + current_memory = 0 for idx in range(sample_size): timer_locate_part.tic() diff --git a/DeepMapping/DeepMapping/ndb_utils.py b/DeepMapping/DeepMapping/ndb_utils.py index 89afad9..16a83f1 100644 --- a/DeepMapping/DeepMapping/ndb_utils.py +++ b/DeepMapping/DeepMapping/ndb_utils.py @@ -85,13 +85,27 @@ def data_manipulation(df, ops='None'): data_ori = df.to_records(index=False) return df, data_ori - +def create_features(x, max_len=None): + if max_len is None: + max_len = len(str(np.max(x))) + # one-hot encoding for each digit + x_features = np.zeros((len(x), max_len*10)) + for idx in range(len(x)): + digit_idx = max_len - 1 + for digit in str(x[idx])[::-1]: + x_features[idx, digit_idx*10 + int(digit)] = 1 + digit_idx -= 1 + return x_features, max_len def generate_query(x_start, x_end, num_query=5, sample_size=1000): list_sample_index = [] for query_idx in tqdm(range(num_query)): np.random.seed(query_idx) - sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=False).astype(np.int32) + try: + sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=False).astype(np.int32) + raise Warning("Sample size to big, sample with replace instead") + except: + sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=True).astype(np.int32) list_sample_index.append(sample_index) return list_sample_index diff --git a/DeepMapping/DeepMapping/uncompress.py b/DeepMapping/DeepMapping/uncompress.py index c084618..d93092e 100644 --- a/DeepMapping/DeepMapping/uncompress.py +++ b/DeepMapping/DeepMapping/uncompress.py @@ -3,15 +3,28 @@ import sys import math import os +import ctypes from DeepMapping import ndb_utils -from DeepMapping.ndb_utils import Timer, recreate_temp_dir, save_byte_to_disk, read_bytes_from_disk -from more_itertools import run_length +# from DeepMapping.ndb_utils import Timer, recreate_temp_dir, save_byte_to_disk, read_bytes_from_disk +# from more_itertools import run_length from tqdm.auto import tqdm +ND_POINTER_1 = np.ctypeslib.ndpointer(dtype=np.bool_, + ndim=1, + flags="C") +ND_POINTER_2 = np.ctypeslib.ndpointer(dtype=np.int32, + ndim=1, + flags="C") + +shared_utils = ctypes.CDLL(os.path.abspath("shared_utils.so")) # Or full path to file +shared_utils.aux_look_up_bin.argtypes = [ND_POINTER_2, ctypes.c_int, ctypes.c_long] +shared_utils.aux_look_up_bin.restype = ctypes.c_long def measure_latency(df, data_ori, task_name, sample_size, generate_file=True, memory_optimized=True, latency_optimized=True, - num_loop=10, num_query=5, search_algo='binary'): + num_loop=10, num_query=5, search_algo='binary', block_size=1024*1024): + # TODO use_has to memory-optimized strategy + # TODO add support of binary_c to run-time memory optimized """Measure the end-end latency of data query Args: @@ -34,7 +47,7 @@ def measure_latency(df, data_ori, task_name, sample_size, num_query : int number of queries search_algo : str - search algorithm that applied to search entry in each partition + search algorithm that applied to search entry in each partition, available strategy: naive, binary, hash path_to_model : str load model from custom path """ @@ -46,7 +59,8 @@ def measure_latency(df, data_ori, task_name, 
sample_size, latency_optimized_result = None key = df.columns[0] - block_size = 1024 * 1024 + # block_size = 1024 * 1024 + # block_size = 1024 * 512 record_size = data_ori[0].nbytes num_record_per_part = np.floor(block_size / record_size) x = data_ori[key] @@ -60,6 +74,8 @@ def measure_latency(df, data_ori, task_name, sample_size, folder_name = 'uncompress' comp_data_dir = os.path.join(root_path, task_name, folder_name) print('[Generate File Path]: {}'.format(comp_data_dir)) + + dict_contigous_key = dict() # generate file if generate_file: @@ -71,6 +87,8 @@ def measure_latency(df, data_ori, task_name, sample_size, num_record_per_part, x_start + (block_idx+1)*num_record_per_part data_idx = np.logical_and(x >= val_start, x < val_end) data_part = data_ori[data_idx] + if search_algo == 'binary_c': + dict_contigous_key[block_idx] = np.array(data_part[key], order='F').astype(np.int32) if len(data_part) == 0: continue @@ -142,7 +160,7 @@ def measure_latency(df, data_ori, task_name, sample_size, data_idx = ndb_utils.binary_search(curr_decomp_block[key], query_key, len(curr_decomp_block)) elif search_algo == 'naive': data_idx = curr_decomp_block[key] == query_key - + if (search_algo == 'binary' and data_idx >= 0) or (search_algo == 'naive' and np.sum(data_idx) > 0): result[query_key_index_in_old] = curr_decomp_block[data_idx] else: @@ -163,12 +181,14 @@ def measure_latency(df, data_ori, task_name, sample_size, timer_lookup = ndb_utils.Timer() timer_total = ndb_utils.Timer() timer_sort = ndb_utils.Timer() + timer_build_index = ndb_utils.Timer() timer_locate_part = ndb_utils.Timer() t_decomp = 0 t_lookup = 0 t_sort = 0 t_total = 0 t_locate_part = 0 + t_build_index = 0 timer_total.tic() for _ in tqdm(range(num_loop)): decomp_block = dict() @@ -176,6 +196,11 @@ def measure_latency(df, data_ori, task_name, sample_size, num_decomp = 0 count_nonexist = 0 cache_block_memory = 0 + + # build hash table + if search_algo == 'hash': + data_hash = dict() + for query_idx in range(num_query): sample_index = list_sample_index[query_idx] timer_total.tic() @@ -190,8 +215,8 @@ def measure_latency(df, data_ori, task_name, sample_size, timer_locate_part.tic() query_key = sample_index_sorted[idx] query_key_index_in_old = sample_index_argsort[idx] - part_idx = int((query_key-x_start) // num_record_per_part) t_locate_part += timer_locate_part.toc() + part_idx = int((query_key-x_start) // num_record_per_part) timer_decomp.tic() if part_idx not in decomp_block: @@ -200,8 +225,22 @@ def measure_latency(df, data_ori, task_name, sample_size, curr_decomp_block = np.rec.array(block_bytes, dtype=data_ori.dtype) decomp_block[part_idx] = curr_decomp_block num_decomp += 1 - cache_block_memory += curr_decomp_block.nbytes block_bytes_size = sys.getsizeof(block_bytes) + + # TODO add size computation for hash approach + if search_algo == 'hash': + t_decomp += timer_decomp.toc() + timer_build_index.tic() + for block_data_idx in range(len(curr_decomp_block)): + data_entry_key = curr_decomp_block[key][block_data_idx] + # print(data_entry_key) + data_entry_val = curr_decomp_block[block_data_idx] + data_hash[data_entry_key] = data_entry_val + cache_block_memory = sys.getsizeof(data_hash) + t_build_index += timer_build_index.toc() + timer_decomp.tic() + else: + cache_block_memory += curr_decomp_block.nbytes else: curr_decomp_block = decomp_block[part_idx] @@ -210,11 +249,18 @@ def measure_latency(df, data_ori, task_name, sample_size, if search_algo == 'binary': data_idx = ndb_utils.binary_search(curr_decomp_block[key], query_key, 
len(curr_decomp_block)) + elif search_algo == 'binary_c': + arr_contigous_arr = dict_contigous_key[part_idx] + data_idx = shared_utils.aux_look_up_bin(arr_contigous_arr, query_key, len(curr_decomp_block)) elif search_algo == 'naive': data_idx = curr_decomp_block[key] == query_key + elif search_algo == 'hash': + data_idx = query_key in data_hash.keys() - if (search_algo == 'binary' and data_idx >= 0) or (search_algo == 'naive' and np.sum(data_idx) > 0): + if ((search_algo == 'binary' or search_algo =='binary_c') and data_idx >= 0) or (search_algo == 'naive' and np.sum(data_idx) > 0): result[query_key_index_in_old] = curr_decomp_block[data_idx] + elif search_algo == 'hash' and data_idx == True: + result[query_key_index_in_old] = data_hash[query_key] else: count_nonexist += 1 @@ -225,7 +271,7 @@ def measure_latency(df, data_ori, task_name, sample_size, t_total += timer_total.toc() latency_optimized_result = result.copy() latency_optimized_latency = np.array((data_ori_size, data_comp_size, sample_size, 1, peak_memory/1024/1024, t_sort / num_loop, - t_locate_part / num_loop, t_decomp / num_loop, + t_locate_part / num_loop, t_decomp / num_loop, t_build_index / num_loop, t_lookup / num_loop, t_total / num_loop, num_decomp, count_nonexist)).T return_latency = None diff --git a/DeepMapping/DeepMapping/zstd_compression.py b/DeepMapping/DeepMapping/zstd_compression.py index 9b4fcd9..9806fcd 100644 --- a/DeepMapping/DeepMapping/zstd_compression.py +++ b/DeepMapping/DeepMapping/zstd_compression.py @@ -3,12 +3,26 @@ import zstd import math import os +import ctypes from DeepMapping import ndb_utils from tqdm.auto import tqdm +ND_POINTER_1 = np.ctypeslib.ndpointer(dtype=np.bool_, + ndim=1, + flags="C") +ND_POINTER_2 = np.ctypeslib.ndpointer(dtype=np.int32, + ndim=1, + flags="C") + +shared_utils = ctypes.CDLL(os.path.abspath("shared_utils.so")) # Or full path to file +shared_utils.aux_look_up_bin.argtypes = [ND_POINTER_2, ctypes.c_int, ctypes.c_long] +shared_utils.aux_look_up_bin.restype = ctypes.c_long + def measure_latency(df, data_ori, task_name, sample_size, generate_file=True, memory_optimized=True, latency_optimized=True, - num_loop=10, num_query=5, search_algo='binary'): + num_loop=10, num_query=5, search_algo='binary', block_size=1024*1024): + # TODO add support for memory-optimized strategy + # TODO add support of binary_c to run-time memory optimized """Measure the end-end latency of data query Args: @@ -42,7 +56,8 @@ def measure_latency(df, data_ori, task_name, sample_size, memory_optimized_result = None latency_optimized_result = None key = df.columns[0] - block_size = 1024 * 1024 + # block_size = 1024 * 1024 + # block_size = 1024 * 512 record_size = data_ori[0].nbytes num_record_per_part = np.floor(block_size / record_size) x = data_ori[key] @@ -57,6 +72,8 @@ def measure_latency(df, data_ori, task_name, sample_size, comp_data_dir = os.path.join(root_path, task_name, folder_name) print('[Generate File Path]: {}'.format(comp_data_dir)) + dict_contigous_key = dict() + # generate file if generate_file: ndb_utils.recreate_temp_dir(comp_data_dir) @@ -68,6 +85,9 @@ def measure_latency(df, data_ori, task_name, sample_size, data_idx = np.logical_and(x >= val_start, x < val_end) data_part = data_ori[data_idx] + if search_algo == 'binary_c': + dict_contigous_key[block_idx] = np.array(data_part[key], order='F').astype(np.int32) + if len(data_part) == 0: continue data_bytes = data_part.tobytes() @@ -160,11 +180,13 @@ def measure_latency(df, data_ori, task_name, sample_size, timer_total = ndb_utils.Timer() 
timer_sort = ndb_utils.Timer() timer_locate_part = ndb_utils.Timer() + timer_build_index = ndb_utils.Timer() t_decomp = 0 t_lookup = 0 t_sort = 0 t_total = 0 t_locate_part = 0 + t_build_index = 0 timer_total.tic() for _ in tqdm(range(num_loop)): @@ -174,6 +196,10 @@ def measure_latency(df, data_ori, task_name, sample_size, count_nonexist = 0 cache_block_memory = 0 + # build hash table + if search_algo == 'hash': + data_hash = dict() + for query_idx in range(num_query): sample_index = list_sample_index[query_idx] timer_total.tic() @@ -197,8 +223,22 @@ def measure_latency(df, data_ori, task_name, sample_size, curr_decomp_block = np.rec.array(zstd.decompress(block_bytes), dtype=data_ori.dtype) decomp_block[part_idx] = curr_decomp_block num_decomp += 1 - cache_block_memory += curr_decomp_block.nbytes block_bytes_size = sys.getsizeof(block_bytes) + + # TODO add size computation for hash approach + if search_algo == 'hash': + t_decomp += timer_decomp.toc() + timer_build_index.tic() + for block_data_idx in range(len(curr_decomp_block)): + data_entry_key = curr_decomp_block[key][block_data_idx] + # print(data_entry_key) + data_entry_val = curr_decomp_block[block_data_idx] + data_hash[data_entry_key] = data_entry_val + cache_block_memory = sys.getsizeof(data_hash) + t_build_index += timer_build_index.toc() + timer_decomp.tic() + else: + cache_block_memory += curr_decomp_block.nbytes else: curr_decomp_block = decomp_block[part_idx] t_decomp += timer_decomp.toc() @@ -206,11 +246,18 @@ def measure_latency(df, data_ori, task_name, sample_size, if search_algo == 'binary': data_idx = ndb_utils.binary_search(curr_decomp_block[key], query_key, len(curr_decomp_block)) + elif search_algo == 'binary_c': + arr_contigous_arr = dict_contigous_key[part_idx] + data_idx = shared_utils.aux_look_up_bin(arr_contigous_arr, query_key, len(curr_decomp_block)) elif search_algo == 'naive': data_idx = curr_decomp_block[key] == query_key + elif search_algo == 'hash': + data_idx = query_key in data_hash.keys() - if (search_algo == 'binary' and data_idx >= 0) or (search_algo == 'naive' and np.sum(data_idx) > 0): + if ((search_algo == 'binary' or search_algo =='binary_c') and data_idx >= 0) or (search_algo == 'naive' and np.sum(data_idx) > 0): result[query_key_index_in_old] = curr_decomp_block[data_idx] + elif search_algo == 'hash' and data_idx == True: + result[query_key_index_in_old] = data_hash[query_key] else: count_nonexist += 1 t_lookup += timer_lookup.toc() @@ -221,7 +268,7 @@ def measure_latency(df, data_ori, task_name, sample_size, t_total += timer_total.toc() latency_optimized_result = result.copy() latency_optimized_latency = np.array((data_ori_size, data_comp_size, sample_size, 1, peak_memory/1024/1024, t_sort / num_loop, - t_locate_part / num_loop, t_decomp / num_loop, + t_locate_part / num_loop, t_decomp / num_loop, t_build_index / num_loop, t_lookup / num_loop, t_total / num_loop, num_decomp, count_nonexist)).T return_latency = None From 2efa5ab98eda2ba0525dd143fd5b18a1cafabd86 Mon Sep 17 00:00:00 2001 From: lixizhou Date: Tue, 30 May 2023 06:48:39 -0700 Subject: [PATCH 2/2] add code of c-backed binary search, hash table, some results --- DeepMapping/DeepMapping/deepmapping.py | 11 +-- DeepMapping/DeepMapping/ndb_utils.py | 80 ++++++++++++++++++++- DeepMapping/DeepMapping/uncompress.py | 6 +- DeepMapping/DeepMapping/zstd_compression.py | 6 +- README.md | 21 +++++- 5 files changed, 110 insertions(+), 14 deletions(-) diff --git a/DeepMapping/DeepMapping/deepmapping.py b/DeepMapping/DeepMapping/deepmapping.py index 
d2cc215..e7fa1c4 100644 --- a/DeepMapping/DeepMapping/deepmapping.py +++ b/DeepMapping/DeepMapping/deepmapping.py @@ -178,8 +178,8 @@ def measure_latency_any(df, data_ori, task_name, sample_size, generate_file=True, memory_optimized=True, latency_optimized=True, num_loop=10, num_query=5, search_algo='binary', path_to_model=None, block_size=1024*1024): - # TODO add support of hash to run-time memory optimized - # TODO add support of binary_c to run-time memory optimized + # TODO add support of hash to run-time memory optimized strategy + # TODO add support of binary_c to run-time memory optimized strategy """Measure the end-end latency of data query Args: @@ -216,12 +216,13 @@ def measure_latency_any(df, data_ori, task_name, sample_size, df_key = [df.columns[0]] list_y_encoded = [] list_y_encoder = [] - + size_encoder = 0 for col in df.columns: if col not in df_key: encoded_val, encoder = encode_label(df[col]) list_y_encoded.append(encoded_val) list_y_encoder.append(encoder) + size_encoder += encoder.classes_.nbytes num_tasks = len(list_y_encoded) for encoder in list_y_encoder: @@ -327,7 +328,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size, ndb_utils.save_byte_to_disk(file_name, data_zstd_comp) data_ori_size = data_ori.nbytes/1024/1024 - data_comp_size = [comp_zstd_size, model.count_params()*4/1024/1024, sys.getsizeof(zstd.compress(exist_bit_arr.tobytes()))/1024/1024] + data_comp_size = [size_encoder, comp_zstd_size, model.count_params()*4/1024/1024, sys.getsizeof(zstd.compress(exist_bit_arr.tobytes()))/1024/1024] print('Ori Size: {}, Curr Size: {}'.format(data_ori.nbytes/1024/1024, data_comp_size)) np.save(os.path.join(comp_data_dir, 'num_record_per_part'), num_record_per_part) else: @@ -489,7 +490,7 @@ def measure_latency_any(df, data_ori, task_name, sample_size, # build hash table if search_algo == 'hash': - data_hash = dict() + data_hash = dict() for query_idx in range(num_query): sample_index = list_sample_index[query_idx] timer_total.tic() diff --git a/DeepMapping/DeepMapping/ndb_utils.py b/DeepMapping/DeepMapping/ndb_utils.py index 16a83f1..5f86b6a 100644 --- a/DeepMapping/DeepMapping/ndb_utils.py +++ b/DeepMapping/DeepMapping/ndb_utils.py @@ -1,7 +1,9 @@ import numpy as np +import pandas as pd import time import os import shutil +import pandas as pd from tqdm.auto import tqdm def df_preprocess(df, benchmark = None): @@ -43,7 +45,6 @@ def df_preprocess(df, benchmark = None): data_ori = df.to_records(index=False) return df, data_ori - def data_manipulation(df, ops='None'): # default use 90% data if ops == 'None': @@ -85,6 +86,81 @@ def data_manipulation(df, ops='None'): data_ori = df.to_records(index=False) return df, data_ori +def process_df_for_synthetic_data(df): + for col in df.columns: + if df[col].dtypes == np.int64: + df[col] = df[col].astype(np.int32) + return df + +def generate_synthetic_data(df_tpch_order, df_tpch_lineitem, df_tpcds_customer_demographics, size=100): + """This function describes how we generate the synthetic data for our data manipulation experiments + that cover the four types of data: single column low(high) correlation, multiple columns low(high) + correlation. We use the three tables: TPC-H order table (for low correlation), lineitem table and + TPC-DS customer demographics table (for high correlation). + To generate the low correlation tables, you can generate the order and lineitem tables throughs + TPC-H dbgen with specified o -s scale_factor, like 100, 1000 to generate these two tables and send + to the script. 
The script will automatically scale it to the target size (in MB). + To generate the high correlation tables, you are required to generate the customer demographics table + through TPC-DS dsdgen. Since the customer demographics table does not scale with the given scale factor, + we used the following script to automatically scale it up to the target size by following its pattern. + + Args: + size (int): + target size in MB. + """ + size = size*1024*1024 + # generate single column low correlation + # single column low correlation is generated based on the TPC-H order table + df = df_tpch_order.copy() + df.reset_index(inplace=True) + df = df[['index', 'ORDERSTATUS']] + df = process_df_for_synthetic_data(df) + data_in_recarray = df.to_records(index=False) + df_single_column_low_correlation = df[:int(np.floor(size/data_in_recarray[0].nbytes))] + + # generate multi column low correlation + # multi column low correlation is generated based on the TPC-H lineitem table + df = df_tpch_lineitem.copy() + df.reset_index(inplace=True) + df = df[['index', 'LINENUMBER', 'DISCOUNT', 'TAX', 'RETURNFLAG', 'LINESTATUS', 'SHIPINSTRUCT', 'SHIPMODE']] + df = process_df_for_synthetic_data(df) + data_in_recarray = df.to_records(index=False) + df_multi_column_low_correlation = df[:int(np.floor(size/data_in_recarray[0].nbytes))] + + # generate single column high correlation + # single column high correlation is generated based on the TPC-DS customer demographics table + df = df_tpcds_customer_demographics[['CD_DEMO_SK', 'CD_EDUCATION_STATUS']].copy() + df = process_df_for_synthetic_data(df) + df_new = df.copy() + data_in_recarray = df.to_records(index=False) + data_size = data_in_recarray.nbytes + repeat_time = int(np.floor(size/data_size)) + last_id = np.max(df.iloc[:, 0]) + for append_copy_idx in range(1,repeat_time+1): + append_copy = df.copy() + cd_demo_sk_copy= np.arange(last_id + (append_copy_idx-1)*len(append_copy), last_id + append_copy_idx*len(append_copy)) + append_copy.loc[:, 'CD_DEMO_SK'] = cd_demo_sk_copy + df_new = pd.concat((df_new, append_copy)) + df_single_column_high_correlation = df_new[:int(np.floor(size//data_in_recarray[0].nbytes))] + + # generate multiple columns high correlation + # multiple columns high correlation is generated based on the TPC-DS customer demographics table + df = df_tpcds_customer_demographics.copy() + df_new = df.copy() + data_in_recarray = df.to_records(index=False) + data_size = data_in_recarray.nbytes + repeat_time = int(np.floor(size/data_size)) + last_id = np.max(df.iloc[:, 0]) + for append_copy_idx in range(1,repeat_time+1): + append_copy = df[df['cd_dep_college_count'] == 6].copy() + cd_demo_sk_copy= np.arange(last_id + (append_copy_idx-1)*len(append_copy), last_id + append_copy_idx*len(append_copy)) + append_copy.loc[:, 'cd_demo_sk'] = cd_demo_sk_copy + append_copy.loc[:, 'cd_dep_college_count'] += append_copy_idx + df_new = pd.concat((df_new, append_copy)) + df_multi_column_high_correlation = df_new[:int(np.floor(size//data_in_recarray[0].nbytes))] + + return df_single_column_low_correlation, df_single_column_high_correlation, df_multi_column_low_correlation, df_multi_column_high_correlation + def create_features(x, max_len=None): if max_len is None: max_len = len(str(np.max(x))) @@ -103,8 +179,8 @@ def generate_query(x_start, x_end, num_query=5, sample_size=1000): np.random.seed(query_idx) try: sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=False).astype(np.int32) - raise Warning("Sample size to big, sample 
with replace instead") except: + print("[WARN] Sample size to big, sample with replace instead") sample_index = np.random.choice(np.arange(x_start, x_end+1, dtype=np.int32), sample_size, replace=True).astype(np.int32) list_sample_index.append(sample_index) return list_sample_index diff --git a/DeepMapping/DeepMapping/uncompress.py b/DeepMapping/DeepMapping/uncompress.py index d93092e..e792fd0 100644 --- a/DeepMapping/DeepMapping/uncompress.py +++ b/DeepMapping/DeepMapping/uncompress.py @@ -23,8 +23,8 @@ def measure_latency(df, data_ori, task_name, sample_size, generate_file=True, memory_optimized=True, latency_optimized=True, num_loop=10, num_query=5, search_algo='binary', block_size=1024*1024): - # TODO use_has to memory-optimized strategy - # TODO add support of binary_c to run-time memory optimized + # TODO add support of hash to run-time memory optimized strategy + # TODO add support of binary_c to run-time memory optimized strategy """Measure the end-end latency of data query Args: @@ -88,6 +88,7 @@ def measure_latency(df, data_ori, task_name, sample_size, data_idx = np.logical_and(x >= val_start, x < val_end) data_part = data_ori[data_idx] if search_algo == 'binary_c': + # FIXME temporary workaround to avoid the overhead of converting to contiguous array dict_contigous_key[block_idx] = np.array(data_part[key], order='F').astype(np.int32) if len(data_part) == 0: @@ -227,7 +228,6 @@ def measure_latency(df, data_ori, task_name, sample_size, num_decomp += 1 block_bytes_size = sys.getsizeof(block_bytes) - # TODO add size computation for hash approach if search_algo == 'hash': t_decomp += timer_decomp.toc() timer_build_index.tic() diff --git a/DeepMapping/DeepMapping/zstd_compression.py b/DeepMapping/DeepMapping/zstd_compression.py index 9806fcd..117ea92 100644 --- a/DeepMapping/DeepMapping/zstd_compression.py +++ b/DeepMapping/DeepMapping/zstd_compression.py @@ -21,8 +21,8 @@ def measure_latency(df, data_ori, task_name, sample_size, generate_file=True, memory_optimized=True, latency_optimized=True, num_loop=10, num_query=5, search_algo='binary', block_size=1024*1024): - # TODO add support for memory-optimized strategy - # TODO add support of binary_c to run-time memory optimized + # TODO add support for run-time memory optimized strategy + # TODO add support of binary_c to run-time memory optimized strategy """Measure the end-end latency of data query Args: @@ -86,6 +86,7 @@ def measure_latency(df, data_ori, task_name, sample_size, data_part = data_ori[data_idx] if search_algo == 'binary_c': + # FIXME temporary workaround to avoid the overhead of converting to contiguous array dict_contigous_key[block_idx] = np.array(data_part[key], order='F').astype(np.int32) if len(data_part) == 0: @@ -225,7 +226,6 @@ def measure_latency(df, data_ori, task_name, sample_size, num_decomp += 1 block_bytes_size = sys.getsizeof(block_bytes) - # TODO add size computation for hash approach if search_algo == 'hash': t_decomp += timer_decomp.toc() timer_build_index.tic() diff --git a/README.md b/README.md index 65e3fb8..e8fdcb8 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ Resources for SIGMOD 2024 Submission - [Benchmark](#benchmark) - [Task: Data Query](#task-data-query) - [Task: Data Manipulation](#task-data-manipulation) + - [Supplement Material](#supplement-material) + - [Comparison of end-end latency for Running NN model in CPU/GPU](#comparison-of-end-end-latency-for-running-nn-model-in-cpugpu) @@ -70,4 +72,21 @@ Run `python run_benchmark_data_query.py` to benchmark. 
To benchmark with differe
 
 ### Task: Data Manipulation
 
-These experiments measured overall storage overhead and end-end query latency for synthetic dataset with data manipulation, i.e. INSERT/UPDATE/DELETE. Run `python run_benchmark_data_manipulation.py` to benchmark it. To benchmark with different dataset, you should modify the file correspondingly by following the instructions provided in the python file.
\ No newline at end of file
+These experiments measured overall storage overhead and end-end query latency for synthetic dataset with data manipulation, i.e. INSERT/UPDATE/DELETE. Run `python run_benchmark_data_manipulation.py` to benchmark it. To benchmark with different dataset, you should modify the file correspondingly by following the instructions provided in the python file.
+
+## Supplement Material
+
+### Comparison of end-end latency for running model in CPU/GPU
+
+| Tables | Number of Query Data (B) | Run Model On | NN Inference Time | Total Time |
+|:-----------------:|:------------------------:|:------------:|:-----------------:|:----------:|
+| TPCH-S10/customer | 100000 | GPU | 110.84 | 5,454.34 |
+| TPCH-S10/customer | 100000 | CPU | 663.30 | 6,058.63 |
+| TPCH-S10/lineitem | 100000 | GPU | 294.86 | 10,511.73 |
+| TPCH-S10/lineitem | 100000 | CPU | 2,909.44 | 12,791.68 |
+| TPCH-S10/orders | 100000 | GPU | 81.38 | 2,351.74 |
+| TPCH-S10/orders | 100000 | CPU | 403.41 | 2,707.00 |
+| TPCH-S10/part | 100000 | GPU | 238.95 | 5,686.15 |
+| TPCH-S10/part | 100000 | CPU | 310.98 | 5,804.16 |
+| TPCH-S10/supplier | 100000 | GPU | 56.59 | 5,097.88 |
+| TPCH-S10/supplier | 100000 | CPU | 75.77 | 5,153.22 |
\ No newline at end of file
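
The C source for `shared_utils.aux_look_up_bin` is not part of this diff; only its ctypes binding is declared (an int32 key array, the query key as `c_int`, and the array length as `c_long`, returning a `c_long`). The callers test the result against `-1` or `>= 0`, so the contract they assume is the one sketched below in pure Python; this is a minimal reference of that contract, not the actual C implementation. Note that `ND_POINTER_2` is declared with `flags="C"`, so the key column handed to the C routine must be a contiguous int32 buffer; the FIXME about pre-building `dict_contigous_key` at file-generation time appears to work around exactly that, since a recarray field such as `curr_decomp_block[key]` is typically strided rather than contiguous. The library itself is loaded via `os.path.abspath("shared_utils.so")`, i.e. relative to the directory the benchmark is launched from.

```python
import numpy as np

def aux_look_up_bin_py(arr: np.ndarray, key: int, n: int) -> int:
    """Pure-Python reference for the contract the callers assume of
    shared_utils.aux_look_up_bin: `arr` is a sorted, contiguous int32 key
    column of length `n`; return the index of `key`, or -1 if it is absent."""
    lo, hi = 0, n - 1
    while lo <= hi:
        mid = (lo + hi) // 2
        if arr[mid] == key:
            return mid
        elif arr[mid] < key:
            lo = mid + 1
        else:
            hi = mid - 1
    return -1
```

With the binding in place, `shared_utils.aux_look_up_bin(arr_contigous_arr, query_key, len(curr_decomp_block))` is expected to return the same index as this reference.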
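
For `search_algo='hash'`, each newly decompressed block is folded into a Python dict keyed on the first column, and the probe is `query_key in data_hash.keys()` followed by a second indexing; the remaining TODO notes that `sys.getsizeof(data_hash)` only measures the dict's internal table, not the records it references. The sketch below is one way both points could be tightened. It is an illustration, not code from the patch: `dict.get` probes the table once, and the size helper also counts the referenced records' `nbytes` (those records are views into the cached decompressed block, so summing both would double-count that memory).

```python
import sys

def probe(data_hash: dict, query_key: int):
    """Single-pass probe: returns the cached record, or None when the key is absent."""
    return data_hash.get(query_key)

def hash_index_nbytes(data_hash: dict) -> int:
    """Approximate footprint of the hash index: the dict's own table plus the
    numpy records it references (views into the decompressed block)."""
    return sys.getsizeof(data_hash) + sum(v.nbytes for v in data_hash.values())
```

In the query loop this would read `row = probe(data_hash, query_key)`, assigning to `result[query_key_index_in_old]` only when `row is not None`.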
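
All three access paths (deepmapping, uncompress, zstd_compression) share the same block layout: `block_size`, now a parameter defaulting to 1 MiB, divided by the record width gives `num_record_per_part`; the writer assigns keys in `[x_start + i*num_record_per_part, x_start + (i+1)*num_record_per_part)` to partition `i`, and the reader inverts that with integer division. A small runnable sketch of the reader side (the record width here is a made-up value):

```python
import numpy as np

def locate_partition(query_key: int, x_start: int, num_record_per_part: float) -> int:
    """Partition index used by the writers and readers in this patch: keys in
    [x_start + i*num_record_per_part, x_start + (i+1)*num_record_per_part)
    land in partition i."""
    return int((query_key - x_start) // num_record_per_part)

block_size = 1024 * 1024                                   # default block_size argument
record_size = 64                                           # hypothetical record width in bytes
num_record_per_part = np.floor(block_size / record_size)   # 16384.0
print(locate_partition(20000, 0, num_record_per_part))     # -> 1
```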
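
`ndb_utils.create_features` is a new pure-Python feature builder that one-hot encodes each decimal digit of a key into a `max_len * 10`-wide vector, filling from the least-significant digit into the right-most 10-wide slot; deepmapping.py keeps a commented-out call to it next to the multi-threaded C `create_fetures` path, which suggests it is meant as a fallback. A tiny worked example of its behaviour:

```python
import numpy as np
from DeepMapping import ndb_utils

x = np.array([7, 42], dtype=np.int32)
x_features, max_len = ndb_utils.create_features(x)  # max_len inferred as 2 from max(x) = 42

# x_features has shape (2, max_len*10), one 10-wide one-hot slot per digit,
# least-significant digit in the right-most slot:
#   7  -> column 17 set (1*10 + 7)
#   42 -> columns 4 (0*10 + 4) and 12 (1*10 + 2) set
assert x_features.shape == (2, 20)
assert x_features[0, 17] == 1 and x_features[1, 4] == 1 and x_features[1, 12] == 1
```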
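
The patched `generate_query` samples keys without replacement and falls back to sampling with replacement when `np.random.choice` raises because `sample_size` exceeds the key range; the fallback is reached through a bare `except`, which also swallows unrelated errors. Below is a sketch of the same fallback with an explicit size check, illustrative only (it uses `default_rng`, so the draws are not bit-identical to the `np.random.seed`-based ones in the patch):

```python
import numpy as np

def sample_keys(x_start: int, x_end: int, sample_size: int, seed: int) -> np.ndarray:
    """Draw `sample_size` keys from [x_start, x_end], falling back to sampling
    with replacement only when the requested sample exceeds the key range."""
    rng = np.random.default_rng(seed)
    population = np.arange(x_start, x_end + 1, dtype=np.int32)
    replace = sample_size > len(population)
    if replace:
        print("[WARN] Sample size too big, sampling with replacement instead")
    return rng.choice(population, sample_size, replace=replace).astype(np.int32)
```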