Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate shuffle and merge to P2PBarrierTask #1157

Merged
merged 1 commit into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 38 additions & 16 deletions dask_expr/_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,14 @@ def _lower(self):
return None

def _layer(self) -> dict:
from distributed.shuffle._core import ShuffleId, barrier_key, p2p_barrier
from distributed.shuffle._core import (
P2PBarrierTask,
ShuffleId,
barrier_key,
p2p_barrier,
)
from distributed.shuffle._merge import merge_unpack
from distributed.shuffle._shuffle import DataFrameShuffleSpec

dsk = {}
token_left = _tokenize_deterministic(
Expand Down Expand Up @@ -648,8 +654,6 @@ def _layer(self) -> dict:
self.npartitions,
token_left,
i,
self.left._meta,
self._partitions,
self.left_index,
)
dsk[t.key] = t
Expand All @@ -665,20 +669,44 @@ def _layer(self) -> dict:
self.npartitions,
token_right,
i,
self.right._meta,
self._partitions,
self.right_index,
)
dsk[t.key] = t
transfer_keys_right.append(t.ref())

barrier_left = Task(
_barrier_key_left, p2p_barrier, token_left, transfer_keys_left
meta_left = self.left._meta.assign(**{_HASH_COLUMN_NAME: 0})
barrier_left = P2PBarrierTask(
_barrier_key_left,
p2p_barrier,
token_left,
transfer_keys_left,
spec=DataFrameShuffleSpec(
id=token_left,
npartitions=self.npartitions,
column=_HASH_COLUMN_NAME,
meta=meta_left,
parts_out=self._partitions,
disk=True,
drop_column=True,
),
)
dsk[barrier_left.key] = barrier_left

barrier_right = Task(
_barrier_key_right, p2p_barrier, token_right, transfer_keys_right
meta_right = self.right._meta.assign(**{_HASH_COLUMN_NAME: 0})
barrier_right = P2PBarrierTask(
_barrier_key_right,
p2p_barrier,
token_right,
transfer_keys_right,
spec=DataFrameShuffleSpec(
id=token_right,
npartitions=self.npartitions,
column=_HASH_COLUMN_NAME,
meta=meta_right,
parts_out=self._partitions,
disk=True,
drop_column=True,
),
)
dsk[barrier_right.key] = barrier_right

Expand Down Expand Up @@ -813,7 +841,6 @@ def _layer(self) -> dict:


def create_assign_index_merge_transfer():
import pandas as pd
from distributed.shuffle._core import ShuffleId
from distributed.shuffle._merge import merge_transfer

Expand All @@ -824,8 +851,6 @@ def assign_index_merge_transfer(
npartitions,
id: ShuffleId,
input_partition: int,
meta: pd.DataFrame,
parts_out: set[int],
index_merge,
):
if index_merge:
Expand All @@ -847,10 +872,7 @@ def assign_index_merge_transfer(

index = partitioning_index(index, npartitions)
df = df.assign(**{name: index})
meta = meta.assign(**{name: 0})
return merge_transfer(
df, id, input_partition, npartitions, meta, parts_out, True
)
return merge_transfer(df, id, input_partition)

return assign_index_merge_transfer

Expand Down
51 changes: 25 additions & 26 deletions dask_expr/_shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -536,28 +536,13 @@ def _shuffle_transfer(
input: pd.DataFrame,
id,
input_partition: int,
npartitions: int,
column: str,
meta: pd.DataFrame,
parts_out: set[int] | int,
disk: bool,
drop_column: bool,
) -> int:
from distributed.shuffle._shuffle import shuffle_transfer

if isinstance(parts_out, int):
parts_out = list(range(parts_out))

return shuffle_transfer(
input,
id,
input_partition,
npartitions,
column,
meta,
parts_out,
disk,
drop_column,
)


Expand All @@ -569,20 +554,26 @@ def _meta(self):
return self.frame._meta.drop(columns=self.partitioning_index)

def _layer(self):
from distributed.shuffle._core import p2p_barrier
from distributed.shuffle._shuffle import ShuffleId, barrier_key, shuffle_unpack
from distributed.shuffle._core import (
P2PBarrierTask,
ShuffleId,
barrier_key,
p2p_barrier,
)
from distributed.shuffle._shuffle import DataFrameShuffleSpec, shuffle_unpack

dsk = {}
token = self._name.split("-")[-1]
_barrier_key = barrier_key(ShuffleId(token))
shuffle_id = ShuffleId(token)
_barrier_key = barrier_key(shuffle_id)
name = "shuffle-transfer-" + token

parts_out = (
self._partitions if self._filtered else list(range(self.npartitions_out))
)
# Avoid embedding a materialized list unless necessary
parts_out_arg = (
set(self._partitions) if self._filtered else self.npartitions_out
tuple(self._partitions) if self._filtered else self.npartitions_out
)

transfer_keys = list()
Expand All @@ -593,17 +584,25 @@ def _layer(self):
TaskRef((self.frame._name, i)),
token,
i,
self.npartitions_out,
self.partitioning_index,
self.frame._meta,
parts_out_arg,
True,
True,
)
dsk[t.key] = t
transfer_keys.append(t.ref())

barrier = Task(_barrier_key, p2p_barrier, token, transfer_keys)
barrier = P2PBarrierTask(
_barrier_key,
p2p_barrier,
token,
transfer_keys,
spec=DataFrameShuffleSpec(
id=shuffle_id,
npartitions=self.npartitions_out,
column=self.partitioning_index,
meta=self.frame._meta,
parts_out=parts_out_arg,
disk=True,
drop_column=True,
),
)
dsk[barrier.key] = barrier

# TODO: Decompose p2p into transfer/barrier + unpack
Expand Down
Loading