Skip to content

Commit

Permalink
refactor(manager): improved NewManager implementation
Browse files Browse the repository at this point in the history
Replaces the (optional) Manager implementation, adding the ability to check
errors (from added tickers) with errors.Is, and provides ErrManagerStopped,
for identifying and handling failed attempts to add to a manager, that is
stopping or stopped.

Otherwise, the new behavior resembles the old, as closely as possible.

The original motivator for this change was fixing an unhandled edge case,
where a race between add and stop could lead to a deadlock. The new
implementation is both more polished, and significantly more idiomatic.
  • Loading branch information
joeycumines committed May 7, 2021
1 parent 5cc319f commit 0eb1579
Show file tree
Hide file tree
Showing 5 changed files with 354 additions and 74 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ go 1.13

require (
github.com/go-test/deep v1.0.7
github.com/joeycumines/go-bigbuff v1.14.0
github.com/xlab/treeprint v1.1.0
)
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-test/deep v1.0.4/go.mod h1:wGDj63lr65AM2AQyKZd/NYHGb0R+1RLqB8NKt3aSFNA=
github.com/go-test/deep v1.0.7 h1:/VSMRlnY/JSyqxQUzQLKVMAskpY/NZKFA5j2P+0pP2M=
github.com/go-test/deep v1.0.7/go.mod h1:QV8Hv/iy04NyLBxAdO9njL0iVPN1S4d/A3NVv1V36o8=
github.com/joeycumines/go-bigbuff v1.14.0 h1:nHql/X/YMUrV7sMQ9w0+H9T8vj0YL15lT/yOr7U0HpE=
github.com/joeycumines/go-bigbuff v1.14.0/go.mod h1:7hqtGnMDT3v+yOvHUb+hx+JSxhlTF2W9BGIkvNQizaA=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
169 changes: 111 additions & 58 deletions manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package behaviortree

import (
"errors"
"strings"
"github.com/joeycumines/go-bigbuff"
"sync"
)

Expand All @@ -33,12 +33,29 @@ type (

// manager is this package's implementation of the Manager interface
manager struct {
mutex sync.Mutex
tickers []Ticker
errs []string
stopped bool
mu sync.RWMutex
once sync.Once
worker bigbuff.Worker
done chan struct{}
stop chan struct{}
tickers chan managerTicker
errs []error
}

managerTicker struct {
Ticker Ticker
Done func()
}

errManagerTicker []error

errManagerStopped struct{ error }
)

var (
// ErrManagerStopped is returned by the manager implementation in this package (see also NewManager) in the case
// that Manager.Add is attempted after the manager has already started to stop. Use errors.Is to check this case.
ErrManagerStopped error = errManagerStopped{error: errors.New(`behaviortree.Manager.Add already stopped`)}
)

// NewManager will construct an implementation of the Manager interface, which is a stateful set of Ticker
Expand All @@ -47,9 +64,15 @@ type (
//
// Note that any error (of any registered tickers) will also trigger stopping, and stopping will prevent further
// Add calls from succeeding.
//
// As of v1.8.0, any (combined) ticker error returned by the Manager can now support error chaining (i.e. the use of
// errors.Is). Note that errors.Unwrap isn't supported, since there may be more than one. See also Manager.Err and
// Manager.Add.
func NewManager() Manager {
result := &manager{
done: make(chan struct{}),
done: make(chan struct{}),
stop: make(chan struct{}),
tickers: make(chan managerTicker),
}
return result
}
Expand All @@ -59,83 +82,113 @@ func (m *manager) Done() <-chan struct{} {
}

func (m *manager) Err() error {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.err()
m.mu.RLock()
defer m.mu.RUnlock()
if len(m.errs) != 0 {
return errManagerTicker(m.errs)
}
return nil
}

func (m *manager) Stop() {
m.mutex.Lock()
defer m.mutex.Unlock()
m.stopOnce()
m.once.Do(func() {
close(m.stop)
m.start()()
})
}

func (m *manager) Add(ticker Ticker) error {
if ticker == nil {
return errors.New("behaviortree.Manager.Add nil ticker")
}
m.mutex.Lock()
defer m.mutex.Unlock()
m.check()
if m.stopped {
if err := m.err(); err != nil {
return err
done := m.start()
select {
case <-m.stop:
default:
select {
case <-m.stop:
case m.tickers <- managerTicker{
Ticker: ticker,
Done: done,
}:
return nil
}
return errors.New("behaviortree.Manager.Add already stopped")
}
m.tickers = append(m.tickers, ticker)
go func() {
<-ticker.Done()
m.mutex.Lock()
defer m.mutex.Unlock()
m.check()
}()
return nil
done()
if err := m.Err(); err != nil {
return errManagerStopped{error: err}
}
return ErrManagerStopped
}

func (m *manager) err() error {
if len(m.errs) != 0 {
return errors.New(strings.Join(m.errs, " | "))
func (m *manager) start() (done func()) { return m.worker.Do(m.run) }

func (m *manager) run(stop <-chan struct{}) {
for {
select {
case <-stop:
select {
case <-m.stop:
select {
case <-m.done:
default:
close(m.done)
}
default:
}
return
case t := <-m.tickers:
go m.handle(t)
}
}
return nil
}

func (m *manager) stopOnce() {
if !m.stopped {
m.stopped = true
go m.cleanup()
func (m *manager) handle(t managerTicker) {
select {
case <-t.Ticker.Done():
// note: this stop shouldn't be necessary, but has been retained for
// consistency, with the previous implementation)
t.Ticker.Stop()
case <-m.stop:
t.Ticker.Stop()
<-t.Ticker.Done()
}
if err := t.Ticker.Err(); err != nil {
m.mu.Lock()
m.errs = append(m.errs, err)
m.mu.Unlock()
m.Stop()
}
t.Done()
}

func (m *manager) finish(i int) {
m.tickers[i].Stop()
<-m.tickers[i].Done()
if err := m.tickers[i].Err(); err != nil {
m.errs = append(m.errs, err.Error())
m.stopOnce()
func (e errManagerTicker) Error() string {
var b []byte
for i, err := range e {
if i != 0 {
b = append(b, ' ', '|', ' ')
}
b = append(b, err.Error()...)
}
m.tickers[i] = m.tickers[len(m.tickers)-1]
m.tickers[len(m.tickers)-1] = nil
m.tickers = m.tickers[:len(m.tickers)-1]
return string(b)
}

func (m *manager) check() {
for i := 0; i < len(m.tickers); i++ {
select {
case <-m.tickers[i].Done():
m.finish(i)
i--
default:
func (e errManagerTicker) Is(target error) bool {
for _, err := range e {
if errors.Is(err, target) {
return true
}
}
return false
}

func (m *manager) cleanup() {
m.mutex.Lock()
for i := 0; i < len(m.tickers); i++ {
m.finish(i)
i--
func (e errManagerStopped) Unwrap() error { return e.error }

func (e errManagerStopped) Is(target error) bool {
switch target.(type) {
case errManagerStopped:
return true
default:
return false
}
close(m.done)
m.mutex.Unlock()
}
Loading

0 comments on commit 0eb1579

Please sign in to comment.