Skip to content

Commit

Permalink
add event bus interface
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 12, 2024
1 parent 7e1045f commit 573090e
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 145 deletions.
2 changes: 1 addition & 1 deletion client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ type DB interface {
//
// It may be used to monitor database events - a new event will be yielded for each mutation.
// Note: it does not copy the queue, just the reference to it.
Events() *event.Bus
Events() event.Bus

// MaxTxnRetries returns the number of retries that this DefraDB instance has been configured to
// make in the event of a transaction conflict in certain scenarios.
Expand Down
141 changes: 141 additions & 0 deletions event/buffered_bus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package event

import (
"sync/atomic"
)

type subscribeCommand *Subscription

type unsubscribeCommand *Subscription

type publishCommand Message

type closeCommand struct{}

// bufferedBus is a bus that uses a buffered channel to manage subscribers and publish messages.
type bufferedBus struct {
// subID is incremented for each subscriber and used to set subscriber ids.
subID atomic.Uint64
// subs is a mapping of subscriber ids to subscriptions.
subs map[uint64]*Subscription
// events is a mapping of event names to subscriber ids.
events map[string]map[uint64]struct{}
// commandChannel manages all commands sent to this simpleChannel.
//
// It is important that all stuff gets sent through this single channel to ensure
// that the order of operations is preserved.
//
// WARNING: This does mean that non-event commands can block the database if the buffer
// size is breached (e.g. if many subscribe commands occupy the buffer).
commandChannel chan any
eventBufferSize int
hasClosedChan chan struct{}
isClosed atomic.Bool
}

// NewBufferedBus creates a new event bus with the given commandBufferSize and
// eventBufferSize.
//
// Should the buffers be filled, subsequent calls on this bus will block.
func NewBufferedBus(commandBufferSize int, eventBufferSize int) Bus {
bus := bufferedBus{
subs: make(map[uint64]*Subscription),
events: make(map[string]map[uint64]struct{}),
commandChannel: make(chan any, commandBufferSize),
hasClosedChan: make(chan struct{}),
eventBufferSize: eventBufferSize,
}
go bus.handleChannel()
return &bus
}

func (b *bufferedBus) Publish(msg Message) {
if b.isClosed.Load() {
return
}
b.commandChannel <- publishCommand(msg)
}

func (b *bufferedBus) Subscribe(events ...string) (*Subscription, error) {
if b.isClosed.Load() {
return nil, ErrSubscribedToClosedChan

Check warning on line 71 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L71

Added line #L71 was not covered by tests
}
sub := &Subscription{
id: b.subID.Add(1),
value: make(chan Message, b.eventBufferSize),
events: events,
}
b.commandChannel <- subscribeCommand(sub)
return sub, nil
}

func (b *bufferedBus) Unsubscribe(sub *Subscription) {
if b.isClosed.Load() {
return
}
b.commandChannel <- unsubscribeCommand(sub)
}

func (b *bufferedBus) Close() {
if b.isClosed.Load() {
return
}
b.isClosed.Store(true)
b.commandChannel <- closeCommand{}
// Wait for the close command to be handled, in order, before returning
<-b.hasClosedChan
}

func (b *bufferedBus) handleChannel() {
for cmd := range b.commandChannel {
switch t := cmd.(type) {
case closeCommand:
for _, subscriber := range b.subs {
close(subscriber.value)
}
close(b.commandChannel)
close(b.hasClosedChan)
return

case subscribeCommand:
for _, event := range t.events {
if _, ok := b.events[event]; !ok {
b.events[event] = make(map[uint64]struct{})
}
b.events[event][t.id] = struct{}{}
}
b.subs[t.id] = t

case unsubscribeCommand:
if _, ok := b.subs[t.id]; !ok {
continue // not subscribed

Check warning on line 121 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L121

Added line #L121 was not covered by tests
}
for _, event := range t.events {
delete(b.events[event], t.id)
}
delete(b.subs, t.id)
close(t.value)

case publishCommand:
for id := range b.events[WildCardEventName] {
b.subs[id].value <- Message(t)

Check warning on line 131 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L131

Added line #L131 was not covered by tests
}
for id := range b.events[t.Name] {
if _, ok := b.events[WildCardEventName][id]; ok {
continue

Check warning on line 135 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L135

Added line #L135 was not covered by tests
}
b.subs[id].value <- Message(t)
}
}
}
}
10 changes: 5 additions & 5 deletions event/bus_test.go → event/buffered_bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
)

