Skip to content

Commit

Permalink
Add Task queue for ARPA Treasury Reports (#2095)
Browse files Browse the repository at this point in the history
* chore: add needed code to run export in queue

* fix: linting

* Apply suggestions from code review

Co-authored-by: Tyler Hendrickson <[email protected]>

---------

Co-authored-by: Tyler Hendrickson <[email protected]>
  • Loading branch information
as1729 and TylerHendrickson authored Oct 16, 2023
1 parent d3a85a8 commit 63d0abb
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 4 deletions.
21 changes: 21 additions & 0 deletions packages/server/src/arpa_reporter/routes/exports.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ const express = require('express');
const router = express.Router();
const { HeadObjectCommand, GetObjectCommand } = require('@aws-sdk/client-s3');
const _ = require('lodash');
const { SendMessageCommand } = require('@aws-sdk/client-sqs');

const aws = require('../../lib/gost-aws');
const { requireUser, getAdminAuthInfo } = require('../../lib/access-helpers');
Expand Down Expand Up @@ -55,6 +56,26 @@ router.get('/', requireUser, async (req, res) => {
}
const tenantId = useTenantId();

// Queue branch: taken when the `queue` query parameter is present and truthy.
// Instead of building the treasury report in this request, it publishes a
// message to SQS and responds immediately.
// NOTE(review): any truthy value enables this branch, so `?queue=false` would
// presumably also take it if query values arrive as strings — confirm.
if (req.query.queue) {
// Special handling for deferring treasury report generation and sending to a task queue
console.log('/api/exports?queue=true GET');
console.log('Generating Async treasury report via task queue');
try {
const user = useUser();
const sqs = aws.getSQSClient();
// The message body carries only the requesting user's id.
await sqs.send(new SendMessageCommand({
QueueUrl: process.env.ARPA_TREASURY_REPORT_SQS_QUEUE_URL,
MessageBody: JSON.stringify({ userId: user.userId }),
}));
// Success here means the request was enqueued; no report has been generated yet.
res.json({ success: true });
return;
} catch (error) {
// Reached when useUser(), client creation, or the SQS send throws.
console.log(`Failed to generate and send treasury report ${error}`);
res.status(500).json({ error: 'Unable to generate treasury report and send email.' });
return;
}
}

if (req.query.async) {
// Special handling for async treasury report generation and sending.
console.log('/api/exports?async=true GET');
Expand Down
27 changes: 27 additions & 0 deletions packages/server/src/arpa_reporter/services/generate-arpa-report.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ const asyncBatch = require('async-batch').default;
const { PutObjectCommand } = require('@aws-sdk/client-s3');
const aws = require('../../lib/gost-aws');

const { getUser } = require('../../db');
const { applicationSettings } = require('../db/settings');
const { log } = require('../../lib/logging');
const { listRecipientsForReportingPeriod } = require('../db/arpa-subrecipients');
const { getTemplate } = require('./get-template');
const email = require('../../lib/email');
Expand Down Expand Up @@ -1024,10 +1026,35 @@ async function generateAndSendEmail(recipientEmail, periodId, tenantId) {
}
}

/**
 * Handles a single SQS message that requests an ARPA treasury report.
 *
 * The message body must be a JSON document containing `userId`; an optional
 * `periodId` is forwarded to the report generator when present. The user is
 * looked up with `getUser`, and the report is generated and emailed to that
 * user's address for that user's tenant.
 *
 * Parse, lookup, and generation failures are logged and reported through the
 * return value rather than thrown, so the caller can decide whether to delete
 * the message.
 *
 * @param {{ Body: string }} message - SQS message whose `Body` is a JSON string.
 * @returns {Promise<boolean>} `true` when the report was generated and sent,
 *   `false` when the body could not be parsed or processing failed.
 */
async function processSQSMessageRequest(message) {
    let requestData;
    try {
        requestData = JSON.parse(message.Body);
    } catch (err) {
        log.error({ err }, 'error parsing request data from SQS message');
        return false;
    }

    try {
        const user = await getUser(requestData.userId);
        if (!user) {
            throw new Error(`user not found: ${requestData.userId}`);
        }
        // generateAndSendEmail is declared as (recipientEmail, periodId, tenantId).
        // The tenant id must be the THIRD argument; passing it second would make
        // it the reporting period and leave the tenant undefined.
        // NOTE(review): periodId is undefined when the publisher omits it —
        // presumably the generator then uses its default period; confirm.
        await generateAndSendEmail(user.email, requestData.periodId, user.tenant_id);
    } catch (err) {
        log.error({ err }, 'failed to generate and send treasury report');
        return false;
    }

    log.info('successfully completed SQS message request');
    return true;
}

module.exports = {
generateReport,
sendEmailWithLink,
generateAndSendEmail,
processSQSMessageRequest,
};

// NOTE: This file was copied from src/server/services/generate-arpa-report.js (git @ ada8bfdc98) in the arpa-reporter repo on 2022-09-23T20:05:47.735Z
62 changes: 62 additions & 0 deletions packages/server/src/scripts/arpaTreasuryReport.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
#!/usr/bin/env node
const tracer = require('dd-trace').init(); // eslint-disable-line no-unused-vars
const { ReceiveMessageCommand, DeleteMessageCommand } = require('@aws-sdk/client-sqs');
const { log } = require('../lib/logging');
const { getSQSClient } = require('../lib/gost-aws');
const { processSQSMessageRequest } = require('../arpa_reporter/services/generate-arpa-report');

/**
 * Consumer loop for ARPA treasury report requests.
 *
 * Repeatedly long-polls the SQS queue named by the TASK_QUEUE_URL environment
 * variable for at most one message, hands that message to
 * `processSQSMessageRequest`, and deletes it from the queue only when
 * processing reports success. Each polling iteration runs inside an
 * `arpaTreasuryReport` trace. The loop ends after SIGTERM or SIGINT is
 * received, once the in-flight iteration has finished.
 *
 * @returns {Promise<void>} resolves once shutdown was requested and the
 *   current iteration has completed.
 */
async function main() {
    // Flipped by the signal handler; only inspected between iterations.
    const lifecycle = { stopRequested: false };
    const onShutdownSignal = (signal) => {
        log.warn({ signal }, 'Shutdown signal received. Requesting shutdown...');
        lifecycle.stopRequested = true;
    };
    ['SIGTERM', 'SIGINT'].forEach((signalName) => process.on(signalName, onShutdownSignal));

    const queueUrl = process.env.TASK_QUEUE_URL;
    const sqs = getSQSClient();

    // Runs the message processor in its own span. A thrown error is logged,
    // tagged on the span, and reported as an unsuccessful (false) outcome.
    const runProcessor = (message, msgLog) => tracer.trace(
        'processSQSMessageRequest',
        async (span) => {
            let outcome = false;
            try {
                outcome = await processSQSMessageRequest(message);
            } catch (e) {
                msgLog.error(e, 'Error processing SQS message request for ARPA treasury report');
                span.setTag('error', e);
            }
            return outcome;
        },
    );

    // One polling iteration: receive, process, and conditionally delete.
    // Deliberately declared with zero parameters, like the inline callback it replaces.
    const pollOnce = async () => {
        log.info({ queueUrl }, 'Long-polling next SQS message batch');
        const receiveCommand = new ReceiveMessageCommand({
            QueueUrl: process.env.TASK_QUEUE_URL,
            WaitTimeSeconds: 20,
            MaxNumberOfMessages: 1,
        });
        const receiveResp = await sqs.send(receiveCommand);
        const batch = receiveResp?.Messages || [];
        const message = batch[0];

        if (message === undefined) {
            tracer.scope().active().setTag('message_received', 'false');
            log.info('Empty messages batch received from SQS');
            return;
        }

        const msgLog = log.child({ sqs: { message: { ReceiptHandle: message.ReceiptHandle } } });
        tracer.scope().active().setTag('message_received', 'true');
        const processingSuccessful = await runProcessor(message, msgLog);

        if (processingSuccessful !== true) {
            // The message is left on the queue rather than deleted.
            msgLog.warn('SQS message was not processed successfully; will not delete');
            tracer.scope().active().setTag('processing_successful', 'false');
            return;
        }

        msgLog.info('Deleting successfully-processed SQS message');
        tracer.scope().active().setTag('processing_successful', 'true');
        const deleteCommand = new DeleteMessageCommand({
            QueueUrl: queueUrl,
            ReceiptHandle: message.ReceiptHandle,
        });
        await sqs.send(deleteCommand);
    };

    while (!lifecycle.stopRequested) {
        // eslint-disable-next-line no-await-in-loop
        await tracer.trace('arpaTreasuryReport', pollOnce);
    }
    log.warn('Shutting down');
}

// Run the consumer only when this file is executed directly (not when required).
if (require.main === module) {
    main()
        .then(() => process.exit())
        .catch((err) => {
            // Without this handler a rejection from main() would surface only as an
            // unhandled rejection, bypassing the structured logger.
            log.error({ err }, 'ARPA treasury report consumer terminated by an unhandled error');
            process.exit(1);
        });
}
32 changes: 32 additions & 0 deletions terraform/datadog_monitors.tf
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,35 @@ resource "datadog_monitor" "arpa_audit_report-task_failed" {
evaluation_delay = local.dd_monitor_default_evaluation_delay
tags = local.dd_monitor_default_tags
}

# Datadog monitor for the ARPA treasury report dead-letter queue (DLQ).
# Created only when var.datadog_monitors_enabled is true.
resource "datadog_monitor" "arpa_treasury_report-task_failed" {
count = var.datadog_monitors_enabled ? 1 : 0

name = "${local.dd_monitor_name_prefix}: ARPA treasury report job failed"
type = "metric alert"
# Notification body; the alert and recovery sections are rendered conditionally.
message = join("\n", [
"{{#is_alert}}",
"Alert: One or more ARPA treasury report requests were received from the SQS source queue but several attempts to handle them have failed.",
"As a result, failing SQS messages have been redirected to the SQS dead-letter queue (DLQ).",
"Investigate the issue (especially by checking arpa_treasury_report ECS task logs).",
"Once the issue is resolved, redrive the DLQ messages back to the source queue and/or delete DLQ messages if they are no longer needed.",
"This monitor will not return to normal while there are messages in the DLQ.",
"IMPORTANT: DLQ messages are not retained indefinitely; investigation and remediation is time-sensitive.",
"{{/is_alert}}",
"{{#is_recovery}}",
"Recovery: There are no longer messages in the DLQ.",
"{{/is_recovery}}",
"Notify: ${local.dd_monitor_default_notify}",
])

# Compares the visible-message count of the DLQ (scoped by env and queue name)
# against zero, using the minimum over the last hour.
# NOTE(review): presumably this fires only once the DLQ has been non-empty
# for the whole window — confirm against Datadog's min() semantics.
query = join("", [
"min(last_1h):avg:",
"aws.sqs.approximate_number_of_messages_visible",
"{env:${var.env},queuename:${module.arpa_treasury_report.sqs_dlq_name}}",
" > 0"
])

notify_no_data = false
evaluation_delay = local.dd_monitor_default_evaluation_delay
tags = local.dd_monitor_default_tags
}
109 changes: 105 additions & 4 deletions terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,16 @@ module "arpa_audit_report_security_group" {
allow_all_egress = true
}

# Security group for the ARPA treasury report workload, created in the VPC
# whose id is read from SSM. All outbound traffic is allowed.
module "arpa_treasury_report_security_group" {
source = "cloudposse/security-group/aws"
version = "2.2.0"

namespace = var.namespace
vpc_id = data.aws_ssm_parameter.vpc_id.value
attributes = ["arpa_treasury_report"]
allow_all_egress = true
}

resource "aws_ecs_cluster" "default" {
count = anytrue([var.api_enabled]) ? 1 : 0

Expand Down Expand Up @@ -136,6 +146,7 @@ module "api" {
security_group_ids = [
module.consume_grants_to_postgres_security_group.id,
module.arpa_audit_report_security_group.id,
module.arpa_treasury_report_security_group.id,
]

# Cluster
Expand All @@ -153,7 +164,8 @@ module "api" {
unified_service_tags = local.unified_service_tags
datadog_environment_variables = var.api_datadog_environment_variables
api_container_environment = merge(var.api_container_environment, {
ARPA_AUDIT_REPORT_SQS_QUEUE_URL = module.arpa_audit_report.sqs_queue_url
ARPA_AUDIT_REPORT_SQS_QUEUE_URL = module.arpa_audit_report.sqs_queue_url
ARPA_TREASURY_REPORT_SQS_QUEUE_URL = module.arpa_treasury_report.sqs_queue_url
})

# DNS
Expand Down Expand Up @@ -308,6 +320,94 @@ resource "aws_iam_role_policy" "api_task-publish_to_arpa_audit_report_queue" {
policy = data.aws_iam_policy_document.publish_to_arpa_audit_report_queue.json
}

# SQS-driven consumer task that runs src/scripts/arpaTreasuryReport.js.
# Uses the local sqs_consumer_task module, which is given the queue settings,
# task sizing, networking, and Postgres connection details below.
module "arpa_treasury_report" {
source = "./modules/sqs_consumer_task"
namespace = "${var.namespace}-treasury_report"
permissions_boundary_arn = local.permissions_boundary_arn

# Networking
subnet_ids = local.private_subnet_ids
security_group_ids = [module.arpa_treasury_report_security_group.id]

# Task configuration
ecs_cluster_name = join("", aws_ecs_cluster.default.*.name)
# Same image tag variable as the API container (var.api_container_image_tag).
docker_tag = var.api_container_image_tag
unified_service_tags = local.unified_service_tags
stop_timeout_seconds = 120
consumer_task_command = ["node", "./src/scripts/arpaTreasuryReport.js"]
consumer_container_environment = {
API_DOMAIN = "https://${local.api_domain_name}"
AUDIT_REPORT_BUCKET = module.api.arpa_audit_reports_bucket_id
DATA_DIR = "/var/data"
LOG_LEVEL = "DEBUG"
LOG_SRC_ENABLED = "false"
NODE_OPTIONS = "--max_old_space_size=3584" # Reserve 512 MB for other task resources
NOTIFICATIONS_EMAIL = "grants-notifications@${var.website_domain_name}"
WEBSITE_DOMAIN = "https://${var.website_domain_name}"
}
datadog_environment_variables = {
DD_LOGS_INJECTION = "true"
DD_PROFILING_ENABLED = "true"
}
# Mounts the API's EFS data volume read-write at the same path as DATA_DIR.
consumer_task_efs_volume_mounts = [{
name = "data"
container_path = "/var/data"
read_only = false
file_system_id = module.api.efs_data_volume_id
access_point_id = module.api.efs_data_volume_access_point_id
}]
# NOTE(review): the bucket policy document is the one named for the audit
# report — presumably both report types share that bucket; confirm.
additional_task_role_json_policies = {
rw-audit-reports-bucket = data.aws_iam_policy_document.arpa_audit_report_rw_reports_bucket.json
send-emails = module.api.send_emails_policy_json
}

# Task resource configuration
# TODO: Tune these values after observing usage in different environments.
# See also: --max_old_space_size in NODE_OPTIONS env var.
consumer_task_size = {
cpu = 1024 # 1 vCPU
memory = 4096 # 4 GB
}

# Messaging
autoscaling_message_thresholds = [1, 3, 5, 10, 20, 50]
# Publisher is identified as the ECS tasks service principal, with the API
# service ARN as the source.
sqs_publisher = {
principal_type = "Service"
principal_identifier = "ecs-tasks.amazonaws.com"
source_arn = module.api.ecs_service_arn
}
sqs_max_receive_count = 2
sqs_visibility_timeout_seconds = 900 # 15 minutes, in seconds
sqs_dlq_message_retention_seconds = 1209600 # 14 days, in seconds

# Logging
log_retention = var.api_log_retention_in_days

# Secrets
ssm_path_prefix = var.ssm_service_parameters_path_prefix

# Postgres
rds_db_connect_resources = module.postgres.rds_db_connect_resources_list
postgres_username = module.postgres.master_username
postgres_endpoint = module.postgres.cluster_endpoint
postgres_port = module.postgres.cluster_port
postgres_db_name = module.postgres.default_db_name
}

# IAM policy document permitting sqs:SendMessage on the treasury report queue only.
data "aws_iam_policy_document" "publish_to_arpa_treasury_report_queue" {
statement {
sid = "AllowPublishToQueue"
actions = ["sqs:SendMessage"]
resources = [module.arpa_treasury_report.sqs_queue_arn]
}
}

# Attaches the treasury report queue publish policy to the API's ECS task role,
# so the API can enqueue treasury report requests.
resource "aws_iam_role_policy" "api_task-publish_to_arpa_treasury_report_queue" {
name_prefix = "send-arpa-treasury-report-requests"
role = module.api.ecs_task_role_name
policy = data.aws_iam_policy_document.publish_to_arpa_treasury_report_queue.json
}

module "postgres" {
enabled = var.postgres_enabled
source = "./modules/gost_postgres"
Expand All @@ -318,9 +418,10 @@ module "postgres" {
vpc_id = data.aws_ssm_parameter.vpc_id.value
subnet_ids = local.private_subnet_ids
ingress_security_groups = {
from_api = module.api_to_postgres_security_group.id
from_consume_grants = module.consume_grants_to_postgres_security_group.id
from_arpa_audit_report = module.arpa_audit_report_security_group.id
from_api = module.api_to_postgres_security_group.id
from_consume_grants = module.consume_grants_to_postgres_security_group.id
from_arpa_audit_report = module.arpa_audit_report_security_group.id
from_arpa_treasury_report = module.arpa_treasury_report_security_group.id
}

prevent_destroy = var.postgres_prevent_destroy
Expand Down

0 comments on commit 63d0abb

Please sign in to comment.