Skip to content

Commit

Permalink
refactor: Use events to test network logic (sourcenetwork#2700)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves sourcenetwork#2699
Resolves sourcenetwork#2687

## Description

This PR refactors the events package and updates the networking tests to
use events.

I think there is a bit more clean up to be done, but the majority should
be ready for review.

## Tasks

- [x] I made sure the code is well commented, particularly
hard-to-understand areas.
- [x] I made sure the repository-held documentation is changed
accordingly.
- [x] I made sure the pull request title adheres to the conventional
commit style (the subset used in the project can be found in
[tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)).
- [x] I made sure to discuss its limitations such as threats to
validity, vulnerability to mistake and misuse, robustness to
invalidation of assumptions, resource requirements, ...

## How has this been tested?

`make test`

Specify the platform(s) on which this was tested:
- MacOS
  • Loading branch information
nasdf authored Jun 20, 2024
1 parent c7fc43d commit 3856b80
Show file tree
Hide file tree
Showing 41 changed files with 706 additions and 1,498 deletions.
2 changes: 0 additions & 2 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ func MakeStartCommand() *cobra.Command {
node.WithACPType(node.LocalACPType),
node.WithPeers(peers...),
// db options
db.WithUpdateEvents(),
db.WithDAGMergeEvents(),
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
Expand Down
4 changes: 2 additions & 2 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/events"
"github.com/sourcenetwork/defradb/event"
)

type CollectionName = string
Expand Down Expand Up @@ -75,7 +75,7 @@ type DB interface {
//
// It may be used to monitor database events - a new event will be yielded for each mutation.
// Note: it does not copy the queue, just the reference to it.
Events() events.Events
Events() *event.Bus

// MaxTxnRetries returns the number of retries that this DefraDB instance has been configured to
// make in the event of a transaction conflict in certain scenarios.
Expand Down
16 changes: 9 additions & 7 deletions client/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datastore/mocks/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func prepareDAGStore(t *testing.T) *DAGStore {
func NewTxnWithMultistore(t *testing.T) *MultiStoreTxn {
txn := NewTxn(t)
txn.EXPECT().OnSuccess(mock.Anything).Maybe()
txn.EXPECT().OnSuccessAsync(mock.Anything).Maybe()

result := &MultiStoreTxn{
Txn: txn,
Expand Down
163 changes: 163 additions & 0 deletions event/bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package event

import (
"sync"
"sync/atomic"
)

type subscribeCommand *Subscription

type unsubscribeCommand *Subscription

type publishCommand Message

type closeCommand struct{}

// Bus uses a buffered channel to manage subscribers and publish messages.
type Bus struct {
// subID is incremented for each subscriber and used to set subscriber ids.
subID atomic.Uint64
// subs is a mapping of subscriber ids to subscriptions.
subs map[uint64]*Subscription
// events is a mapping of event names to subscriber ids.
events map[Name]map[uint64]struct{}
// commandChannel manages all commands sent to the bufferedBus.
//
// It is important that all stuff gets sent through this single channel to ensure
// that the order of operations is preserved.
//
// This does mean that non-event commands can block the database if the buffer
// size is breached (e.g. if many subscribe commands occupy the buffer).
commandChannel chan any
eventBufferSize int
hasClosedChan chan struct{}
isClosed bool
// closeMutex is only locked when the bus is closing.
closeMutex sync.RWMutex
}

// NewBus creates a new event bus with the given commandBufferSize and
// eventBufferSize.
//
// Should the buffers be filled, subsequent calls on this bus will block.
func NewBus(commandBufferSize int, eventBufferSize int) *Bus {
bus := Bus{
subs: make(map[uint64]*Subscription),
events: make(map[Name]map[uint64]struct{}),
commandChannel: make(chan any, commandBufferSize),
hasClosedChan: make(chan struct{}),
eventBufferSize: eventBufferSize,
}
go bus.handleChannel()
return &bus
}

// Publish broadcasts the given message to the bus subscribers. Non-blocking.
func (b *Bus) Publish(msg Message) {
b.closeMutex.RLock()
defer b.closeMutex.RUnlock()

if b.isClosed {
return
}
b.commandChannel <- publishCommand(msg)
}

// Subscribe returns a new subscription that will receive all of the events
// contained in the given list of events.
func (b *Bus) Subscribe(events ...Name) (*Subscription, error) {
b.closeMutex.RLock()
defer b.closeMutex.RUnlock()

if b.isClosed {
return nil, ErrSubscribedToClosedChan
}
sub := &Subscription{
id: b.subID.Add(1),
value: make(chan Message, b.eventBufferSize),
events: events,
}
b.commandChannel <- subscribeCommand(sub)
return sub, nil
}

// Unsubscribe removes all event subscriptions and closes the subscription channel.
//
// Will do nothing if this object is already closed.
func (b *Bus) Unsubscribe(sub *Subscription) {
b.closeMutex.RLock()
defer b.closeMutex.RUnlock()

if b.isClosed {
return
}
b.commandChannel <- unsubscribeCommand(sub)
}

// Close unsubscribes all active subscribers and closes the command channel.
func (b *Bus) Close() {
b.closeMutex.Lock()
defer b.closeMutex.Unlock()

if b.isClosed {
return
}
b.isClosed = true
b.commandChannel <- closeCommand{}
// Wait for the close command to be handled, in order, before returning
<-b.hasClosedChan
}

func (b *Bus) handleChannel() {
for cmd := range b.commandChannel {
switch t := cmd.(type) {
case closeCommand:
for _, subscriber := range b.subs {
close(subscriber.value)
}
close(b.commandChannel)
close(b.hasClosedChan)
return

case subscribeCommand:
for _, event := range t.events {
if _, ok := b.events[event]; !ok {
b.events[event] = make(map[uint64]struct{})
}
b.events[event][t.id] = struct{}{}
}
b.subs[t.id] = t

case unsubscribeCommand:
if _, ok := b.subs[t.id]; !ok {
continue // not subscribed
}
for _, event := range t.events {
delete(b.events[event], t.id)
}
delete(b.subs, t.id)
close(t.value)

case publishCommand:
for id := range b.events[WildCardName] {
b.subs[id].value <- Message(t)
}
for id := range b.events[t.Name] {
if _, ok := b.events[WildCardName][id]; ok {
continue
}
b.subs[id].value <- Message(t)
}
}
}
}
Loading

0 comments on commit 3856b80

Please sign in to comment.