Skip to content

Commit

Permalink
[service] Run event callbacks asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaavi committed Aug 28, 2024
1 parent de4cb5b commit 4b2e4f2
Showing 1 changed file with 34 additions and 20 deletions.
54 changes: 34 additions & 20 deletions service/mgr/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package mgr

import (
"fmt"
"slices"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -107,36 +106,51 @@ func (em *EventMgr[T]) Submit(event T) {

// Run callbacks.
for _, ec := range em.callbacks {
// Check if callback was canceled.
if ec.canceled.Load() {
anyCanceled = true
continue
}

// Execute callback.
var (
cancel bool
err error
)
if em.mgr != nil {
// Prefer executing in worker.
wkrErr := em.mgr.Do("execute event callback", func(w *WorkerCtx) error {
cancel, err = ec.callback(w, event) //nolint:scopelint // Execution is within scope.
name := "event " + em.name + " callback " + ec.name
em.mgr.Go(name, func(w *WorkerCtx) error {
cancel, err = ec.callback(w, event)
// Handle error and cancelation.
if err != nil {
w.Warn(
"event callback failed",
"event", em.name,
"callback", ec.name,
"err", err,
)
}
if cancel {
ec.canceled.Store(true)
}
return nil
})
if wkrErr != nil {
err = fmt.Errorf("callback execution failed: %w", wkrErr)
}
} else {
cancel, err = ec.callback(nil, event)
}

// Handle error and cancelation.
if err != nil && em.mgr != nil {
em.mgr.Warn(
"event callback failed",
"event", em.name,
"callback", ec.name,
"err", err,
)
}
if cancel {
ec.canceled.Store(true)
anyCanceled = true
// Handle error and cancelation.
if err != nil && em.mgr != nil {
em.mgr.Warn(
"event callback failed",
"event", em.name,
"callback", ec.name,
"err", err,
)
}
if cancel {
ec.canceled.Store(true)
anyCanceled = true
}
}
}

Expand Down

0 comments on commit 4b2e4f2

Please sign in to comment.