Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use events to test network logic #2700

Merged
merged 42 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b3f45bc
wip
nasdf Jun 5, 2024
10c522b
move event names and types to event package
nasdf Jun 5, 2024
8f870bd
refactor net events
nasdf Jun 6, 2024
138bb84
add event messages. use separate event bus for user subscriptions
nasdf Jun 7, 2024
144b392
update tests with event bus
nasdf Jun 7, 2024
243775a
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 7, 2024
6b8d97e
merge push log and dag merge events
nasdf Jun 7, 2024
05f1579
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 11, 2024
98e77d4
rename events package to event
nasdf Jun 11, 2024
2da9153
add event bus timeout
nasdf Jun 11, 2024
3725fca
add waitForMerge test util
nasdf Jun 11, 2024
45f6edd
merge event bus and simple channel implementations
nasdf Jun 11, 2024
938954e
resubscribe to events after restart
nasdf Jun 11, 2024
6e98699
update event names
nasdf Jun 12, 2024
1a1ce8f
update event bus publish logic
nasdf Jun 12, 2024
9ce688f
update command buffer size
nasdf Jun 12, 2024
7e1045f
document p2p waitForMerge require
nasdf Jun 12, 2024
573090e
add event bus interface
nasdf Jun 12, 2024
ab56124
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 12, 2024
d6a1be1
add event.Name type
nasdf Jun 12, 2024
23d4d95
fix event bus isClosed logic
nasdf Jun 12, 2024
64ebf82
remove WARNING text from bufferedBus docs
nasdf Jun 12, 2024
42c52d0
update mocks
nasdf Jun 12, 2024
7ac87b9
fix waitForMerge node address
nasdf Jun 12, 2024
311a37d
revert p2p waitForMerge change. add more info to cli peer info panic
nasdf Jun 13, 2024
cfa0ef9
fix node constructor
nasdf Jun 13, 2024
0357385
skip waitForMerge when no waitForSync present
nasdf Jun 13, 2024
3513390
revert skipWaitForMerge. use peer info from state
nasdf Jun 13, 2024
184f0cc
test db merge sequential
nasdf Jun 13, 2024
38e0c48
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 17, 2024
55aac76
add mutex to buffered bus
nasdf Jun 17, 2024
e6128ef
document bus mutex
nasdf Jun 17, 2024
801354c
add buffered bus tests
nasdf Jun 17, 2024
2dfa4d2
fix buffered bus test names
nasdf Jun 17, 2024
56136c5
remove event.Bus interface
nasdf Jun 17, 2024
726be78
make mocks
nasdf Jun 17, 2024
61ab97f
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 17, 2024
9fb2eeb
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 18, 2024
72f44ce
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 18, 2024
d0ad720
update test names
nasdf Jun 18, 2024
aeec15d
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 20, 2024
7680133
use session for dag sync
nasdf Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ func MakeStartCommand() *cobra.Command {
node.WithACPType(node.LocalACPType),
node.WithPeers(peers...),
// db options
db.WithUpdateEvents(),
db.WithDAGMergeEvents(),
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
Expand Down
4 changes: 2 additions & 2 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/events"
"github.com/sourcenetwork/defradb/event"
)

