diff --git a/ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository.rb b/ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository.rb index c7e4712e7c..225f0840bc 100644 --- a/ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository.rb +++ b/ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository.rb @@ -6,12 +6,13 @@ module RubyEventStore module ActiveRecord class EventRepository POSITION_SHIFT = 1 + UNSET = Object.new.freeze - def initialize(model_factory: WithDefaultModels.new, serializer:) + def initialize(model_factory: WithDefaultModels.new, serializer:, batch_reader: UNSET) @serializer = serializer @event_klass, @stream_klass = model_factory.call - @repo_reader = EventRepositoryReader.new(@event_klass, @stream_klass, serializer) + @repo_reader = EventRepositoryReader.new(@event_klass, @stream_klass, serializer, batch_reader) @index_violation_detector = IndexViolationDetector.new(@event_klass.table_name, @stream_klass.table_name) end diff --git a/ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository_reader.rb b/ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository_reader.rb index b6158b6171..14e32a72b2 100644 --- a/ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository_reader.rb +++ b/ruby_event_store-active_record/lib/ruby_event_store/active_record/event_repository_reader.rb @@ -3,10 +3,11 @@ module RubyEventStore module ActiveRecord class EventRepositoryReader - def initialize(event_klass, stream_klass, serializer) + def initialize(event_klass, stream_klass, serializer, batch_reader) @event_klass = event_klass @stream_klass = stream_klass @serializer = serializer + @batch_reader = batch_reader end def has_event?(event_id) @@ -21,7 +22,14 @@ def last_stream_event(stream) def read(spec) stream = read_scope(spec) if spec.batched? - spec.time_sort_by ? offset_limit_batch_reader(spec, stream) : monotonic_id_batch_reader(spec, stream) + case [@batch_reader, spec.time_sort_by] + in [EventRepository::UNSET, ->(time_sort_by) { !time_sort_by.nil? }] + offset_limit_batch_reader(spec, stream) + in [EventRepository::UNSET, _] + monotonic_id_batch_reader(spec, stream) + else + @batch_reader[spec, stream] + end elsif spec.first? record_ = stream.first record(record_) if record_ @@ -106,7 +114,8 @@ def read_scope(spec) else stream = @stream_klass.includes(:event).where(stream: spec.stream.name) stream = stream.where(event_id: spec.with_ids) if spec.with_ids? - stream = stream.joins(:event).where(@event_klass.table_name => { event_type: spec.with_types }) if spec.with_types? + stream = + stream.joins(:event).where(@event_klass.table_name => { event_type: spec.with_types }) if spec.with_types? stream = stream.joins(:event).order(as_at(spec)) if spec.time_sort_by_as_at? stream = stream.joins(:event).order(as_of(spec)) if spec.time_sort_by_as_of? stream = stream.order(id: order(spec)) @@ -145,7 +154,7 @@ def start_condition(specification) start_offset_condition( specification, @stream_klass.find_by!(event_id: specification.start, stream: specification.stream.name), - @stream_klass.table_name + @stream_klass.table_name, ) rescue ::ActiveRecord::RecordNotFound raise EventNotFound.new(specification.start) @@ -155,7 +164,7 @@ def stop_condition(specification) stop_offset_condition( specification, @stream_klass.find_by!(event_id: specification.stop, stream: specification.stream.name), - @stream_klass.table_name + @stream_klass.table_name, ) rescue ::ActiveRecord::RecordNotFound raise EventNotFound.new(specification.stop) @@ -165,7 +174,7 @@ def start_condition_in_global_stream(specification) start_offset_condition( specification, @event_klass.find_by!(event_id: specification.start), - @event_klass.table_name + @event_klass.table_name, ) rescue ::ActiveRecord::RecordNotFound raise EventNotFound.new(specification.start) @@ -175,7 +184,7 @@ def stop_condition_in_global_stream(specification) stop_offset_condition( specification, @event_klass.find_by!(event_id: specification.stop), - @event_klass.table_name + @event_klass.table_name, ) rescue ::ActiveRecord::RecordNotFound raise EventNotFound.new(specification.stop) @@ -216,16 +225,14 @@ def order(spec) def record(record) record = record.event if @stream_klass === record - SerializedRecord - .new( - event_id: record.event_id, - metadata: record.metadata, - data: record.data, - event_type: record.event_type, - timestamp: record.created_at.iso8601(TIMESTAMP_PRECISION), - valid_at: (record.valid_at || record.created_at).iso8601(TIMESTAMP_PRECISION) - ) - .deserialize(serializer) + SerializedRecord.new( + event_id: record.event_id, + metadata: record.metadata, + data: record.data, + event_type: record.event_type, + timestamp: record.created_at.iso8601(TIMESTAMP_PRECISION), + valid_at: (record.valid_at || record.created_at).iso8601(TIMESTAMP_PRECISION), + ).deserialize(serializer) end end diff --git a/ruby_event_store-active_record/repository_reader_benchmark.rb b/ruby_event_store-active_record/repository_reader_benchmark.rb new file mode 100644 index 0000000000..73e945e799 --- /dev/null +++ b/ruby_event_store-active_record/repository_reader_benchmark.rb @@ -0,0 +1,89 @@ +require "ruby_event_store" +require "ruby_event_store/active_record" +require_relative "../support/helpers/schema_helper" +require "benchmark" + +helper = Object.new.extend(SchemaHelper) +helper.establish_database_connection +helper.drop_database +helper.load_database_schema + +class Event < ::ActiveRecord::Base + self.primary_key = :id + self.table_name = "event_store_events" +end + +class EventInStream < ::ActiveRecord::Base + self.primary_key = :id + self.table_name = "event_store_events_in_streams" + belongs_to :event, primary_key: :event_id +end + +Integer(ARGV.first || 1).times do + RubyEventStore::Client.new(repository: RubyEventStore::ActiveRecord::EventRepository.new(serializer: YAML)).append( + (1..1_000).map { RubyEventStore::Event.new }, + ) + print "." +end + +mk_client = ->(reader) do + RubyEventStore::Client.new( + repository: + RubyEventStore::ActiveRecord::EventRepository.new( + serializer: YAML, + batch_reader: reader, + model_factory: -> { [Event, EventInStream] }, + ), + ) +end + +record = ->(record) do + record = record.event if EventInStream === record + + RubyEventStore::SerializedRecord.new( + event_id: record.event_id, + metadata: record.metadata, + data: record.data, + event_type: record.event_type, + timestamp: record.created_at.iso8601(RubyEventStore::TIMESTAMP_PRECISION), + valid_at: (record.valid_at || record.created_at).iso8601(RubyEventStore::TIMESTAMP_PRECISION), + ).deserialize(YAML) +end + +offset_limit = ->(spec, stream) do + batch_reader = ->(offset, limit) { stream.offset(offset).limit(limit).map(&record) } + RubyEventStore::BatchEnumerator.new(spec.batch_size, spec.limit, batch_reader).each +end + +id_limit = ->(spec, stream) do + start_offset_condition = ->(specification, record_id, search_in) do + condition = "#{search_in}.id #{specification.forward? ? ">" : "<"} ?" + [condition, record_id] + end + + batch_reader = ->(offset_id, limit) do + search_in = + ( + if spec.stream.global? + Event.table_name + else + EventInStream.table_name + end + ) + records = + if offset_id.nil? + stream.limit(limit) + else + stream.where(start_offset_condition[spec, offset_id, search_in]).limit(limit) + end + [records.map(&record), records.last] + end + RubyEventStore::ActiveRecord::BatchEnumerator.new(spec.batch_size, spec.limit, batch_reader).each +end + +mk_benchmark = ->(reader) { mk_client[reader].read.each_batch { print "." } } + +Benchmark.bm(14) do |x| + x.report("offset/limit:") { mk_benchmark[offset_limit] } + x.report("id/limit:") { mk_benchmark[id_limit] } +end