diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7ca098045..6b2200699 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -14,4 +14,4 @@ jobs: - name: Install packages run: go mod tidy - name: Run Test - run: make test \ No newline at end of file + run: make test diff --git a/Makefile b/Makefile index 1e42402a7..87931d1c3 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,9 @@ test: test-coverage: test go tool cover -html=coverage.out +test-e2e: + go test ./test/e2e -tags=integration -count=1 + generate-proto: ## regenerate protos @echo " > cloning protobuf from odpf/proton" @echo " > generating protobuf" diff --git a/agent/agent.go b/agent/agent.go index 873c7c3a8..03e64045c 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -254,6 +254,12 @@ func (r *Agent) setupSink(ctx context.Context, sr recipe.SinkRecipe, stream *str return err }, defaultBatchSize) + stream.onClose(func() { + if err = sink.Close(); err != nil { + r.logger.Warn("error closing sink", "sink", sr.Name, "error", err) + } + }) + return } diff --git a/agent/agent_test.go b/agent/agent_test.go index dc1703379..c6bf99fb2 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -3,10 +3,6 @@ package agent_test import ( "context" "errors" - "github.com/odpf/meteor/test" - "testing" - "time" - "github.com/odpf/meteor/agent" "github.com/odpf/meteor/models" "github.com/odpf/meteor/models/odpf/assets" @@ -14,8 +10,11 @@ import ( "github.com/odpf/meteor/recipe" "github.com/odpf/meteor/registry" "github.com/odpf/meteor/test/mocks" + "github.com/odpf/meteor/test/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "testing" + "time" ) var mockCtx = mock.AnythingOfType("*context.emptyCtx") @@ -43,7 +42,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: registry.NewExtractorFactory(), ProcessorFactory: registry.NewProcessorFactory(), SinkFactory: registry.NewSinkFactory(), - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.IsType(t, agent.Run{}, run) @@ -67,7 +66,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: registry.NewExtractorFactory(), ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -92,7 +91,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: registry.NewProcessorFactory(), SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -119,7 +118,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: registry.NewSinkFactory(), - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -150,7 +149,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -183,7 +182,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -218,7 +217,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -243,6 +242,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -253,7 +253,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -277,6 +277,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -287,7 +288,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -318,6 +319,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -328,7 +330,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -358,6 +360,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -368,7 +371,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.Error(t, run.Error) @@ -400,6 +403,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(errors.New("some error")) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -410,7 +414,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.NoError(t, run.Error) @@ -442,6 +446,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(errors.New("some error")) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -452,7 +457,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, StopOnSinkError: true, }) run := r.Run(validRecipe) @@ -485,6 +490,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(nil) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -495,7 +501,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.NoError(t, run.Error) @@ -528,6 +534,7 @@ func TestRunnerRun(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(nil) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -544,7 +551,7 @@ func TestRunnerRun(t *testing.T) { ProcessorFactory: pf, SinkFactory: sf, Monitor: monitor, - Logger: test.Logger, + Logger: utils.Logger, }) run := r.Run(validRecipe) assert.NoError(t, run.Error) @@ -579,6 +586,7 @@ func TestRunnerRun(t *testing.T) { sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil).Once() sink.On("Sink", mockCtx, data).Return(plugins.NewRetryError(err)).Once() sink.On("Sink", mockCtx, data).Return(nil) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -589,7 +597,7 @@ func TestRunnerRun(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, MaxRetries: 2, // need to retry "at least" 2 times since Sink returns RetryError twice RetryInitialInterval: 1 * time.Millisecond, // this is to override default retry interval to reduce test time }) @@ -628,6 +636,7 @@ func TestRunnerRunMultiple(t *testing.T) { sink := mocks.NewSink() sink.On("Init", mockCtx, validRecipe.Sinks[0].Config).Return(nil) sink.On("Sink", mockCtx, data).Return(nil) + sink.On("Close").Return(nil) defer sink.AssertExpectations(t) sf := registry.NewSinkFactory() if err := sf.Register("test-sink", newSink(sink)); err != nil { @@ -638,7 +647,7 @@ func TestRunnerRunMultiple(t *testing.T) { ExtractorFactory: ef, ProcessorFactory: pf, SinkFactory: sf, - Logger: test.Logger, + Logger: utils.Logger, }) runs := r.RunMultiple(recipeList) diff --git a/agent/stream.go b/agent/stream.go index dc3b04dd7..a6fc1f4f8 100644 --- a/agent/stream.go +++ b/agent/stream.go @@ -18,6 +18,7 @@ type subscriber struct { type stream struct { middlewares []streamMiddleware subscribers []*subscriber + onCloses []func() closed bool err error } @@ -38,6 +39,13 @@ func (s *stream) subscribe(callback func(batchedData []models.Record) error, bat return s } +// onClose() is used to register callback for after stream is closed. +func (s *stream) onClose(callback func()) *stream { + s.onCloses = append(s.onCloses, callback) + + return s +} + // broadcast() will start listening to emitter for any pushed data. // This process is blocking, so most times you would want to call this inside a goroutine. func (s *stream) broadcast() error { @@ -55,7 +63,7 @@ func (s *stream) broadcast() error { batch := newBatch(l.batchSize) // listen to channel and emit data to subscriber callback if batch is full for d := range l.channel { - if err := batch.add(d); err != nil { + if err := batch.add(d); err != nil { s.closeWithError(err) } if batch.isFull() { @@ -116,6 +124,10 @@ func (s *stream) Close() { close(l.channel) } s.closed = true + + for _, onClose := range s.onCloses { + onClose() + } } func (s *stream) runMiddlewares(d models.Record) (res models.Record, err error) { diff --git a/plugins/extractors/bigquery/bigquery_test.go b/plugins/extractors/bigquery/bigquery_test.go index b240d2304..520cc69e9 100644 --- a/plugins/extractors/bigquery/bigquery_test.go +++ b/plugins/extractors/bigquery/bigquery_test.go @@ -4,17 +4,17 @@ package bigquery_test import ( "context" + "github.com/odpf/meteor/test/utils" "testing" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/bigquery" - "github.com/odpf/meteor/test" "github.com/stretchr/testify/assert" ) func TestInit(t *testing.T) { t.Run("should return error if config is invalid", func(t *testing.T) { - extr := bigquery.New(test.Logger) + extr := bigquery.New(utils.Logger) ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := extr.Init(ctx, map[string]interface{}{ @@ -24,7 +24,7 @@ func TestInit(t *testing.T) { assert.Equal(t, plugins.InvalidConfigError{}, err) }) t.Run("should not return invalid config error if config is valid", func(t *testing.T) { - extr := bigquery.New(test.Logger) + extr := bigquery.New(utils.Logger) ctx, cancel := context.WithCancel(context.Background()) defer cancel() err := extr.Init(ctx, map[string]interface{}{ diff --git a/plugins/extractors/bigtable/bigtable_test.go b/plugins/extractors/bigtable/bigtable_test.go index 67bb7d9ef..27cc35e2b 100644 --- a/plugins/extractors/bigtable/bigtable_test.go +++ b/plugins/extractors/bigtable/bigtable_test.go @@ -4,6 +4,7 @@ package bigtable_test import ( "context" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -11,8 +12,6 @@ import ( "cloud.google.com/go/bigtable" "github.com/odpf/meteor/plugins" bt "github.com/odpf/meteor/plugins/extractors/bigtable" - "github.com/odpf/meteor/test" - "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" "github.com/stretchr/testify/assert" @@ -38,7 +37,7 @@ func TestMain(m *testing.M) { _, err = bigtable.NewAdminClient(context.Background(), "dev", "dev") return } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal("", err) } @@ -54,7 +53,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error if no project_id in config", func(t *testing.T) { - err := bt.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := bt.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "wrong-config": "sample-project", }) @@ -62,7 +61,7 @@ func TestInit(t *testing.T) { }) t.Run("should return error if project_id is empty", func(t *testing.T) { - err := bt.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := bt.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "project_id": "", }) diff --git a/plugins/extractors/cassandra/cassandra_test.go b/plugins/extractors/cassandra/cassandra_test.go index ead175dab..ea4a0b0b3 100644 --- a/plugins/extractors/cassandra/cassandra_test.go +++ b/plugins/extractors/cassandra/cassandra_test.go @@ -5,6 +5,7 @@ package cassandra_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -16,7 +17,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/cassandra" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -70,7 +70,7 @@ func TestMain(m *testing.M) { } return nil } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -103,7 +103,7 @@ func TestEmptyHosts(t *testing.T) { // TestInit tests the configs func TestInit(t *testing.T) { t.Run("should return error for invalid configs", func(t *testing.T) { - err := cassandra.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := cassandra.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": pass, "host": host, }) @@ -116,7 +116,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := cassandra.New(test.Logger) + extr := cassandra.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, @@ -175,7 +175,7 @@ func execute(queries []string) (err error) { // newExtractor returns a new extractor func newExtractor() *cassandra.Extractor { - return cassandra.New(test.Logger) + return cassandra.New(utils.Logger) } // getExpected returns the expected result diff --git a/plugins/extractors/clickhouse/clickhouse_test.go b/plugins/extractors/clickhouse/clickhouse_test.go index 901f1076d..42b697d49 100644 --- a/plugins/extractors/clickhouse/clickhouse_test.go +++ b/plugins/extractors/clickhouse/clickhouse_test.go @@ -5,6 +5,7 @@ package clickhouse_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -18,7 +19,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/clickhouse" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -65,7 +65,7 @@ func TestMain(m *testing.M) { } return db.Ping() } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -206,5 +206,5 @@ func execute(db *sql.DB, queries []string) (err error) { } func newExtractor() *clickhouse.Extractor { - return clickhouse.New(test.Logger) + return clickhouse.New(utils.Logger) } diff --git a/plugins/extractors/couchdb/couchdb_test.go b/plugins/extractors/couchdb/couchdb_test.go index 887d589f5..dbc442f9e 100644 --- a/plugins/extractors/couchdb/couchdb_test.go +++ b/plugins/extractors/couchdb/couchdb_test.go @@ -5,6 +5,7 @@ package couchdb_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "strconv" @@ -14,7 +15,6 @@ import ( "github.com/go-kivik/kivik" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/couchdb" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -60,7 +60,7 @@ func TestMain(m *testing.M) { _, err = client.Ping(context.TODO()) return } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -81,7 +81,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid configs", func(t *testing.T) { - err := couchdb.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := couchdb.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": "pass", "host": host, }) @@ -93,7 +93,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := couchdb.New(test.Logger) + extr := couchdb.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/csv/csv_test.go b/plugins/extractors/csv/csv_test.go index 72fdd4f8c..007b6e844 100644 --- a/plugins/extractors/csv/csv_test.go +++ b/plugins/extractors/csv/csv_test.go @@ -4,6 +4,7 @@ package csv_test import ( "context" + "github.com/odpf/meteor/test/utils" "testing" "github.com/odpf/meteor/models" @@ -12,7 +13,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/csv" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/stretchr/testify/assert" ) @@ -20,7 +20,7 @@ import ( func TestInit(t *testing.T) { t.Run("should return error if fileName and directory both are empty", func(t *testing.T) { config := map[string]interface{}{} - err := csv.New(test.Logger).Init( + err := csv.New(utils.Logger).Init( context.TODO(), config) assert.Equal(t, plugins.InvalidConfigError{}, err) @@ -30,7 +30,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract data if path is a file", func(t *testing.T) { ctx := context.TODO() - extr := csv.New(test.Logger) + extr := csv.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "path": "./testdata/test.csv", }) @@ -64,7 +64,7 @@ func TestExtract(t *testing.T) { t.Run("should extract data from all files if path is a dir", func(t *testing.T) { ctx := context.TODO() - extr := csv.New(test.Logger) + extr := csv.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "path": "./testdata", }) diff --git a/plugins/extractors/elastic/elastic_test.go b/plugins/extractors/elastic/elastic_test.go index a936ed6fe..b893b2117 100644 --- a/plugins/extractors/elastic/elastic_test.go +++ b/plugins/extractors/elastic/elastic_test.go @@ -6,6 +6,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/odpf/meteor/test/utils" "log" "net/http" "os" @@ -20,7 +21,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/elastic" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -83,7 +83,7 @@ func TestMain(m *testing.M) { } return } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -184,7 +184,7 @@ func jsonStruct(doc MeteorMockElasticDocs) string { } func newExtractor() *elastic.Extractor { - return elastic.New(test.Logger) + return elastic.New(utils.Logger) } func getExpectedVal() []models.Record { diff --git a/plugins/extractors/gcs/gcs_test.go b/plugins/extractors/gcs/gcs_test.go index 3735ac36c..fcfbeb7ba 100644 --- a/plugins/extractors/gcs/gcs_test.go +++ b/plugins/extractors/gcs/gcs_test.go @@ -4,17 +4,17 @@ package gcs_test import ( "context" + "github.com/odpf/meteor/test/utils" "testing" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/gcs" - "github.com/odpf/meteor/test" "github.com/stretchr/testify/assert" ) func TestInit(t *testing.T) { t.Run("should return error if no project_id in config", func(t *testing.T) { - err := gcs.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := gcs.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "wrong-config": "sample-project", }) diff --git a/plugins/extractors/grafana/grafana_test.go b/plugins/extractors/grafana/grafana_test.go index 4c0d52aa1..db4e0e194 100644 --- a/plugins/extractors/grafana/grafana_test.go +++ b/plugins/extractors/grafana/grafana_test.go @@ -5,6 +5,7 @@ package grafana_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "net/http" "net/http/httptest" "os" @@ -15,7 +16,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/common" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/grafana" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/stretchr/testify/assert" ) @@ -34,7 +34,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error if for empty base_url in config", func(t *testing.T) { - err := grafana.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := grafana.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "base_url": "", "api_key": "qwerty123", }) @@ -43,7 +43,7 @@ func TestInit(t *testing.T) { }) t.Run("should return error if for empty api_key in config", func(t *testing.T) { - err := grafana.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := grafana.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "base_url": testServer.URL, "api_key": "", }) @@ -117,7 +117,7 @@ func TestExtract(t *testing.T) { } ctx := context.TODO() - extractor := grafana.New(test.Logger) + extractor := grafana.New(utils.Logger) err := extractor.Init(ctx, map[string]interface{}{ "base_url": testServer.URL, "api_key": "qwerty123", diff --git a/plugins/extractors/kafka/kafka_test.go b/plugins/extractors/kafka/kafka_test.go index d28d0f882..839c91719 100644 --- a/plugins/extractors/kafka/kafka_test.go +++ b/plugins/extractors/kafka/kafka_test.go @@ -4,6 +4,7 @@ package kafka_test import ( "context" + "github.com/odpf/meteor/test/utils" "log" "net" @@ -16,7 +17,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/common" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/kafka" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -60,7 +60,7 @@ func TestMain(m *testing.M) { return } - purgeContainer, err := test.CreateContainer(opts, retryFn) + purgeContainer, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -161,7 +161,7 @@ func setup(broker kafkaLib.Broker) (err error) { } func newExtractor() *kafka.Extractor { - return kafka.New(test.Logger) + return kafka.New(utils.Logger) } // This function compares two slices without concerning about the order diff --git a/plugins/extractors/metabase/metabase_test.go b/plugins/extractors/metabase/metabase_test.go index e01f3dc5b..2b1addf3f 100644 --- a/plugins/extractors/metabase/metabase_test.go +++ b/plugins/extractors/metabase/metabase_test.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/odpf/meteor/test/utils" "io/ioutil" "log" "net/http" @@ -18,7 +19,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/metabase" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -82,7 +82,7 @@ func TestMain(m *testing.M) { } // Exponential backoff-retry for container to be resy to accept connections - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -102,7 +102,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid config", func(t *testing.T) { - err := metabase.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := metabase.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "user_id": "user", "host": host, }) @@ -114,7 +114,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should return dashboard model", func(t *testing.T) { ctx := context.TODO() - extr := metabase.New(test.Logger) + extr := metabase.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": email, "password": pass, diff --git a/plugins/extractors/mongodb/mongodb_test.go b/plugins/extractors/mongodb/mongodb_test.go index 4a89b8ece..329ddd296 100644 --- a/plugins/extractors/mongodb/mongodb_test.go +++ b/plugins/extractors/mongodb/mongodb_test.go @@ -5,6 +5,7 @@ package mongodb_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -14,7 +15,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/common" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/mongodb" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -68,7 +68,7 @@ func TestMain(m *testing.M) { return client.Ping(ctx, nil) } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -92,7 +92,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid", func(t *testing.T) { - err := mongodb.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := mongodb.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": pass, "host": host, }) @@ -104,7 +104,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := mongodb.New(test.Logger) + extr := mongodb.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/mssql/mssql_test.go b/plugins/extractors/mssql/mssql_test.go index 26cf8535f..38b1fd2c5 100644 --- a/plugins/extractors/mssql/mssql_test.go +++ b/plugins/extractors/mssql/mssql_test.go @@ -6,6 +6,7 @@ import ( "context" "database/sql" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -17,7 +18,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/mssql" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -58,7 +58,7 @@ func TestMain(m *testing.M) { } return db.Ping() } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -79,7 +79,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should error for invalid configurations", func(t *testing.T) { - err := mssql.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := mssql.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": "pass", "host": host, }) @@ -91,7 +91,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := mssql.New(test.Logger) + extr := mssql.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/mysql/mysql_test.go b/plugins/extractors/mysql/mysql_test.go index 849e92087..b4637f208 100644 --- a/plugins/extractors/mysql/mysql_test.go +++ b/plugins/extractors/mysql/mysql_test.go @@ -5,6 +5,7 @@ package mysql_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -18,7 +19,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/mysql" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -58,7 +58,7 @@ func TestMain(m *testing.M) { } return db.Ping() } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -79,7 +79,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid configs", func(t *testing.T) { - err := mysql.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := mysql.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": "pass", "host": host, }) @@ -91,7 +91,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should extract and output tables metadata along with its columns", func(t *testing.T) { ctx := context.TODO() - extr := mysql.New(test.Logger) + extr := mysql.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/postgres/postgres_test.go b/plugins/extractors/postgres/postgres_test.go index 758c4e745..f59f70196 100644 --- a/plugins/extractors/postgres/postgres_test.go +++ b/plugins/extractors/postgres/postgres_test.go @@ -5,6 +5,7 @@ package postgres_test import ( "context" "fmt" + "github.com/odpf/meteor/test/utils" "log" "os" "testing" @@ -15,7 +16,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/postgres" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -52,7 +52,7 @@ func TestMain(m *testing.M) { } return db.Ping() } - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -73,7 +73,7 @@ func TestMain(m *testing.M) { func TestInit(t *testing.T) { t.Run("should return error for invalid config", func(t *testing.T) { - err := postgres.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := postgres.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "password": "pass", "host": host, }) @@ -85,7 +85,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should return mockdata we generated with postgres", func(t *testing.T) { ctx := context.TODO() - extr := postgres.New(test.Logger) + extr := postgres.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "user_id": user, diff --git a/plugins/extractors/superset/superset_test.go b/plugins/extractors/superset/superset_test.go index 724b4e108..3bb6945e8 100644 --- a/plugins/extractors/superset/superset_test.go +++ b/plugins/extractors/superset/superset_test.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/odpf/meteor/test/utils" "io" "io/ioutil" "log" @@ -18,7 +19,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/extractors/superset" - "github.com/odpf/meteor/test" "github.com/odpf/meteor/test/mocks" "github.com/ory/dockertest/v3" "github.com/ory/dockertest/v3/docker" @@ -83,7 +83,7 @@ func TestMain(m *testing.M) { } // exponential backoff-retry, because the application in the container might not be ready to accept connections yet - purgeFn, err := test.CreateContainer(opts, retryFn) + purgeFn, err := utils.CreateContainer(opts, retryFn) if err != nil { log.Fatal(err) } @@ -104,7 +104,7 @@ func TestMain(m *testing.M) { // TestInit tests the configs func TestInit(t *testing.T) { t.Run("should return error for invalid config", func(t *testing.T) { - err := superset.New(test.Logger).Init(context.TODO(), map[string]interface{}{ + err := superset.New(utils.Logger).Init(context.TODO(), map[string]interface{}{ "user_id": "user", "host": host, }) @@ -116,7 +116,7 @@ func TestInit(t *testing.T) { func TestExtract(t *testing.T) { t.Run("should return dashboard model", func(t *testing.T) { ctx := context.TODO() - extr := superset.New(test.Logger) + extr := superset.New(utils.Logger) err := extr.Init(ctx, map[string]interface{}{ "username": user, "password": pass, diff --git a/plugins/plugin.go b/plugins/plugin.go index 481e27e57..4f60bca04 100644 --- a/plugins/plugin.go +++ b/plugins/plugin.go @@ -55,6 +55,9 @@ type Processor interface { type Syncer interface { Plugin Sink(ctx context.Context, batch []models.Record) (err error) + + // Close will be called once after everything is done + Close() error } // ParseInfo parses the plugin's meta.yaml file and returns an plugin Info struct. diff --git a/plugins/sinks/columbus/sink.go b/plugins/sinks/columbus/sink.go index 08a170799..46bb1d5c9 100644 --- a/plugins/sinks/columbus/sink.go +++ b/plugins/sinks/columbus/sink.go @@ -92,6 +92,8 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { return } +func (s *Sink) Close() (err error) { return } + func (s *Sink) buildColumbusPayload(metadata models.Metadata) (interface{}, error) { // skip if mapping is not defined if s.config.Mapping == nil { diff --git a/plugins/sinks/columbus/sink_test.go b/plugins/sinks/columbus/sink_test.go index 13fde8616..8ee96b091 100644 --- a/plugins/sinks/columbus/sink_test.go +++ b/plugins/sinks/columbus/sink_test.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "github.com/odpf/meteor/test/utils" "io/ioutil" "net/http" "testing" @@ -15,7 +16,6 @@ import ( "github.com/odpf/meteor/models/odpf/assets/facets" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/plugins/sinks/columbus" - "github.com/odpf/meteor/test" "github.com/stretchr/testify/assert" ) @@ -55,7 +55,7 @@ func TestInit(t *testing.T) { } for i, config := range invalidConfigs { t.Run(fmt.Sprintf("test invalid config #%d", i+1), func(t *testing.T) { - columbusSink := columbus.New(newmockHTTPClient(http.MethodGet, url, requestPayload), test.Logger) + columbusSink := columbus.New(newmockHTTPClient(http.MethodGet, url, requestPayload), utils.Logger) err := columbusSink.Init(context.TODO(), config) assert.Equal(t, plugins.InvalidConfigError{Type: plugins.PluginTypeSink}, err) @@ -70,7 +70,7 @@ func TestSink(t *testing.T) { client.SetupResponse(200, "") ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": columbusType, @@ -95,7 +95,7 @@ func TestSink(t *testing.T) { client.SetupResponse(404, columbusError) ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": "my-type", @@ -118,7 +118,7 @@ func TestSink(t *testing.T) { client.SetupResponse(code, `{"reason":"internal server error"}`) ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": "my-type", @@ -141,7 +141,7 @@ func TestSink(t *testing.T) { client.SetupResponse(200, `{"success": true}`) ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": "my-type", @@ -174,7 +174,7 @@ func TestSink(t *testing.T) { client.SetupResponse(200, "") ctx := context.TODO() - columbusSink := columbus.New(client, test.Logger) + columbusSink := columbus.New(client, utils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, "type": columbusType, diff --git a/plugins/sinks/console/sink.go b/plugins/sinks/console/sink.go index 5f5ed64fc..a757d6527 100644 --- a/plugins/sinks/console/sink.go +++ b/plugins/sinks/console/sink.go @@ -49,6 +49,8 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { return nil } +func (s *Sink) Close() (err error) { return } + func (s *Sink) process(value interface{}) error { jsonBytes, err := json.Marshal(value) if err != nil { diff --git a/plugins/sinks/kafka/sink.go b/plugins/sinks/kafka/sink.go index 9a227e118..b2bba9edd 100644 --- a/plugins/sinks/kafka/sink.go +++ b/plugins/sinks/kafka/sink.go @@ -72,8 +72,6 @@ func (s *Sink) Init(ctx context.Context, configMap map[string]interface{}) (err } func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { - defer s.writer.Close() - for _, record := range batch { if err := s.push(ctx, record.Data()); err != nil { return err @@ -83,10 +81,11 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { return } -func (s *Sink) push(ctx context.Context, payload interface{}) error { - // struct needs to be cast to pointer to implement proto methods - payload = castModelToPointer(payload) +func (s *Sink) Close() (err error) { + return s.writer.Close() +} +func (s *Sink) push(ctx context.Context, payload interface{}) error { kafkaValue, err := s.buildValue(payload) if err != nil { return err @@ -188,13 +187,6 @@ func (s *Sink) getTopLevelKeyFromPath(keyPath string) (string, error) { return keyPaths[1], nil } -func castModelToPointer(value interface{}) interface{} { - vp := reflect.New(reflect.TypeOf(value)) - vp.Elem().Set(reflect.ValueOf(value)) - - return vp.Interface() -} - func createWriter(config Config) *kafka.Writer { brokers := strings.Split(config.Brokers, ",") return &kafka.Writer{ diff --git a/test/e2e/e2e_test.go b/test/e2e/e2e_test.go new file mode 100644 index 000000000..166ece476 --- /dev/null +++ b/test/e2e/e2e_test.go @@ -0,0 +1,357 @@ +//+build integration + +package e2e_test + +import ( + "context" + "database/sql" + "fmt" + "github.com/odpf/meteor/test/utils" + "github.com/ory/dockertest/v3" + "github.com/ory/dockertest/v3/docker" + "log" + "net" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/odpf/meteor/cmd" + "github.com/odpf/meteor/config" + "github.com/odpf/meteor/models/odpf/assets" + "github.com/odpf/meteor/models/odpf/assets/common" + "github.com/odpf/meteor/models/odpf/assets/facets" + _ "github.com/odpf/meteor/plugins/extractors" + _ "github.com/odpf/meteor/plugins/processors" + _ "github.com/odpf/meteor/plugins/sinks" + "github.com/pkg/errors" + "github.com/segmentio/kafka-go" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/proto" +) + +var ( + db *sql.DB + conn *kafka.Conn + broker kafka.Broker +) + +const ( + testDB = "test_db" + user = "test_user" + pass = "admin" + mysqlHost = "localhost:3306" + brokerHost = "localhost:9093" + testTopic = "topic-a" + partition = 0 +) + +func TestMain(m *testing.M) { + // generate purge function + mysqlPurgeContainer, err := mysqlDockerSetup() + if err != nil { + return + } + kafkaPurgeContainer, err := kafkaDockerSetup() + if err != nil { + return + } + + // setup and populate data for testing + if err := setupMySQL(); err != nil { + log.Fatal(err) + } + if err := setupKafka(); err != nil { + log.Fatal(err) + } + + // run tests + code := m.Run() + + // clean tests + if err = conn.Close(); err != nil { + return + } + if err = db.Close(); err != nil { + return + } + + // purge containers + if err := mysqlPurgeContainer(); err != nil { + log.Fatal(err) + } + if err := kafkaPurgeContainer(); err != nil { + log.Fatal(err) + } + + os.Exit(code) +} + +// TestMySqlToKafka tests the recipe from source to sink completely +func TestMySqlToKafka(t *testing.T) { + err := setupKafka() + if err != nil { + t.Fatal(err) + } + + var sinkData []*assets.Table + ctx, cancel := context.WithCancel(context.TODO()) + go func() { + err = listenToTopic(ctx, testTopic, &sinkData) + if err != nil { + t.Error(err) + } + }() + + // run mysql_kafka.yml file + cfg, err := config.Load() + if err != nil { + t.Error(err) + } + command := cmd.New(utils.Logger, nil, cfg) + command.SetArgs([]string{"run", "mysql_kafka.yml"}) + if err := command.Execute(); err != nil { + if strings.HasPrefix(err.Error(), "unknown command ") { + if !strings.HasSuffix(err.Error(), "\n") { + t.Fatal(err) + } + t.Fatal(err) + } else { + t.Fatal(err) + } + } + + time.Sleep(2 * time.Second) // this is to wait consumer to finish adding data to sinkData + cancel() // cancel will cancel context, hinting the consumer to end + time.Sleep(100 * time.Millisecond) // this is to give time for the consumer to closing all its connections + + expected := getExpectedTables() + assert.Equal(t, len(getExpectedTables()), len(sinkData)) + for tableNum := 0; tableNum < len(getExpectedTables()); tableNum++ { + assert.Equal(t, expected[tableNum].Resource.Urn, sinkData[tableNum].Resource.Urn) + assert.Equal(t, expected[tableNum].Resource.Name, sinkData[tableNum].Resource.Name) + assert.Equal(t, len(expected[tableNum].Schema.Columns), len(sinkData[tableNum].Schema.Columns)) + } +} + +// listenToTopic listens to a topic and stores the data in sinkData +func listenToTopic(ctx context.Context, topic string, data *[]*assets.Table) error { + reader := kafka.NewReader(kafka.ReaderConfig{ + Brokers: []string{brokerHost}, + Topic: topic, + }) + defer func(reader *kafka.Reader) { + if err := reader.Close(); err != nil { + return + } + }(reader) + + for { + msg, err := reader.ReadMessage(ctx) + if err != nil { + break + + } + var convertMsg assets.Table + if err := proto.Unmarshal(msg.Value, &convertMsg); err != nil { + return errors.Wrap(err, "failed to parse kafka message") + } + *data = append(*data, &convertMsg) + } + + return nil +} + +// setupKafka initializes kafka broker with topic and partition +func setupKafka() error { + conn, err := kafka.DialLeader(context.TODO(), "tcp", net.JoinHostPort(broker.Host, strconv.Itoa(broker.Port)), testTopic, partition) + if err != nil { + return errors.Wrap(err, "failed to setup kafka connection") + } + defer func(conn *kafka.Conn) { + if err := conn.Close(); err != nil { + return + } + }(conn) + + if err := conn.DeleteTopics(testTopic); err != nil { + return errors.Wrap(err, "failed to delete topic") + } + if err := conn.CreateTopics(kafka.TopicConfig{ + Topic: testTopic, + NumPartitions: 1, + ReplicationFactor: 1, + }); err != nil { + return errors.Wrap(err, "failed to create topic") + } + + return nil +} + +// setupMySQL initializes mysql database +func setupMySQL() (err error) { + // create database, user and grant access + if err = execute(db, []string{ + fmt.Sprintf("DROP DATABASE IF EXISTS %s", testDB), + fmt.Sprintf("CREATE DATABASE %s", testDB), + fmt.Sprintf("USE %s;", testDB), + fmt.Sprintf(`CREATE USER IF NOT EXISTS '%s'@'%%' IDENTIFIED BY '%s';`, user, pass), + fmt.Sprintf(`GRANT ALL PRIVILEGES ON *.* TO '%s'@'%%';`, user), + }); err != nil { + return errors.Wrap(err, "failed to create database") + } + + // create and populate tables + if err = execute(db, []string{ + "CREATE TABLE applicant (applicant_id int, last_name varchar(255), first_name varchar(255));", + "INSERT INTO applicant VALUES (1, 'test1', 'test11');", + "CREATE TABLE jobs (job_id int, job varchar(255), department varchar(255));", + "INSERT INTO jobs VALUES (2, 'test2', 'test22');", + }); err != nil { + return errors.Wrap(err, "failed to populate database") + } + + return +} + +// execute executes a list of sql statements +func execute(db *sql.DB, queries []string) (err error) { + for _, query := range queries { + _, err = db.Exec(query) + if err != nil { + return + } + } + + return +} + +// kafkaDockerSetup sets up a kafka docker container +func kafkaDockerSetup() (purge func() error, err error) { + // kafka setup test + kafkaOpts := dockertest.RunOptions{ + Repository: "moeenz/docker-kafka-kraft", + Tag: "latest", + Env: []string{ + "KRAFT_CONTAINER_HOST_NAME=kafka", + }, + ExposedPorts: []string{"9093"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "9093": { + {HostIP: "localhost", HostPort: "9093"}, + }, + }, + } + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + kafkaRetryFn := func(resource *dockertest.Resource) (err error) { + // create client + if conn, err = kafka.Dial("tcp", brokerHost); err != nil { + return errors.Wrap(err, "failed to kafka create client") + } + if broker, err = conn.Controller(); err != nil { + return errors.Wrap(err, "failed to generate broker request") + } + return + } + purgeContainer, err := utils.CreateContainer(kafkaOpts, kafkaRetryFn) + if err != nil { + log.Fatal(err) + } + + return purgeContainer, nil +} + +// mysqlDockerSetup sets up a mysql docker container +func mysqlDockerSetup() (purge func() error, err error) { + // mysql setup test + mysqlOpts := dockertest.RunOptions{ + Repository: "mysql", + Tag: "latest", + Env: []string{ + "MYSQL_ROOT_PASSWORD=" + pass, + }, + ExposedPorts: []string{"3306"}, + PortBindings: map[docker.Port][]docker.PortBinding{ + "3306": { + {HostIP: "0.0.0.0", HostPort: "3306"}, + }, + }, + } + // exponential backoff-retry, because the application in the container might not be ready to accept connections yet + mysqlRetryFn := func(resource *dockertest.Resource) (err error) { + db, err = sql.Open("mysql", fmt.Sprintf("root:%s@tcp(%s)/", pass, mysqlHost)) + if err != nil { + return errors.Wrap(err, "failed to create mysql client") + } + return db.Ping() + } + purgeContainer, err := utils.CreateContainer(mysqlOpts, mysqlRetryFn) + if err != nil { + log.Fatal(err) + } + + return purgeContainer, nil +} + +// getExpectedTables returns the expected tables +func getExpectedTables() []*assets.Table { + return []*assets.Table{ + { + Resource: &common.Resource{ + Urn: testDB + ".applicant", + Name: "applicant", + }, + Schema: &facets.Columns{ + Columns: []*facets.Column{ + { + Name: "applicant_id", + DataType: "int", + IsNullable: true, + Length: 0, + }, + { + Name: "first_name", + DataType: "varchar", + IsNullable: true, + Length: 255, + }, + { + Name: "last_name", + DataType: "varchar", + IsNullable: true, + Length: 255, + }, + }, + }, + }, + { + Resource: &common.Resource{ + Urn: testDB + ".jobs", + Name: "jobs", + }, + Schema: &facets.Columns{ + Columns: []*facets.Column{ + { + Name: "department", + DataType: "varchar", + IsNullable: true, + Length: 255, + }, + { + Name: "job", + DataType: "varchar", + IsNullable: true, + Length: 255, + }, + { + Name: "job_id", + DataType: "int", + IsNullable: true, + Length: 0, + }, + }, + }, + }, + } +} diff --git a/test/e2e/mysql_kafka.yml b/test/e2e/mysql_kafka.yml new file mode 100644 index 000000000..26d7dff6a --- /dev/null +++ b/test/e2e/mysql_kafka.yml @@ -0,0 +1,15 @@ +name: sample +source: + type: mysql + config: + user_id: root + password: admin + host: 127.0.0.1:3306 +sinks: + - name: kafka + config: + brokers: "localhost:9093" + topic: "topic-a" +processors: + - name: enrich + config: diff --git a/test/mocks/plugin.go b/test/mocks/plugin.go index b93d3cba4..6e12f7f9e 100644 --- a/test/mocks/plugin.go +++ b/test/mocks/plugin.go @@ -76,6 +76,11 @@ func (m *Sink) Sink(ctx context.Context, batch []models.Record) error { return args.Error(0) } +func (m *Sink) Close() error { + args := m.Called() + return args.Error(0) +} + type Emitter struct { data []models.Record } diff --git a/test/dockertest.go b/test/utils/dockertest.go similarity index 98% rename from test/dockertest.go rename to test/utils/dockertest.go index e7311a566..4f2ad1051 100644 --- a/test/dockertest.go +++ b/test/utils/dockertest.go @@ -1,4 +1,4 @@ -package test +package utils import ( "fmt" diff --git a/test/logger.go b/test/utils/logger.go similarity index 91% rename from test/logger.go rename to test/utils/logger.go index f3d7c7189..602b3896e 100644 --- a/test/logger.go +++ b/test/utils/logger.go @@ -1,4 +1,4 @@ -package test +package utils import ( "io/ioutil"