Make meta calculation for merge more efficient #284

Open · wants to merge 7 commits into base: main
Changes from 6 commits
45 changes: 28 additions & 17 deletions dask_expr/_merge.py
@@ -49,6 +49,10 @@ class Merge(Expr):
        "shuffle_backend": None,
    }

+    def __init__(self, *args, _precomputed_meta=None, **kwargs):
+        super().__init__(*args, **kwargs)
+        self._precomputed_meta = _precomputed_meta
Comment on lines +53 to +55
Member:

I think I would strongly prefer that we use a global key-value cache, in the same way we cache dataset info for parquet. In fact, we should probably formalize this caching approach to avoid repeating the same kind of logic in multiple places.

It seems like a unique meta depends on a token like...

    @functools.cached_property
    def _meta_cache_token(self):
        return _tokenize_deterministic(
            self.left._meta,
            self.right._meta,
            self.how,
            self.left_on,
            self.right_on,
            self.left_index,
            self.right_index,
            self.suffixes,
            self.indicator,
        )

If self.left._meta or self.right._meta were to change (due to column projection), we would need to recalculate meta anyway. However, if the Merge object was responsible for pushing down the column projection, we could always update the cache within the simplify logic (since we would already know how the meta needs to change).
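For concreteness, a rough sketch of how `_meta` could consult such a cache, reusing the token above (the module-level `_META_CACHE` name is purely illustrative and not part of this PR):

    # hypothetical module-level cache in _merge.py: token -> cached meta
    _META_CACHE = {}

    # on Merge:
    @functools.cached_property
    def _meta(self):
        token = self._meta_cache_token
        if token not in _META_CACHE:
            # same computation as today, just shared across equivalent Merge objects
            left = meta_nonempty(self.left._meta)
            right = meta_nonempty(self.right._meta)
            _META_CACHE[token] = make_meta(left.merge(right, **self.kwargs))
        return _META_CACHE[token]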

Collaborator Author:

I don’t see a global need for this yet. The slowdown in merge goes back to the nonempty meta objects, not the actual computation on empty objects.

Some of the operations in Lower have side effects, which makes adjusting the meta objects of left and right bothersome and complicated.

I am open to adjusting the implementation if we run into this in more places, but as long as we need it only for merge I'd prefer this solution, since it keeps the complexity contained here.

Collaborator Author:

Fwiw, I am also not too big a fan of relying on meta in hashes; there are too many things in pandas that might mutate it unexpectedly, which would break this.

Member:

To clarify, I don't really mind if we implement stand-alone caching logic in _merge.py for now. The thing I'm unsure about in this PR is that we are overriding __init__ so that we can effectively cache _meta without adding _meta as an official operand.

It may be the case that this is exactly how we should be attacking this problem in Merge (and maybe everywhere). For example, maybe we will eventually have a special known_meta= kwarg in Expr, which all expression objects could leverage. However, since it is not a proper operand, this mechanism feels a bit confusing and fragile to me.

> The slowdown in merge goes back to the nonempty meta objects, not the actual computation on empty objects.

I don't think I understand your point here. Either way we are effectively caching the output of _meta, no?

> Some of the operations in Lower have side effects, which makes adjusting the meta objects of left and right bothersome and complicated.

I don't see how this is any different for _precomputed_meta? In any case where you are confident defining _precomputed_meta, you could also just add the "lowered" object to the global cache before returning it.
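As a rough sketch of that alternative (again with the illustrative `_META_CACHE` / `_meta_cache_token` names from above, which are not part of this PR), `_lower` could seed the cache right before returning the lowered expression, instead of passing `_precomputed_meta` through `__init__`:

    # inside Merge._lower(), at the point where the blockwise merge is built today
    lowered = BlockwiseMerge(left, right, **self.kwargs)
    # seed the cache so lowered._meta never recomputes from nonempty frames
    _META_CACHE[lowered._meta_cache_token] = self._meta
    return lowered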

Member (@rjzamora, Aug 30, 2023):

> Fwiw, I am also not too big a fan of relying on meta in hashes; there are too many things in pandas that might mutate it unexpectedly, which would break this.

Interesting. I'd say that should be a fundamental concern for dask-expr then. What would then be the most reliable way to hash the schema?

Collaborator Author:

> I don't think I understand your point here. Either way we are effectively caching the output of _meta, no?

Yes, but the nature of the slowdown makes me think that we need it only in merge and not in other places as of now. I am open to rewriting this here as well if this turns out differently.

> I don't see how this is any different for _precomputed_meta? In any case where you are confident defining _precomputed_meta, you could also just add the "lowered" object to the global cache before returning it.

pandas has some caveats that might change your dtype in meta but not on the actual df. Relying on the initial meta seems safer to me. But this might also be totally wrong.

> To clarify, I don't really mind if we implement stand-alone caching logic in _merge.py for now. The thing I'm unsure about in this PR is that we are overriding __init__ so that we can effectively cache _meta without adding _meta as an official operand.

@mrocklin and I chatted offline and landed on this solution. One motivating factor was the last part here: #284 (comment)

Collaborator Author:

> Interesting. I'd say that should be a fundamental concern for dask-expr then. What would then be the most reliable way to hash the schema?

I don't have a good answer for that. Meta is still the best bet, but it has some limitations. This will get more stable in the future since we are deprecating all of these caveats at the moment.

Member (@rjzamora, Aug 30, 2023):

> Yes, but the nature of the slowdown makes me think that we need it only in merge and not in other places as of now. I am open to rewriting this here as well if this turns out differently.

Okay, I think you are talking about the decision not to write general Expr-wide caching code here, which is all good with me. I was only thinking about the decision to use _precomputed_meta instead of a simple k/v cache.

Possible problems with the k/v cache approach:

  • The meta-hashing issue you mentioned
  • We would be keeping the cached meta in memory even after we no longer need it (also a problem for parquet)

Possible problems with _precomputed_meta:

  • I suppose we are breaking with convention a bit (seems okay)
  • Any substitute_parameters call will drop the information, even if you aren't changing information that is relevant to meta (see the sketch below)

> One motivating factor was the last part here ...

Right, I agree that it would be a mistake to make _precomputed_meta a proper operand.
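To illustrate the second `_precomputed_meta` point above: in simplified form (an assumed sketch, not the actual dask-expr implementation), `substitute_parameters` rebuilds the expression from its operands alone, and `_precomputed_meta` is deliberately not an operand, so the new object loses it:

    # simplified sketch of the behavior described above
    def substitute_parameters(self, substitutions: dict):
        new_operands = [
            substitutions.get(name, operand)
            for name, operand in zip(self._parameters, self.operands)
        ]
        # type(self)(*new_operands) goes through __init__ without
        # _precomputed_meta, so the cached meta is dropped here
        return type(self)(*new_operands)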

Collaborator Author:

We would also have to patch the meta of left and right in the HashJoin layer, because that layer adds a column.

substitute_parameters is annoying; we could override it, but that's not great either.

Member:

Sorry for adding all this discussion. I do get that you are focusing on the Merge-specific meta issue at hand.

I'm just doing my best to keep the big picture in mind. It seems like we are going to keep running into cases where we would benefit from caching information outside the Expr object itself. Therefore, it would be nice if we could design a formal system where a collection of different caches can be managed in one place.

That said, I definitely don't think we need to do something like that right now.

> We would also have to patch the meta of left and right in the HashJoin layer, because that layer adds a column.

Right, HashJoinP2P._meta_cache_token does need to drop the hash columns.
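Something like this (same illustrative token as in the earlier sketch, with the attribute list mirrored from it; only the `drop(columns=_HASH_COLUMN_NAME)` calls mirror code that actually exists in this file today):

    # on HashJoinP2P: exclude the temporary hash column added for the shuffle
    @functools.cached_property
    def _meta_cache_token(self):
        return _tokenize_deterministic(
            self.left._meta.drop(columns=_HASH_COLUMN_NAME),
            self.right._meta.drop(columns=_HASH_COLUMN_NAME),
            self.how,
            self.left_on,
            self.right_on,
            self.left_index,
            self.right_index,
            self.suffixes,
            self.indicator,
        )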


    def __str__(self):
        return f"Merge({self._name[-7:]})"

@@ -69,6 +73,8 @@ def kwargs(self):

    @functools.cached_property
    def _meta(self):
+        if self._precomputed_meta is not None:
+            return self._precomputed_meta
        left = meta_nonempty(self.left._meta)
        right = meta_nonempty(self.right._meta)
        return make_meta(left.merge(right, **self.kwargs))
@@ -104,7 +110,9 @@ def _lower(self):
            or right.npartitions == 1
            and how in ("left", "inner")
        ):
-            return BlockwiseMerge(left, right, **self.kwargs)
+            return BlockwiseMerge(
+                left, right, **self.kwargs, _precomputed_meta=self._meta
+            )

        # Check if we are merging on indices with known divisions
        merge_indexed_left = (
@@ -165,6 +173,7 @@ def _lower(self):
                indicator=self.indicator,
                left_index=left_index,
                right_index=right_index,
+                _precomputed_meta=self._meta,
            )

        if shuffle_left_on:
@@ -186,7 +195,7 @@
            )

        # Blockwise merge
-        return BlockwiseMerge(left, right, **self.kwargs)
+        return BlockwiseMerge(left, right, **self.kwargs, _precomputed_meta=self._meta)

    def _simplify_up(self, parent):
        if isinstance(parent, (Projection, Index)):
@@ -203,13 +212,20 @@ def _simplify_up(self, parent):
                projection = [projection]

            left, right = self.left, self.right
-            left_on, right_on = self.left_on, self.right_on
+            if isinstance(self.left_on, list):
+                left_on = self.left_on
+            else:
+                left_on = [self.left_on] if self.left_on is not None else []
+            if isinstance(self.right_on, list):
+                right_on = self.right_on
+            else:
+                right_on = [self.right_on] if self.right_on is not None else []
            left_suffix, right_suffix = self.suffixes[0], self.suffixes[1]
            project_left, project_right = [], []

            # Find columns to project on the left
            for col in left.columns:
-                if left_on is not None and col in left_on or col in projection:
+                if col in left_on or col in projection:
                    project_left.append(col)
                elif f"{col}{left_suffix}" in projection:
                    project_left.append(col)
@@ -220,7 +236,7 @@ def _simplify_up(self, parent):

            # Find columns to project on the right
            for col in right.columns:
-                if right_on is not None and col in right_on or col in projection:
+                if col in right_on or col in projection:
                    project_right.append(col)
                elif f"{col}{right_suffix}" in projection:
                    project_right.append(col)
@@ -232,8 +248,13 @@ def _simplify_up(self, parent):
            if set(project_left) < set(left.columns) or set(project_right) < set(
                right.columns
            ):
+                columns = left_on + right_on + projection
+                meta_cols = [col for col in self.columns if col in columns]
                result = type(self)(
-                    left[project_left], right[project_right], *self.operands[2:]
+                    left[project_left],
+                    right[project_right],
+                    *self.operands[2:],
+                    _precomputed_meta=self._meta[meta_cols],
                )
                if parent_columns is None:
                    return type(parent)(result)
@@ -269,17 +290,7 @@ def _lower(self):

    @functools.cached_property
    def _meta(self):
-        left = self.left._meta.drop(columns=_HASH_COLUMN_NAME)
-        right = self.right._meta.drop(columns=_HASH_COLUMN_NAME)
-        return left.merge(
-            right,
-            left_on=self.left_on,
-            right_on=self.right_on,
-            indicator=self.indicator,
-            suffixes=self.suffixes,
-            left_index=self.left_index,
-            right_index=self.right_index,
-        )
+        return self._precomputed_meta

    def _layer(self) -> dict:
        dsk = {}
2 changes: 1 addition & 1 deletion dask_expr/io/parquet.py
@@ -557,7 +557,7 @@ def _dataset_info(self):

        return dataset_info

-    @property
+    @cached_property
    def _meta(self):
        meta = self._dataset_info["meta"]
        if self._series:
12 changes: 12 additions & 0 deletions dask_expr/tests/test_merge.py
@@ -164,3 +164,15 @@ def test_merge_len():
    query = df.merge(df2).index.optimize(fuse=False)
    expected = df[["x"]].merge(df2[["x"]]).index.optimize(fuse=False)
    assert query._name == expected._name
+
+
+def test_merge_optimize_subset_strings():
+    pdf = lib.DataFrame({"a": [1, 2], "aaa": 1})
+    pdf2 = lib.DataFrame({"b": [1, 2], "aaa": 1})
+    df = from_pandas(pdf)
+    df2 = from_pandas(pdf2)
+
+    query = df.merge(df2, on="aaa")[["aaa"]].optimize(fuse=False)
+    exp = df[["aaa"]].merge(df2[["aaa"]], on="aaa").optimize(fuse=False)
+    assert query._name == exp._name
+    assert_eq(query, pdf.merge(pdf2, on="aaa")[["aaa"]])