Skip to content

Commit

Permalink
Benchmark for batch readers
Browse files Browse the repository at this point in the history
Co-authored-by: Paweł Pacana <[email protected]>
  • Loading branch information
fidel and mostlyobvious committed Jul 12, 2024
1 parent 8582357 commit af8119e
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,13 @@ module RubyEventStore
module ActiveRecord
class EventRepository
POSITION_SHIFT = 1
UNSET = Object.new.freeze

def initialize(model_factory: WithDefaultModels.new, serializer:)
def initialize(model_factory: WithDefaultModels.new, serializer:, batch_reader: UNSET)
@serializer = serializer

@event_klass, @stream_klass = model_factory.call
@repo_reader = EventRepositoryReader.new(@event_klass, @stream_klass, serializer)
@repo_reader = EventRepositoryReader.new(@event_klass, @stream_klass, serializer, batch_reader)
@index_violation_detector = IndexViolationDetector.new(@event_klass.table_name, @stream_klass.table_name)
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
module RubyEventStore
module ActiveRecord
class EventRepositoryReader
def initialize(event_klass, stream_klass, serializer)
def initialize(event_klass, stream_klass, serializer, batch_reader)
@event_klass = event_klass
@stream_klass = stream_klass
@serializer = serializer
@batch_reader = batch_reader
end

def has_event?(event_id)
Expand All @@ -21,7 +22,14 @@ def last_stream_event(stream)
def read(spec)
stream = read_scope(spec)
if spec.batched?
spec.time_sort_by ? offset_limit_batch_reader(spec, stream) : monotonic_id_batch_reader(spec, stream)
case [@batch_reader, spec.time_sort_by]
in [EventRepository::UNSET, ->(time_sort_by) { !time_sort_by.nil? }]
offset_limit_batch_reader(spec, stream)
in [EventRepository::UNSET, _]
monotonic_id_batch_reader(spec, stream)
else
@batch_reader[spec, stream]
end
elsif spec.first?
record_ = stream.first
record(record_) if record_
Expand Down Expand Up @@ -106,7 +114,8 @@ def read_scope(spec)
else
stream = @stream_klass.includes(:event).where(stream: spec.stream.name)
stream = stream.where(event_id: spec.with_ids) if spec.with_ids?
stream = stream.joins(:event).where(@event_klass.table_name => { event_type: spec.with_types }) if spec.with_types?
stream =
stream.joins(:event).where(@event_klass.table_name => { event_type: spec.with_types }) if spec.with_types?
stream = stream.joins(:event).order(as_at(spec)) if spec.time_sort_by_as_at?
stream = stream.joins(:event).order(as_of(spec)) if spec.time_sort_by_as_of?
stream = stream.order(id: order(spec))
Expand Down Expand Up @@ -145,7 +154,7 @@ def start_condition(specification)
start_offset_condition(
specification,
@stream_klass.find_by!(event_id: specification.start, stream: specification.stream.name),
@stream_klass.table_name
@stream_klass.table_name,
)
rescue ::ActiveRecord::RecordNotFound
raise EventNotFound.new(specification.start)
Expand All @@ -155,7 +164,7 @@ def stop_condition(specification)
stop_offset_condition(
specification,
@stream_klass.find_by!(event_id: specification.stop, stream: specification.stream.name),
@stream_klass.table_name
@stream_klass.table_name,
)
rescue ::ActiveRecord::RecordNotFound
raise EventNotFound.new(specification.stop)
Expand All @@ -165,7 +174,7 @@ def start_condition_in_global_stream(specification)
start_offset_condition(
specification,
@event_klass.find_by!(event_id: specification.start),
@event_klass.table_name
@event_klass.table_name,
)
rescue ::ActiveRecord::RecordNotFound
raise EventNotFound.new(specification.start)
Expand All @@ -175,7 +184,7 @@ def stop_condition_in_global_stream(specification)
stop_offset_condition(
specification,
@event_klass.find_by!(event_id: specification.stop),
@event_klass.table_name
@event_klass.table_name,
)
rescue ::ActiveRecord::RecordNotFound
raise EventNotFound.new(specification.stop)
Expand Down Expand Up @@ -216,16 +225,14 @@ def order(spec)
def record(record)
record = record.event if @stream_klass === record

