Skip to content

Commit

Permalink
fix: catch panics to avoid breaking valid recipes (#232)
Browse files Browse the repository at this point in the history
* fix: catch panic while running extractor

* fix: add recoverFunc for sink and processor

* fix: remove division by zero

* fix: remove print statement

* chore: remove redundant return statement

* feat: handling panic on each goroutines

Co-authored-by: Stewart Jingga <[email protected]>
  • Loading branch information
GrayFlash and StewartJingga authored Sep 27, 2021
1 parent 09db366 commit 14dab68
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 4 deletions.
8 changes: 7 additions & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package agent

import (
"context"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -130,11 +131,16 @@ func (r *Agent) Run(recipe recipe.Recipe) (run Run) {
// create a goroutine to let extractor concurrently emit data
// while stream is listening via stream.Listen().
go func() {
defer func() {
if r := recover(); r != nil {
run.Error = fmt.Errorf("%s", r)
}
stream.Close()
}()
err = runExtractor()
if err != nil {
run.Error = err
}
stream.Close()
}()

// start listening.
Expand Down
70 changes: 69 additions & 1 deletion agent/agent_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent_test

import (
"context"
"errors"
"testing"

Expand Down Expand Up @@ -195,6 +196,29 @@ func TestRunnerRun(t *testing.T) {
assert.Error(t, run.Error)
})

t.Run("should return error when extractor panicing", func(t *testing.T) {
extr := new(panicExtractor)
extr.On("Init", mockCtx, validRecipe.Source.Config).Return(nil).Once()
ef := registry.NewExtractorFactory()
ef.Register("test-extractor", newExtractor(extr))

proc := mocks.NewProcessor()
proc.On("Init", mockCtx, validRecipe.Processors[0].Config).Return(nil).Once()
defer proc.AssertExpectations(t)
pf := registry.NewProcessorFactory()
pf.Register("test-processor", newProcessor(proc))

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})

t.Run("should return error when processing fails", func(t *testing.T) {
data := []models.Record{
models.NewRecord(&assets.Table{}),
Expand Down Expand Up @@ -225,6 +249,35 @@ func TestRunnerRun(t *testing.T) {
assert.Error(t, run.Error)
})

t.Run("should return error when processing panics", func(t *testing.T) {
data := []models.Record{
models.NewRecord(&assets.Table{}),
}

extr := mocks.NewExtractor()
extr.SetEmit(data)
extr.On("Init", mockCtx, validRecipe.Source.Config).Return(nil).Once()
extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(nil).Once()
ef := registry.NewExtractorFactory()
ef.Register("test-extractor", newExtractor(extr))

proc := new(panicProcessor)
proc.On("Init", mockCtx, validRecipe.Processors[0].Config).Return(nil).Once()
defer proc.AssertExpectations(t)
pf := registry.NewProcessorFactory()
pf.Register("test-processor", newProcessor(proc))

sink := mocks.NewSink()
sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once()
defer sink.AssertExpectations(t)
sf := registry.NewSinkFactory()
sf.Register("test-sink", newSink(sink))

r := agent.NewAgent(ef, pf, sf, nil, test.Logger)
run := r.Run(validRecipe)
assert.Error(t, run.Error)
})

t.Run("should return error when sink fails", func(t *testing.T) {
data := []models.Record{
models.NewRecord(&assets.Table{}),
Expand Down Expand Up @@ -394,5 +447,20 @@ func newMockMonitor() *mockMonitor {

func (m *mockMonitor) RecordRun(recipe recipe.Recipe, durationInMs int, success bool) {
m.Called(recipe, durationInMs, success)
return
}

type panicExtractor struct {
mocks.Extractor
}

func (e *panicExtractor) Extract(ctx context.Context, emit plugins.Emit) (err error) {
panic("panicing")
}

type panicProcessor struct {
mocks.Processor
}

func (p *panicProcessor) Process(ctx context.Context, src models.Record) (dst models.Record, err error) {
panic("panicing")
}
10 changes: 8 additions & 2 deletions agent/stream.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package agent

import (
"fmt"
"sync"

"github.com/odpf/meteor/models"
Expand Down Expand Up @@ -44,6 +45,13 @@ func (s *stream) broadcast() error {
for _, l := range s.subscribers {
wg.Add(1)
go func(l *subscriber) {
defer func() {
if r := recover(); r != nil {
s.closeWithError(fmt.Errorf("%s", r))
}
wg.Done()
}()

batch := newBatch(l.batchSize)
// listen to channel and emit data to subscriber callback if batch is full
for d := range l.channel {
Expand All @@ -61,8 +69,6 @@ func (s *stream) broadcast() error {
s.closeWithError(err)
}
}

wg.Done()
}(l)
}

Expand Down

0 comments on commit 14dab68

Please sign in to comment.