Skip to content

Commit

Permalink
Add miniblockNum and eventNum to the forEachEvent iterator (#69)
Browse files Browse the repository at this point in the history
I want to add updatedAtEventNum for some items in the snapshot so that
we can do proper cache invalidation. This refactor is required to keep
things consistant when dealing with events past the snapshot
  • Loading branch information
texuf authored May 31, 2024
1 parent 106f7a2 commit e7bca36
Show file tree
Hide file tree
Showing 8 changed files with 57 additions and 29 deletions.
9 changes: 6 additions & 3 deletions core/node/events/miniblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,17 @@ func (b *MiniblockInfo) lastEvent() *ParsedEvent {
}
}

func (b *MiniblockInfo) forEachEvent(op func(e *ParsedEvent) (bool, error)) error {
func (b *MiniblockInfo) forEachEvent(op func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error)) error {
blockNum := b.header().MiniblockNum
eventNum := b.header().EventNumOffset
for _, event := range b.events {
c, err := op(event)
c, err := op(event, blockNum, eventNum)
eventNum++
if !c {
return err
}
}
c, err := op(b.headerEvent)
c, err := op(b.headerEvent, blockNum, eventNum)
if !c {
return err
}
Expand Down
23 changes: 14 additions & 9 deletions core/node/events/minipool.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,34 @@ import (
type eventMap = *OrderedMap[common.Hash, *ParsedEvent]

type minipoolInstance struct {
events eventMap
generation int64
events eventMap
generation int64
eventNumOffset int64
}

func newMiniPoolInstance(events eventMap, generation int64) *minipoolInstance {
func newMiniPoolInstance(events eventMap, generation int64, eventNumOffset int64) *minipoolInstance {
return &minipoolInstance{
events: events,
generation: generation,
events: events,
generation: generation,
eventNumOffset: eventNumOffset,
}
}

func (m *minipoolInstance) copyAndAddEvent(event *ParsedEvent) *minipoolInstance {
m = &minipoolInstance{
events: m.events.Copy(1),
generation: m.generation,
events: m.events.Copy(1),
generation: m.generation,
eventNumOffset: m.eventNumOffset,
}
m.events.Set(event.Hash, event)
return m
}

func (m *minipoolInstance) forEachEvent(op func(e *ParsedEvent) (bool, error)) error {
func (m *minipoolInstance) forEachEvent(op func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error)) error {
eventNum := m.eventNumOffset
for _, e := range m.events.Values {
cont, err := op(e)
cont, err := op(e, m.generation, eventNum)
eventNum++
if !cont {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ func (s *streamImpl) Sub(ctx context.Context, cookie *SyncCookie, receiver SyncR

// append events from blocks
envelopes := make([]*Envelope, 0, 16)
err = s.view.forEachEvent(miniblockIndex, func(e *ParsedEvent) (bool, error) {
err = s.view.forEachEvent(miniblockIndex, func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
envelopes = append(envelopes, e.Envelope)
return true, nil
})
Expand Down
21 changes: 16 additions & 5 deletions core/node/events/stream_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,14 @@ func MakeStreamView(streamData *storage.ReadStreamFromLastSnapshotResult) (*stre
minipoolEvents.Set(parsed.Hash, parsed)
}

lastBlockHeader := miniblocks[len(miniblocks)-1].header()
generation := lastBlockHeader.MiniblockNum + 1
eventNumOffset := lastBlockHeader.EventNumOffset + int64(len(lastBlockHeader.EventHashes)) + 1 // plus one for header

return &streamViewImpl{
streamId: streamId,
blocks: miniblocks,
minipool: newMiniPoolInstance(minipoolEvents, miniblocks[len(miniblocks)-1].header().MiniblockNum+1),
minipool: newMiniPoolInstance(minipoolEvents, generation, eventNumOffset),
snapshot: snapshot,
snapshotIndex: snapshotIndex,
}, nil
Expand Down Expand Up @@ -146,10 +150,14 @@ func MakeRemoteStreamView(resp *GetStreamResponse) (*streamViewImpl, error) {
minipoolEvents.Set(parsed.Hash, parsed)
}

lastBlockHeader := miniblocks[len(miniblocks)-1].header()
generation := lastBlockHeader.MiniblockNum + 1
eventNumOffset := lastBlockHeader.EventNumOffset + int64(len(lastBlockHeader.EventHashes)) + 1 // plus one for header

return &streamViewImpl{
streamId: streamId,
blocks: miniblocks,
minipool: newMiniPoolInstance(minipoolEvents, lastMiniblockNumber+1),
minipool: newMiniPoolInstance(minipoolEvents, generation, eventNumOffset),
snapshot: snapshot,
snapshotIndex: snapshotIndex,
}, nil
Expand Down Expand Up @@ -363,10 +371,13 @@ func (r *streamViewImpl) copyAndApplyBlock(
snapshotIndex = r.snapshotIndex
}

generation := header.MiniblockNum + 1
eventNumOffset := header.EventNumOffset + int64(len(header.EventHashes)) + 1 // plus one for header

return &streamViewImpl{
streamId: r.streamId,
blocks: append(r.blocks[startIndex:], miniblock),
minipool: newMiniPoolInstance(minipoolEvents, header.MiniblockNum+1),
minipool: newMiniPoolInstance(minipoolEvents, generation, eventNumOffset),
snapshot: snapshot,
snapshotIndex: snapshotIndex,
}, nil
Expand Down Expand Up @@ -418,7 +429,7 @@ func (r *streamViewImpl) indexOfMiniblockWithNum(mininblockNum int64) (int, erro
}

// iterate over events starting at startBlock including events in the minipool
func (r *streamViewImpl) forEachEvent(startBlock int, op func(e *ParsedEvent) (bool, error)) error {
func (r *streamViewImpl) forEachEvent(startBlock int, op func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error)) error {
if startBlock < 0 || startBlock > len(r.blocks) {
return RiverError(Err_INVALID_ARGUMENT, "iterateEvents: bad startBlock", "startBlock", startBlock)
}
Expand Down Expand Up @@ -451,7 +462,7 @@ func (r *streamViewImpl) LastEvent() *ParsedEvent {

func (r *streamViewImpl) MinipoolEnvelopes() []*Envelope {
envelopes := make([]*Envelope, 0, len(r.minipool.events.Values))
_ = r.minipool.forEachEvent(func(e *ParsedEvent) (bool, error) {
_ = r.minipool.forEachEvent(func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
envelopes = append(envelopes, e.Envelope)
return true, nil
})
Expand Down
21 changes: 15 additions & 6 deletions core/node/events/stream_view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,11 @@ func TestLoad(t *testing.T) {
miniEnvelopes := view.MinipoolEnvelopes()
assert.Equal(t, 0, len(miniEnvelopes))

count1 := 0
newEnvelopesHashes := make([]common.Hash, 0)
_ = view.forEachEvent(0, func(e *ParsedEvent) (bool, error) {
_ = view.forEachEvent(0, func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
assert.Equal(t, int64(count1), eventNum)
count1++
newEnvelopesHashes = append(newEnvelopesHashes, e.Hash)
return true, nil
})
Expand Down Expand Up @@ -196,16 +199,22 @@ func TestLoad(t *testing.T) {
miniblockHeader, envelopes, _ := view.makeMiniblockHeader(ctx, proposal)
assert.NotNil(t, miniblockHeader.Snapshot)

// check count
count := 0
err = view.forEachEvent(0, func(e *ParsedEvent) (bool, error) {
count++
// check count2
count2 := 0
err = view.forEachEvent(0, func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
assert.Equal(t, int64(count2), eventNum)
if count2 < 3 {
assert.Equal(t, int64(0), minibockNum)
} else {
assert.Equal(t, int64(1), minibockNum)
}
count2++
return true, nil
})
assert.NoError(t, err)
assert.Equal(t, int64(3), miniblockHeader.EventNumOffset) // 3 events in the genisis miniblock
assert.Equal(t, 2, len(miniblockHeader.EventHashes)) // 2 join events added in test
assert.Equal(t, 5, count) // we should iterate over all of them
assert.Equal(t, 5, count2) // we should iterate over all of them
// test copy and apply block
// how many blocks do we currently have?
assert.Equal(t, len(view.blocks), 1)
Expand Down
6 changes: 3 additions & 3 deletions core/node/events/stream_viewstate_joinable.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (r *streamViewImpl) GetChannelMembers() (*mapset.Set[string], error) {
members.Add(userId)
}

updateFn := func(e *ParsedEvent) (bool, error) {
updateFn := func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
switch payload := e.Event.Payload.(type) {
case *protocol.StreamEvent_MemberPayload:
switch payload := payload.MemberPayload.Content.(type) {
Expand Down Expand Up @@ -67,7 +67,7 @@ func (r *streamViewImpl) GetMembership(userAddress []byte) (protocol.MembershipO
retValue = protocol.MembershipOp_SO_JOIN
}

updateFn := func(e *ParsedEvent) (bool, error) {
updateFn := func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
switch payload := e.Event.Payload.(type) {
case *protocol.StreamEvent_MemberPayload:
switch payload := payload.MemberPayload.Content.(type) {
Expand Down Expand Up @@ -101,7 +101,7 @@ func (r *streamViewImpl) GetKeySolicitations(userAddress []byte) ([]*protocol.Me
member = proto.Clone(member).(*protocol.MemberPayload_Snapshot_Member)
}

updateFn := func(e *ParsedEvent) (bool, error) {
updateFn := func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
switch payload := e.Event.Payload.(type) {
case *protocol.StreamEvent_MemberPayload:
switch payload := payload.MemberPayload.Content.(type) {
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream_viewstate_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (r *streamViewImpl) GetChannelInfo(channelId shared.StreamId) (*SpacePayloa
}
channel, _ := findChannel(snap.Channels, channelId[:])

updateFn := func(e *ParsedEvent) (bool, error) {
updateFn := func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
switch payload := e.Event.Payload.(type) {
case *StreamEvent_SpacePayload:
switch spacePayload := payload.SpacePayload.Content.(type) {
Expand Down
2 changes: 1 addition & 1 deletion core/node/events/stream_viewstate_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (r *streamViewImpl) GetUserMembership(streamId shared.StreamId) (Membership
retValue = membership.Op
}

updateFn := func(e *ParsedEvent) (bool, error) {
updateFn := func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
switch payload := e.Event.Payload.(type) {
case *StreamEvent_UserPayload:
switch payload := payload.UserPayload.Content.(type) {
Expand Down

0 comments on commit e7bca36

Please sign in to comment.