Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adjust class hierarchy for overlapping stuff #429

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 38 additions & 3 deletions dask_expr/_expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -1093,7 +1093,19 @@ def _task(self, index: int):
return self.value


class Blockwise(Expr):
class BlockwiseOverlapping(Expr):
"""Super class that operates like a blockwise op

We require information from neighboring partitions, so we can't prune partitions
before lowering, but the spec is the same as for Blockwise ops, we don't reoder
things, alignment stays consistent, ...

"""

pass


Comment on lines +1096 to +1107
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does this exist? Is there some shared structure between these two that we need to handle?

My intuition here is just to have

class Blockwise(Expr):
    ...

class MapOverlap(Expr):
    ...

At least from this PR it doesn't seem like we're using the intermediate classes.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not not yet, are_co_aligned can leverage this for example

Every check that only cares about the partitioning layout should fall back to this base class

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My recommendation would be to wait until we have a demonstrable need, and even in that case to consider using (Blockwise, MapOverlap) instead if possible.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the PR with the use-case

It's a bit similar to how Blockwise and Elemwise relate to each other

class Blockwise(BlockwiseOverlapping):
"""Super-class for block-wise operations

This is fairly generic, and includes definitions for `_meta`, `divisions`,
Expand Down Expand Up @@ -1276,7 +1288,7 @@ def _task(self, index: int):
)


class MapOverlap(MapPartitions):
class MapOverlap(BlockwiseOverlapping):
_parameters = [
"frame",
"func",
Expand All @@ -1296,6 +1308,29 @@ class MapOverlap(MapPartitions):
"clear_divisions": False,
}

def _broadcast_dep(self, dep: Expr):
return dep.npartitions == 1

@property
def args(self):
return [self.frame] + self.operands[len(self._parameters) :]

def _divisions(self):
# Unknown divisions
if self.clear_divisions:
return (None,) * (self.frame.npartitions + 1)

# (Possibly) known divisions
dfs = [arg for arg in self.args if isinstance(arg, Expr)]
return _get_divisions_map_partitions(
True, # Partitions must already be "aligned"
self.transform_divisions,
dfs,
self.func,
self.args,
self.kwargs,
)

@functools.cached_property
def _kwargs(self) -> dict:
kwargs = self.kwargs
Expand Down Expand Up @@ -2454,7 +2489,7 @@ def non_blockwise_ancestors(expr):
e = stack.pop()
if isinstance(e, IO):
yield e
elif isinstance(e, Blockwise):
elif isinstance(e, BlockwiseOverlapping):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is the only application, then this seems lighter weight to me

Suggested change
elif isinstance(e, BlockwiseOverlapping):
elif isinstance(e, (Blockwise, MapOverlap)):

dependencies = e.dependencies()
stack.extend([expr for expr in dependencies if not is_broadcastable(expr)])
else:
Expand Down
Loading