add new remote-io benchmark
rjzamora committed Jul 30, 2024
1 parent 14efa4d commit 0b48642
Showing 3 changed files with 341 additions and 0 deletions.
1 change: 1 addition & 0 deletions dask_cuda/benchmarks/custom/__init__.py
@@ -0,0 +1 @@
from .parquet import read_parquet as custom_read_parquet
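This re-export is how the benchmark below consumes the custom reader. A minimal sketch of the import path (assumes dask_cuda with this commit installed; not part of the commit itself):

# The benchmark imports the custom reader through the package-level alias.
from dask_cuda.benchmarks.custom import custom_read_parquet

print(custom_read_parquet.__module__, custom_read_parquet.__name__)
# dask_cuda.benchmarks.custom.parquet read_parquet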
153 changes: 153 additions & 0 deletions dask_cuda/benchmarks/custom/parquet.py
@@ -0,0 +1,153 @@
import math
import os
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow import dataset

import dask
import dask.dataframe as dd
from dask.base import apply, tokenize
from dask.distributed import get_worker
from dask.utils import parse_bytes

# NOTE: The pyarrow component of this code was mostly copied
# from dask-expr (dask_expr/io/parquet.py)


_CPU_COUNT_SET = False


def _maybe_adjust_cpu_count():
global _CPU_COUNT_SET
if not _CPU_COUNT_SET:
# Set the number of threads to the number of cores
# This is a default for pyarrow, but it's not set by default in
# dask/distributed
pa.set_cpu_count(os.cpu_count())
_CPU_COUNT_SET = True


def fragment_to_table(fragment, filters=None, columns=None, schema=None):
_maybe_adjust_cpu_count()

if isinstance(filters, list):
filters = pq.filters_to_expression(filters)

return fragment.to_table(
schema=schema,
columns=columns,
filter=filters,
        # Batch size determines how many rows are read at once and will
        # cause the underlying array to be split into chunks of this size
        # (max). We'd like to avoid fragmentation as much as possible and
        # would ideally set this to something like inf, but we have to use
        # a finite, positive number.
        # In the presence of row groups, the underlying array will still be
        # chunked per row group.
batch_size=10_000_000,
fragment_scan_options=pa.dataset.ParquetFragmentScanOptions(
pre_buffer=True,
cache_options=pa.CacheOptions(
hole_size_limit=parse_bytes("4 MiB"),
range_size_limit=parse_bytes("32.00 MiB"),
),
),
use_threads=True,
)


def tables_to_frame(tables):
import cudf

return cudf.DataFrame.from_arrow(
pa.concat_tables(tables) if len(tables) > 1 else tables[0]
)


def read_parquet_fragments(fragments, columns=None, filters=None):

kwargs = {"columns": columns, "filters": filters}
if not isinstance(fragments, list):
fragments = [fragments]

if len(fragments) > 1:
# Read multiple fragments
token = tokenize(fragments, columns, filters)
chunk_name = f"read-chunk-{token}"
dsk = {
(chunk_name, i): (apply, fragment_to_table, [fragment], kwargs)
for i, fragment in enumerate(fragments)
}
dsk[chunk_name] = (tables_to_frame, list(dsk.keys()))

try:
worker = get_worker()
except ValueError:
return dask.threaded.get(dsk, chunk_name)

if not hasattr(worker, "_rapids_executor"):
num_threads = len(os.sched_getaffinity(0))
worker._rapids_executor = ThreadPoolExecutor(num_threads)
with dask.config.set(pool=worker._rapids_executor):
return dask.threaded.get(dsk, chunk_name)

else:
# Read single fragment
return tables_to_frame([fragment_to_table(fragments[0], **kwargs)])


def mean_file_size(fragments, n=10):
n_frags = len(fragments)
if n < n_frags:
indices = np.random.choice(np.arange(n_frags), size=n, replace=False)
else:
indices = np.arange(n_frags)

sizes = []
for f in indices:
size = 0
frag = fragments[f]
for row_group in frag.row_groups:
size += row_group.total_byte_size
sizes.append(size)

return np.mean(sizes)


def aggregate_fragments(fragments, blocksize):
size = mean_file_size(fragments)
blocksize = parse_bytes(blocksize)
stride = int(math.floor(blocksize / size))

if stride < 1:
pass # Not implemented yet!

stride = max(stride, 1)
return [fragments[i : i + stride] for i in range(0, len(fragments), stride)]


def read_parquet(urlpath, columns=None, filters=None, blocksize="256MB", **kwargs):

# Use pyarrow dataset API to get fragments and meta
ds = dataset.dataset(urlpath, format="parquet")
meta = tables_to_frame([ds.schema.empty_table()])
if columns is not None:
meta = meta[columns]
fragments = list(ds.get_fragments())

# Aggregate fragments together if necessary
if blocksize:
fragments = aggregate_fragments(fragments, blocksize)

# Construct collection
return dd.from_map(
read_parquet_fragments,
fragments,
columns=columns,
filters=filters,
meta=meta,
enforce_metadata=False,
)
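A brief usage sketch of the custom reader defined above (not part of the commit): it assumes a cudf-capable GPU environment, a running LocalCUDACluster, and a placeholder S3 path.

# Usage sketch only. The S3 path is a placeholder; a dask-cuda cluster
# is assumed to be available, and cudf must be importable on the workers.
from dask.distributed import Client

from dask_cuda import LocalCUDACluster
from dask_cuda.benchmarks.custom import custom_read_parquet

if __name__ == "__main__":
    client = Client(LocalCUDACluster())
    df = custom_read_parquet(
        "s3://my-bucket/my-dataset/",  # placeholder path
        columns=["text", "id"],
        blocksize="256MB",
    )
    # Same reduction the benchmark uses to force a full read
    print(df.memory_usage().compute().sum())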
187 changes: 187 additions & 0 deletions dask_cuda/benchmarks/remote_parquet.py
@@ -0,0 +1,187 @@
import contextlib
from collections import ChainMap
from time import perf_counter as clock

import pandas as pd

import dask
import dask.dataframe as dd
from dask.distributed import performance_report
from dask.utils import format_bytes, parse_bytes

from dask_cuda.benchmarks.common import Config, execute_benchmark
from dask_cuda.benchmarks.custom import custom_read_parquet
from dask_cuda.benchmarks.utils import (
parse_benchmark_args,
print_key_value,
print_separator,
print_throughput_bandwidth,
)

DEFAULT_DATASET_PATH = "s3://dask-cudf-parquet-testing/dedup_parquet"
DEFAULT_COLUMNS = ["text", "id"]
DEFAULT_STORAGE_SIZE = 2_843_373_145 # Compressed byte size


def read_data(
backend,
filesystem,
aggregate_files,
blocksize,
):
path = DEFAULT_DATASET_PATH
columns = DEFAULT_COLUMNS
with dask.config.set({"dataframe.backend": backend}):
if filesystem == "arrow" and backend == "cudf":
df = custom_read_parquet(
path,
columns=columns,
blocksize=blocksize,
)
else:
if filesystem == "arrow":
                # TODO: Warn the user that blocksize and aggregate_files
                # are ignored when `filesystem == "arrow"`
_blocksize = {}
_aggregate_files = {}
else:
_blocksize = {"blocksize": blocksize}
_aggregate_files = {"aggregate_files": aggregate_files}

df = dd.read_parquet(
path,
columns=columns,
filesystem=filesystem,
**_blocksize,
**_aggregate_files,
)
return df.memory_usage().compute().sum()


def bench_once(client, args, write_profile=None):

if write_profile is None:
ctx = contextlib.nullcontext()
else:
ctx = performance_report(filename=args.profile)

with ctx:
t1 = clock()
output_size = read_data(
backend="cudf" if args.type == "gpu" else "pandas",
filesystem=args.filesystem,
aggregate_files=args.aggregate_files,
blocksize=args.blocksize,
)
t2 = clock()

return (DEFAULT_STORAGE_SIZE, output_size, t2 - t1)


def pretty_print_results(args, address_to_index, p2p_bw, results):
if args.markdown:
print("```")
print("Remote Parquet benchmark")
print_separator(separator="-")
backend = "cudf" if args.type == "gpu" else "pandas"
print_key_value(key="Backend", value=f"{backend}")
print_key_value(key="Filesystem", value=f"{args.filesystem}")
print_key_value(key="Blocksize", value=f"{args.blocksize}")
print_key_value(key="Aggregate files", value=f"{args.aggregate_files}")
print_key_value(key="Output size", value=f"{format_bytes(results[0][1])}")
if args.markdown:
print("\n```")
data_processed, output_size, durations = zip(*results)
print_throughput_bandwidth(
args, durations, data_processed, p2p_bw, address_to_index
)


def create_tidy_results(args, p2p_bw, results):
configuration = {
"backend": "cudf" if args.type == "gpu" else "pandas",
"filesystem": args.filesystem,
"blocksize": args.blocksize,
"aggregate_files": args.aggregate_files,
}
timing_data = pd.DataFrame(
[
pd.Series(
data=ChainMap(
configuration,
{
"wallclock": duration,
"data_processed": data_processed,
"output_size": output_size,
},
)
)
for data_processed, output_size, duration in results
]
)
return timing_data, p2p_bw


def parse_args():
special_args = [
{
"name": "--blocksize",
"default": "256MB",
"type": str,
"help": "How to set the blocksize option",
},
{
"name": "--aggregate-files",
"default": False,
"action": "store_true",
"help": "How to set the aggregate_files option",
},
{
"name": [
"-t",
"--type",
],
"choices": ["cpu", "gpu"],
"default": "gpu",
"type": str,
"help": "Use GPU or CPU dataframes (default 'gpu')",
},
{
"name": "--filesystem",
"choices": ["arrow", "fsspec"],
"default": "fsspec",
"type": str,
"help": "Filesystem backend",
},
{
"name": "--runs",
"default": 3,
"type": int,
"help": "Number of runs",
},
# NOTE: The following args are not relevant to this benchmark
{
"name": "--ignore-size",
"default": "1 MiB",
"metavar": "nbytes",
"type": parse_bytes,
"help": "Ignore messages smaller than this (default '1 MB')",
},
]

return parse_benchmark_args(
description="Remote Parquet (dask/cudf) benchmark",
args_list=special_args,
check_explicit_comms=False,
)


if __name__ == "__main__":
execute_benchmark(
Config(
args=parse_args(),
bench_once=bench_once,
create_tidy_results=create_tidy_results,
pretty_print_results=pretty_print_results,
)
)
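The script itself is driven through parse_benchmark_args/execute_benchmark like the other dask_cuda benchmarks. For reference, a sketch of exercising the same read path outside the harness (assumes a LocalCUDACluster can be started locally and the default S3 dataset is reachable; not part of the commit):

# Sketch only: call read_data() directly instead of going through
# parse_benchmark_args()/execute_benchmark(). Parameters mirror the
# benchmark defaults except filesystem="arrow".
from dask.distributed import Client

from dask_cuda import LocalCUDACluster
from dask_cuda.benchmarks.remote_parquet import read_data

if __name__ == "__main__":
    client = Client(LocalCUDACluster())
    nbytes = read_data(
        backend="cudf",
        filesystem="arrow",
        aggregate_files=False,
        blocksize="256MB",
    )
    print(f"In-memory output size: {nbytes} bytes")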
