Skip to content

Commit

Permalink
Fix simulator leak [#3631]
Browse files Browse the repository at this point in the history
  • Loading branch information
firelizzard18 committed Oct 14, 2024
1 parent be43777 commit 4ed201e
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 19 deletions.
2 changes: 2 additions & 0 deletions exp/tendermint/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ func NewDispatcher(router routing.Router, self map[string]DispatcherClient) exec
return d
}

func (d *dispatcher) Close() { /* Nothing to do */ }

// Submit routes the account URL, constructs a multiaddr, and queues addressed
// submit requests.
func (d *dispatcher) Submit(ctx context.Context, u *url.URL, env *messaging.Envelope) error {
Expand Down
3 changes: 3 additions & 0 deletions internal/core/execute/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ type Dispatcher interface {

// Send submits the queued transactions.
Send(context.Context) <-chan error

// Close closes the dispatcher.
Close()
}

// BlockParams are the parameters for a new [Block].
Expand Down
1 change: 1 addition & 0 deletions internal/core/execute/v1/block/block_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ func (x *Executor) requestMissingSyntheticTransactions(blockIndex uint64, synthL
// Setup
wg := new(sync.WaitGroup)
dispatcher := x.NewDispatcher()
defer dispatcher.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand Down
2 changes: 2 additions & 0 deletions internal/core/execute/v1/block/null_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
// nullDispatcher is a [Dispatcher] that discards everything.
type nullDispatcher struct{}

func (nullDispatcher) Close() { /* Nothing to do */ }

func (nullDispatcher) Submit(context.Context, *url.URL, *messaging.Envelope) error { return nil }

func (nullDispatcher) Send(context.Context) <-chan error {
Expand Down
2 changes: 2 additions & 0 deletions internal/core/execute/v1/simulator/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ type dispatcher struct {

var _ execute.Dispatcher = (*dispatcher)(nil)

func (d *dispatcher) Close() { /* Nothing to do */ }

// Submit routes the envelope and adds it to the queue for a partition.
func (d *dispatcher) Submit(ctx context.Context, u *url.URL, env *messaging.Envelope) error {
partition, err := d.sim.router.RouteAccount(u)
Expand Down
10 changes: 4 additions & 6 deletions internal/core/execute/v2/block/block_end.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"

"gitlab.com/accumulatenetwork/accumulate/internal/api/private"
Expand Down Expand Up @@ -399,20 +398,19 @@ func (x *Executor) requestMissingSyntheticTransactions(blockIndex uint64, synthL
defer batch.Discard()

// Setup
wg := new(sync.WaitGroup)
dispatcher := x.NewDispatcher()
defer dispatcher.Close()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// For each partition
for _, partition := range synthLedger.Sequence {
x.requestMissingTransactionsFromPartition(ctx, wg, dispatcher, partition, false)
x.requestMissingTransactionsFromPartition(ctx, dispatcher, partition, false)
}
for _, partition := range anchorLedger.Sequence {
x.requestMissingTransactionsFromPartition(ctx, wg, dispatcher, partition, true)
x.requestMissingTransactionsFromPartition(ctx, dispatcher, partition, true)
}

wg.Wait()
for err := range dispatcher.Send(ctx) {
switch err := err.(type) {
case protocol.TransactionStatusError:
Expand All @@ -423,7 +421,7 @@ func (x *Executor) requestMissingSyntheticTransactions(blockIndex uint64, synthL
}
}

func (x *Executor) requestMissingTransactionsFromPartition(ctx context.Context, wg *sync.WaitGroup, dispatcher Dispatcher, partition *protocol.PartitionSyntheticLedger, anchor bool) {
func (x *Executor) requestMissingTransactionsFromPartition(ctx context.Context, dispatcher Dispatcher, partition *protocol.PartitionSyntheticLedger, anchor bool) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand Down
2 changes: 2 additions & 0 deletions internal/node/daemon/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ func NewDispatcher(network string, router routing.Router, dialer message.Dialer)
return d
}

func (d *dispatcher) Close() { /* Nothing to do */ }

// Submit routes the account URL, constructs a multiaddr, and queues addressed
// submit requests.
func (d *dispatcher) Submit(ctx context.Context, u *url.URL, env *messaging.Envelope) error {
Expand Down
2 changes: 2 additions & 0 deletions test/simulator/consensus/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func NewDispatcher(router routing.Router) *Dispatcher {
return &Dispatcher{router: router, mu: new(sync.Mutex)}
}

func (d *Dispatcher) Close() { /* Nothing to do */ }

// Submit adds an envelope to the queue.
func (d *Dispatcher) Submit(ctx context.Context, dest *url.URL, envelope *messaging.Envelope) error {
partition, err := d.router.RouteAccount(dest)
Expand Down
49 changes: 40 additions & 9 deletions test/simulator/consensus/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ import (
"context"
"errors"
"log/slog"
"slices"
"sync"
"sync/atomic"

"gitlab.com/accumulatenetwork/accumulate/internal/logging"
)
Expand All @@ -21,22 +23,50 @@ const debugSynchronous = false
type SimpleHub struct {
mu *sync.Mutex
context context.Context
modules []Module
modules atomic.Pointer[[]Module]
}

func NewSimpleHub(ctx context.Context) *SimpleHub {
return &SimpleHub{mu: new(sync.Mutex), context: logging.With(ctx, "module", "consensus")}
s := &SimpleHub{mu: new(sync.Mutex), context: logging.With(ctx, "module", "consensus")}
s.modules.Store(&[]Module{})
return s
}

func (s *SimpleHub) Register(module Module) {
s.modules = append(s.modules, module)
for {
m := s.modules.Load()
n := append(*m, module)
if s.modules.CompareAndSwap(m, &n) {
break
}
}
}

func (s *SimpleHub) Unregister(module Module) {
for {
m := s.modules.Load()
n := make([]Module, len(*m))
copy(n, *m)
n = slices.DeleteFunc(n, func(m Module) bool { return m == module })
if s.modules.CompareAndSwap(m, &n) {
break
}
}
}

func (s *SimpleHub) With(modules ...Module) Hub {
m := *s.modules.Load()
n := make([]Module, len(m)+len(modules))
copy(n, m)
copy(n[len(m):], modules)

// Use the same mutex
r := *s
r.modules = append(modules, s.modules...)
return &r
r := &SimpleHub{
mu: s.mu,
context: s.context,
}
r.modules.Store(&n)
return r
}

func (s *SimpleHub) Send(messages ...Message) error {
Expand All @@ -46,7 +76,8 @@ func (s *SimpleHub) Send(messages ...Message) error {
// Record the results of each module separately, then concatenate them. This
// way, the result has the same order as if the modules were called
// synchronously.
results := make([][]Message, len(s.modules))
modules := *s.modules.Load()
results := make([][]Message, len(modules))

// Use a mutex to avoid races when recording an error
var errs []error
Expand All @@ -55,7 +86,7 @@ func (s *SimpleHub) Send(messages ...Message) error {
wg := new(sync.WaitGroup)
receive := func(i int) {
defer wg.Done()
m, err := s.modules[i].Receive(messages...)
m, err := modules[i].Receive(messages...)
results[i] = m

if err == nil {
Expand All @@ -74,7 +105,7 @@ func (s *SimpleHub) Send(messages ...Message) error {
}

// Send the messages to each module
for i := range s.modules {
for i := range modules {
wg.Add(1)
if debugSynchronous {
receive(i)
Expand Down
5 changes: 3 additions & 2 deletions test/simulator/consensus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ type Module interface {

// A Hub distributes messages to modules.
type Hub interface {
Register(module Module)
Register(Module)
Unregister(Module)
Send(...Message) error
With(modules ...Module) Hub
With(...Module) Hub
}

type Recorder interface {
Expand Down
12 changes: 12 additions & 0 deletions test/simulator/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
// fakeDispatcher drops everything
type fakeDispatcher struct{}

func (fakeDispatcher) Close() { /* Nothing to do */ }

func (fakeDispatcher) Submit(ctx context.Context, dest *url.URL, envelope *messaging.Envelope) error {
// Drop it
return nil
Expand All @@ -29,6 +31,16 @@ func (fakeDispatcher) Send(context.Context) <-chan error {
return ch
}

type closeDispatcher struct {
execute.Dispatcher
close func()
}

func (d *closeDispatcher) Close() {
d.close()
d.Dispatcher.Close()
}

type DispatchInterceptor = func(ctx context.Context, env *messaging.Envelope) (send bool, err error)

type interceptDispatcher struct {
Expand Down
9 changes: 7 additions & 2 deletions test/simulator/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,17 @@ func (f *simFactory) getDispatcherFunc() func() execute.Dispatcher {
d := consensus.NewDispatcher(router)
hub.Register(d)

e := &closeDispatcher{
Dispatcher: d,
close: func() { hub.Unregister(d) },
}

if interceptor == nil {
return d
return e
}

return &interceptDispatcher{
Dispatcher: d,
Dispatcher: e,
interceptor: interceptor,
}
}
Expand Down

0 comments on commit 4ed201e

Please sign in to comment.