diff --git a/app/celery/process_ses_receipts_tasks.py b/app/celery/process_ses_receipts_tasks.py index c03df0c98..c0b7d3413 100644 --- a/app/celery/process_ses_receipts_tasks.py +++ b/app/celery/process_ses_receipts_tasks.py @@ -12,7 +12,7 @@ send_complaint_to_service, send_delivery_status_to_service, ) -from app.config import QueueNames +from app.config import Config, QueueNames from app.dao import notifications_dao from app.dao.complaint_dao import save_complaint from app.dao.notifications_dao import dao_get_notification_history_by_reference @@ -65,7 +65,9 @@ def process_ses_results(self, response): f"Callback may have arrived before notification was" f"persisted to the DB. Adding task to retry queue" ) - self.retry(queue=QueueNames.RETRY) + self.retry( + queue=QueueNames.RETRY, expires=Config.DEFAULT_REDIS_EXPIRE_TIME + ) else: current_app.logger.warning( f"Notification not found for reference: {reference} " @@ -115,7 +117,7 @@ def process_ses_results(self, response): except Exception: current_app.logger.exception("Error processing SES results") - self.retry(queue=QueueNames.RETRY) + self.retry(queue=QueueNames.RETRY, expires=Config.DEFAULT_REDIS_EXPIRE_TIME) def determine_notification_bounce_type(ses_message): diff --git a/app/celery/provider_tasks.py b/app/celery/provider_tasks.py index b0c6a4c9b..1e6c7e96f 100644 --- a/app/celery/provider_tasks.py +++ b/app/celery/provider_tasks.py @@ -10,7 +10,7 @@ from app.clients.email import EmailClientNonRetryableException from app.clients.email.aws_ses import AwsSesClientThrottlingSendRateException from app.clients.sms import SmsClientResponseException -from app.config import QueueNames +from app.config import Config, QueueNames from app.dao import notifications_dao from app.dao.notifications_dao import ( sanitize_successful_notification_by_id, @@ -152,9 +152,15 @@ def deliver_sms(self, notification_id): try: if self.request.retries == 0: - self.retry(queue=QueueNames.RETRY, countdown=0) + self.retry( + queue=QueueNames.RETRY, + countdown=0, + expires=Config.DEFAULT_REDIS_EXPIRE_TIME, + ) else: - self.retry(queue=QueueNames.RETRY) + self.retry( + queue=QueueNames.RETRY, expires=Config.DEFAULT_REDIS_EXPIRE_TIME + ) except self.MaxRetriesExceededError: message = ( "RETRY FAILED: Max retries reached. The task send_sms_to_provider failed for notification {}. " @@ -203,7 +209,7 @@ def deliver_email(self, notification_id): f"RETRY: Email notification {notification_id} failed" ) - self.retry(queue=QueueNames.RETRY) + self.retry(queue=QueueNames.RETRY, expires=Config.DEFAULT_REDIS_EXPIRE_TIME) except self.MaxRetriesExceededError: message = ( "RETRY FAILED: Max retries reached. " diff --git a/app/celery/tasks.py b/app/celery/tasks.py index 7526e6619..f27cc4b75 100644 --- a/app/celery/tasks.py +++ b/app/celery/tasks.py @@ -7,7 +7,7 @@ from app import create_uuid, encryption, notify_celery from app.aws import s3 from app.celery import provider_tasks -from app.config import QueueNames +from app.config import Config, QueueNames from app.dao.inbound_sms_dao import dao_get_inbound_sms_by_id from app.dao.jobs_dao import dao_get_job_by_id, dao_update_job from app.dao.notifications_dao import ( @@ -146,6 +146,7 @@ def process_row(row, template, job, service, sender_id=None): ), task_kwargs, queue=QueueNames.DATABASE, + expires=Config.DEFAULT_REDIS_EXPIRE_TIME, ) return notification_id @@ -369,7 +370,7 @@ def save_api_email_or_sms(self, encrypted_notification): except SQLAlchemyError: try: - self.retry(queue=QueueNames.RETRY) + self.retry(queue=QueueNames.RETRY, expires=Config.DEFAULT_REDIS_EXPIRE_TIME) except self.MaxRetriesExceededError: current_app.logger.exception( f"Max retry failed Failed to persist notification {notification['id']}", @@ -390,7 +391,11 @@ def handle_exception(task, notification, notification_id, exc): # This probably (hopefully) is not an issue with Redis as the celery backing store current_app.logger.exception("Retry" + retry_msg) try: - task.retry(queue=QueueNames.RETRY, exc=exc) + task.retry( + queue=QueueNames.RETRY, + exc=exc, + expires=Config.DEFAULT_REDIS_EXPIRE_TIME, + ) except task.MaxRetriesExceededError: current_app.logger.exception("Max retry failed" + retry_msg) @@ -439,7 +444,9 @@ def send_inbound_sms_to_service(self, inbound_sms_id, service_id): ) if not isinstance(e, HTTPError) or e.response.status_code >= 500: try: - self.retry(queue=QueueNames.RETRY) + self.retry( + queue=QueueNames.RETRY, expires=Config.DEFAULT_REDIS_EXPIRE_TIME + ) except self.MaxRetriesExceededError: current_app.logger.exception( "Retry: send_inbound_sms_to_service has retried the max number of" diff --git a/app/config.py b/app/config.py index 53ea039a0..12159e289 100644 --- a/app/config.py +++ b/app/config.py @@ -53,6 +53,7 @@ class TaskNames(object): class Config(object): NOTIFY_APP_NAME = "api" + DEFAULT_REDIS_EXPIRE_TIME = 4 * 24 * 60 * 60 NOTIFY_ENVIRONMENT = getenv("NOTIFY_ENVIRONMENT", "development") # URL of admin app ADMIN_BASE_URL = getenv("ADMIN_BASE_URL", "http://localhost:6012") diff --git a/tests/app/celery/test_provider_tasks.py b/tests/app/celery/test_provider_tasks.py index a22a3fb93..80d5a0d7e 100644 --- a/tests/app/celery/test_provider_tasks.py +++ b/tests/app/celery/test_provider_tasks.py @@ -1,4 +1,5 @@ import json +from unittest.mock import ANY import pytest from botocore.exceptions import ClientError @@ -148,7 +149,7 @@ def test_should_add_to_retry_queue_if_notification_not_found_in_deliver_sms_task deliver_sms(notification_id) app.delivery.send_to_providers.send_sms_to_provider.assert_not_called() app.celery.provider_tasks.deliver_sms.retry.assert_called_with( - queue="retry-tasks", countdown=0 + queue="retry-tasks", countdown=0, expires=ANY ) @@ -208,7 +209,7 @@ def test_should_go_into_technical_error_if_exceeds_retries_on_deliver_sms_task( assert str(sample_notification.id) in str(e.value) provider_tasks.deliver_sms.retry.assert_called_with( - queue="retry-tasks", countdown=0 + queue="retry-tasks", countdown=0, expires=ANY ) assert sample_notification.status == NotificationStatus.TEMPORARY_FAILURE @@ -240,7 +241,7 @@ def test_should_add_to_retry_queue_if_notification_not_found_in_deliver_email_ta deliver_email(notification_id) app.delivery.send_to_providers.send_email_to_provider.assert_not_called() app.celery.provider_tasks.deliver_email.retry.assert_called_with( - queue="retry-tasks" + queue="retry-tasks", expires=ANY ) @@ -268,7 +269,9 @@ def test_should_go_into_technical_error_if_exceeds_retries_on_deliver_email_task deliver_email(sample_notification.id) assert str(sample_notification.id) in str(e.value) - provider_tasks.deliver_email.retry.assert_called_with(queue="retry-tasks") + provider_tasks.deliver_email.retry.assert_called_with( + queue="retry-tasks", expires=ANY + ) assert sample_notification.status == NotificationStatus.TECHNICAL_FAILURE diff --git a/tests/app/celery/test_scheduled_tasks.py b/tests/app/celery/test_scheduled_tasks.py index 90a29f5ed..ae9ea571d 100644 --- a/tests/app/celery/test_scheduled_tasks.py +++ b/tests/app/celery/test_scheduled_tasks.py @@ -415,6 +415,7 @@ def test_check_for_missing_rows_in_completed_jobs_calls_save_email( ), {}, queue="database-tasks", + expires=ANY, ) diff --git a/tests/app/celery/test_tasks.py b/tests/app/celery/test_tasks.py index 5720b15f9..4fccfb8cb 100644 --- a/tests/app/celery/test_tasks.py +++ b/tests/app/celery/test_tasks.py @@ -1,7 +1,7 @@ import json import uuid from datetime import datetime, timedelta -from unittest.mock import Mock, call +from unittest.mock import ANY, Mock, call import pytest import requests_mock @@ -115,6 +115,7 @@ def test_should_process_sms_job(sample_job, mocker): (str(sample_job.service_id), "uuid", "something_encrypted"), {}, queue="database-tasks", + expires=ANY, ) job = jobs_dao.dao_get_job_by_id(sample_job.id) assert job.job_status == JobStatus.FINISHED @@ -135,6 +136,7 @@ def test_should_process_sms_job_with_sender_id(sample_job, mocker, fake_uuid): (str(sample_job.service_id), "uuid", "something_encrypted"), {"sender_id": fake_uuid}, queue="database-tasks", + expires=ANY, ) @@ -179,6 +181,7 @@ def test_should_process_job_if_send_limits_are_not_exceeded( ), {}, queue="database-tasks", + expires=ANY, ) @@ -237,6 +240,7 @@ def test_should_process_email_job(email_job_with_placeholders, mocker): ), {}, queue="database-tasks", + expires=ANY, ) job = jobs_dao.dao_get_job_by_id(email_job_with_placeholders.id) assert job.job_status == JobStatus.FINISHED @@ -262,6 +266,7 @@ def test_should_process_email_job_with_sender_id( (str(email_job_with_placeholders.service_id), "uuid", "something_encrypted"), {"sender_id": fake_uuid}, queue="database-tasks", + expires=ANY, ) @@ -351,6 +356,7 @@ def test_process_row_sends_letter_task( ), {}, queue=expected_queue, + expires=ANY, ) @@ -387,6 +393,7 @@ def test_process_row_when_sender_id_is_provided(mocker, fake_uuid): ), {"sender_id": fake_uuid}, queue="database-tasks", + expires=ANY, ) @@ -839,7 +846,9 @@ def test_save_sms_should_go_to_retry_queue_if_database_errors(sample_template, m encryption.encrypt(notification), ) assert provider_tasks.deliver_sms.apply_async.called is False - tasks.save_sms.retry.assert_called_with(exc=expected_exception, queue="retry-tasks") + tasks.save_sms.retry.assert_called_with( + exc=expected_exception, queue="retry-tasks", expires=ANY + ) assert _get_notification_query_count() == 0 @@ -868,7 +877,7 @@ def test_save_email_should_go_to_retry_queue_if_database_errors( ) assert not provider_tasks.deliver_email.apply_async.called tasks.save_email.retry.assert_called_with( - exc=expected_exception, queue="retry-tasks" + exc=expected_exception, queue="retry-tasks", expires=ANY ) assert _get_notification_query_count() == 0