SerializedRecord
.new(
event_id: record.event_id,
metadata: record.metadata,
data: record.data,
event_type: record.event_type,
timestamp: record.created_at.iso8601(TIMESTAMP_PRECISION),
valid_at: (record.valid_at || record.created_at).iso8601(TIMESTAMP_PRECISION)
)
.deserialize(serializer)
SerializedRecord.new(
event_id: record.event_id,
metadata: record.metadata,
data: record.data,
event_type: record.event_type,
timestamp: record.created_at.iso8601(TIMESTAMP_PRECISION),
valid_at: (record.valid_at || record.created_at).iso8601(TIMESTAMP_PRECISION),
).deserialize(serializer)
end
end

Expand Down
89 changes: 89 additions & 0 deletions ruby_event_store-active_record/repository_reader_benchmark.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
require "ruby_event_store"
require "ruby_event_store/active_record"
require_relative "../support/helpers/schema_helper"
require "benchmark"

helper = Object.new.extend(SchemaHelper)
helper.establish_database_connection
helper.drop_database
helper.load_database_schema

class Event < ::ActiveRecord::Base
self.primary_key = :id
self.table_name = "event_store_events"
end

class EventInStream < ::ActiveRecord::Base
self.primary_key = :id
self.table_name = "event_store_events_in_streams"
belongs_to :event, primary_key: :event_id
end

Integer(ARGV.first || 1).times do
RubyEventStore::Client.new(repository: RubyEventStore::ActiveRecord::EventRepository.new(serializer: YAML)).append(
(1..1_000).map { RubyEventStore::Event.new },
)
print "."
end

mk_client = ->(reader) do
RubyEventStore::Client.new(
repository:
RubyEventStore::ActiveRecord::EventRepository.new(
serializer: YAML,
batch_reader: reader,
model_factory: -> { [Event, EventInStream] },
),
)
end

record = ->(record) do
record = record.event if EventInStream === record

RubyEventStore::SerializedRecord.new(
event_id: record.event_id,
metadata: record.metadata,
data: record.data,
event_type: record.event_type,
timestamp: record.created_at.iso8601(RubyEventStore::TIMESTAMP_PRECISION),
valid_at: (record.valid_at || record.created_at).iso8601(RubyEventStore::TIMESTAMP_PRECISION),
).deserialize(YAML)
end

offset_limit = ->(spec, stream) do
batch_reader = ->(offset, limit) { stream.offset(offset).limit(limit).map(&record) }
RubyEventStore::BatchEnumerator.new(spec.batch_size, spec.limit, batch_reader).each
end

id_limit = ->(spec, stream) do
start_offset_condition = ->(specification, record_id, search_in) do
condition = "#{search_in}.id #{specification.forward? ? ">" : "<"} ?"
[condition, record_id]
end

batch_reader = ->(offset_id, limit) do
search_in =
(
if spec.stream.global?
Event.table_name
else
EventInStream.table_name
end
)
records =
if offset_id.nil?
stream.limit(limit)
else
stream.where(start_offset_condition[spec, offset_id, search_in]).limit(limit)
end
[records.map(&record), records.last]
end
RubyEventStore::ActiveRecord::BatchEnumerator.new(spec.batch_size, spec.limit, batch_reader).each
end

mk_benchmark = ->(reader) { mk_client[reader].read.each_batch { print "." } }

Benchmark.bm(14) do |x|
x.report("offset/limit:") { mk_benchmark[offset_limit] }
x.report("id/limit:") { mk_benchmark[id_limit] }
end

0 comments on commit af8119e

Please sign in to comment.