From 6cf97376a71034fc195af181c6a4021c55172caa Mon Sep 17 00:00:00 2001 From: Suhas Karanth Date: Tue, 8 Nov 2022 09:40:43 +0530 Subject: [PATCH] feat: add script processor using Tengo (#428) - Implement a new 'script' processor with 'tengo' as the only supported script engine. - Add internal packages for translating asset proto value into map[string]interface{} and for updating asset proto value with the given map. --- .gitignore | 2 + docs/docs/reference/processors.md | 16 +- go.mod | 9 +- go.sum | 2 + plugins/internal/tengoutil/secure_script.go | 21 ++ .../internal/tengoutil/secure_script_test.go | 36 ++ .../tengoutil/structmap/asset_wrapper.go | 59 ++++ .../tengoutil/structmap/asset_wrapper_test.go | 319 ++++++++++++++++++ .../internal/tengoutil/structmap/structmap.go | 115 +++++++ .../tengoutil/structmap/structmap_test.go | 138 ++++++++ plugins/internal/tengoutil/testdata/sum.tengo | 5 + plugins/processors/populate.go | 1 + plugins/processors/script/README.md | 252 ++++++++++++++ plugins/processors/script/tengo_script.go | 120 +++++++ .../processors/script/tengo_script_test.go | 219 ++++++++++++ test/utils/any.go | 2 + test/utils/assert.go | 23 +- 17 files changed, 1331 insertions(+), 8 deletions(-) create mode 100644 plugins/internal/tengoutil/secure_script.go create mode 100644 plugins/internal/tengoutil/secure_script_test.go create mode 100644 plugins/internal/tengoutil/structmap/asset_wrapper.go create mode 100644 plugins/internal/tengoutil/structmap/asset_wrapper_test.go create mode 100644 plugins/internal/tengoutil/structmap/structmap.go create mode 100644 plugins/internal/tengoutil/structmap/structmap_test.go create mode 100644 plugins/internal/tengoutil/testdata/sum.tengo create mode 100644 plugins/processors/script/README.md create mode 100644 plugins/processors/script/tengo_script.go create mode 100644 plugins/processors/script/tengo_script_test.go diff --git a/.gitignore b/.gitignore index 7495d827e..997c7e39b 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,5 @@ meteor-plugin-* # build /dist + +.playground diff --git a/docs/docs/reference/processors.md b/docs/docs/reference/processors.md index 65af6624e..452230108 100644 --- a/docs/docs/reference/processors.md +++ b/docs/docs/reference/processors.md @@ -28,4 +28,18 @@ processors: This processor will append Asset's Labels with value from given config. -[More details](https://github.com/odpf/meteor/blob/main/plugins/processors/labels/README.md) +[More details][labels-readme] + +## Script + +Script processor uses the user specified script to transform each asset emitted +from the extractor. Currently, [Tengo][tengo] is the only supported script +engine. + +[More details][script-readme] + +[labels-readme]: https://github.com/odpf/meteor/blob/main/plugins/processors/labels/README.md + +[script-readme]: https://github.com/odpf/meteor/blob/main/plugins/processors/script/README.md + +[tengo]: https://github.com/d5/tengo diff --git a/go.mod b/go.mod index da8eb1850..37f305ba7 100644 --- a/go.mod +++ b/go.mod @@ -16,6 +16,7 @@ require ( github.com/benbjohnson/clock v1.3.0 // indirect github.com/blastrain/vitess-sqlparser v0.0.0-20201030050434-a139afbb1aba github.com/cenkalti/backoff/v4 v4.1.2 + github.com/d5/tengo/v2 v2.13.0 github.com/denisenkom/go-mssqldb v0.10.0 github.com/dnaeon/go-vcr/v2 v2.0.1 github.com/elastic/go-elasticsearch v0.0.0 @@ -31,7 +32,9 @@ require ( github.com/go-playground/validator/v10 v10.10.0 github.com/go-sql-driver/mysql v1.6.0 github.com/gocql/gocql v0.0.0-20210817081954-bc256bbb90de + github.com/google/go-cmp v0.5.8 github.com/google/go-github/v37 v37.0.0 + github.com/gopherjs/gopherjs v0.0.0-20210503212227-fb464eba2686 // indirect github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/lib/pq v1.10.4 @@ -50,6 +53,7 @@ require ( github.com/prestodb/presto-go-client v0.0.0-20211201125635-ad28cec17d6c github.com/schollz/progressbar/v3 v3.8.5 github.com/segmentio/kafka-go v0.4.17 + github.com/sergi/go-diff v1.1.0 // indirect github.com/sijms/go-ora/v2 v2.2.22 github.com/snowflakedb/gosnowflake v1.6.7 github.com/spf13/cast v1.5.0 // indirect @@ -74,8 +78,3 @@ require ( gopkg.in/ini.v1 v1.66.6 // indirect gopkg.in/yaml.v3 v3.0.1 ) - -require ( - github.com/gopherjs/gopherjs v0.0.0-20210503212227-fb464eba2686 // indirect - github.com/sergi/go-diff v1.1.0 // indirect -) diff --git a/go.sum b/go.sum index 97d3a88c4..72fbd4604 100644 --- a/go.sum +++ b/go.sum @@ -501,6 +501,8 @@ github.com/d2g/dhcp4 v0.0.0-20170904100407-a1d1b6c41b1c/go.mod h1:Ct2BUK8SB0YC1S github.com/d2g/dhcp4client v1.0.0/go.mod h1:j0hNfjhrt2SxUOw55nL0ATM/z4Yt3t2Kd1mW34z5W5s= github.com/d2g/dhcp4server v0.0.0-20181031114812-7d4a0a7f59a5/go.mod h1:Eo87+Kg/IX2hfWJfwxMzLyuSZyxSoAug2nGa1G2QAi8= github.com/d2g/hardwareaddr v0.0.0-20190221164911-e7d9fbe030e4/go.mod h1:bMl4RjIciD2oAxI7DmWRx6gbeqrkoLqv3MV0vzNad+I= +github.com/d5/tengo/v2 v2.13.0 h1:4pZ5mR4vjOejpp+PMeIMpjZdObK7iwWoLTpVyhT+0Jk= +github.com/d5/tengo/v2 v2.13.0/go.mod h1:XRGjEs5I9jYIKTxly6HCF8oiiilk5E/RYXOZ5b0DZC8= github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964 h1:y5HC9v93H5EPKqaS1UYVg1uYah5Xf51mBfIoWehClUQ= github.com/danwakefield/fnmatch v0.0.0-20160403171240-cbb64ac3d964/go.mod h1:Xd9hchkHSWYkEqJwUGisez3G1QY8Ryz0sdWrLPMGjLk= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/plugins/internal/tengoutil/secure_script.go b/plugins/internal/tengoutil/secure_script.go new file mode 100644 index 000000000..ad806a5a2 --- /dev/null +++ b/plugins/internal/tengoutil/secure_script.go @@ -0,0 +1,21 @@ +package tengoutil + +import ( + "github.com/d5/tengo/v2" + "github.com/d5/tengo/v2/stdlib" +) + +const ( + maxAllocs = 5000 + maxConsts = 500 +) + +func NewSecureScript(input []byte) *tengo.Script { + s := tengo.NewScript(input) + + s.SetImports(stdlib.GetModuleMap(stdlib.AllModuleNames()...)) + s.SetMaxAllocs(maxAllocs) + s.SetMaxConstObjects(maxConsts) + + return s +} diff --git a/plugins/internal/tengoutil/secure_script_test.go b/plugins/internal/tengoutil/secure_script_test.go new file mode 100644 index 000000000..de0e3ed07 --- /dev/null +++ b/plugins/internal/tengoutil/secure_script_test.go @@ -0,0 +1,36 @@ +//go:build plugins +// +build plugins + +package tengoutil + +import ( + "testing" + + "github.com/MakeNowJust/heredoc" + "github.com/stretchr/testify/assert" +) + +func TestNewSecureScript(t *testing.T) { + t.Run("Allows import of builtin modules", func(t *testing.T) { + s := NewSecureScript(([]byte)(heredoc.Doc(` + math := import("math") + os := import("os") + text := import("text") + times := import("times") + rand := import("rand") + fmt := import("fmt") + json := import("json") + base64 := import("base64") + hex := import("hex") + enum := import("enum") + `))) + _, err := s.Compile() + assert.NoError(t, err) + }) + + t.Run("File import disallowed", func(t *testing.T) { + s := NewSecureScript(([]byte)(`sum := import("./testdata/sum")`)) + _, err := s.Compile() + assert.ErrorContains(t, err, "Compile Error: module './testdata/sum' not found") + }) +} diff --git a/plugins/internal/tengoutil/structmap/asset_wrapper.go b/plugins/internal/tengoutil/structmap/asset_wrapper.go new file mode 100644 index 000000000..ca7cc273d --- /dev/null +++ b/plugins/internal/tengoutil/structmap/asset_wrapper.go @@ -0,0 +1,59 @@ +package structmap + +import ( + "fmt" + + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "google.golang.org/protobuf/reflect/protoregistry" + "google.golang.org/protobuf/types/known/anypb" +) + +type AssetWrapper struct { + A *v1beta2.Asset +} + +func (w AssetWrapper) AsMap() (map[string]interface{}, error) { + v, err := AsMap(w.A) + if err != nil { + return nil, fmt.Errorf("structmap: asset as map: %w", err) + } + + m, ok := v.(map[string]interface{}) + if !ok { + return nil, fmt.Errorf("structmap: asset as map: unexpected type for asset map: %T", v) + } + + return m, err +} + +func (w *AssetWrapper) OverwriteWith(m map[string]interface{}) error { + dataMap, ok := m["data"].(map[string]interface{}) + if !ok { + return fmt.Errorf("structmap: overwrite asset: unexpected type for asset data: %T", m["data"]) + } + + mt, err := protoregistry.GlobalTypes.FindMessageByName(w.A.Data.MessageName()) + if err != nil { + return fmt.Errorf("structmap: overwrite asset: resolve type by full name %s: %w", w.A.Data.MessageName(), err) + } + + msg := mt.New().Interface() + delete(dataMap, "@type") + if err := AsStruct(m["data"], &msg); err != nil { + return fmt.Errorf("structmap: overwrite asset: decode asset data: %w", err) + } + + delete(m, "data") + if err := AsStruct(m, w.A); err != nil { + return fmt.Errorf("structmap: overwrite asset: decode asset: %w", err) + } + + data, err := anypb.New(msg) + if err != nil { + return fmt.Errorf("structmap: overwrite asset: marshal data as any: %w", err) + } + + w.A.Data = data + + return nil +} diff --git a/plugins/internal/tengoutil/structmap/asset_wrapper_test.go b/plugins/internal/tengoutil/structmap/asset_wrapper_test.go new file mode 100644 index 000000000..11a9f8363 --- /dev/null +++ b/plugins/internal/tengoutil/structmap/asset_wrapper_test.go @@ -0,0 +1,319 @@ +//go:build plugins +// +build plugins + +package structmap + +import ( + "testing" + "time" + + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + testutils "github.com/odpf/meteor/test/utils" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestAssetWrapper_AsMap(t *testing.T) { + cases := []struct { + name string + w AssetWrapper + expected map[string]interface{} + }{ + { + name: "AssetWithFeatureTable", + w: AssetWrapper{A: &v1beta2.Asset{ + Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + Name: "avg_dispatch_arrival_time_10_mins", + Service: "caramlstore", + Type: "feature_table", + Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ + Namespace: "sauron", + Entities: []*v1beta2.FeatureTable_Entity{ + {Name: "merchant_uuid", Labels: map[string]string{ + "description": "merchant uuid", + "value_type": "STRING", + }}, + }, + Features: []*v1beta2.Feature{ + {Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"}, + {Name: "ongoing_orders", DataType: "INT64"}, + {Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"}, + {Name: "ongoing_accepted_orders", DataType: "INT64"}, + }, + CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 0o4, 0, time.UTC)), + UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 0o2, 0, time.UTC)), + }), + Lineage: &v1beta2.Lineage{ + Upstreams: []*v1beta2.Resource{ + { + Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + Service: "kafka", + Type: "topic", + }, + }, + }, + }}, + expected: map[string]interface{}{ + "data": map[string]interface{}{ + "@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable", + "create_time": "2022-09-19T22:42:04Z", + "entities": []interface{}{ + map[string]interface{}{ + "labels": map[string]interface{}{"description": "merchant uuid", "value_type": "STRING"}, + "name": "merchant_uuid", + }, + }, + "features": []interface{}{ + map[string]interface{}{"data_type": "INT64", "name": "ongoing_placed_and_waiting_acceptance_orders"}, + map[string]interface{}{"data_type": "INT64", "name": "ongoing_orders"}, + map[string]interface{}{"data_type": "FLOAT", "name": "merchant_avg_dispatch_arrival_time_10m"}, + map[string]interface{}{"data_type": "INT64", "name": "ongoing_accepted_orders"}, + }, + "namespace": "sauron", + "update_time": "2022-09-21T13:23:02Z", + }, + "lineage": map[string]interface{}{ + "upstreams": []interface{}{ + map[string]interface{}{ + "service": "kafka", + "type": "topic", + "urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + }, + }, + }, + "name": "avg_dispatch_arrival_time_10_mins", + "service": "caramlstore", + "type": "feature_table", + "urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + }, + }, + { + name: "AssetWithTable", + w: AssetWrapper{A: &v1beta2.Asset{ + Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + Name: "applicant", + Type: "table", + Service: "cassandra", + Data: testutils.BuildAny(t, &v1beta2.Table{ + Columns: []*v1beta2.Column{ + {Name: "applicantid", DataType: "int"}, + {Name: "first_name", DataType: "text"}, + {Name: "last_name", DataType: "text"}, + }, + Attributes: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "id": structpb.NewStringValue("test-id"), + "name": structpb.NewStringValue("test-name"), + }, + }, + }), + }}, + expected: map[string]interface{}{ + "data": map[string]interface{}{ + "@type": "type.googleapis.com/odpf.assets.v1beta2.Table", + "columns": []interface{}{ + map[string]interface{}{"data_type": "int", "name": "applicantid"}, + map[string]interface{}{"data_type": "text", "name": "first_name"}, + map[string]interface{}{"data_type": "text", "name": "last_name"}, + }, + "attributes": map[string]interface{}{ + "id": "test-id", + "name": "test-name", + }, + }, + "name": "applicant", + "service": "cassandra", + "type": "table", + "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + res, err := tc.w.AsMap() + assert.NoError(t, err) + assert.Equal(t, tc.expected, res) + }) + } +} + +func TestAssetWrapper_OverwriteWith(t *testing.T) { + cases := []struct { + name string + w AssetWrapper + input map[string]interface{} + expected *v1beta2.Asset + expectedErr bool + }{ + { + name: "AssetWithFeatureTable", + w: AssetWrapper{A: &v1beta2.Asset{ + Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.FeatureTable"}, + }}, + input: map[string]interface{}{ + "data": map[string]interface{}{ + "@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable", + "create_time": "2022-09-19T22:42:04Z", + "entities": []interface{}{ + map[string]interface{}{ + "labels": map[string]interface{}{"description": "merchant uuid", "value_type": "STRING"}, + "name": "merchant_uuid", + }, + }, + "features": []interface{}{ + map[string]interface{}{"data_type": "INT64", "name": "ongoing_placed_and_waiting_acceptance_orders"}, + map[string]interface{}{"data_type": "INT64", "name": "ongoing_orders"}, + map[string]interface{}{"data_type": "FLOAT", "name": "merchant_avg_dispatch_arrival_time_10m"}, + map[string]interface{}{"data_type": "INT64", "name": "ongoing_accepted_orders"}, + }, + "namespace": "sauron", + "update_time": "2022-09-21T13:23:02Z", + }, + "lineage": map[string]interface{}{ + "upstreams": []interface{}{ + map[string]interface{}{ + "service": "kafka", + "type": "topic", + "urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + }, + }, + }, + "create_time": "2022-10-19T22:42:04Z", + "name": "avg_dispatch_arrival_time_10_mins", + "service": "caramlstore", + "type": "feature_table", + "urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + }, + expected: &v1beta2.Asset{ + Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + Name: "avg_dispatch_arrival_time_10_mins", + Service: "caramlstore", + Type: "feature_table", + Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ + Namespace: "sauron", + Entities: []*v1beta2.FeatureTable_Entity{ + { + Name: "merchant_uuid", + Labels: map[string]string{"description": "merchant uuid", "value_type": "STRING"}, + }, + }, + Features: []*v1beta2.Feature{ + {Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"}, + {Name: "ongoing_orders", DataType: "INT64"}, + {Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"}, + {Name: "ongoing_accepted_orders", DataType: "INT64"}, + }, + CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 4, 0, time.UTC)), + UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 2, 0, time.UTC)), + }), + Lineage: &v1beta2.Lineage{ + Upstreams: []*v1beta2.Resource{ + { + Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + Service: "kafka", + Type: "topic", + }, + }, + }, + CreateTime: timestamppb.New(time.Date(2022, time.October, 19, 22, 42, 4, 0, time.UTC)), + }, + }, + { + name: "AssetWithTable", + w: AssetWrapper{A: &v1beta2.Asset{ + Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, + }}, + input: map[string]interface{}{ + "data": map[string]interface{}{ + "@type": "type.googleapis.com/odpf.assets.v1beta2.Table", + "columns": []interface{}{ + map[string]interface{}{"data_type": "int", "name": "applicantid"}, + map[string]interface{}{"data_type": "text", "name": "first_name"}, + map[string]interface{}{"data_type": "text", "name": "last_name"}, + }, + "attributes": map[string]interface{}{"id": "test-id", "name": "test-name"}, + }, + "name": "applicant", + "service": "cassandra", + "type": "table", + "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + }, + expected: &v1beta2.Asset{ + Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + Name: "applicant", + Type: "table", + Service: "cassandra", + Data: testutils.BuildAny(t, &v1beta2.Table{ + Columns: []*v1beta2.Column{ + {Name: "applicantid", DataType: "int"}, + {Name: "first_name", DataType: "text"}, + {Name: "last_name", DataType: "text"}, + }, + Attributes: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "id": structpb.NewStringValue("test-id"), + "name": structpb.NewStringValue("test-name"), + }, + }, + }), + }, + }, + { + name: "WithoutData", + w: AssetWrapper{A: &v1beta2.Asset{ + Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, + }}, + input: map[string]interface{}{ + "name": "applicant", + "service": "cassandra", + "type": "table", + "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + }, + expected: &v1beta2.Asset{ + Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, + }, + expectedErr: true, + }, + { + name: "UnknownKeys", + w: AssetWrapper{A: &v1beta2.Asset{ + Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, + }}, + input: map[string]interface{}{ + "does-not-exist": "value", + "urn": "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + "type": "table", + "data": map[string]interface{}{}, + }, + expected: &v1beta2.Asset{ + Urn: "urn:cassandra:test-cassandra:table:cassandra_meteor_test.applicant", + Type: "table", + Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, + }, + expectedErr: true, + }, + { + name: "UnknownMessageName", + w: AssetWrapper{A: &v1beta2.Asset{ + Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.DoesNotExist"}, + }}, + input: map[string]interface{}{ + "data": map[string]interface{}{}, + }, + expected: &v1beta2.Asset{ + Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.DoesNotExist"}, + }, + expectedErr: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := tc.w.OverwriteWith(tc.input) + assert.Equalf(t, tc.expectedErr, (err != nil), + "AssetWrapper.OverwriteWith() err = %v,\nexpectedErr %v", err, tc.expectedErr) + testutils.AssertEqualProto(t, tc.expected, tc.w.A) + }) + } +} diff --git a/plugins/internal/tengoutil/structmap/structmap.go b/plugins/internal/tengoutil/structmap/structmap.go new file mode 100644 index 000000000..ff48b4237 --- /dev/null +++ b/plugins/internal/tengoutil/structmap/structmap.go @@ -0,0 +1,115 @@ +package structmap + +import ( + "encoding/json" + "fmt" + "reflect" + "time" + + "github.com/mitchellh/mapstructure" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func AsMap(v interface{}) (interface{}, error) { + // Cannot use mapstructure here because of + // 1. https://github.com/mitchellh/mapstructure/issues/249 + // 2. Handling for fields with type *timestamp.Timestamp + var ( + data []byte + err error + ) + if m, ok := v.(proto.Message); ok { + data, err = protojson.MarshalOptions{UseProtoNames: true}.Marshal(m) + } else { + data, err = json.Marshal(v) + } + if err != nil { + return nil, fmt.Errorf("structmap: %T as map: marshal: %w", v, err) + } + + var res interface{} + if err := json.Unmarshal(data, &res); err != nil { + return nil, fmt.Errorf("structmap: %T as map: unmarshal: %w", v, err) + } + + return res, nil +} + +func AsStruct(input, output interface{}) error { + dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ + DecodeHook: mapstructure.ComposeDecodeHookFunc( + stringToTimestampHookFunc(time.RFC3339), + timeToTimestampHookFunc(), + mapstructure.StringToTimeHookFunc(time.RFC3339), + mapToAttributesHookFunc(), + ), + WeaklyTypedInput: true, + ErrorUnused: true, + ZeroFields: true, + Result: output, + TagName: "json", + }) + if err != nil { + return fmt.Errorf("structmap: decode into %T: create decoder: %w", output, err) + } + + if err := dec.Decode(input); err != nil { + return fmt.Errorf("structmap: decode into %T: %w", output, err) + } + + return nil +} + +// stringToTimestampHookFunc returns a DecodeHookFunc that converts +// strings to timestamppb.Timestamp. +func stringToTimestampHookFunc(layout string) mapstructure.DecodeHookFuncType { + return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + s, ok := data.(string) + if !ok { + return data, nil + } + if t != reflect.TypeOf(timestamppb.Timestamp{}) && t != reflect.TypeOf(×tamppb.Timestamp{}) { + return data, nil + } + + // Convert it by parsing + ts, err := time.Parse(layout, s) + if err != nil { + return nil, fmt.Errorf("structmap: mapstructure string to timestamp hook: %w", err) + } + + return timestamppb.New(ts), nil + } +} + +func timeToTimestampHookFunc() mapstructure.DecodeHookFuncType { + return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + ts, ok := data.(time.Time) + if !ok { + return data, nil + } + if t != reflect.TypeOf(timestamppb.Timestamp{}) && t != reflect.TypeOf(×tamppb.Timestamp{}) { + return data, nil + } + + return timestamppb.New(ts), nil + } +} + +func mapToAttributesHookFunc() mapstructure.DecodeHookFuncType { + return func(_ reflect.Type, t reflect.Type, data interface{}) (interface{}, error) { + m, ok := data.(map[string]interface{}) + if !ok { + return data, nil + } + + if t != reflect.TypeOf(&structpb.Struct{}) && t != reflect.TypeOf(structpb.Struct{}) { + return data, nil + } + + return structpb.NewStruct(m) + } +} diff --git a/plugins/internal/tengoutil/structmap/structmap_test.go b/plugins/internal/tengoutil/structmap/structmap_test.go new file mode 100644 index 000000000..8914d7b0a --- /dev/null +++ b/plugins/internal/tengoutil/structmap/structmap_test.go @@ -0,0 +1,138 @@ +//go:build plugins +// +build plugins + +package structmap + +import ( + "testing" + "time" + + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "github.com/odpf/meteor/utils" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestAsMap(t *testing.T) { + cases := []struct { + name string + input interface{} + expected interface{} + expectedErr bool + }{ + { + name: "MapStringToString", + input: map[string]string{"key": "value"}, + expected: map[string]interface{}{"key": "value"}, + }, + { + name: "MapIntToStringSlice", + input: map[int][]string{1: {"s1", "s2"}}, + expected: map[string]interface{}{"1": []interface{}{"s1", "s2"}}, + }, + { + name: "StringSlice", + input: []string{"s1", "s2"}, + expected: []interface{}{"s1", "s2"}, + }, + { + name: "WithProtoMessage", + input: &v1beta2.Job{ + Attributes: utils.TryParseMapToProto(map[string]interface{}{ + "id": "test-id", + "name": "test-name", + }), + CreateTime: timestamppb.New(time.Date( + 2022, time.September, 19, 22, 42, 0o4, 0, time.UTC, + )), + }, + expected: map[string]interface{}{ + "attributes": map[string]interface{}{ + "id": "test-id", + "name": "test-name", + }, + "create_time": "2022-09-19T22:42:04Z", + }, + }, + { + name: "MarshalFailure", + input: make(chan int), + expectedErr: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + res, err := AsMap(tc.input) + assert.Equalf(t, tc.expectedErr, (err != nil), + "AsMap() err = %v,\nexpectedErr %v", err, tc.expectedErr) + assert.Equal(t, tc.expected, res) + }) + } +} + +func TestAsStruct(t *testing.T) { + cases := []struct { + name string + input interface{} + output interface{} + expected interface{} + expectedErr bool + }{ + { + name: "MapStringToString", + input: map[string]interface{}{"key": "value"}, + output: map[string]string{}, + expected: map[string]string{"key": "value"}, + }, + { + name: "MapIntToStringSlice", + input: map[string]interface{}{"1": []interface{}{"s1", "s2"}}, + output: map[int][]string{}, + expected: map[int][]string{1: {"s1", "s2"}}, + }, + { + name: "StringSlice", + input: []interface{}{"s1", "s2"}, + output: []string{}, + expected: []string{"s1", "s2"}, + }, + { + name: "WithProtoMessage", + input: map[string]interface{}{ + "attributes": map[string]interface{}{ + "id": "test-id", + "name": "test-name", + }, + "create_time": "2022-09-19T22:42:04Z", + }, + output: &v1beta2.Job{}, + expected: &v1beta2.Job{ + Attributes: &structpb.Struct{ + Fields: map[string]*structpb.Value{ + "id": structpb.NewStringValue("test-id"), + "name": structpb.NewStringValue("test-name"), + }, + }, + CreateTime: timestamppb.New(time.Date( + 2022, time.September, 19, 22, 42, 0o4, 0, time.UTC, + )), + }, + }, + { + name: "MismatchedType", + input: []interface{}{"s1"}, + output: map[string]interface{}{}, + expected: map[string]interface{}{}, + expectedErr: true, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := AsStruct(tc.input, &tc.output) + assert.Equalf(t, tc.expectedErr, (err != nil), + "AsStruct() err = %v,\nexpectedErr %v", err, tc.expectedErr) + assert.Equal(t, tc.expected, tc.output) + }) + } +} diff --git a/plugins/internal/tengoutil/testdata/sum.tengo b/plugins/internal/tengoutil/testdata/sum.tengo new file mode 100644 index 000000000..4b2d3e6d2 --- /dev/null +++ b/plugins/internal/tengoutil/testdata/sum.tengo @@ -0,0 +1,5 @@ +base := 5 + +export func(x) { + return x + base +} diff --git a/plugins/processors/populate.go b/plugins/processors/populate.go index 2cb5e4379..880b7cbc7 100644 --- a/plugins/processors/populate.go +++ b/plugins/processors/populate.go @@ -3,4 +3,5 @@ package processors import ( _ "github.com/odpf/meteor/plugins/processors/enrich" _ "github.com/odpf/meteor/plugins/processors/labels" + _ "github.com/odpf/meteor/plugins/processors/script" ) diff --git a/plugins/processors/script/README.md b/plugins/processors/script/README.md new file mode 100644 index 000000000..5e7c12799 --- /dev/null +++ b/plugins/processors/script/README.md @@ -0,0 +1,252 @@ +# script + +`script` processor will run the user specified script to transform each asset +that is emitted by the extractor. Currently, [Tengo][tengo] is the only +supported script engine. + +Refer Tengo documentation for script language syntax and supported functionality +\- https://github.com/d5/tengo/tree/v2.13.0#references +. [Tengo standard library modules][tengo-stdlib] can also be imported and used +if requried. + +## Usage + +```yaml +processors: + - name: script + config: + engine: tengo + script: | + asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" }) +``` + +## Inputs + +| Key | Value | Example | Description | Required? | +|:---------|:---------|:---------------------------------------------------------------|:-----------------------------------------------------|:----------| +| `engine` | `string` | `"tengo"` | Script engine. Only `"tengo"` is supported currently | ✅ | +| `script` | `string` | `asset.labels = merge({script_engine: "tengo"}, asset.labels)` | [Tengo][tengo] script. | ✅ | + +### Script Globals + +#### `asset` + +The asset record emitted by the extractor is made available in the script +environment as `asset`. Any changes made to the asset will be reflected in the +record that will be output from the script processor. The field names will be as +per the [`Asset` proto definition][proton-asset]. Furthermore, the data +structure for `asset.data` will be one of the following: + +- [`Bucket`][proton-bucket] +- [`Dashboard`][proton-dashboard] +- [`Experiment`][proton-experiment] +- [`FeatureTable`][proton-featuretable] +- [`Group`][proton-group] +- [`Job`][proton-job] +- [`Metric`][proton-metric] +- [`Model`][proton-model] +- [`Service`][proton-service] +- [`Table`][proton-table] +- [`Topic`][proton-topic] +- [`User`][proton-user] + +The data type for `asset.data` depends on the specific type of extractor. + +## Worked Example + +Consider a [`FeatureTable`][proton-featuretable] asset with the following data: + +```json +{ + "urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + "name": "avg_dispatch_arrival_time_10_mins", + "service": "caramlstore", + "type": "feature_table", + "data": { + "@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable", + "namespace": "sauron", + "entities": [ + { + "name": "merchant_uuid", + "labels": {"description": "merchant uuid", "value_type": "STRING"} + } + ], + "features": [ + { + "name": "ongoing_placed_and_waiting_acceptance_orders", + "data_type": "INT64" + }, + {"name": "ongoing_orders", "data_type": "INT64"}, + {"name": "merchant_avg_dispatch_arrival_time_10m", "data_type": "FLOAT"}, + {"name": "ongoing_accepted_orders", "data_type": "INT64"} + ], + "create_time": "2022-09-19T22:42:04Z", + "update_time": "2022-09-21T13:23:02Z" + }, + "lineage": { + "upstreams": [ + { + "urn": "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + "service": "kafka", + "type": "topic" + } + ] + } +} +``` + +With the following contrived requirements to transform the asset: + +- Add a label to the asset - `"script_engine": "tengo`. +- Add a label to each entity. Ex: `"catch_phrase": "You talkin' to me?"`. +- Set an EntityName for each feature based on the following mapping: + - `ongoing_placed_and_waiting_acceptance_orders: customer_orders` + - `ongoing_orders: customer_orders` + - `merchant_avg_dispatch_arrival_time_10m: merchant_driver` + - `ongoing_accepted_orders: merchant_orders` +- Set the owner as `{Name: Big Mom, Email: big.mom@wholecakeisland.com}`. +- For each lineage upstream, if the service is Kafka, apply a string replace op + on the URN - `{.yonkou.io => }`. +- Add 1 day to the `update_time` timestamp present under `asset.data`. + +The script to apply the transformations above: + +[//]: # (@formatter:off) + +```go +text := import("text") +times := import("times") + +merge := func(m1, m2) { + for k, v in m2 { + m1[k] = v + } + return m1 +} + +asset.labels = merge({script_engine: "tengo"}, asset.labels) + +for e in asset.data.entities { + e.labels = merge({catch_phrase: "You talkin' to me?"}, e.labels) +} + +for f in asset.data.features { + if f.name == "ongoing_placed_and_waiting_acceptance_orders" || f.name == "ongoing_orders" { + f.entity_name = "customer_orders" + } else if f.name == "merchant_avg_dispatch_arrival_time_10m" { + f.entity_name = "merchant_driver" + } else if f.name == "ongoing_accepted_orders" { + f.entity_name = "merchant_orders" + } +} + +asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" }) + +for u in asset.lineage.upstreams { + u.urn = u.service != "kafka" ? u.urn : text.replace(u.urn, ".yonkou.io", "", -1) +} + +update_time := times.parse("2006-01-02T15:04:05Z07:00", asset.data.update_time) +asset.data.update_time = times.add_date(update_time, 0, 0, 1) +``` + +[//]: # (@formatter:on) + +With this script, the output from the processor would have the following asset: + +```json +{ + "urn": "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + "name": "avg_dispatch_arrival_time_10_mins", + "service": "caramlstore", + "type": "feature_table", + "data": { + "@type": "type.googleapis.com/odpf.assets.v1beta2.FeatureTable", + "namespace": "sauron", + "entities": [ + { + "name": "merchant_uuid", + "labels": { + "catch_phrase": "You talkin' to me?", + "description": "merchant uuid", + "value_type": "STRING" + } + } + ], + "features": [ + { + "name": "ongoing_placed_and_waiting_acceptance_orders", + "data_type": "INT64", + "entity_name": "customer_orders" + }, + { + "name": "ongoing_orders", + "data_type": "INT64", + "entity_name": "customer_orders" + }, + { + "name": "merchant_avg_dispatch_arrival_time_10m", + "data_type": "FLOAT", + "entity_name": "merchant_driver" + }, + { + "name": "ongoing_accepted_orders", + "data_type": "INT64", + "entity_name": "merchant_orders" + } + ], + "create_time": "2022-09-19T22:42:04Z", + "update_time": "2022-09-22T13:23:02Z" + }, + "owners": [ + {"name": "Big Mom", "email": "big.mom@wholecakeisland.com"} + ], + "lineage": { + "upstreams": [ + { + "urn": "urn:kafka:int-dagstream-kafka:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + "service": "kafka", + "type": "topic" + } + ] + }, + "labels": {"script_engine": "tengo"} +} +``` + +## Contributing + +Refer to +the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-processor) +for information on contributing to this module. + +[tengo]: https://github.com/d5/tengo + +[tengo-stdlib]: https://github.com/d5/tengo/blob/v2.13.0/docs/stdlib.md + +[proton-asset]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/asset.proto#L14 + +[proton-bucket]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/bucket.proto#L13 + +[proton-dashboard]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/dashboard.proto#L14 + +[proton-experiment]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/experiment.proto#L15 + +[proton-featuretable]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/feature_table.proto#L32 + +[proton-group]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/group.proto#L12 + +[proton-job]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/job.proto#L13 + +[proton-metric]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/metric.proto#L13 + +[proton-model]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/model.proto#L17 + +[proton-service]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/service.proto#L11 + +[proton-table]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/table.proto#L14 + +[proton-topic]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/topic.proto#L14 + +[proton-user]: https://github.com/odpf/proton/blob/0acbe8a/odpf/assets/v1beta2/user.proto#L15 + diff --git a/plugins/processors/script/tengo_script.go b/plugins/processors/script/tengo_script.go new file mode 100644 index 000000000..70ec50bd0 --- /dev/null +++ b/plugins/processors/script/tengo_script.go @@ -0,0 +1,120 @@ +package script + +import ( + "context" + _ "embed" // used to print the embedded assets + "fmt" + + "github.com/MakeNowJust/heredoc" + "github.com/d5/tengo/v2" + "github.com/odpf/meteor/models" + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/plugins/internal/tengoutil" + "github.com/odpf/meteor/plugins/internal/tengoutil/structmap" + "github.com/odpf/meteor/registry" + "github.com/odpf/salt/log" +) + +func init() { + if err := registry.Processors.Register("script", func() plugins.Processor { + return New(plugins.GetLog()) + }); err != nil { + return + } +} + +//go:embed README.md +var summary string + +type Config struct { + Engine string `mapstructure:"engine" validate:"required,oneof=tengo"` + Script string `mapstructure:"script" validate:"required"` +} + +// Processor executes the configured Tengo script to transform the given asset +// record. +type Processor struct { + plugins.BasePlugin + config Config + logger log.Logger + + compiled *tengo.Compiled +} + +var sampleConfig = heredoc.Doc(` + engine: tengo + script: | + asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" }) +`) + +var info = plugins.Info{ + Description: "Transform the extracted asset with the configured Tengo script", + SampleConfig: sampleConfig, + Summary: summary, + Tags: []string{"processor", "transform", "script"}, +} + +// New create a new processor +func New(logger log.Logger) *Processor { + p := &Processor{ + logger: logger, + } + p.BasePlugin = plugins.NewBasePlugin(info, &p.config) + + return p +} + +func (p *Processor) Init(ctx context.Context, config plugins.Config) error { + if err := p.BasePlugin.Init(ctx, config); err != nil { + return fmt.Errorf("script processor init: %w", err) + } + + s := tengoutil.NewSecureScript(([]byte)(p.config.Script)) + if err := p.declareGlobals(s); err != nil { + return fmt.Errorf("script processor init: %w", err) + } + + compiled, err := s.Compile() + if err != nil { + return fmt.Errorf("script processor init: compile script: %w", err) + } + + p.compiled = compiled + + return nil +} + +// Process processes the data +func (p *Processor) Process(ctx context.Context, src models.Record) (models.Record, error) { + astWrapper := structmap.AssetWrapper{A: src.Data()} + m, err := astWrapper.AsMap() + if err != nil { + return models.Record{}, fmt.Errorf("script processor: %w", err) + } + + c := p.compiled.Clone() + if err := c.Set("asset", m); err != nil { + return models.Record{}, fmt.Errorf("script processor: set asset into vm: %w", err) + } + + if err := c.RunContext(ctx); err != nil { + return models.Record{}, fmt.Errorf("script processor: run script: %w", err) + } + + if err := astWrapper.OverwriteWith(c.Get("asset").Map()); err != nil { + return models.Record{}, fmt.Errorf("script processor: overwrite asset: %w", err) + } + + return models.NewRecord(astWrapper.A), nil +} + +func (p *Processor) declareGlobals(s *tengo.Script) error { + for name, v := range map[string]interface{}{ + "asset": map[string]interface{}{}, + } { + if err := s.Add(name, v); err != nil { + return fmt.Errorf("declare script globals: %w", err) + } + } + return nil +} diff --git a/plugins/processors/script/tengo_script_test.go b/plugins/processors/script/tengo_script_test.go new file mode 100644 index 000000000..2bb3d8cbf --- /dev/null +++ b/plugins/processors/script/tengo_script_test.go @@ -0,0 +1,219 @@ +//go:build plugins +// +build plugins + +package script + +import ( + "context" + "testing" + "time" + + "github.com/MakeNowJust/heredoc" + "github.com/odpf/meteor/models" + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "github.com/odpf/meteor/plugins" + testutils "github.com/odpf/meteor/test/utils" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/anypb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +var ( + ctx = context.Background() + script = `asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" })` +) + +func TestInit(t *testing.T) { + t.Run("InvalidConfig", func(t *testing.T) { + cases := []struct { + name string + cfg plugins.Config + }{ + { + name: "WithoutScript", + cfg: plugins.Config{ + RawConfig: map[string]interface{}{"engine": "tengo"}, + }, + }, + { + name: "WithoutEngine", + cfg: plugins.Config{ + RawConfig: map[string]interface{}{"script": script}, + }, + }, + { + name: "WithUnsupportedEngine", + cfg: plugins.Config{ + RawConfig: map[string]interface{}{"script": script, "engine": "goja"}, + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := New(testutils.Logger).Init(ctx, tc.cfg) + assert.ErrorAs(t, err, &plugins.InvalidConfigError{}) + }) + } + }) + + t.Run("InvalidScript", func(t *testing.T) { + err := New(testutils.Logger).Init(ctx, plugins.Config{ + RawConfig: map[string]interface{}{ + "script": `ast.owners = []`, + "engine": "tengo", + }, + }) + assert.ErrorContains(t, err, "script processor init: compile script: Compile Error: unresolved reference 'ast'") + }) +} + +func TestProcess(t *testing.T) { + cases := []struct { + name string + script string + input *v1beta2.Asset + expected *v1beta2.Asset + errStr string + }{ + { + name: "FeatureTableAsset", + script: heredoc.Doc(` + text := import("text") + times := import("times") + + merge := func(m1, m2) { + for k, v in m2 { + m1[k] = v + } + return m1 + } + + asset.labels = merge({script_engine: "tengo"}, asset.labels) + + for e in asset.data.entities { + e.labels = merge({catch_phrase: "You talkin' to me?"}, e.labels) + } + + for f in asset.data.features { + if f.name == "ongoing_placed_and_waiting_acceptance_orders" || f.name == "ongoing_orders" { + f.entity_name = "customer_orders" + } else if f.name == "merchant_avg_dispatch_arrival_time_10m" { + f.entity_name = "merchant_driver" + } else if f.name == "ongoing_accepted_orders" { + f.entity_name = "merchant_orders" + } + } + + asset.owners = append(asset.owners || [], { name: "Big Mom", email: "big.mom@wholecakeisland.com" }) + + for u in asset.lineage.upstreams { + u.urn = u.service != "kafka" ? u.urn : text.replace(u.urn, ".yonkou.io", "", -1) + } + + update_time := times.parse("2006-01-02T15:04:05Z07:00", asset.data.update_time) + asset.data.update_time = times.add_date(update_time, 0, 0, 1) + `), + input: &v1beta2.Asset{ + Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + Name: "avg_dispatch_arrival_time_10_mins", + Service: "caramlstore", + Type: "feature_table", + Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ + Namespace: "sauron", + Entities: []*v1beta2.FeatureTable_Entity{ + {Name: "merchant_uuid", Labels: map[string]string{ + "description": "merchant uuid", + "value_type": "STRING", + }}, + }, + Features: []*v1beta2.Feature{ + {Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"}, + {Name: "ongoing_orders", DataType: "INT64"}, + {Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"}, + {Name: "ongoing_accepted_orders", DataType: "INT64"}, + }, + CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 0o4, 0, time.UTC)), + UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 0o2, 0, time.UTC)), + }), + Lineage: &v1beta2.Lineage{ + Upstreams: []*v1beta2.Resource{ + { + Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + Service: "kafka", + Type: "topic", + }, + }, + }, + }, + expected: &v1beta2.Asset{ + Urn: "urn:caramlstore:test-caramlstore:feature_table:avg_dispatch_arrival_time_10_mins", + Name: "avg_dispatch_arrival_time_10_mins", + Service: "caramlstore", + Type: "feature_table", + Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ + Namespace: "sauron", + Entities: []*v1beta2.FeatureTable_Entity{ + {Name: "merchant_uuid", Labels: map[string]string{ + "catch_phrase": "You talkin' to me?", + "description": "merchant uuid", + "value_type": "STRING", + }}, + }, + Features: []*v1beta2.Feature{ + {Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64", EntityName: "customer_orders"}, + {Name: "ongoing_orders", DataType: "INT64", EntityName: "customer_orders"}, + {Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT", EntityName: "merchant_driver"}, + {Name: "ongoing_accepted_orders", DataType: "INT64", EntityName: "merchant_orders"}, + }, + CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 0o4, 0, time.UTC)), + UpdateTime: timestamppb.New(time.Date(2022, time.September, 22, 13, 23, 0o2, 0, time.UTC)), + }), + Lineage: &v1beta2.Lineage{ + Upstreams: []*v1beta2.Resource{ + { + Urn: "urn:kafka:int-dagstream-kafka:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + Service: "kafka", + Type: "topic", + }, + }, + }, + Labels: map[string]string{"script_engine": "tengo"}, + Owners: []*v1beta2.Owner{{Name: "Big Mom", Email: "big.mom@wholecakeisland.com"}}, + }, + }, + { + name: "UnknownFields", + script: heredoc.Doc(` + asset.does_not_exist = "value" + `), + input: &v1beta2.Asset{ + Data: &anypb.Any{TypeUrl: "type.googleapis.com/odpf.assets.v1beta2.Table"}, + }, + expected: nil, + errStr: "invalid keys: does_not_exist", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + p := New(testutils.Logger) + err := p.Init(ctx, plugins.Config{ + RawConfig: map[string]interface{}{ + "script": tc.script, + "engine": "tengo", + }, + }) + if !assert.NoError(t, err) { + return + } + + res, err := p.Process(ctx, models.NewRecord(tc.input)) + if tc.errStr == "" { + assert.NoError(t, err) + } else { + assert.ErrorContains(t, err, tc.errStr) + } + + testutils.AssertEqualProto(t, tc.expected, res.Data()) + }) + } +} diff --git a/test/utils/any.go b/test/utils/any.go index 6bb955322..e9248f7ce 100644 --- a/test/utils/any.go +++ b/test/utils/any.go @@ -9,6 +9,8 @@ import ( ) func BuildAny(t *testing.T, protoMessage protoreflect.ProtoMessage) *anypb.Any { + t.Helper() + res, err := anypb.New(protoMessage) require.NoError(t, err) diff --git a/test/utils/assert.go b/test/utils/assert.go index 89e4c74df..8c8d95561 100644 --- a/test/utils/assert.go +++ b/test/utils/assert.go @@ -1,16 +1,35 @@ package utils import ( + "fmt" "os" "testing" - "google.golang.org/protobuf/encoding/protojson" - + "github.com/google/go-cmp/cmp" "github.com/nsf/jsondiff" v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" ) +func AssertEqualProto(t *testing.T, expected, actual proto.Message) { + t.Helper() + + if diff := cmp.Diff(actual, expected, protocmp.Transform()); diff != "" { + msg := fmt.Sprintf( + "Not equal:\n"+ + "expected:\n\t'%s'\n"+ + "actual:\n\t'%s'\n"+ + "diff (-expected +actual):\n%s", + expected, actual, diff, + ) + assert.Fail(t, msg) + } +} + func AssertAssetsWithJSON(t *testing.T, expected, actuals []*v1beta2.Asset) { t.Helper()