From 5b6b8347db77072c01a72bcf32ba2866b8aa6257 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9mence=20Lesn=C3=A9?=
Date: Sat, 17 Aug 2024 02:23:52 +0200
Subject: [PATCH] fix: Retry job state update in some edge cases

---
 app/scrape.py | 140 +++++++++++++++++++++++++++++++-------------------
 1 file changed, 88 insertions(+), 52 deletions(-)

diff --git a/app/scrape.py b/app/scrape.py
index 8147387..1d5900f 100644
--- a/app/scrape.py
+++ b/app/scrape.py
@@ -304,6 +304,94 @@ async def _process_one(
     return (1, queued_urls, new_result.network_used_mb)
 
 
+async def _update_job_state(
+    blob: ContainerClient,
+    network_used_mb: int,
+    processed: int,
+    queued: int,
+) -> None:
+    """
+    Add the given deltas to the job state blob, under a lease.
+
+    The state blob is created empty when missing. Its content is parsed as a
+    `StateJobModel`, falling back to a default model when it is missing or
+    invalid, then incremented and written back before the lease is released.
+
+    If releasing the lease raises `ResourceNotFoundError`, the whole update is
+    retried.
+    """
+    state_blob = blob.get_blob_client(JOB_STATE_NAME)
+
+    # Acquire a lease, retrying until it is granted
+    logger.debug("Acquiring lease for job state")
+    lease_continue = True
+    while lease_continue:
+        try:
+            state_lease = await state_blob.acquire_lease(
+                lease_duration=15,
+                lease_id=str(uuid4()),
+            )
+            lease_continue = False
+
+        except ResourceExistsError:  # Wait for the lease to expire
+            logger.debug("Lease already exists, waiting")
+            await asyncio.sleep(1)
+
+        except ResourceNotFoundError:  # Create the blob if it does not exist
+            logger.debug("State blob does not exist, creating an empty one")
+            await state_blob.upload_blob(
+                data=b"",
+                length=0,
+                overwrite=True,
+            )
+
+    # Parse existing state
+    try:
+        f = await state_blob.download_blob(encoding="utf-8")
+        state = StateJobModel.model_validate_json(await f.readall())
+    except (ResourceNotFoundError, ValidationError):
+        state = StateJobModel()
+
+    # Update state
+    state.last_updated = datetime.now(UTC)
+    state.network_used_mb += network_used_mb
+    state.processed += processed
+    state.queued += queued
+
+    # Save state, serialized once so that the length is the size in bytes
+    payload = state.model_dump_json().encode("utf-8")
+    await state_blob.upload_blob(
+        data=payload,
+        lease=state_lease.id,
+        length=len(payload),
+        overwrite=True,
+    )
+
+    # Release the lease
+    try:
+        await state_lease.release()
+    except ResourceNotFoundError:
+        # The lease has already expired, the 15 seconds are up; thread was probably paused for an unknown reason
+        # TODO: Set a maximum number of retries to avoid infinite loops
+        # NOTE(review): the state has already been saved at this point, so the retry adds the deltas again; confirm this is intended
+        logger.warning(
+            "Retry updating job state, lease expired, thread was paused; if this message appears in loop, fill an issue"
+        )
+        # The retry must be awaited, otherwise the coroutine is returned without ever running
+        return await _update_job_state(
+            blob=blob,
+            network_used_mb=network_used_mb,
+            processed=processed,
+            queued=queued,
+        )
+
+    logger.info(
+        "Updated job state to %i processed and %i queued",
+        state.processed,
+        state.queued,
+    )
+
+
 async def _worker(
     cache_refresh: timedelta,
     job: str,
@@ -399,58 +487,6 @@ async def _worker(
                         exc_info=True,
                     )
 
-                # Update job state
-                state_blob = blob.get_blob_client(JOB_STATE_NAME)
-                # Acquire a lease
-                logger.debug("Acquiring lease for job state")
-                lease_continue = True
-                while lease_continue:
-                    try:
-                        state_lease = await state_blob.acquire_lease(
-                            lease_duration=15,
-                            lease_id=str(uuid4()),
-                        )
-                        lease_continue = False
-                    except ResourceExistsError:  # Wait for the lease to expire
-                        logger.debug("Lease already exists, waiting")
-                        await asyncio.sleep(1)
-                    except (
-                        ResourceNotFoundError
-                    ):  # Create the blob if it does not exist
-                        logger.debug(
-                            "State blob does not exist, creating an empty one"
-                        )
-                        await state_blob.upload_blob(
-                            data=b"",
-                            length=0,
-                            overwrite=True,
-                        )
-                # Parse existing state
-                try:
-                    f = await state_blob.download_blob(encoding="utf-8")
-                    state = StateJobModel.model_validate_json(await f.readall())
-                except (ResourceNotFoundError, ValidationError):
-                    state = StateJobModel()
-                # Update state
-                state.last_updated = datetime.now(UTC)
-                state.network_used_mb += total_network_used_mb
-                state.processed += total_processed
-                state.queued += total_queued
-                # Save state
-                await state_blob.upload_blob(
-                    data=state.model_dump_json().encode("utf-8"),
-                    lease=state_lease.id,
-                    length=len(state.model_dump_json()),
-                    overwrite=True,
-                )
-                # Release the lease
-                await state_lease.release()
-                logger.info(
-                    "Updated job state to %i processed and %i queued",
-                    state.processed,
-                    state.queued,
-                )
-
                 # Wait 3 sec to avoid spamming the queue if it is empty
                 await asyncio.sleep(3)
 