Skip to content

Commit

Permalink
Extract global updater to separate class
Browse files Browse the repository at this point in the history
  • Loading branch information
swistak35 committed Mar 17, 2021
1 parent c129932 commit 1c6a673
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ module PersistentProjections
end

require_relative 'persistent_projections/version'
require_relative 'persistent_projections/global_updater'
require_relative 'persistent_projections/consumer'
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
require "logger"
require "active_record"
require "ruby_event_store/persistent_projections/global_updater"

module RubyEventStore
module PersistentProjections
class ProjectionStatus < ActiveRecord::Base
self.table_name = 'event_store_projections'
end

class Event < ActiveRecord::Base
self.table_name = 'event_store_events'
end

class Consumer
SLEEP_TIME_WHEN_NOTHING_TO_DO = 0.1
GLOBAL_POSITION_NAME = "$"

def initialize(consumer_uuid, require_file, clock: Time, logger:)
@clock = clock
Expand All @@ -24,16 +16,18 @@ def initialize(consumer_uuid, require_file, clock: Time, logger:)
prepare_traps

require require_file unless require_file.nil?

@global_updater = GlobalUpdater.new(logger: logger, clock: clock)
end

def init
logger.info("Initiated RubyEventStore::PersistentProjections v#{VERSION}")
ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;")
@global_updater.init
end

def run
while !@gracefully_shutting_down do
was_something_changed = one_loop
was_something_changed = @global_updater.one_loop
if !was_something_changed
STDOUT.flush
sleep SLEEP_TIME_WHEN_NOTHING_TO_DO
Expand All @@ -42,41 +36,6 @@ def run
logger.info "Gracefully shutting down"
end

def one_loop
last_id = Event.order("id DESC").first&.id || 0
current_status = begin
ProjectionStatus.find_by!(name: GLOBAL_POSITION_NAME)
rescue ActiveRecord::RecordNotFound
begin
ProjectionStatus.create!(name: GLOBAL_POSITION_NAME, position: 0)
rescue ActiveRecord::RecordNotUnique
ProjectionStatus.find_by(name: GLOBAL_POSITION_NAME)
end
end
return false if last_id == current_status.position
next_position = current_status.position + 1
begin
check_event_on_position(next_position)
bump_position_to_at_least(next_position)
rescue ActiveRecord::RecordNotFound
bump_position_to_at_least(next_position)
rescue ActiveRecord::LockWaitTimeout
logger.debug "Lock wait timeout"
end
true
end

def check_event_on_position(position)
Event.transaction do
ProjectionStatus.connection.execute("SELECT id FROM event_store_events WHERE id = #{position} FOR UPDATE")
end
end

def bump_position_to_at_least(new_position)
ProjectionStatus.connection.execute("UPDATE event_store_projections SET position = GREATEST(#{new_position}, position) WHERE name = '#{GLOBAL_POSITION_NAME}'")
logger.debug "Progressed to at least #{new_position}"
end

private
attr_reader :logger

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
require "active_record"

module RubyEventStore
module PersistentProjections
class GlobalUpdater
class ProjectionStatus < ActiveRecord::Base
self.table_name = 'event_store_projections'
end

class Event < ActiveRecord::Base
self.table_name = 'event_store_events'
end

GLOBAL_POSITION_NAME = "$"

def initialize(logger:, clock:)
@logger = logger
@clock = clock
end

def init
ActiveRecord::Base.connection.execute("SET SESSION innodb_lock_wait_timeout = 1;")
end

def one_loop
last_id = Event.order("id DESC").first&.id || 0
current_status = begin
ProjectionStatus.find_by!(name: GLOBAL_POSITION_NAME)
rescue ActiveRecord::RecordNotFound
begin
ProjectionStatus.create!(name: GLOBAL_POSITION_NAME, position: 0)
rescue ActiveRecord::RecordNotUnique
ProjectionStatus.find_by(name: GLOBAL_POSITION_NAME)
end
end
return false if last_id == current_status.position
next_position = current_status.position + 1
begin
check_event_on_position(next_position)
bump_position_to_at_least(next_position)
rescue ActiveRecord::RecordNotFound
bump_position_to_at_least(next_position)
rescue ActiveRecord::LockWaitTimeout
logger.debug "Lock wait timeout"
end
true
end

def check_event_on_position(position)
Event.transaction do
ProjectionStatus.connection.execute("SELECT id FROM event_store_events WHERE id = #{position} FOR UPDATE")
end
end

def bump_position_to_at_least(new_position)
ProjectionStatus.connection.execute("UPDATE event_store_projections SET position = GREATEST(#{new_position}, position) WHERE name = '#{GLOBAL_POSITION_NAME}'")
logger.debug "Progressed to at least #{new_position}"
end

attr_reader :logger, :clock
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,25 @@

module RubyEventStore
module PersistentProjections
RSpec.describe Consumer, db: true do
RSpec.describe GlobalUpdater, db: true do
include SchemaHelper

let(:logger_output) { StringIO.new }
let(:logger) { Logger.new(logger_output) }
let(:clock) { Time }

specify "creates projection global status if doesn't exist" do
consumer = Consumer.new(SecureRandom.uuid, nil, logger: logger)
consumer = GlobalUpdater.new(logger: logger, clock: clock)

consumer.one_loop

expect(ProjectionStatus.count).to eq(1)
expect(GlobalUpdater::ProjectionStatus.count).to eq(1)
expect(global_status.position).to eq(0)
end

specify 'global thread progresses the state if available' do
publish_event
consumer = Consumer.new(SecureRandom.uuid, nil, logger: logger)
consumer = GlobalUpdater.new(logger: logger, clock: clock)

consumer.one_loop

Expand All @@ -28,7 +29,7 @@ module PersistentProjections

specify 'global thread progresses the state if event locked' do
publish_event
consumer = Consumer.new(SecureRandom.uuid, nil, logger: logger)
consumer = GlobalUpdater.new(logger: logger, clock: clock)
expect(consumer).to receive(:check_event_on_position).with(1).and_raise(ActiveRecord::LockWaitTimeout)

result = consumer.one_loop
Expand All @@ -40,7 +41,7 @@ module PersistentProjections
specify 'global thread progresses the state if event not found' do
publish_event_and_rollback
publish_event
consumer = Consumer.new(SecureRandom.uuid, nil, logger: logger)
consumer = GlobalUpdater.new(logger: logger, clock: clock)

consumer.one_loop
result = consumer.one_loop
Expand All @@ -50,11 +51,11 @@ module PersistentProjections
end

def publish_event
Event.create!(event_id: SecureRandom.uuid, event_type: "Foo", data: {})
GlobalUpdater::Event.create!(event_id: SecureRandom.uuid, event_type: "Foo", data: {})
end

def global_status
ProjectionStatus.find_by(name: Consumer::GLOBAL_POSITION_NAME)
GlobalUpdater::ProjectionStatus.find_by(name: GlobalUpdater::GLOBAL_POSITION_NAME)
end

def publish_event_and_rollback
Expand Down

0 comments on commit 1c6a673

Please sign in to comment.