Skip to content

rainforestapp/queue_classic_plus

Repository files navigation

QueueClassicPlus

rainforestapp

queue_classic is a simple Postgresql backed DB queue. However, it's a little too simple to use it as the main queueing system of a medium to large app. This was developed at Rainforest QA.

QueueClassicPlus adds many lacking features to QueueClassic.

  • Standardized job format
  • Retry on specific exceptions
  • Singleton jobs
  • Metrics
  • Error logging / handling
  • Transactions
  • Rails generator to create new jobs

Compatibility

This version of the matchers are compatible with queue_classic 3.1+ which includes built-in scheduling. See other branches for other compatible versions.

Installation

Add these line to your application's Gemfile:

gem 'queue_classic_plus'

And then execute:

$ bundle

Run the migration

  QueueClassicPlus.migrate

Usage

Create a new job

rails g qc_plus_job test_job
# /app/jobs/my_job.rb
class Jobs::MyJob < QueueClassicPlus::Base
  # Specified the queue name
  @queue = :low

  # Extry up to 5 times when SomeException is raised
  retry! on: SomeException, max: 5

  def self.perform(a, b)
    # ...
  end
end

# In your code, you can enqueue this task like so:
Jobs::MyJob.do(1, "foo")

# You can also schedule a job in the future by doing
Jobs::MyJob.enqueue_perform_in(1.hour, 1, "foo")

Run the QueueClassicPlus worker

QueueClassicPlus ships with its own worker and a rake task to run it. You need to use this worker to take advance of many features of QueueClassicPlus.

QUEUE=low bundle exec qc_plus:work

Other jobs options

Singleton Job

It's common for background jobs to never need to be enqueed multiple time. QueueClassicPlus support these type of single jobs. Here's an example one:

class Jobs::UpdateMetrics < QueueClassicPlus::Base
  @queue = :low

  # Use the lock! keyword to prevent the job from being enqueud once.
  lock!

  def self.perform(metric_type)
    # ...
  end
end

Note that lock! only prevents the same job from beeing enqued multiple times if the argument match.

So in our example:

Jobs::UpdateMetrics.do 'type_a' # enqueues job
Jobs::UpdateMetrics.do 'type_a' # does not enqueues job since it's already queued
Jobs::UpdateMetrics.do 'type_b' # enqueues job as the arguments are different.

Transactions

By default, all QueueClassicPlus jobs are executed in a PostgreSQL transaction. This decision was made because most jobs are usually pretty small and it's preferable to have all the benefits of the transaction. You can optionally specify a postgres statement timeout (in seconds) for all transactions with the environment variable POSTGRES_STATEMENT_TIMEOUT.

You can disable this feature on a per job basis in the following way:

class Jobs::NoTransaction < QueueClassicPlus::Base
  # Don't run the perform method in a transaction
  skip_transaction!
  @queue = :low

  def self.perform(user_id)
    # ...
  end
end

Callbacks

Jobs support the following callbacks:

before_enqueue
after_enqueue
before_perform
after_perform
  • enqueue callbacks are called when the job is initially enqueued. Caveats:
    • not called on retries
    • not called when scheduled in the future (i.e. Job.enqueue_perform_in)
    • still called if enqueuing fails because a job with lock! is already enqueued.
  • perform callbacks are called any time the job is picked up and run by a worker, including retries and jobs scheduled in the future.

Callbacks can either be implemented by providing a method to be called:

class Jobs::WithCallbackMethods < QueueClassicPlus::Base
  before_enqueue :before_enqueue_method

  @queue = :low

  def self.before_enqueue_method(*args)
    # ...
  end

  def self.perform(user_id)
    # ...
  end
end

or by providing a block:

class Jobs::WithCallbackBlocks < QueueClassicPlus::Base
  before_enqueue do |*args|
    # ...
  end

  @queue = :low

  def self.perform(user_id)
    # ...
  end
end

The args passed to the callback method/block will be the same as the arguments passed to QueueClassicPlus::Base.do and QueueClassicPlus::Base.perform.

Advanced configuration

If you want to log exceptions in your favorite exception tracker. You can configured it like so:

QueueClassicPlus.exception_handler = -> (exception, job) do
  Sentry.capture_exception(exception, extra: { job: job, env: ENV })
end

If you use Librato, we push useful metrics directly to them.

Push metrics to your metric provider (only Librato is supported for now).

QueueClassicPlus.update_metrics

Call this in a cron job or something similar.

If you are using New Relic and want to push performance data to it, you can add this to an initializer:

require "queue_classic_plus/new_relic"

To instrument Datadog monitoring add this to your QC initializer:

require "queue_classic_plus/datadog"

The Datadog service name defaults to qc.job. This can be changed in the initializer:

QueueClassicDatadog.config.dd_service = "custom_service_name"

Contributing

  1. Fork it ( https://github.com/[my-github-username]/queue_classic_plus/fork )
  • Create your feature branch (git checkout -b my-new-feature)
  • Commit your changes (git commit -am 'Add some feature')
  • Push to the branch (git push origin my-new-feature)
  • Create a new Pull Request

Setting up the test database

createdb queue_classic_plus_test

Releasing

Releasing is done in CircleCI via the push_to_rubygems, triggered by pushing a tagged commit. You can create a new tag while creating a new GitHub release.

** Note: The tag is what publishes a new version of the gem. The release is purely for documentation.