Skip to content

Commit

Permalink
proof of concept
Browse files Browse the repository at this point in the history
  • Loading branch information
pjurewicz committed Jan 30, 2024
1 parent 6b55ae5 commit 1f64ddc
Show file tree
Hide file tree
Showing 3 changed files with 315 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# frozen_string_literal: true

module RailsEventStore
class AfterCommitBatchAsyncDispatcher
def initialize(scheduler:, batch_scheduler:)
@scheduler = scheduler
@batch_scheduler = batch_scheduler
@transactions_records = {}
@subscription_records = Hash.new { |h, k| h[k] = [] }
end

def call(subscriber, _, record)
transaction = ActiveRecord::Base.connection.current_transaction

if transaction.joinable?
transaction.add_record(
@transactions_records[transaction] = AsyncRecord.new(
-> { run(transaction) },
-> { clear(transaction) }
)
) unless @transactions_records[transaction]
@subscription_records[transaction] << [subscriber, record]
else
@scheduler.call(subscriber, record)
end
end

def clear(transaction)
@transactions_records.delete(transaction)
@subscription_records.delete(transaction)
end

def run(transaction)
@batch_scheduler.call(@subscription_records[transaction])
end

def verify(subscriber)
@scheduler.verify(subscriber) && @batch_scheduler.verify(subscriber)
end

class AsyncRecord
def initialize(schedule_proc, clear_proc)
@schedule_proc = schedule_proc
@clear_proc = clear_proc
end

def committed!(*)
@schedule_proc.call
@clear_proc.call
end

def rolledback!(*)
@clear_proc.call
end

def before_committed!; end

def trigger_transactional_callbacks?; end
end
end

class ActiveJobBatchScheduler
def initialize(serializer:)
@serializer = serializer
end

def call(records)
ActiveJob.perform_all_later(records.map { |subscriber, record| serialize(subscriber, record) })
end

def verify(subscriber)
if Class === subscriber
!!(subscriber < ActiveJob::Base)
else
subscriber.instance_of?(ActiveJob::ConfiguredJob)
end
end

def serialize(subscriber, record)
subscriber.new(record.serialize(serializer).to_h.transform_keys(&:to_s))
end

private

attr_reader :serializer
end
end
1 change: 1 addition & 0 deletions rails_event_store/lib/rails_event_store/all.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
require_relative "async_handler_helpers"
require_relative "link_by_metadata"
require_relative "after_commit_async_dispatcher"
require_relative "after_commit_batch_async_dispatcher"
require_relative "active_job_scheduler"
require_relative 'active_job_id_only_scheduler'
require_relative "client"
Expand Down
227 changes: 227 additions & 0 deletions rails_event_store/spec/after_commit_batch_async_dispatcher_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
# frozen_string_literal: true

require "spec_helper"
require "ruby_event_store/spec/dispatcher_lint"

module RailsEventStore
::RSpec.describe AfterCommitBatchAsyncDispatcher do
DummyError = Class.new(StandardError)

class DummyRecord < ActiveRecord::Base
self.table_name = "dummy_records"
after_commit -> { raise DummyError }
end

it_behaves_like :dispatcher, AfterCommitBatchAsyncDispatcher.new(scheduler: ActiveJobScheduler.new(serializer: RubyEventStore::Serializers::YAML), batch_scheduler: ActiveJobBatchScheduler.new(serializer: RubyEventStore::Serializers::YAML))

let(:event) { TimeEnrichment.with(RubyEventStore::Event.new(event_id: "83c3187f-84f6-4da7-8206-73af5aca7cc8")) }
let(:record) { RubyEventStore::Mappers::Default.new.event_to_record(event) }
let(:serialized_record) { record.serialize(RubyEventStore::Serializers::YAML).to_h.transform_keys(&:to_s) }

let(:dispatcher) { AfterCommitBatchAsyncDispatcher.new(scheduler: ActiveJobScheduler.new(serializer: RubyEventStore::Serializers::YAML), batch_scheduler: ActiveJobBatchScheduler.new(serializer: RubyEventStore::Serializers::YAML)) }

before(:each) { MyActiveJobAsyncHandler.reset }

it "dispatch job immediately when no transaction is open" do
expect_to_have_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) }
expect(MyActiveJobAsyncHandler.received).to be_nil
MyActiveJobAsyncHandler.perform_enqueued_jobs
expect(MyActiveJobAsyncHandler.received).to eq(serialized_record)
end

it "dispatch job only after transaction commit" do
expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) }
end
end
expect(MyActiveJobAsyncHandler.received).to be_nil
MyActiveJobAsyncHandler.perform_enqueued_jobs
expect(MyActiveJobAsyncHandler.received).to eq(serialized_record)
end

