feat(ingest): allow max_workers=1 with ASYNC_BATCH rest sink (#12088)
hsheth2 authored Dec 10, 2024
1 parent 00f0ee8 commit d953718
Showing 3 changed files with 7 additions and 6 deletions.
metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py (6 changes: 3 additions & 3 deletions)

@@ -65,11 +65,11 @@ class DatahubRestSinkConfig(DatahubClientConfig):
     mode: RestSinkMode = _DEFAULT_REST_SINK_MODE
 
     # These only apply in async modes.
-    max_threads: int = _DEFAULT_REST_SINK_MAX_THREADS
-    max_pending_requests: int = 2000
+    max_threads: pydantic.PositiveInt = _DEFAULT_REST_SINK_MAX_THREADS
+    max_pending_requests: pydantic.PositiveInt = 2000
 
     # Only applies in async batch mode.
-    max_per_batch: int = 100
+    max_per_batch: pydantic.PositiveInt = 100
 
 
 @dataclasses.dataclass
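
The change from int to pydantic.PositiveInt means a worker count of zero or below is rejected when the sink config is parsed, instead of surfacing later at runtime. The stand-in model below is a minimal standalone sketch that mirrors the three changed fields; it is not the actual DatahubRestSinkConfig class, and the defaults are placeholders.

# Illustrative stand-in that mirrors the changed fields; not the real DatahubRestSinkConfig.
import pydantic


class RestSinkThreadSettings(pydantic.BaseModel):
    max_threads: pydantic.PositiveInt = 15  # placeholder default
    max_pending_requests: pydantic.PositiveInt = 2000
    max_per_batch: pydantic.PositiveInt = 100


RestSinkThreadSettings(max_threads=1)  # accepted: a single worker is a valid setting

try:
    RestSinkThreadSettings(max_threads=0)
except pydantic.ValidationError as err:
    print(err)  # rejected: PositiveInt requires a value greater than 0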
(second changed file; path not shown in this view)

@@ -268,7 +268,7 @@ def __init__(
         self.process_batch = process_batch
         self.min_process_interval = min_process_interval
         self.read_from_pending_interval = read_from_pending_interval
-        assert self.max_workers > 1
+        assert self.max_workers >= 1
 
         self._state_lock = threading.Lock()
         self._executor = ThreadPoolExecutor(
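
Relaxing the assertion from greater than 1 to at least 1 is what lets this executor be constructed with a single worker. As general standard-library background (not a claim about this class's internal thread accounting), a ThreadPoolExecutor with max_workers=1 still completes every submitted task, just one at a time; the snippet below is standalone illustration code, not part of this commit.

# Standalone illustration: a one-worker pool still drains all submitted tasks.
from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as pool:
    futures = [pool.submit(lambda x: x * x, i) for i in range(5)]
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9, 16], same output as a larger pool, just run sequentially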
(third changed file; path not shown in this view)

@@ -80,7 +80,8 @@ def task(id: str) -> str:
     assert len(done_tasks) == 16
 
 
-def test_batch_partition_executor_sequential_key_execution():
+@pytest.mark.parametrize("max_workers", [1, 2, 10])
+def test_batch_partition_executor_sequential_key_execution(max_workers: int) -> None:
     executing_tasks = set()
     done_tasks = set()
     done_task_batches = set()
@@ -99,7 +100,7 @@ def process_batch(batch):
         done_task_batches.add(tuple(id for _, id in batch))
 
     with BatchPartitionExecutor(
-        max_workers=2,
+        max_workers=max_workers,
         max_pending=10,
         max_per_batch=2,
         process_batch=process_batch,
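
Taken together, the three files let an ingestion run use the async-batch REST sink with a single worker thread. The sketch below shows the kind of configuration this enables; it assumes RestSinkMode.ASYNC_BATCH names the async-batch mode and that the inherited DatahubClientConfig fields accept a server URL, so treat the exact field values as placeholders rather than code from this commit.

# Minimal sketch of the configuration this change enables; not part of the commit.
# Assumption: RestSinkMode.ASYNC_BATCH names the async-batch mode, and the inherited
# DatahubClientConfig accepts a server URL. The endpoint below is a placeholder.
from datahub.ingestion.sink.datahub_rest import DatahubRestSinkConfig, RestSinkMode

config = DatahubRestSinkConfig(
    server="http://localhost:8080",  # placeholder GMS endpoint
    mode=RestSinkMode.ASYNC_BATCH,
    max_threads=1,  # now valid; the underlying executor previously asserted max_workers > 1
    max_per_batch=100,
)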
