From 82e868c721fe4bd541fadc9fd96baca15303b4e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafa=C5=82=20=C5=81asocha?= Date: Thu, 15 Dec 2022 23:08:00 +0100 Subject: [PATCH] [outbox] Rename Repository into Repositories::Mysql57 --- contrib/ruby_event_store-outbox/.mutant.yml | 2 +- .../lib/ruby_event_store/outbox.rb | 3 +- .../lib/ruby_event_store/outbox/consumer.rb | 4 +- .../ruby_event_store/outbox/repositories.rb | 6 + .../outbox/repositories/mysql57.rb | 162 ++++++++++++++ .../lib/ruby_event_store/outbox/repository.rb | 160 -------------- .../outbox/sidekiq_producer.rb | 4 +- .../outbox/sidekiq_scheduler.rb | 2 +- .../spec/consumer_spec.rb | 52 ++--- .../spec/legacy_sidekiq_scheduler_spec.rb | 8 +- .../spec/repositories/mysql57_spec.rb | 208 ++++++++++++++++++ .../spec/repository_spec.rb | 206 ----------------- .../spec/sidekiq_scheduler_spec.rb | 10 +- 13 files changed, 419 insertions(+), 408 deletions(-) create mode 100644 contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories.rb create mode 100644 contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories/mysql57.rb delete mode 100644 contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repository.rb create mode 100644 contrib/ruby_event_store-outbox/spec/repositories/mysql57_spec.rb delete mode 100644 contrib/ruby_event_store-outbox/spec/repository_spec.rb diff --git a/contrib/ruby_event_store-outbox/.mutant.yml b/contrib/ruby_event_store-outbox/.mutant.yml index e449621cb8..5874618343 100644 --- a/contrib/ruby_event_store-outbox/.mutant.yml +++ b/contrib/ruby_event_store-outbox/.mutant.yml @@ -34,7 +34,7 @@ matcher: - RubyEventStore::Outbox::Configuration* - RubyEventStore::Outbox::Consumer#get_remaining_count - RubyEventStore::Outbox::CleanupStrategies::None* - - RubyEventStore::Outbox::Repository* + - RubyEventStore::Outbox::Repositories* - RubyEventStore::Outbox::Runner#initialize - RubyEventStore::Outbox::Runner#run - RubyEventStore::Outbox::Runner#prepare_traps diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox.rb index 71992d51d2..da72b7b00e 100644 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox.rb +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox.rb @@ -8,10 +8,11 @@ module Outbox end require_relative "outbox/fetch_specification" -require_relative "outbox/repository" +require_relative "outbox/repositories/mysql57" require_relative "outbox/sidekiq_scheduler" require_relative "outbox/legacy_sidekiq_scheduler" require_relative "outbox/version" require_relative "outbox/tempo" require_relative "outbox/batch_result" require_relative "outbox/cleanup_strategies" +require_relative "outbox/repositories" diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/consumer.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/consumer.rb index 517787499a..3115de1c82 100644 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/consumer.rb +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/consumer.rb @@ -1,7 +1,7 @@ require "logger" require "redis" require "active_record" -require_relative "repository" +require_relative "repositories/mysql57" require_relative "sidekiq5_format" require_relative "sidekiq_processor" require_relative "fetch_specification" @@ -24,7 +24,7 @@ def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:) raise "Unknown format" if configuration.message_format != SIDEKIQ5_FORMAT @processor = SidekiqProcessor.new(Redis.new(url: configuration.redis_url)) - @repository = Repository.build_for_consumer(configuration.database_url) + @repository = Repositories::Mysql57.build_for_consumer(configuration.database_url) @cleanup_strategy = CleanupStrategies.build(configuration, repository) end diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories.rb new file mode 100644 index 0000000000..04070f6e61 --- /dev/null +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories.rb @@ -0,0 +1,6 @@ +module RubyEventStore + module Outbox + module Repositories + end + end +end diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories/mysql57.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories/mysql57.rb new file mode 100644 index 0000000000..3e8225507a --- /dev/null +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repositories/mysql57.rb @@ -0,0 +1,162 @@ +# frozen_string_literal: true + +require "active_record" +require "active_support/core_ext/numeric/time.rb" + +module RubyEventStore + module Outbox + module Repositories + class Mysql57 + RECENTLY_LOCKED_DURATION = 10.minutes + + class Record < ::ActiveRecord::Base + self.primary_key = :id + self.table_name = "event_store_outbox" + + def self.remaining_for(fetch_specification) + where(format: fetch_specification.message_format, split_key: fetch_specification.split_key, enqueued_at: nil) + end + + def self.for_fetch_specification(fetch_specification) + where(format: fetch_specification.message_format, split_key: fetch_specification.split_key) + end + + def hash_payload + JSON.parse(payload).deep_symbolize_keys + end + + def enqueued? + !enqueued_at.nil? + end + end + + class Lock < ::ActiveRecord::Base + self.table_name = "event_store_outbox_locks" + + def self.obtain(fetch_specification, process_uuid, clock:) + transaction do + l = get_lock_record(fetch_specification) + + if l.recently_locked?(clock: clock) + :taken + else + l.update!(locked_by: process_uuid, locked_at: clock.now) + l + end + end + rescue ::ActiveRecord::Deadlocked + :deadlocked + rescue ::ActiveRecord::LockWaitTimeout + :lock_timeout + end + + def refresh(clock:) + transaction do + current_process_uuid = locked_by + lock! + if locked_by == current_process_uuid + update!(locked_at: clock.now) + :ok + else + :stolen + end + end + rescue ::ActiveRecord::Deadlocked + :deadlocked + rescue ::ActiveRecord::LockWaitTimeout + :lock_timeout + end + + def self.release(fetch_specification, process_uuid) + transaction do + l = get_lock_record(fetch_specification) + if !l.locked_by?(process_uuid) + :not_taken_by_this_process + else + l.update!(locked_by: nil, locked_at: nil) + :ok + end + end + rescue ::ActiveRecord::Deadlocked + :deadlocked + rescue ::ActiveRecord::LockWaitTimeout + :lock_timeout + end + + def locked_by?(process_uuid) + locked_by.eql?(process_uuid) + end + + def recently_locked?(clock:) + locked_by && locked_at > RECENTLY_LOCKED_DURATION.ago(clock.now) + end + + def fetch_specification + FetchSpecification.new(format, split_key) + end + + private + + def self.lock_for_split_key(fetch_specification) + lock.find_by(format: fetch_specification.message_format, split_key: fetch_specification.split_key) + end + + def self.get_lock_record(fetch_specification) + l = lock_for_split_key(fetch_specification) + if l.nil? + begin + l = create!(format: fetch_specification.message_format, split_key: fetch_specification.split_key) + rescue ::ActiveRecord::RecordNotUnique + l = lock_for_split_key(fetch_specification) + end + end + l + end + end + + def self.build_for_consumer(database_url) + ::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected? + if ::ActiveRecord::Base.connection.adapter_name == "Mysql2" + ::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;") + end + new + end + + def insert_record(format, split_key, payload) + Record.create!(format: format, split_key: split_key, payload: payload) + end + + def retrieve_batch(fetch_specification, batch_size) + Record.remaining_for(fetch_specification).order("id ASC").limit(batch_size).to_a + end + + def get_remaining_count(fetch_specification) + Record.remaining_for(fetch_specification).count + end + + def obtain_lock_for_process(fetch_specification, process_uuid, clock:) + Lock.obtain(fetch_specification, process_uuid, clock: clock) + end + + def release_lock_for_process(fetch_specification, process_uuid) + Lock.release(fetch_specification, process_uuid) + end + + def mark_as_enqueued(record, now) + record.update_column(:enqueued_at, now) + end + + def delete_enqueued_older_than(fetch_specification, duration, limit) + scope = Record.for_fetch_specification(fetch_specification).where("enqueued_at < ?", duration.ago) + scope = scope.limit(limit).order(:id) unless limit == :all + scope.delete_all + :ok + rescue ::ActiveRecord::Deadlocked + :deadlocked + rescue ::ActiveRecord::LockWaitTimeout + :lock_timeout + end + end + end + end +end diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repository.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repository.rb deleted file mode 100644 index d481fe6081..0000000000 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/repository.rb +++ /dev/null @@ -1,160 +0,0 @@ -# frozen_string_literal: true - -require "active_record" -require "active_support/core_ext/numeric/time.rb" - -module RubyEventStore - module Outbox - class Repository - RECENTLY_LOCKED_DURATION = 10.minutes - - class Record < ::ActiveRecord::Base - self.primary_key = :id - self.table_name = "event_store_outbox" - - def self.remaining_for(fetch_specification) - where(format: fetch_specification.message_format, split_key: fetch_specification.split_key, enqueued_at: nil) - end - - def self.for_fetch_specification(fetch_specification) - where(format: fetch_specification.message_format, split_key: fetch_specification.split_key) - end - - def hash_payload - JSON.parse(payload).deep_symbolize_keys - end - - def enqueued? - !enqueued_at.nil? - end - end - - class Lock < ::ActiveRecord::Base - self.table_name = "event_store_outbox_locks" - - def self.obtain(fetch_specification, process_uuid, clock:) - transaction do - l = get_lock_record(fetch_specification) - - if l.recently_locked?(clock: clock) - :taken - else - l.update!(locked_by: process_uuid, locked_at: clock.now) - l - end - end - rescue ::ActiveRecord::Deadlocked - :deadlocked - rescue ::ActiveRecord::LockWaitTimeout - :lock_timeout - end - - def refresh(clock:) - transaction do - current_process_uuid = locked_by - lock! - if locked_by == current_process_uuid - update!(locked_at: clock.now) - :ok - else - :stolen - end - end - rescue ::ActiveRecord::Deadlocked - :deadlocked - rescue ::ActiveRecord::LockWaitTimeout - :lock_timeout - end - - def self.release(fetch_specification, process_uuid) - transaction do - l = get_lock_record(fetch_specification) - if !l.locked_by?(process_uuid) - :not_taken_by_this_process - else - l.update!(locked_by: nil, locked_at: nil) - :ok - end - end - rescue ::ActiveRecord::Deadlocked - :deadlocked - rescue ::ActiveRecord::LockWaitTimeout - :lock_timeout - end - - def locked_by?(process_uuid) - locked_by.eql?(process_uuid) - end - - def recently_locked?(clock:) - locked_by && locked_at > RECENTLY_LOCKED_DURATION.ago(clock.now) - end - - def fetch_specification - FetchSpecification.new(format, split_key) - end - - private - - def self.lock_for_split_key(fetch_specification) - lock.find_by(format: fetch_specification.message_format, split_key: fetch_specification.split_key) - end - - def self.get_lock_record(fetch_specification) - l = lock_for_split_key(fetch_specification) - if l.nil? - begin - l = create!(format: fetch_specification.message_format, split_key: fetch_specification.split_key) - rescue ::ActiveRecord::RecordNotUnique - l = lock_for_split_key(fetch_specification) - end - end - l - end - end - - def self.build_for_consumer(database_url) - ::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected? - if ::ActiveRecord::Base.connection.adapter_name == "Mysql2" - ::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;") - end - new - end - - def insert_record(format, split_key, payload) - Repository::Record.create!(format: format, split_key: split_key, payload: payload) - end - - def retrieve_batch(fetch_specification, batch_size) - Record.remaining_for(fetch_specification).order("id ASC").limit(batch_size).to_a - end - - def get_remaining_count(fetch_specification) - Record.remaining_for(fetch_specification).count - end - - def obtain_lock_for_process(fetch_specification, process_uuid, clock:) - Lock.obtain(fetch_specification, process_uuid, clock: clock) - end - - def release_lock_for_process(fetch_specification, process_uuid) - Lock.release(fetch_specification, process_uuid) - end - - def mark_as_enqueued(record, now) - record.update_column(:enqueued_at, now) - end - - def delete_enqueued_older_than(fetch_specification, duration, limit) - scope = Record.for_fetch_specification(fetch_specification).where("enqueued_at < ?", duration.ago) - scope = scope.limit(limit).order(:id) unless limit == :all - scope.delete_all - :ok - rescue ::ActiveRecord::Deadlocked - :deadlocked - rescue ::ActiveRecord::LockWaitTimeout - :lock_timeout - end - end - end -end diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_producer.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_producer.rb index b82316d90d..ae35bff389 100644 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_producer.rb +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_producer.rb @@ -2,12 +2,12 @@ require "sidekiq" require_relative "sidekiq5_format" -require_relative "repository" +require_relative "repositories/mysql57" module RubyEventStore module Outbox class SidekiqProducer - def initialize(repository = Repository.new) + def initialize(repository = Repositories::Mysql57.new) @repository = repository end diff --git a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_scheduler.rb b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_scheduler.rb index cda8c89ce3..a5c1413ef1 100644 --- a/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_scheduler.rb +++ b/contrib/ruby_event_store-outbox/lib/ruby_event_store/outbox/sidekiq_scheduler.rb @@ -5,7 +5,7 @@ module RubyEventStore module Outbox class SidekiqScheduler - def initialize(repository: Repository.new, serializer: RubyEventStore::Serializers::YAML) + def initialize(repository: Repositories::Mysql57.new, serializer: RubyEventStore::Serializers::YAML) @serializer = serializer @sidekiq_producer = SidekiqProducer.new(repository) end diff --git a/contrib/ruby_event_store-outbox/spec/consumer_spec.rb b/contrib/ruby_event_store-outbox/spec/consumer_spec.rb index b618325e3c..afb278f98e 100644 --- a/contrib/ruby_event_store-outbox/spec/consumer_spec.rb +++ b/contrib/ruby_event_store-outbox/spec/consumer_spec.rb @@ -90,7 +90,7 @@ module Outbox create_record("default", "default") consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics) clock = TickingClock.new - Repository::Lock.obtain( + Repositories::Mysql57::Lock.obtain( FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "some-other-process-uuid", clock: clock @@ -152,7 +152,7 @@ module Outbox } ] } - Repository.new.insert_record("sidekiq5", "default", payload.to_json) + Repositories::Mysql57.new.insert_record("sidekiq5", "default", payload.to_json) consumer = Consumer.new( SecureRandom.uuid, @@ -185,7 +185,7 @@ module Outbox end specify "deadlock when obtaining lock just skip that attempt" do - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::Deadlocked) + expect(Repositories::Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::Deadlocked) clock = TickingClock.new consumer = Consumer.new( @@ -205,7 +205,7 @@ module Outbox end specify "lock timeout when obtaining lock just skip that attempt" do - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) + expect(Repositories::Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) clock = TickingClock.new consumer = Consumer.new( @@ -226,7 +226,7 @@ module Outbox specify "obtaining taken lock just skip that attempt" do clock = TickingClock.new - Repository::Lock.obtain(FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "other-process-uuid", clock: clock) + Repositories::Mysql57::Lock.obtain(FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "other-process-uuid", clock: clock) consumer = Consumer.new( SecureRandom.uuid, @@ -246,7 +246,7 @@ module Outbox specify "deadlock when releasing lock doesnt do anything" do create_record("default", "default") - allow(Repository::Lock).to receive(:lock).and_wrap_original do |m, *args| + allow(Repositories::Mysql57::Lock).to receive(:lock).and_wrap_original do |m, *args| if caller.any? { |l| l.include? "`release'" } raise ::ActiveRecord::Deadlocked else @@ -273,7 +273,7 @@ module Outbox specify "lock timeout when releasing lock doesnt do anything" do create_record("default", "default") - allow(Repository::Lock).to receive(:lock).and_wrap_original do |m, *args| + allow(Repositories::Mysql57::Lock).to receive(:lock).and_wrap_original do |m, *args| if caller.any? { |l| l.include? "`release'" } raise ::ActiveRecord::LockWaitTimeout else @@ -306,7 +306,7 @@ module Outbox consumer.process - lock = Repository::Lock.find_by!(split_key: "default") + lock = Repositories::Mysql57::Lock.find_by!(split_key: "default") expect(lock.locked_by).to be_nil expect(lock.locked_at).to be_nil end @@ -317,7 +317,7 @@ module Outbox consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: test_metrics) allow(consumer).to receive(:release_lock_for_process).and_wrap_original do |m, *args| - Repository::Lock.delete_all + Repositories::Mysql57::Lock.delete_all m.call(*args) end @@ -337,7 +337,7 @@ module Outbox consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics) allow(consumer).to receive(:release_lock_for_process).and_wrap_original do |m, *args| - Repository::Lock.update_all(locked_by: SecureRandom.uuid) + Repositories::Mysql57::Lock.update_all(locked_by: SecureRandom.uuid) m.call(*args) end @@ -349,7 +349,7 @@ module Outbox end specify "old lock can be reobtained" do - Repository::Lock.obtain( + Repositories::Mysql57::Lock.obtain( FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "some-old-uuid", clock: TickingClock.new(start: 10.minutes.ago) @@ -365,7 +365,7 @@ module Outbox end specify "relatively fresh locks are not reobtained" do - Repository::Lock.obtain( + Repositories::Mysql57::Lock.obtain( FetchSpecification.new(SIDEKIQ5_FORMAT, "default"), "some-old-uuid", clock: TickingClock.new(start: 9.minutes.ago) @@ -383,7 +383,7 @@ module Outbox clock = TickingClock.new consumer = Consumer.new(SecureRandom.uuid, default_configuration, clock: clock, logger: logger, metrics: null_metrics) - allow(Repository::Lock).to receive(:create!).and_wrap_original do |m, *args| + allow(Repositories::Mysql57::Lock).to receive(:create!).and_wrap_original do |m, *args| m.call(*args) # To simulate someone inserting a record just before us m.call(*args) end @@ -439,8 +439,8 @@ module Outbox specify "death of a consumer shouldnt prevent other processes from processing" do consumer_1 = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics) - expect(Repository::Record).to receive(:where).and_raise("Unexpected error, such as OOM").ordered - expect(Repository::Record).to receive(:where).and_call_original.ordered.at_least(2).times + expect(Repositories::Mysql57::Record).to receive(:where).and_raise("Unexpected error, such as OOM").ordered + expect(Repositories::Mysql57::Record).to receive(:where).and_call_original.ordered.at_least(2).times expect { consumer_1.process }.to raise_error(/Unexpected error/) create_record("default", "default") @@ -451,14 +451,14 @@ module Outbox # We don't expect both records to be processed (because one of the Locks may be obtained by crashed process, but we expect to do SOME work in ANY splits. expect(result).to eq(true) - expect(Repository::Record.where("enqueued_at is not null").count).to be_positive + expect(Repositories::Mysql57::Record.where("enqueued_at is not null").count).to be_positive end specify "lock is refreshed after each batch" do skip "https://github.com/rspec/rspec-mocks/issues/1306" if RUBY_VERSION >= "3.0" consumer = Consumer.new(SecureRandom.uuid, default_configuration, logger: logger, metrics: null_metrics) 2.times.map { |r| create_record("default", "default") } - expect_any_instance_of(Repository::Lock).to receive(:refresh).twice.and_call_original + expect_any_instance_of(Repositories::Mysql57::Lock).to receive(:refresh).twice.and_call_original consumer.process end @@ -476,12 +476,12 @@ module Outbox ) consumer.process expect(redis.llen("queue:default")).to eq(1) - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) travel (7.days + 1.minute) consumer.process - expect(Repository::Record.count).to eq(0) + expect(Repositories::Mysql57::Record.count).to eq(0) end specify "clean old jobs with limit" do @@ -497,12 +497,12 @@ module Outbox ) consumer.process expect(redis.llen("queue:default")).to eq(3) - expect(Repository::Record.count).to eq(3) + expect(Repositories::Mysql57::Record.count).to eq(3) travel (7.days + 1.minute) consumer.process - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) end specify "clean old jobs - lock timeout" do @@ -518,13 +518,13 @@ module Outbox ) consumer.process expect(redis.llen("queue:default")).to eq(1) - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) travel (7.days + 1.minute) allow_any_instance_of(::ActiveRecord::Relation).to receive(:delete_all).and_raise(::ActiveRecord::LockWaitTimeout) consumer.process - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) expect(logger_output.string).to include("Cleanup for split_key 'default' failed (lock timeout)") expect(test_metrics.operation_results).to include({ operation: "cleanup", result: "lock_timeout" }) end @@ -542,13 +542,13 @@ module Outbox ) consumer.process expect(redis.llen("queue:default")).to eq(1) - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) travel (7.days + 1.minute) allow_any_instance_of(::ActiveRecord::Relation).to receive(:delete_all).and_raise(::ActiveRecord::Deadlocked) consumer.process - expect(Repository::Record.count).to eq(1) + expect(Repositories::Mysql57::Record.count).to eq(1) expect(logger_output.string).to include("Cleanup for split_key 'default' failed (deadlock)") expect(test_metrics.operation_results).to include({ operation: "cleanup", result: "deadlocked" }) end @@ -569,7 +569,7 @@ def create_record(queue, split_key, format: "sidekiq5") } ] } - Repository.new.insert_record(format, split_key, payload.to_json) + Repositories::Mysql57.new.insert_record(format, split_key, payload.to_json) end end end diff --git a/contrib/ruby_event_store-outbox/spec/legacy_sidekiq_scheduler_spec.rb b/contrib/ruby_event_store-outbox/spec/legacy_sidekiq_scheduler_spec.rb index 1305511383..ad6e5c76cf 100644 --- a/contrib/ruby_event_store-outbox/spec/legacy_sidekiq_scheduler_spec.rb +++ b/contrib/ruby_event_store-outbox/spec/legacy_sidekiq_scheduler_spec.rb @@ -65,8 +65,8 @@ def through_outbox? subject.call(CorrectAsyncHandler, serialized_record) - expect(Repository::Record.count).to eq(1) - record = Repository::Record.first + expect(Repositories::Mysql57::Record.count).to eq(1) + record = Repositories::Mysql57::Record.first expect(record.created_at).to be_present expect(record.enqueued_at).to be_nil expect(record.split_key).to eq("default") @@ -110,7 +110,7 @@ def through_outbox? subject.call(CorrectAsyncHandler, serialized_record) - record = Repository::Record.first + record = Repositories::Mysql57::Record.first expect(record.split_key).to eq("custom_queue") expect(record.hash_payload[:queue]).to eq("custom_queue") end @@ -136,7 +136,7 @@ def through_outbox? subject.call(CorrectAsyncHandler, serialized_record) - expect(Repository::Record.count).to eq(0) + expect(Repositories::Mysql57::Record.count).to eq(0) end end end diff --git a/contrib/ruby_event_store-outbox/spec/repositories/mysql57_spec.rb b/contrib/ruby_event_store-outbox/spec/repositories/mysql57_spec.rb new file mode 100644 index 0000000000..c474479946 --- /dev/null +++ b/contrib/ruby_event_store-outbox/spec/repositories/mysql57_spec.rb @@ -0,0 +1,208 @@ +require "spec_helper" + +module RubyEventStore + module Outbox + module Repositories + ::RSpec.describe Mysql57, db: true do + include SchemaHelper + + let(:database_url) { ENV["DATABASE_URL"] } + let(:message_format) { "some_message_format" } + let(:split_key) { "some_split_key" } + let(:some_process_uuid) { SecureRandom.uuid } + let(:other_process_uuid) { SecureRandom.uuid } + let(:clock) { TickingClock.new } + + specify "successful obtaining returns Lock structure" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + + expect(lock).to be_a(Mysql57::Lock) + expect(lock.fetch_specification).to eq(expected_fetch_specification) + expect(lock).to be_locked_by(some_process_uuid) + expect(lock).to be_recently_locked(clock: clock) + end + + specify "Lock is not considered locked after some time" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + + result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + wait_for_lock_duration + + expect(result).not_to be_recently_locked(clock: clock) + end + + specify "trying to obtain taken Lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + + result = repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid, clock: clock) + + expect(result).to be(:taken) + end + + specify "obtains a lock for given fetch specification" do + repository = Mysql57.build_for_consumer(database_url) + repository.obtain_lock_for_process( + FetchSpecification.new("other_message_format", split_key), + some_process_uuid, + clock: clock + ) + repository.obtain_lock_for_process( + FetchSpecification.new(message_format, "other_split_key"), + some_process_uuid, + clock: clock + ) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + + expect(lock.fetch_specification).to eq(expected_fetch_specification) + end + + specify "successful release" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + + result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:ok) + lock.reload + expect(lock.locked_by).to be_nil + expect(lock.locked_at).to be_nil + end + + specify "released lock can be obtained by other process" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + + repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + result = repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid, clock: clock) + expect(result).to be_a(Mysql57::Lock) + end + + specify "cant release not obtained lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + + result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:not_taken_by_this_process) + end + + specify "one process cant release other's lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + + result = repository.release_lock_for_process(expected_fetch_specification, other_process_uuid) + + expect(result).to be(:not_taken_by_this_process) + end + + specify "lock timeout when obtaining Lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + expect(Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) + + result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + + expect(result).to be(:lock_timeout) + end + + specify "deadlock when obtaining Lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + expect(Mysql57::Lock).to receive(:create!).and_raise(::ActiveRecord::Deadlocked) + + result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + + expect(result).to be(:deadlocked) + end + + specify "lock timeout when releasing lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + expect(Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) + + result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:lock_timeout) + end + + specify "deadlock when releasing lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + expect(Mysql57::Lock).to receive(:lock).and_raise(::ActiveRecord::Deadlocked) + + result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) + + expect(result).to be(:deadlocked) + end + + specify "refreshing lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + clock.test_travel (Mysql57::RECENTLY_LOCKED_DURATION / 2) + + result = lock.refresh(clock: clock) + + clock.test_travel (Mysql57::RECENTLY_LOCKED_DURATION / 2 + 1.second) + expect(result).to be(:ok) + expect(lock).to be_locked_by(some_process_uuid) + expect(lock).to be_recently_locked(clock: clock) + end + + specify "refreshing lock when other process stole it" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock_for_some_process = + repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + wait_for_lock_duration + lock_for_other_process = + repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid, clock: clock) + + result = lock_for_some_process.refresh(clock: clock) + + expect(result).to be(:stolen) + end + + specify "lock timeout when refreshing lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + expect(lock).to receive(:lock!).and_raise(::ActiveRecord::LockWaitTimeout) + + result = lock.refresh(clock: clock) + + expect(result).to be(:lock_timeout) + end + + specify "deadlocked when refreshing lock" do + repository = Mysql57.build_for_consumer(database_url) + expected_fetch_specification = FetchSpecification.new(message_format, split_key) + lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) + expect(lock).to receive(:lock!).and_raise(::ActiveRecord::Deadlocked) + + result = lock.refresh(clock: clock) + + expect(result).to be(:deadlocked) + end + + def wait_for_lock_duration + clock.test_travel (Mysql57::RECENTLY_LOCKED_DURATION + 1.second) + end + end + end + end +end diff --git a/contrib/ruby_event_store-outbox/spec/repository_spec.rb b/contrib/ruby_event_store-outbox/spec/repository_spec.rb deleted file mode 100644 index ca007284d4..0000000000 --- a/contrib/ruby_event_store-outbox/spec/repository_spec.rb +++ /dev/null @@ -1,206 +0,0 @@ -require "spec_helper" - -module RubyEventStore - module Outbox - ::RSpec.describe Repository, db: true do - include SchemaHelper - - let(:database_url) { ENV["DATABASE_URL"] } - let(:message_format) { "some_message_format" } - let(:split_key) { "some_split_key" } - let(:some_process_uuid) { SecureRandom.uuid } - let(:other_process_uuid) { SecureRandom.uuid } - let(:clock) { TickingClock.new } - - specify "successful obtaining returns Lock structure" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - expect(lock).to be_a(Repository::Lock) - expect(lock.fetch_specification).to eq(expected_fetch_specification) - expect(lock).to be_locked_by(some_process_uuid) - expect(lock).to be_recently_locked(clock: clock) - end - - specify "Lock is not considered locked after some time" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - - result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - wait_for_lock_duration - - expect(result).not_to be_recently_locked(clock: clock) - end - - specify "trying to obtain taken Lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - result = repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid, clock: clock) - - expect(result).to be(:taken) - end - - specify "obtains a lock for given fetch specification" do - repository = Repository.build_for_consumer(database_url) - repository.obtain_lock_for_process( - FetchSpecification.new("other_message_format", split_key), - some_process_uuid, - clock: clock - ) - repository.obtain_lock_for_process( - FetchSpecification.new(message_format, "other_split_key"), - some_process_uuid, - clock: clock - ) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - expect(lock.fetch_specification).to eq(expected_fetch_specification) - end - - specify "successful release" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - expect(result).to be(:ok) - lock.reload - expect(lock.locked_by).to be_nil - expect(lock.locked_at).to be_nil - end - - specify "released lock can be obtained by other process" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - result = repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid, clock: clock) - expect(result).to be_a(Repository::Lock) - end - - specify "cant release not obtained lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - - result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - expect(result).to be(:not_taken_by_this_process) - end - - specify "one process cant release other's lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - result = repository.release_lock_for_process(expected_fetch_specification, other_process_uuid) - - expect(result).to be(:not_taken_by_this_process) - end - - specify "lock timeout when obtaining Lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) - - result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - expect(result).to be(:lock_timeout) - end - - specify "deadlock when obtaining Lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - expect(Repository::Lock).to receive(:create!).and_raise(::ActiveRecord::Deadlocked) - - result = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - - expect(result).to be(:deadlocked) - end - - specify "lock timeout when releasing lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::LockWaitTimeout) - - result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - expect(result).to be(:lock_timeout) - end - - specify "deadlock when releasing lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - expect(Repository::Lock).to receive(:lock).and_raise(::ActiveRecord::Deadlocked) - - result = repository.release_lock_for_process(expected_fetch_specification, some_process_uuid) - - expect(result).to be(:deadlocked) - end - - specify "refreshing lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - clock.test_travel (Repository::RECENTLY_LOCKED_DURATION / 2) - - result = lock.refresh(clock: clock) - - clock.test_travel (Repository::RECENTLY_LOCKED_DURATION / 2 + 1.second) - expect(result).to be(:ok) - expect(lock).to be_locked_by(some_process_uuid) - expect(lock).to be_recently_locked(clock: clock) - end - - specify "refreshing lock when other process stole it" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock_for_some_process = - repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - wait_for_lock_duration - lock_for_other_process = - repository.obtain_lock_for_process(expected_fetch_specification, other_process_uuid, clock: clock) - - result = lock_for_some_process.refresh(clock: clock) - - expect(result).to be(:stolen) - end - - specify "lock timeout when refreshing lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - expect(lock).to receive(:lock!).and_raise(::ActiveRecord::LockWaitTimeout) - - result = lock.refresh(clock: clock) - - expect(result).to be(:lock_timeout) - end - - specify "deadlocked when refreshing lock" do - repository = Repository.build_for_consumer(database_url) - expected_fetch_specification = FetchSpecification.new(message_format, split_key) - lock = repository.obtain_lock_for_process(expected_fetch_specification, some_process_uuid, clock: clock) - expect(lock).to receive(:lock!).and_raise(::ActiveRecord::Deadlocked) - - result = lock.refresh(clock: clock) - - expect(result).to be(:deadlocked) - end - - def wait_for_lock_duration - clock.test_travel (Repository::RECENTLY_LOCKED_DURATION + 1.second) - end - end - end -end diff --git a/contrib/ruby_event_store-outbox/spec/sidekiq_scheduler_spec.rb b/contrib/ruby_event_store-outbox/spec/sidekiq_scheduler_spec.rb index 60688c8ab3..02119bd79c 100644 --- a/contrib/ruby_event_store-outbox/spec/sidekiq_scheduler_spec.rb +++ b/contrib/ruby_event_store-outbox/spec/sidekiq_scheduler_spec.rb @@ -64,8 +64,8 @@ def through_outbox? subject.call(CorrectAsyncHandler, event_record) - expect(Repository::Record.count).to eq(1) - record = Repository::Record.first + expect(Repositories::Mysql57::Record.count).to eq(1) + record = Repositories::Mysql57::Record.first expect(record.created_at).to be_present expect(record.enqueued_at).to be_nil expect(record.split_key).to eq("default") @@ -108,7 +108,7 @@ def through_outbox? subject.call(CorrectAsyncHandler, event_record) - record = Repository::Record.first + record = Repositories::Mysql57::Record.first expect(record.split_key).to eq("custom_queue") expect(record.hash_payload[:queue]).to eq("custom_queue") end @@ -130,7 +130,7 @@ def through_outbox? subject.call(CorrectAsyncHandlerWithRetryQueue, event_record) - record = Repository::Record.first + record = Repositories::Mysql57::Record.first expect(record.split_key).to eq("custom_queue") expect(record.hash_payload[:retry_queue]).to eq("custom_queue_retries") end @@ -155,7 +155,7 @@ def through_outbox? subject.call(CorrectAsyncHandler, event_record) - expect(Repository::Record.count).to eq(0) + expect(Repositories::Mysql57::Record.count).to eq(0) end end end