Skip to content

Commit

Permalink
feat(columbus): update columbus api to new patch api (#337)
Browse files Browse the repository at this point in the history
* chore: update columbus sink api

* refactor: update patch api payload

* refactor: update unessential changes

* refactor: update record payload

* feat: update resource type field in extractors

* refactor: update typo

* refactor: update Record model name to RequestPayload
  • Loading branch information
scortier authored Apr 5, 2022
1 parent 2395ccc commit 853b1e2
Show file tree
Hide file tree
Showing 28 changed files with 212 additions and 141 deletions.
1 change: 1 addition & 0 deletions plugins/extractors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigqu
Resource: &commonv1beta1.Resource{
Urn: tableURN,
Name: t.TableID,
Type: "table",
Description: md.Description,
Service: "bigquery",
},
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/bigtable/bigtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func (e *Extractor) getTablesInfo(ctx context.Context, emit plugins.Emit) (err e
Urn: fmt.Sprintf("%s.%s.%s", e.config.ProjectID, instance, table),
Name: table,
Service: "bigtable",
Type: "table",
},
Properties: &facetsv1beta1.Properties{
Attributes: utils.TryParseMapToProto(map[string]interface{}{
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/cassandra/cassandra.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (e *Extractor) processTable(keyspace string, tableName string) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", keyspace, tableName),
Name: tableName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ func (e *Extractor) extractTables(emit plugins.Emit) (err error) {
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", dbName, tableName),
Name: tableName,
Type: "table",
}, Schema: &facetsv1beta1.Columns{
Columns: columns,
},
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/couchdb/couchdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func (e *Extractor) processTable(ctx context.Context, dbName string, docID strin
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", dbName, docID),
Name: docID,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/csv/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (e *Extractor) buildTable(filePath string) (table *assetsv1beta1.Table, err
Urn: fileName,
Name: fileName,
Service: "csv",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: e.buildColumns(content),
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/elastic/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", "elasticsearch", indexName),
Name: indexName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (e *Extractor) buildBucket(b *storage.BucketAttrs, projectID string, blobs
Urn: fmt.Sprintf("%s/%s", projectID, b.Name),
Name: b.Name,
Service: metadataSource,
Type: "bucket",
},
Location: b.Location,
StorageType: b.StorageClass,
Expand Down
3 changes: 2 additions & 1 deletion plugins/extractors/github/github.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) (err error)
}
emit(models.NewRecord(&assetsv1beta1.User{
Resource: &commonv1beta1.Resource{
Urn: usr.GetURL(),
Urn: usr.GetURL(),
Type: "user",
},
Email: usr.GetEmail(),
Username: usr.GetLogin(),
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/grafana/grafana.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (e *Extractor) grafanaDashboardToMeteorDashboard(dashboard DashboardDetail)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("grafana.%s", dashboard.Dashboard.UID),
Name: dashboard.Meta.Slug,
Type: "dashboard",
Service: "grafana",
Url: dashboard.Meta.URL,
Description: dashboard.Dashboard.Description,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func (e *Extractor) buildTopic(topic string, numOfPartitions int) *assetsv1beta1
Urn: fmt.Sprintf("kafka::%s/%s", e.config.Label, topic),
Name: topic,
Service: "kafka",
Type: "topic",
},
Profile: &assetsv1beta1.TopicProfile{
NumberOfPartitions: int64(numOfPartitions),
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/mariadb/mariadb.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", database, tableName),
Name: tableName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/metabase/metabase.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (e *Extractor) buildDashboard(d Dashboard) (data *assetsv1beta1.Dashboard,
Urn: dashboardUrn,
Name: dashboard.Name,
Service: "metabase",
Type: "dashboard",
Description: dashboard.Description,
},
Charts: charts,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/mongodb/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ func (e *Extractor) buildTable(ctx context.Context, db *mongo.Database, collecti
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", db.Name(), collectionName),
Name: collectionName,
Type: "table",
},
Profile: &assetsv1beta1.TableProfile{
TotalRows: totalRows,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/mssql/mssql.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", database, tableName),
Name: tableName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error)
Resource: &commonv1beta1.Resource{
Urn: fmt.Sprintf("%s.%s", database, tableName),
Name: tableName,
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/optimus/optimus.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (e *Extractor) buildJob(ctx context.Context, jobSpec *pb.JobSpecification,
Name: jobSpec.Name,
Service: service,
Description: jobSpec.Description,
Type: "job",
},
Ownership: &facetsv1beta1.Ownership{
Owners: []*facetsv1beta1.Owner{
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ func (e *Extractor) getTableMetadata(db *sql.DB, dbName string, tableName string
Urn: fmt.Sprintf("%s.%s", dbName, tableName),
Name: tableName,
Service: "Oracle",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ func (e *Extractor) getTableMetadata(db *sql.DB, dbName string, tableName string
Urn: models.TableURN("postgres", e.host, dbName, tableName),
Name: tableName,
Service: "postgres",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/presto/presto.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ func (e *Extractor) processTable(db *sql.DB, catalog string, database string, ta
Urn: fmt.Sprintf("%s.%s.%s", catalog, database, tableName),
Name: tableName,
Service: "presto",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/redshift/redshift.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,7 @@ func (e *Extractor) getTableMetadata(dbName string, tableName string) (result *a
Resource: &commonv1beta1.Resource{
Urn: models.TableURN("redshift", e.config.AWSRegion, dbName, tableName),
Name: tableName,
Type: "table",
Service: "redshift",
},
Schema: &facetsv1beta1.Columns{
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func (e *Extractor) processTable(database string, tableName string) (err error)
Urn: fmt.Sprintf("%s.%s", database, tableName),
Name: tableName,
Service: "Snowflake",
Type: "table",
},
Schema: &facetsv1beta1.Columns{
Columns: columns,
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/superset/superset.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (e *Extractor) buildDashboard(id int) (data *assetsv1beta1.Dashboard, err e
Name: dashboard.DashboardTitle,
Service: "superset",
Url: dashboard.URL,
Type: "dashboard",
},
Charts: chart,
}
Expand Down
1 change: 1 addition & 0 deletions plugins/extractors/tableau/tableau.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ func (e *Extractor) buildDashboard(wb *Workbook) (data *assetsv1beta1.Dashboard,
Urn: dashboardURN,
Name: wb.Name,
Service: "tableau",
Type: "dashboard",
Description: wb.Description,
},
Charts: e.buildCharts(dashboardURN, wb, lineages),
Expand Down
1 change: 0 additions & 1 deletion plugins/sinks/columbus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ sinks:
name: columbus
config:
host: https://columbus.com
type: sample-columbus-type
headers:
Columbus-User-Email: [email protected]
Header-1: value11,value12
Expand Down
20 changes: 13 additions & 7 deletions plugins/sinks/columbus/columbus.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
package columbus

type Record struct {
Urn string `json:"urn"`
type RequestPayload struct {
Asset Asset `json:"asset"`
Upstreams []LineageRecord `json:"upstreams"`
Downstreams []LineageRecord `json:"downstreams"`
}

type Asset struct {
URN string `json:"urn"`
Type string `json:"type"`
Name string `json:"name"`
Service string `json:"service"`
Upstreams []LineageRecord `json:"upstreams"`
Downstreams []LineageRecord `json:"downstreams"`
Owners []Owner `json:"owners"`
Description string `json:"description"`
Owners []Owner `json:"owners"`
Data interface{} `json:"data"`
Labels map[string]string `json:"labels"`
}

type LineageRecord struct {
Urn string `json:"urn"`
Type string `json:"type"`
URN string `json:"urn"`
Type string `json:"type"`
Service string `json:"service"`
}

type Owner struct {
Expand Down
59 changes: 33 additions & 26 deletions plugins/sinks/columbus/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,37 +6,38 @@ import (
_ "embed"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"strings"

"github.com/odpf/meteor/models"
"github.com/odpf/meteor/plugins"
"github.com/odpf/meteor/registry"
"github.com/odpf/meteor/utils"
"github.com/odpf/salt/log"
"github.com/pkg/errors"
"io/ioutil"
"net/http"
"strings"
)

//go:embed README.md
var summary string

type Config struct {
Host string `mapstructure:"host" validate:"required"`
Type string `mapstructure:"type" validate:"required"`
Headers map[string]string `mapstructure:"headers"`
Labels map[string]string `mapstructure:"labels"`
}

var sampleConfig = `
# The hostnmame of the columbus service
# The hostname of the columbus service
host: https://columbus.com
# The type of the data to send
type: sample-columbus-type
# Additional HTTP headers send to columbus, multiple headers value are separated by a comma
headers:
Columbus-User-Email: [email protected]
X-Other-Header: value1, value2`
X-Other-Header: value1, value2
# The labels to pass as payload label of the patch api
labels:
myCustom: $properties.attributes.myCustomField
sampleLabel: $properties.labels.sampleLabelField
`

type httpClient interface {
Do(*http.Request) (*http.Response, error)
Expand Down Expand Up @@ -95,15 +96,15 @@ func (s *Sink) Sink(ctx context.Context, batch []models.Record) (err error) {

func (s *Sink) Close() (err error) { return }

func (s *Sink) send(record Record) (err error) {
payloadBytes, err := json.Marshal([]Record{record})
func (s *Sink) send(record RequestPayload) (err error) {
payloadBytes, err := json.Marshal(record)
if err != nil {
return
}

// send request
url := fmt.Sprintf("%s/v1beta1/types/%s/records", s.config.Host, s.config.Type)
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(payloadBytes))
url := fmt.Sprintf("%s/v1beta1/assets", s.config.Host)
req, err := http.NewRequest(http.MethodPatch, url, bytes.NewBuffer(payloadBytes))
if err != nil {
return
}
Expand Down Expand Up @@ -138,22 +139,26 @@ func (s *Sink) send(record Record) (err error) {
}
}

func (s *Sink) buildColumbusPayload(metadata models.Metadata) (Record, error) {
func (s *Sink) buildColumbusPayload(metadata models.Metadata) (RequestPayload, error) {
labels, err := s.buildLabels(metadata)
if err != nil {
return Record{}, errors.Wrap(err, "failed to build labels")
return RequestPayload{}, errors.Wrap(err, "failed to build labels")
}

upstreams, downstreams := s.buildLineage(metadata)
owners := s.buildOwners(metadata)
resource := metadata.GetResource()
record := Record{
Urn: resource.GetUrn(),
Name: resource.GetName(),
Service: resource.GetService(),
Data: metadata,
Labels: labels,
Owners: owners,
record := RequestPayload{
Asset: Asset{
URN: resource.GetUrn(),
Type: resource.GetType(),
Name: resource.GetName(),
Service: resource.GetService(),
Description: resource.GetDescription(),
Owners: owners,
Data: metadata,
Labels: labels,
},
Upstreams: upstreams,
Downstreams: downstreams,
}
Expand All @@ -174,14 +179,16 @@ func (s *Sink) buildLineage(metadata models.Metadata) (upstreams, downstreams []

for _, upstream := range lineage.Upstreams {
upstreams = append(upstreams, LineageRecord{
Urn: upstream.Urn,
Type: upstream.Type,
URN: upstream.Urn,
Type: upstream.Type,
Service: upstream.Service,
})
}
for _, downstream := range lineage.Downstreams {
downstreams = append(downstreams, LineageRecord{
Urn: downstream.Urn,
Type: downstream.Type,
URN: downstream.Urn,
Type: downstream.Type,
Service: downstream.Service,
})
}

Expand Down
Loading

0 comments on commit 853b1e2

Please sign in to comment.