Skip to content

Commit

Permalink
feat!: extend HTTP extractor (#461)
Browse files Browse the repository at this point in the history
- Add script global function `execute_request` for being able to execute
  1 or more HTTP requests using the request configuration options
  similar to recipe config.
- Instead of only providing the response body to the script, provide the
  {status_code, header, body}.
- Allow configuration of concurrency to control the number of concurrent
  HTTP requests made from inside the script.

BREAKING CHANGE: The response provided to the script in HTTP extractor
has the {status_code, header, body} instead of just the response body.
  • Loading branch information
sudo-suhas authored Jan 30, 2023
1 parent 72abc18 commit 2309f9f
Show file tree
Hide file tree
Showing 10 changed files with 619 additions and 116 deletions.
127 changes: 104 additions & 23 deletions plugins/extractors/http/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ The user specified script has access to the response, if the API call was
successful, and can use it for constructing and emitting assets using a custom
script. Currently, [Tengo][tengo] is the only supported script engine.

Refer Tengo documentation for script language syntax and supported functionality
\- https://github.com/d5/tengo/tree/v2.13.0#references
. [Tengo standard library modules][tengo-stdlib] can also be imported and used
if required.
Refer Tengo documentation for script language syntax and supported
functionality - https://github.com/d5/tengo/tree/v2.13.0#references.
[Tengo standard library modules][tengo-stdlib] can also be imported and used if
required (except the `os` module).

## Usage

Expand All @@ -46,6 +46,7 @@ source:
key: value
timeout: 5s
success_codes: [ 200 ]
concurrency: 3
script:
engine: tengo
source: |
Expand All @@ -56,19 +57,26 @@ source:
## Inputs
| Key | Value | Example | Description | Required? |
|:-----------------------|:--------------------|:---------------------------------------|:------------------------------------------------------------------------------------------------|:----------|
| `request.url` | `string` | `http://example.com/api/v1/endpoint` | The HTTP endpoint to send request to | ✅ |
| `request.query_params` | `[]{key, value}` | `[{"key":"s","value":"One Piece"}]` | The query parameters to be added to the request URL. | ✘ |
| `request.method` | `string` | `GET`/`POST` | The HTTP verb/method to use with request. Default is `GET`. | ✘ |
| `request.headers` | `map[string]string` | `{"Api-Token": "..."}` | Headers to send in the HTTP request. | ✘ |
| `request.content_type` | `string` | `application/json` | Content type for encoding request body. Also sent as a header. | ✅ |
| `request.accept` | `string` | `application/json` | Sent as the `Accept` header. Also indicates the format to use for decoding. | ✅ |
| `request.body` | `Object` | `{"key": "value"}` | The request body to be sent. | ✘ |
| `request.timeout` | `string` | `1s` | Timeout for the HTTP request. Default is 5s. | ✘ |
| `success_codes` | `[]int` | `[200]` | The list of status codes that would be considered as a successful response. Default is `[200]`. | ✘ |
| `script.engine` | `string` | `tengo` | Script engine. Only `"tengo"` is supported currently | ✅ |
| `script.source` | `string` | see [Worked Example](#worked-example). | [Tengo][tengo] script used to map the response into 0 or more assets. | ✅ |
| Key | Value | Example | Description | Required? |
|:----------------|:---------|:---------------------------------------|:------------------------------------------------------------------------------------------------|:----------|
| `request` | `Object` | see [Request](#request) | The configuration for constructing and sending HTTP request. | ✅ |
| `success_codes` | `[]int` | `[200]` | The list of status codes that would be considered as a successful response. Default is `[200]`. | ✘ |
| `concurrency` | `int` | `5` | Number of concurrent child requests to execute. Default is `5` | ✘ |
| `script.engine` | `string` | `tengo` | Script engine. Only `"tengo"` is supported currently | ✅ |
| `script.source` | `string` | see [Worked Example](#worked-example). | [Tengo][tengo] script used to map the response into 0 or more assets. | ✅ |

### Request

| Key | Value | Example | Description | Required? |
|:---------------|:--------------------|:-------------------------------------|:----------------------------------------------------------------------------|:----------|
| `url` | `string` | `http://example.com/api/v1/endpoint` | The HTTP endpoint to send request to | ✅ |
| `query_params` | `[]{key, value}` | `[{"key":"s","value":"One Piece"}]` | The query parameters to be added to the request URL. | ✘ |
| `method` | `string` | `GET`/`POST` | The HTTP verb/method to use with request. Default is `GET`. | ✘ |
| `headers` | `map[string]string` | `{"Api-Token": "..."}` | Headers to send in the HTTP request. | ✘ |
| `content_type` | `string` | `application/json` | Content type for encoding request body. Also sent as a header. | ✅ |
| `accept` | `string` | `application/json` | Sent as the `Accept` header. Also indicates the format to use for decoding. | ✅ |
| `body` | `Object` | `{"key": "value"}` | The request body to be sent. | ✘ |
| `timeout` | `string` | `1s` | Timeout for the HTTP request. Default is 5s. | ✘ |

### Notes

Expand All @@ -86,7 +94,24 @@ source:

#### `response`

HTTP response received.
HTTP response received with the `status_code`, `header` and `body`. Ex:

```json
{
"status_code": "200",
"header": {
"link": "</products?page=5&perPage=20>;rel=self,</products?page=0&perPage=20>;rel=first,</products?page=4&perPage=20>;rel=previous,</products?page=6&perPage=20>;rel=next,</products?page=26&perPage=20>;rel=last"
},
"body": [
{"id": 1, "name": "Widget #1"},
{"id": 2, "name": "Widget #2"},
{"id": 3, "name": "Widget #3"}
]
}
```

The header names are always in lower case. See
[Worked Example](#worked-example) for detailed usage.

#### `new_asset(string): Asset`

Expand Down Expand Up @@ -125,6 +150,60 @@ asset.data.full_name = "Daiyamondo Jozu"
Takes an asset and emits the asset that can then be consumed by the
processor/sink.

#### `execute_request(...requests)`

Takes 1 or more requests and executes the requests with the concurrency defined
in the recipe. The results are returned as an array. Each item in the array can
be an error or the HTTP response. The request object supports the properties
defined in the [Request](#request) input section.

When a request is executed, it can fail due to temporary errors such as network
errors. These instances need to be handled in the script.

[//]: # (@formatter:off)

```go
if !response.body.success {
exit()
}
reqs := []
for j in response.body.jobs {
reqs = append(reqs, {
url: format("http://my.server.com/jobs/%s/config", j.id),
method: "GET",
content_type: "application/json",
accept: "application/json",
timeout: "5s"
})
}
responses := execute_request(reqs...)
for r in responses {
if is_error(r) {
// TODO: Handle it appropriately. The error value has the request and
// error string:
// r.value.{request, error}
continue
}
asset := new_asset("job")
asset.name = r.body.name
exec_cfg := r.body["execution-config"]
asset.data.attributes = {
"job_id": r.body.jid,
"job_parallelism": exec_cfg["job-parallelism"],
"config": exec_cfg["user-config"]
}
emit(asset)
}
```

[//]: # (@formatter:on)

If the request passed to the function fails validation, a runtime error is
thrown.

#### `exit()`

Terminates the script execution.
Expand Down Expand Up @@ -200,11 +279,11 @@ source:
script:
engine: tengo
source: |
if !response.success {
if !response.body.success {
exit()
}
users := response.data
users := response.body.data
for u in users {
if u.email == "" {
continue
Expand All @@ -229,19 +308,21 @@ source:
manager_id: u.manager_id,
cost_center_id: u.cost_center_id,
supervisory_org_name: u.supervisory_org_name,
location_id: u.location_id
location_id: u.location_id,
service_job_id: response.header["x-job-id"]
}
emit(asset)
}
```

This would emit a 'User' asset for each user object in `response.data`.
This would emit a 'User' asset for each user object in `response.data`. Note
that the response headers can be accessed under `response.header` and can be
used as needed.

## Caveats

The following features are currently not supported:

- Pagination.
- Explicit authentication support, ex: Basic auth/OAuth/OAuth2/JWT etc.
- Retries with configurable backoff.
- Content type for request/response body other than `application/json`.
Expand Down
54 changes: 32 additions & 22 deletions plugins/extractors/http/execute_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,32 @@ import (
"fmt"
"io"
"net/http"
"strings"
)

func (e *Extractor) executeRequest(ctx context.Context) (interface{}, error) {
cfg := e.config
type executeRequestFunc func(ctx context.Context, reqCfg RequestConfig) (map[string]interface{}, error)

ctx, cancel := context.WithTimeout(ctx, cfg.Request.Timeout)
defer cancel()
func makeRequestExecutor(successCodes []int, httpClient *http.Client) executeRequestFunc {
return func(ctx context.Context, reqCfg RequestConfig) (map[string]interface{}, error) {
ctx, cancel := context.WithTimeout(ctx, reqCfg.Timeout)
defer cancel()

req, err := buildRequest(ctx, cfg)
if err != nil {
return nil, err
}
req, err := buildRequest(ctx, reqCfg)
if err != nil {
return nil, err
}

resp, err := e.http.Do(req)
defer drainBody(resp)
if err != nil {
return nil, fmt.Errorf("do request: %w", err)
}
resp, err := httpClient.Do(req)
defer drainBody(resp)
if err != nil {
return nil, fmt.Errorf("do request: %w", err)
}

return handleResponse(cfg, resp)
return handleResponse(successCodes, resp)
}
}

func buildRequest(ctx context.Context, cfg Config) (*http.Request, error) {
reqCfg := cfg.Request

func buildRequest(ctx context.Context, reqCfg RequestConfig) (*http.Request, error) {
body, err := asReader(reqCfg.Body)
if err != nil {
return nil, fmt.Errorf("encode request body: %w", err)
Expand Down Expand Up @@ -72,17 +73,26 @@ func addQueryParams(req *http.Request, params []QueryParam) {
req.URL.RawQuery = q.Encode()
}

func handleResponse(cfg Config, resp *http.Response) (interface{}, error) {
if !has(cfg.SuccessCodes, resp.StatusCode) {
func handleResponse(successCodes []int, resp *http.Response) (map[string]interface{}, error) {
if !has(successCodes, resp.StatusCode) {
return nil, fmt.Errorf("unsuccessful request: response status code: %d", resp.StatusCode)
}

var res interface{}
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
h := make(map[string]interface{}, len(resp.Header))
for k := range resp.Header {
h[strings.ToLower(k)] = resp.Header.Get(k)
}

var body interface{}
if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
return nil, fmt.Errorf("decode response: %w", err)
}

return res, nil
return map[string]interface{}{
"status_code": resp.StatusCode,
"header": h,
"body": body,
}, nil
}

func asReader(v interface{}) (io.Reader, error) {
Expand Down
Loading

0 comments on commit 2309f9f

Please sign in to comment.