Skip to content

Commit

Permalink
pr cleanups + add background processing of periodic tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
alextwoods committed Dec 4, 2024
1 parent bafbd9f commit d24bd82
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 19 deletions.
1 change: 0 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ jobs:
with:
ruby-version: ${{ matrix.ruby }}
bundler-cache: true
bundler: latest

- name: Test
run: bundle exec rake spec
Expand Down
1 change: 1 addition & 0 deletions .rubocop.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Metrics/BlockLength:
- 'spec/**/*.rb'

Metrics/MethodLength:
Max: 15
Exclude:
- 'spec/**/*.rb'

Expand Down
57 changes: 40 additions & 17 deletions lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ def initialize(app)
@app = app
@logger = ::Rails.logger

init_executor if ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC']
return unless ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC']

@executor = init_executor
end

def call(env)
Expand All @@ -27,7 +29,7 @@ def call(env)
end

# Execute job or periodic task based on HTTP request context
periodic_task?(request) ? execute_periodic_task(request) : execute_job(request)
execute(request)
end

def shutdown(timeout = nil)
Expand All @@ -44,19 +46,28 @@ def shutdown(timeout = nil)
def init_executor
threads = Integer(ENV.fetch('AWS_PROCESS_BEANSTALK_WORKER_THREADS',
Concurrent.available_processor_count || Concurrent.processor_count))
puts "Running with threads: #{threads}"
options = {
max_threads: threads,
max_queue: 1,
auto_terminate: false, # register our own at_exit to gracefully shutdown
fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled
}
@executor = Concurrent::ThreadPoolExecutor.new(options)
at_exit { shutdown }

Concurrent::ThreadPoolExecutor.new(options)
end

def execute(request)
if periodic_task?(request)
execute_periodic_task(request)
else
execute_job(request)
end
end

def execute_job(request)
if @executor
_execute_job_parallel(request)
_execute_job_background(request)
else
_execute_job_now(request)
end
Expand All @@ -77,10 +88,10 @@ def _execute_job_now(request)
end

# Execute a job using the thread pool executor
def _execute_job_parallel(request)
def _execute_job_background(request)
job_data = ::ActiveSupport::JSON.decode(request.body.string)
@logger.debug("Queuing background job: #{job_data['job_class']}")
@executor.post(job_data) do |job|
@logger.debug("Executing job [in thread]: #{job['job_class']}")
::ActiveJob::Base.execute(job)
end
[200, { 'Content-Type' => 'text/plain' }, ["Successfully queued job #{job_data['job_class']}"]]
Expand All @@ -93,21 +104,33 @@ def _execute_job_parallel(request)
def execute_periodic_task(request)
# The beanstalk worker SQS Daemon will add the 'X-Aws-Sqsd-Taskname' for periodic tasks set in cron.yaml.
job_name = request.headers['X-Aws-Sqsd-Taskname']
@logger.debug("Creating and executing periodic task: #{job_name}")
_execute_periodic_task(job_name)
[200, { 'Content-Type' => 'text/plain' }, ["Successfully ran periodic task #{job_name}."]]
rescue NameError
internal_error_response
end

def _execute_periodic_task(job_name)
job = job_name.constantize.new
job.perform_now
if @executor
_execute_periodic_task_background(job)
else
_execute_periodic_task_now(job)
end
rescue NameError => e
@logger.error("Periodic task #{job_name} could not resolve to an Active Job class " \
'- check the cron name spelling and set the path as / in cron.yaml.')
@logger.error("Error: #{e}.")
raise e
internal_error_response
end

def _execute_periodic_task_now(job)
@logger.debug("Executing periodic task: #{job.class}")
job.perform_now
[200, { 'Content-Type' => 'text/plain' }, ["Successfully ran periodic task #{job.class}."]]
end

def _execute_periodic_task_background(job)
@logger.debug("Queuing bakground periodic task: #{job.class}")
@executor.post(job, &:perform_now)
[200, { 'Content-Type' => 'text/plain' }, ["Successfully queued periodic task #{job.class}"]]
rescue Concurrent::RejectedExecutionError
msg = 'No capacity to execute periodic task.'
@logger.info(msg)
[429, { 'Content-Type' => 'text/plain' }, [msg]]
end

def internal_error_response
Expand Down
2 changes: 1 addition & 1 deletion sample-app/Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ source "https://rubygems.org"

# Our gems
# bundle config set local.aws-sdk-rails ../
gem 'aws-sdk-rails', git: 'https://github.com/aws/aws-sdk-rails', branch: 'main'
gem 'aws-sdk-rails', path: "../"
gem 'aws-actiondispatch-dynamodb', git: 'https://github.com/aws/aws-actiondispatch-dynamodb-ruby', branch: 'main'
gem 'aws-actionmailbox-ses', git: 'https://github.com/aws/aws-actionmailbox-ses-ruby', branch: 'main'
gem 'aws-actionmailer-ses', git: 'https://github.com/aws/aws-actionmailer-ses-ruby', branch: 'main'
Expand Down
3 changes: 3 additions & 0 deletions sample-app/app/jobs/test_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,8 @@ class TestJob < ApplicationJob

def perform(*args)
puts "Job performed with args: #{args}"
if args[0].is_a?(Hash) && args[0][:error]
raise StandardError, 'Boom - error in job.'
end
end
end
19 changes: 19 additions & 0 deletions spec/aws/rails/middleware/elastic_beanstalk_sqsd_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,25 @@ module Middleware
expect(response[0]).to eq(429)
end
end

context 'periodic task' do
let(:is_periodic_task) { true }

it 'queues job' do
expect_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post)
expect(response[0]).to eq(200)
expect(response[2]).to eq(['Successfully queued periodic task ElasticBeanstalkPeriodicTask'])
end

context 'no capacity' do
it 'returns too many requests error' do
allow_any_instance_of(Concurrent::ThreadPoolExecutor).to receive(:post)
.and_raise Concurrent::RejectedExecutionError

expect(response[0]).to eq(429)
end
end
end
end

def stub_runs_in_neither_docker_container
Expand Down

0 comments on commit d24bd82

Please sign in to comment.