-
Notifications
You must be signed in to change notification settings - Fork 14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add scan. #531
base: main
Are you sure you want to change the base?
Add scan. #531
Conversation
Closes cubed-dev#277
# Here we diverge from Blelloch, who runs a balanced tree algorithm to calculate the scan. | ||
# Instead we generalize recursively apply the scan to `reduced`. | ||
# 3a. First we merge to a decent intermediate chunksize since reduced.chunksize[axis] == 1 | ||
new_chunksize = min(reduced.shape[axis], reduced.chunksize[axis] * 5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need input here on choosing a new intermediate chunksize to rechunk to based on memory info.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a couple of things to consider here: the number of chunks to combine at each stage, and the memory limits.
The first is like split_every
in reduction
, where the default is 4, although 6 or 8 may be better for larger workloads.
For the second, we should make sure the new chunksize is no larger than (x.spec.allowed_mem - x.spec.reserved_mem) // 4
, where the factor of 4 is comes about because of the {compressed,uncompressed} * {input,output} copies.
There is an error case where this memory constraint means the new chunksize is no larger than the existing one, so the computation can't proceed. The user can fix this either by reducing the chunksize or by increasing the memory. This is similar to this case:
Lines 985 to 991 in 88c5dc4
# single axis: see how many result chunks fit in max_mem | |
# factor of 4 is memory for {compressed, uncompressed} x {input, output} | |
target_chunk_size = (max_mem - chunk_mem) // (chunk_mem * 4) | |
if target_chunk_size <= 1: | |
raise ValueError( | |
f"Not enough memory for reduction. Increase allowed_mem ({allowed_mem}) or decrease chunk size" | |
) |
shape=scanned.shape, | ||
dtype=scanned.dtype, | ||
chunks=scanned.chunks, | ||
extra_projected_mem=scanned.chunkmem * 2, # arbitrary |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
need input here too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be the memory allocated to read from the side inputs (scanned
and increment
here). We double the chunk memory to account for reading the compressed Zarr chunk, so the result would be
extra_projected_mem=scanned.chunkmem * 2 + increment.chunkmem * 2
(There's an open issue #288 to make this a bit more transparent.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you going to add a user-facing cumulative_sum
function from the Array API? This would be a good function for the unit tests to test.
shape=scanned.shape, | ||
dtype=scanned.dtype, | ||
chunks=scanned.chunks, | ||
extra_projected_mem=scanned.chunkmem * 2, # arbitrary |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be the memory allocated to read from the side inputs (scanned
and increment
here). We double the chunk memory to account for reading the compressed Zarr chunk, so the result would be
extra_projected_mem=scanned.chunkmem * 2 + increment.chunkmem * 2
(There's an open issue #288 to make this a bit more transparent.)
# Here we diverge from Blelloch, who runs a balanced tree algorithm to calculate the scan. | ||
# Instead we generalize recursively apply the scan to `reduced`. | ||
# 3a. First we merge to a decent intermediate chunksize since reduced.chunksize[axis] == 1 | ||
new_chunksize = min(reduced.shape[axis], reduced.chunksize[axis] * 5) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are a couple of things to consider here: the number of chunks to combine at each stage, and the memory limits.
The first is like split_every
in reduction
, where the default is 4, although 6 or 8 may be better for larger workloads.
For the second, we should make sure the new chunksize is no larger than (x.spec.allowed_mem - x.spec.reserved_mem) // 4
, where the factor of 4 is comes about because of the {compressed,uncompressed} * {input,output} copies.
There is an error case where this memory constraint means the new chunksize is no larger than the existing one, so the computation can't proceed. The user can fix this either by reducing the chunksize or by increasing the memory. This is similar to this case:
Lines 985 to 991 in 88c5dc4
# single axis: see how many result chunks fit in max_mem | |
# factor of 4 is memory for {compressed, uncompressed} x {input, output} | |
target_chunk_size = (max_mem - chunk_mem) // (chunk_mem * 4) | |
if target_chunk_size <= 1: | |
raise ValueError( | |
f"Not enough memory for reduction. Increase allowed_mem ({allowed_mem}) or decrease chunk size" | |
) |
""" | ||
# Blelloch (1990) out-of-core algorithm. | ||
# 1. First, scan blockwise | ||
scanned = blockwise(func, "ij", array, "ij", axis=axis) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using map_blocks
would be simpler and avoid the 2D assumption
@@ -1442,3 +1443,120 @@ def smallest_blockdim(blockdims): | |||
m = ntd[0] | |||
out = ntd | |||
return out | |||
|
|||
|
|||
def wrapper_binop( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call something like _scan_binop
to link it to the scan implementation? I've been using a naming convention like that elsewhere in the file.
Closes #277