From d24bd82087fa5c2475f5978b65bfeb407129f36d Mon Sep 17 00:00:00 2001 From: Alex Woods Date: Wed, 4 Dec 2024 11:23:17 -0800 Subject: [PATCH] pr cleanups + add background processing of periodic tasks --- .github/workflows/ci.yml | 1 - .rubocop.yml | 1 + .../middleware/elastic_beanstalk_sqsd.rb | 57 +++++++++++++------ sample-app/Gemfile | 2 +- sample-app/app/jobs/test_job.rb | 3 + .../middleware/elastic_beanstalk_sqsd_spec.rb | 19 +++++++ 6 files changed, 64 insertions(+), 19 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3b8f4c79..fe625ef1 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,7 +57,6 @@ jobs: with: ruby-version: ${{ matrix.ruby }} bundler-cache: true - bundler: latest - name: Test run: bundle exec rake spec diff --git a/.rubocop.yml b/.rubocop.yml index 8486dee8..e09d4b7b 100644 --- a/.rubocop.yml +++ b/.rubocop.yml @@ -17,6 +17,7 @@ Metrics/BlockLength: - 'spec/**/*.rb' Metrics/MethodLength: + Max: 15 Exclude: - 'spec/**/*.rb' diff --git a/lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb b/lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb index 6a3baa1b..991f8cc7 100644 --- a/lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb +++ b/lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb @@ -9,7 +9,9 @@ def initialize(app) @app = app @logger = ::Rails.logger - init_executor if ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC'] + return unless ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC'] + + @executor = init_executor end def call(env) @@ -27,7 +29,7 @@ def call(env) end # Execute job or periodic task based on HTTP request context - periodic_task?(request) ? execute_periodic_task(request) : execute_job(request) + execute(request) end def shutdown(timeout = nil) @@ -44,19 +46,28 @@ def shutdown(timeout = nil) def init_executor threads = Integer(ENV.fetch('AWS_PROCESS_BEANSTALK_WORKER_THREADS', Concurrent.available_processor_count || Concurrent.processor_count)) - puts "Running with threads: #{threads}" options = { max_threads: threads, max_queue: 1, + auto_terminate: false, # register our own at_exit to gracefully shutdown fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled } - @executor = Concurrent::ThreadPoolExecutor.new(options) at_exit { shutdown } + + Concurrent::ThreadPoolExecutor.new(options) + end + + def execute(request) + if periodic_task?(request) + execute_periodic_task(request) + else + execute_job(request) + end end def execute_job(request) if @executor - _execute_job_parallel(request) + _execute_job_background(request) else _execute_job_now(request) end @@ -77,10 +88,10 @@ def _execute_job_now(request) end # Execute a job using the thread pool executor - def _execute_job_parallel(request) + def _execute_job_background(request) job_data = ::ActiveSupport::JSON.decode(request.body.string) + @logger.debug("Queuing background job: #{job_data['job_class']}") @executor.post(job_data) do |job| - @logger.debug("Executing job [in thread]: #{job['job_class']}") ::ActiveJob::Base.execute(job) end [200, { 'Content-Type' => 'text/plain' }, ["Successfully queued job #{job_data['job_class']}"]] @@ -93,21 +104,33 @@ def _execute_job_parallel(request) def execute_periodic_task(request) # The beanstalk worker SQS Daemon will add the 'X-Aws-Sqsd-Taskname' for periodic tasks set in cron.yaml. job_name = request.headers['X-Aws-Sqsd-Taskname'] - @logger.debug("Creating and executing periodic task: #{job_name}") - _execute_periodic_task(job_name) - [200, { 'Content-Type' => 'text/plain' }, ["Successfully ran periodic task #{job_name}."]] - rescue NameError - internal_error_response - end - - def _execute_periodic_task(job_name) job = job_name.constantize.new - job.perform_now + if @executor + _execute_periodic_task_background(job) + else + _execute_periodic_task_now(job) + end rescue NameError => e @logger.error("Periodic task #{job_name} could not resolve to an Active Job class " \ '- check the cron name spelling and set the path as / in cron.yaml.') @logger.error("Error: #{e}.") - raise e + internal_error_response + end + + def _execute_periodic_task_now(job) + @logger.debug("Executing periodic task: #{job.class}") + job.perform_now + [200, { 'Content-Type' => 'text/plain' }, ["Successfully ran periodic task #{job.class}."]] + end + + def _execute_periodic_task_background(job) + @logger.debug("Queuing bakground periodic task: #{job.class}") + @executor.post(job, &:perform_now) + [200, { 'Content-Type' => 'text/plain' }, ["Successfully queued periodic task #{job.class}"]] + rescue Concurrent::RejectedExecutionError + msg = 'No capacity to execute periodic task.' + @logger.info(msg) + [429, { 'Content-Type' => 'text/plain' }, [msg]] end def internal_error_response diff --git a/sample-app/Gemfile b/sample-app/Gemfile index e63af5d3..4834a7c5 100644 --- a/sample-app/Gemfile +++ b/sample-app/Gemfile @@ -2,7 +2,7 @@ source "https://rubygems.org" # Our gems # bundle config set local.aws-sdk-rails ../ -gem 'aws-sdk-rails', git: 'https://github.com/aws/aws-sdk-rails', branch: 'main' +gem 'aws-sdk-rails', path: "../" gem 'aws-actiondispatch-dynamodb', git: 'https://github.com/aws/aws-actiondispatch-dynamodb-ruby', branch: 'main' gem 'aws-actionmailbox-ses', git: 'https://github.com/aws/aws-actionmailbox-ses-ruby', branch: 'main' gem 'aws-actionmailer-ses', git: 'https://github.com/aws/aws-actionmailer-ses-ruby', branch: 'main' diff --git a/sample-app/app/jobs/test_job.rb b/sample-app/app/jobs/test_job.rb index 364cc5e0..8fc96579 100644 --- a/sample-app/app/jobs/test_job.rb +++ b/sample-app/app/jobs/test_job.rb @@ -4,5 +4,8 @@ class TestJob < ApplicationJob def perform(*args) puts "Job performed with args: #{args}" + if args[0].is_a?(Hash) && args[0][:error] + raise StandardError, 'Boom - error in job.' + end end end diff --git a/spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb b/spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb index e22bc3dc..bb0d8e96 100644 --- a/spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb +++ b/spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb @@ -242,6 +242,25 @@ module Middleware expect(response[0]).to eq(429) end end + + context 'periodic task' do + let(:is_periodic_task) { true } + + it 'queues job' do + expect_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post) + expect(response[0]).to eq(200) + expect(response[2]).to eq(['Successfully queued periodic task ElasticBeanstalkPeriodicTask']) + end + + context 'no capacity' do + it 'returns too many requests error' do + allow_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post) + .and_raise Concurrent::RejectedExecutionError + + expect(response[0]).to eq(429) + end + end + end end def stub_runs_in_neither_docker_container