Skip to content

Commit

Permalink
fix: Retry job state update in some edge cases
Browse files Browse the repository at this point in the history
  • Loading branch information
clemlesne committed Aug 17, 2024
1 parent df9c269 commit 5b6b834
Showing 1 changed file with 76 additions and 52 deletions.
128 changes: 76 additions & 52 deletions app/scrape.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,82 @@ async def _process_one(
return (1, queued_urls, new_result.network_used_mb)


async def _update_job_state(
blob: ContainerClient,
network_used_mb: int,
processed: int,
queued: int,
) -> None:
# Update job state
state_blob = blob.get_blob_client(JOB_STATE_NAME)

# Acquire a lease
logger.debug("Acquiring lease for job state")
lease_continue = True
while lease_continue:
try:
state_lease = await state_blob.acquire_lease(
lease_duration=15,
lease_id=str(uuid4()),
)
lease_continue = False

except ResourceExistsError: # Wait for the lease to expire
logger.debug("Lease already exists, waiting")
await asyncio.sleep(1)

except ResourceNotFoundError: # Create the blob if it does not exist
logger.debug("State blob does not exist, creating an empty one")
await state_blob.upload_blob(
data=b"",
length=0,
overwrite=True,
)

# Parse existing state
try:
f = await state_blob.download_blob(encoding="utf-8")
state = StateJobModel.model_validate_json(await f.readall())
except (ResourceNotFoundError, ValidationError):
state = StateJobModel()

# Update state
state.last_updated = datetime.now(UTC)
state.network_used_mb += network_used_mb
state.processed += processed
state.queued += queued

# Save state
await state_blob.upload_blob(
data=state.model_dump_json().encode("utf-8"),
lease=state_lease.id,
length=len(state.model_dump_json()),
overwrite=True,
)

# Release the lease
try:
await state_lease.release()
except ResourceNotFoundError:
# The lease has already expired, the 15 seconds are up; thread was probably paused for an unknown reason
# TODO: Set a maximum number of retries to avoid infinite loops
logger.warning(
"Retry updating job state, lease expired, thread was paused; if this message appears in loop, fill an issue"
)
return _update_job_state(
blob=blob,
network_used_mb=network_used_mb,
processed=processed,
queued=queued,
)

logger.info(
"Updated job state to %i processed and %i queued",
state.processed,
state.queued,
)


async def _worker(
cache_refresh: timedelta,
job: str,
Expand Down Expand Up @@ -399,58 +475,6 @@ async def _worker(
exc_info=True,
)

# Update job state
state_blob = blob.get_blob_client(JOB_STATE_NAME)
# Acquire a lease
logger.debug("Acquiring lease for job state")
lease_continue = True
while lease_continue:
try:
state_lease = await state_blob.acquire_lease(
lease_duration=15,
lease_id=str(uuid4()),
)
lease_continue = False
except ResourceExistsError: # Wait for the lease to expire
logger.debug("Lease already exists, waiting")
await asyncio.sleep(1)
except (
ResourceNotFoundError
): # Create the blob if it does not exist
logger.debug(
"State blob does not exist, creating an empty one"
)
await state_blob.upload_blob(
data=b"",
length=0,
overwrite=True,
)
# Parse existing state
try:
f = await state_blob.download_blob(encoding="utf-8")
state = StateJobModel.model_validate_json(await f.readall())
except (ResourceNotFoundError, ValidationError):
state = StateJobModel()
# Update state
state.last_updated = datetime.now(UTC)
state.network_used_mb += total_network_used_mb
state.processed += total_processed
state.queued += total_queued
# Save state
await state_blob.upload_blob(
data=state.model_dump_json().encode("utf-8"),
lease=state_lease.id,
length=len(state.model_dump_json()),
overwrite=True,
)
# Release the lease
await state_lease.release()
logger.info(
"Updated job state to %i processed and %i queued",
state.processed,
state.queued,
)

# Wait 3 sec to avoid spamming the queue if it is empty
await asyncio.sleep(3)

Expand Down

0 comments on commit 5b6b834

Please sign in to comment.