context "when transaction is rolledback" do
it "does not dispatch job" do
expect_no_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction do
dispatcher.call(MyActiveJobAsyncHandler, event, record)
raise ::ActiveRecord::Rollback
end
end
MyActiveJobAsyncHandler.perform_enqueued_jobs
expect(MyActiveJobAsyncHandler.received).to be_nil
end

context "when raise_in_transactional_callbacks is enabled" do
around { |example| with_raise_in_transactional_callbacks { example.run } }

it "does not dispatch job" do
expect_no_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction do
dispatcher.call(MyActiveJobAsyncHandler, event, record)
raise ::ActiveRecord::Rollback
end
end
MyActiveJobAsyncHandler.perform_enqueued_jobs
expect(MyActiveJobAsyncHandler.received).to be_nil
end
end
end

it "dispatch job only after top-level transaction (nested is not new) commit" do
expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction(requires_new: false) { dispatcher.call(MyActiveJobAsyncHandler, event, record) }
end
end
end
expect(MyActiveJobAsyncHandler.received).to be_nil
MyActiveJobAsyncHandler.perform_enqueued_jobs
expect(MyActiveJobAsyncHandler.received).to eq(serialized_record)
end

it "dispatch job only after top-level transaction commit" do
expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction(requires_new: true) { dispatcher.call(MyActiveJobAsyncHandler, event, record) }
end
end
end
expect(MyActiveJobAsyncHandler.received).to be_nil
MyActiveJobAsyncHandler.perform_enqueued_jobs
expect(MyActiveJobAsyncHandler.received).to eq(serialized_record)
end

it "does not dispatch job after nested transaction rollback" do
expect_no_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction(requires_new: true) do
dispatcher.call(MyActiveJobAsyncHandler, event, record)
raise ::ActiveRecord::Rollback
end
end
end
end
MyActiveJobAsyncHandler.perform_enqueued_jobs
expect(MyActiveJobAsyncHandler.received).to be_nil
end

context "when an exception is raised within after commit callback" do
before { ActiveRecord::Schema.define { create_table(:dummy_records) } }

it "dispatches the job after commit" do
expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do
begin
ActiveRecord::Base.transaction do
DummyRecord.new.save!
expect_no_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) }
end
rescue DummyError
end
end
expect(DummyRecord.count).to eq(1)
expect(MyActiveJobAsyncHandler.received).to be_nil

MyActiveJobAsyncHandler.perform_enqueued_jobs
expect(MyActiveJobAsyncHandler.received).to eq(serialized_record)
end

context "when raise_in_transactional_callbacks is enabled" do
around { |example| with_raise_in_transactional_callbacks { example.run } }

it "dispatches the job after commit" do
expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do
begin
ActiveRecord::Base.transaction do
DummyRecord.new.save!
expect_no_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) }
end
rescue DummyError
end
end
expect(DummyRecord.count).to eq(1)
expect(MyActiveJobAsyncHandler.received).to be_nil

MyActiveJobAsyncHandler.perform_enqueued_jobs
expect(MyActiveJobAsyncHandler.received).to eq(serialized_record)
end
end
end

context "within a non-joinable transaction" do
around { |example| ActiveRecord::Base.transaction(joinable: false) { example.run } }

it "dispatches the job" do
expect_to_have_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) }
end

it "dispatches the job after a nested transaction commits" do
expect_to_have_enqueued_job(MyActiveJobAsyncHandler) do
ActiveRecord::Base.transaction do
expect_no_enqueued_job(MyActiveJobAsyncHandler) { dispatcher.call(MyActiveJobAsyncHandler, event, record) }
end
end
end
end

describe "#verify" do
specify { expect(dispatcher.verify(MyActiveJobAsyncHandler)).to eq(true) }
end

def expect_no_enqueued_job(job)
raise unless block_given?
yield
expect(job.queued).to be_nil
end

def expect_to_have_enqueued_job(job)
raise unless block_given?
yield
expect(job.queued).not_to be_nil
end

def with_raise_in_transactional_callbacks
skip unless ActiveRecord::Base.respond_to?(:raise_in_transactional_callbacks)

old_transaction_config = ActiveRecord::Base.raise_in_transactional_callbacks
ActiveRecord::Base.raise_in_transactional_callbacks = true

yield

ActiveRecord::Base.raise_in_transactional_callbacks = old_transaction_config
end

class MyActiveJobAsyncHandler < ActiveJob::Base
@@received = nil
@@queued = nil

def self.reset
@@received = nil
@@queued = nil
end

def self.queued
@@queued
end

def self.received
@@received
end

def self.perform_enqueued_jobs
@@received = @@queued
end

def self.perform_later(event)
@@queued = event
end

def perform(event)
@@queued = event
end
end
end
end

0 comments on commit 1f64ddc

Please sign in to comment.