Skip to content

Commit

Permalink
add event.Name type
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 12, 2024
1 parent ab56124 commit d6a1be1
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 37 deletions.
15 changes: 9 additions & 6 deletions event/buffered_bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@ type publishCommand Message

type closeCommand struct{}

// bufferedBus must implement the Bus interface
var _ (Bus) = (*bufferedBus)(nil)

// bufferedBus is a bus that uses a buffered channel to manage subscribers and publish messages.
type bufferedBus struct {
// subID is incremented for each subscriber and used to set subscriber ids.
subID atomic.Uint64
// subs is a mapping of subscriber ids to subscriptions.
subs map[uint64]*Subscription
// events is a mapping of event names to subscriber ids.
events map[string]map[uint64]struct{}
events map[Name]map[uint64]struct{}
// commandChannel manages all commands sent to this simpleChannel.
//
// It is important that all stuff gets sent through this single channel to ensure
Expand All @@ -47,10 +50,10 @@ type bufferedBus struct {
// eventBufferSize.
//
// Should the buffers be filled, subsequent calls on this bus will block.
func NewBufferedBus(commandBufferSize int, eventBufferSize int) Bus {
func NewBufferedBus(commandBufferSize int, eventBufferSize int) *bufferedBus {
bus := bufferedBus{
subs: make(map[uint64]*Subscription),
events: make(map[string]map[uint64]struct{}),
events: make(map[Name]map[uint64]struct{}),
commandChannel: make(chan any, commandBufferSize),
hasClosedChan: make(chan struct{}),
eventBufferSize: eventBufferSize,
Expand All @@ -66,7 +69,7 @@ func (b *bufferedBus) Publish(msg Message) {
b.commandChannel <- publishCommand(msg)
}

func (b *bufferedBus) Subscribe(events ...string) (*Subscription, error) {
func (b *bufferedBus) Subscribe(events ...Name) (*Subscription, error) {
if b.isClosed.Load() {
return nil, ErrSubscribedToClosedChan

Check warning on line 74 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L74

Added line #L74 was not covered by tests
}
Expand Down Expand Up @@ -127,11 +130,11 @@ func (b *bufferedBus) handleChannel() {
close(t.value)

case publishCommand:
for id := range b.events[WildCardEventName] {
for id := range b.events[WildCardName] {
b.subs[id].value <- Message(t)

Check warning on line 134 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L134

Added line #L134 was not covered by tests
}
for id := range b.events[t.Name] {
if _, ok := b.events[WildCardEventName][id]; ok {
if _, ok := b.events[WildCardName][id]; ok {
continue

Check warning on line 138 in event/buffered_bus.go

View check run for this annotation

Codecov / codecov/patch

event/buffered_bus.go#L138

Added line #L138 was not covered by tests
}
b.subs[id].value <- Message(t)
Expand Down
11 changes: 6 additions & 5 deletions event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Bus interface {
// be read from, or an error should one occur (e.g. if this object is closed).
//
// This function is non-blocking unless the subscription-buffer is full.
Subscribe(events ...string) (*Subscription, error)
Subscribe(events ...Name) (*Subscription, error)

// Unsubscribe unsubscribes from the Channel, closing the provided channel.
//
Expand All @@ -33,21 +33,22 @@ type Bus interface {
// Message contains event info.
type Message struct {
// Name is the name of the event this message was generated from.
Name string
Name Name

// Data contains optional event information.
Data any
}

// NewMessage returns a new message with the given name and optional data.
func NewMessage(name string, data any) Message {
func NewMessage(name Name, data any) Message {
return Message{name, data}
}

// Subscription is a read-only event stream.
type Subscription struct {
id uint64
value chan Message
events []string
events []Name
}

// Message returns the next event value from the subscription.
Expand All @@ -56,6 +57,6 @@ func (s *Subscription) Message() <-chan Message {
}

// Events returns the names of all subscribed events.
func (s *Subscription) Events() []string {
func (s *Subscription) Events() []Name {
return s.events

Check warning on line 61 in event/bus.go

View check run for this annotation

Codecov / codecov/patch

event/bus.go#L60-L61

Added lines #L60 - L61 were not covered by tests
}
27 changes: 15 additions & 12 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
)

// Name identifies an event
type Name string

const (
// WildCardEventName is the alias used to subscribe to all events.
WildCardEventName = "*"
// MergeEventName is the name of the net merge request event.
MergeEventName = "merge"
// MergeCompleteEventName is the name of the database merge complete event.
MergeCompleteEventName = "merge-complete"
// UpdateEventName is the name of the database update event.
UpdateEventName = "update"
// PubSubEventName is the name of the network pubsub event.
PubSubEventName = "pubsub"
// PeerEventName is the name of the network connect event.
PeerEventName = "peer"
// WildCardName is the alias used to subscribe to all events.
WildCardName = Name("*")
// MergeName is the name of the net merge request event.
MergeName = Name("merge")
// MergeCompleteName is the name of the database merge complete event.
MergeCompleteName = Name("merge-complete")
// UpdateName is the name of the database update event.
UpdateName = Name("update")
// PubSubName is the name of the network pubsub event.
PubSubName = Name("pubsub")
// PeerName is the name of the network connect event.
PeerName = Name("peer")
)

// Peer is an event that is published when
Expand Down
2 changes: 1 addition & 1 deletion internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ func (c *collection) save(
IsCreate: isCreate,
}
txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateEventName, updateEvent))
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})

txn.OnSuccess(func() {
Expand Down
2 changes: 1 addition & 1 deletion internal/db/collection_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (c *collection) applyDelete(
Block: b,
}
txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateEventName, updateEvent))
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})

return nil
Expand Down
2 changes: 1 addition & 1 deletion internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func newDB(
return nil, err
}

sub, err := db.events.Subscribe(event.MergeEventName)
sub, err := db.events.Subscribe(event.MergeName)
if err != nil {
return nil, err

Check warning on line 131 in internal/db/db.go

View check run for this annotation

Codecov / codecov/patch

internal/db/db.go#L131

Added line #L131 was not covered by tests
}
Expand Down
2 changes: 1 addition & 1 deletion internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (db *db) handleMerges(ctx context.Context, sub *event.Subscription) {

func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
// send a complete event so we can track merges in the integration tests
defer db.events.Publish(event.NewMessage(event.MergeCompleteEventName, dagMerge))
defer db.events.Publish(event.NewMessage(event.MergeCompleteName, dagMerge))

ctx, txn, err := ensureContextTxn(ctx, db, false)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/db/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (db *db) handleSubscription(ctx context.Context, r *request.Request) (<-cha
return nil, client.NewErrUnexpectedType[request.ObjectSubscription]("SubscriptionSelection", selections)
}
// subscribe to the subscription event bus so we don't block the system bus
sub, err := db.events.Subscribe(event.UpdateEventName)
sub, err := db.events.Subscribe(event.UpdateName)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion net/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ func NewNode(
// publish subscribed events to the event bus
go func() {
for val := range sub.Out() {
db.Events().Publish(event1.NewMessage(event1.PeerEventName, val))
db.Events().Publish(event1.NewMessage(event1.PeerName, val))
}
}()

Expand Down
2 changes: 1 addition & 1 deletion net/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func (p *Peer) Start() error {
}

if p.ps != nil {
sub, err := p.db.Events().Subscribe(event.UpdateEventName)
sub, err := p.db.Events().Subscribe(event.UpdateName)
if err != nil {
return err
}
Expand Down
6 changes: 3 additions & 3 deletions net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
}
if exists {
// the integration tests expect every push log to emit a merge complete event
s.peer.db.Events().Publish(event.NewMessage(event.MergeCompleteEventName, evt))
s.peer.db.Events().Publish(event.NewMessage(event.MergeCompleteName, evt))
return &pb.PushLogReply{}, nil
}

Expand All @@ -226,7 +226,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
corelog.Any("CID", cid),
)
}
s.peer.db.Events().Publish(event.NewMessage(event.MergeEventName, evt))
s.peer.db.Events().Publish(event.NewMessage(event.MergeName, evt))

// Once processed, subscribe to the DocID topic on the pubsub network unless we already
// suscribe to the collection.
Expand Down Expand Up @@ -373,7 +373,7 @@ func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) {
corelog.String("Topic", topic),
corelog.String("Message", string(msg)),
)
evt := event.NewMessage(event.PubSubEventName, event.PubSub{
evt := event.NewMessage(event.PubSubName, event.PubSub{
Peer: from,
})
s.peer.db.Events().Publish(evt)
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/events/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func ExecuteRequestTestCase(
testRoutineClosedChan := make(chan struct{})
closeTestRoutineChan := make(chan struct{})

eventsSub, err := db.Events().Subscribe(event.UpdateEventName)
eventsSub, err := db.Events().Subscribe(event.UpdateName)
require.NoError(t, err)

indexOfNextExpectedUpdate := 0
Expand Down
6 changes: 3 additions & 3 deletions tests/integration/utils2.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ func setStartingNodes(
s.dbPaths = append(s.dbPaths, path)

// subscribe to merge complete events
sub, err := c.Events().Subscribe(event.MergeCompleteEventName)
sub, err := c.Events().Subscribe(event.MergeCompleteName)
require.NoError(s.t, err)
s.eventSubs = append(s.eventSubs, sub)
}
Expand Down Expand Up @@ -715,7 +715,7 @@ func restartNodes(
s.nodes[i] = c

// subscribe to merge complete events
sub, err := c.Events().Subscribe(event.MergeCompleteEventName)
sub, err := c.Events().Subscribe(event.MergeCompleteName)
require.NoError(s.t, err)
s.eventSubs[i] = sub
}
Expand Down Expand Up @@ -821,7 +821,7 @@ func configureNode(
s.dbPaths = append(s.dbPaths, path)

// subscribe to merge complete events
sub, err := c.Events().Subscribe(event.MergeCompleteEventName)
sub, err := c.Events().Subscribe(event.MergeCompleteName)
require.NoError(s.t, err)
s.eventSubs = append(s.eventSubs, sub)
}
Expand Down

0 comments on commit d6a1be1

Please sign in to comment.