type CollectionName = string
Expand Down Expand Up @@ -75,7 +75,7 @@ type DB interface {
//
// It may be used to monitor database events - a new event will be yielded for each mutation.
// Note: it does not copy the queue, just the reference to it.
Events() events.Events
Events() event.Bus

// MaxTxnRetries returns the number of retries that this DefraDB instance has been configured to
// make in the event of a transaction conflict in certain scenarios.
Expand Down
16 changes: 9 additions & 7 deletions client/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datastore/mocks/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func prepareDAGStore(t *testing.T) *DAGStore {
func NewTxnWithMultistore(t *testing.T) *MultiStoreTxn {
txn := NewTxn(t)
txn.EXPECT().OnSuccess(mock.Anything).Maybe()
txn.EXPECT().OnSuccessAsync(mock.Anything).Maybe()

result := &MultiStoreTxn{
Txn: txn,
Expand Down
141 changes: 141 additions & 0 deletions event/buffered_bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package event

import (
"sync/atomic"
)

type subscribeCommand *Subscription

type unsubscribeCommand *Subscription

type publishCommand Message

type closeCommand struct{}

// bufferedBus is a bus that uses a buffered channel to manage subscribers and publish messages.
type bufferedBus struct {
Copy link
Contributor

@AndrewSisley AndrewSisley Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: Please add a compile time check to ensure (and document) that this type implements the Bus interface.

i.e.:

var _ Bus = (*bufferedBus)(nil)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The constructor method does this already. I usually choose one or the other not both.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough :) Fred might ask you to change the constructor to return the concrete though 😁

// subID is incremented for each subscriber and used to set subscriber ids.
subID atomic.Uint64
// subs is a mapping of subscriber ids to subscriptions.
subs map[uint64]*Subscription
// events is a mapping of event names to subscriber ids.
events map[string]map[uint64]struct{}
// commandChannel manages all commands sent to this simpleChannel.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what are you referring to by "simpleChannel"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. It was a copy paste error.

//
// It is important that all stuff gets sent through this single channel to ensure
// that the order of operations is preserved.
//
// WARNING: This does mean that non-event commands can block the database if the buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: sounds more like a NOTE, not a WARNING

// size is breached (e.g. if many subscribe commands occupy the buffer).
commandChannel chan any
eventBufferSize int
hasClosedChan chan struct{}
isClosed atomic.Bool
}

// NewBufferedBus creates a new event bus with the given commandBufferSize and
// eventBufferSize.
//
// Should the buffers be filled, subsequent calls on this bus will block.
func NewBufferedBus(commandBufferSize int, eventBufferSize int) Bus {
bus := bufferedBus{
subs: make(map[uint64]*Subscription),
events: make(map[string]map[uint64]struct{}),
commandChannel: make(chan any, commandBufferSize),
hasClosedChan: make(chan struct{}),
eventBufferSize: eventBufferSize,
}
go bus.handleChannel()
return &bus
}

func (b *bufferedBus) Publish(msg Message) {
if b.isClosed.Load() {
return
}
b.commandChannel <- publishCommand(msg)
}

func (b *bufferedBus) Subscribe(events ...string) (*Subscription, error) {
if b.isClosed.Load() {
return nil, ErrSubscribedToClosedChan

Check warning on line 71 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L71

Added line #L71 was not covered by tests
}
sub := &Subscription{
id: b.subID.Add(1),
value: make(chan Message, b.eventBufferSize),
events: events,
}
b.commandChannel <- subscribeCommand(sub)
return sub, nil
}

func (b *bufferedBus) Unsubscribe(sub *Subscription) {
if b.isClosed.Load() {
return
}
b.commandChannel <- unsubscribeCommand(sub)
}

func (b *bufferedBus) Close() {
if b.isClosed.Load() {
return
}
b.isClosed.Store(true)
Copy link
Contributor

@islamaliev islamaliev Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: these 2 ops are not done atomically and might close twice. Please use compare-and-swap:

if !b.isClosed.CompareAndSwap(false, true) {
    return
}
...

Check please if you have similar occurrences.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

b.commandChannel <- closeCommand{}
// Wait for the close command to be handled, in order, before returning
<-b.hasClosedChan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: this is thread-safe only under the assumption that all the calls to bufferedBus are done by the same thread. I'm not sure this is the case. Is it?

Otherwise the following scenario is possible:

  1. execution enters Subscribe method, b.isClosed == false so it proceeds but pauses just before b.commandChannel <- subscribeCommand(sub)
  2. concurrently Close is being called, sets b.isClosed to true, pushes b.commandChannel <- closeCommand{}
  3. handleChannel receives closeCommand and executes close(b.commandChannel)
  4. now the thread from step 1 resumes, executes b.commandChannel <- subscribeCommand(sub) and causes a panic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've replaced the isClosed atomic with a RWMutex. Issue should be fixed now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's quite a broad mutex, please don't merge yet

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should only block when calling close.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, I think I understand, and there is no nicer way of handling this that I can spot off the top of my head.

todo: Can you please rename mutex to closeLock or similar, we really really don't want this to be used for anything else.

todo: Can please also document the lock on its declaration in bufferedBus, explaining that it is only locked whilst closing.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewSisley can you explain what you mean by broad? We use the same technique in the memory store and I think it's really fine for this as well.

Copy link
Contributor

@AndrewSisley AndrewSisley Jun 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@AndrewSisley can you explain what you mean by broad? We use the same technique in the memory store and I think it's really fine for this as well.

The lock spans every public call to this struct, it can't really get any broader :) If it only locks whilst closing that's fine, but it would be very damaging if that were to change (hence the documentation and name change requested).

The current name (mutex) is just begging for other places to lock it besides Close in the future.

}

func (b *bufferedBus) handleChannel() {
for cmd := range b.commandChannel {
switch t := cmd.(type) {
case closeCommand:
for _, subscriber := range b.subs {
close(subscriber.value)
}
close(b.commandChannel)
close(b.hasClosedChan)
return

case subscribeCommand:
for _, event := range t.events {
if _, ok := b.events[event]; !ok {
b.events[event] = make(map[uint64]struct{})
}
b.events[event][t.id] = struct{}{}
}
b.subs[t.id] = t

case unsubscribeCommand:
if _, ok := b.subs[t.id]; !ok {
continue // not subscribed

Check warning on line 121 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L121

Added line #L121 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Just pointing incase easy to test, that this path is not tested.

}
for _, event := range t.events {
delete(b.events[event], t.id)
}
delete(b.subs, t.id)
close(t.value)

case publishCommand:
for id := range b.events[WildCardEventName] {
b.subs[id].value <- Message(t)

Check warning on line 131 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L131

Added line #L131 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Just pointing incase easy to test, that this path is not tested.

}
for id := range b.events[t.Name] {
if _, ok := b.events[WildCardEventName][id]; ok {
continue

Check warning on line 135 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L135

Added line #L135 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Just pointing incase easy to test, that this path is not tested.

}
b.subs[id].value <- Message(t)
}
}
}
}
147 changes: 147 additions & 0 deletions event/buffered_bus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package event

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestSimplePushIsNotBlockedWithoutSubscribers(t *testing.T) {
bus := NewBufferedBus(0, 0)
defer bus.Close()

msg := NewMessage("test", 1)
bus.Publish(msg)

// just assert that we reach this line, for the sake of having an assert
assert.True(t, true)
}

func TestSimpleSubscribersAreNotBlockedAfterClose(t *testing.T) {
bus := NewBufferedBus(0, 0)
defer bus.Close()

sub, err := bus.Subscribe("test")
assert.Nil(t, err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: it's better to use assert.NoError because it produces better output

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was copy pasted, but I've fixed them all.


bus.Close()

<-sub.Message()

// just assert that we reach this line, for the sake of having an assert
assert.True(t, true)
}

func TestSimpleEachSubscribersRecievesEachItem(t *testing.T) {
bus := NewBufferedBus(0, 0)
defer bus.Close()

msg1 := NewMessage("test", 1)
msg2 := NewMessage("test", 2)

sub1, err := bus.Subscribe("test")
assert.Nil(t, err)

sub2, err := bus.Subscribe("test")
assert.Nil(t, err)

// ordering of publish is not deterministic
// so capture each in a go routine
var wg sync.WaitGroup
var event1 Message
var event2 Message

go func() {
event1 = <-sub1.Message()
wg.Done()
}()

go func() {
event2 = <-sub2.Message()
wg.Done()
}()

wg.Add(2)
bus.Publish(msg1)
wg.Wait()

assert.Equal(t, msg1, event1)
assert.Equal(t, msg1, event2)

go func() {
event1 = <-sub1.Message()
wg.Done()
}()

go func() {
event2 = <-sub2.Message()
wg.Done()
}()

wg.Add(2)
bus.Publish(msg2)
wg.Wait()

assert.Equal(t, msg2, event1)
assert.Equal(t, msg2, event2)
}

func TestSimpleEachSubscribersRecievesEachItemGivenBufferedEventChan(t *testing.T) {
bus := NewBufferedBus(0, 2)
defer bus.Close()

msg1 := NewMessage("test", 1)
msg2 := NewMessage("test", 2)

sub1, err := bus.Subscribe("test")
assert.Nil(t, err)
sub2, err := bus.Subscribe("test")
assert.Nil(t, err)

// both inputs are added first before read, using the internal chan buffer
bus.Publish(msg1)
bus.Publish(msg2)

output1Ch1 := <-sub1.Message()
output1Ch2 := <-sub2.Message()

output2Ch1 := <-sub1.Message()
output2Ch2 := <-sub2.Message()

assert.Equal(t, msg1, output1Ch1)
assert.Equal(t, msg1, output1Ch2)

assert.Equal(t, msg2, output2Ch1)
assert.Equal(t, msg2, output2Ch2)
}

func TestSimpleSubscribersDontRecieveItemsAfterUnsubscribing(t *testing.T) {
bus := NewBufferedBus(0, 0)
defer bus.Close()

sub, err := bus.Subscribe("test")
assert.Nil(t, err)
bus.Unsubscribe(sub)

msg := NewMessage("test", 1)
bus.Publish(msg)

// tiny delay to try and make sure the internal logic would have had time
// to do its thing with the pushed item.
time.Sleep(5 * time.Millisecond)

// closing the channel will result in reads yielding the default value
assert.Equal(t, Message{}, <-sub.Message())
}
Loading
Loading