fix: ensure executor doesn't deadlock when closure errors
When running 'gomod2nix' in my project, the 'gomod2nix import' step was
failing for every import. I have more imports than the default maxJobs.

This caused a deadlock and the program never finished.

This is because in the erroring case we send to errChan, which is an
unbuffered channel, so the send blocks until someone receives. While the
send is blocked the deferred calls never run, most importantly the
`defer` that releases a slot on the semaphore (e.guard).

This means that once the number of erroring work functions exceeds
numWorkers, we block trying to acquire the semaphore when we call .Add
with more work.

We never reach the point where we call .Wait(), which would drain the
errChan, because we are blocked on the semaphore while we are still
generating work.

This change moves the semaphore acquire into the goroutines themselves.
This alters the behaviour: we now start as many goroutines as we have
work items, but the work they do is still gated by the semaphore.

This is reasonable behaviour: goroutines are cheap, and this package is
useful when the work the functions do is expensive, not when goroutine
creation itself is the cost. The work is still guarded by the
semaphore.

A regression test is also added and, in passing, the spelling of
Parallel is corrected.
authored and marcusramberg committed Oct 20, 2024
1 parent e8f299a commit 5d38709
Showing 4 changed files with 46 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -2,3 +2,4 @@
 result*
 /tests/*/gomod2nix.toml
 /tests/*/go.mod
+.idea
4 changes: 2 additions & 2 deletions internal/generate/generate.go
@@ -96,7 +96,7 @@ func ImportPkgs(directory string, numWorkers int) error {
 		return err
 	}
 
-	executor := lib.NewParallellExecutor(numWorkers)
+	executor := lib.NewParallelExecutor(numWorkers)
 	for _, dl := range modDownloads {
 		dl := dl
 		executor.Add(func() error {
@@ -146,7 +146,7 @@ func GeneratePkgs(directory string, goMod2NixPath string, numWorkers int) ([]*sc
 		return nil, err
 	}
 
-	executor := lib.NewParallellExecutor(numWorkers)
+	executor := lib.NewParallelExecutor(numWorkers)
 	var mux sync.Mutex
 
 	cache := schema.ReadCache(goMod2NixPath)
15 changes: 7 additions & 8 deletions internal/lib/executor.go
@@ -4,8 +4,8 @@ import (
 	"sync"
 )
 
-// ParallellExecutor - Execute callback functions in parallell
-type ParallellExecutor struct {
+// ParallelExecutor - Execute callback functions in parallel
+type ParallelExecutor struct {
 	errChan chan error
 	wg      *sync.WaitGroup
 	mux     *sync.Mutex
@@ -16,8 +16,8 @@ type ParallellExecutor struct {
 	done bool
 }
 
-func NewParallellExecutor(maxWorkers int) *ParallellExecutor {
-	return &ParallellExecutor{
+func NewParallelExecutor(maxWorkers int) *ParallelExecutor {
+	return &ParallelExecutor{
 		errChan: make(chan error),
 		mux:     new(sync.Mutex),
 		wg:      new(sync.WaitGroup),
@@ -28,12 +28,11 @@ func NewParallellExecutor(maxWorkers int) *ParallellExecutor {
 	}
 }
 
-func (e *ParallellExecutor) Add(fn func() error) {
+func (e *ParallelExecutor) Add(fn func() error) {
 	e.wg.Add(1)
 
-	e.guard <- struct{}{} // Block until a worker is available
-
 	go func() {
+		e.guard <- struct{}{} // Block until a worker is available
 		defer e.wg.Done()
 		defer func() {
 			<-e.guard
@@ -46,7 +45,7 @@ func (e *ParallellExecutor) Add(fn func() error) {
 	}()
 }
 
-func (e *ParallellExecutor) Wait() error {
+func (e *ParallelExecutor) Wait() error {
 	e.mux.Lock()
 	defer e.mux.Unlock()
36 changes: 36 additions & 0 deletions internal/lib/executor_test.go
@@ -0,0 +1,36 @@
+package lib
+
+import (
+	"errors"
+	"testing"
+	"time"
+)
+
+// TestParallelExecutor_fnAlwaysErrors ensures that the executor does not block
+// forever when there are more erroring functions than workers. This is a
+// regression test.
+func TestParallelExecutor_fnAlwaysErrors(t *testing.T) {
+	const maxWorkers = 1
+	executor := NewParallelExecutor(1)
+
+	for i := 0; i < maxWorkers+1; i++ {
+		executor.Add(func() error {
+			return errors.New("testerror")
+		})
+	}
+
+	errCh := make(chan error)
+	go func() {
+		defer close(errCh)
+		errCh <- executor.Wait()
+	}()
+
+	select {
+	case err := <-errCh:
+		if err == nil {
+			t.Error("Expected error, got nil")
+		}
+	case <-time.After(10 * time.Second):
+		t.Error("Timed out waiting for executor to finish: deadlock")
+	}
+}
