Skip to content

Commit

Permalink
Unclutter Module struct and refactor events
Browse files Browse the repository at this point in the history
  • Loading branch information
ppacher committed Sep 21, 2020
1 parent 19f75bb commit cab8681
Show file tree
Hide file tree
Showing 9 changed files with 257 additions and 192 deletions.
221 changes: 132 additions & 89 deletions modules/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,43 +4,80 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/safing/portbase/log"
)

type eventHookFn func(context.Context, interface{}) error
type (
// EventObserverFunc can be registered for one or more event types
// and will be called with the event payload.
// Any error returned from the observer function will be logged.
EventObserverFunc func(context.Context, interface{}) error

type eventHook struct {
description string
hookingModule *Module
hookFn eventHookFn
}
// eventHooks keeps track of registered event subscriptions.
eventHooks struct {
sync.RWMutex
subscriptions map[string][]*subscription
}

// TriggerEvent executes all hook functions registered to the specified event.
func (m *Module) TriggerEvent(event string, data interface{}) {
if m.OnlineSoon() {
go m.processEventTrigger(event, data)
// subscription defines the subscription to an observable.
// Any time the observable emits an event the subscriptions
// callback is called.
subscription struct {
// description is a human readable description of the
// subscription purpose. This is mainly used for logging
// purposes.
description string
// subscriber is a reference to the module that placed
// the subscription.
subscriber *Module
// target holds a reference to the module that is
// observed by this subscription
target *Module
// callback is the function to execute when the observed
// event occurs.
callback EventObserverFunc
}
}
)

func (m *Module) processEventTrigger(event string, data interface{}) {
m.eventHooksLock.RLock()
defer m.eventHooksLock.RUnlock()
// RegisterEvent registers a new event to allow for registering hooks.
func (m *Module) RegisterEvent(event string) {
m.events.defineEvent(event)
}

hooks, ok := m.eventHooks[event]
if !ok {
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, event)
return
// RegisterEventHook registers a hook function with (another) modules'
// event. Whenever a hook is triggered and the receiving module has not
// yet fully started, hook execution will be delayed until the modules
// completed starting.
func (m *Module) RegisterEventHook(module, event, description string, fn EventObserverFunc) error {
targetModule := m
if module != m.Name {
var ok bool
// TODO(ppacher): accessing modules[module] here without any
// kind of protection seems wrong.... Check with
// @dhaavi.
targetModule, ok = modules[module]
if !ok {
return fmt.Errorf(`module "%s" does not exist`, module)
}
}

for _, hook := range hooks {
if hook.hookingModule.OnlineSoon() {
go m.runEventHook(hook, event, data)
}
return targetModule.events.addSubscription(targetModule, m, event, description, fn)
}

// TriggerEvent executes all hook functions registered to the
// specified event.
func (m *Module) TriggerEvent(event string, data interface{}) {
if m.OnlineSoon() {
go m.processEventTrigger(event, event, data)
}
}

// InjectEvent triggers an event from a foreign module and executes all hook functions registered to that event.
// InjectEvent triggers an event from a foreign module and executes
// all hook functions registered to that event.
// Note that sourceEventName is only used for logging purposes while
// targetModuleName and targetEventName must actually exist.
func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName string, data interface{}) error {
if !m.OnlineSoon() {
return errors.New("module not yet started")
Expand All @@ -55,99 +92,105 @@ func (m *Module) InjectEvent(sourceEventName, targetModuleName, targetEventName
return fmt.Errorf(`module "%s" does not exist`, targetModuleName)
}

targetModule.eventHooksLock.RLock()
defer targetModule.eventHooksLock.RUnlock()
targetModule.processEventTrigger(targetEventName, sourceEventName, data)

targetHooks, ok := targetModule.eventHooks[targetEventName]
return nil
}

func (m *Module) processEventTrigger(eventID, eventName string, data interface{}) {
m.events.RLock()
defer m.events.RUnlock()

hooks, ok := m.events.subscriptions[eventID]
if !ok {
return fmt.Errorf(`module "%s" has no event named "%s"`, targetModuleName, targetEventName)
log.Warningf(`%s: tried to trigger non-existent event "%s"`, m.Name, eventID)
return
}

for _, hook := range targetHooks {
if hook.hookingModule.OnlineSoon() {
go m.runEventHook(hook, sourceEventName, data)
for _, hook := range hooks {
if hook.subscriber.OnlineSoon() {
go hook.runEventHook(eventName, data)
}
}
}

return nil
func (hook *subscription) Name(event string) string {
return fmt.Sprintf("event hook %s/%s -> %s/%s", hook.target.Name, event, hook.subscriber.Name, hook.description)
}

func (m *Module) runEventHook(hook *eventHook, event string, data interface{}) {
// check if source module is ready for handling
if m.Status() != StatusOnline {
// target module has not yet fully started, wait until start is complete
select {
case <-m.StartCompleted():
// continue with hook execution
case <-hook.hookingModule.Stopping():
return
case <-m.Stopping():
return
}
func waitForModule(ctx context.Context, m *Module) bool {
select {
case <-ctx.Done():
return false
case <-m.StartCompleted():
return true
}
}

func (hook *subscription) runEventHook(event string, data interface{}) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// check if destionation module is ready for handling
if hook.hookingModule.Status() != StatusOnline {
// target module has not yet fully started, wait until start is complete
go func() {
select {
case <-hook.hookingModule.StartCompleted():
// continue with hook execution
case <-hook.hookingModule.Stopping():
return
case <-m.Stopping():
return
case <-hook.subscriber.Stopping():
cancel()
case <-hook.target.Stopping():
cancel()
case <-ctx.Done():
}
}()

// wait for both modules to become online (or shutdown)
if !waitForModule(ctx, hook.target) || !waitForModule(ctx, hook.subscriber) {
return
}

err := hook.hookingModule.RunWorker(
fmt.Sprintf("event hook %s/%s -> %s/%s", m.Name, event, hook.hookingModule.Name, hook.description),
err := hook.subscriber.RunWorker(
hook.Name(event),
func(ctx context.Context) error {
return hook.hookFn(ctx, data)
return hook.callback(ctx, data)
},
)
if err != nil {
log.Warningf("%s: failed to execute event hook %s/%s -> %s/%s: %s", hook.hookingModule.Name, m.Name, event, hook.hookingModule.Name, hook.description, err)
log.Warningf("%s: failed to execute %s: %s", hook.target.Name, hook.Name(event), err)
}
}

// RegisterEvent registers a new event to allow for registering hooks.
func (m *Module) RegisterEvent(event string) {
m.eventHooksLock.Lock()
defer m.eventHooksLock.Unlock()
func (hooks *eventHooks) addSubscription(target, subscriber *Module, event, descr string, fn EventObserverFunc) error {
hooks.Lock()
defer hooks.Unlock()

_, ok := m.eventHooks[event]
if !ok {
m.eventHooks[event] = make([]*eventHook, 0, 1)
if hooks.subscriptions == nil {
return fmt.Errorf("unknown event %q", event)
}
}

// RegisterEventHook registers a hook function with (another) modules' event. Whenever a hook is triggered and the receiving module has not yet fully started, hook execution will be delayed until the modules completed starting.
func (m *Module) RegisterEventHook(module string, event string, description string, fn func(context.Context, interface{}) error) error {
// get target module
var eventModule *Module
if module == m.Name {
eventModule = m
} else {
var ok bool
eventModule, ok = modules[module]
if !ok {
return fmt.Errorf(`module "%s" does not exist`, module)
}
if _, ok := hooks.subscriptions[event]; !ok {
return fmt.Errorf("unknown event %q", event)
}

// get target event
eventModule.eventHooksLock.Lock()
defer eventModule.eventHooksLock.Unlock()
hooks, ok := eventModule.eventHooks[event]
if !ok {
return fmt.Errorf(`event "%s/%s" does not exist`, eventModule.Name, event)
}
hooks.subscriptions[event] = append(
hooks.subscriptions[event],
&subscription{
description: descr,
subscriber: subscriber,
target: target,
callback: fn,
},
)

// add hook
eventModule.eventHooks[event] = append(hooks, &eventHook{
description: description,
hookingModule: m,
hookFn: fn,
})
return nil
}

func (hooks *eventHooks) defineEvent(event string) {
hooks.Lock()
defer hooks.Unlock()

if hooks.subscriptions == nil {
hooks.subscriptions = make(map[string][]*subscription)
}

if _, ok := hooks.subscriptions[event]; !ok {
hooks.subscriptions[event] = make([]*subscription, 0, 1)
}
}
2 changes: 1 addition & 1 deletion modules/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func parseFlags() error {
func printGraph() {
// mark roots
for _, module := range modules {
if len(module.depReverse) == 0 {
if len(module.dependencies.reverse) == 0 {
// is root, dont print deps in dep tree
module.stopFlag.Set()
}
Expand Down
2 changes: 1 addition & 1 deletion modules/mgmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func buildEnabledTree() {
}

func (m *Module) markDependencies() {
for _, dep := range m.depModules {
for _, dep := range m.dependencies.modules {
if dep.enabledAsDependency.SetToIf(false, true) {
dep.markDependencies()
}
Expand Down
4 changes: 2 additions & 2 deletions modules/microtasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (m *Module) RunLowPriorityMicroTask(name *string, fn func(context.Context)
func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err error) {
// start for module
// hint: only microTasks global var is important for scheduling, others can be set here
atomic.AddInt32(m.microTaskCnt, 1)
atomic.AddInt32(m.stats.microTaskCnt, 1)
m.waitGroup.Add(1)

// set up recovery
Expand All @@ -144,7 +144,7 @@ func (m *Module) runMicroTask(name *string, fn func(context.Context) error) (err
}

// finish for module
atomic.AddInt32(m.microTaskCnt, -1)
atomic.AddInt32(m.stats.microTaskCnt, -1)
m.waitGroup.Done()

// finish and possibly trigger next task
Expand Down
Loading

0 comments on commit cab8681

Please sign in to comment.