diff --git a/plugins/extractors/http/README.md b/plugins/extractors/http/README.md index 92f6a9144..2d631a0d0 100644 --- a/plugins/extractors/http/README.md +++ b/plugins/extractors/http/README.md @@ -20,10 +20,10 @@ The user specified script has access to the response, if the API call was successful, and can use it for constructing and emitting assets using a custom script. Currently, [Tengo][tengo] is the only supported script engine. -Refer Tengo documentation for script language syntax and supported functionality -\- https://github.com/d5/tengo/tree/v2.13.0#references -. [Tengo standard library modules][tengo-stdlib] can also be imported and used -if required. +Refer Tengo documentation for script language syntax and supported +functionality - https://github.com/d5/tengo/tree/v2.13.0#references. +[Tengo standard library modules][tengo-stdlib] can also be imported and used if +required (except the `os` module). ## Usage @@ -46,6 +46,7 @@ source: key: value timeout: 5s success_codes: [ 200 ] + concurrency: 3 script: engine: tengo source: | @@ -56,19 +57,26 @@ source: ## Inputs -| Key | Value | Example | Description | Required? | -|:-----------------------|:--------------------|:---------------------------------------|:------------------------------------------------------------------------------------------------|:----------| -| `request.url` | `string` | `http://example.com/api/v1/endpoint` | The HTTP endpoint to send request to | ✅ | -| `request.query_params` | `[]{key, value}` | `[{"key":"s","value":"One Piece"}]` | The query parameters to be added to the request URL. | ✘ | -| `request.method` | `string` | `GET`/`POST` | The HTTP verb/method to use with request. Default is `GET`. | ✘ | -| `request.headers` | `map[string]string` | `{"Api-Token": "..."}` | Headers to send in the HTTP request. | ✘ | -| `request.content_type` | `string` | `application/json` | Content type for encoding request body. Also sent as a header. 
| ✅ | -| `request.accept` | `string` | `application/json` | Sent as the `Accept` header. Also indicates the format to use for decoding. | ✅ | -| `request.body` | `Object` | `{"key": "value"}` | The request body to be sent. | ✘ | -| `request.timeout` | `string` | `1s` | Timeout for the HTTP request. Default is 5s. | ✘ | -| `success_codes` | `[]int` | `[200]` | The list of status codes that would be considered as a successful response. Default is `[200]`. | ✘ | -| `script.engine` | `string` | `tengo` | Script engine. Only `"tengo"` is supported currently | ✅ | -| `script.source` | `string` | see [Worked Example](#worked-example). | [Tengo][tengo] script used to map the response into 0 or more assets. | ✅ | +| Key | Value | Example | Description | Required? | +|:----------------|:---------|:---------------------------------------|:------------------------------------------------------------------------------------------------|:----------| +| `request` | `Object` | see [Request](#request) | The configuration for constructing and sending HTTP request. | ✅ | +| `success_codes` | `[]int` | `[200]` | The list of status codes that would be considered as a successful response. Default is `[200]`. | ✘ | +| `concurrency` | `int` | `5` | Number of concurrent child requests to execute. Default is `5` | ✘ | +| `script.engine` | `string` | `tengo` | Script engine. Only `"tengo"` is supported currently | ✅ | +| `script.source` | `string` | see [Worked Example](#worked-example). | [Tengo][tengo] script used to map the response into 0 or more assets. | ✅ | + +### Request + +| Key | Value | Example | Description | Required? 
| +|:---------------|:--------------------|:-------------------------------------|:----------------------------------------------------------------------------|:----------| +| `url` | `string` | `http://example.com/api/v1/endpoint` | The HTTP endpoint to send request to | ✅ | +| `query_params` | `[]{key, value}` | `[{"key":"s","value":"One Piece"}]` | The query parameters to be added to the request URL. | ✘ | +| `method` | `string` | `GET`/`POST` | The HTTP verb/method to use with request. Default is `GET`. | ✘ | +| `headers` | `map[string]string` | `{"Api-Token": "..."}` | Headers to send in the HTTP request. | ✘ | +| `content_type` | `string` | `application/json` | Content type for encoding request body. Also sent as a header. | ✅ | +| `accept` | `string` | `application/json` | Sent as the `Accept` header. Also indicates the format to use for decoding. | ✅ | +| `body` | `Object` | `{"key": "value"}` | The request body to be sent. | ✘ | +| `timeout` | `string` | `1s` | Timeout for the HTTP request. Default is 5s. | ✘ | ### Notes @@ -86,7 +94,24 @@ source: #### `response` -HTTP response received. +HTTP response received with the `status_code`, `header` and `body`. Ex: + +```json +{ + "status_code": 200, + "header": { + "link": ";rel=self,;rel=first,;rel=previous,;rel=next,;rel=last" + }, + "body": [ + {"id": 1, "name": "Widget #1"}, + {"id": 2, "name": "Widget #2"}, + {"id": 3, "name": "Widget #3"} + ] +} +``` + +The header names are always in lower case. See +[Worked Example](#worked-example) for detailed usage. #### `new_asset(string): Asset` @@ -125,6 +150,60 @@ asset.data.full_name = "Daiyamondo Jozu" Takes an asset and emits the asset that can then be consumed by the processor/sink. +#### `execute_request(...requests)` + +Takes 1 or more requests and executes the requests with the concurrency defined +in the recipe. The results are returned as an array. Each item in the array can +be an error or the HTTP response. 
The request object supports the properties +defined in the [Request](#request) input section. + +When a request is executed, it can fail due to temporary errors such as network +errors. These instances need to be handled in the script. + +[//]: # (@formatter:off) + +```go +if !response.body.success { + exit() +} + +reqs := [] +for j in response.body.jobs { + reqs = append(reqs, { + url: format("http://my.server.com/jobs/%s/config", j.id), + method: "GET", + content_type: "application/json", + accept: "application/json", + timeout: "5s" + }) +} + +responses := execute_request(reqs...) +for r in responses { + if is_error(r) { + // TODO: Handle it appropriately. The error value has the request and + // error string: + // r.value.{request, error} + continue + } + + asset := new_asset("job") + asset.name = r.body.name + exec_cfg := r.body["execution-config"] + asset.data.attributes = { + "job_id": r.body.jid, + "job_parallelism": exec_cfg["job-parallelism"], + "config": exec_cfg["user-config"] + } + emit(asset) +} +``` + +[//]: # (@formatter:on) + +If the request passed to the function fails validation, a runtime error is +thrown. + #### `exit()` Terminates the script execution. @@ -200,11 +279,11 @@ source: script: engine: tengo source: | - if !response.success { + if !response.body.success { exit() } - users := response.data + users := response.body.data for u in users { if u.email == "" { continue @@ -229,19 +308,21 @@ source: manager_id: u.manager_id, cost_center_id: u.cost_center_id, supervisory_org_name: u.supervisory_org_name, - location_id: u.location_id + location_id: u.location_id, + service_job_id: response.header["x-job-id"] } emit(asset) } ``` -This would emit a 'User' asset for each user object in `response.data`. +This would emit a 'User' asset for each user object in `response.body.data`. Note +that the response headers can be accessed under `response.header` and can be +used as needed. 
## Caveats The following features are currently not supported: -- Pagination. - Explicit authentication support, ex: Basic auth/OAuth/OAuth2/JWT etc. - Retries with configurable backoff. - Content type for request/response body other than `application/json`. diff --git a/plugins/extractors/http/execute_request.go b/plugins/extractors/http/execute_request.go index 5bfe6a1c1..e7e441e4f 100644 --- a/plugins/extractors/http/execute_request.go +++ b/plugins/extractors/http/execute_request.go @@ -7,31 +7,32 @@ import ( "fmt" "io" "net/http" + "strings" ) -func (e *Extractor) executeRequest(ctx context.Context) (interface{}, error) { - cfg := e.config +type executeRequestFunc func(ctx context.Context, reqCfg RequestConfig) (map[string]interface{}, error) - ctx, cancel := context.WithTimeout(ctx, cfg.Request.Timeout) - defer cancel() +func makeRequestExecutor(successCodes []int, httpClient *http.Client) executeRequestFunc { + return func(ctx context.Context, reqCfg RequestConfig) (map[string]interface{}, error) { + ctx, cancel := context.WithTimeout(ctx, reqCfg.Timeout) + defer cancel() - req, err := buildRequest(ctx, cfg) - if err != nil { - return nil, err - } + req, err := buildRequest(ctx, reqCfg) + if err != nil { + return nil, err + } - resp, err := e.http.Do(req) - defer drainBody(resp) - if err != nil { - return nil, fmt.Errorf("do request: %w", err) - } + resp, err := httpClient.Do(req) + defer drainBody(resp) + if err != nil { + return nil, fmt.Errorf("do request: %w", err) + } - return handleResponse(cfg, resp) + return handleResponse(successCodes, resp) + } } -func buildRequest(ctx context.Context, cfg Config) (*http.Request, error) { - reqCfg := cfg.Request - +func buildRequest(ctx context.Context, reqCfg RequestConfig) (*http.Request, error) { body, err := asReader(reqCfg.Body) if err != nil { return nil, fmt.Errorf("encode request body: %w", err) @@ -72,17 +73,26 @@ func addQueryParams(req *http.Request, params []QueryParam) { req.URL.RawQuery = q.Encode() } 
-func handleResponse(cfg Config, resp *http.Response) (interface{}, error) { - if !has(cfg.SuccessCodes, resp.StatusCode) { +func handleResponse(successCodes []int, resp *http.Response) (map[string]interface{}, error) { + if !has(successCodes, resp.StatusCode) { return nil, fmt.Errorf("unsuccessful request: response status code: %d", resp.StatusCode) } - var res interface{} - if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { + h := make(map[string]interface{}, len(resp.Header)) + for k := range resp.Header { + h[strings.ToLower(k)] = resp.Header.Get(k) + } + + var body interface{} + if err := json.NewDecoder(resp.Body).Decode(&body); err != nil { return nil, fmt.Errorf("decode response: %w", err) } - return res, nil + return map[string]interface{}{ + "status_code": resp.StatusCode, + "header": h, + "body": body, + }, nil } func asReader(v interface{}) (io.Reader, error) { diff --git a/plugins/extractors/http/execute_script.go b/plugins/extractors/http/execute_script.go index 8a8e95976..43a9c4e5d 100644 --- a/plugins/extractors/http/execute_script.go +++ b/plugins/extractors/http/execute_script.go @@ -4,8 +4,13 @@ import ( "context" "errors" "fmt" + "reflect" + "strings" + "sync" "github.com/d5/tengo/v2" + "github.com/go-playground/validator/v10" + "github.com/mcuadros/go-defaults" "github.com/odpf/meteor/models" v1beta2 "github.com/odpf/meteor/models/odpf/assets/v1beta2" "github.com/odpf/meteor/plugins" @@ -15,9 +20,11 @@ import ( ) func (e *Extractor) executeScript(ctx context.Context, res interface{}, emit plugins.Emit) error { - s := tengoutil.NewSecureScript(([]byte)(e.config.Script.Source)) - if err := declareGlobals(s, res, emit); err != nil { - return fmt.Errorf("declare globals: %w", err) + s, err := tengoutil.NewSecureScript( + ([]byte)(e.config.Script.Source), e.scriptGlobals(ctx, res, emit), + ) + if err != nil { + return err } c, err := s.Compile() @@ -32,8 +39,8 @@ func (e *Extractor) executeScript(ctx context.Context, res interface{}, 
emit plu return nil } -func declareGlobals(s *tengo.Script, res interface{}, emit plugins.Emit) error { - for name, v := range map[string]interface{}{ +func (e *Extractor) scriptGlobals(ctx context.Context, res interface{}, emit plugins.Emit) map[string]interface{} { + return map[string]interface{}{ "response": res, "new_asset": &tengo.UserFunction{ Name: "new_asset", @@ -43,18 +50,17 @@ func declareGlobals(s *tengo.Script, res interface{}, emit plugins.Emit) error { Name: "emit", Value: emitWrapper(emit), }, + "execute_request": &tengo.UserFunction{ + Name: "execute_request", + Value: executeRequestWrapper(ctx, e.config.Concurrency, e.executeRequest), + }, "exit": &tengo.UserFunction{ Name: "exit", Value: func(...tengo.Object) (tengo.Object, error) { return nil, errUserExit }, }, - } { - if err := s.Add(name, v); err != nil { - return fmt.Errorf("declare script globals: %w", err) - } } - return nil } func newAssetWrapper() tengo.CallableFunc { @@ -103,6 +109,120 @@ func emitWrapper(emit plugins.Emit) tengo.CallableFunc { } } +func executeRequestWrapper(ctx context.Context, concurrency int, executeRequest executeRequestFunc) tengo.CallableFunc { + type job struct { + i int + reqCfg RequestConfig + } + requestsChan := func(ctx context.Context, reqs []RequestConfig) <-chan job { + ch := make(chan job) + + go func() { + defer close(ch) + + for i, r := range reqs { + select { + case <-ctx.Done(): + return + + case ch <- job{i, r}: + } + } + }() + + return ch + } + + type result struct { + resp interface{} + err error + } + processJobs := func(ctx context.Context, n int, ch <-chan job) []result { + var wg sync.WaitGroup + wg.Add(concurrency) + + results := make([]result, n) + work := func() { + defer wg.Done() + + for { + select { + case <-ctx.Done(): + return + + case j, ok := <-ch: + if !ok { + return + } + + resp, err := executeRequest(ctx, j.reqCfg) + if err != nil { + results[j.i] = result{err: fmt.Errorf("execute request #%d: %w", j.i, err)} + continue + } + + 
results[j.i] = result{resp: resp} + } + } + } + + for i := 0; i < concurrency; i++ { + go work() + } + + wg.Wait() + return results + } + + validate := validator.New() + validate.RegisterTagNameFunc(func(fld reflect.StructField) string { + name := strings.SplitN(fld.Tag.Get("mapstructure"), ",", 2)[0] + if name == "-" { + return "" + } + return name + }) + return func(args ...tengo.Object) (tengo.Object, error) { + if len(args) < 1 { + return nil, tengo.ErrWrongNumArguments + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + reqs, err := argsToRequestConfigs(args, validate) + if err != nil { + return nil, fmt.Errorf("execute request: %w", err) + } + + results := processJobs(ctx, len(reqs), requestsChan(ctx, reqs)) + + var ret tengo.Array + for i, res := range results { + if res.err != nil { + ret.Value = append(ret.Value, &tengo.Error{ + Value: &tengo.Map{ + Value: map[string]tengo.Object{ + "request": args[i], + "error": &tengo.String{Value: res.err.Error()}, + }, + }, + }) + continue + } + + o, err := tengo.FromInterface(res.resp) + if err != nil { + return nil, fmt.Errorf("execute request: translate response: %s: %w", args[i], err) + } + + ret.Value = append(ret.Value, o) + } + + return &ret, nil + } +} + func newAsset(typeURLs map[string]string, typ string) (tengo.Object, error) { u, ok := typeURLs[typ] if !ok { @@ -121,6 +241,24 @@ func newAsset(typeURLs map[string]string, typ string) (tengo.Object, error) { }, nil } +func argsToRequestConfigs(args []tengo.Object, validate *validator.Validate) ([]RequestConfig, error) { + reqs := make([]RequestConfig, 0, len(args)) + for _, arg := range args { + var r RequestConfig + defaults.SetDefaults(&r) + if err := structmap.AsStructWithTag("mapstructure", tengo.ToInterface(arg), &r); err != nil { + return nil, fmt.Errorf("map arg to request config: %s: %w", arg, err) + } + + if err := validate.Struct(r); err != nil { + return nil, fmt.Errorf("validate request config: %s, %w", arg, err) + } + + reqs = 
append(reqs, r) + } + return reqs, nil +} + func knownTypeURLs() map[string]string { typeURLs := make(map[string]string, 12) for _, typ := range []string{ diff --git a/plugins/extractors/http/http_extractor.go b/plugins/extractors/http/http_extractor.go index 703463d53..185230be5 100644 --- a/plugins/extractors/http/http_extractor.go +++ b/plugins/extractors/http/http_extractor.go @@ -30,23 +30,26 @@ var summary string // Config holds the set of configuration for the HTTP extractor. type Config struct { - Request struct { - URL string `mapstructure:"url" validate:"required,url"` - QueryParams []QueryParam `mapstructure:"query_params" validate:"dive"` - Method string `mapstructure:"method" validate:"oneof=GET POST" default:"GET"` - Headers map[string]string `mapstructure:"headers"` - ContentType string `mapstructure:"content_type" validate:"required,oneof=application/json"` - Accept string `mapstructure:"accept" validate:"required,oneof=application/json"` - Body interface{} `mapstructure:"body"` - Timeout time.Duration `mapstructure:"timeout" validate:"min=1ms" default:"5s"` - } `mapstructure:"request"` - SuccessCodes []int `mapstructure:"success_codes" validate:"dive,gte=200,lt=300" default:"[200]"` + Request RequestConfig `mapstructure:"request"` + SuccessCodes []int `mapstructure:"success_codes" validate:"dive,gte=200,lt=300" default:"[200]"` + Concurrency int `mapstructure:"concurrency" validate:"gte=1,lte=100" default:"5"` Script struct { Engine string `mapstructure:"engine" validate:"required,oneof=tengo"` Source string `mapstructure:"source" validate:"required"` } `mapstructure:"script"` } +type RequestConfig struct { + URL string `mapstructure:"url" validate:"required,url"` + QueryParams []QueryParam `mapstructure:"query_params" validate:"dive"` + Method string `mapstructure:"method" validate:"oneof=GET POST" default:"GET"` + Headers map[string]string `mapstructure:"headers"` + ContentType string `mapstructure:"content_type" 
validate:"required,oneof=application/json"` + Accept string `mapstructure:"accept" validate:"required,oneof=application/json"` + Body interface{} `mapstructure:"body"` + Timeout time.Duration `mapstructure:"timeout" validate:"min=1ms" default:"5s"` +} + type QueryParam struct { Key string `mapstructure:"key" validate:"required"` Value string `mapstructure:"value" validate:"required"` @@ -83,9 +86,9 @@ var info = plugins.Info{ type Extractor struct { plugins.BaseExtractor - logger log.Logger - config Config - http *http.Client + logger log.Logger + config Config + executeRequest executeRequestFunc } // New returns a pointer to an initialized Extractor Object @@ -104,7 +107,7 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { return err } - e.http = &http.Client{} + e.executeRequest = makeRequestExecutor(e.config.SuccessCodes, &http.Client{}) return nil } @@ -112,7 +115,7 @@ func (e *Extractor) Init(ctx context.Context, config plugins.Config) error { // executes the script. The script has access to the response and can use the // same to 'emit' assets from within the script. 
func (e *Extractor) Extract(ctx context.Context, emit plugins.Emit) error { - res, err := e.executeRequest(ctx) + res, err := e.executeRequest(ctx, e.config.Request) if err != nil { return fmt.Errorf("http extractor: execute request: %w", err) } diff --git a/plugins/extractors/http/http_extractor_test.go b/plugins/extractors/http/http_extractor_test.go index 2f03b35c3..36bdd05cc 100644 --- a/plugins/extractors/http/http_extractor_test.go +++ b/plugins/extractors/http/http_extractor_test.go @@ -290,26 +290,27 @@ func TestExtract(t *testing.T) { "script": map[string]interface{}{ "engine": "tengo", "source": heredoc.Doc(` + body := response.body asset := new_asset("user") // URN format: "urn:{service}:{scope}:{type}:{id}" - asset.urn = format("urn:%s:staging:user:%s", "my_usr_svc", response.employee_id) - asset.name = response.fullname + asset.urn = format("urn:%s:staging:user:%s", "my_usr_svc", body.employee_id) + asset.name = body.fullname asset.service = "my_usr_svc" // asset.type = "user" // not required, new_asset("user") sets the field. - asset.data.email = response.work_email - asset.data.username = response.employee_id - asset.data.first_name = response.legal_first_name - asset.data.last_name = response.legal_last_name - asset.data.full_name = response.fullname - asset.data.display_name = response.fullname - asset.data.title = response.business_title - asset.data.status = response.terminated == "true" ? "suspended" : "active" - asset.data.manager_email = response.manager_email + asset.data.email = body.work_email + asset.data.username = body.employee_id + asset.data.first_name = body.legal_first_name + asset.data.last_name = body.legal_last_name + asset.data.full_name = body.fullname + asset.data.display_name = body.fullname + asset.data.title = body.business_title + asset.data.status = body.terminated == "true" ? 
"suspended" : "active" + asset.data.manager_email = body.manager_email asset.data.attributes = { - manager_id: response.manager_id, - cost_center_id: response.cost_center_id, - supervisory_org_name: response.supervisory_org_name, - location_id: response.location_id + manager_id: body.manager_id, + cost_center_id: body.cost_center_id, + supervisory_org_name: body.supervisory_org_name, + location_id: body.location_id } emit(asset) `), @@ -393,19 +394,20 @@ func TestExtract(t *testing.T) { return upstreams } - for ft in response.tables { + body := response.body + for ft in body.tables { ast := new_asset("feature_table") ast.urn = format( "urn:caramlstore:staging:feature_table:%s.%s", - response.project, ft.spec.name + body.project, ft.spec.name ) ast.name = ft.spec.name ast.service = "caramlstore" ast.type = "feature_table" ast.data = merge(ast.data, { - namespace: response.project, + namespace: body.project, entities: enum.map(ft.spec.entities, func(i, e){ - entity := enum.find(response.entities, func(i, entity) { + entity := enum.find(body.entities, func(i, entity) { return e == entity.spec.name }) return { @@ -514,6 +516,241 @@ func TestExtract(t *testing.T) { }, }, }, + { + name: "AdditionalRequestsFromScript", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/jobs", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": heredoc.Doc(` + reqs := [] + for j in response.body.jobs { + reqs = append(reqs, { + url: format("{{serverURL}}/jobs/%s/config", j.id), + method: "GET", + content_type: "application/json", + accept: "application/json", + timeout: "5s" + }) + } + + responses := execute_request(reqs...) 
+ for r in responses { + if is_error(r) { + continue + } + + asset := new_asset("job") + asset.name = r.body.name + exec_cfg := r.body["execution-config"] + asset.data.attributes = { + "job_id": r.body.jid, + "job_parallelism": exec_cfg["job-parallelism"], + "config": exec_cfg["user-config"] + } + emit(asset) + } + `), + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/jobs": + testutils.Respond(t, w, http.StatusOK, + `{"jobs":[{"id":"72b6753ab1984be6a65055b95ea9dd32","status":"RUNNING"},{"id":"3473947d1115c155513014cc6ecbd2fa","status":"RUNNING"},{"id":"9b12cb10b119b957b085c08e49bde3f2","status":"RESTARTING"},{"id":"fc308f1ac8c23b5f5a7942742b253917","status":"RESTARTING"}]}`, + ) + case "/jobs/72b6753ab1984be6a65055b95ea9dd32/config": + testutils.Respond(t, w, http.StatusOK, + `{"jid":"72b6753ab1984be6a65055b95ea9dd32","name":"data-test-external-voucher-dagger","execution-config":{"execution-mode":"PIPELINED","restart-strategy":"Cluster level default restart strategy","job-parallelism":1,"object-reuse-mode":false,"user-config":{"SINK_KAFKA_TOPIC":"test_external_voucher","FLINK_ROWTIME_ATTRIBUTE_NAME":"rowtime","ENABLE_STENCIL_URL":"true","FLINK_SQL_QUERY":"SELECT member_ids.member_id as customer_id, '96962e7a-cd9e-4fb2-87fe-96091c124de6' as voucher_batch_id, rowtime as event_timestampfrom table1, UNNEST(table1.members) AS member_ids (member_id)where segment_name = 'testdagger' and action = 
'ADD_MEMBERS'","SINK_TYPE":"kafka","PROCESSOR_PREPROCESSOR_CONFIG":"","FLINK_WATERMARK_INTERVAL_MS":"60000","FLINK_PARALLELISM":"1","SINK_INFLUX_BATCH_SIZE":"100","PROCESSOR_POSTPROCESSOR_ENABLE":"true","SINK_KAFKA_PROTO_MESSAGE":"com.company.esb.growth.AllocatePromoRequestMessage","PROCESSOR_PREPROCESSOR_ENABLE":"","SINK_INFLUX_MEASUREMENT_NAME":"data-test-external-voucher-dagger","SCHEMA_REGISTRY_STENCIL_ENABLE":"true","SINK_INFLUX_DB_NAME":"DAGGERS_COLLECTIVE","SCHEMA_REGISTRY_STENCIL_URLS":"http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/68","SINK_KAFKA_STREAM":"data-dagstream","PROCESSOR_LONGBOW_GCP_INSTANCE_ID":"","PROCESSOR_POSTPROCESSOR_CONFIG":"{\"external_source\":{\"http\":[{\"endpoint\":\"ase1.company.io/internal/v2/voucher/allocate\",\"verb\":\"post\",\"request_pattern\":\"{\\\"voucher_batch_id\\\": \\\"%s\\\",\\\"customer_id\\\": \\\"%s-60\\\"}\",\"request_variables\":\"customer_id,customer_id\",\"stream_timeout\":\"5000\",\"connect_timeout\":\"5000\",\"fail_on_errors\":\"false\",\"capacity\":\"30\",\"headers\":{\"Content-Type\":\"application/json\",\"Accept-Language\":\"en\"},\"type\":\"com.company.esb.growth.AllocatePromoRequestMessage\",\"output_mapping\":{\"voucher_batch_id\":{\"path\":\"$.data.id\"}}}]}}","STREAMS":"[{\"SOURCE_KAFKA_TOPIC_NAMES\":\"segmentation-message\",\"INPUT_SCHEMA_PROTO_CLASS\":\"com.company.esb.segmentation.UpdateLogMessage\",\"INPUT_SCHEMA_TABLE\":\"table1\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"false\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"data-test-external-voucher-dagger-0001\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"\",\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"3\",\"SOURCE_KAFKA_NAME\":\"data-dagstream\"}]","SINK_INFLUX_FLUSH_DURATION_MS":"1000","SINK_INFLUX_URL":"http://data-dagger-shared-influx.company.io:6798","SINK_KAFKA_BROKERS":"","FLINK_JOB_ID":"data-test-external-voucher-da
gger","FLINK_WATERMARK_DELAY_MS":"1000","SINK_KAFKA_PROTO_KEY":"com.company.esb.growth.AllocatePromoRequestKey"}}}`, + ) + case "/jobs/3473947d1115c155513014cc6ecbd2fa/config": + testutils.Respond(t, w, http.StatusOK, + `{"jid":"3473947d1115c155513014cc6ecbd2fa","name":"data-booking-map-matching-dagger","execution-config":{"execution-mode":"PIPELINED","restart-strategy":"Cluster level default restart strategy","job-parallelism":1,"object-reuse-mode":false,"user-config":{"SINK_KAFKA_TOPIC":"booking-map-matching-log","FLINK_ROWTIME_ATTRIBUTE_NAME":"rowtime","SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE":"false","SINK_TYPE":"kafka","FLINK_SQL_QUERY":"SELECT driver_id, booking_id, country_code, event_timestamp, booking_status, vehicle_type, driver_locations, ping_processing_driver_locations(driver_locations) as filtered_driver_locations, gh_map_matching_response, polylineFROM data_streams_0","SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS":"-1","SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS":"","PROCESSOR_PREPROCESSOR_CONFIG":"","FLINK_WATERMARK_INTERVAL_MS":"60000","FLINK_PARALLELISM":"1","SINK_METRICS_APPLICATION_PREFIX":"dagger_","SINK_INFLUX_BATCH_SIZE":"100","SINK_KAFKA_PROTO_MESSAGE":"company.esb.cartography.erp.ERPMapMatchingLogV2Message","SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS":"-1","SINK_BIGQUERY_DATASET_LABELS":"","SCHEMA_REGISTRY_STENCIL_ENABLE":"true","SINK_INFLUX_DB_NAME":"DAGGERS_COLLECTIVE","SINK_BIGQUERY_TABLE_LABELS":"","SINK_CONNECTOR_SCHEMA_DATA_TYPE":"PROTOBUF","SINK_CONNECTOR_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE":"false","SINK_INFLUX_URL":"http://data-dagger-shared-influx.company.io:6798","FLINK_JOB_ID":"data-booking-map-matching-dagger","SINK_KAFKA_BROKERS":"","PYTHON_UDF_ENABLE":"true","FLINK_WATERMARK_DELAY_MS":"1000","SINK_KAFKA_PROTO_KEY":"company.esb.cartography.erp.ERPMapMatchingLogV2Key","SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS":"","SINK_BIGQUERY_BATCH_SIZE":"","SINK_BIGQUERY_METADATA_COLUMNS_TYPES":"","ENABLE_STENCIL_URL":"true","PYTHON_UDF_CONFIG":"{\"PYTHO
N_FILES\":\"gs://data-dagger-magic/python/master/88.41.13/python_udfs.zip\",\"PYTHON_ARCHIVES\":\"gs://data-dagger-magic/python/master/88.41.13/data.zip\",\"PYTHON_REQUIREMENTS\":\"gs://data-dagger-magic/python/master/88.41.13/requirements.txt\"}","SINK_CONNECTOR_SCHEMA_MESSAGE_MODE":"LOG_MESSAGE","PROCESSOR_POSTPROCESSOR_ENABLE":"true","SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS":"-1","SINK_INFLUX_MEASUREMENT_NAME":"map-matching-dagger","PROCESSOR_PREPROCESSOR_ENABLE":"false","SINK_BIGQUERY_TABLE_NAME":"","SINK_KAFKA_STREAM":"data-dagstream","SCHEMA_REGISTRY_STENCIL_URLS":"http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/590","SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID":"","SINK_BIGQUERY_DATASET_NAME":"","PROCESSOR_LONGBOW_GCP_INSTANCE_ID":"","PROCESSOR_POSTPROCESSOR_CONFIG":"{\"external_source\":{\"http\":[{\"endpoint\":\"http://11.126.1.18:6798/match\",\"verb\":\"post\",\"request_pattern\":\"{\"locations\":%s,\"hints\":{\"vehicle\":\"car\",\"instructions\":false,\"points_encoded\":true,\"gps_accuracy\":50}}\",\"request_variables\":\"filtered_driver_locations\",\"stream_timeout\":\"100000\",\"connect_timeout\":\"10000\",\"fail_on_errors\":\"false\",\"capacity\":\"30\",\"headers\":{\"content-type\":\"application/json\"},\"output_mapping\":{\"gh_map_matching_response\":{\"path\":\"$\"},\"polyline\":{\"path\":\"$.map_matching.edge_geometry_polyline\"}}}]},\"internal_source\":[{\"output_field\":\"driver_id\",\"value\":\"driver_id\",\"type\":\"sql\"},{\"output_field\":\"booking_id\",\"value\":\"booking_id\",\"type\":\"sql\"},{\"output_field\":\"country_code\",\"value\":\"country_code\",\"type\":\"sql\"},{\"output_field\":\"event_timestamp\",\"value\":\"event_timestamp\",\"type\":\"sql\"},{\"output_field\":\"booking_status\",\"value\":\"booking_status\",\"type\":\"sql\"},{\"output_field\":\"vehicle_type\",\"value\":\"vehicle_type\",\"type\":\"sql\"},{\"output_field\":\"driver_locations\",\"value\":\"driver_locations\",\"type\":\"sql\"},{\"o
utput_field\":\"filtered_driver_locations\",\"value\":\"filtered_driver_locations\",\"type\":\"sql\"}]}","SINK_BIGQUERY_TABLE_CLUSTERING_KEYS":"","STREAMS":"[{\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"4\",\"INPUT_SCHEMA_PROTO_CLASS\":\"company.esb.cartography.erp.ERPMapMatchingLogV2Message\",\"INPUT_SCHEMA_TABLE\":\"data_streams_0\",\"SOURCE_DETAILS\":[{\"SOURCE_NAME\":\"KAFKA_CONSUMER\",\"SOURCE_TYPE\":\"UNBOUNDED\"}],\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"false\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"company-mainstream.company.io:6668\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"data-booking-map-matching-dagger-0002\",\"SOURCE_KAFKA_NAME\":\"company-mainstream\",\"SOURCE_KAFKA_TOPIC_NAMES\":\"aggregated-busy-driver-location-ping\",\"SOURCE_PARQUET_FILE_DATE_RANGE\":null,\"SOURCE_PARQUET_FILE_PATHS\":null}]","SINK_INFLUX_FLUSH_DURATION_MS":"1000","SINK_BIGQUERY_METADATA_NAMESPACE":""}}}`, + ) + case "/jobs/9b12cb10b119b957b085c08e49bde3f2/config": + testutils.Respond(t, w, http.StatusOK, + `{"jid":"9b12cb10b119b957b085c08e49bde3f2","name":"data-eim-driver-nearby-staging-dagger","execution-config":{"execution-mode":"PIPELINED","restart-strategy":"Cluster level default restart strategy","job-parallelism":1,"object-reuse-mode":false,"user-config":{"SINK_KAFKA_TOPIC":"eim-driver-nearby-log","FLINK_ROWTIME_ATTRIBUTE_NAME":"rowtime","ENABLE_STENCIL_URL":"true","SINK_TYPE":"kafka","FLINK_SQL_QUERY":"SELECT gofood_booking.event_timestamp as event_timestamp, gofood_booking.order_number AS order_number, gofood_booking.service_type AS gofood_booking_message.service_type, gofood_booking.order_number AS gofood_booking_message.order_number, gofood_booking.order_url AS gofood_booking_message.order_url, gofood_booking.status AS gofood_booking_message.status, gofood_booking.event_timestamp AS gofood_booking_message.event_timestamp, gofood_booking.customer_id AS 
gofood_booking_message.customer_id, gofood_booking.customer_url AS gofood_booking_message.customer_url, gofood_booking.driver_id AS gofood_booking_message.driver_id, gofood_booking.driver_url AS gofood_booking_message.driver_url, gofood_booking.is_reblast AS gofood_booking_message.is_reblast, gofood_booking.activity_source AS gofood_booking_message.activity_source, gofood_booking.service_area_id AS gofood_booking_message.service_area_id, gofood_booking.payment_type AS gofood_booking_message.payment_type, gofood_booking.total_unsubsidised_price AS gofood_booking_message.total_unsubsidised_price, gofood_booking.customer_price AS gofood_booking_message.customer_price, gofood_booking.amount_paid_by_cash AS gofood_booking_message.amount_paid_by_cash, gofood_booking.amount_paid_by_credits AS gofood_booking_message.amount_paid_by_credits, gofood_booking.surcharge_amount AS gofood_booking_message.surcharge_amount, gofood_booking.tip_amount AS gofood_booking_message.tip_amount, gofood_booking.driver_cut_amount AS gofood_booking_message.driver_cut_amount, gofood_booking.requested_payment_type AS gofood_booking_message.requested_payment_type, gofood_booking.total_distance_in_kms AS gofood_booking_message.total_distance_in_kms, gofood_booking.route_polyline AS gofood_booking_message.route_polyline, gofood_booking.routes AS gofood_booking_message.routes, gofood_booking.driver_eta_pickup AS gofood_booking_message.driver_eta_pickup, gofood_booking.driver_eta_dropoff AS gofood_booking_message.driver_eta_dropoff, gofood_booking.driver_pickup_location AS gofood_booking_message.driver_pickup_location, gofood_booking.driver_dropoff_location AS gofood_booking_message.driver_dropoff_location, gofood_booking.customer_email AS gofood_booking_message.customer_email, gofood_booking.customer_name AS gofood_booking_message.customer_name, gofood_booking.customer_phone AS gofood_booking_message.customer_phone, gofood_booking.driver_email AS gofood_booking_message.driver_email, 
gofood_booking.driver_name AS gofood_booking_message.driver_name, gofood_booking.driver_phone AS gofood_booking_message.driver_phone, gofood_booking.driver_phone2 AS gofood_booking_message.driver_phone2, gofood_booking.driver_phone3 AS gofood_booking_message.driver_phone3, gofood_booking.cancel_reason_id AS gofood_booking_message.cancel_reason_id, gofood_booking.cancel_reason_description AS gofood_booking_message.cancel_reason_description, gofood_booking.cancel_source AS gofood_booking_message.cancel_source, gofood_booking.customer_type AS gofood_booking_message.customer_type, gofood_booking.cancel_owner AS gofood_booking_message.cancel_owner, gofood_booking.booking_creation_time AS gofood_booking_message.booking_creation_time, gofood_booking.total_customer_discount AS gofood_booking_message.total_customer_discount, gofood_booking.gopay_customer_discount AS gofood_booking_message.gopay_customer_discount, gofood_booking.voucher_customer_discount AS gofood_booking_message.voucher_customer_discount, gofood_booking.pickup_time AS gofood_booking_message.pickup_time, gofood_booking.driver_paid_in_cash AS gofood_booking_message.driver_paid_in_cash, gofood_booking.driver_paid_in_credit AS gofood_booking_message.driver_paid_in_credit, gofood_booking.receiver_name AS gofood_booking_message.receiver_name, gofood_booking.driver_photo_url AS gofood_booking_message.driver_photo_url, gofood_booking.previous_booking_status AS gofood_booking_message.previous_booking_status, gofood_booking.vehicle_type AS gofood_booking_message.vehicle_type, gofood_booking.customer_total_fare_without_surge AS gofood_booking_message.customer_total_fare_without_surge, gofood_booking.customer_surge_factor AS gofood_booking_message.customer_surge_factor, gofood_booking.customer_dynamic_surge AS gofood_booking_message.customer_dynamic_surge, gofood_booking.customer_dynamic_surge_enabled AS gofood_booking_message.customer_dynamic_surge_enabled, gofood_booking.driver_total_fare_without_surge AS 
gofood_booking_message.driver_total_fare_without_surge, gofood_booking.driver_surge_factor AS gofood_booking_message.driver_surge_factor, gofood_booking.driver_dynamic_surge AS gofood_booking_message.driver_dynamic_surge, gofood_booking.driver_dynamic_surge_enabled AS gofood_booking_message.driver_dynamic_surge_enabled, gofood_booking.driver_ata_pickup AS gofood_booking_message.driver_ata_pickup, gofood_booking.driver_ata_dropoff AS gofood_booking_message.driver_ata_dropoff, gofood_booking.gcm_key AS gofood_booking_message.gcm_key, gofood_booking.device_token AS gofood_booking_message.device_token, gofood_booking.pricing_service_id AS gofood_booking_message.pricing_service_id, gofood_booking.payment_invoice_number AS gofood_booking_message.payment_invoice_number, gofood_booking.pricing_currency AS gofood_booking_message.pricing_currency, gofood_booking.country_code AS gofood_booking_message.country_code, gofood_booking.service_area_tzname AS gofood_booking_message.service_area_tzname, gofood_booking.payment_option_type AS gofood_booking_message.payment_option_type, gofood_booking.payment_option_metadata AS gofood_booking_message.payment_option_metadata, gofood_booking.payment_option_name AS gofood_booking_message.payment_option_name, gofood_booking.booking_info AS gofood_booking_message.booking_info, gofood_booking.price_edit_reason AS gofood_booking_message.price_edit_reason, gofood_booking.driver_arrived_location AS gofood_booking_message.driver_arrived_location, gofood_booking.driver_arrived_time AS gofood_booking_message.driver_arrived_time, gofood_booking.driver_order_placed_location AS gofood_booking_message.driver_order_placed_location, gofood_booking.driver_order_placed_time AS gofood_booking_message.driver_order_placed_time, gofood_booking.merchant_accepted_time AS gofood_booking_message.merchant_accepted_time, gofood_booking.merchant_acknowledged_time AS gofood_booking_message.merchant_acknowledged_time, gofood_booking.merchant_received_time AS 
gofood_booking_message.merchant_received_time, gofood_booking.merchant_cancel_reason AS gofood_booking_message.merchant_cancel_reason, gofood_booking.merchant_cancel_time AS gofood_booking_message.merchant_cancel_time, gofood_booking.merchant_cancel_description AS gofood_booking_message.merchant_cancel_description, gofood_booking.takeaway_charges AS gofood_booking_message.takeaway_charges, gofood_booking.food_prepared_time AS gofood_booking_message.food_prepared_time, gofood_booking.cancel_reason_code AS gofood_booking_message.cancel_reason_code, gofood_booking.verification_requested_time AS gofood_booking_message.verification_requested_time, gofood_booking.customer_pickup_time AS gofood_booking_message.customer_pickup_time, gofood_booking.customer_pickup_location AS gofood_booking_message.customer_pickup_location, gofood_booking.verification_failed_time AS gofood_booking_message.verification_failed_time, gofood_booking.verification_requested_location AS gofood_booking_message.verification_requested_location, gofood_booking.convenience_fee AS gofood_booking_message.convenience_fee, gofood_booking.payment_actions AS gofood_booking_message.payment_actions, gofood_booking.marketplace_serviceability_log_id AS gofood_booking_message.marketplace_serviceability_log_id, gofood_booking.order_completion_time AS gofood_booking_message.order_completion_time, gofood_booking.restaurant_id AS gofood_booking_message.restaurant_id, gofood_booking.sub_status AS gofood_booking_message.sub_status, gofood_booking.shopping_price AS gofood_booking_message.shopping_price, gofood_booking.commission_price AS gofood_booking_message.commission_price, gofood_booking.withholding_income_tax AS gofood_booking_message.withholding_income_tax, gofood_booking.voucher_redeemed_value AS gofood_booking_message.voucher_redeemed_value, gofood_booking.voucher_commission AS gofood_booking_message.voucher_commission, gofood_booking.voucher_id AS gofood_booking_message.voucher_id, gofood_booking.voucher_title 
AS gofood_booking_message.voucher_title, gofood_booking.otp AS gofood_booking_message.otp, gofood_booking.driver_entered_price AS gofood_booking_message.driver_entered_price, gofood_booking.driver_wallet_id AS gofood_booking_message.driver_wallet_id, gofood_booking.saudagar_id AS gofood_booking_message.saudagar_id, gofood_booking.validated_at AS gofood_booking_message.validated_at, gofood_booking.merchant_wallet_id AS gofood_booking_message.merchant_wallet_id, gofood_booking.restaurant_uuid AS gofood_booking_message.restaurant_uuid, gofood_booking.search_id AS gofood_booking_message.search_id, gofood_booking.gopay_driver_reservation_id AS gofood_booking_message.gopay_driver_reservation_id, gofood_booking.voucher_batch_id AS gofood_booking_message.voucher_batch_id, gofood_booking.merchant_config AS gofood_booking_message.merchant_config, gofood_booking.brand_id AS gofood_booking_message.brand_id, gofood_booking.customer_wallet_id AS gofood_booking_message.customer_wallet_id, gofood_booking.customer_payment_details AS gofood_booking_message.customer_payment_details, gofood_booking.merchant_phone AS gofood_booking_message.merchant_phone, gofood_booking.previous_sub_status AS gofood_booking_message.previous_sub_status, gofood_booking.merchant_acceptance_deadline AS gofood_booking_message.merchant_acceptance_deadline, gofood_booking.inapplicable_voucher_id AS gofood_booking_message.inapplicable_voucher_id, gofood_booking.driver_fee_adjustments AS gofood_booking_message.driver_fee_adjustments, gofood_booking.receipt_url AS gofood_booking_message.receipt_url, gofood_booking.is_goresto AS gofood_booking_message.is_goresto, gofood_booking.shopping_items AS gofood_booking_message.shopping_items, gofood_booking.has_promo AS gofood_booking_message.has_promo, gofood_booking.analytics AS gofood_booking_message.analytics, gofood_booking.fraud_reason AS gofood_booking_message.fraud_reason, gofood_booking.driver_completion_time AS gofood_booking_message.driver_completion_time, 
gofood_booking.use_service_wallet_fund_flow AS gofood_booking_message.use_service_wallet_fund_flow, gofood_booking.campaign_discounts AS gofood_booking_message.campaign_discounts, gofood_booking.tracer_bullet AS gofood_booking_message.tracer_bullet, gofood_booking.bid_delay_in_seconds AS gofood_booking_message.bid_delay_in_seconds, gofood_booking.driver_gopay_account_id AS gofood_booking_message.driver_gopay_account_id, gofood_booking.use_gopay_v3 AS gofood_booking_message.use_gopay_v3, gofood_booking.experiments AS gofood_booking_message.experiments, gofood_booking.eta_in_minutes AS gofood_booking_message.eta_in_minutes, gofood_booking.eta_source AS gofood_booking_message.eta_source, gofood_booking.eta_performance_bucket AS gofood_booking_message.eta_performance_bucket, gofood_booking.payment_method AS gofood_booking_message.payment_method, gofood_booking.vehicle_info AS gofood_booking_message.vehicle_info, gofood_booking.actor AS gofood_booking_message.actor, gofood_booking.vehicle_tags AS gofood_booking_message.vehicle_tags, gofood_booking.goclub AS gofood_booking_message.goclub, gofood_nearby.event_timestamp AS gofood_nearby_message.event_timestamp, gofood_nearby.order_number AS gofood_nearby_message.order_number, gofood_nearby.driver_id AS gofood_nearby_message.driver_id, gofood_nearby.customer_id AS gofood_nearby_message.customer_id, gofood_nearby.restaurant_uuid AS gofood_nearby_message.restaurant_uuid, gofood_nearby.saudagar_id AS gofood_nearby_message.saudagar_id, gofood_nearby.type AS gofood_nearby_message.type, gofood_nearby.radius AS gofood_nearby_message.radius, gofood_nearby.actor AS gofood_nearby_message.actorFROM gofood_nearby JOIN gofood_booking ON gofood_nearby.order_number = gofood_booking.order_number AND gofood_nearby.driver_id = gofood_booking.driver_id AND gofood_nearby.customer_id = gofood_booking.customer_id AND gofood_nearby.restaurant_uuid = gofood_booking.restaurant_uuid AND gofood_nearby.rowtime BETWEEN (gofood_booking.rowtime - 
INTERVAL '10' MINUTE) AND (gofood_booking.rowtime + INTERVAL '40' MINUTE) AND gofood_booking.sub_status = 'OTW_PICKUP' AND gofood_nearby.type = 'PICKUP' AND gofood_nearby.actor = 'DRIVER'","PYTHON_UDF_CONFIG":"{\"PYTHON_FILES\": \"gs://data-dagger/python/master/latest/python_udfs.zip\",\"PYTHON_REQUIREMENTS\": \"gs://data-dagger/python/master/latest/requirements.txt\",\"PYTHON_ARCHIVES\": \"gs://data-dagger/python/master/latest/data.zip#data\",\"PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE\": \"10000\",\"PYTHON_FN_EXECUTION_BUNDLE_SIZE\": \"100000\",\"PYTHON_FN_EXECUTION_BUNDLE_TIME\": \"1000\"}","PROCESSOR_PREPROCESSOR_CONFIG":"","FLINK_WATERMARK_INTERVAL_MS":"30000","FLINK_PARALLELISM":"1","SINK_INFLUX_BATCH_SIZE":"100","PROCESSOR_POSTPROCESSOR_ENABLE":"false","SINK_KAFKA_PROTO_MESSAGE":"company.esb.gomerchants.eim.EimDriverNearbyEventMessage","SINK_INFLUX_MEASUREMENT_NAME":"data-eim-driver-nearby-staging-dagger","PROCESSOR_PREPROCESSOR_ENABLE":"","SCHEMA_REGISTRY_STENCIL_ENABLE":"true","SINK_INFLUX_DB_NAME":"DAGGERS_COLLECTIVE","SINK_KAFKA_STREAM":"data-dagstream","SCHEMA_REGISTRY_STENCIL_URLS":"http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/72","PROCESSOR_LONGBOW_GCP_INSTANCE_ID":"","PROCESSOR_POSTPROCESSOR_CONFIG":"{}","STREAMS":"[{\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"5\",\"INPUT_SCHEMA_PROTO_CLASS\":\"company.esb.booking.GoFoodBookingLogMessage\",\"INPUT_SCHEMA_TABLE\":\"gofood_booking\",\"SOURCE_DETAILS\":[{\"SOURCE_NAME\":\"KAFKA_CONSUMER\",\"SOURCE_TYPE\":\"UNBOUNDED\"}],\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"false\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"data-eim-driver-nearby-staging-dagger-0039\",\"SOURCE_KAFKA_NAME\":\"company-mainstream\",\"SOURCE_KAFKA_TOPIC_NAMES\":\"gofood-booking-log\"},{\"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX\":\"1\",\"INPUT_SCHEMA_PROTO_CLASS\":\"com
pany.esb.gofood.NearbyEventMessage\",\"INPUT_SCHEMA_TABLE\":\"gofood_nearby\",\"SOURCE_DETAILS\":[{\"SOURCE_NAME\":\"KAFKA_CONSUMER\",\"SOURCE_TYPE\":\"UNBOUNDED\"}],\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE\":\"false\",\"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET\":\"latest\",\"SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS\":\"\",\"SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID\":\"data-eim-driver-nearby-staging-dagger-0040\",\"SOURCE_KAFKA_NAME\":\"company-mainstream\",\"SOURCE_KAFKA_TOPIC_NAMES\":\"gofood-nearby-log\"}]","SINK_INFLUX_FLUSH_DURATION_MS":"1000","SINK_INFLUX_URL":"http://data-dagger-shared-influx.company.io:6798","FLINK_JOB_ID":"data-eim-driver-nearby-staging-dagger","SINK_KAFKA_BROKERS":"","PYTHON_UDF_ENABLE":"false","FLINK_WATERMARK_DELAY_MS":"1000","SINK_KAFKA_PROTO_KEY":"company.esb.gomerchants.eim.EimDriverNearbyEventKey"}}}`, + ) + case "/jobs/fc308f1ac8c23b5f5a7942742b253917/config": + testutils.Respond(t, w, http.StatusInternalServerError, http.StatusText(http.StatusInternalServerError)) + default: + t.Error("Unexpected HTTP call on", r.URL.Path) + } + }, + expected: []*v1beta2.Asset{ + { + Name: "data-test-external-voucher-dagger", + Type: "job", + Data: testutils.BuildAny(t, &v1beta2.Job{ + Attributes: &structpb.Struct{Fields: map[string]*structpb.Value{ + "job_id": structpb.NewStringValue("72b6753ab1984be6a65055b95ea9dd32"), + "job_parallelism": structpb.NewNumberValue(1), + "config": structpb.NewStructValue(&structpb.Struct{Fields: map[string]*structpb.Value{ + "ENABLE_STENCIL_URL": structpb.NewStringValue("true"), + "FLINK_JOB_ID": structpb.NewStringValue("data-test-external-voucher-dagger"), + "FLINK_PARALLELISM": structpb.NewStringValue("1"), + "FLINK_ROWTIME_ATTRIBUTE_NAME": structpb.NewStringValue("rowtime"), + "FLINK_SQL_QUERY": structpb.NewStringValue("SELECT member_ids.member_id as customer_id, '96962e7a-cd9e-4fb2-87fe-96091c124de6' as voucher_batch_id, rowtime as event_timestampfrom table1, UNNEST(table1.members) AS 
member_ids (member_id)where segment_name = 'testdagger' and action = 'ADD_MEMBERS'"), + "FLINK_WATERMARK_DELAY_MS": structpb.NewStringValue("1000"), + "FLINK_WATERMARK_INTERVAL_MS": structpb.NewStringValue("60000"), + "PROCESSOR_LONGBOW_GCP_INSTANCE_ID": structpb.NewStringValue(""), + "PROCESSOR_POSTPROCESSOR_CONFIG": structpb.NewStringValue(`{"external_source":{"http":[{"endpoint":"ase1.company.io/internal/v2/voucher/allocate","verb":"post","request_pattern":"{\"voucher_batch_id\": \"%s\",\"customer_id\": \"%s-60\"}","request_variables":"customer_id,customer_id","stream_timeout":"5000","connect_timeout":"5000","fail_on_errors":"false","capacity":"30","headers":{"Content-Type":"application/json","Accept-Language":"en"},"type":"com.company.esb.growth.AllocatePromoRequestMessage","output_mapping":{"voucher_batch_id":{"path":"$.data.id"}}}]}}`), + "PROCESSOR_POSTPROCESSOR_ENABLE": structpb.NewStringValue("true"), + "PROCESSOR_PREPROCESSOR_CONFIG": structpb.NewStringValue(""), + "PROCESSOR_PREPROCESSOR_ENABLE": structpb.NewStringValue(""), + "SCHEMA_REGISTRY_STENCIL_ENABLE": structpb.NewStringValue("true"), + "SCHEMA_REGISTRY_STENCIL_URLS": structpb.NewStringValue("http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/68"), + "SINK_INFLUX_BATCH_SIZE": structpb.NewStringValue("100"), + "SINK_INFLUX_DB_NAME": structpb.NewStringValue("DAGGERS_COLLECTIVE"), + "SINK_INFLUX_FLUSH_DURATION_MS": structpb.NewStringValue("1000"), + "SINK_INFLUX_MEASUREMENT_NAME": structpb.NewStringValue("data-test-external-voucher-dagger"), + "SINK_INFLUX_URL": structpb.NewStringValue("http://data-dagger-shared-influx.company.io:6798"), + "SINK_KAFKA_BROKERS": structpb.NewStringValue(""), + "SINK_KAFKA_PROTO_KEY": structpb.NewStringValue("com.company.esb.growth.AllocatePromoRequestKey"), + "SINK_KAFKA_PROTO_MESSAGE": structpb.NewStringValue("com.company.esb.growth.AllocatePromoRequestMessage"), + "SINK_KAFKA_STREAM": structpb.NewStringValue("data-dagstream"), + 
"SINK_KAFKA_TOPIC": structpb.NewStringValue("test_external_voucher"), + "SINK_TYPE": structpb.NewStringValue("kafka"), + "STREAMS": structpb.NewStringValue(`[{"SOURCE_KAFKA_TOPIC_NAMES":"segmentation-message","INPUT_SCHEMA_PROTO_CLASS":"com.company.esb.segmentation.UpdateLogMessage","INPUT_SCHEMA_TABLE":"table1","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE":"false","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET":"latest","SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID":"data-test-external-voucher-dagger-0001","SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS":"","INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX":"3","SOURCE_KAFKA_NAME":"data-dagstream"}]`), + }}), + }}, + }), + }, + { + Name: "data-booking-map-matching-dagger", + Type: "job", + Data: testutils.BuildAny(t, &v1beta2.Job{ + Attributes: &structpb.Struct{Fields: map[string]*structpb.Value{ + "job_id": structpb.NewStringValue("3473947d1115c155513014cc6ecbd2fa"), + "job_parallelism": structpb.NewNumberValue(1), + "config": structpb.NewStructValue(&structpb.Struct{Fields: map[string]*structpb.Value{ + "ENABLE_STENCIL_URL": structpb.NewStringValue("true"), + "FLINK_JOB_ID": structpb.NewStringValue("data-booking-map-matching-dagger"), + "FLINK_PARALLELISM": structpb.NewStringValue("1"), + "FLINK_ROWTIME_ATTRIBUTE_NAME": structpb.NewStringValue("rowtime"), + "FLINK_SQL_QUERY": structpb.NewStringValue("SELECT driver_id, booking_id, country_code, event_timestamp, booking_status, vehicle_type, driver_locations, ping_processing_driver_locations(driver_locations) as filtered_driver_locations, gh_map_matching_response, polylineFROM data_streams_0"), + "FLINK_WATERMARK_DELAY_MS": structpb.NewStringValue("1000"), + "FLINK_WATERMARK_INTERVAL_MS": structpb.NewStringValue("60000"), + "PROCESSOR_LONGBOW_GCP_INSTANCE_ID": structpb.NewStringValue(""), + "PROCESSOR_POSTPROCESSOR_CONFIG": 
structpb.NewStringValue(`{"external_source":{"http":[{"endpoint":"http://11.126.1.18:6798/match","verb":"post","request_pattern":"{"locations":%s,"hints":{"vehicle":"car","instructions":false,"points_encoded":true,"gps_accuracy":50}}","request_variables":"filtered_driver_locations","stream_timeout":"100000","connect_timeout":"10000","fail_on_errors":"false","capacity":"30","headers":{"content-type":"application/json"},"output_mapping":{"gh_map_matching_response":{"path":"$"},"polyline":{"path":"$.map_matching.edge_geometry_polyline"}}}]},"internal_source":[{"output_field":"driver_id","value":"driver_id","type":"sql"},{"output_field":"booking_id","value":"booking_id","type":"sql"},{"output_field":"country_code","value":"country_code","type":"sql"},{"output_field":"event_timestamp","value":"event_timestamp","type":"sql"},{"output_field":"booking_status","value":"booking_status","type":"sql"},{"output_field":"vehicle_type","value":"vehicle_type","type":"sql"},{"output_field":"driver_locations","value":"driver_locations","type":"sql"},{"output_field":"filtered_driver_locations","value":"filtered_driver_locations","type":"sql"}]}`), + "PROCESSOR_POSTPROCESSOR_ENABLE": structpb.NewStringValue("true"), + "PROCESSOR_PREPROCESSOR_CONFIG": structpb.NewStringValue(""), + "PROCESSOR_PREPROCESSOR_ENABLE": structpb.NewStringValue("false"), + "PYTHON_UDF_CONFIG": structpb.NewStringValue(`{"PYTHON_FILES":"gs://data-dagger-magic/python/master/88.41.13/python_udfs.zip","PYTHON_ARCHIVES":"gs://data-dagger-magic/python/master/88.41.13/data.zip","PYTHON_REQUIREMENTS":"gs://data-dagger-magic/python/master/88.41.13/requirements.txt"}`), + "PYTHON_UDF_ENABLE": structpb.NewStringValue("true"), + "SCHEMA_REGISTRY_STENCIL_ENABLE": structpb.NewStringValue("true"), + "SCHEMA_REGISTRY_STENCIL_URLS": structpb.NewStringValue("http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/590"), + "SINK_BIGQUERY_BATCH_SIZE": structpb.NewStringValue(""), + 
"SINK_BIGQUERY_CLIENT_CONNECT_TIMEOUT_MS": structpb.NewStringValue("-1"), + "SINK_BIGQUERY_CLIENT_READ_TIMEOUT_MS": structpb.NewStringValue("-1"), + "SINK_BIGQUERY_DATASET_LABELS": structpb.NewStringValue(""), + "SINK_BIGQUERY_DATASET_NAME": structpb.NewStringValue(""), + "SINK_BIGQUERY_GOOGLE_CLOUD_PROJECT_ID": structpb.NewStringValue(""), + "SINK_BIGQUERY_METADATA_COLUMNS_TYPES": structpb.NewStringValue(""), + "SINK_BIGQUERY_METADATA_NAMESPACE": structpb.NewStringValue(""), + "SINK_BIGQUERY_TABLE_CLUSTERING_ENABLE": structpb.NewStringValue("false"), + "SINK_BIGQUERY_TABLE_CLUSTERING_KEYS": structpb.NewStringValue(""), + "SINK_BIGQUERY_TABLE_LABELS": structpb.NewStringValue(""), + "SINK_BIGQUERY_TABLE_NAME": structpb.NewStringValue(""), + "SINK_BIGQUERY_TABLE_PARTITION_EXPIRY_MS": structpb.NewStringValue("-1"), + "SINK_CONNECTOR_SCHEMA_DATA_TYPE": structpb.NewStringValue("PROTOBUF"), + "SINK_CONNECTOR_SCHEMA_MESSAGE_MODE": structpb.NewStringValue("LOG_MESSAGE"), + "SINK_CONNECTOR_SCHEMA_PROTO_ALLOW_UNKNOWN_FIELDS_ENABLE": structpb.NewStringValue("false"), + "SINK_CONNECTOR_SCHEMA_PROTO_KEY_CLASS": structpb.NewStringValue(""), + "SINK_CONNECTOR_SCHEMA_PROTO_MESSAGE_CLASS": structpb.NewStringValue(""), + "SINK_INFLUX_BATCH_SIZE": structpb.NewStringValue("100"), + "SINK_INFLUX_DB_NAME": structpb.NewStringValue("DAGGERS_COLLECTIVE"), + "SINK_INFLUX_FLUSH_DURATION_MS": structpb.NewStringValue("1000"), + "SINK_INFLUX_MEASUREMENT_NAME": structpb.NewStringValue("map-matching-dagger"), + "SINK_INFLUX_URL": structpb.NewStringValue("http://data-dagger-shared-influx.company.io:6798"), + "SINK_KAFKA_BROKERS": structpb.NewStringValue(""), + "SINK_KAFKA_PROTO_KEY": structpb.NewStringValue("company.esb.cartography.erp.ERPMapMatchingLogV2Key"), + "SINK_KAFKA_PROTO_MESSAGE": structpb.NewStringValue("company.esb.cartography.erp.ERPMapMatchingLogV2Message"), + "SINK_KAFKA_STREAM": structpb.NewStringValue("data-dagstream"), + "SINK_KAFKA_TOPIC": 
structpb.NewStringValue("booking-map-matching-log"), + "SINK_METRICS_APPLICATION_PREFIX": structpb.NewStringValue("dagger_"), + "SINK_TYPE": structpb.NewStringValue("kafka"), + "STREAMS": structpb.NewStringValue(`[{"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX":"4","INPUT_SCHEMA_PROTO_CLASS":"company.esb.cartography.erp.ERPMapMatchingLogV2Message","INPUT_SCHEMA_TABLE":"data_streams_0","SOURCE_DETAILS":[{"SOURCE_NAME":"KAFKA_CONSUMER","SOURCE_TYPE":"UNBOUNDED"}],"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE":"false","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET":"latest","SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS":"company-mainstream.company.io:6668","SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID":"data-booking-map-matching-dagger-0002","SOURCE_KAFKA_NAME":"company-mainstream","SOURCE_KAFKA_TOPIC_NAMES":"aggregated-busy-driver-location-ping","SOURCE_PARQUET_FILE_DATE_RANGE":null,"SOURCE_PARQUET_FILE_PATHS":null}]`), + }}), + }}, + }), + }, + { + Name: "data-eim-driver-nearby-staging-dagger", + Type: "job", + Data: testutils.BuildAny(t, &v1beta2.Job{ + Attributes: &structpb.Struct{Fields: map[string]*structpb.Value{ + "job_id": structpb.NewStringValue("9b12cb10b119b957b085c08e49bde3f2"), + "job_parallelism": structpb.NewNumberValue(1), + "config": structpb.NewStructValue(&structpb.Struct{Fields: map[string]*structpb.Value{ + "ENABLE_STENCIL_URL": structpb.NewStringValue("true"), + "FLINK_JOB_ID": structpb.NewStringValue("data-eim-driver-nearby-staging-dagger"), + "FLINK_PARALLELISM": structpb.NewStringValue("1"), + "FLINK_ROWTIME_ATTRIBUTE_NAME": structpb.NewStringValue("rowtime"), + "FLINK_SQL_QUERY": structpb.NewStringValue("SELECT gofood_booking.event_timestamp as event_timestamp, gofood_booking.order_number AS order_number, gofood_booking.service_type AS gofood_booking_message.service_type, gofood_booking.order_number AS gofood_booking_message.order_number, gofood_booking.order_url AS gofood_booking_message.order_url, gofood_booking.status AS 
gofood_booking_message.status, gofood_booking.event_timestamp AS gofood_booking_message.event_timestamp, gofood_booking.customer_id AS gofood_booking_message.customer_id, gofood_booking.customer_url AS gofood_booking_message.customer_url, gofood_booking.driver_id AS gofood_booking_message.driver_id, gofood_booking.driver_url AS gofood_booking_message.driver_url, gofood_booking.is_reblast AS gofood_booking_message.is_reblast, gofood_booking.activity_source AS gofood_booking_message.activity_source, gofood_booking.service_area_id AS gofood_booking_message.service_area_id, gofood_booking.payment_type AS gofood_booking_message.payment_type, gofood_booking.total_unsubsidised_price AS gofood_booking_message.total_unsubsidised_price, gofood_booking.customer_price AS gofood_booking_message.customer_price, gofood_booking.amount_paid_by_cash AS gofood_booking_message.amount_paid_by_cash, gofood_booking.amount_paid_by_credits AS gofood_booking_message.amount_paid_by_credits, gofood_booking.surcharge_amount AS gofood_booking_message.surcharge_amount, gofood_booking.tip_amount AS gofood_booking_message.tip_amount, gofood_booking.driver_cut_amount AS gofood_booking_message.driver_cut_amount, gofood_booking.requested_payment_type AS gofood_booking_message.requested_payment_type, gofood_booking.total_distance_in_kms AS gofood_booking_message.total_distance_in_kms, gofood_booking.route_polyline AS gofood_booking_message.route_polyline, gofood_booking.routes AS gofood_booking_message.routes, gofood_booking.driver_eta_pickup AS gofood_booking_message.driver_eta_pickup, gofood_booking.driver_eta_dropoff AS gofood_booking_message.driver_eta_dropoff, gofood_booking.driver_pickup_location AS gofood_booking_message.driver_pickup_location, gofood_booking.driver_dropoff_location AS gofood_booking_message.driver_dropoff_location, gofood_booking.customer_email AS gofood_booking_message.customer_email, gofood_booking.customer_name AS gofood_booking_message.customer_name, 
gofood_booking.customer_phone AS gofood_booking_message.customer_phone, gofood_booking.driver_email AS gofood_booking_message.driver_email, gofood_booking.driver_name AS gofood_booking_message.driver_name, gofood_booking.driver_phone AS gofood_booking_message.driver_phone, gofood_booking.driver_phone2 AS gofood_booking_message.driver_phone2, gofood_booking.driver_phone3 AS gofood_booking_message.driver_phone3, gofood_booking.cancel_reason_id AS gofood_booking_message.cancel_reason_id, gofood_booking.cancel_reason_description AS gofood_booking_message.cancel_reason_description, gofood_booking.cancel_source AS gofood_booking_message.cancel_source, gofood_booking.customer_type AS gofood_booking_message.customer_type, gofood_booking.cancel_owner AS gofood_booking_message.cancel_owner, gofood_booking.booking_creation_time AS gofood_booking_message.booking_creation_time, gofood_booking.total_customer_discount AS gofood_booking_message.total_customer_discount, gofood_booking.gopay_customer_discount AS gofood_booking_message.gopay_customer_discount, gofood_booking.voucher_customer_discount AS gofood_booking_message.voucher_customer_discount, gofood_booking.pickup_time AS gofood_booking_message.pickup_time, gofood_booking.driver_paid_in_cash AS gofood_booking_message.driver_paid_in_cash, gofood_booking.driver_paid_in_credit AS gofood_booking_message.driver_paid_in_credit, gofood_booking.receiver_name AS gofood_booking_message.receiver_name, gofood_booking.driver_photo_url AS gofood_booking_message.driver_photo_url, gofood_booking.previous_booking_status AS gofood_booking_message.previous_booking_status, gofood_booking.vehicle_type AS gofood_booking_message.vehicle_type, gofood_booking.customer_total_fare_without_surge AS gofood_booking_message.customer_total_fare_without_surge, gofood_booking.customer_surge_factor AS gofood_booking_message.customer_surge_factor, gofood_booking.customer_dynamic_surge AS gofood_booking_message.customer_dynamic_surge, 
gofood_booking.customer_dynamic_surge_enabled AS gofood_booking_message.customer_dynamic_surge_enabled, gofood_booking.driver_total_fare_without_surge AS gofood_booking_message.driver_total_fare_without_surge, gofood_booking.driver_surge_factor AS gofood_booking_message.driver_surge_factor, gofood_booking.driver_dynamic_surge AS gofood_booking_message.driver_dynamic_surge, gofood_booking.driver_dynamic_surge_enabled AS gofood_booking_message.driver_dynamic_surge_enabled, gofood_booking.driver_ata_pickup AS gofood_booking_message.driver_ata_pickup, gofood_booking.driver_ata_dropoff AS gofood_booking_message.driver_ata_dropoff, gofood_booking.gcm_key AS gofood_booking_message.gcm_key, gofood_booking.device_token AS gofood_booking_message.device_token, gofood_booking.pricing_service_id AS gofood_booking_message.pricing_service_id, gofood_booking.payment_invoice_number AS gofood_booking_message.payment_invoice_number, gofood_booking.pricing_currency AS gofood_booking_message.pricing_currency, gofood_booking.country_code AS gofood_booking_message.country_code, gofood_booking.service_area_tzname AS gofood_booking_message.service_area_tzname, gofood_booking.payment_option_type AS gofood_booking_message.payment_option_type, gofood_booking.payment_option_metadata AS gofood_booking_message.payment_option_metadata, gofood_booking.payment_option_name AS gofood_booking_message.payment_option_name, gofood_booking.booking_info AS gofood_booking_message.booking_info, gofood_booking.price_edit_reason AS gofood_booking_message.price_edit_reason, gofood_booking.driver_arrived_location AS gofood_booking_message.driver_arrived_location, gofood_booking.driver_arrived_time AS gofood_booking_message.driver_arrived_time, gofood_booking.driver_order_placed_location AS gofood_booking_message.driver_order_placed_location, gofood_booking.driver_order_placed_time AS gofood_booking_message.driver_order_placed_time, gofood_booking.merchant_accepted_time AS 
gofood_booking_message.merchant_accepted_time, gofood_booking.merchant_acknowledged_time AS gofood_booking_message.merchant_acknowledged_time, gofood_booking.merchant_received_time AS gofood_booking_message.merchant_received_time, gofood_booking.merchant_cancel_reason AS gofood_booking_message.merchant_cancel_reason, gofood_booking.merchant_cancel_time AS gofood_booking_message.merchant_cancel_time, gofood_booking.merchant_cancel_description AS gofood_booking_message.merchant_cancel_description, gofood_booking.takeaway_charges AS gofood_booking_message.takeaway_charges, gofood_booking.food_prepared_time AS gofood_booking_message.food_prepared_time, gofood_booking.cancel_reason_code AS gofood_booking_message.cancel_reason_code, gofood_booking.verification_requested_time AS gofood_booking_message.verification_requested_time, gofood_booking.customer_pickup_time AS gofood_booking_message.customer_pickup_time, gofood_booking.customer_pickup_location AS gofood_booking_message.customer_pickup_location, gofood_booking.verification_failed_time AS gofood_booking_message.verification_failed_time, gofood_booking.verification_requested_location AS gofood_booking_message.verification_requested_location, gofood_booking.convenience_fee AS gofood_booking_message.convenience_fee, gofood_booking.payment_actions AS gofood_booking_message.payment_actions, gofood_booking.marketplace_serviceability_log_id AS gofood_booking_message.marketplace_serviceability_log_id, gofood_booking.order_completion_time AS gofood_booking_message.order_completion_time, gofood_booking.restaurant_id AS gofood_booking_message.restaurant_id, gofood_booking.sub_status AS gofood_booking_message.sub_status, gofood_booking.shopping_price AS gofood_booking_message.shopping_price, gofood_booking.commission_price AS gofood_booking_message.commission_price, gofood_booking.withholding_income_tax AS gofood_booking_message.withholding_income_tax, gofood_booking.voucher_redeemed_value AS 
gofood_booking_message.voucher_redeemed_value, gofood_booking.voucher_commission AS gofood_booking_message.voucher_commission, gofood_booking.voucher_id AS gofood_booking_message.voucher_id, gofood_booking.voucher_title AS gofood_booking_message.voucher_title, gofood_booking.otp AS gofood_booking_message.otp, gofood_booking.driver_entered_price AS gofood_booking_message.driver_entered_price, gofood_booking.driver_wallet_id AS gofood_booking_message.driver_wallet_id, gofood_booking.saudagar_id AS gofood_booking_message.saudagar_id, gofood_booking.validated_at AS gofood_booking_message.validated_at, gofood_booking.merchant_wallet_id AS gofood_booking_message.merchant_wallet_id, gofood_booking.restaurant_uuid AS gofood_booking_message.restaurant_uuid, gofood_booking.search_id AS gofood_booking_message.search_id, gofood_booking.gopay_driver_reservation_id AS gofood_booking_message.gopay_driver_reservation_id, gofood_booking.voucher_batch_id AS gofood_booking_message.voucher_batch_id, gofood_booking.merchant_config AS gofood_booking_message.merchant_config, gofood_booking.brand_id AS gofood_booking_message.brand_id, gofood_booking.customer_wallet_id AS gofood_booking_message.customer_wallet_id, gofood_booking.customer_payment_details AS gofood_booking_message.customer_payment_details, gofood_booking.merchant_phone AS gofood_booking_message.merchant_phone, gofood_booking.previous_sub_status AS gofood_booking_message.previous_sub_status, gofood_booking.merchant_acceptance_deadline AS gofood_booking_message.merchant_acceptance_deadline, gofood_booking.inapplicable_voucher_id AS gofood_booking_message.inapplicable_voucher_id, gofood_booking.driver_fee_adjustments AS gofood_booking_message.driver_fee_adjustments, gofood_booking.receipt_url AS gofood_booking_message.receipt_url, gofood_booking.is_goresto AS gofood_booking_message.is_goresto, gofood_booking.shopping_items AS gofood_booking_message.shopping_items, gofood_booking.has_promo AS gofood_booking_message.has_promo, 
gofood_booking.analytics AS gofood_booking_message.analytics, gofood_booking.fraud_reason AS gofood_booking_message.fraud_reason, gofood_booking.driver_completion_time AS gofood_booking_message.driver_completion_time, gofood_booking.use_service_wallet_fund_flow AS gofood_booking_message.use_service_wallet_fund_flow, gofood_booking.campaign_discounts AS gofood_booking_message.campaign_discounts, gofood_booking.tracer_bullet AS gofood_booking_message.tracer_bullet, gofood_booking.bid_delay_in_seconds AS gofood_booking_message.bid_delay_in_seconds, gofood_booking.driver_gopay_account_id AS gofood_booking_message.driver_gopay_account_id, gofood_booking.use_gopay_v3 AS gofood_booking_message.use_gopay_v3, gofood_booking.experiments AS gofood_booking_message.experiments, gofood_booking.eta_in_minutes AS gofood_booking_message.eta_in_minutes, gofood_booking.eta_source AS gofood_booking_message.eta_source, gofood_booking.eta_performance_bucket AS gofood_booking_message.eta_performance_bucket, gofood_booking.payment_method AS gofood_booking_message.payment_method, gofood_booking.vehicle_info AS gofood_booking_message.vehicle_info, gofood_booking.actor AS gofood_booking_message.actor, gofood_booking.vehicle_tags AS gofood_booking_message.vehicle_tags, gofood_booking.goclub AS gofood_booking_message.goclub, gofood_nearby.event_timestamp AS gofood_nearby_message.event_timestamp, gofood_nearby.order_number AS gofood_nearby_message.order_number, gofood_nearby.driver_id AS gofood_nearby_message.driver_id, gofood_nearby.customer_id AS gofood_nearby_message.customer_id, gofood_nearby.restaurant_uuid AS gofood_nearby_message.restaurant_uuid, gofood_nearby.saudagar_id AS gofood_nearby_message.saudagar_id, gofood_nearby.type AS gofood_nearby_message.type, gofood_nearby.radius AS gofood_nearby_message.radius, gofood_nearby.actor AS gofood_nearby_message.actorFROM gofood_nearby JOIN gofood_booking ON gofood_nearby.order_number = gofood_booking.order_number AND gofood_nearby.driver_id = 
gofood_booking.driver_id AND gofood_nearby.customer_id = gofood_booking.customer_id AND gofood_nearby.restaurant_uuid = gofood_booking.restaurant_uuid AND gofood_nearby.rowtime BETWEEN (gofood_booking.rowtime - INTERVAL '10' MINUTE) AND (gofood_booking.rowtime + INTERVAL '40' MINUTE) AND gofood_booking.sub_status = 'OTW_PICKUP' AND gofood_nearby.type = 'PICKUP' AND gofood_nearby.actor = 'DRIVER'"), + "FLINK_WATERMARK_DELAY_MS": structpb.NewStringValue("1000"), + "FLINK_WATERMARK_INTERVAL_MS": structpb.NewStringValue("30000"), + "PROCESSOR_LONGBOW_GCP_INSTANCE_ID": structpb.NewStringValue(""), + "PROCESSOR_POSTPROCESSOR_CONFIG": structpb.NewStringValue("{}"), + "PROCESSOR_POSTPROCESSOR_ENABLE": structpb.NewStringValue("false"), + "PROCESSOR_PREPROCESSOR_CONFIG": structpb.NewStringValue(""), + "PROCESSOR_PREPROCESSOR_ENABLE": structpb.NewStringValue(""), + "PYTHON_UDF_CONFIG": structpb.NewStringValue(`{"PYTHON_FILES": "gs://data-dagger/python/master/latest/python_udfs.zip","PYTHON_REQUIREMENTS": "gs://data-dagger/python/master/latest/requirements.txt","PYTHON_ARCHIVES": "gs://data-dagger/python/master/latest/data.zip#data","PYTHON_FN_EXECUTION_ARROW_BATCH_SIZE": "10000","PYTHON_FN_EXECUTION_BUNDLE_SIZE": "100000","PYTHON_FN_EXECUTION_BUNDLE_TIME": "1000"}`), + "PYTHON_UDF_ENABLE": structpb.NewStringValue("false"), + "SCHEMA_REGISTRY_STENCIL_ENABLE": structpb.NewStringValue("true"), + "SCHEMA_REGISTRY_STENCIL_URLS": structpb.NewStringValue("http://data-systems-stencil.company.io/v1beta1/namespaces/company/schemas/esb/versions/72"), + "SINK_INFLUX_BATCH_SIZE": structpb.NewStringValue("100"), + "SINK_INFLUX_DB_NAME": structpb.NewStringValue("DAGGERS_COLLECTIVE"), + "SINK_INFLUX_FLUSH_DURATION_MS": structpb.NewStringValue("1000"), + "SINK_INFLUX_MEASUREMENT_NAME": structpb.NewStringValue("data-eim-driver-nearby-staging-dagger"), + "SINK_INFLUX_URL": structpb.NewStringValue("http://data-dagger-shared-influx.company.io:6798"), + "SINK_KAFKA_BROKERS": 
structpb.NewStringValue(""), + "SINK_KAFKA_PROTO_KEY": structpb.NewStringValue("company.esb.gomerchants.eim.EimDriverNearbyEventKey"), + "SINK_KAFKA_PROTO_MESSAGE": structpb.NewStringValue("company.esb.gomerchants.eim.EimDriverNearbyEventMessage"), + "SINK_KAFKA_STREAM": structpb.NewStringValue("data-dagstream"), + "SINK_KAFKA_TOPIC": structpb.NewStringValue("eim-driver-nearby-log"), + "SINK_TYPE": structpb.NewStringValue("kafka"), + "STREAMS": structpb.NewStringValue(`[{"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX":"5","INPUT_SCHEMA_PROTO_CLASS":"company.esb.booking.GoFoodBookingLogMessage","INPUT_SCHEMA_TABLE":"gofood_booking","SOURCE_DETAILS":[{"SOURCE_NAME":"KAFKA_CONSUMER","SOURCE_TYPE":"UNBOUNDED"}],"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE":"false","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET":"latest","SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS":"","SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID":"data-eim-driver-nearby-staging-dagger-0039","SOURCE_KAFKA_NAME":"company-mainstream","SOURCE_KAFKA_TOPIC_NAMES":"gofood-booking-log"},{"INPUT_SCHEMA_EVENT_TIMESTAMP_FIELD_INDEX":"1","INPUT_SCHEMA_PROTO_CLASS":"company.esb.gofood.NearbyEventMessage","INPUT_SCHEMA_TABLE":"gofood_nearby","SOURCE_DETAILS":[{"SOURCE_NAME":"KAFKA_CONSUMER","SOURCE_TYPE":"UNBOUNDED"}],"SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_COMMIT_ENABLE":"false","SOURCE_KAFKA_CONSUMER_CONFIG_AUTO_OFFSET_RESET":"latest","SOURCE_KAFKA_CONSUMER_CONFIG_BOOTSTRAP_SERVERS":"","SOURCE_KAFKA_CONSUMER_CONFIG_GROUP_ID":"data-eim-driver-nearby-staging-dagger-0040","SOURCE_KAFKA_NAME":"company-mainstream","SOURCE_KAFKA_TOPIC_NAMES":"gofood-nearby-log"}]`), + }}), + }}, + }), + }, + }, + }, + { + name: "InvalidRequestConfigFromScript", + rawCfg: map[string]interface{}{ + "request": map[string]interface{}{ + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json", + }, + "script": map[string]interface{}{ + "engine": "tengo", + "source": heredoc.Doc(` + responses := 
execute_request( + { + "url": "{{serverURL}}/api/v1/endpoint", + "content_type": "application/json", + "accept": "application/json" + }, + { + "content_type": "application/json", + "accept": "application/json" + } + ) + `), + }, + }, + handler: func(t *testing.T, w http.ResponseWriter, r *http.Request) { + testutils.Respond(t, w, http.StatusOK, `{}`) + }, + expectedErr: `Runtime Error: execute request: validate request config: {content_type: "application/json", accept: "application/json"}, Key: 'RequestConfig.url' Error:Field validation for 'url' failed on the 'required' tag`, + }, { name: "ConditionalExit", rawCfg: map[string]interface{}{ @@ -525,7 +762,7 @@ func TestExtract(t *testing.T) { "script": map[string]interface{}{ "engine": "tengo", "source": heredoc.Doc(` - if !response.success { + if !response.body.success { exit() } a := new_asset("invalid") @@ -651,4 +888,6 @@ func TestExtract(t *testing.T) { func replaceServerURL(cfg map[string]interface{}, serverURL string) { reqCfg := cfg["request"].(map[string]interface{}) reqCfg["url"] = strings.Replace(reqCfg["url"].(string), "{{serverURL}}", serverURL, 1) + scriptCfg := cfg["script"].(map[string]interface{}) + scriptCfg["source"] = strings.Replace(scriptCfg["source"].(string), "{{serverURL}}", serverURL, -1) } diff --git a/plugins/internal/tengoutil/secure_script.go b/plugins/internal/tengoutil/secure_script.go index 610969880..388fd2cb8 100644 --- a/plugins/internal/tengoutil/secure_script.go +++ b/plugins/internal/tengoutil/secure_script.go @@ -1,6 +1,8 @@ package tengoutil import ( + "fmt" + "github.com/d5/tengo/v2" "github.com/d5/tengo/v2/stdlib" ) @@ -10,15 +12,21 @@ const ( maxConsts = 500 ) -func NewSecureScript(input []byte) *tengo.Script { +func NewSecureScript(input []byte, globals map[string]interface{}) (*tengo.Script, error) { s := tengo.NewScript(input) s.SetImports(stdlib.GetModuleMap( - // `os` is excluded, should not be importable from script. 
+ // `os` is excluded, should *not* be importable from script. "math", "text", "times", "rand", "fmt", "json", "base64", "hex", "enum", )) s.SetMaxAllocs(maxAllocs) s.SetMaxConstObjects(maxConsts) - return s + for name, v := range globals { + if err := s.Add(name, v); err != nil { + return nil, fmt.Errorf("new secure script: declare globals: %w", err) + } + } + + return s, nil } diff --git a/plugins/internal/tengoutil/secure_script_test.go b/plugins/internal/tengoutil/secure_script_test.go index 308eb0d8e..cb2445111 100644 --- a/plugins/internal/tengoutil/secure_script_test.go +++ b/plugins/internal/tengoutil/secure_script_test.go @@ -12,7 +12,7 @@ import ( func TestNewSecureScript(t *testing.T) { t.Run("Allows import of builtin modules except os", func(t *testing.T) { - s := NewSecureScript(([]byte)(heredoc.Doc(` + s, err := NewSecureScript(([]byte)(heredoc.Doc(` math := import("math") text := import("text") times := import("times") @@ -22,20 +22,37 @@ func TestNewSecureScript(t *testing.T) { base64 := import("base64") hex := import("hex") enum := import("enum") - `))) - _, err := s.Compile() + `)), nil) + assert.NoError(t, err) + _, err = s.Compile() assert.NoError(t, err) }) t.Run("os import disallowed", func(t *testing.T) { - s := NewSecureScript(([]byte)(`os := import("os")`)) - _, err := s.Compile() + s, err := NewSecureScript(([]byte)(`os := import("os")`), nil) + assert.NoError(t, err) + _, err = s.Compile() assert.ErrorContains(t, err, "Compile Error: module 'os' not found") }) t.Run("File import disallowed", func(t *testing.T) { - s := NewSecureScript(([]byte)(`sum := import("./testdata/sum")`)) - _, err := s.Compile() + s, err := NewSecureScript(([]byte)(`sum := import("./testdata/sum")`), nil) + assert.NoError(t, err) + _, err = s.Compile() assert.ErrorContains(t, err, "Compile Error: module './testdata/sum' not found") }) + + t.Run("Script globals", func(t *testing.T) { + s, err := NewSecureScript(([]byte)(`obj.prop = 1`), nil) + assert.NoError(t, err) 
+ _, err = s.Compile() + assert.ErrorContains(t, err, "Compile Error: unresolved reference 'obj'") + + s, err = NewSecureScript(([]byte)(`obj.prop = 1`), map[string]interface{}{ + "obj": map[string]interface{}{}, + }) + assert.NoError(t, err) + _, err = s.Compile() + assert.NoError(t, err) + }) } diff --git a/plugins/internal/tengoutil/structmap/structmap.go b/plugins/internal/tengoutil/structmap/structmap.go index 60a813bd8..be0c2b0ab 100644 --- a/plugins/internal/tengoutil/structmap/structmap.go +++ b/plugins/internal/tengoutil/structmap/structmap.go @@ -42,12 +42,17 @@ func AsMap(v interface{}) (interface{}, error) { } func AsStruct(input, output interface{}) error { + return AsStructWithTag("json", input, output) +} + +func AsStructWithTag(tagName string, input, output interface{}) error { dec, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{ DecodeHook: mapstructure.ComposeDecodeHookFunc( checkAssetDataHookFunc(), stringToTimestampHookFunc(time.RFC3339), timeToTimestampHookFunc(), mapstructure.StringToTimeHookFunc(time.RFC3339), + mapstructure.StringToTimeDurationHookFunc(), mapToAttributesHookFunc(), mapToAnyPBHookFunc(), ), @@ -55,7 +60,7 @@ func AsStruct(input, output interface{}) error { ErrorUnused: true, ZeroFields: true, Result: output, - TagName: "json", + TagName: tagName, }) if err != nil { return fmt.Errorf("structmap: decode into %T: create decoder: %w", output, err) diff --git a/plugins/internal/tengoutil/structmap/structmap_test.go b/plugins/internal/tengoutil/structmap/structmap_test.go index 81df0d082..8df71c59e 100644 --- a/plugins/internal/tengoutil/structmap/structmap_test.go +++ b/plugins/internal/tengoutil/structmap/structmap_test.go @@ -181,6 +181,17 @@ func TestAsMap(t *testing.T) { } } +func TestAsStructWithTag(t *testing.T) { + type V struct { + Duration time.Duration `myspltag:"duration"` + } + input := map[string]interface{}{"duration": "5s"} + var v V + err := AsStructWithTag("myspltag", input, &v) + assert.NoError(t, 
err) + assert.Equal(t, V{Duration: time.Second * 5}, v) +} + func TestAsStruct(t *testing.T) { cases := []struct { name string @@ -383,7 +394,7 @@ func TestAsStruct(t *testing.T) { "type": "table", "data": map[string]interface{}{}, }, - expected: &v1beta2.Asset{}, + expected: &v1beta2.Asset{}, expectedErr: "invalid keys: does-not-exist", }, { diff --git a/plugins/processors/script/tengo_script.go b/plugins/processors/script/tengo_script.go index 0e3563f91..bd70d67a0 100644 --- a/plugins/processors/script/tengo_script.go +++ b/plugins/processors/script/tengo_script.go @@ -70,8 +70,10 @@ func (p *Processor) Init(ctx context.Context, config plugins.Config) error { return fmt.Errorf("script processor init: %w", err) } - s := tengoutil.NewSecureScript(([]byte)(p.config.Script)) - if err := p.declareGlobals(s); err != nil { + s, err := tengoutil.NewSecureScript(([]byte)(p.config.Script), map[string]interface{}{ + "asset": map[string]interface{}{}, + }) + if err != nil { return fmt.Errorf("script processor init: %w", err) } @@ -108,14 +110,3 @@ func (p *Processor) Process(ctx context.Context, src models.Record) (models.Reco return models.NewRecord(transformed), nil } - -func (p *Processor) declareGlobals(s *tengo.Script) error { - for name, v := range map[string]interface{}{ - "asset": map[string]interface{}{}, - } { - if err := s.Add(name, v); err != nil { - return fmt.Errorf("declare script globals: %w", err) - } - } - return nil -}