From 1f64ddcacc07916a73d1331c27ea54f39b4f117e Mon Sep 17 00:00:00 2001 From: pjurewicz Date: Tue, 30 Jan 2024 17:02:54 +0100 Subject: [PATCH] proof of concept --- .../after_commit_batch_async_dispatcher.rb | 87 +++++++ .../lib/rails_event_store/all.rb | 1 + ...fter_commit_batch_async_dispatcher_spec.rb | 227 ++++++++++++++++++ 3 files changed, 315 insertions(+) create mode 100644 rails_event_store/lib/rails_event_store/after_commit_batch_async_dispatcher.rb create mode 100644 rails_event_store/spec/after_commit_batch_async_dispatcher_spec.rb diff --git a/rails_event_store/lib/rails_event_store/after_commit_batch_async_dispatcher.rb b/rails_event_store/lib/rails_event_store/after_commit_batch_async_dispatcher.rb new file mode 100644 index 0000000000..4f5070117d --- /dev/null +++ b/rails_event_store/lib/rails_event_store/after_commit_batch_async_dispatcher.rb @@ -0,0 +1,87 @@ +# frozen_string_literal: true + +module RailsEventStore + class AfterCommitBatchAsyncDispatcher + def initialize(scheduler:, batch_scheduler:) + @scheduler = scheduler + @batch_scheduler = batch_scheduler + @transactions_records = {} + @subscription_records = Hash.new { |h, k| h[k] = [] } + end + + def call(subscriber, _, record) + transaction = ActiveRecord::Base.connection.current_transaction + + if transaction.joinable? + transaction.add_record( + @transactions_records[transaction] = AsyncRecord.new( + -> { run(transaction) }, + -> { clear(transaction) } + ) + ) unless @transactions_records[transaction] + @subscription_records[transaction] << [subscriber, record] + else + @scheduler.call(subscriber, record) + end + end + + def clear(transaction) + @transactions_records.delete(transaction) + @subscription_records.delete(transaction) + end + + def run(transaction) + @batch_scheduler.call(@subscription_records[transaction]) + end + + def verify(subscriber) + @scheduler.verify(subscriber) && @batch_scheduler.verify(subscriber) + end + + class AsyncRecord + def initialize(schedule_proc, clear_proc) + @schedule_proc = schedule_proc + @clear_proc = clear_proc + end + + def committed!(*) + @schedule_proc.call + @clear_proc.call + end + + def rolledback!(*) + @clear_proc.call + end + + def before_committed!; end + + def trigger_transactional_callbacks?; end + end + end + + class ActiveJobBatchScheduler + def initialize(serializer:) + @serializer = serializer + end + + def call(records) + ActiveJob.perform_all_later(records.map { |subscriber, record| serialize(subscriber, record) }) + end + + def verify(subscriber) + if Class === subscriber + !!(subscriber < ActiveJob::Base) + else + subscriber.instance_of?(ActiveJob::ConfiguredJob) + end + end + + def serialize(subscriber, record) + subscriber.new(record.serialize(serializer).to_h.transform_keys(&:to_s)) + end + + private + + attr_reader :serializer + end +end diff --git a/rails_event_store/lib/rails_event_store/all.rb b/rails_event_store/lib/rails_event_store/all.rb index a34890350d..3e7222304e 100644 --- a/rails_event_store/lib/rails_event_store/all.rb +++ b/rails_event_store/lib/rails_event_store/all.rb @@ -4,6 +4,7 @@ require_relative "async_handler_helpers" require_relative "link_by_metadata" require_relative "after_commit_async_dispatcher" +require_relative "after_commit_batch_async_dispatcher" require_relative "active_job_scheduler" require_relative 'active_job_id_only_scheduler' require_relative "client" diff --git a/rails_event_store/spec/after_commit_batch_async_dispatcher_spec.rb b/rails_event_store/spec/after_commit_batch_async_dispatcher_spec.rb new file mode 100644 index 0000000000..1ff06efed0 --- /dev/null +++ b/rails_event_store/spec/after_commit_batch_async_dispatcher_spec.rb @@ -0,0 +1,227 @@ +# frozen_string_literal: true + +require "spec_helper" +require "ruby_event_store/spec/dispatcher_lint" + +module RailsEventStore + ::RSpec.describe AfterCommitBatchAsyncDispatcher do + DummyError = Class.new(StandardError) + + class DummyRecord < ActiveRecord::Base + self.table_name = "dummy_records" + after_commit -> { raise DummyError } + end + + it_behaves_like :dispatcher, AfterCommitBatchAsyncDispatcher.new(scheduler: ActiveJobScheduler.new(serializer: RubyEventStore::Serializers::YAML), batch_scheduler: ActiveJobBatchScheduler.new(serializer: RubyEventStore::Serializers::YAML)) + + let(:event) { TimeEnrichment.with(RubyEventStore::Event.new(event_id: "83c3187f-84f6-4da7-8206-73af5aca7cc8")) } + let(:record) { RubyEventStore::Mappers::Default.new.event_to_record(event) } + let(:serialized_record) { record.serialize(RubyEventStore::Serializers::YAML).to_h.transform_keys(&:to_s) } + + let(:dispatcher) { AfterCommitBatchAsyncDispatcher.new(scheduler: ActiveJobScheduler.new(serializer: RubyEventStore::Serializers::YAML), batch_scheduler: ActiveJobBatchScheduler.new(serializer: RubyEventStore::Serializers::YAML)) } + + before(:each) { MyActiveJobAsyncHandler.reset } + + it "dispatch job immediately when no transaction is open" do + expect_to_have_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) } + expect(MyActiveJobAsyncHandler.received).to be_nil + MyActiveJobAsyncHandler.perform_enqueued_jobs + expect(MyActiveJobAsyncHandler.received).to eq(serialized_record) + end + + it "dispatch job only after transaction commit" do + expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction do + expect_no_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) } + end + end + expect(MyActiveJobAsyncHandler.received).to be_nil + MyActiveJobAsyncHandler.perform_enqueued_jobs + expect(MyActiveJobAsyncHandler.received).to eq(serialized_record) + end + + context "when transaction is rolledback" do + it "does not dispatch job" do + expect_no_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction do + dispatcher.call(MyActiveJobAsyncHandler, event, record) + raise ::ActiveRecord::Rollback + end + end + MyActiveJobAsyncHandler.perform_enqueued_jobs + expect(MyActiveJobAsyncHandler.received).to be_nil + end + + context "when raise_in_transactional_callbacks is enabled" do + around { |example| with_raise_in_transactional_callbacks { example.run } } + + it "does not dispatch job" do + expect_no_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction do + dispatcher.call(MyActiveJobAsyncHandler, event, record) + raise ::ActiveRecord::Rollback + end + end + MyActiveJobAsyncHandler.perform_enqueued_jobs + expect(MyActiveJobAsyncHandler.received).to be_nil + end + end + end + + it "dispatch job only after top-level transaction (nested is not new) commit" do + expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction do + expect_no_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction(requires_new: false) { dispatcher.call(MyActiveJobAsyncHandler, event, record) } + end + end + end + expect(MyActiveJobAsyncHandler.received).to be_nil + MyActiveJobAsyncHandler.perform_enqueued_jobs + expect(MyActiveJobAsyncHandler.received).to eq(serialized_record) + end + + it "dispatch job only after top-level transaction commit" do + expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction do + expect_no_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction(requires_new: true) { dispatcher.call(MyActiveJobAsyncHandler, event, record) } + end + end + end + expect(MyActiveJobAsyncHandler.received).to be_nil + MyActiveJobAsyncHandler.perform_enqueued_jobs + expect(MyActiveJobAsyncHandler.received).to eq(serialized_record) + end + + it "does not dispatch job after nested transaction rollback" do + expect_no_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction do + expect_no_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction(requires_new: true) do + dispatcher.call(MyActiveJobAsyncHandler, event, record) + raise ::ActiveRecord::Rollback + end + end + end + end + MyActiveJobAsyncHandler.perform_enqueued_jobs + expect(MyActiveJobAsyncHandler.received).to be_nil + end + + context "when an exception is raised within after commit callback" do + before { ActiveRecord::Schema.define { create_table(:dummy_records) } } + + it "dispatches the job after commit" do + expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do + begin + ActiveRecord::Base.transaction do + DummyRecord.new.save! + expect_no_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) } + end + rescue DummyError + end + end + expect(DummyRecord.count).to eq(1) + expect(MyActiveJobAsyncHandler.received).to be_nil + + MyActiveJobAsyncHandler.perform_enqueued_jobs + expect(MyActiveJobAsyncHandler.received).to eq(serialized_record) + end + + context "when raise_in_transactional_callbacks is enabled" do + around { |example| with_raise_in_transactional_callbacks { example.run } } + + it "dispatches the job after commit" do + expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do + begin + ActiveRecord::Base.transaction do + DummyRecord.new.save! + expect_no_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) } + end + rescue DummyError + end + end + expect(DummyRecord.count).to eq(1) + expect(MyActiveJobAsyncHandler.received).to be_nil + + MyActiveJobAsyncHandler.perform_enqueued_jobs + expect(MyActiveJobAsyncHandler.received).to eq(serialized_record) + end + end + end + + context "within a non-joinable transaction" do + around { |example| ActiveRecord::Base.transaction(joinable: false) { example.run } } + + it "dispatches the job" do + expect_to_have_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) } + end + + it "dispatches the job after a nested transaction commits" do + expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do + ActiveRecord::Base.transaction do + expect_no_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) } + end + end + end + end + + describe "#verify" do + specify { expect(dispatcher.verify(MyActiveJobAsyncHandler)).to eq(true) } + end + + def expect_no_enqueued_job(job) + raise unless block_given? + yield + expect(job.queued).to be_nil + end + + def expect_to_have_enqueued_job(job) + raise unless block_given? + yield + expect(job.queued).not_to be_nil + end + + def with_raise_in_transactional_callbacks + skip unless ActiveRecord::Base.respond_to?(:raise_in_transactional_callbacks) + + old_transaction_config = ActiveRecord::Base.raise_in_transactional_callbacks + ActiveRecord::Base.raise_in_transactional_callbacks = true + + yield + + ActiveRecord::Base.raise_in_transactional_callbacks = old_transaction_config + end + + class MyActiveJobAsyncHandler < ActiveJob::Base + @@received = nil + @@queued = nil + + def self.reset + @@received = nil + @@queued = nil + end + + def self.queued + @@queued + end + + def self.received + @@received + end + + def self.perform_enqueued_jobs + @@received = @@queued + end + + def self.perform_later(event) + @@queued = event + end + + def perform(event) + @@queued = event + end + end + end +end