From 483593603a3daa64c6b4fc9cf60651c411b12275 Mon Sep 17 00:00:00 2001 From: David Cristofaro Date: Mon, 4 Mar 2024 13:11:55 +1100 Subject: [PATCH] Support bulk enqueue with differing class and options Allow bulk enqueue of multiple different job classes and differing job options in a single `.bulk_enqueue` block. Each job can now differ by job class, queue, priority, run at and tags (in addition to args and kwargs). --- docs/README.md | 4 +- lib/que/job.rb | 153 ++++++++++++++------------- spec/que/job.bulk_enqueue_spec.rb | 168 ++++++++++++++++++++++++++---- 3 files changed, 227 insertions(+), 98 deletions(-) diff --git a/docs/README.md b/docs/README.md index 4b5e1f46..25ef9283 100644 --- a/docs/README.md +++ b/docs/README.md @@ -849,13 +849,11 @@ Que.bulk_enqueue do end ``` -The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query. +The jobs are only actually enqueued at the end of the block, at which point they are inserted into the database in one big query. `job_options` may be provided to `.bulk_enqueue` as defaults for the entire block. Alternatively, `job_options` may be individually provided to `.enqueue` and will take priority over block options. Limitations: - ActiveJob is not supported -- All jobs must use the same job class -- All jobs must use the same `job_options` (`job_options` must be provided to `.bulk_enqueue` instead of `.enqueue`) - The `que_attrs` of a job instance returned from `.enqueue` is empty (`{}`) - The notify trigger is not run by default, so jobs will only be picked up by a worker upon its next poll diff --git a/lib/que/job.rb b/lib/que/job.rb index 6093b162..50f48ba6 100644 --- a/lib/que/job.rb +++ b/lib/que/job.rb @@ -29,21 +29,18 @@ class Job SQL[:bulk_insert_jobs] = %{ - WITH args_and_kwargs as ( - SELECT * from json_to_recordset(coalesce($5, '[{args:{},kwargs:{}}]')::json) as x(args jsonb, kwargs jsonb) - ) INSERT INTO public.que_jobs (queue, priority, run_at, job_class, args, kwargs, data, job_schema_version) SELECT - coalesce($1, 'default')::text, - coalesce($2, 100)::smallint, - coalesce($3, now())::timestamptz, - $4::text, - args_and_kwargs.args, - args_and_kwargs.kwargs, - coalesce($6, '{}')::jsonb, + coalesce(queue, 'default')::text, + coalesce(priority, 100)::smallint, + coalesce(run_at, now())::timestamptz, + job_class::text, + coalesce(args, '[]')::jsonb, + coalesce(kwargs, '{}')::jsonb, + coalesce(data, '{}')::jsonb, #{Que.job_schema_version} - FROM args_and_kwargs + FROM json_populate_recordset(null::que_jobs, $1) RETURNING * } @@ -82,6 +79,9 @@ def enqueue(*args) job_options = kwargs.delete(:job_options) || {} + job_class = job_options[:job_class] || name || + raise(Error, "Can't enqueue an anonymous subclass of Que::Job") + if job_options[:tags] if job_options[:tags].length > MAXIMUM_TAGS_COUNT raise Que::Error, "Can't enqueue a job with more than #{MAXIMUM_TAGS_COUNT} tags! (passed #{job_options[:tags].length})" @@ -94,28 +94,40 @@ def enqueue(*args) end end - attrs = { - queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, - priority: job_options[:priority] || resolve_que_setting(:priority), - run_at: job_options[:run_at] || resolve_que_setting(:run_at), - args: args, - kwargs: kwargs, - data: job_options[:tags] ? { tags: job_options[:tags] } : {}, - job_class: \ - job_options[:job_class] || name || - raise(Error, "Can't enqueue an anonymous subclass of Que::Job"), - } - if Thread.current[:que_jobs_to_bulk_insert] + # Don't resolve class settings during `.enqueue`, only resolve them + # during `._bulk_enqueue_insert` so they can be overwritten by specifying + # them in `.bulk_enqueue`. + attrs = { + queue: job_options[:queue], + priority: job_options[:priority], + run_at: job_options[:run_at], + job_class: job_class == 'Que::Job' ? nil : job_class, + args: args, + kwargs: kwargs, + data: job_options[:tags] && { tags: job_options[:tags] }, + klass: self, + } + if self.name == 'ActiveJob::QueueAdapters::QueAdapter::JobWrapper' raise Que::Error, "Que.bulk_enqueue does not support ActiveJob." end - raise Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue" unless job_options == {} - Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] << attrs - new({}) - elsif attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) + return new({}) + end + + attrs = { + queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, + priority: job_options[:priority] || resolve_que_setting(:priority), + run_at: job_options[:run_at] || resolve_que_setting(:run_at), + job_class: job_class, + args: args, + kwargs: kwargs, + data: job_options[:tags] ? { tags: job_options[:tags] } : {}, + } + + if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) attrs.merge!( args: Que.deserialize_json(Que.serialize_json(attrs[:args])), kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])), @@ -144,16 +156,13 @@ def bulk_enqueue(job_options: {}, notify: false) jobs_attrs = Thread.current[:que_jobs_to_bulk_insert][:jobs_attrs] job_options = Thread.current[:que_jobs_to_bulk_insert][:job_options] return [] if jobs_attrs.empty? - raise Que::Error, "When using .bulk_enqueue, all jobs enqueued must be of the same job class" unless jobs_attrs.map { |attrs| attrs[:job_class] }.uniq.one? - args_and_kwargs_array = jobs_attrs.map { |attrs| attrs.slice(:args, :kwargs) } - klass = job_options[:job_class] ? Que::Job : Que.constantize(jobs_attrs.first[:job_class]) - klass._bulk_enqueue_insert(args_and_kwargs_array, job_options: job_options, notify: notify) + _bulk_enqueue_insert(jobs_attrs, job_options: job_options, notify: notify) ensure Thread.current[:que_jobs_to_bulk_insert] = nil end - def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:) - raise 'Unexpected bulk args format' if !args_and_kwargs_array.is_a?(Array) || !args_and_kwargs_array.all? { |a| a.is_a?(Hash) } + def _bulk_enqueue_insert(jobs_attrs, job_options: {}, notify: false) + raise 'Unexpected bulk args format' if !jobs_attrs.is_a?(Array) || !jobs_attrs.all? { |a| a.is_a?(Hash) } if job_options[:tags] if job_options[:tags].length > MAXIMUM_TAGS_COUNT @@ -167,49 +176,43 @@ def _bulk_enqueue_insert(args_and_kwargs_array, job_options: {}, notify:) end end - args_and_kwargs_array = args_and_kwargs_array.map do |args_and_kwargs| - args_and_kwargs.merge( - args: args_and_kwargs.fetch(:args, []), - kwargs: args_and_kwargs.fetch(:kwargs, {}), - ) - end - - attrs = { - queue: job_options[:queue] || resolve_que_setting(:queue) || Que.default_queue, - priority: job_options[:priority] || resolve_que_setting(:priority), - run_at: job_options[:run_at] || resolve_que_setting(:run_at), - args_and_kwargs_array: args_and_kwargs_array, - data: job_options[:tags] ? { tags: job_options[:tags] } : {}, - job_class: \ - job_options[:job_class] || name || - raise(Error, "Can't enqueue an anonymous subclass of Que::Job"), - } - - if attrs[:run_at].nil? && resolve_que_setting(:run_synchronously) - args_and_kwargs_array = Que.deserialize_json(Que.serialize_json(attrs.delete(:args_and_kwargs_array))) - args_and_kwargs_array.map do |args_and_kwargs| - _run_attrs( - attrs.merge( - args: args_and_kwargs.fetch(:args), - kwargs: args_and_kwargs.fetch(:kwargs), - ), + jobs_attrs = jobs_attrs.map do |attrs| + klass = attrs[:klass] || self + + attrs = { + queue: attrs[:queue] || job_options[:queue] || klass.resolve_que_setting(:queue) || Que.default_queue, + priority: attrs[:priority] || job_options[:priority] || klass.resolve_que_setting(:priority), + run_at: attrs[:run_at] || job_options[:run_at] || klass.resolve_que_setting(:run_at), + job_class: attrs[:job_class] || job_options[:job_class] || klass.name, + args: attrs[:args] || [], + kwargs: attrs[:kwargs] || {}, + data: attrs[:data] || (job_options[:tags] ? { tags: job_options[:tags] } : {}), + klass: klass + } + + if attrs[:run_at].nil? && klass.resolve_que_setting(:run_synchronously) + klass._run_attrs( + attrs.reject { |k| k == :klass }.merge( + args: Que.deserialize_json(Que.serialize_json(attrs[:args])), + kwargs: Que.deserialize_json(Que.serialize_json(attrs[:kwargs])), + data: Que.deserialize_json(Que.serialize_json(attrs[:data])), + ) ) + nil + else + attrs end - else - attrs.merge!( - args_and_kwargs_array: Que.serialize_json(attrs[:args_and_kwargs_array]), - data: Que.serialize_json(attrs[:data]), - ) - values_array = - Que.transaction do - Que.execute('SET LOCAL que.skip_notify TO true') unless notify - Que.execute( - :bulk_insert_jobs, - attrs.values_at(:queue, :priority, :run_at, :job_class, :args_and_kwargs_array, :data), - ) - end - values_array.map(&method(:new)) - end + end.compact + + values_array = + Que.transaction do + Que.execute('SET LOCAL que.skip_notify TO true') unless notify + Que.execute( + :bulk_insert_jobs, + [Que.serialize_json(jobs_attrs.map { |attrs| attrs.reject { |k| k == :klass } })] + ) + end + values_array.zip(jobs_attrs).map { |values, attrs| attrs.fetch(:klass).new(values) } end def run(*args) @@ -237,7 +240,7 @@ def resolve_que_setting(setting, *args) end end - private + protected def _run_attrs(attrs) attrs[:error_count] = 0 diff --git a/spec/que/job.bulk_enqueue_spec.rb b/spec/que/job.bulk_enqueue_spec.rb index 51a4982b..1b2523e6 100644 --- a/spec/que/job.bulk_enqueue_spec.rb +++ b/spec/que/job.bulk_enqueue_spec.rb @@ -194,6 +194,24 @@ def assert_enqueue( end end + it "should be able to handle multiple different job classes" do + class MyJobClass < Que::Job; end + class MyJobOtherClass < Que::Job; end + + assert_enqueue( + expected_count: 3, + expected_args: [[1], [4], []], + expected_kwargs: [{ two: '3' }, { five: '6' }, {}], + expected_job_classes: [MyJobClass, MyJobOtherClass, Que::Job], + ) do + Que.bulk_enqueue do + MyJobClass.enqueue(1, two: '3') + MyJobOtherClass.enqueue(4, five: '6') + Que.enqueue + end + end + end + it "should error appropriately on an anonymous job subclass" do klass = Class.new(Que::Job) error = assert_raises(Que::Error) { Que.bulk_enqueue { klass.enqueue } } @@ -278,14 +296,6 @@ def assert_enqueue( end end - it "should raise when job_options are passed to .enqueue rather than .bulk_enqueue" do - assert_raises_with_message(Que::Error, "When using .bulk_enqueue, job_options must be passed to that method rather than .enqueue") do - Que.bulk_enqueue do - Que.enqueue(1, two: "3", job_options: { priority: 15 }) - end - end - end - describe "when enqueuing jobs with tags" do it "should be able to specify tags on a case-by-case basis" do assert_enqueue( @@ -301,6 +311,20 @@ def assert_enqueue( end end + it "should be respect tags passed to .enqueue" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_tags: [["tag_3", "tag_4"], ["tag_1", "tag_2"]], + ) do + Que.bulk_enqueue(job_options: { tags: ["tag_1", "tag_2"] }) do + Que.enqueue(1, two: "3", job_options: { tags: ["tag_3", "tag_4"] }) + Que.enqueue(4, five: "six") + end + end + end + it "should no longer fall back to using tags specified at the top level if not specified in job_options" do assert_enqueue( expected_count: 2, @@ -347,19 +371,39 @@ def assert_enqueue( end end - it "should respect a job class defined as a string" do - class MyJobClass < Que::Job; end + describe "job class string" do + it "should respect a job class defined as a string" do + class MyJobClass < Que::Job; end - assert_enqueue( - expected_count: 2, - expected_args: [[1], [4]], - expected_kwargs: [{ two: "3" }, { five: "six" }], - expected_job_classes: [MyJobClass, MyJobClass], - expected_result_classes: [Que::Job, Que::Job], - ) do - Que.bulk_enqueue(job_options: { job_class: 'MyJobClass' }) do - Que.enqueue(1, two: "3") - Que.enqueue(4, five: "six") + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_job_classes: [MyJobClass, MyJobClass], + expected_result_classes: [Que::Job, Que::Job], + ) do + Que.bulk_enqueue(job_options: { job_class: 'MyJobClass' }) do + Que.enqueue(1, two: "3") + Que.enqueue(4, five: "six") + end + end + end + + it "should respect a job class defined as a string passed to .enqueue" do + class MyJobClass < Que::Job; end + class MyJobOtherClass < Que::Job; end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_job_classes: [MyJobOtherClass, MyJobClass], + expected_result_classes: [Que::Job, Que::Job], + ) do + Que.bulk_enqueue(job_options: { job_class: 'MyJobClass' }) do + Que.enqueue(1, two: "3", job_options: { job_class: 'MyJobOtherClass' }) + Que.enqueue(4, five: "six") + end end end end @@ -443,6 +487,34 @@ class QueueSubclassJob < QueueDefaultJob end end + it "should respect a priority passed to .enqueue" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priorities: [2, 3], + expected_job_classes: [PrioritySubclassJob, PrioritySubclassJob] + ) do + Que.bulk_enqueue do + PrioritySubclassJob.enqueue(1, two: "3", job_options: { priority: 2 }) + PrioritySubclassJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_priorities: [2, 4], + expected_job_classes: [PrioritySubclassJob, PrioritySubclassJob] + ) do + Que.bulk_enqueue(job_options: { priority: 4 }) do + PrioritySubclassJob.enqueue(1, two: "3", job_options: { priority: 2 }) + PrioritySubclassJob.enqueue(4, five: "six") + end + end + end + it "should respect an overridden priority in a job class" do begin PrioritySubclassJob.priority = 60 @@ -535,6 +607,34 @@ class QueueSubclassJob < QueueDefaultJob end end + it "should respect a run_at passed to .enqueue" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_ats: [Time.now + 10, Time.now + 30], + expected_job_classes: [RunAtDefaultJob, RunAtDefaultJob] + ) do + Que.bulk_enqueue do + RunAtDefaultJob.enqueue(1, two: "3", job_options: { run_at: Time.now + 10 }) + RunAtDefaultJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_run_ats: [Time.now + 10, Time.now + 60], + expected_job_classes: [RunAtDefaultJob, RunAtDefaultJob] + ) do + Que.bulk_enqueue(job_options: { run_at: Time.now + 60 }) do + RunAtDefaultJob.enqueue(1, two: "3", job_options: { run_at: Time.now + 10 }) + RunAtDefaultJob.enqueue(4, five: "six") + end + end + end + it "should respect an overridden run_at in a job class" do begin RunAtSubclassJob.run_at = -> {Time.now + 90} @@ -627,6 +727,34 @@ class QueueSubclassJob < QueueDefaultJob end end + it "should respect a queue passed to .enqueue" do + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queues: ['queue_4', 'queue_1'], + expected_job_classes: [QueueSubclassJob, QueueSubclassJob] + ) do + Que.bulk_enqueue do + QueueSubclassJob.enqueue(1, two: "3", job_options: { queue: 'queue_4' }) + QueueSubclassJob.enqueue(4, five: "six") + end + end + + assert_enqueue( + expected_count: 2, + expected_args: [[1], [4]], + expected_kwargs: [{ two: "3" }, { five: "six" }], + expected_queues: ['queue_4', 'queue_3'], + expected_job_classes: [QueueSubclassJob, QueueSubclassJob] + ) do + Que.bulk_enqueue(job_options: { queue: 'queue_3' }) do + QueueSubclassJob.enqueue(1, two: "3", job_options: { queue: 'queue_4' }) + QueueSubclassJob.enqueue(4, five: "six") + end + end + end + it "should respect an overridden queue in a job class" do begin QueueSubclassJob.queue = :queue_2