Skip to content

Commit

Permalink
Support async job processing in EBS SQSD middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
alextwoods committed Dec 3, 2024
1 parent 8aa3afa commit 9803a24
Showing 1 changed file with 41 additions and 8 deletions.
49 changes: 41 additions & 8 deletions lib/aws/rails/middleware/elastic_beanstalk_sqsd.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,21 @@ class ElasticBeanstalkSQSD
def initialize(app)
@app = app
@logger = ::Rails.logger

if ENV['AWS_PROCESS_BEANSTALK_WORKER_JOBS_ASYNC']
options = {
min_threads: 0,
max_threads: 1, # Integer(Concurrent.available_processor_count || Concurrent.processor_count),
auto_terminate: true,
max_queue: 1,
idletime: 60, # 1 minute
fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled
}
if ENV['AWS_PROCESS_BEANSTALK_WORKER_THREADS']
options[:max_threads] = Integer(ENV['AWS_PROCESS_BEANSTALK_WORKER_THREADS'])
end
@executor = Concurrent::ThreadPoolExecutor.new(options)
end
end

def call(env)
Expand All @@ -31,22 +46,40 @@ def call(env)
private

def execute_job(request)
if @executor
_execute_job_parallel(request)
else
_execute_job_now(request)
end
end

# Execute a job in the current thread
def _execute_job_now(request)
# Jobs queued from the SQS adapter contain the JSON message in the request body.
job = ::ActiveSupport::JSON.decode(request.body.string)
job_name = job['job_class']
@logger.debug("Executing job: #{job_name}")
_execute_job(job, job_name)
[200, { 'Content-Type' => 'text/plain' }, ["Successfully ran job #{job_name}."]]
rescue NameError
internal_error_response
end

def _execute_job(job, job_name)
::ActiveJob::Base.execute(job)
[200, { 'Content-Type' => 'text/plain' }, ["Successfully ran job #{job_name}."]]
rescue NameError => e
@logger.error("Job #{job_name} could not resolve to a class that inherits from Active Job.")
@logger.error("Error: #{e}")
raise e
internal_error_response
end

# Execute a job using the thread pool executor
def _execute_job_parallel(request)
@executor.post(request) do |request|
job = ::ActiveSupport::JSON.decode(request.body.string)
job_name = job['job_class']
@logger.debug("Executing job [in thread]: #{job_name}")
::ActiveJob::Base.execute(job)
end
[200, { 'Content-Type' => 'text/plain' }, ['Successfully ran queued job']]
rescue Concurrent::RejectedExecutionError
msg = 'No capacity to execute job.'
@logger.debug(msg)
[429, { 'Content-Type' => 'text/plain' }, [msg]]
end

def execute_periodic_task(request)
Expand Down

0 comments on commit 9803a24

Please sign in to comment.