diff --git a/plugins/extractors/bigquery/bigquery.go b/plugins/extractors/bigquery/bigquery.go index eb42397b7..87bdc87d0 100644 --- a/plugins/extractors/bigquery/bigquery.go +++ b/plugins/extractors/bigquery/bigquery.go @@ -204,6 +204,7 @@ func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigqu Resource: &commonv1beta1.Resource{ Urn: tableURN, Name: t.TableID, + Type: "table", Description: md.Description, Service: "bigquery", }, diff --git a/plugins/extractors/bigtable/bigtable.go b/plugins/extractors/bigtable/bigtable.go index dea012a44..b5dd88559 100644 --- a/plugins/extractors/bigtable/bigtable.go +++ b/plugins/extractors/bigtable/bigtable.go @@ -131,6 +131,7 @@ func (e *Extractor) getTablesInfo(ctx context.Context, emit plugins.Emit) (err e Urn: fmt.Sprintf("%s.%s.%s", e.config.ProjectID, instance, table), Name: table, Service: "bigtable", + Type: "table", }, Properties: &facetsv1beta1.Properties{ Attributes: utils.TryParseMapToProto(map[string]interface{}{ diff --git a/plugins/extractors/cassandra/cassandra.go b/plugins/extractors/cassandra/cassandra.go index e4bc8c400..e095b2f9d 100644 --- a/plugins/extractors/cassandra/cassandra.go +++ b/plugins/extractors/cassandra/cassandra.go @@ -168,6 +168,7 @@ func (e *Extractor) processTable(keyspace string, tableName string) (err error) Resource: &commonv1beta1.Resource{ Urn: fmt.Sprintf("%s.%s", keyspace, tableName), Name: tableName, + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/clickhouse/clickhouse.go b/plugins/extractors/clickhouse/clickhouse.go index f74089f1d..423206b2f 100644 --- a/plugins/extractors/clickhouse/clickhouse.go +++ b/plugins/extractors/clickhouse/clickhouse.go @@ -108,6 +108,7 @@ func (e *Extractor) extractTables(emit plugins.Emit) (err error) { Resource: &commonv1beta1.Resource{ Urn: fmt.Sprintf("%s.%s", dbName, tableName), Name: tableName, + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, }, diff --git a/plugins/extractors/couchdb/couchdb.go b/plugins/extractors/couchdb/couchdb.go index 505451901..49e6248ff 100644 --- a/plugins/extractors/couchdb/couchdb.go +++ b/plugins/extractors/couchdb/couchdb.go @@ -142,6 +142,7 @@ func (e *Extractor) processTable(ctx context.Context, dbName string, docID strin Resource: &commonv1beta1.Resource{ Urn: fmt.Sprintf("%s.%s", dbName, docID), Name: docID, + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/csv/csv.go b/plugins/extractors/csv/csv.go index 93de3c922..930f6e238 100644 --- a/plugins/extractors/csv/csv.go +++ b/plugins/extractors/csv/csv.go @@ -119,6 +119,7 @@ func (e *Extractor) buildTable(filePath string) (table *assetsv1beta1.Table, err Urn: fileName, Name: fileName, Service: "csv", + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: e.buildColumns(content), diff --git a/plugins/extractors/elastic/elastic.go b/plugins/extractors/elastic/elastic.go index 771806c2e..01eb39e8e 100644 --- a/plugins/extractors/elastic/elastic.go +++ b/plugins/extractors/elastic/elastic.go @@ -134,6 +134,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) Resource: &commonv1beta1.Resource{ Urn: fmt.Sprintf("%s.%s", "elasticsearch", indexName), Name: indexName, + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/gcs/gcs.go b/plugins/extractors/gcs/gcs.go index 27c7280e9..bd39016ac 100644 --- a/plugins/extractors/gcs/gcs.go +++ b/plugins/extractors/gcs/gcs.go @@ -143,6 +143,7 @@ func (e *Extractor) buildBucket(b *storage.BucketAttrs, projectID string, blobs Urn: fmt.Sprintf("%s/%s", projectID, b.Name), Name: b.Name, Service: metadataSource, + Type: "bucket", }, Location: b.Location, StorageType: b.StorageClass, diff --git a/plugins/extractors/github/github.go b/plugins/extractors/github/github.go index f3d47af87..35b13981c 100644 --- a/plugins/extractors/github/github.go +++ b/plugins/extractors/github/github.go @@ -84,7 +84,8 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error) } emit(models.NewRecord(&assetsv1beta1.User{ Resource: &commonv1beta1.Resource{ - Urn: usr.GetURL(), + Urn: usr.GetURL(), + Type: "user", }, Email: usr.GetEmail(), Username: usr.GetLogin(), diff --git a/plugins/extractors/grafana/grafana.go b/plugins/extractors/grafana/grafana.go index f812f3742..b2259a06f 100644 --- a/plugins/extractors/grafana/grafana.go +++ b/plugins/extractors/grafana/grafana.go @@ -104,6 +104,7 @@ func (e *Extractor) grafanaDashboardToMeteorDashboard(dashboard DashboardDetail) Resource: &commonv1beta1.Resource{ Urn: fmt.Sprintf("grafana.%s", dashboard.Dashboard.UID), Name: dashboard.Meta.Slug, + Type: "dashboard", Service: "grafana", Url: dashboard.Meta.URL, Description: dashboard.Dashboard.Description, diff --git a/plugins/extractors/kafka/kafka.go b/plugins/extractors/kafka/kafka.go index 3368c8314..7a2548972 100644 --- a/plugins/extractors/kafka/kafka.go +++ b/plugins/extractors/kafka/kafka.go @@ -127,6 +127,7 @@ func (e *Extractor) buildTopic(topic string, numOfPartitions int) *assetsv1beta1 Urn: fmt.Sprintf("kafka::%s/%s", e.config.Label, topic), Name: topic, Service: "kafka", + Type: "topic", }, Profile: &assetsv1beta1.TopicProfile{ NumberOfPartitions: int64(numOfPartitions), diff --git a/plugins/extractors/mariadb/mariadb.go b/plugins/extractors/mariadb/mariadb.go index e8c341ae6..3abfe229e 100644 --- a/plugins/extractors/mariadb/mariadb.go +++ b/plugins/extractors/mariadb/mariadb.go @@ -143,6 +143,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error) Resource: &commonv1beta1.Resource{ Urn: fmt.Sprintf("%s.%s", database, tableName), Name: tableName, + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/metabase/metabase.go b/plugins/extractors/metabase/metabase.go index d7bddc74c..6d4b426ef 100644 --- a/plugins/extractors/metabase/metabase.go +++ b/plugins/extractors/metabase/metabase.go @@ -117,6 +117,7 @@ func (e *Extractor) buildDashboard(d Dashboard) (data *assetsv1beta1.Dashboard, Urn: dashboardUrn, Name: dashboard.Name, Service: "metabase", + Type: "dashboard", Description: dashboard.Description, }, Charts: charts, diff --git a/plugins/extractors/mongodb/mongodb.go b/plugins/extractors/mongodb/mongodb.go index 41bbf0c85..fac47099c 100644 --- a/plugins/extractors/mongodb/mongodb.go +++ b/plugins/extractors/mongodb/mongodb.go @@ -144,6 +144,7 @@ func (e *Extractor) buildTable(ctx context.Context, db *mongo.Database, collecti Resource: &commonv1beta1.Resource{ Urn: fmt.Sprintf("%s.%s", db.Name(), collectionName), Name: collectionName, + Type: "table", }, Profile: &assetsv1beta1.TableProfile{ TotalRows: totalRows, diff --git a/plugins/extractors/mssql/mssql.go b/plugins/extractors/mssql/mssql.go index ca9d64720..9642a92ad 100644 --- a/plugins/extractors/mssql/mssql.go +++ b/plugins/extractors/mssql/mssql.go @@ -134,6 +134,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error) Resource: &commonv1beta1.Resource{ Urn: fmt.Sprintf("%s.%s", database, tableName), Name: tableName, + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/mysql/mysql.go b/plugins/extractors/mysql/mysql.go index d21944d04..a541a2245 100644 --- a/plugins/extractors/mysql/mysql.go +++ b/plugins/extractors/mysql/mysql.go @@ -145,6 +145,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error) Resource: &commonv1beta1.Resource{ Urn: fmt.Sprintf("%s.%s", database, tableName), Name: tableName, + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/optimus/optimus.go b/plugins/extractors/optimus/optimus.go index 7dbe61e21..4c6c3635d 100644 --- a/plugins/extractors/optimus/optimus.go +++ b/plugins/extractors/optimus/optimus.go @@ -145,6 +145,7 @@ func (e *Extractor) buildJob(ctx context.Context, jobSpec *pb.JobSpecification, Name: jobSpec.Name, Service: service, Description: jobSpec.Description, + Type: "job", }, Ownership: &facetsv1beta1.Ownership{ Owners: []*facetsv1beta1.Owner{ diff --git a/plugins/extractors/oracle/oracle.go b/plugins/extractors/oracle/oracle.go index 4a287e710..bebf12d7c 100644 --- a/plugins/extractors/oracle/oracle.go +++ b/plugins/extractors/oracle/oracle.go @@ -157,6 +157,7 @@ func (e *Extractor) getTableMetadata(db *sql.DB, dbName string, tableName string Urn: fmt.Sprintf("%s.%s", dbName, tableName), Name: tableName, Service: "Oracle", + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/postgres/postgres.go b/plugins/extractors/postgres/postgres.go index f46b3b2ee..6af6773fb 100644 --- a/plugins/extractors/postgres/postgres.go +++ b/plugins/extractors/postgres/postgres.go @@ -168,6 +168,7 @@ func (e *Extractor) getTableMetadata(db *sql.DB, dbName string, tableName string Urn: models.TableURN("postgres", e.host, dbName, tableName), Name: tableName, Service: "postgres", + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/presto/presto.go b/plugins/extractors/presto/presto.go index f5f862e58..0bc273003 100644 --- a/plugins/extractors/presto/presto.go +++ b/plugins/extractors/presto/presto.go @@ -174,6 +174,7 @@ func (e *Extractor) processTable(db *sql.DB, catalog string, database string, ta Urn: fmt.Sprintf("%s.%s.%s", catalog, database, tableName), Name: tableName, Service: "presto", + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/redshift/redshift.go b/plugins/extractors/redshift/redshift.go index 2492233de..a741a9822 100644 --- a/plugins/extractors/redshift/redshift.go +++ b/plugins/extractors/redshift/redshift.go @@ -189,6 +189,7 @@ func (e *Extractor) getTableMetadata(dbName string, tableName string) (result *a Resource: &commonv1beta1.Resource{ Urn: models.TableURN("redshift", e.config.AWSRegion, dbName, tableName), Name: tableName, + Type: "table", Service: "redshift", }, Schema: &facetsv1beta1.Columns{ diff --git a/plugins/extractors/snowflake/snowflake.go b/plugins/extractors/snowflake/snowflake.go index 657800fd3..57a80b1f5 100644 --- a/plugins/extractors/snowflake/snowflake.go +++ b/plugins/extractors/snowflake/snowflake.go @@ -173,6 +173,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error) Urn: fmt.Sprintf("%s.%s", database, tableName), Name: tableName, Service: "Snowflake", + Type: "table", }, Schema: &facetsv1beta1.Columns{ Columns: columns, diff --git a/plugins/extractors/superset/superset.go b/plugins/extractors/superset/superset.go index 78009f9b7..c0bb76a18 100644 --- a/plugins/extractors/superset/superset.go +++ b/plugins/extractors/superset/superset.go @@ -121,6 +121,7 @@ func (e *Extractor) buildDashboard(id int) (data *assetsv1beta1.Dashboard, err e Name: dashboard.DashboardTitle, Service: "superset", Url: dashboard.URL, + Type: "dashboard", }, Charts: chart, } diff --git a/plugins/extractors/tableau/tableau.go b/plugins/extractors/tableau/tableau.go index 9442682e5..d28e5f0dd 100644 --- a/plugins/extractors/tableau/tableau.go +++ b/plugins/extractors/tableau/tableau.go @@ -135,6 +135,7 @@ func (e *Extractor) buildDashboard(wb *Workbook) (data *assetsv1beta1.Dashboard, Urn: dashboardURN, Name: wb.Name, Service: "tableau", + Type: "dashboard", Description: wb.Description, }, Charts: e.buildCharts(dashboardURN, wb, lineages), diff --git a/plugins/sinks/columbus/README.md b/plugins/sinks/columbus/README.md index 2a758a39c..d2026fbd2 100644 --- a/plugins/sinks/columbus/README.md +++ b/plugins/sinks/columbus/README.md @@ -9,7 +9,6 @@ sinks: name: columbus config: host: https://columbus.com - type: sample-columbus-type headers: Columbus-User-Email: meteor@odpf.io Header-1: value11,value12 diff --git a/plugins/sinks/columbus/columbus.go b/plugins/sinks/columbus/columbus.go index 3d1dfd8da..eb0495c2d 100644 --- a/plugins/sinks/columbus/columbus.go +++ b/plugins/sinks/columbus/columbus.go @@ -1,20 +1,26 @@ package columbus -type Record struct { - Urn string `json:"urn"` +type RequestPayload struct { + Asset Asset `json:"asset"` + Upstreams []LineageRecord `json:"upstreams"` + Downstreams []LineageRecord `json:"downstreams"` +} + +type Asset struct { + URN string `json:"urn"` + Type string `json:"type"` Name string `json:"name"` Service string `json:"service"` - Upstreams []LineageRecord `json:"upstreams"` - Downstreams []LineageRecord `json:"downstreams"` - Owners []Owner `json:"owners"` Description string `json:"description"` + Owners []Owner `json:"owners"` Data interface{} `json:"data"` Labels map[string]string `json:"labels"` } type LineageRecord struct { - Urn string `json:"urn"` - Type string `json:"type"` + URN string `json:"urn"` + Type string `json:"type"` + Service string `json:"service"` } type Owner struct { diff --git a/plugins/sinks/columbus/sink.go b/plugins/sinks/columbus/sink.go index 2fcda5bca..49454361b 100644 --- a/plugins/sinks/columbus/sink.go +++ b/plugins/sinks/columbus/sink.go @@ -6,16 +6,15 @@ import ( _ "embed" "encoding/json" "fmt" - "io/ioutil" - "net/http" - "strings" - "github.com/odpf/meteor/models" "github.com/odpf/meteor/plugins" "github.com/odpf/meteor/registry" "github.com/odpf/meteor/utils" "github.com/odpf/salt/log" "github.com/pkg/errors" + "io/ioutil" + "net/http" + "strings" ) //go:embed README.md @@ -23,20 +22,22 @@ var summary string type Config struct { Host string `mapstructure:"host" validate:"required"` - Type string `mapstructure:"type" validate:"required"` Headers map[string]string `mapstructure:"headers"` Labels map[string]string `mapstructure:"labels"` } var sampleConfig = ` -# The hostnmame of the columbus service +# The hostname of the columbus service host: https://columbus.com -# The type of the data to send -type: sample-columbus-type # Additional HTTP headers send to columbus, multiple headers value are separated by a comma headers: Columbus-User-Email: meteor@odpf.io - X-Other-Header: value1, value2` + X-Other-Header: value1, value2 +# The labels to pass as payload label of the patch api +labels: + myCustom: $properties.attributes.myCustomField + sampleLabel: $properties.labels.sampleLabelField +` type httpClient interface { Do(*http.Request) (*http.Response, error) @@ -95,15 +96,15 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) { func (s *Sink) Close() (err error) { return } -func (s *Sink) send(record Record) (err error) { - payloadBytes, err := json.Marshal([]Record{record}) +func (s *Sink) send(record RequestPayload) (err error) { + payloadBytes, err := json.Marshal(record) if err != nil { return } // send request - url := fmt.Sprintf("%s/v1beta1/types/%s/records", s.config.Host, s.config.Type) - req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(payloadBytes)) + url := fmt.Sprintf("%s/v1beta1/assets", s.config.Host) + req, err := http.NewRequest(http.MethodPatch, url, bytes.NewBuffer(payloadBytes)) if err != nil { return } @@ -138,22 +139,26 @@ func (s *Sink) send(record Record) (err error) { } } -func (s *Sink) buildColumbusPayload(metadata models.Metadata) (Record, error) { +func (s *Sink) buildColumbusPayload(metadata models.Metadata) (RequestPayload, error) { labels, err := s.buildLabels(metadata) if err != nil { - return Record{}, errors.Wrap(err, "failed to build labels") + return RequestPayload{}, errors.Wrap(err, "failed to build labels") } upstreams, downstreams := s.buildLineage(metadata) owners := s.buildOwners(metadata) resource := metadata.GetResource() - record := Record{ - Urn: resource.GetUrn(), - Name: resource.GetName(), - Service: resource.GetService(), - Data: metadata, - Labels: labels, - Owners: owners, + record := RequestPayload{ + Asset: Asset{ + URN: resource.GetUrn(), + Type: resource.GetType(), + Name: resource.GetName(), + Service: resource.GetService(), + Description: resource.GetDescription(), + Owners: owners, + Data: metadata, + Labels: labels, + }, Upstreams: upstreams, Downstreams: downstreams, } @@ -174,14 +179,16 @@ func (s *Sink) buildLineage(metadata models.Metadata) (upstreams, downstreams [] for _, upstream := range lineage.Upstreams { upstreams = append(upstreams, LineageRecord{ - Urn: upstream.Urn, - Type: upstream.Type, + URN: upstream.Urn, + Type: upstream.Type, + Service: upstream.Service, }) } for _, downstream := range lineage.Downstreams { downstreams = append(downstreams, LineageRecord{ - Urn: downstream.Urn, - Type: downstream.Type, + URN: downstream.Urn, + Type: downstream.Type, + Service: downstream.Service, }) } diff --git a/plugins/sinks/columbus/sink_test.go b/plugins/sinks/columbus/sink_test.go index 64d25c655..be49f61cf 100644 --- a/plugins/sinks/columbus/sink_test.go +++ b/plugins/sinks/columbus/sink_test.go @@ -29,8 +29,7 @@ var ( // sample metadata var ( - columbusType = "my-type" - url = fmt.Sprintf("%s/v1beta1/types/%s/records", host, columbusType) + url = fmt.Sprintf("%s/v1beta1/assets", host) ) func TestInit(t *testing.T) { @@ -39,14 +38,10 @@ func TestInit(t *testing.T) { { "host": "", }, - { - "host": host, - "type": "", - }, } for i, config := range invalidConfigs { t.Run(fmt.Sprintf("test invalid config #%d", i+1), func(t *testing.T) { - columbusSink := columbus.New(newMockHTTPClient(config, http.MethodGet, url, []columbus.Record{}), testUtils.Logger) + columbusSink := columbus.New(newMockHTTPClient(config, http.MethodPatch, url, columbus.RequestPayload{}), testUtils.Logger) err := columbusSink.Init(context.TODO(), config) assert.Equal(t, plugins.InvalidConfigError{Type: plugins.PluginTypeSink}, err) @@ -57,19 +52,18 @@ func TestInit(t *testing.T) { func TestSink(t *testing.T) { t.Run("should return error if columbus host returns error", func(t *testing.T) { - columbusError := `{"reason":"no such type: \"my-type\""}` - errMessage := "error sending data: columbus returns 404: {\"reason\":\"no such type: \\\"my-type\\\"\"}" + columbusError := `{"reason":"no asset found"}` + errMessage := "error sending data: columbus returns 404: {\"reason\":\"no asset found\"}" // setup mock client - url := fmt.Sprintf("%s/v1beta1/types/my-type/records", host) - client := newMockHTTPClient(map[string]interface{}{}, http.MethodPut, url, []columbus.Record{}) + url := fmt.Sprintf("%s/v1beta1/assets", host) + client := newMockHTTPClient(map[string]interface{}{}, http.MethodPatch, url, columbus.RequestPayload{}) client.SetupResponse(404, columbusError) ctx := context.TODO() columbusSink := columbus.New(client, testUtils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, - "type": "my-type", }) if err != nil { t.Fatal(err) @@ -83,15 +77,14 @@ func TestSink(t *testing.T) { t.Run("should return RetryError if columbus returns certain status code", func(t *testing.T) { for _, code := range []int{500, 501, 502, 503, 504, 505} { t.Run(fmt.Sprintf("%d status code", code), func(t *testing.T) { - url := fmt.Sprintf("%s/v1beta1/types/my-type/records", host) - client := newMockHTTPClient(map[string]interface{}{}, http.MethodPut, url, []columbus.Record{}) + url := fmt.Sprintf("%s/v1beta1/assets", host) + client := newMockHTTPClient(map[string]interface{}{}, http.MethodPatch, url, columbus.RequestPayload{}) client.SetupResponse(code, `{"reason":"internal server error"}`) ctx := context.TODO() columbusSink := columbus.New(client, testUtils.Logger) err := columbusSink.Init(ctx, map[string]interface{}{ "host": host, - "type": "my-type", }) if err != nil { t.Fatal(err) @@ -108,34 +101,41 @@ func TestSink(t *testing.T) { description string data models.Metadata config map[string]interface{} - expected columbus.Record + expected columbus.RequestPayload }{ { description: "should create the right request to columbus", data: &assetsv1beta1.User{ Resource: &commonv1beta1.Resource{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + Urn: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", }, }, config: map[string]interface{}{ "host": host, - "type": columbusType, }, - expected: columbus.Record{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + expected: columbus.RequestPayload{ + Asset: columbus.Asset{ + URN: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", + }, }, }, { description: "should build columbus labels if labels is defined in config", data: &assetsv1beta1.Topic{ Resource: &commonv1beta1.Resource{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + Urn: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", }, Properties: &facetsv1beta1.Properties{ Attributes: utils.TryParseMapToProto(map[string]interface{}{ @@ -150,19 +150,22 @@ func TestSink(t *testing.T) { }, config: map[string]interface{}{ "host": host, - "type": columbusType, "labels": map[string]string{ "foo": "$properties.attributes.attrB", "bar": "$properties.labels.labelA", }, }, - expected: columbus.Record{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", - Labels: map[string]string{ - "foo": "valueAttrB", - "bar": "valueLabelA", + expected: columbus.RequestPayload{ + Asset: columbus.Asset{ + URN: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", + Labels: map[string]string{ + "foo": "valueAttrB", + "bar": "valueLabelA", + }, }, }, }, @@ -170,39 +173,48 @@ func TestSink(t *testing.T) { description: "should send upstreams if data has upstreams", data: &assetsv1beta1.Topic{ Resource: &commonv1beta1.Resource{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + Urn: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", }, Lineage: &facetsv1beta1.Lineage{ Upstreams: []*commonv1beta1.Resource{ { - Urn: "urn-1", - Type: "type-a", + Urn: "urn-1", + Type: "type-a", + Service: "kafka", }, { - Urn: "urn-2", - Type: "type-b", + Urn: "urn-2", + Type: "type-b", + Service: "bigquery", }, }, }, }, config: map[string]interface{}{ "host": host, - "type": columbusType, }, - expected: columbus.Record{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + expected: columbus.RequestPayload{ + Asset: columbus.Asset{ + URN: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", + }, Upstreams: []columbus.LineageRecord{ { - Urn: "urn-1", - Type: "type-a", + URN: "urn-1", + Type: "type-a", + Service: "kafka", }, { - Urn: "urn-2", - Type: "type-b", + URN: "urn-2", + Type: "type-b", + Service: "bigquery", }, }, }, @@ -211,39 +223,48 @@ func TestSink(t *testing.T) { description: "should send downstreams if data has downstreams", data: &assetsv1beta1.Topic{ Resource: &commonv1beta1.Resource{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + Urn: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", }, Lineage: &facetsv1beta1.Lineage{ Downstreams: []*commonv1beta1.Resource{ { - Urn: "urn-1", - Type: "type-a", + Urn: "urn-1", + Type: "type-a", + Service: "kafka", }, { - Urn: "urn-2", - Type: "type-b", + Urn: "urn-2", + Type: "type-b", + Service: "bigquery", }, }, }, }, config: map[string]interface{}{ "host": host, - "type": columbusType, }, - expected: columbus.Record{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + expected: columbus.RequestPayload{ + Asset: columbus.Asset{ + URN: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", + }, Downstreams: []columbus.LineageRecord{ { - Urn: "urn-1", - Type: "type-a", + URN: "urn-1", + Type: "type-a", + Service: "kafka", }, { - Urn: "urn-2", - Type: "type-b", + URN: "urn-2", + Type: "type-b", + Service: "bigquery", }, }, }, @@ -252,9 +273,11 @@ func TestSink(t *testing.T) { description: "should send owners if data has ownership", data: &assetsv1beta1.Topic{ Resource: &commonv1beta1.Resource{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + Urn: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", }, Ownership: &facetsv1beta1.Ownership{ Owners: []*facetsv1beta1.Owner{ @@ -281,30 +304,33 @@ func TestSink(t *testing.T) { }, config: map[string]interface{}{ "host": host, - "type": columbusType, }, - expected: columbus.Record{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", - Owners: []columbus.Owner{ - { - URN: "urn-1", - Name: "owner-a", - Role: "role-a", - Email: "email-1", - }, - { - URN: "urn-2", - Name: "owner-b", - Role: "role-b", - Email: "email-2", - }, - { - URN: "urn-3", - Name: "owner-c", - Role: "role-c", - Email: "email-3", + expected: columbus.RequestPayload{ + Asset: columbus.Asset{ + URN: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", + Owners: []columbus.Owner{ + { + URN: "urn-1", + Name: "owner-a", + Role: "role-a", + Email: "email-1", + }, + { + URN: "urn-2", + Name: "owner-b", + Role: "role-b", + Email: "email-2", + }, + { + URN: "urn-3", + Name: "owner-c", + Role: "role-c", + Email: "email-3", + }, }, }, }, @@ -313,33 +339,42 @@ func TestSink(t *testing.T) { description: "should send headers if get populated in config", data: &assetsv1beta1.Topic{ Resource: &commonv1beta1.Resource{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + Urn: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", }, }, config: map[string]interface{}{ "host": host, - "type": columbusType, "headers": map[string]string{ "Key1": "value11, value12", "Key2": "value2", }, }, - expected: columbus.Record{ - Urn: "my-topic-urn", - Name: "my-topic", - Service: "kafka", + expected: columbus.RequestPayload{ + Asset: columbus.Asset{ + URN: "my-topic-urn", + Name: "my-topic", + Service: "kafka", + Type: "topic", + Description: "topic information", + }, }, }, } for _, tc := range successTestCases { t.Run(tc.description, func(t *testing.T) { - tc.expected.Data = tc.data - payload := []columbus.Record{tc.expected} + tc.expected.Asset.Data = tc.data + payload := columbus.RequestPayload{ + Asset: tc.expected.Asset, + Upstreams: tc.expected.Upstreams, + Downstreams: tc.expected.Downstreams, + } - client := newMockHTTPClient(tc.config, http.MethodPut, url, payload) + client := newMockHTTPClient(tc.config, http.MethodPatch, url, payload) client.SetupResponse(200, "") ctx := context.TODO() @@ -361,13 +396,13 @@ type mockHTTPClient struct { URL string Method string Headers map[string]string - RequestPayload []columbus.Record + RequestPayload columbus.RequestPayload ResponseJSON string ResponseStatus int req *http.Request } -func newMockHTTPClient(config map[string]interface{}, method, url string, payload []columbus.Record) *mockHTTPClient { +func newMockHTTPClient(config map[string]interface{}, method, url string, payload columbus.RequestPayload) *mockHTTPClient { headersMap := map[string]string{} if headersItf, ok := config["headers"]; ok { headersMap = headersItf.(map[string]string)