func TestSimplePushIsNotBlockedWithoutSubscribers(t *testing.T) {
bus := NewBus(0, 0)
bus := NewBufferedBus(0, 0)
defer bus.Close()

msg := NewMessage("test", 1)
Expand All @@ -30,7 +30,7 @@ func TestSimplePushIsNotBlockedWithoutSubscribers(t *testing.T) {
}

func TestSimpleSubscribersAreNotBlockedAfterClose(t *testing.T) {
bus := NewBus(0, 0)
bus := NewBufferedBus(0, 0)
defer bus.Close()

sub, err := bus.Subscribe("test")
Expand All @@ -45,7 +45,7 @@ func TestSimpleSubscribersAreNotBlockedAfterClose(t *testing.T) {
}

func TestSimpleEachSubscribersRecievesEachItem(t *testing.T) {
bus := NewBus(0, 0)
bus := NewBufferedBus(0, 0)
defer bus.Close()

msg1 := NewMessage("test", 1)
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestSimpleEachSubscribersRecievesEachItem(t *testing.T) {
}

func TestSimpleEachSubscribersRecievesEachItemGivenBufferedEventChan(t *testing.T) {
bus := NewBus(0, 2)
bus := NewBufferedBus(0, 2)
defer bus.Close()

msg1 := NewMessage("test", 1)
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestSimpleEachSubscribersRecievesEachItemGivenBufferedEventChan(t *testing.
}

func TestSimpleSubscribersDontRecieveItemsAfterUnsubscribing(t *testing.T) {
bus := NewBus(0, 0)
bus := NewBufferedBus(0, 0)
defer bus.Close()

sub, err := bus.Subscribe("test")
Expand Down
149 changes: 16 additions & 133 deletions event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,25 @@

package event

import (
"sync/atomic"
)

type subscribeCommand *Subscription
// Bus is an event bus used to broadcasts messages to subscribers.
type Bus interface {
// Subscribe subscribes to the Channel, returning a channel by which events can
// be read from, or an error should one occur (e.g. if this object is closed).
//
// This function is non-blocking unless the subscription-buffer is full.
Subscribe(events ...string) (*Subscription, error)

type unsubscribeCommand *Subscription
// Unsubscribe unsubscribes from the Channel, closing the provided channel.
//
// Will do nothing if this object is already closed.
Unsubscribe(sub *Subscription)

type publishCommand Message
// Publish pushes the given item into this channel. Non-blocking.
Publish(msg Message)

type closeCommand struct{}
// Close closes this Channel, and any owned or subscribing channels.
Close()
}

// Message contains event info.
type Message struct {
Expand Down Expand Up @@ -51,128 +59,3 @@ func (s *Subscription) Message() <-chan Message {
func (s *Subscription) Events() []string {
return s.events

Check warning on line 60 in event/bus.go

View check run for this annotation

Codecov / codecov/patch

event/bus.go#L59-L60

Added lines #L59 - L60 were not covered by tests
}

// Bus is used to broadcast events to subscribers.
type Bus struct {
// subID is incremented for each subscriber and used to set subscriber ids.
subID atomic.Uint64
// subs is a mapping of subscriber ids to subscriptions.
subs map[uint64]*Subscription
// events is a mapping of event names to subscriber ids.
events map[string]map[uint64]struct{}
// commandChannel manages all commands sent to this simpleChannel.
//
// It is important that all stuff gets sent through this single channel to ensure
// that the order of operations is preserved.
//
// WARNING: This does mean that non-event commands can block the database if the buffer
// size is breached (e.g. if many subscribe commands occupy the buffer).
commandChannel chan any
eventBufferSize int
hasClosedChan chan struct{}
isClosed atomic.Bool
}

// NewBus creates a new event bus with the given commandBufferSize and
// eventBufferSize.
//
// Should the buffers be filled subsequent calls to functions on this object may start to block.
func NewBus(commandBufferSize int, eventBufferSize int) *Bus {
bus := Bus{
subs: make(map[uint64]*Subscription),
events: make(map[string]map[uint64]struct{}),
commandChannel: make(chan any, commandBufferSize),
hasClosedChan: make(chan struct{}),
eventBufferSize: eventBufferSize,
}
go bus.handleChannel()
return &bus
}

// Publish publishes the given event message to all subscribers.
func (b *Bus) Publish(msg Message) {
if b.isClosed.Load() {
return
}
b.commandChannel <- publishCommand(msg)
}

// Subscribe returns a new channel that will receive all of the subscribed events.
func (b *Bus) Subscribe(events ...string) (*Subscription, error) {
if b.isClosed.Load() {
return nil, ErrSubscribedToClosedChan
}

sub := &Subscription{
id: b.subID.Add(1),
value: make(chan Message, b.eventBufferSize),
events: events,
}

b.commandChannel <- subscribeCommand(sub)
return sub, nil
}

// Unsubscribe unsubscribes from all events and closes the event channel of the given subscription.
func (b *Bus) Unsubscribe(sub *Subscription) {
if b.isClosed.Load() {
return
}
b.commandChannel <- unsubscribeCommand(sub)
}

// Close closes the event bus by unsubscribing all subscribers.
func (b *Bus) Close() {
if b.isClosed.Load() {
return
}
b.isClosed.Store(true)
b.commandChannel <- closeCommand{}

// Wait for the close command to be handled, in order, before returning
<-b.hasClosedChan
}

func (b *Bus) handleChannel() {
for cmd := range b.commandChannel {
switch t := cmd.(type) {
case closeCommand:
for _, subscriber := range b.subs {
close(subscriber.value)
}
close(b.commandChannel)
close(b.hasClosedChan)
return

case subscribeCommand:
for _, event := range t.events {
if _, ok := b.events[event]; !ok {
b.events[event] = make(map[uint64]struct{})
}
b.events[event][t.id] = struct{}{}
}
b.subs[t.id] = t

case unsubscribeCommand:
if _, ok := b.subs[t.id]; !ok {
continue // not subscribed
}
for _, event := range t.events {
delete(b.events[event], t.id)
}
delete(b.subs, t.id)
close(t.value)

case publishCommand:
for id := range b.events[WildCardEventName] {
b.subs[id].value <- Message(t)
}
for id := range b.events[t.Name] {
if _, ok := b.events[WildCardEventName][id]; ok {
continue
}
b.subs[id].value <- Message(t)
}
}
}
}
2 changes: 1 addition & 1 deletion http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ func (c *Client) Headstore() ds.Read {
panic("client side database")
}

func (c *Client) Events() *event.Bus {
func (c *Client) Events() event.Bus {

Check warning on line 454 in http/client.go

View check run for this annotation

Codecov / codecov/patch

http/client.go#L454

Added line #L454 was not covered by tests
panic("client side database")
}

Expand Down
6 changes: 3 additions & 3 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ type db struct {
rootstore datastore.RootStore
multistore datastore.MultiStore

events *event.Bus
events event.Bus

parser core.Parser

Expand Down Expand Up @@ -109,7 +109,7 @@ func newDB(
lensRegistry: lens,
parser: parser,
options: options,
events: event.NewBus(commandBufferSize, eventBufferSize),
events: event.NewBufferedBus(commandBufferSize, eventBufferSize),
}

// apply options
Expand Down Expand Up @@ -251,7 +251,7 @@ func (db *db) initialize(ctx context.Context) error {
}

// Events returns the events Channel.
func (db *db) Events() *event.Bus {
func (db *db) Events() event.Bus {
return db.events
}

Expand Down
Loading

0 comments on commit 573090e

Please sign in to comment.