Skip to content

Commit

Permalink
Introduce event handler levels
Browse files Browse the repository at this point in the history
To make sure events are handled properly and reliably, the EEBUS core implementation should always be the first level of interacting with incoming SPINE messages, a middleware implementation second and the application last.

The EventHandler now provides these 3 levels and Subscribe and Unsubscribe expect the level to be provided. The Core level is only usable with an non public method for safeguard
  • Loading branch information
DerAndereAndi committed Oct 28, 2023
1 parent 5460066 commit 7529a6d
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 11 deletions.
4 changes: 2 additions & 2 deletions spine/device_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (r *DeviceLocalImpl) AddRemoteDevice(ski string, writeI SpineDataConnection
// If the request returned an error, it should be retried until it does not

// always add subscription, as it checks if it already exists
Events.Subscribe(r)
_ = Events.subscribe(EventHandlerLevelCore, r)

return rDevice
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (r *DeviceLocalImpl) RemoveRemoteDevice(ski string) {

// only unsubscribe if we don't have any remote devices left
if len(r.remoteDevices) == 0 {
Events.Unsubscribe(r)
_ = Events.unsubscribe(EventHandlerLevelCore, r)
}
}

Expand Down
77 changes: 68 additions & 9 deletions spine/events.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
package spine

import (
"errors"
"sync"

"github.com/enbility/eebus-go/spine/model"
)

var Events events

type EventHandlerLevel uint

const (
EventHandlerLevelCore EventHandlerLevel = iota // Shall only be used by the core stack
EventHandlerLevelMiddleware // Shall only be used by middleware implementations, e.g. CEMd
EventHandlerLevelApplication // Shall only be used by applications
)

type ElementChangeType uint16

const (
Expand Down Expand Up @@ -41,45 +50,95 @@ type EventHandler interface {
HandleEvent(EventPayload)
}

type eventHandlerItem struct {
Level EventHandlerLevel
Handler EventHandler
}

type events struct {
mu sync.Mutex
handlers []EventHandler
handlers []eventHandlerItem // event handling outside of the core stack
}

func (r *events) Subscribe(handler EventHandler) {
// will be used in EEBUS core directly to access the level EventHandlerLevelCore
func (r *events) subscribe(level EventHandlerLevel, handler EventHandler) error {
r.mu.Lock()
defer r.mu.Unlock()

exists := false
for _, item := range r.handlers {
if item == handler {
if item.Level == level && item.Handler == handler {
exists = true
break
}
}

if !exists {
r.handlers = append(r.handlers, handler)
newHandlerItem := eventHandlerItem{
Level: level,
Handler: handler,
}
r.handlers = append(r.handlers, newHandlerItem)
}

return nil
}

// Subscribe to message events and handle them in
// the Eventhandler interface implementation
func (r *events) Subscribe(level EventHandlerLevel, handler EventHandler) error {
if level == EventHandlerLevelCore {
return errors.New("This level is restricted to the EEBUS core implenentation!")
}

return r.subscribe(level, handler)
}

func (r *events) Unsubscribe(handler EventHandler) {
// will be used in EEBUS core directly to access the level EventHandlerLevelCore
func (r *events) unsubscribe(level EventHandlerLevel, handler EventHandler) error {
r.mu.Lock()
defer r.mu.Unlock()

var newHandlers []EventHandler
var newHandlers []eventHandlerItem
for _, item := range r.handlers {
if item != handler {
if item.Level != level && item.Handler != handler {
newHandlers = append(newHandlers, item)
}
}

r.handlers = newHandlers

return nil
}

// Unsubscribe from getting events
func (r *events) Unsubscribe(level EventHandlerLevel, handler EventHandler) error {
if level == EventHandlerLevelCore {
return errors.New("This level is restricted to the EEBUS core implenentation!")
}

return r.unsubscribe(level, handler)
}

// Publish an event to all subscribers
func (r *events) Publish(payload EventPayload) {
r.mu.Lock()
defer r.mu.Unlock()
for _, handler := range r.handlers {
go handler.HandleEvent(payload)

// process subscribers by level
handlerLevels := []EventHandlerLevel{
EventHandlerLevelCore,
EventHandlerLevelMiddleware,
EventHandlerLevelApplication,
}

for _, level := range handlerLevels {
for _, item := range r.handlers {
if item.Level != level {
continue
}

go item.Handler.HandleEvent(payload)
}
}
}

0 comments on commit 7529a6d

Please sign in to comment.