Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Use events to test network logic #2700

Merged
merged 42 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b3f45bc
wip
nasdf Jun 5, 2024
10c522b
move event names and types to event package
nasdf Jun 5, 2024
8f870bd
refactor net events
nasdf Jun 6, 2024
138bb84
add event messages. use separate event bus for user subscriptions
nasdf Jun 7, 2024
144b392
update tests with event bus
nasdf Jun 7, 2024
243775a
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 7, 2024
6b8d97e
merge push log and dag merge events
nasdf Jun 7, 2024
05f1579
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 11, 2024
98e77d4
rename events package to event
nasdf Jun 11, 2024
2da9153
add event bus timeout
nasdf Jun 11, 2024
3725fca
add waitForMerge test util
nasdf Jun 11, 2024
45f6edd
merge event bus and simple channel implementations
nasdf Jun 11, 2024
938954e
resubscribe to events after restart
nasdf Jun 11, 2024
6e98699
update event names
nasdf Jun 12, 2024
1a1ce8f
update event bus publish logic
nasdf Jun 12, 2024
9ce688f
update command buffer size
nasdf Jun 12, 2024
7e1045f
document p2p waitForMerge require
nasdf Jun 12, 2024
573090e
add event bus interface
nasdf Jun 12, 2024
ab56124
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 12, 2024
d6a1be1
add event.Name type
nasdf Jun 12, 2024
23d4d95
fix event bus isClosed logic
nasdf Jun 12, 2024
64ebf82
remove WARNING text from bufferedBus docs
nasdf Jun 12, 2024
42c52d0
update mocks
nasdf Jun 12, 2024
7ac87b9
fix waitForMerge node address
nasdf Jun 12, 2024
311a37d
revert p2p waitForMerge change. add more info to cli peer info panic
nasdf Jun 13, 2024
cfa0ef9
fix node constructor
nasdf Jun 13, 2024
0357385
skip waitForMerge when no waitForSync present
nasdf Jun 13, 2024
3513390
revert skipWaitForMerge. use peer info from state
nasdf Jun 13, 2024
184f0cc
test db merge sequential
nasdf Jun 13, 2024
38e0c48
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 17, 2024
55aac76
add mutex to buffered bus
nasdf Jun 17, 2024
e6128ef
document bus mutex
nasdf Jun 17, 2024
801354c
add buffered bus tests
nasdf Jun 17, 2024
2dfa4d2
fix buffered bus test names
nasdf Jun 17, 2024
56136c5
remove event.Bus interface
nasdf Jun 17, 2024
726be78
make mocks
nasdf Jun 17, 2024
61ab97f
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 17, 2024
9fb2eeb
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 18, 2024
72f44ce
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 18, 2024
d0ad720
update test names
nasdf Jun 18, 2024
aeec15d
Merge branch 'develop' into nasdf/refactor/events
nasdf Jun 20, 2024
7680133
use session for dag sync
nasdf Jun 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@ func MakeStartCommand() *cobra.Command {
node.WithACPType(node.LocalACPType),
node.WithPeers(peers...),
// db options
db.WithUpdateEvents(),
db.WithDAGMergeEvents(),
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
Expand Down
2 changes: 1 addition & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type DB interface {
//
// It may be used to monitor database events - a new event will be yielded for each mutation.
// Note: it does not copy the queue, just the reference to it.
Events() events.Events
Events() *events.Bus

// MaxTxnRetries returns the number of retries that this DefraDB instance has been configured to
// make in the event of a transaction conflict in certain scenarios.
Expand Down
14 changes: 8 additions & 6 deletions client/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

138 changes: 138 additions & 0 deletions events/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package events

import (
"sync"
)

// Message contains event info.
type Message struct {
// Name is the name of the event this message was generated from.
Name string
// Data contains optional event information.
Data any
}

// NewMessage returns a new message with the given name and optional data.
func NewMessage(name string, data any) Message {
return Message{name, data}
}

// Subscription is a read-only event stream.
type Subscription struct {
id int
value chan Message
events []string
}

// Message returns the next event value from the subscription.
func (s *Subscription) Message() <-chan Message {
return s.value
}

// Events returns the names of all subscribed events.
func (s *Subscription) Events() []string {
return s.events

Check warning on line 44 in events/bus.go

View check run for this annotation

Codecov / codecov/patch

events/bus.go#L43-L44

Added lines #L43 - L44 were not covered by tests
}

// Bus is used to publish and subscribe to events.
nasdf marked this conversation as resolved.
Show resolved Hide resolved
type Bus struct {
subId int
nasdf marked this conversation as resolved.
Show resolved Hide resolved
nasdf marked this conversation as resolved.
Show resolved Hide resolved
subs map[int]*Subscription
events map[string]map[int]any
nasdf marked this conversation as resolved.
Show resolved Hide resolved
mutex sync.RWMutex
nasdf marked this conversation as resolved.
Show resolved Hide resolved
}

// NewBus returns a new event bus.
func NewBus() *Bus {
return &Bus{
subs: make(map[int]*Subscription),
events: make(map[string]map[int]any),
}
}

// Publish publishes the given event to all subscribers.
func (b *Bus) Publish(msg Message) {
b.mutex.RLock()
defer b.mutex.RUnlock()

subscribers := make(map[int]any)
nasdf marked this conversation as resolved.
Show resolved Hide resolved
for id := range b.events[msg.Name] {
subscribers[id] = struct{}{}
}
for id := range b.events[WildCardEventName] {
subscribers[id] = struct{}{}
}

for id := range subscribers {
select {
case b.subs[id].value <- msg:
// published event
default:
// channel full
}
}
}

// Subscribe returns a new channel that will receive all of the subscribed events.
func (b *Bus) Subscribe(size int, events ...string) *Subscription {
b.mutex.Lock()
defer b.mutex.Unlock()

sub := &Subscription{
id: b.subId,
value: make(chan Message, size),
events: events,
}

for _, event := range events {
if _, ok := b.events[event]; !ok {
b.events[event] = make(map[int]any)
}
b.events[event][sub.id] = struct{}{}
}

b.subId++
b.subs[sub.id] = sub
return sub
}

// Unsubscribe unsubscribes from all events and closes the event channel of the given subscription.
func (b *Bus) Unsubscribe(sub *Subscription) {
b.mutex.Lock()
defer b.mutex.Unlock()

if _, ok := b.subs[sub.id]; !ok {
return // not subscribed
}
for _, event := range sub.events {
delete(b.events[event], sub.id)
}

delete(b.subs, sub.id)
close(sub.value)
}

// Close closes the event bus by unsubscribing all subscribers.
func (b *Bus) Close() {
var subs []*Subscription

b.mutex.RLock()
for _, sub := range b.subs {
subs = append(subs, sub)
}
islamaliev marked this conversation as resolved.
Show resolved Hide resolved
b.mutex.RUnlock()

for _, sub := range subs {
b.Unsubscribe(sub)
}
}
34 changes: 34 additions & 0 deletions events/bus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package events

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestBusPublish(t *testing.T) {
nasdf marked this conversation as resolved.
Show resolved Hide resolved
bus := NewBus()
defer bus.Close()

sub1 := bus.Subscribe(1, "test")
sub2 := bus.Subscribe(1, WildCardEventName)

msg := NewMessage("test", "hello")
bus.Publish(msg)

event1 := <-sub1.Message()
assert.Equal(t, msg, event1)

event2 := <-sub2.Message()
assert.Equal(t, msg, event2)
}
33 changes: 0 additions & 33 deletions events/dag_sync.go

This file was deleted.

45 changes: 0 additions & 45 deletions events/db_update.go

This file was deleted.

19 changes: 0 additions & 19 deletions events/errors.go

This file was deleted.

Loading
Loading