Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add repository using NOWAIT to outbox #1507

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion contrib/ruby_event_store-outbox/.mutant.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ matcher:
- RubyEventStore::Outbox*
ignore:
- RubyEventStore::Outbox::CLI*
- RubyEventStore::Outbox::SidekiqProducer#initialize
- RubyEventStore::Outbox::SidekiqProducer#call
- RubyEventStore::Outbox::SidekiqProcessor#process
- RubyEventStore::Outbox::SidekiqProcessor#after_batch
Expand All @@ -33,7 +34,7 @@ matcher:
- RubyEventStore::Outbox::Configuration*
- RubyEventStore::Outbox::Consumer#get_remaining_count
- RubyEventStore::Outbox::CleanupStrategies::None*
- RubyEventStore::Outbox::Repository*
- RubyEventStore::Outbox::Repositories*
- RubyEventStore::Outbox::Runner#initialize
- RubyEventStore::Outbox::Runner#run
- RubyEventStore::Outbox::Runner#prepare_traps
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ module Outbox
end

require_relative "outbox/fetch_specification"
require_relative "outbox/repository"
require_relative "outbox/repositories/mysql57"
require_relative "outbox/sidekiq_scheduler"
require_relative "outbox/version"
require_relative "outbox/tempo"
require_relative "outbox/batch_result"
require_relative "outbox/cleanup_strategies"
require_relative "outbox/repositories"
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
require "logger"
require "redis"
require "active_record"
require_relative "repository"
require_relative "repositories/mysql57"
require_relative "sidekiq5_format"
require_relative "sidekiq_processor"
require_relative "fetch_specification"
Expand All @@ -24,7 +24,7 @@ def initialize(consumer_uuid, configuration, clock: Time, logger:, metrics:)
raise "Unknown format" if configuration.message_format != SIDEKIQ5_FORMAT
@processor = SidekiqProcessor.new(Redis.new(url: configuration.redis_url))

@repository = Repository.new(configuration.database_url)
@repository = Repositories::Mysql57.build_for_consumer(configuration.database_url, clock: clock)
@cleanup_strategy = CleanupStrategies.build(configuration, repository)
end

Expand Down Expand Up @@ -123,7 +123,7 @@ def log_error(e)
end

def obtain_lock_for_process(fetch_specification)
result = repository.obtain_lock_for_process(fetch_specification, consumer_uuid, clock: @clock)
result = repository.obtain_lock_for_process(fetch_specification, consumer_uuid)
case result
when :deadlocked
logger.warn "Obtaining lock for split_key '#{fetch_specification.split_key}' failed (deadlock)"
Expand Down Expand Up @@ -158,7 +158,7 @@ def release_lock_for_process(fetch_specification)
end

def refresh_lock_for_process(lock)
result = lock.refresh(clock: @clock)
result = repository.refresh_lock(lock)
case result
when :ok
return true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
module RubyEventStore
module Outbox
module Repositories
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
# frozen_string_literal: true

require "active_record"
require "active_support/core_ext/numeric/time.rb"

module RubyEventStore
module Outbox
module Repositories
class Mysql57
RECENTLY_LOCKED_DURATION = 10.minutes

class Record < ::ActiveRecord::Base
self.primary_key = :id
self.table_name = "event_store_outbox"

def self.remaining_for(fetch_specification)
where(format: fetch_specification.message_format, split_key: fetch_specification.split_key, enqueued_at: nil)
end

def self.for_fetch_specification(fetch_specification)
where(format: fetch_specification.message_format, split_key: fetch_specification.split_key)
end

def hash_payload
JSON.parse(payload).deep_symbolize_keys
end

def enqueued?
!enqueued_at.nil?
end
end

class Lock < ::ActiveRecord::Base
self.table_name = "event_store_outbox_locks"

def self.obtain(fetch_specification, process_uuid, clock:)
transaction do
l = get_lock_record(fetch_specification)

if l.recently_locked?(clock: clock)
:taken
else
l.update!(locked_by: process_uuid, locked_at: clock.now)
l
end
end
rescue ::ActiveRecord::Deadlocked
:deadlocked
rescue ::ActiveRecord::LockWaitTimeout
:lock_timeout
end

def self.refresh(lock, clock:)
transaction do
current_process_uuid = lock.locked_by
lock_record = Lock.lock.find(lock.id)
if lock_record.locked_by == current_process_uuid
lock_record.update!(locked_at: clock.now)
lock.assign_attributes(lock_record.attributes)
:ok
else
:stolen
end
end
rescue ::ActiveRecord::Deadlocked
:deadlocked
rescue ::ActiveRecord::LockWaitTimeout
:lock_timeout
end

def self.release(fetch_specification, process_uuid)
transaction do
l = get_lock_record(fetch_specification)
if !l.locked_by?(process_uuid)
:not_taken_by_this_process
else
l.update!(locked_by: nil, locked_at: nil)
:ok
end
end
rescue ::ActiveRecord::Deadlocked
:deadlocked
rescue ::ActiveRecord::LockWaitTimeout
:lock_timeout
end

def locked_by?(process_uuid)
locked_by.eql?(process_uuid)
end

def recently_locked?(clock:)
locked_by && locked_at > RECENTLY_LOCKED_DURATION.ago(clock.now)
end

def fetch_specification
FetchSpecification.new(format, split_key)
end

private

def self.lock_for_split_key(fetch_specification)
lock.find_by(format: fetch_specification.message_format, split_key: fetch_specification.split_key)
end

def self.get_lock_record(fetch_specification)
l = lock_for_split_key(fetch_specification)
if l.nil?
begin
l = create!(format: fetch_specification.message_format, split_key: fetch_specification.split_key)
rescue ::ActiveRecord::RecordNotUnique
l = lock_for_split_key(fetch_specification)
end
end
l
end
end

def self.build_for_consumer(database_url, clock:)
::ActiveRecord::Base.establish_connection(database_url) unless ::ActiveRecord::Base.connected?
if ::ActiveRecord::Base.connection.adapter_name == "Mysql2"
::ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;")
end
new(clock: clock)
end

def self.build_for_producer(clock: Time)
new(clock: clock)
end

def initialize(clock:)
@clock = clock
end

def insert_record(format, split_key, payload)
Record.create!(format: format, split_key: split_key, payload: payload)
end

def retrieve_batch(fetch_specification, batch_size)
Record.remaining_for(fetch_specification).order("id ASC").limit(batch_size).to_a
end

def get_remaining_count(fetch_specification)
Record.remaining_for(fetch_specification).count
end

def obtain_lock_for_process(fetch_specification, process_uuid)
Lock.obtain(fetch_specification, process_uuid, clock: clock)
end

def release_lock_for_process(fetch_specification, process_uuid)
Lock.release(fetch_specification, process_uuid)
end

def refresh_lock(lock)
Lock.refresh(lock, clock: clock)
end

def mark_as_enqueued(record, now)
record.update_column(:enqueued_at, now)
end

def delete_enqueued_older_than(fetch_specification, duration, limit)
scope = Record.for_fetch_specification(fetch_specification).where("enqueued_at < ?", duration.ago)
scope = scope.limit(limit).order(:id) unless limit == :all
scope.delete_all
:ok
rescue ::ActiveRecord::Deadlocked
:deadlocked
rescue ::ActiveRecord::LockWaitTimeout
:lock_timeout
end

private
attr_reader :clock
end
end
end
end
Loading