Skip to content

Commit

Permalink
factor MapWithConcurrencyLimit into standalone ptransform
Browse files Browse the repository at this point in the history
  • Loading branch information
cisaacstern committed Aug 9, 2023
1 parent 32d9fe5 commit c69e883
Showing 1 changed file with 40 additions and 13 deletions.
53 changes: 40 additions & 13 deletions pangeo_forge_recipes/transforms.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,41 @@ def _assign_concurrency_group(elem, max_concurrency: int):
return (random.randint(0, max_concurrency - 1), elem)


@dataclass
class MapWithConcurrencyLimit(beam.PTransform):
    """Map ``map_func`` over a PCollection, optionally capping concurrent invocations.

    Useful for situations where the provided function requests data from an external
    service that does not support an unlimited number of concurrent requests.

    When ``max_concurrency`` is set, each element is first tagged with a random integer
    key in ``[0, max_concurrency - 1]`` (see ``_assign_concurrency_group``) and the
    elements are grouped by that key, so at most ``max_concurrency`` groups exist.
    ``map_func`` is then applied per group via ``beam.FlatMap``.

    :param map_func: The function to pass to beam.Map (in the case of no concurrency limit)
      or beam.FlatMap (if max_concurrency is specified).
    :param op_name: The name of the operation performed by `map_func`. Used as the label
      of the mapping stage.
    :param map_func_kws: Kwargs passed to all invocations of `map_func`. ``None`` is
      treated the same as an empty dict.
    :param max_concurrency: The maximum number of concurrent invocations of `map_func`.
      If unspecified, no limit is imposed by this transform (therefore the concurrency
      limit will be set by the Beam Runner's configuration).
    """

    map_func: Callable
    op_name: str
    map_func_kws: Optional[dict] = field(default_factory=dict)
    max_concurrency: Optional[int] = None

    def expand(self, pcoll):
        # The field is annotated Optional[dict]; unpacking ``None`` with ``**`` would raise
        # TypeError, so normalize it to an empty mapping first.
        kws = self.map_func_kws or {}

        # Any falsy value (None or 0) means "no limit": a plain element-wise Map.
        if not self.max_concurrency:
            return pcoll | self.op_name >> beam.Map(_add_keys(self.map_func), **kws)

        # Limited path: random key -> group by key -> DropKeys -> FlatMap over the result.
        # NOTE(review): DropKeys and _add_keys_iter are defined elsewhere in this module;
        # presumably they discard the group key and apply map_func to each grouped
        # element respectively -- confirm against their definitions.
        return (
            pcoll
            | beam.Map(_assign_concurrency_group, self.max_concurrency)
            | beam.GroupByKey()
            | DropKeys()
            | f"{self.op_name} (max_concurrency={self.max_concurrency})"
            >> beam.FlatMap(_add_keys_iter(self.map_func), **kws)
        )


# This has side effects if using a cache
@dataclass
class OpenURLWithFSSpec(beam.PTransform):
Expand Down Expand Up @@ -142,19 +177,11 @@ def expand(self, pcoll):
secrets=self.secrets,
open_kwargs=self.open_kwargs,
)
return (
pcoll | "Open with fsspec" >> beam.Map(_add_keys(open_url), **kws)
if not self.max_concurrency
else (
pcoll
# TODO: factor next three stages into a standalone PTransform
# so they can be reused elsewhere (e.g. OpenWithKerchunk)
| beam.Map(_assign_concurrency_group, self.max_concurrency)
| beam.GroupByKey()
| DropKeys()
| f"Open with fsspec (max_concurrency={self.max_concurrency})"
>> beam.FlatMap(_add_keys_iter(open_url), **kws)
)
return pcoll | MapWithConcurrencyLimit(
map_func=open_url,
op_name="Open with fsspec",
map_func_kws=kws,
max_concurrency=self.max_concurrency,
)


Expand Down

0 comments on commit c69e883

Please sign in to comment.