From 2a2143bb59f49aaecfe088039e722d4895a2ff6e Mon Sep 17 00:00:00 2001 From: Suhas Karanth Date: Wed, 11 Jan 2023 08:15:56 +0530 Subject: [PATCH] feat: add HTTP extractor (#442) - Add HTTP extractor, a generic extractor that uses a script capable of emitting any type of asset. - Fix config error field key to support nested fields. - Disallow import of os module from scripts. - Remove unnecessary init method on application extractor. - Add test helper to assert slice of protos. --- docs/docs/reference/extractors.md | 43 +- plugins/base_plugin_test.go | 4 + plugins/extractors/application_yaml/README.md | 2 +- .../application_yaml/application_yaml.go | 9 - plugins/extractors/caramlstore/README.md | 10 +- plugins/extractors/http/README.md | 281 ++++++++ plugins/extractors/http/execute_request.go | 125 ++++ plugins/extractors/http/execute_script.go | 169 +++++ .../extractors/http/execute_script_test.go | 119 ++++ plugins/extractors/http/http_extractor.go | 125 ++++ .../extractors/http/http_extractor_test.go | 654 ++++++++++++++++++ plugins/extractors/populate.go | 1 + plugins/internal/tengoutil/secure_script.go | 5 +- .../internal/tengoutil/secure_script_test.go | 9 +- plugins/processors/script/README.md | 11 +- plugins/util.go | 5 +- test/utils/assert.go | 37 + 17 files changed, 1584 insertions(+), 25 deletions(-) create mode 100644 plugins/extractors/http/README.md create mode 100644 plugins/extractors/http/execute_request.go create mode 100644 plugins/extractors/http/execute_script.go create mode 100644 plugins/extractors/http/execute_script_test.go create mode 100644 plugins/extractors/http/http_extractor.go create mode 100644 plugins/extractors/http/http_extractor_test.go diff --git a/docs/docs/reference/extractors.md b/docs/docs/reference/extractors.md index 8d49b226a..275c111a8 100644 --- a/docs/docs/reference/extractors.md +++ b/docs/docs/reference/extractors.md @@ -1,6 +1,10 @@ # Extractors -Meteor currently supports metadata extraction on these data sources. 
To perform extraction on any of these you need to create a recipe file with instructions as mentioned [here](../concepts/recipe.md). In the `sample-recipe.yaml` add `source` information such as `type` from the table below and `config` for that particular extractor can be found by visiting the link in the `type` field. +Meteor currently supports metadata extraction on these data sources. To perform +extraction on any of these you need to create a recipe file with instructions as +mentioned [here](../concepts/recipe.md). In the `sample-recipe.yaml` +add `source` information such as `type` from the table below and `config` for +that particular extractor can be found by visiting the link in the `type` field. ## Extractors Feature Matrix @@ -31,7 +35,6 @@ Meteor currently supports metadata extraction on these data sources. To perform | [`tableau`][tableau-readme] | ✅ | ✅ | ✅ | ✗ | ✗ | | [`redash`][redash-readme] | ✅ | ✗ | ✗ | ✗ | ✗ | - ### Topic | Type | Profile | Schema | Ownership | Lineage | Tags | Custom | @@ -76,31 +79,67 @@ Meteor currently supports metadata extraction on these data sources. To perform |:--------------------------|:----------|:----------|:------------|:-------| | [`merlin`][merlin-readme] | ✅ | ✅ | ✗ | ✅ | ✅ | +### Generic + +These are a special type of extractor, capable of extracting _any_ type +of asset. 
+ +| Type | Ownership | Upstreams | Downstreams | Custom | +|:----------------------|:----------|:----------|:------------|:-------| +| [`http`][http-readme] | ✅ | ✅ | ✅ | ✅ | ✅ | + [clickhouse-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/clickhouse/README.md + [couchdb-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/couchdb/README.md + [mongodb-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/mongodb/README.md + [mssql-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/mssql/README.md + [mysql-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/mysql/README.md + [postgres-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/postgres/README.md + [cassandra-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/cassandra/README.md + [oracle-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/oracle/README.md + [mariadb-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/mariadb/README.md + [redshift-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/redshift/README.md + [presto-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/presto/README.md + [snowflake-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/snowflake/README.md + [grafana-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/grafana/README.md + [metabase-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/metabase/README.md + [superset-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/superset/README.md + [tableau-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/tableau/README.md + [redash-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/redash/README.md + [kafka-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/kafka/README.md + [github-readme]: 
https://github.com/odpf/meteor/tree/main/plugins/extractors/github/README.md + [shield-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/shield/README.md + [gsuite-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/gsuite/README.md + [gcs-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/gcs/README.md + [optimus-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/optimus/README.md + [caramlstore-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/caramlstore/README.md + [application-yaml-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/application_yaml/README.md + [merlin-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/merlin/README.md + +[http-readme]: https://github.com/odpf/meteor/tree/main/plugins/extractors/http/README.md diff --git a/plugins/base_plugin_test.go b/plugins/base_plugin_test.go index 9b1c6f6f8..e18d627ae 100644 --- a/plugins/base_plugin_test.go +++ b/plugins/base_plugin_test.go @@ -54,6 +54,9 @@ func TestBasePluginValidate(t *testing.T) { invalidConfig := struct { FieldA string `mapstructure:"field_a" validate:"required"` FieldB string `mapstructure:"field_b" validate:"url"` + Nested struct { + FieldC string `mapstructure:"field_c" validate:"required"` + } `mapstructure:"nested"` }{} basePlugin := plugins.NewBasePlugin(plugins.Info{}, &invalidConfig) @@ -66,6 +69,7 @@ func TestBasePluginValidate(t *testing.T) { Errors: []plugins.ConfigError{ {Key: "field_a", Message: "validation for field 'field_a' failed on the 'required' tag"}, {Key: "field_b", Message: "validation for field 'field_b' failed on the 'url' tag"}, + {Key: "nested.field_c", Message: "validation for field 'nested.field_c' failed on the 'required' tag"}, }, }) }) diff --git a/plugins/extractors/application_yaml/README.md b/plugins/extractors/application_yaml/README.md index 0da81b6db..dcd36a065 100644 --- a/plugins/extractors/application_yaml/README.md 
+++ b/plugins/extractors/application_yaml/README.md @@ -18,7 +18,7 @@ source: | Key | Value | Example | Description | Required? | |:-------------|:---------|:------------------|:-----------------------------------------------------------------------------------------------------------------------|:----------| | `file` | `string` | `meteor.app.yaml` | File path of `application.yaml` | ✅ | -| `env_prefix` | `string` | `CI` | Prefix for environment variables. These are made available as variables in `application.yaml` with the prefix trimmed. | ❌ | +| `env_prefix` | `string` | `CI` | Prefix for environment variables. These are made available as variables in `application.yaml` with the prefix trimmed. | ✘ | ### `application.yaml` format diff --git a/plugins/extractors/application_yaml/application_yaml.go b/plugins/extractors/application_yaml/application_yaml.go index 32189983e..f3633475d 100644 --- a/plugins/extractors/application_yaml/application_yaml.go +++ b/plugins/extractors/application_yaml/application_yaml.go @@ -73,15 +73,6 @@ func New(logger log.Logger) *Extractor { return &e } -// Init initializes the extractor -func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { - if err := e.BaseExtractor.Init(ctx, config); err != nil { - return err - } - - return nil -} - func (e *Extractor) Extract(_ context.Context, emit plugins.Emit) error { tmpl, err := template.ParseFiles(e.config.File) if err != nil { diff --git a/plugins/extractors/caramlstore/README.md b/plugins/extractors/caramlstore/README.md index 9e6834dd6..c24ad2bdf 100644 --- a/plugins/extractors/caramlstore/README.md +++ b/plugins/extractors/caramlstore/README.md @@ -19,11 +19,11 @@ source: ## Inputs -| Key | Value | Example | Description | Required? 
| -|:------------------|:---------|:----------------------|:----------------------------------------------------------|:----------| -| `url` | `string` | `caraml-store.com:80` | caraml-store's host URL | ✅ | -| `max_size_in_mb` | `int` | `10` | Max MB for gRPC client to receive message. Default is 45. | ❌ | -| `request_timeout` | `string` | `10s` | Timeout for gRPC requests to caraml-store | ❌ | +| Key | Value | Example | Description | Required? | +|:------------------|:---------|:----------------------|:-----------------------------------------------------------|:----------| +| `url` | `string` | `caraml-store.com:80` | caraml-store's host URL | ✅ | +| `max_size_in_mb` | `int` | `10` | Max MB for gRPC client to receive message. Default is 45. | ✘ | +| `request_timeout` | `string` | `10s` | Timeout for gRPC requests to caraml-store. Default is 10s. | ✘ | ## Outputs diff --git a/plugins/extractors/http/README.md b/plugins/extractors/http/README.md new file mode 100644 index 000000000..92f6a9144 --- /dev/null +++ b/plugins/extractors/http/README.md @@ -0,0 +1,281 @@ +# http + +Generic Extractor capable of using the HTTP response from an external API for +constructing the following assets types: + +- [`Bucket`][proton-bucket] +- [`Dashboard`][proton-dashboard] +- [`Experiment`][proton-experiment] +- [`FeatureTable`][proton-featuretable] +- [`Group`][proton-group] +- [`Job`][proton-job] +- [`Metric`][proton-metric] +- [`Model`][proton-model] +- [`Application`][proton-application] +- [`Table`][proton-table] +- [`Topic`][proton-topic] +- [`User`][proton-user] + +The user specified script has access to the response, if the API call was +successful, and can use it for constructing and emitting assets using a custom +script. Currently, [Tengo][tengo] is the only supported script engine. + +Refer Tengo documentation for script language syntax and supported functionality +\- https://github.com/d5/tengo/tree/v2.13.0#references +. 
[Tengo standard library modules][tengo-stdlib] can also be imported and used +if required. + +## Usage + +```yaml +source: + scope: odpf + type: http + config: + request: + url: "https://example.com/api/v1/endpoint" + query_params: + - key: param_key + value: param_value + method: "POST" + headers: + "User-Id": "1a4336bc-bc6a-4972-83c1-d6426b4d79c3" + content_type: application/json + accept: application/json + body: + key: value + timeout: 5s + success_codes: [ 200 ] + script: + engine: tengo + source: | + asset := new_asset("user") + // modify the asset using 'response'... + emit(asset) +``` + +## Inputs + +| Key | Value | Example | Description | Required? | +|:-----------------------|:--------------------|:---------------------------------------|:------------------------------------------------------------------------------------------------|:----------| +| `request.url` | `string` | `http://example.com/api/v1/endpoint` | The HTTP endpoint to send request to | ✅ | +| `request.query_params` | `[]{key, value}` | `[{"key":"s","value":"One Piece"}]` | The query parameters to be added to the request URL. | ✘ | +| `request.method` | `string` | `GET`/`POST` | The HTTP verb/method to use with request. Default is `GET`. | ✘ | +| `request.headers` | `map[string]string` | `{"Api-Token": "..."}` | Headers to send in the HTTP request. | ✘ | +| `request.content_type` | `string` | `application/json` | Content type for encoding request body. Also sent as a header. | ✅ | +| `request.accept` | `string` | `application/json` | Sent as the `Accept` header. Also indicates the format to use for decoding. | ✅ | +| `request.body` | `Object` | `{"key": "value"}` | The request body to be sent. | ✘ | +| `request.timeout` | `string` | `1s` | Timeout for the HTTP request. Default is 5s. | ✘ | +| `success_codes` | `[]int` | `[200]` | The list of status codes that would be considered as a successful response. Default is `[200]`. | ✘ | +| `script.engine` | `string` | `tengo` | Script engine. 
Only `"tengo"` is supported currently | ✅ | +| `script.source` | `string` | see [Worked Example](#worked-example). | [Tengo][tengo] script used to map the response into 0 or more assets. | ✅ | + +### Notes + +- In case of conflicts between query parameters present in `request.url` + and `request.query_params`, `request.query_params` takes precedence. +- Currently, only `application/json` is supported for encoding the request body + and for decoding the response body. If `Content-Type` and `Accept` headers are + added under `request.headers`, they will be ignored and overridden. +- Script is only executed if the response status code matches + the `success_codes` provided. +- Tengo is the only supported script engine. +- Tengo's `os` stdlib module cannot be imported and used in the script. + +### Script Globals + +#### `response` + +HTTP response received. + +#### `new_asset(string): Asset` + +Takes a single string parameter and returns an asset instance. The `type` +parameter can be one of the following: + +- `"bucket"` ([proto][proton-bucket]) +- `"dashboard"` ([proto][proton-dashboard]) +- `"experiment"` ([proto][proton-experiment]) +- `"feature_table"` ([proto][proton-featuretable]) +- `"group"` ([proto][proton-group]) +- `"job"` ([proto][proton-job]) +- `"metric"` ([proto][proton-metric]) +- `"model"` ([proto][proton-model]) +- `"application"` ([proto][proton-application]) +- `"table"` ([proto][proton-table]) +- `"topic"` ([proto][proton-topic]) +- `"user"` ([proto][proton-user]) + +The asset can then be modified in the script to set properties that are +available for the given asset type. + +**WARNING:** Do not overwrite the `data` property, set fields on it instead. +Translating script object into proto fails otherwise. + +```go +// Bad +asset.data = {full_name: "Daiyamondo Jozu"} + +// Good +asset.data.full_name = "Daiyamondo Jozu" +``` + +#### `emit(Asset)` + +Takes an asset and emits the asset that can then be consumed by the +processor/sink. 
+ +#### `exit()` + +Terminates the script execution. + +## Output + +The output of the extractor depends on the user-specified script. It can emit 0 +or more assets. + +### Worked Example + +Let's consider a service that returns a list of users on making a `GET` call on +the endpoint `http://my_user_service.company.com/api/v1/users` in the following +format: + +```json +{ + "success": "", + "message": "", + "data": [ + { + "manager_name": "", + "terminated": "", + "fullname": "", + "location_name": "", + "work_email": "", + "supervisory_org_id": "", + "supervisory_org_name": "", + "preferred_last_name": "", + "business_title": "", + "company_name": "", + "cost_center_id": "", + "preferred_first_name": "", + "product_name": "", + "cost_center_name": "", + "employee_id": "", + "manager_id": "", + "location_id": "", + "manager_id_2": "", + "termination_date": "", + "company_hierarchy": "", + "company_id": "", + "preferred_middle_name": "", + "preferred_social_suffix": "", + "legal_middle_name": "", + "manager_email_2": "", + "legal_first_name": "", + "manager_name_2": "", + "manager_email": "", + "legal_last_name": "" + } + ] +} +``` + +Assuming the authentication can be done using an `Api-Token` header, we can use +the following recipe: + +```yaml +source: + scope: production + type: http + config: + request: + url: "http://my_user_service.company.com/api/v1/users" + method: "GET" + headers: + "Api-Token": "1a4336bc-bc6a-4972-83c1-d6426b4d79c3" + content_type: application/json + accept: application/json + timeout: 5s + success_codes: [ 200 ] + script: + engine: tengo + source: | + if !response.success { + exit() + } + + users := response.data + for u in users { + if u.work_email == "" { + continue + } + + asset := new_asset("user") + // URN format: "urn:{service}:{scope}:{type}:{id}" + asset.urn = format("urn:%s:production:user:%s", "my_usr_svc", u.employee_id) + asset.name = u.fullname + asset.service = "my_usr_svc" + // asset.type = "user" // not required, new_asset("user") 
sets the field. + asset.data.email = u.work_email + asset.data.username = u.employee_id + asset.data.first_name = u.legal_first_name + asset.data.last_name = u.legal_last_name + asset.data.full_name = u.fullname + asset.data.display_name = u.fullname + asset.data.title = u.business_title + asset.data.status = u.terminated == "true" ? "suspended" : "active" + asset.data.manager_email = u.manager_email + asset.data.attributes = { + manager_id: u.manager_id, + cost_center_id: u.cost_center_id, + supervisory_org_name: u.supervisory_org_name, + location_id: u.location_id + } + emit(asset) + } +``` + +This would emit a 'User' asset for each user object in `response.data`. + +## Caveats + +The following features are currently not supported: + +- Pagination. +- Explicit authentication support, ex: Basic auth/OAuth/OAuth2/JWT etc. +- Retries with configurable backoff. +- Content type for request/response body other than `application/json`. + +## Contributing + +Refer to +the [contribution guidelines](../../../docs/docs/contribute/guide.md#adding-a-new-extractor) +for information on contributing to this module. 
+ +[proton-bucket]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/bucket.proto#L13 + +[proton-dashboard]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/dashboard.proto#L14 + +[proton-experiment]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/experiment.proto#L15 + +[proton-featuretable]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/feature_table.proto#L32 + +[proton-group]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/group.proto#L12 + +[proton-job]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/job.proto#L13 + +[proton-metric]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/metric.proto#L13 + +[proton-model]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/model.proto#L17 + +[proton-application]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/application.proto#L11 + +[proton-table]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/table.proto#L14 + +[proton-topic]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/topic.proto#L14 + +[proton-user]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/user.proto#L15 + +[tengo]: https://github.com/d5/tengo + +[tengo-stdlib]: https://github.com/d5/tengo/blob/v2.13.0/docs/stdlib.md diff --git a/plugins/extractors/http/execute_request.go b/plugins/extractors/http/execute_request.go new file mode 100644 index 000000000..5bfe6a1c1 --- /dev/null +++ b/plugins/extractors/http/execute_request.go @@ -0,0 +1,125 @@ +package http + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" +) + +func (e *Extractor) executeRequest(ctx context.Context) (interface{}, error) { + cfg := e.config + + ctx, cancel := context.WithTimeout(ctx, cfg.Request.Timeout) + defer cancel() + + req, err := buildRequest(ctx, cfg) + if err != nil { + return nil, err + } + + resp, err := e.http.Do(req) + defer drainBody(resp) + if err 
!= nil { + return nil, fmt.Errorf("do request: %w", err) + } + + return handleResponse(cfg, resp) +} + +func buildRequest(ctx context.Context, cfg Config) (*http.Request, error) { + reqCfg := cfg.Request + + body, err := asReader(reqCfg.Body) + if err != nil { + return nil, fmt.Errorf("encode request body: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, reqCfg.Method, reqCfg.URL, body) + if err != nil { + return nil, fmt.Errorf("create HTTP request: %w", err) + } + + addQueryParams(req, reqCfg.QueryParams) + + for name, v := range reqCfg.Headers { + req.Header.Set(name, v) + } + if req.Body != nil && req.Body != http.NoBody { + req.Header.Set("Content-Type", "application/json") + } + req.Header.Set("Accept", "application/json") + + return req, nil +} + +func addQueryParams(req *http.Request, params []QueryParam) { + if len(params) == 0 { + return + } + + q := req.URL.Query() + // First delete any possible conflicts. Cannot be done in a single loop + // because params can have multiple entries with the same key. 
+ for _, p := range params { + q.Del(p.Key) + } + for _, p := range params { + q.Add(p.Key, p.Value) + } + req.URL.RawQuery = q.Encode() +} + +func handleResponse(cfg Config, resp *http.Response) (interface{}, error) { + if !has(cfg.SuccessCodes, resp.StatusCode) { + return nil, fmt.Errorf("unsuccessful request: response status code: %d", resp.StatusCode) + } + + var res interface{} + if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + return nil, fmt.Errorf("decode response: %w", err) + } + + return res, nil +} + +func asReader(v interface{}) (io.Reader, error) { + if v == nil { + return nil, nil + } + + if body, ok := v.(string); ok { + return bytes.NewBufferString(body), nil + } + + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(v); err != nil { + return nil, err + } + + return &buf, nil +} + +// drainBody drains and closes the response body to avoid the following +// gotcha: +// http://devs.cloudimmunity.com/gotchas-and-common-mistakes-in-go-golang/index.html#close_http_resp_body +func drainBody(resp *http.Response) { + if resp == nil { + return + } + + _, _ = io.Copy(io.Discard, resp.Body) + _ = resp.Body.Close() +} + +func has(haystack []int, needle int) bool { + for _, n := range haystack { + if n == needle { + return true + } + } + + return false +} diff --git a/plugins/extractors/http/execute_script.go b/plugins/extractors/http/execute_script.go new file mode 100644 index 000000000..8a8e95976 --- /dev/null +++ b/plugins/extractors/http/execute_script.go @@ -0,0 +1,169 @@ +package http + +import ( + "context" + "errors" + "fmt" + + "github.com/d5/tengo/v2" + "github.com/odpf/meteor/models" + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/plugins/internal/tengoutil" + "github.com/odpf/meteor/plugins/internal/tengoutil/structmap" + "google.golang.org/protobuf/proto" +) + +func (e *Extractor) executeScript(ctx context.Context, res interface{}, emit 
plugins.Emit) error { + s := tengoutil.NewSecureScript(([]byte)(e.config.Script.Source)) + if err := declareGlobals(s, res, emit); err != nil { + return fmt.Errorf("declare globals: %w", err) + } + + c, err := s.Compile() + if err != nil { + return fmt.Errorf("compile: %w", err) + } + + if err := c.RunContext(ctx); err != nil && !errors.Is(err, errUserExit) { + return fmt.Errorf("run: %w", err) + } + + return nil +} + +func declareGlobals(s *tengo.Script, res interface{}, emit plugins.Emit) error { + for name, v := range map[string]interface{}{ + "response": res, + "new_asset": &tengo.UserFunction{ + Name: "new_asset", + Value: newAssetWrapper(), + }, + "emit": &tengo.UserFunction{ + Name: "emit", + Value: emitWrapper(emit), + }, + "exit": &tengo.UserFunction{ + Name: "exit", + Value: func(...tengo.Object) (tengo.Object, error) { + return nil, errUserExit + }, + }, + } { + if err := s.Add(name, v); err != nil { + return fmt.Errorf("declare script globals: %w", err) + } + } + return nil +} + +func newAssetWrapper() tengo.CallableFunc { + typeURLs := knownTypeURLs() + return func(args ...tengo.Object) (tengo.Object, error) { + if len(args) != 1 { + return nil, tengo.ErrWrongNumArguments + } + + typ, ok := tengo.ToString(args[0]) + if !ok { + return nil, tengo.ErrInvalidArgumentType{ + Name: "typ", + Expected: "string(compatible)", + Found: args[0].TypeName(), + } + } + + return newAsset(typeURLs, typ) + } +} + +func emitWrapper(emit plugins.Emit) tengo.CallableFunc { + return func(args ...tengo.Object) (tengo.Object, error) { + if len(args) != 1 { + return nil, tengo.ErrWrongNumArguments + } + + m, ok := tengo.ToInterface(args[0]).(map[string]interface{}) + if !ok { + return nil, tengo.ErrInvalidArgumentType{ + Name: "asset", + Expected: "Map", + Found: args[0].TypeName(), + } + } + + var ast v1beta2.Asset + if err := structmap.AsStruct(m, &ast); err != nil { + return nil, fmt.Errorf("emit asset: %w", err) + } + + emit(models.NewRecord(&ast)) + + return 
tengo.UndefinedValue, nil + } +} + +func newAsset(typeURLs map[string]string, typ string) (tengo.Object, error) { + u, ok := typeURLs[typ] + if !ok { + return nil, fmt.Errorf("new asset: unexpected type: %s", typ) + } + + return &tengo.Map{ + Value: map[string]tengo.Object{ + "type": &tengo.String{Value: typ}, + "data": &tengo.Map{ + Value: map[string]tengo.Object{ + "@type": &tengo.String{Value: u}, + }, + }, + }, + }, nil +} + +func knownTypeURLs() map[string]string { + typeURLs := make(map[string]string, 12) + for _, typ := range []string{ + "bucket", "dashboard", "experiment", "feature_table", "group", + "job", "metric", "model", "application", "table", "topic", "user", + } { + typeURLs[typ] = typeURL(typ) + } + return typeURLs +} + +func typeURL(typ string) string { + const prefix = "type.googleapis.com/" + + var msg proto.Message + switch typ { + case "bucket": + msg = &v1beta2.Bucket{} + case "dashboard": + msg = &v1beta2.Dashboard{} + case "experiment": + msg = &v1beta2.Experiment{} + case "feature_table": + msg = &v1beta2.FeatureTable{} + case "group": + msg = &v1beta2.Group{} + case "job": + msg = &v1beta2.Job{} + case "metric": + msg = &v1beta2.Metric{} + case "model": + msg = &v1beta2.Model{} + case "application": + msg = &v1beta2.Application{} + case "table": + msg = &v1beta2.Table{} + case "topic": + msg = &v1beta2.Topic{} + case "user": + msg = &v1beta2.User{} + default: + panic(fmt.Errorf("unexpected type name: %s", typ)) + } + + return prefix + (string)(msg.ProtoReflect().Descriptor().FullName()) +} diff --git a/plugins/extractors/http/execute_script_test.go b/plugins/extractors/http/execute_script_test.go new file mode 100644 index 000000000..6701ee0e1 --- /dev/null +++ b/plugins/extractors/http/execute_script_test.go @@ -0,0 +1,119 @@ +//go:build plugins +// +build plugins + +package http + +import ( + "testing" + + "github.com/d5/tengo/v2" + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + 
"github.com/odpf/meteor/plugins/internal/tengoutil/structmap" + testutils "github.com/odpf/meteor/test/utils" + "github.com/stretchr/testify/assert" +) + +func TestNewAsset(t *testing.T) { + cases := []struct { + typ string + expected *v1beta2.Asset + }{ + { + typ: "bucket", + expected: &v1beta2.Asset{ + Type: "bucket", + Data: testutils.BuildAny(t, &v1beta2.Bucket{}), + }, + }, + { + typ: "dashboard", + expected: &v1beta2.Asset{ + Type: "dashboard", + Data: testutils.BuildAny(t, &v1beta2.Dashboard{}), + }, + }, + { + typ: "experiment", + expected: &v1beta2.Asset{ + Type: "experiment", + Data: testutils.BuildAny(t, &v1beta2.Experiment{}), + }, + }, + { + typ: "feature_table", + expected: &v1beta2.Asset{ + Type: "feature_table", + Data: testutils.BuildAny(t, &v1beta2.FeatureTable{}), + }, + }, + { + typ: "group", + expected: &v1beta2.Asset{ + Type: "group", + Data: testutils.BuildAny(t, &v1beta2.Group{}), + }, + }, + { + typ: "job", + expected: &v1beta2.Asset{ + Type: "job", + Data: testutils.BuildAny(t, &v1beta2.Job{}), + }, + }, + { + typ: "metric", + expected: &v1beta2.Asset{ + Type: "metric", + Data: testutils.BuildAny(t, &v1beta2.Metric{}), + }, + }, + { + typ: "model", + expected: &v1beta2.Asset{ + Type: "model", + Data: testutils.BuildAny(t, &v1beta2.Model{}), + }, + }, + { + typ: "application", + expected: &v1beta2.Asset{ + Type: "application", + Data: testutils.BuildAny(t, &v1beta2.Application{}), + }, + }, + { + typ: "table", + expected: &v1beta2.Asset{ + Type: "table", + Data: testutils.BuildAny(t, &v1beta2.Table{}), + }, + }, + { + typ: "topic", + expected: &v1beta2.Asset{ + Type: "topic", + Data: testutils.BuildAny(t, &v1beta2.Topic{}), + }, + }, + { + typ: "user", + expected: &v1beta2.Asset{ + Type: "user", + Data: testutils.BuildAny(t, &v1beta2.User{}), + }, + }, + } + for _, tc := range cases { + typeURLs := knownTypeURLs() + t.Run(tc.typ, func(t *testing.T) { + obj, err := newAsset(typeURLs, tc.typ) + assert.NoError(t, err) + + var ast 
*v1beta2.Asset + err = structmap.AsStruct(tengo.ToInterface(obj), &ast) + assert.NoError(t, err) + + testutils.AssertEqualProto(t, tc.expected, ast) + }) + } +} diff --git a/plugins/extractors/http/http_extractor.go b/plugins/extractors/http/http_extractor.go new file mode 100644 index 000000000..703463d53 --- /dev/null +++ b/plugins/extractors/http/http_extractor.go @@ -0,0 +1,125 @@ +package http + +import ( + "context" + _ "embed" // used to print the embedded assets + "errors" + "fmt" + "net/http" + "time" + + "github.com/MakeNowJust/heredoc" + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/registry" + "github.com/odpf/salt/log" +) + +var errUserExit = errors.New("user exit") + +// init register the extractor to the catalog +func init() { + if err := registry.Extractors.Register("http", func() plugins.Extractor { + return New(plugins.GetLog()) + }); err != nil { + panic(err) + } +} + +//go:embed README.md +var summary string + +// Config holds the set of configuration for the HTTP extractor. 
+type Config struct { + Request struct { + URL string `mapstructure:"url" validate:"required,url"` + QueryParams []QueryParam `mapstructure:"query_params" validate:"dive"` + Method string `mapstructure:"method" validate:"oneof=GET POST" default:"GET"` + Headers map[string]string `mapstructure:"headers"` + ContentType string `mapstructure:"content_type" validate:"required,oneof=application/json"` + Accept string `mapstructure:"accept" validate:"required,oneof=application/json"` + Body interface{} `mapstructure:"body"` + Timeout time.Duration `mapstructure:"timeout" validate:"min=1ms" default:"5s"` + } `mapstructure:"request"` + SuccessCodes []int `mapstructure:"success_codes" validate:"dive,gte=200,lt=300" default:"[200]"` + Script struct { + Engine string `mapstructure:"engine" validate:"required,oneof=tengo"` + Source string `mapstructure:"source" validate:"required"` + } `mapstructure:"script"` +} + +type QueryParam struct { + Key string `mapstructure:"key" validate:"required"` + Value string `mapstructure:"value" validate:"required"` +} + +var sampleConfig = heredoc.Doc(` + request: + url: "https://example.com/api/v1/endpoint" + method: "GET" + headers: + "User-Id": "1a4336bc-bc6a-4972-83c1-d6426b4d79c3" + content_type: application/json + accept: application/json + timeout: 5s + success_codes: [ 200 ] + script: + engine: tengo + source: | + asset := new_asset("user") + // modify the asset using 'response'... + emit(asset) +`) + +var info = plugins.Info{ + Description: "Generic Extractor capable of using the HTTP response from an external API for constructing 0 or more assets", + SampleConfig: sampleConfig, + Summary: summary, + Tags: []string{"http", "extractor"}, +} + +// Extractor is responsible for executing an HTTP request as per configuration +// and executing the script with the response to 'extract' assets from within +// the script. 
+type Extractor struct { + plugins.BaseExtractor + + logger log.Logger + config Config + http *http.Client +} + +// New returns a pointer to an initialized Extractor Object +func New(logger log.Logger) *Extractor { + e := &Extractor{ + logger: logger, + } + e.BaseExtractor = plugins.NewBaseExtractor(info, &e.config) + + return e +} + +// Init initializes the extractor +func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { + if err := e.BaseExtractor.Init(ctx, config); err != nil { + return err + } + + e.http = &http.Client{} + return nil +} + +// Extract executes an HTTP request as per the configuration and if successful, +// executes the script. The script has access to the response and can use the +// same to 'emit' assets from within the script. +func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error { + res, err := e.executeRequest(ctx) + if err != nil { + return fmt.Errorf("http extractor: execute request: %w", err) + } + + if err := e.executeScript(ctx, res, emit); err != nil { + return fmt.Errorf("http extractor: execute script: %w", err) + } + + return nil +} diff --git a/plugins/extractors/http/http_extractor_test.go b/plugins/extractors/http/http_extractor_test.go new file mode 100644 index 000000000..2f03b35c3 --- /dev/null +++ b/plugins/extractors/http/http_extractor_test.go @@ -0,0 +1,654 @@ +//go:build plugins +// +build plugins + +package http + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" + + "github.com/MakeNowJust/heredoc" + v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" + "github.com/odpf/meteor/plugins" + "github.com/odpf/meteor/test/mocks" + testutils "github.com/odpf/meteor/test/utils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/protobuf/types/known/structpb" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const urnScope = "test-http" + +var ctx = context.Background() + 
+func TestInit(t *testing.T) { + cases := []struct { + name string + rawCfg map[string]interface{} + expectedErr string + }{ + { + name: "ValidMinimal", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "http://example.com/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": "// do nothing", + }, + }, + }, + { + name: "ReqURLRequired", + rawCfg: map[string]interface{}{}, + expectedErr: "validation for field 'request.url' failed on the 'required' tag", + }, + { + name: "ReqURLInvalid", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{"url": "invalid_url"}, + }, + expectedErr: "validation for field 'request.url' failed on the 'url' tag", + }, + { + name: "ReqQueryParamInvalidKey", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "query_params": []QueryParam{{Value: "v"}}, + }, + }, + expectedErr: "validation for field 'request.query_params[0].key' failed on the 'required' tag", + }, + { + name: "ReqQueryParamInvalidValue", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "query_params": []QueryParam{{Key: "k"}}, + }, + }, + expectedErr: "validation for field 'request.query_params[0].value' failed on the 'required' tag", + }, + { + name: "ReqContentTypeRequired", + rawCfg: map[string]interface{}{}, + expectedErr: "validation for field 'request.content_type' failed on the 'required' tag", + }, + { + name: "ReqContentTypeUnsupported", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "content_type": "application/text", + }, + }, + expectedErr: "validation for field 'request.content_type' failed on the 'oneof' tag", + }, + { + name: "ReqAcceptRequired", + rawCfg: map[string]interface{}{}, + expectedErr: "validation for field 'request.accept' failed on the 'required' tag", + }, + { + name: "ReqAcceptUnsupported", + rawCfg: 
map[string]interface{}{ + "request": map[string]interface{}{ + "accept": "application/text", + }, + }, + expectedErr: "validation for field 'request.accept' failed on the 'oneof' tag", + }, + { + name: "ReqMethodUnsupported", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "method": "PUT", + }, + }, + expectedErr: "validation for field 'request.method' failed on the 'oneof' tag", + }, + { + name: "ReqTimeoutInvalid", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "timeout": 1, + }, + }, + expectedErr: "validation for field 'request.timeout' failed on the 'min' tag", + }, + { + name: "SuccessCodesInvalid", + rawCfg: map[string]interface{}{ + "success_codes": []int{10000}, + }, + expectedErr: "validation for field 'success_codes[0]' failed on the 'lt' tag", + }, + { + name: "ScriptEngineRequired", + rawCfg: map[string]interface{}{}, + expectedErr: "validation for field 'script.engine' failed on the 'required' tag", + }, + { + name: "ScriptEngineUnsupported", + rawCfg: map[string]interface{}{ + "script": map[string]string{ + "engine": "mango", + }, + }, + expectedErr: "validation for field 'script.engine' failed on the 'oneof' tag", + }, + { + name: "ScriptSourceRequired", + rawCfg: map[string]interface{}{}, + expectedErr: "validation for field 'script.source' failed on the 'required' tag", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := New(testutils.Logger).Init(ctx, plugins.Config{ + URNScope: urnScope, + RawConfig: tc.rawCfg, + }) + if tc.expectedErr != "" { + assert.ErrorAs(t, err, &plugins.InvalidConfigError{}, "should return error if config is invalid") + assert.ErrorContains(t, err, tc.expectedErr) + return + } + + assert.NoError(t, err) + }) + } +} + +func TestExtract(t *testing.T) { + cases := []struct { + name string + rawCfg map[string]interface{} + handler func(t *testing.T, w http.ResponseWriter, r *http.Request) + expected []*v1beta2.Asset + expectedErr string + }{ + 
{ + name: "MatchRequestBasic", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": "// do nothing", + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + assert.Equal(t, r.Method, http.MethodGet) + assert.Equal(t, r.URL.Path, "/api/v1/endpoint") + assert.Equal(t, r.URL.RawQuery, "") + h := r.Header + assert.Equal(t, "", h.Get("Content-Type")) + assert.Equal(t, "application/json", h.Get("Accept")) + data, err := io.ReadAll(r.Body) + assert.NoError(t, err) + assert.Empty(t, data) + + testutils.Respond(t, w, http.StatusOK, `[]`) + }, + }, + { + name: "MatchRequestAdvanced", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint?a=1", + "query_params": []QueryParam{ + {Key: "a", Value: "2"}, + {Key: "a", Value: "3"}, + {Key: "formula", Value: "a=b"}, + }, + "method": http.MethodPost, + "headers": map[string]string{"User-Id": "1a4336bc-bc6a-4972-83c1-d6426b4d79c3"}, + "content_type": "application/json", + "accept": "application/json", + "body": map[string]interface{}{ + "id": "urn:merlin:merlin-stg:model:46.218", + }, + }, + "success_codes": []int{http.StatusOK, http.StatusNonAuthoritativeInfo}, + "script": map[string]interface{}{ + "engine": "tengo", + "source": "// do nothing", + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + assert.Equal(t, r.Method, http.MethodPost) + assert.Equal(t, r.URL.Path, "/api/v1/endpoint") + assert.Equal(t, r.URL.RawQuery, "a=2&a=3&formula=a%3Db") + h := r.Header + assert.Equal(t, "application/json", h.Get("Content-Type")) + assert.Equal(t, "application/json", h.Get("Accept")) + assert.Equal(t, "1a4336bc-bc6a-4972-83c1-d6426b4d79c3", h.Get("User-Id")) + data, err := io.ReadAll(r.Body) + assert.NoError(t, err) + 
assert.JSONEq(t, `{"id": "urn:merlin:merlin-stg:model:46.218"}`, (string)(data)) + + testutils.Respond(t, w, http.StatusNonAuthoritativeInfo, `[]`) + }, + }, + { + name: "5xxResponse", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": "// do nothing", + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + }, + expectedErr: "unsuccessful request: response status code: 500", + }, + { + name: "RequestTimeout", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + "timeout": "50ms", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": "// do nothing", + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + time.Sleep(100 * time.Millisecond) + testutils.Respond(t, w, http.StatusOK, `[]`) + }, + expectedErr: "context deadline exceeded", + }, + { + name: "AssetFromResponse", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": heredoc.Doc(` + asset := new_asset("user") + // URN format: "urn:{service}:{scope}:{type}:{id}" + asset.urn = format("urn:%s:staging:user:%s", "my_usr_svc", response.employee_id) + asset.name = response.fullname + asset.service = "my_usr_svc" + // asset.type = "user" // not required, new_asset("user") sets the field. 
+ asset.data.email = response.work_email + asset.data.username = response.employee_id + asset.data.first_name = response.legal_first_name + asset.data.last_name = response.legal_last_name + asset.data.full_name = response.fullname + asset.data.display_name = response.fullname + asset.data.title = response.business_title + asset.data.status = response.terminated == "true" ? "suspended" : "active" + asset.data.manager_email = response.manager_email + asset.data.attributes = { + manager_id: response.manager_id, + cost_center_id: response.cost_center_id, + supervisory_org_name: response.supervisory_org_name, + location_id: response.location_id + } + emit(asset) + `), + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + testutils.Respond(t, w, http.StatusOK, + `{"manager_name":"Gabbi Champain","terminated":"false","fullname":"Van Stump","location_name":"Morocco","work_email":"vstump0@pcworld.com","supervisory_org_id":"1863d11c-fb86-4dbd-aa17-a53cbbaf3e9c","supervisory_org_name":"Research and Development","business_title":"General Manager","company_name":"Erdman Group","cost_center_id":"d6b470d8-e1ed-43ee-ab43-a645e607cf81","cost_center_name":"Sales","employee_id":"395f8292-d48b-431b-9e2d-63b3dcd4b986","manager_id":"496a320c-3c0a-4c0d-9658-a4f1dbbae20d","location_id":"MA","termination_date":null,"company_id":"8560f69c-11ef-42d4-b57e-8b8eacf32f9f","legal_first_name":"Van","manager_email":"vgchampain1@dot.gov","legal_last_name":"Stump"}`, + ) + }, + expected: []*v1beta2.Asset{{ + Urn: "urn:my_usr_svc:staging:user:395f8292-d48b-431b-9e2d-63b3dcd4b986", + Name: "Van Stump", + Service: "my_usr_svc", + Type: "user", + Data: testutils.BuildAny(t, &v1beta2.User{ + FirstName: "Van", + LastName: "Stump", + FullName: "Van Stump", + DisplayName: "Van Stump", + Email: "vstump0@pcworld.com", + Title: "General Manager", + ManagerEmail: "vgchampain1@dot.gov", + Status: "active", + Username: "395f8292-d48b-431b-9e2d-63b3dcd4b986", + Attributes: 
&structpb.Struct{Fields: map[string]*structpb.Value{ + "cost_center_id": structpb.NewStringValue("d6b470d8-e1ed-43ee-ab43-a645e607cf81"), + "location_id": structpb.NewStringValue("MA"), + "manager_id": structpb.NewStringValue("496a320c-3c0a-4c0d-9658-a4f1dbbae20d"), + "supervisory_org_name": structpb.NewStringValue("Research and Development"), + }}, + }), + }}, + }, + { + name: "MultipleAssetsFromResponse", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": heredoc.Doc(` + text := import("text") + enum := import("enum") + + merge := func(m1, m2) { + for k, v in m2 { + m1[k] = v + } + return m1 + } + + kafkaServerToScope := func(server) { + ss := text.split(server, ":") + return ss[0] + } + + buildUpstreams := func(ft) { + upstreams := undefined + if src := ft.spec.batch_source; src != undefined && src.type == "BATCH_BIGQUERY" { + ss := text.split(src.bigquery_options.table_ref, ":") + upstreams = append(upstreams || [], { + urn: format("urn:bigquery:%s:table:%s", ss[0], src.bigquery_options.table_ref), + service: "bigquery", + type: "table" + }) + } + if src := ft.spec.stream_source; src != undefined { + upstreams = append(upstreams || [], { + urn: format( + "urn:kafka:%s:topic:%s", + kafkaServerToScope(src.kafka_options.bootstrap_servers), src.kafka_options.topic + ), + service: "kafka", + type: "topic" + }) + } + return upstreams + } + + for ft in response.tables { + ast := new_asset("feature_table") + ast.urn = format( + "urn:caramlstore:staging:feature_table:%s.%s", + response.project, ft.spec.name + ) + ast.name = ft.spec.name + ast.service = "caramlstore" + ast.type = "feature_table" + ast.data = merge(ast.data, { + namespace: response.project, + entities: enum.map(ft.spec.entities, func(i, e){ + entity := enum.find(response.entities, func(i, entity) { + 
return e == entity.spec.name + }) + return { + name: entity.spec.name, + labels: { + "value_type": entity.spec.value_type, + "description": entity.spec.description + } + } + }), + features: enum.map(ft.spec.features, func(i, f){ + return { + name: f.name, + data_type: f.value_type + } + }), + create_time: ft.meta.created_timestamp, + update_time: ft.meta.last_updated_timestamp + }) + ast.lineage = { + upstreams: buildUpstreams(ft) + } + ast.labels = ft.spec.labels + emit(ast) + } + `), + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + testutils.Respond(t, w, http.StatusOK, + `{"project":"sauron","entities":[{"spec":{"name":"merchant_uuid","value_type":"STRING","description":"merchant uuid"},"meta":{"created_timestamp":"2022-08-08T03:17:51Z","last_updated_timestamp":"2022-08-08T03:17:51Z"}},{"spec":{"name":"booking_hour","value_type":"STRING","description":"Booking creation hour"},"meta":{"created_timestamp":"2022-08-08T03:17:51Z","last_updated_timestamp":"2022-08-08T03:17:51Z"}},{"spec":{"name":"day_of_week","value_type":"STRING","description":"Booking Day of Week"},"meta":{"created_timestamp":"2022-08-08T03:17:51Z","last_updated_timestamp":"2022-08-08T03:17:51Z"}},{"spec":{"name":"meal_id","value_type":"STRING","description":"Mealtime Identifier"},"meta":{"created_timestamp":"2022-08-08T03:17:51Z","last_updated_timestamp":"2022-08-08T03:17:51Z"}},{"spec":{"name":"t3_distance_bucket","value_type":"STRING","description":"T3 Distance Bucket"},"meta":{"created_timestamp":"2022-08-08T03:17:51Z","last_updated_timestamp":"2022-08-08T03:17:51Z"}},{"spec":{"name":"destination_s2id_12","value_type":"STRING","description":"Destination s2id_12"},"meta":{"created_timestamp":"2022-09-02T08:28:33Z","last_updated_timestamp":"2022-09-02T08:28:33Z"}},{"spec":{"name":"service_area_id","value_type":"STRING","description":"the id of gofood service 
area"},"meta":{"created_timestamp":"2022-09-14T03:10:45Z","last_updated_timestamp":"2022-09-14T03:10:45Z"}},{"spec":{"name":"item_uuid","value_type":"STRING","description":"item uuid"},"meta":{"created_timestamp":"2022-09-17T15:14:13Z","last_updated_timestamp":"2022-09-17T15:14:13Z"}}],"tables":[{"spec":{"name":"merchant_uuid_t2_discovery","entities":["merchant_uuid"],"features":[{"name":"avg_t2_merchant_3d","value_type":"DOUBLE"},{"name":"avg_t2_merchant_1d","value_type":"DOUBLE"},{"name":"avg_merchant_price","value_type":"DOUBLE"},{"name":"avg_t2_same_hour_merchant_1m","value_type":"DOUBLE"},{"name":"avg_t2_merchant_1w","value_type":"DOUBLE"},{"name":"avg_gmv_merchant_1w","value_type":"DOUBLE"},{"name":"avg_gmv_merchant_1d","value_type":"DOUBLE"},{"name":"merch_demand_same_hour_1m","value_type":"DOUBLE"},{"name":"avg_t2_merchant_3h","value_type":"DOUBLE"},{"name":"t2_discovery","value_type":"DOUBLE"},{"name":"avg_gmv_merchant_3h","value_type":"DOUBLE"},{"name":"avg_gmv_merchant_1m","value_type":"DOUBLE"},{"name":"avg_gmv_same_hour_merchant_1m","value_type":"DOUBLE"},{"name":"avg_t2_merchant_1m","value_type":"DOUBLE"}],"max_age":"7200s","batch_source":{"type":"BATCH_BIGQUERY","event_timestamp_column":"event_timestamp","bigquery_options":{"table_ref":"celestial-dragons-staging:feast.merchant_uuid_t2_discovery"}},"online_store":{"name":"bigtable","type":"BIGTABLE"}},"meta":{"created_timestamp":"2022-08-08T03:17:54Z","last_updated_timestamp":"2022-08-08T03:17:54Z","hash":"1227ba57"}},{"spec":{"name":"avg_dispatch_arrival_time_10_mins","entities":["merchant_uuid"],"features":[{"name":"ongoing_placed_and_waiting_acceptance_orders","value_type":"INT64"},{"name":"ongoing_orders","value_type":"INT64"},{"name":"merchant_avg_dispatch_arrival_time_10m","value_type":"FLOAT"},{"name":"ongoing_accepted_orders","value_type":"INT64"}],"max_age":"0s","batch_source":{"type":"BATCH_FILE","event_timestamp_column":"null","file_options":{"file_format":{"parquet_format":{}},"file_url":"/
dev/null"}},"stream_source":{"type":"STREAM_KAFKA","field_mapping":{"merchant_uuid":"restaurant_uuid"},"event_timestamp_column":"event_timestamp","kafka_options":{"bootstrap_servers":"int-dagstream-kafka.yonkou.io:6668","topic":"GO_FOOD-delay-allocation-merchant-feature-10m-log","message_format":{"proto_format":{"class_path":"com.bubble.DelayAllocationMerchantFeature10mLogMessage"}}}},"online_store":{"name":"bigtable","type":"BIGTABLE"}},"meta":{"created_timestamp":"2022-09-19T22:42:04Z","last_updated_timestamp":"2022-09-21T13:23:02Z","revision":"2","hash":"730855ef"}}]}`, + ) + }, + expected: []*v1beta2.Asset{ + { + Urn: "urn:caramlstore:staging:feature_table:sauron.merchant_uuid_t2_discovery", + Name: "merchant_uuid_t2_discovery", + Service: "caramlstore", + Type: "feature_table", + Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ + Namespace: "sauron", + Entities: []*v1beta2.FeatureTable_Entity{{ + Name: "merchant_uuid", + Labels: map[string]string{ + "description": "merchant uuid", + "value_type": "STRING", + }, + }}, + Features: []*v1beta2.Feature{ + {Name: "avg_t2_merchant_3d", DataType: "DOUBLE"}, + {Name: "avg_t2_merchant_1d", DataType: "DOUBLE"}, + {Name: "avg_merchant_price", DataType: "DOUBLE"}, + {Name: "avg_t2_same_hour_merchant_1m", DataType: "DOUBLE"}, + {Name: "avg_t2_merchant_1w", DataType: "DOUBLE"}, + {Name: "avg_gmv_merchant_1w", DataType: "DOUBLE"}, + {Name: "avg_gmv_merchant_1d", DataType: "DOUBLE"}, + {Name: "merch_demand_same_hour_1m", DataType: "DOUBLE"}, + {Name: "avg_t2_merchant_3h", DataType: "DOUBLE"}, + {Name: "t2_discovery", DataType: "DOUBLE"}, + {Name: "avg_gmv_merchant_3h", DataType: "DOUBLE"}, + {Name: "avg_gmv_merchant_1m", DataType: "DOUBLE"}, + {Name: "avg_gmv_same_hour_merchant_1m", DataType: "DOUBLE"}, + {Name: "avg_t2_merchant_1m", DataType: "DOUBLE"}, + }, + CreateTime: timestamppb.New(time.Date(2022, time.August, 8, 3, 17, 54, 0, time.UTC)), + UpdateTime: timestamppb.New(time.Date(2022, time.August, 8, 3, 17, 54, 0, 
time.UTC)), + }), + Lineage: &v1beta2.Lineage{ + Upstreams: []*v1beta2.Resource{{ + Urn: "urn:bigquery:celestial-dragons-staging:table:celestial-dragons-staging:feast.merchant_uuid_t2_discovery", + Service: "bigquery", + Type: "table", + }}, + }, + }, + { + Urn: "urn:caramlstore:staging:feature_table:sauron.avg_dispatch_arrival_time_10_mins", + Name: "avg_dispatch_arrival_time_10_mins", + Service: "caramlstore", + Type: "feature_table", + Data: testutils.BuildAny(t, &v1beta2.FeatureTable{ + Namespace: "sauron", + Entities: []*v1beta2.FeatureTable_Entity{{ + Name: "merchant_uuid", + Labels: map[string]string{ + "description": "merchant uuid", + "value_type": "STRING", + }, + }}, + Features: []*v1beta2.Feature{ + {Name: "ongoing_placed_and_waiting_acceptance_orders", DataType: "INT64"}, + {Name: "ongoing_orders", DataType: "INT64"}, + {Name: "merchant_avg_dispatch_arrival_time_10m", DataType: "FLOAT"}, + {Name: "ongoing_accepted_orders", DataType: "INT64"}, + }, + CreateTime: timestamppb.New(time.Date(2022, time.September, 19, 22, 42, 04, 0, time.UTC)), + UpdateTime: timestamppb.New(time.Date(2022, time.September, 21, 13, 23, 02, 0, time.UTC)), + }), + Lineage: &v1beta2.Lineage{ + Upstreams: []*v1beta2.Resource{{ + Urn: "urn:kafka:int-dagstream-kafka.yonkou.io:topic:GO_FOOD-delay-allocation-merchant-feature-10m-log", + Service: "kafka", + Type: "topic", + }}, + }, + }, + }, + }, + { + name: "ConditionalExit", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": heredoc.Doc(` + if !response.success { + exit() + } + a := new_asset("invalid") + `), + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + testutils.Respond(t, w, http.StatusOK, `{"success": false, "data": []}`) + }, + }, + { + name: "NewAssetWithoutType", + rawCfg: 
map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": heredoc.Doc(` + a := new_asset() + emit(a) + `), + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + testutils.Respond(t, w, http.StatusOK, `{}`) + }, + expectedErr: "Runtime Error: wrong number of arguments in call to 'user-function:new_asset'", + }, + { + name: "InvalidAssetType", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": heredoc.Doc(` + a := new_asset("invalid") + emit(a) + `), + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + testutils.Respond(t, w, http.StatusOK, `{}`) + }, + expectedErr: "Runtime Error: new asset: unexpected type: invalid", + }, + { + name: "EmitInvalidValue", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": heredoc.Doc(` + emit("invalid") + `), + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + testutils.Respond(t, w, http.StatusOK, `{}`) + }, + expectedErr: "Runtime Error: invalid type for argument 'asset' in call to 'user-function:emit': expected Map, found string", + }, + { + name: "EmitMultiple", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": heredoc.Doc(` + 
emit(new_asset("user"), new_asset("user")) + `), + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + testutils.Respond(t, w, http.StatusOK, `{}`) + }, + expectedErr: "Runtime Error: wrong number of arguments in call to 'user-function:emit'", + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tc.handler(t, w, r) + })) + defer srv.Close() + + replaceServerURL(tc.rawCfg, srv.URL) + + extr := New(testutils.Logger) + err := extr.Init(ctx, plugins.Config{ + URNScope: urnScope, + RawConfig: tc.rawCfg, + }) + require.NoError(t, err) + + emitter := mocks.NewEmitter() + err = extr.Extract(ctx, emitter.Push) + if tc.expectedErr != "" { + assert.ErrorContains(t, err, tc.expectedErr) + return + } + + assert.NoError(t, err) + testutils.AssertEqualProtos(t, tc.expected, emitter.GetAllData()) + }) + } +} + +func replaceServerURL(cfg map[string]interface{}, serverURL string) { + reqCfg := cfg["request"].(map[string]interface{}) + reqCfg["url"] = strings.Replace(reqCfg["url"].(string), "{{serverURL}}", serverURL, 1) +} diff --git a/plugins/extractors/populate.go b/plugins/extractors/populate.go index 8de765f41..290a5c3cb 100644 --- a/plugins/extractors/populate.go +++ b/plugins/extractors/populate.go @@ -14,6 +14,7 @@ import ( _ "github.com/odpf/meteor/plugins/extractors/github" _ "github.com/odpf/meteor/plugins/extractors/grafana" _ "github.com/odpf/meteor/plugins/extractors/gsuite" + _ "github.com/odpf/meteor/plugins/extractors/http" _ "github.com/odpf/meteor/plugins/extractors/kafka" _ "github.com/odpf/meteor/plugins/extractors/mariadb" _ "github.com/odpf/meteor/plugins/extractors/merlin" diff --git a/plugins/internal/tengoutil/secure_script.go b/plugins/internal/tengoutil/secure_script.go index ad806a5a2..610969880 100644 --- a/plugins/internal/tengoutil/secure_script.go +++ b/plugins/internal/tengoutil/secure_script.go @@ -13,7 
+13,10 @@ const ( func NewSecureScript(input []byte) *tengo.Script { s := tengo.NewScript(input) - s.SetImports(stdlib.GetModuleMap(stdlib.AllModuleNames()...)) + s.SetImports(stdlib.GetModuleMap( + // `os` is excluded, should not be importable from script. + "math", "text", "times", "rand", "fmt", "json", "base64", "hex", "enum", + )) s.SetMaxAllocs(maxAllocs) s.SetMaxConstObjects(maxConsts) diff --git a/plugins/internal/tengoutil/secure_script_test.go b/plugins/internal/tengoutil/secure_script_test.go index de0e3ed07..308eb0d8e 100644 --- a/plugins/internal/tengoutil/secure_script_test.go +++ b/plugins/internal/tengoutil/secure_script_test.go @@ -11,10 +11,9 @@ import ( ) func TestNewSecureScript(t *testing.T) { - t.Run("Allows import of builtin modules", func(t *testing.T) { + t.Run("Allows import of builtin modules except os", func(t *testing.T) { s := NewSecureScript(([]byte)(heredoc.Doc(` math := import("math") - os := import("os") text := import("text") times := import("times") rand := import("rand") @@ -28,6 +27,12 @@ func TestNewSecureScript(t *testing.T) { assert.NoError(t, err) }) + t.Run("os import disallowed", func(t *testing.T) { + s := NewSecureScript(([]byte)(`os := import("os")`)) + _, err := s.Compile() + assert.ErrorContains(t, err, "Compile Error: module 'os' not found") + }) + t.Run("File import disallowed", func(t *testing.T) { s := NewSecureScript(([]byte)(`sum := import("./testdata/sum")`)) _, err := s.Compile() diff --git a/plugins/processors/script/README.md b/plugins/processors/script/README.md index dd9ff8481..5b4f6d94d 100644 --- a/plugins/processors/script/README.md +++ b/plugins/processors/script/README.md @@ -7,7 +7,7 @@ supported script engine. Refer Tengo documentation for script language syntax and supported functionality \- https://github.com/d5/tengo/tree/v2.13.0#references . [Tengo standard library modules][tengo-stdlib] can also be imported and used -if requried. +if required. 
## Usage @@ -27,6 +27,11 @@ processors: | `engine` | `string` | `"tengo"` | Script engine. Only `"tengo"` is supported currently | ✅ | | `script` | `string` | `asset.labels = merge({script_engine: "tengo"}, asset.labels)` | [Tengo][tengo] script. | ✅ | +### Notes + +- Tengo is the only supported script engine. +- Tengo's `os` stdlib module cannot be imported and used in the script. + ### Script Globals #### `asset` @@ -45,7 +50,7 @@ structure for `asset.data` will be one of the following: - [`Job`][proton-job] - [`Metric`][proton-metric] - [`Model`][proton-model] -- [`Service`][proton-service] +- [`Application`][proton-application] - [`Table`][proton-table] - [`Topic`][proton-topic] - [`User`][proton-user] @@ -242,7 +247,7 @@ for information on contributing to this module. [proton-model]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/model.proto#L73 -[proton-service]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/service.proto#L11 +[proton-application]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/application.proto#L11 [proton-table]: https://github.com/odpf/proton/blob/fabbde8/odpf/assets/v1beta2/table.proto#L14 diff --git a/plugins/util.go b/plugins/util.go index 963ad15f6..6a98ec0e9 100644 --- a/plugins/util.go +++ b/plugins/util.go @@ -53,9 +53,10 @@ func buildConfig(configMap map[string]interface{}, c interface{}) (err error) { if errors.As(err, &validationErr) { var configErrors []ConfigError for _, fieldErr := range validationErr { + key := strings.TrimPrefix(fieldErr.Namespace(), "Config.") configErrors = append(configErrors, ConfigError{ - Key: fieldErr.Field(), - Message: fmt.Sprintf("validation for field '%s' failed on the '%s' tag", fieldErr.Field(), fieldErr.Tag()), + Key: key, + Message: fmt.Sprintf("validation for field '%s' failed on the '%s' tag", key, fieldErr.Tag()), }) } return InvalidConfigError{ diff --git a/test/utils/assert.go b/test/utils/assert.go index 8c8d95561..3ec100359 100644 --- 
a/test/utils/assert.go +++ b/test/utils/assert.go @@ -3,6 +3,7 @@ package utils import ( "fmt" "os" + "reflect" "testing" "github.com/google/go-cmp/cmp" @@ -30,6 +31,42 @@ func AssertEqualProto(t *testing.T, expected, actual proto.Message) { } } +func AssertEqualProtos(t *testing.T, expected, actual interface{}) { + t.Helper() + + defer func() { + if r := recover(); r != nil { + assert.Fail(t, "assert equal protos: panic recovered", r) + } + }() + + if reflect.TypeOf(expected).Kind() != reflect.TypeOf(actual).Kind() { + msg := fmt.Sprintf( + "Mismatched kinds:\n"+ + "expected: %s\n"+ + "actual: %s\n", + reflect.TypeOf(expected).Kind(), reflect.TypeOf(actual).Kind(), + ) + assert.Fail(t, msg) + return + } + + if !assert.Len(t, actual, reflect.ValueOf(expected).Len()) { + return + } + + ev := reflect.ValueOf(expected) + av := reflect.ValueOf(actual) + switch reflect.TypeOf(expected).Kind() { + case reflect.Slice: + for i := 0; i < ev.Len(); i++ { + AssertEqualProto( + t, ev.Index(i).Interface().(proto.Message), av.Index(i).Interface().(proto.Message), + ) + } + } +} + func AssertAssetsWithJSON(t *testing.T, expected, actuals []*v1beta2.Asset) { t.Helper()