diff --git a/agent/agent.go b/agent/agent.go index 5291d8650..115b60e2a 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -2,6 +2,7 @@ package agent import ( "context" + "fmt" "sync" "time" @@ -130,11 +131,16 @@ func (r *Agent) Run(recipe recipe.Recipe) (run Run) { // create a goroutine to let extractor concurrently emit data // while stream is listening via stream.Listen(). go func() { + defer func() { + if r := recover(); r != nil { + run.Error = fmt.Errorf("%s", r) + } + stream.Close() + }() err = runExtractor() if err != nil { run.Error = err } - stream.Close() }() // start listening. diff --git a/agent/agent_test.go b/agent/agent_test.go index 2a456c4d5..39e681519 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -1,6 +1,7 @@ package agent_test import ( + "context" "errors" "testing" @@ -195,6 +196,29 @@ func TestRunnerRun(t *testing.T) { assert.Error(t, run.Error) }) + t.Run("should return error when extractor panicing", func(t *testing.T) { + extr := new(panicExtractor) + extr.On("Init", mockCtx, validRecipe.Source.Config).Return(nil).Once() + ef := registry.NewExtractorFactory() + ef.Register("test-extractor", newExtractor(extr)) + + proc := mocks.NewProcessor() + proc.On("Init", mockCtx, validRecipe.Processors[0].Config).Return(nil).Once() + defer proc.AssertExpectations(t) + pf := registry.NewProcessorFactory() + pf.Register("test-processor", newProcessor(proc)) + + sink := mocks.NewSink() + sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + defer sink.AssertExpectations(t) + sf := registry.NewSinkFactory() + sf.Register("test-sink", newSink(sink)) + + r := agent.NewAgent(ef, pf, sf, nil, test.Logger) + run := r.Run(validRecipe) + assert.Error(t, run.Error) + }) + t.Run("should return error when processing fails", func(t *testing.T) { data := []models.Record{ models.NewRecord(&assets.Table{}), @@ -225,6 +249,35 @@ func TestRunnerRun(t *testing.T) { assert.Error(t, run.Error) }) + t.Run("should return error when processing panics", func(t *testing.T) { + data := []models.Record{ + models.NewRecord(&assets.Table{}), + } + + extr := mocks.NewExtractor() + extr.SetEmit(data) + extr.On("Init", mockCtx, validRecipe.Source.Config).Return(nil).Once() + extr.On("Extract", mockCtx, mock.AnythingOfType("plugins.Emit")).Return(nil).Once() + ef := registry.NewExtractorFactory() + ef.Register("test-extractor", newExtractor(extr)) + + proc := new(panicProcessor) + proc.On("Init", mockCtx, validRecipe.Processors[0].Config).Return(nil).Once() + defer proc.AssertExpectations(t) + pf := registry.NewProcessorFactory() + pf.Register("test-processor", newProcessor(proc)) + + sink := mocks.NewSink() + sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + defer sink.AssertExpectations(t) + sf := registry.NewSinkFactory() + sf.Register("test-sink", newSink(sink)) + + r := agent.NewAgent(ef, pf, sf, nil, test.Logger) + run := r.Run(validRecipe) + assert.Error(t, run.Error) + }) + t.Run("should return error when sink fails", func(t *testing.T) { data := []models.Record{ models.NewRecord(&assets.Table{}), @@ -394,5 +447,20 @@ func newMockMonitor() *mockMonitor { func (m *mockMonitor) RecordRun(recipe recipe.Recipe, durationInMs int, success bool) { m.Called(recipe, durationInMs, success) - return +} + +type panicExtractor struct { + mocks.Extractor +} + +func (e *panicExtractor) Extract(ctx context.Context, emit plugins.Emit) (err error) { + panic("panicing") +} + +type panicProcessor struct { + mocks.Processor +} + +func (p *panicProcessor) Process(ctx context.Context, src models.Record) (dst models.Record, err error) { + panic("panicing") } diff --git a/agent/stream.go b/agent/stream.go index c73307824..00ae7f2c9 100644 --- a/agent/stream.go +++ b/agent/stream.go @@ -1,6 +1,7 @@ package agent import ( + "fmt" "sync" "github.com/odpf/meteor/models" @@ -44,6 +45,13 @@ func (s *stream) broadcast() error { for _, l := range s.subscribers { wg.Add(1) go func(l *subscriber) { + defer func() { + if r := recover(); r != nil { + s.closeWithError(fmt.Errorf("%s", r)) + } + wg.Done() + }() + batch := newBatch(l.batchSize) // listen to channel and emit data to subscriber callback if batch is full for d := range l.channel { @@ -61,8 +69,6 @@ func (s *stream) broadcast() error { s.closeWithError(err) } } - - wg.Done() }(l) }