diff --git a/cmd/info.go b/cmd/info.go index c249a4145..5b8824309 100644 --- a/cmd/info.go +++ b/cmd/info.go @@ -38,7 +38,7 @@ func InfoSinkCmd() *cobra.Command { The list of supported sinks is available via the 'meteor list sinks' command.`), Example: heredoc.Doc(` $ meteor info sink console - $ meteor info sink columbus + $ meteor info sink compass `), Args: cobra.MaximumNArgs(1), Annotations: map[string]string{ diff --git a/cmd/new.go b/cmd/new.go index 42892c772..3d428cd4b 100644 --- a/cmd/new.go +++ b/cmd/new.go @@ -51,10 +51,10 @@ func NewRecipeCmd() *cobra.Command { $ meteor new recipe sample -e bigquery -s console # generate recipe with multiple sinks - $ meteor new recipe sample -e bigquery -s columbus,kafka -p enrich + $ meteor new recipe sample -e bigquery -s compass,kafka -p enrich # store recipe to a file - $ meteor new recipe sample -e bigquery -s columbus > recipe.yaml + $ meteor new recipe sample -e bigquery -s compass > recipe.yaml `), Annotations: map[string]string{ "group:core": "true", diff --git a/docs/docs/guides/2_manage_recipes.md b/docs/docs/guides/2_manage_recipes.md index 4169dcc5c..3ba580fc4 100644 --- a/docs/docs/guides/2_manage_recipes.md +++ b/docs/docs/guides/2_manage_recipes.md @@ -19,7 +19,7 @@ After making the necessary changes to the source, and sinks as per your local se $ meteor new recipe sample -e -s -p # command to generate recipe with multiple sinks -$ meteor new recipe sample -e bigquery -s columbus,kafka +$ meteor new recipe sample -e bigquery -s compass,kafka # for the tour you can use a single console sink # extracor(-e) as postgres, sink(-s) and enrich processor(-p) diff --git a/docs/docs/guides/introduction.md b/docs/docs/guides/introduction.md index 98ba32465..f34e8a774 100644 --- a/docs/docs/guides/introduction.md +++ b/docs/docs/guides/introduction.md @@ -12,6 +12,6 @@ The tour takes you through how you can use meteor as an end user and is a great - You are required to have atleast one Database, Dashboard or any other Data source and their proper credentials ready. For e.g. Postgres, MySQL, MSSQL, MongoDB, GitHub, etc. -- For sink we will be using console, if you are planning to use columbus or Kafka please make necessary changes accordingly. +- For sink we will be using console, if you are planning to use compass or Kafka please make necessary changes accordingly. Once done with the pre-requisites Get started with [Installation](./0_installation.md). diff --git a/docs/docs/reference/commands.md b/docs/docs/reference/commands.md index 0d00bad84..cbbf0dbe8 100644 --- a/docs/docs/reference/commands.md +++ b/docs/docs/reference/commands.md @@ -67,11 +67,11 @@ You can create a sample recipe usin the gen command. $ meteor gen recipe sample -e bigquery -s console # generate recipe with multiple sinks -$ meteor gen recipe sample -e bigquery -s columbus,kafka +$ meteor gen recipe sample -e bigquery -s compass,kafka # extracor(-e) as postgres, multiple sinks(-s) and enrich processor(-p) # save the generated recipe to a recipe.yaml -meteor gen recipe sample -e postgres -s columbus,kafka -p enrich > recipe.yaml +meteor gen recipe sample -e postgres -s compass,kafka -p enrich > recipe.yaml ``` ## Linting recipes diff --git a/docs/docs/reference/sinks.md b/docs/docs/reference/sinks.md index b876cb21c..9b0a7e52a 100644 --- a/docs/docs/reference/sinks.md +++ b/docs/docs/reference/sinks.md @@ -13,20 +13,20 @@ sinks: - name: console ``` -## Columbus +## Compass -`columbus` +`compass` -Upload metadata to a given `type` in [Columbus](https://github.com/odpf/meteor/tree/cb12c3ecf8904cf3f4ce365ca8981ccd132f35d0/docs/reference/github.com/odpf/columbus/README.md). Request will be send via HTTP to given host. +Upload metadata to a given `type` in [Compass](https://github.com/odpf/meteor/tree/cb12c3ecf8904cf3f4ce365ca8981ccd132f35d0/docs/reference/github.com/odpf/compass/README.md). Request will be send via HTTP to given host. -### Sample usage of columbus sink +### Sample usage of compass sink ```yaml sinks: - - name: columbus + - name: compass config: - host: https://columbus.com - type: sample-columbus-type + host: https://compass.com + type: sample-compass-type mapping: new_fieldname: "json_field_name" id: "resource.urn" @@ -35,4 +35,4 @@ sinks: _**Notes**_ -Columbus' Type requires certain fields to be sent, hence why `mapping` config is needed to map value from any of our metadata models to any field name when sending to Columbus. Supports getting value from nested fields. +Compass' Type requires certain fields to be sent, hence why `mapping` config is needed to map value from any of our metadata models to any field name when sending to Compass. Supports getting value from nested fields. diff --git a/docs/src/pages/index.js b/docs/src/pages/index.js index af9835693..622946b5e 100644 --- a/docs/src/pages/index.js +++ b/docs/src/pages/index.js @@ -190,7 +190,7 @@ export default function Home() { content: (
Meteor supports sink plugins to send metadata to a variety of - third party APIs and catalog services, including Columbus, HTTP, BigQuery, + third party APIs and catalog services, including Compass, HTTP, BigQuery, Kafka, and many others.
), diff --git a/docs/static/assets/overview.svg b/docs/static/assets/overview.svg index b442096da..3d2319f50 100644 --- a/docs/static/assets/overview.svg +++ b/docs/static/assets/overview.svg @@ -16,7 +16,7 @@ - + @@ -90,7 +90,7 @@ - Columbus + Compass diff --git a/plugins/sinks/columbus/README.md b/plugins/sinks/columbus/README.md deleted file mode 100644 index d2026fbd2..000000000 --- a/plugins/sinks/columbus/README.md +++ /dev/null @@ -1,22 +0,0 @@ -# Columbus - -Columbus is a search and discovery engine built for querying application deployments, datasets and meta resources. It can also optionally track data flow relationships between these resources and allow the user to view a representation of the data flow graph. - -## Usage - -```yaml -sinks: - name: columbus - config: - host: https://columbus.com - headers: - Columbus-User-Email: meteor@odpf.io - Header-1: value11,value12 - labels: - myCustom: $properties.attributes.myCustomField - sampleLabel: $properties.labels.sampleLabelField -``` - -## Contributing - -Refer to the contribution guidelines for information on contributing to this module. \ No newline at end of file diff --git a/plugins/sinks/compass/README.md b/plugins/sinks/compass/README.md new file mode 100644 index 000000000..ba6a74795 --- /dev/null +++ b/plugins/sinks/compass/README.md @@ -0,0 +1,22 @@ +# Compass + +Compass is a search and discovery engine built for querying application deployments, datasets and meta resources. It can also optionally track data flow relationships between these resources and allow the user to view a representation of the data flow graph. + +## Usage + +```yaml +sinks: + name: compass + config: + host: https://compass.com + headers: + compass-User-Email: meteor@odpf.io + Header-1: value11,value12 + labels: + myCustom: $properties.attributes.myCustomField + sampleLabel: $properties.labels.sampleLabelField +``` + +## Contributing + +Refer to the contribution guidelines for information on contributing to this module. \ No newline at end of file diff --git a/plugins/sinks/columbus/columbus.go b/plugins/sinks/compass/compass.go similarity index 97% rename from plugins/sinks/columbus/columbus.go rename to plugins/sinks/compass/compass.go index eb0495c2d..93fe7461d 100644 --- a/plugins/sinks/columbus/columbus.go +++ b/plugins/sinks/compass/compass.go @@ -1,4 +1,4 @@ -package columbus +package compass type RequestPayload struct { Asset Asset `json:"asset"` diff --git a/plugins/sinks/columbus/sink.go b/plugins/sinks/compass/sink.go similarity index 88% rename from plugins/sinks/columbus/sink.go rename to plugins/sinks/compass/sink.go index 49454361b..78ca26df9 100644 --- a/plugins/sinks/columbus/sink.go +++ b/plugins/sinks/compass/sink.go @@ -1,4 +1,4 @@ -package columbus +package compass import ( "bytes" @@ -27,11 +27,11 @@ type Config struct { } var sampleConfig = ` -# The hostname of the columbus service -host: https://columbus.com -# Additional HTTP headers send to columbus, multiple headers value are separated by a comma +# The hostname of the compass service +host: https://compass.com +# Additional HTTP headers send to compass, multiple headers value are separated by a comma headers: - Columbus-User-Email: meteor@odpf.io + Compass-User-Email: meteor@odpf.io X-Other-Header: value1, value2 # The labels to pass as payload label of the patch api labels: @@ -56,7 +56,7 @@ func New(c httpClient, logger log.Logger) plugins.Syncer { func (s *Sink) Info() plugins.Info { return plugins.Info{ - Description: "Send metadata to columbus http service", + Description: "Send metadata to compass http service", SampleConfig: sampleConfig, Summary: summary, Tags: []string{"http", "sink"}, @@ -78,17 +78,17 @@ func (s *Sink) Init(ctx context.Context, configMap map[string]interface{}) (err func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { for _, record := range batch { metadata := record.Data() - s.logger.Info("sinking record to columbus", "record", metadata.GetResource().Urn) + s.logger.Info("sinking record to compass", "record", metadata.GetResource().Urn) - columbusPayload, err := s.buildColumbusPayload(metadata) + compassPayload, err := s.buildCompassPayload(metadata) if err != nil { - return errors.Wrap(err, "failed to build columbus payload") + return errors.Wrap(err, "failed to build compass payload") } - if err = s.send(columbusPayload); err != nil { + if err = s.send(compassPayload); err != nil { return errors.Wrap(err, "error sending data") } - s.logger.Info("successfully sinked record to columbus", "record", metadata.GetResource().Urn) + s.logger.Info("successfully sinked record to compass", "record", metadata.GetResource().Urn) } return @@ -129,7 +129,7 @@ func (s *Sink) send(record RequestPayload) (err error) { if err != nil { return } - err = fmt.Errorf("columbus returns %d: %v", res.StatusCode, string(bodyBytes)) + err = fmt.Errorf("compass returns %d: %v", res.StatusCode, string(bodyBytes)) switch code := res.StatusCode; { case code >= 500: @@ -139,7 +139,7 @@ func (s *Sink) send(record RequestPayload) (err error) { } } -func (s *Sink) buildColumbusPayload(metadata models.Metadata) (RequestPayload, error) { +func (s *Sink) buildCompassPayload(metadata models.Metadata) (RequestPayload, error) { labels, err := s.buildLabels(metadata) if err != nil { return RequestPayload{}, errors.Wrap(err, "failed to build labels") @@ -299,7 +299,7 @@ func (s *Sink) getLabelValueFromProperties(field1 string, field2 string, metadat } func init() { - if err := registry.Sinks.Register("columbus", func() plugins.Syncer { + if err := registry.Sinks.Register("compass", func() plugins.Syncer { return New(&http.Client{}, plugins.GetLog()) }); err != nil { panic(err) diff --git a/plugins/sinks/columbus/sink_test.go b/plugins/sinks/compass/sink_test.go similarity index 82% rename from plugins/sinks/columbus/sink_test.go rename to plugins/sinks/compass/sink_test.go index be49f61cf..bf584495c 100644 --- a/plugins/sinks/columbus/sink_test.go +++ b/plugins/sinks/compass/sink_test.go @@ -1,4 +1,4 @@ -package columbus_test +package compass_test import ( "bytes" @@ -19,12 +19,12 @@ import ( facetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/facets/v1beta1" assetsv1beta1 "github.com/odpf/meteor/models/odpf/assets/v1beta1" "github.com/odpf/meteor/plugins" - "github.com/odpf/meteor/plugins/sinks/columbus" + "github.com/odpf/meteor/plugins/sinks/compass" "github.com/stretchr/testify/assert" ) var ( - host = "http://columbus.com" + host = "http://compass.com" ) // sample metadata @@ -41,8 +41,8 @@ func TestInit(t *testing.T) { } for i, config := range invalidConfigs { t.Run(fmt.Sprintf("test invalid config #%d", i+1), func(t *testing.T) { - columbusSink := columbus.New(newMockHTTPClient(config, http.MethodPatch, url, columbus.RequestPayload{}), testUtils.Logger) - err := columbusSink.Init(context.TODO(), config) + compassSink := compass.New(newMockHTTPClient(config, http.MethodPatch, url, compass.RequestPayload{}), testUtils.Logger) + err := compassSink.Init(context.TODO(), config) assert.Equal(t, plugins.InvalidConfigError{Type: plugins.PluginTypeSink}, err) }) @@ -51,18 +51,18 @@ func TestInit(t *testing.T) { } func TestSink(t *testing.T) { - t.Run("should return error if columbus host returns error", func(t *testing.T) { - columbusError := `{"reason":"no asset found"}` - errMessage := "error sending data: columbus returns 404: {\"reason\":\"no asset found\"}" + t.Run("should return error if compass host returns error", func(t *testing.T) { + compassError := `{"reason":"no asset found"}` + errMessage := "error sending data: compass returns 404: {\"reason\":\"no asset found\"}" // setup mock client url := fmt.Sprintf("%s/v1beta1/assets", host) - client := newMockHTTPClient(map[string]interface{}{}, http.MethodPatch, url, columbus.RequestPayload{}) - client.SetupResponse(404, columbusError) + client := newMockHTTPClient(map[string]interface{}{}, http.MethodPatch, url, compass.RequestPayload{}) + client.SetupResponse(404, compassError) ctx := context.TODO() - columbusSink := columbus.New(client, testUtils.Logger) - err := columbusSink.Init(ctx, map[string]interface{}{ + compassSink := compass.New(client, testUtils.Logger) + err := compassSink.Init(ctx, map[string]interface{}{ "host": host, }) if err != nil { @@ -70,20 +70,20 @@ func TestSink(t *testing.T) { } data := &assetsv1beta1.Topic{Resource: &commonv1beta1.Resource{}} - err = columbusSink.Sink(ctx, []models.Record{models.NewRecord(data)}) + err = compassSink.Sink(ctx, []models.Record{models.NewRecord(data)}) assert.Equal(t, errMessage, err.Error()) }) - t.Run("should return RetryError if columbus returns certain status code", func(t *testing.T) { + t.Run("should return RetryError if compass returns certain status code", func(t *testing.T) { for _, code := range []int{500, 501, 502, 503, 504, 505} { t.Run(fmt.Sprintf("%d status code", code), func(t *testing.T) { url := fmt.Sprintf("%s/v1beta1/assets", host) - client := newMockHTTPClient(map[string]interface{}{}, http.MethodPatch, url, columbus.RequestPayload{}) + client := newMockHTTPClient(map[string]interface{}{}, http.MethodPatch, url, compass.RequestPayload{}) client.SetupResponse(code, `{"reason":"internal server error"}`) ctx := context.TODO() - columbusSink := columbus.New(client, testUtils.Logger) - err := columbusSink.Init(ctx, map[string]interface{}{ + compassSink := compass.New(client, testUtils.Logger) + err := compassSink.Init(ctx, map[string]interface{}{ "host": host, }) if err != nil { @@ -91,7 +91,7 @@ func TestSink(t *testing.T) { } data := &assetsv1beta1.Topic{Resource: &commonv1beta1.Resource{}} - err = columbusSink.Sink(ctx, []models.Record{models.NewRecord(data)}) + err = compassSink.Sink(ctx, []models.Record{models.NewRecord(data)}) assert.True(t, errors.Is(err, plugins.RetryError{})) }) } @@ -101,10 +101,10 @@ func TestSink(t *testing.T) { description string data models.Metadata config map[string]interface{} - expected columbus.RequestPayload + expected compass.RequestPayload }{ { - description: "should create the right request to columbus", + description: "should create the right request to compass", data: &assetsv1beta1.User{ Resource: &commonv1beta1.Resource{ Urn: "my-topic-urn", @@ -117,8 +117,8 @@ func TestSink(t *testing.T) { config: map[string]interface{}{ "host": host, }, - expected: columbus.RequestPayload{ - Asset: columbus.Asset{ + expected: compass.RequestPayload{ + Asset: compass.Asset{ URN: "my-topic-urn", Name: "my-topic", Service: "kafka", @@ -128,7 +128,7 @@ func TestSink(t *testing.T) { }, }, { - description: "should build columbus labels if labels is defined in config", + description: "should build compass labels if labels is defined in config", data: &assetsv1beta1.Topic{ Resource: &commonv1beta1.Resource{ Urn: "my-topic-urn", @@ -155,8 +155,8 @@ func TestSink(t *testing.T) { "bar": "$properties.labels.labelA", }, }, - expected: columbus.RequestPayload{ - Asset: columbus.Asset{ + expected: compass.RequestPayload{ + Asset: compass.Asset{ URN: "my-topic-urn", Name: "my-topic", Service: "kafka", @@ -197,15 +197,15 @@ func TestSink(t *testing.T) { config: map[string]interface{}{ "host": host, }, - expected: columbus.RequestPayload{ - Asset: columbus.Asset{ + expected: compass.RequestPayload{ + Asset: compass.Asset{ URN: "my-topic-urn", Name: "my-topic", Service: "kafka", Type: "topic", Description: "topic information", }, - Upstreams: []columbus.LineageRecord{ + Upstreams: []compass.LineageRecord{ { URN: "urn-1", Type: "type-a", @@ -247,15 +247,15 @@ func TestSink(t *testing.T) { config: map[string]interface{}{ "host": host, }, - expected: columbus.RequestPayload{ - Asset: columbus.Asset{ + expected: compass.RequestPayload{ + Asset: compass.Asset{ URN: "my-topic-urn", Name: "my-topic", Service: "kafka", Type: "topic", Description: "topic information", }, - Downstreams: []columbus.LineageRecord{ + Downstreams: []compass.LineageRecord{ { URN: "urn-1", Type: "type-a", @@ -305,14 +305,14 @@ func TestSink(t *testing.T) { config: map[string]interface{}{ "host": host, }, - expected: columbus.RequestPayload{ - Asset: columbus.Asset{ + expected: compass.RequestPayload{ + Asset: compass.Asset{ URN: "my-topic-urn", Name: "my-topic", Service: "kafka", Type: "topic", Description: "topic information", - Owners: []columbus.Owner{ + Owners: []compass.Owner{ { URN: "urn-1", Name: "owner-a", @@ -353,8 +353,8 @@ func TestSink(t *testing.T) { "Key2": "value2", }, }, - expected: columbus.RequestPayload{ - Asset: columbus.Asset{ + expected: compass.RequestPayload{ + Asset: compass.Asset{ URN: "my-topic-urn", Name: "my-topic", Service: "kafka", @@ -368,7 +368,7 @@ func TestSink(t *testing.T) { for _, tc := range successTestCases { t.Run(tc.description, func(t *testing.T) { tc.expected.Asset.Data = tc.data - payload := columbus.RequestPayload{ + payload := compass.RequestPayload{ Asset: tc.expected.Asset, Upstreams: tc.expected.Upstreams, Downstreams: tc.expected.Downstreams, @@ -378,13 +378,13 @@ func TestSink(t *testing.T) { client.SetupResponse(200, "") ctx := context.TODO() - columbusSink := columbus.New(client, testUtils.Logger) - err := columbusSink.Init(ctx, tc.config) + compassSink := compass.New(client, testUtils.Logger) + err := compassSink.Init(ctx, tc.config) if err != nil { t.Fatal(err) } - err = columbusSink.Sink(ctx, []models.Record{models.NewRecord(tc.data)}) + err = compassSink.Sink(ctx, []models.Record{models.NewRecord(tc.data)}) assert.NoError(t, err) client.Assert(t) @@ -396,13 +396,13 @@ type mockHTTPClient struct { URL string Method string Headers map[string]string - RequestPayload columbus.RequestPayload + RequestPayload compass.RequestPayload ResponseJSON string ResponseStatus int req *http.Request } -func newMockHTTPClient(config map[string]interface{}, method, url string, payload columbus.RequestPayload) *mockHTTPClient { +func newMockHTTPClient(config map[string]interface{}, method, url string, payload compass.RequestPayload) *mockHTTPClient { headersMap := map[string]string{} if headersItf, ok := config["headers"]; ok { headersMap = headersItf.(map[string]string) diff --git a/plugins/sinks/populate.go b/plugins/sinks/populate.go index 8e5fc73b1..dfcab7c22 100644 --- a/plugins/sinks/populate.go +++ b/plugins/sinks/populate.go @@ -1,7 +1,7 @@ package sinks import ( - _ "github.com/odpf/meteor/plugins/sinks/columbus" + _ "github.com/odpf/meteor/plugins/sinks/compass" _ "github.com/odpf/meteor/plugins/sinks/console" _ "github.com/odpf/meteor/plugins/sinks/kafka" )