diff --git a/ruby_event_store/lib/ruby_event_store/projection.rb b/ruby_event_store/lib/ruby_event_store/projection.rb index d06a1a62a0..ecd6a979f7 100644 --- a/ruby_event_store/lib/ruby_event_store/projection.rb +++ b/ruby_event_store/lib/ruby_event_store/projection.rb @@ -2,94 +2,45 @@ module RubyEventStore class Projection - private_class_method :new + ANONYMOUS_CLASS = "# { {} } + @init = -> { initial_state } end - attr_reader :streams, :handlers - - def init(handler) - @init = handler - self - end + def on(*event_klasses, &block) + raise(ArgumentError, 'No handler block given') unless block_given? - def when(events, handler) - Array(events).each { |event| handlers[event.to_s] = handler } + event_klasses.each do |event_klass| + name = event_klass.to_s + raise(ArgumentError, 'Anonymous class is missing name') if name.start_with? ANONYMOUS_CLASS + @handlers[name] = ->(state, event) { block.call(state, event) } + end self end - def initial_state - @init.call - end - - def current_state - @current_state ||= initial_state - end - - def call(event) - handlers.fetch(event.event_type).(current_state, event) - end - - def handled_events - handlers.keys - end - - def run(event_store, start: nil, count: PAGE_SIZE) + def call(*scopes) return initial_state if handled_events.empty? - streams.any? ? reduce_from_streams(event_store, start, count) : reduce_from_all_streams(event_store, start, count) - end - - private - def valid_starting_point?(start) - return true unless start - streams.any? ? (start.instance_of?(Array) && start.size === streams.size) : start.instance_of?(String) + scopes.reduce(initial_state) do |state, scope| + scope.of_types(handled_events).reduce(state, &method(:transition)) + end end - def reduce_from_streams(event_store, start, count) - raise ArgumentError.new("Start must be an array with event ids") unless valid_starting_point?(start) - streams - .zip(start_events(start)) - .reduce(initial_state) do |state, (stream_name, start_event_id)| - read_scope(event_store, stream_name, count, start_event_id).reduce(state, &method(:transition)) - end - end - - def reduce_from_all_streams(event_store, start, count) - raise ArgumentError.new("Start must be valid event id") unless valid_starting_point?(start) - read_scope(event_store, nil, count, start).reduce(initial_state, &method(:transition)) - end + private - def read_scope(event_store, stream, count, start) - scope = event_store.read.in_batches(count) - scope = scope.of_type(handled_events) - scope = scope.stream(stream) if stream - scope = scope.from(start) if start - scope + def initial_state + @init.call end - def start_events(start) - start ? start : Array.new + def handled_events + @handlers.keys end def transition(state, event) - handlers.fetch(event.event_type).call(state, event) - state + @handlers.fetch(event.event_type).call(state, event) end end end diff --git a/ruby_event_store/spec/projection_spec.rb b/ruby_event_store/spec/projection_spec.rb index a1ec514431..982db29e7a 100644 --- a/ruby_event_store/spec/projection_spec.rb +++ b/ruby_event_store/spec/projection_spec.rb @@ -25,12 +25,11 @@ module RubyEventStore account_balance = Projection - .from_stream(stream_name) - .init(-> { { total: 0 } }) - .when(MoneyDeposited, ->(state, event) { state[:total] += event.data[:amount] }) - .when(MoneyWithdrawn, ->(state, event) { state[:total] -= event.data[:amount] }) - .run(event_store) - expect(account_balance).to eq(total: 25) + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] } + .call(event_store.read) + expect(account_balance).to eq(25) end specify "reduce events from many streams" do @@ -40,27 +39,11 @@ module RubyEventStore account_balance = Projection - .from_stream(%w[Customer$1 Customer$3]) - .init(-> { { total: 0 } }) - .when(MoneyDeposited, ->(state, event) { state[:total] += event.data[:amount] }) - .when(MoneyWithdrawn, ->(state, event) { state[:total] -= event.data[:amount] }) - .run(event_store) - expect(account_balance).to eq(total: 5) - end - - specify "raises proper errors when wrong argument were passed (stream mode)" do - projection = - Projection - .from_stream(%w[Customer$1 Customer$2]) - .init(-> { { total: 0 } }) - .when(MoneyDeposited, ->(state, event) { state[:total] += event.data[:amount] }) - .when(MoneyWithdrawn, ->(state, event) { state[:total] -= event.data[:amount] }) - expect { projection.run(event_store, start: :last) }.to raise_error ArgumentError, - "Start must be an array with event ids" - expect { projection.run(event_store, start: 0.7) }.to raise_error ArgumentError, - "Start must be an array with event ids" - expect { projection.run(event_store, start: [SecureRandom.uuid]) }.to raise_error ArgumentError, - "Start must be an array with event ids" + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] } + .call(event_store.read.stream("Customer$1"), event_store.read.stream("Customer$3")) + expect(account_balance).to eq(5) end specify "take events from all streams" do @@ -71,28 +54,15 @@ module RubyEventStore account_balance = Projection - .from_all_streams - .init(-> { { total: 0 } }) - .when(MoneyDeposited, ->(state, event) { state[:total] += event.data[:amount] }) - .when(MoneyWithdrawn, ->(state, event) { state[:total] -= event.data[:amount] }) + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] } + .call(event_store.read) - expect(account_balance.run(event_store)).to eq(total: 1) + expect(account_balance).to eq(1) end - specify "raises proper errors when wrong argument were pass (all streams mode)" do - projection = - Projection - .from_all_streams - .init(-> { { total: 0 } }) - .when(MoneyDeposited, ->(state, event) { state[:total] += event.data[:amount] }) - .when(MoneyWithdrawn, ->(state, event) { state[:total] -= event.data[:amount] }) - expect { projection.run(event_store, start: :last) }.to raise_error ArgumentError, "Start must be valid event id" - expect { projection.run(event_store, start: 0.7) }.to raise_error ArgumentError, "Start must be valid event id" - expect { projection.run(event_store, start: [SecureRandom.uuid]) }.to raise_error ArgumentError, - "Start must be valid event id" - end - - specify "empty hash is default inital state" do + specify "state could be more complex than simple value" do event_store.append( [ MoneyDeposited.new(data: { amount: 10 }), @@ -103,11 +73,10 @@ module RubyEventStore ) stats = - Projection - .from_stream(stream_name) - .when(MoneyDeposited, ->(state, event) { state[:last_deposit] = event.data[:amount] }) - .when(MoneyWithdrawn, ->(state, event) { state[:last_withdrawal] = event.data[:amount] }) - .run(event_store) + Projection.new({}) + .on(MoneyDeposited) { |state, event| state[:last_deposit] = event.data[:amount]; state } + .on(MoneyWithdrawn) { |state, event| state[:last_withdrawal] = event.data[:amount]; state } + .call(event_store.read.stream(stream_name)) expect(stats).to eq(last_deposit: 20, last_withdrawal: 5) end @@ -119,11 +88,10 @@ module RubyEventStore deposits = Projection - .from_stream(stream_name) - .init(-> { { total: 0 } }) - .when(MoneyDeposited, ->(state, event) { state[:total] += event.data[:amount] }) - .run(event_store) - expect(deposits).to eq(total: 10) + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .call(event_store.read.stream(stream_name)) + expect(deposits).to eq(10) end specify "subsrcibe one handler to many events" do @@ -134,35 +102,10 @@ module RubyEventStore cashflow = Projection - .from_stream(stream_name) - .init(-> { { total: 0 } }) - .when([MoneyDeposited, MoneyWithdrawn], ->(state, event) { state[:total] += event.data[:amount] }) - .run(event_store) - expect(cashflow).to eq(total: 12) - end - - specify "subscribe to events" do - deposits = - Projection - .from_stream(stream_name) - .init(-> { { total: 0 } }) - .when(MoneyDeposited, ->(state, event) { state[:total] += event.data[:amount] }) - event_store.subscribe(deposits, to: deposits.handled_events) - event_store.publish( - [MoneyDeposited.new(data: { amount: 10 }), MoneyDeposited.new(data: { amount: 5 })], - stream_name: stream_name - ) - - expect(deposits.current_state).to eq(total: 15) - end - - specify "using default constructor" do - expect { Projection.new(stream_name) }.to raise_error(NoMethodError, /private method `new'/) - end - - specify "at least one stream must be given" do - expect { Projection.from_stream([]) }.to raise_error(ArgumentError, "At least one stream must be given") - expect { Projection.from_stream(nil) }.to raise_error(ArgumentError, "At least one stream must be given") + .new(0) + .on(MoneyDeposited, MoneyWithdrawn) { |state, event| state += event.data[:amount] } + .call(event_store.read.stream(stream_name)) + expect(cashflow).to eq(12) end specify "all events from the stream must be read (starting from begining of the stream)" do @@ -179,12 +122,11 @@ module RubyEventStore balance = Projection - .from_stream(stream_name) - .init(-> { { total: 0 } }) - .when([MoneyDeposited], ->(state, event) { state[:total] += event.data[:amount] }) - .when([MoneyWithdrawn], ->(state, event) { state[:total] -= event.data[:amount] }) - .run(event_store, count: 2) - expect(balance).to eq(total: 14) + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] } + .call(event_store.read.stream(stream_name).in_batches(2)) + expect(balance).to eq(14) end specify "all events from the stream must be read (starting from given event)" do @@ -201,12 +143,11 @@ module RubyEventStore balance = Projection - .from_stream(stream_name) - .init(-> { { total: 0 } }) - .when([MoneyDeposited], ->(state, event) { state[:total] += event.data[:amount] }) - .when([MoneyWithdrawn], ->(state, event) { state[:total] -= event.data[:amount] }) - .run(event_store, start: [starting.event_id], count: 2) - expect(balance).to eq(total: 6) + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] } + .call(event_store.read.stream(stream_name).from(starting.event_id).in_batches(2)) + expect(balance).to eq(6) end specify "all events from all streams must be read (starting from begining of each stream)" do @@ -218,12 +159,11 @@ module RubyEventStore balance = Projection - .from_all_streams - .init(-> { { total: 0 } }) - .when([MoneyDeposited], ->(state, event) { state[:total] += event.data[:amount] }) - .when([MoneyWithdrawn], ->(state, event) { state[:total] -= event.data[:amount] }) - .run(event_store, count: 2) - expect(balance).to eq(total: 14) + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] } + .call(event_store.read.in_batches(2)) + expect(balance).to eq(14) end specify "all events from all streams must be read (starting from given event)" do @@ -235,12 +175,11 @@ module RubyEventStore balance = Projection - .from_all_streams - .init(-> { { total: 0 } }) - .when([MoneyDeposited], ->(state, event) { state[:total] += event.data[:amount] }) - .when([MoneyWithdrawn], ->(state, event) { state[:total] -= event.data[:amount] }) - .run(event_store, start: starting.event_id, count: 2) - expect(balance).to eq(total: 6) + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] } + .call(event_store.read.from(starting.event_id).in_batches(2)) + expect(balance).to eq(6) end specify "only events that have handlers must be read" do @@ -260,54 +199,54 @@ module RubyEventStore balance = Projection - .from_all_streams - .init(-> { { total: 0 } }) - .when([MoneyDeposited], ->(state, event) { state[:total] += event.data[:amount] }) - .when([MoneyWithdrawn, MoneyLost], ->(state, event) { state[:total] -= event.data[:amount] }) - .run(event_store, count: 100) - expect(balance).to eq(total: 6) + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .on(MoneyWithdrawn, MoneyLost) { |state, event| state -= event.data[:amount] } + .call(event_store.read.in_batches(100)) + expect(balance).to eq(6) end specify do specification = Specification.new(SpecificationReader.new(repository, mapper)) - expected = specification.in_batches(2).of_type([MoneyDeposited, MoneyWithdrawn]).result - expect(repository).to receive(:read).with(expected).and_return([]) + scope = specification.in_batches(2).of_type([MoneyDeposited, MoneyWithdrawn]) + expect(repository).to receive(:read).with(scope.result).and_return([]) Projection - .from_all_streams - .init(-> { { total: 0 } }) - .when(MoneyDeposited, ->(state, event) { state[:total] += event.data[:amount] }) - .when(MoneyWithdrawn, ->(state, event) { state[:total] -= event.data[:amount] }) - .run(event_store, count: 2) + .new(0) + .on(MoneyDeposited) { |state, event| state += event.data[:amount] } + .on(MoneyWithdrawn) { |state, event| state -= event.data[:amount] } + .call(scope) end - specify do - specification = Specification.new(SpecificationReader.new(repository, mapper)) - expected = specification.in_batches(2).of_type([MoneyDeposited, MoneyWithdrawn]).stream("FancyStream").result - expect(repository).to receive(:read).with(expected).and_return([]) + specify "default initial state" do + expect(Projection.new.call([])).to eq(nil) + end - Projection - .from_stream("FancyStream") - .init(-> { { total: 0 } }) - .when(MoneyDeposited, ->(state, event) { state[:total] += event.data[:amount] }) - .when(MoneyWithdrawn, ->(state, event) { state[:total] -= event.data[:amount] }) - .run(event_store, count: 2) + specify "block must be given to on event handlers" do + expect do + Projection.new.on(MoneyDeposited) + end.to raise_error(ArgumentError, "No handler block given") + end + + it "does not support anonymous events" do + expect do + Projection.new.on(Class.new) { |_state, _event| } + end.to raise_error(ArgumentError, "Anonymous class is missing name") end specify do expect(repository).not_to receive(:read) - - state = Projection.from_all_streams.init(-> { { total: 0 } }).run(event_store, count: 2) - - expect(state).to eq({ total: 0 }) + state = Projection.new.call(event_store.read) + expect(state).to eq(nil) end specify do expect(repository).not_to receive(:read) - state = Projection.from_all_streams.run(event_store) + initial_state = Object.new + state = Projection.new(initial_state).call(event_store.read) - expect(state).to eq({}) + expect(state).to eq(initial_state) end end end