ProcessBalancer is a background job runner designed for long-running jobs.
If you need a job runner for small, short-lived background jobs, look to Sidekiq.
ProcessBalancer has built-in functionality to balance your jobs across multiple instances.
Add this line to your application’s Gemfile:
gem 'process_balancer'
And then execute:
$ bundle
Or install it yourself as:
$ gem install process_balancer
Build a Job class that works through one iteration of your processing. An iteration does not have to be a single record; it can cover a batch of, say, 1000 records. Each iteration must lock its work atomically so that multiple workers can run concurrently without claiming the same records. ProcessBalancer takes care of scaling out to run as many workers as you want across however many nodes you have. Each instance of the Job has a unique worker_index, so instances do not trample on each other.
class ProcessQueue < ProcessBalancer::Base
  # set a worker locking algorithm
  lock_driver :simple_redis

  # Claims up to 1000 queued, unlocked rows for this worker in one statement.
  # SKIP LOCKED makes concurrent workers pass over rows another worker is claiming.
  LOCK_SQL = <<~SQL
    WITH T AS (
      SELECT ctid
      FROM queue_records
      WHERE status = #{QueueRecord::QUEUED} AND lock IS NULL
      ORDER BY id
      LIMIT 1000
      FOR UPDATE SKIP LOCKED
    )
    UPDATE queue_records
    SET lock = :lock, updated_at = :now
    WHERE ctid = ANY(ARRAY(SELECT ctid FROM T))
  SQL

  def lock_records
    # grab a batch of records and lock them with the worker_index
    sql = ActiveRecord::Base.sanitize_sql([LOCK_SQL, {lock: worker_index, now: Time.now}])
    ActiveRecord::Base.connection.execute(sql)
    # return the records this worker now holds so they can be processed
    QueueRecord.where(lock: worker_index)
  end

  def process_record(entry)
    # do processing
    # mark record as processed and release the lock on that record
    entry.update(lock: nil, status: QueueRecord::PROCESSED)
  end

  def unlock_records
    # if any error occurs unlock any of our unprocessed records
    QueueRecord.where(lock: worker_index).update_all(lock: nil)
  end
end
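To make the locking requirement concrete, here is a minimal in-memory sketch of the same claim, process, release cycle. It does not use ProcessBalancer or a database: InMemoryQueue and run_workers are hypothetical names invented for this illustration, and a Mutex stands in for the row locking that the SQL above relies on. How ProcessBalancer itself drives lock_records, process_record and unlock_records is not shown here.

```ruby
# Hypothetical in-memory stand-in for the queue table; not part of ProcessBalancer.
class InMemoryQueue
  Record = Struct.new(:id, :status, :lock)

  def initialize(size)
    @records = Array.new(size) { |i| Record.new(i + 1, :queued, nil) }
    @mutex = Mutex.new
  end

  # Atomically claim up to `limit` queued, unlocked records for one worker.
  # The mutex stands in for the database row locking used in the SQL example.
  def claim(worker_index, limit)
    @mutex.synchronize do
      batch = @records.select { |r| r.status == :queued && r.lock.nil? }.first(limit)
      batch.each { |r| r.lock = worker_index }
      batch
    end
  end

  # Mark a record processed and release its lock.
  def complete(record)
    @mutex.synchronize do
      record.status = :processed
      record.lock = nil
    end
  end

  # Release any records still held by this worker (the error path).
  def release(worker_index)
    @mutex.synchronize do
      @records.each { |r| r.lock = nil if r.lock == worker_index }
    end
  end

  def count(status)
    @mutex.synchronize { @records.count { |r| r.status == status } }
  end
end

# Start `workers` threads that each repeat claim -> process until no work is left.
# Returns a hash of worker_index => ids that worker processed.
def run_workers(queue, workers:, batch_size:)
  threads = workers.times.map do |worker_index|
    Thread.new do
      processed = []
      loop do
        batch = queue.claim(worker_index, batch_size)
        break if batch.empty?

        begin
          batch.each do |record|
            processed << record.id
            queue.complete(record)
          end
        ensure
          # Anything left locked after a failure goes back to the pool.
          queue.release(worker_index)
        end
      end
      [worker_index, processed]
    end
  end
  threads.map(&:value).to_h
end
```

Calling run_workers(InMemoryQueue.new(50), workers: 4, batch_size: 7) should process every record exactly once, however the batches happen to be split between workers, because a record can only be claimed while it is unlocked.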
Configuration file
jobs:
  process_queue:
    class: 'ProcessQueue'
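As a rough illustration of the file's shape, the sketch below parses an equivalent YAML document and resolves each job's class key to a Ruby constant. This is an assumption about how such a file could be read, not a description of ProcessBalancer's own loader; ExampleJob and resolve_jobs are hypothetical names used only here.

```ruby
require 'yaml'

# Hypothetical job class standing in for the ProcessQueue example.
class ExampleJob; end

CONFIG = <<~YAML
  jobs:
    process_queue:
      class: 'ExampleJob'
YAML

# Map each job name to the constant named in its `class` key.
def resolve_jobs(yaml_text)
  config = YAML.safe_load(yaml_text)
  config.fetch('jobs').transform_values do |options|
    Object.const_get(options.fetch('class'))
  end
end
```

Using fetch instead of [] means a missing jobs or class key raises a KeyError immediately rather than surfacing later as a nil.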
Bug reports and pull requests are welcome on GitHub at https://github.com/NetsoftHoldings/process_balancer.
The gem is available as open source under the terms of the LGPLv3 License.