diff --git a/dask_cuda/benchmarks/custom/__init__.py b/dask_cuda/benchmarks/custom/__init__.py
new file mode 100644
index 00000000..cfd5f0de
--- /dev/null
+++ b/dask_cuda/benchmarks/custom/__init__.py
@@ -0,0 +1 @@
+from .parquet import read_parquet as custom_read_parquet
diff --git a/dask_cuda/benchmarks/custom/parquet.py b/dask_cuda/benchmarks/custom/parquet.py
new file mode 100644
index 00000000..fa5075bc
--- /dev/null
+++ b/dask_cuda/benchmarks/custom/parquet.py
@@ -0,0 +1,153 @@
+import math
+import os
+from concurrent.futures import ThreadPoolExecutor
+
+import numpy as np
+import pyarrow as pa
+import pyarrow.parquet as pq
+from pyarrow import dataset
+
+import dask
+import dask.dataframe as dd
+from dask.base import apply, tokenize
+from dask.distributed import get_worker
+from dask.utils import parse_bytes
+
+# NOTE: The pyarrow component of this code was mostly copied
+# from dask-expr (dask_expr/io/parquet.py)
+
+
+_CPU_COUNT_SET = False
+
+
+def _maybe_adjust_cpu_count():
+    global _CPU_COUNT_SET
+    if not _CPU_COUNT_SET:
+        # Set the number of threads to the number of cores.
+        # This is a default for pyarrow, but it's not set by default in
+        # dask/distributed
+        pa.set_cpu_count(os.cpu_count())
+        _CPU_COUNT_SET = True
+
+
+def fragment_to_table(fragment, filters=None, columns=None, schema=None):
+    _maybe_adjust_cpu_count()
+
+    if isinstance(filters, list):
+        filters = pq.filters_to_expression(filters)
+
+    return fragment.to_table(
+        schema=schema,
+        columns=columns,
+        filter=filters,
+        # Batch size determines how many rows are read at once and will
+        # cause the underlying array to be split into chunks of this size
+        # (max). We'd like to avoid fragmentation as much as possible and
+        # to set this to something like inf, but we have to set a finite,
+        # positive number.
+        # In the presence of row groups, the underlying array will still be
+        # chunked per row group.
+        batch_size=10_000_000,
+        fragment_scan_options=pa.dataset.ParquetFragmentScanOptions(
+            pre_buffer=True,
+            cache_options=pa.CacheOptions(
+                hole_size_limit=parse_bytes("4 MiB"),
+                range_size_limit=parse_bytes("32.00 MiB"),
+            ),
+        ),
+        use_threads=True,
+    )
+
+
+def tables_to_frame(tables):
+    import cudf
+
+    return cudf.DataFrame.from_arrow(
+        pa.concat_tables(tables) if len(tables) > 1 else tables[0]
+    )
+
+
+def read_parquet_fragments(fragments, columns=None, filters=None):
+
+    kwargs = {"columns": columns, "filters": filters}
+    if not isinstance(fragments, list):
+        fragments = [fragments]
+
+    if len(fragments) > 1:
+        # Read multiple fragments
+        token = tokenize(fragments, columns, filters)
+        chunk_name = f"read-chunk-{token}"
+        dsk = {
+            (chunk_name, i): (apply, fragment_to_table, [fragment], kwargs)
+            for i, fragment in enumerate(fragments)
+        }
+        dsk[chunk_name] = (tables_to_frame, list(dsk.keys()))
+
+        try:
+            worker = get_worker()
+        except ValueError:
+            return dask.threaded.get(dsk, chunk_name)
+
+        if not hasattr(worker, "_rapids_executor"):
+            num_threads = len(os.sched_getaffinity(0))
+            worker._rapids_executor = ThreadPoolExecutor(num_threads)
+        with dask.config.set(pool=worker._rapids_executor):
+            return dask.threaded.get(dsk, chunk_name)
+
+    else:
+        # Read single fragment
+        return tables_to_frame([fragment_to_table(fragments[0], **kwargs)])
+
+
+def mean_file_size(fragments, n=10):
+    n_frags = len(fragments)
+    if n < n_frags:
+        indices = np.random.choice(np.arange(n_frags), size=n, replace=False)
+    else:
+        indices = np.arange(n_frags)
+
+    sizes = []
+    for f in indices:
+        size = 0
+        frag = fragments[f]
+        for row_group in frag.row_groups:
+            size += row_group.total_byte_size
+        sizes.append(size)
+
+    return np.mean(sizes)
+
+
+def aggregate_fragments(fragments, blocksize):
+    size = mean_file_size(fragments)
+    blocksize = parse_bytes(blocksize)
+    stride = int(math.floor(blocksize / size))
+
+    if stride < 1:
+        pass  # Not implemented yet!
+
+    stride = max(stride, 1)
+    return [fragments[i : i + stride] for i in range(0, len(fragments), stride)]
+
+
+def read_parquet(urlpath, columns=None, filters=None, blocksize="256MB", **kwargs):
+
+    # Use pyarrow dataset API to get fragments and meta
+    ds = dataset.dataset(urlpath, format="parquet")
+    meta = tables_to_frame([ds.schema.empty_table()])
+    if columns is not None:
+        meta = meta[columns]
+    fragments = list(ds.get_fragments())
+
+    # Aggregate fragments together if necessary
+    if blocksize:
+        fragments = aggregate_fragments(fragments, blocksize)
+
+    # Construct collection
+    return dd.from_map(
+        read_parquet_fragments,
+        fragments,
+        columns=columns,
+        filters=filters,
+        meta=meta,
+        enforce_metadata=False,
+    )
diff --git a/dask_cuda/benchmarks/remote_parquet.py b/dask_cuda/benchmarks/remote_parquet.py
new file mode 100644
index 00000000..8cd99ec7
--- /dev/null
+++ b/dask_cuda/benchmarks/remote_parquet.py
@@ -0,0 +1,187 @@
+import contextlib
+from collections import ChainMap
+from time import perf_counter as clock
+
+import pandas as pd
+
+import dask
+import dask.dataframe as dd
+from dask.distributed import performance_report
+from dask.utils import format_bytes, parse_bytes
+
+from dask_cuda.benchmarks.common import Config, execute_benchmark
+from dask_cuda.benchmarks.custom import custom_read_parquet
+from dask_cuda.benchmarks.utils import (
+    parse_benchmark_args,
+    print_key_value,
+    print_separator,
+    print_throughput_bandwidth,
+)
+
+DEFAULT_DATASET_PATH = "s3://dask-cudf-parquet-testing/dedup_parquet"
+DEFAULT_COLUMNS = ["text", "id"]
+DEFAULT_STORAGE_SIZE = 2_843_373_145  # Compressed byte size
+
+
+def read_data(
+    backend,
+    filesystem,
+    aggregate_files,
+    blocksize,
+):
+    path = DEFAULT_DATASET_PATH
+    columns = DEFAULT_COLUMNS
+    with dask.config.set({"dataframe.backend": backend}):
+        if filesystem == "arrow" and backend == "cudf":
+            df = custom_read_parquet(
+                path,
+                columns=columns,
+                blocksize=blocksize,
+            )
+        else:
+            if filesystem == "arrow":
+                # TODO: Warn user that blocksize and aggregate_files
+                # are ignored when `filesystem == "arrow"`
+                _blocksize = {}
+                _aggregate_files = {}
+            else:
+                _blocksize = {"blocksize": blocksize}
+                _aggregate_files = {"aggregate_files": aggregate_files}
+
+            df = dd.read_parquet(
+                path,
+                columns=columns,
+                filesystem=filesystem,
+                **_blocksize,
+                **_aggregate_files,
+            )
+        return df.memory_usage().compute().sum()
+
+
+def bench_once(client, args, write_profile=None):
+
+    if write_profile is None:
+        ctx = contextlib.nullcontext()
+    else:
+        ctx = performance_report(filename=args.profile)
+
+    with ctx:
+        t1 = clock()
+        output_size = read_data(
+            backend="cudf" if args.type == "gpu" else "pandas",
+            filesystem=args.filesystem,
+            aggregate_files=args.aggregate_files,
+            blocksize=args.blocksize,
+        )
+        t2 = clock()
+
+    return (DEFAULT_STORAGE_SIZE, output_size, t2 - t1)
+
+
+def pretty_print_results(args, address_to_index, p2p_bw, results):
+    if args.markdown:
+        print("```")
+    print("Remote Parquet benchmark")
+    print_separator(separator="-")
+    backend = "cudf" if args.type == "gpu" else "pandas"
+    print_key_value(key="Backend", value=f"{backend}")
+    print_key_value(key="Filesystem", value=f"{args.filesystem}")
+    print_key_value(key="Blocksize", value=f"{args.blocksize}")
+    print_key_value(key="Aggregate files", value=f"{args.aggregate_files}")
+    print_key_value(key="Output size", value=f"{format_bytes(results[0][1])}")
+    if args.markdown:
+        print("\n```")
+    data_processed, output_size, durations = zip(*results)
+    print_throughput_bandwidth(
+        args, durations, data_processed, p2p_bw, address_to_index
+    )
+
+
+def create_tidy_results(args, p2p_bw, results):
+    configuration = {
+        "backend": "cudf" if args.type == "gpu" else "pandas",
+        "filesystem": args.filesystem,
+        "blocksize": args.blocksize,
+        "aggregate_files": args.aggregate_files,
+    }
+    timing_data = pd.DataFrame(
+        [
+            pd.Series(
+                data=ChainMap(
+                    configuration,
+                    {
+                        "wallclock": duration,
+                        "data_processed": data_processed,
+                        "output_size": output_size,
+                    },
+                )
+            )
+            for data_processed, output_size, duration in results
+        ]
+    )
+    return timing_data, p2p_bw
+
+
+def parse_args():
+    special_args = [
+        {
+            "name": "--blocksize",
+            "default": "256MB",
+            "type": str,
+            "help": "How to set the blocksize option",
+        },
+        {
+            "name": "--aggregate-files",
+            "default": False,
+            "action": "store_true",
+            "help": "How to set the aggregate_files option",
+        },
+        {
+            "name": [
+                "-t",
+                "--type",
+            ],
+            "choices": ["cpu", "gpu"],
+            "default": "gpu",
+            "type": str,
+            "help": "Use GPU or CPU dataframes (default 'gpu')",
+        },
+        {
+            "name": "--filesystem",
+            "choices": ["arrow", "fsspec"],
+            "default": "fsspec",
+            "type": str,
+            "help": "Filesystem backend",
+        },
+        {
+            "name": "--runs",
+            "default": 3,
+            "type": int,
+            "help": "Number of runs",
+        },
+        # NOTE: The following args are not relevant to this benchmark
+        {
+            "name": "--ignore-size",
+            "default": "1 MiB",
+            "metavar": "nbytes",
+            "type": parse_bytes,
+            "help": "Ignore messages smaller than this (default '1 MiB')",
+        },
+    ]
+
+    return parse_benchmark_args(
+        description="Remote Parquet (dask/cudf) benchmark",
+        args_list=special_args,
+        check_explicit_comms=False,
+    )
+
+
+if __name__ == "__main__":
+    execute_benchmark(
+        Config(
+            args=parse_args(),
+            bench_once=bench_once,
+            create_tidy_results=create_tidy_results,
+            pretty_print_results=pretty_print_results,
+        )
+    )
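Usage note (not part of the diff above): a minimal sketch of how the new arrow-native reader could be exercised on a dask-cuda cluster. The dataset path below is a placeholder, and the module-style invocation at the end assumes the benchmark ships with the installed dask_cuda package.

# Illustrative sketch only -- assumes a GPU environment with dask-cuda,
# cudf, and the relevant remote-filesystem dependencies (e.g. s3fs) installed.
from dask.distributed import Client

from dask_cuda import LocalCUDACluster
from dask_cuda.benchmarks.custom import custom_read_parquet

if __name__ == "__main__":
    cluster = LocalCUDACluster()
    client = Client(cluster)

    # custom_read_parquet aggregates pyarrow fragments up to ~blocksize per
    # partition and materializes each partition as a cudf DataFrame
    df = custom_read_parquet(
        "s3://my-bucket/my-parquet-dataset/",  # placeholder path
        columns=["text", "id"],
        blocksize="256MB",
    )
    print(df.memory_usage().compute().sum())

# The benchmark itself would typically be launched along the lines of:
#   python -m dask_cuda.benchmarks.remote_parquet --filesystem arrow --type gpu --runs 3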