CSV parser dynamic fields (#404)
* implement dynamic field names with header_label field

* replace inputRecord with inputEntry when testing parser functionality

* add test cases for csv dynamic field names

* test Build() errors

* test Process() errors

* csv parser dynamic field names

* document dynamic field names

* variable 'dynamic' is only used once, so just check if headerLabel is set instead

* remove numFields

* test multiple static and dynamic records with a single operator config

* split headers once when static, on every record when dynamic

* remove otlp output due to breaking change from upstream's git history

* downgrade testcontainers because it is not working with go mod

* use go proxy and remove go module arg as it does not matter for go 1.16

* revert otlp removal
Joseph Sirianni authored Aug 24, 2021
1 parent 4127075 commit a88e690
Showing 7 changed files with 601 additions and 126 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -215,7 +215,7 @@ jobs:
command: mkdir {bin,out,tmp}
- run:
name: Build Stanza Agent
command: (cd ./cmd/stanza && GO111MODULE=on GOPROXY=direct go build -v -o ../../bin/stanza ./)
command: (cd ./cmd/stanza && go build -v -o ../../bin/stanza ./)
- run:
name: Build Log Bench
command: GOPROXY=direct go get github.com/observiq/amazon-log-agent-benchmark-tool/cmd/logbench/ &&
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -9,6 +9,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Changed

- File Input: Added optional LabelRegex parameter, for parsing log file headers as labels [PR 376](https://github.com/observIQ/stanza/pull/376)
- CSV Parser: Dynamic field names [PR 404](https://github.com/observIQ/stanza/pull/404)
- ARM64 Container Image: [PR 381](https://github.com/observIQ/stanza/pull/381)
- TCP Input: Minimum TLS version is now configurable: [PR 400](https://github.com/observIQ/stanza/pull/400)
- Systemd service: Set `TimeoutSec` [PR 402](https://github.com/observIQ/stanza/pull/402)
92 changes: 79 additions & 13 deletions docs/operators/csv_parser.md
@@ -4,19 +4,20 @@ The `csv_parser` operator parses the string-type field selected by `parse_from`

### Configuration Fields

| Field | Default | Description |
| --- | --- | --- |
| `id` | `csv_parser` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `header` | required | A string of delimited field names. The values in the delimited header will be used as keys |
| `header_delimiter` | value of delimiter | A character that will be used as a delimiter for the header. Values `\r` and `\n` cannot be used as a delimiter |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter |
| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed |
| `parse_to` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed |
| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md) |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator |
| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator |
| Field | Default | Description |
| --- | --- | --- |
| `id` | `csv_parser` | A unique identifier for the operator |
| `output` | Next in pipeline | The connected operator(s) that will receive all outbound entries |
| `header` | required when `header_label` not set | A string of delimited field names |
| `header_label` | required when `header` not set | A label name to read the header field from, to support dynamic field names |
| `header_delimiter` | value of delimiter | A character that will be used as a delimiter for the header. Values `\r` and `\n` cannot be used as a delimiter |
| `delimiter` | `,` | A character that will be used as a delimiter. Values `\r` and `\n` cannot be used as a delimiter |
| `parse_from` | $ | A [field](/docs/types/field.md) that indicates the field to be parsed |
| `parse_to` | $ | A [field](/docs/types/field.md) that indicates the field to parse the value into |
| `preserve_to` | | Preserves the unparsed value at the specified [field](/docs/types/field.md) |
| `on_error` | `send` | The behavior of the operator if it encounters an error. See [on_error](/docs/types/on_error.md) |
| `timestamp` | `nil` | An optional [timestamp](/docs/types/timestamp.md) block which will parse a timestamp field before passing the entry to the output operator |
| `severity` | `nil` | An optional [severity](/docs/types/severity.md) block which will parse a severity field before passing the entry to the output operator |

### Example Configurations

@@ -190,6 +191,71 @@ Configuration:
}
```

</td>
</tr>
</table>

#### Parse the field `message` using dynamic field names

Dynamic field names can be used by leveraging `file_input`'s `label_regex` parameter, which captures the header line as a label on each entry.

Configuration:

```yaml
- type: file_input
include:
- ./dynamic.log
start_at: beginning
label_regex: '^#(?P<key>.*?): (?P<value>.*)'

- type: csv_parser
delimiter: ","
header_label: Fields
```
Input File:
```
#Fields: "id,severity,message"
1,debug,Hello
```

<table>
<tr><td> Input record </td> <td> Output record </td></tr>
<tr>
<td>

Entry (from file_input):

```json
{
"timestamp": "",
"labels": {
"fields": "id,severity,message"
},
"record": {
"message": "1,debug,Hello"
}
}
```

</td>
<td>

```json
{
"timestamp": "",
"labels": {
"fields": "id,severity,message"
},
"record": {
"id": "1",
"severity": "debug",
"message": "Hello"
}
}
```

</td>
</tr>
</table>
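Under the hood, the dynamic case amounts to splitting the header string from the label into keys and mapping each CSV record onto them positionally. A minimal standalone sketch of that behavior using Go's standard `encoding/csv` package (`parseWithHeader` is a hypothetical helper for illustration, not the stanza implementation):

```go
package main

import (
	"encoding/csv"
	"fmt"
	"strings"
)

// parseWithHeader splits a comma-delimited header into keys, then parses
// a single CSV line and maps each value onto the key at the same index.
func parseWithHeader(header, line string) (map[string]string, error) {
	keys := strings.Split(header, ",")
	r := csv.NewReader(strings.NewReader(line))
	r.FieldsPerRecord = len(keys) // error out on field-count mismatch
	record, err := r.Read()
	if err != nil {
		return nil, err
	}
	out := make(map[string]string, len(keys))
	for i, k := range keys {
		out[k] = record[i]
	}
	return out, nil
}

func main() {
	// The header string would come from the entry's "fields" label above.
	parsed, err := parseWithHeader("id,severity,message", "1,debug,Hello")
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed["id"], parsed["severity"], parsed["message"]) // prints: 1 debug Hello
}
```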
2 changes: 1 addition & 1 deletion go.mod
@@ -22,7 +22,7 @@ require (
github.com/observiq/nanojack v0.0.0-20201106172433-343928847ebc
github.com/spf13/cobra v1.1.3
github.com/stretchr/testify v1.7.0
github.com/testcontainers/testcontainers-go v0.11.1
github.com/testcontainers/testcontainers-go v0.11.0
go.etcd.io/bbolt v1.3.6
go.opentelemetry.io/collector v0.13.0
go.uber.org/multierr v1.7.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
@@ -383,8 +383,8 @@ github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible
github.com/docker/distribution v2.7.1+incompatible h1:a5mlkVzth6W5A4fOsS3D2EO5BUmsJpcB+cRlLU7cSug=
github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v17.12.0-ce-rc1.0.20200706150819-a40b877fbb9e+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v20.10.7+incompatible h1:Z6O9Nhsjv+ayUEeI1IojKbYcsGdgYSNqxe1s2MYzUhQ=
github.com/docker/docker v20.10.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/docker v20.10.6+incompatible h1:oXI3Vas8TI8Eu/EjH4srKHJBVqraSzJybhxY7Om9faQ=
github.com/docker/docker v20.10.6+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-events v0.0.0-20170721190031-9461782956ad/go.mod h1:Uw6UezgYA44ePAFQYUehOuCzmy5zmg/+nl2ZfMWGkpA=
@@ -1368,8 +1368,8 @@ github.com/tcnksm/go-latest v0.0.0-20170313132115-e3007ae9052e h1:IWllFTiDjjLIf2
github.com/tcnksm/go-latest v0.0.0-20170313132115-e3007ae9052e/go.mod h1:d7u6HkTYKSv5m6MCKkOQlHwaShTMl3HjqSGW3XtVhXM=
github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2 h1:Xr9gkxfOP0KQWXKNqmwe8vEeSUiUj4Rlee9CMVX2ZUQ=
github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM=
github.com/testcontainers/testcontainers-go v0.11.1 h1:FiYsB83LSGbiawoV8TpAZGfcCUbtaeeg1SXqEKUxh08=
github.com/testcontainers/testcontainers-go v0.11.1/go.mod h1:/V0UVq+1e7NWYoqTPog179clf0Qp9TOyp4EcXaEFQz8=
github.com/testcontainers/testcontainers-go v0.11.0 h1:HO5YOx2DYBHqcg4MzVWPj3FuHAv7USWVu94vCSsgiaM=
github.com/testcontainers/testcontainers-go v0.11.0/go.mod h1:HztBCODzuA+YpMXGK8amjO8j50jz2gcT0BOzSKUiYIs=
github.com/tetafro/godot v0.4.8 h1:h61+hQraWhdI6WYqMwAwZYCE5yxL6a9/Orw4REbabSU=
github.com/tetafro/godot v0.4.8/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
107 changes: 71 additions & 36 deletions operator/builtin/parser/csv/csv.go
@@ -28,6 +28,7 @@ type CSVParserConfig struct {
helper.ParserConfig `yaml:",inline"`

Header string `json:"header" yaml:"header"`
HeaderLabel string `json:"header_label" yaml:"header_label"`
HeaderDelimiter string `json:"header_delimiter,omitempty" yaml:"header_delimiter,omitempty"`
FieldDelimiter string `json:"delimiter,omitempty" yaml:"delimiter,omitempty"`
}
@@ -39,8 +40,18 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat
return nil, err
}

if c.Header == "" {
return nil, fmt.Errorf("Missing required field 'header'")
if c.Header == "" && c.HeaderLabel == "" {
return nil, fmt.Errorf("missing required field 'header' or 'header_label'")
}

if c.Header != "" && c.HeaderLabel != "" {
return nil, fmt.Errorf("only one header parameter can be set: 'header' or 'header_label'")
}

// configure dynamic header
dynamic := false
if c.HeaderLabel != "" {
dynamic = true
}

if c.FieldDelimiter == "" {
@@ -59,18 +70,19 @@

headerDelimiter := []rune(c.HeaderDelimiter)[0]

if !strings.Contains(c.Header, c.HeaderDelimiter) {
if !dynamic && !strings.Contains(c.Header, c.HeaderDelimiter) {
return nil, fmt.Errorf("missing header delimiter in header")
}

numFields := len(strings.Split(c.Header, c.HeaderDelimiter))

csvParser := &CSVParser{
ParserOperator: parserOperator,
header: c.Header,
headerLabel: c.HeaderLabel,
headerDelimiter: headerDelimiter,
fieldDelimiter: fieldDelimiter,
numFields: numFields,

// initial parse function, overwritten when dynamic headers are enabled
parse: generateParseFunc(c.Header, headerDelimiter, fieldDelimiter, false),
}

return []operator.Operator{csvParser}, nil
@@ -80,47 +92,70 @@ func (c CSVParserConfig) Build(context operator.BuildContext) ([]operator.Operat
type CSVParser struct {
helper.ParserOperator
header string
headerLabel string
headerDelimiter rune
fieldDelimiter rune
numFields int
parse ParseFunc
}

// Process will parse an entry for csv.
func (r *CSVParser) Process(ctx context.Context, entry *entry.Entry) error {
return r.ParserOperator.ProcessWith(ctx, entry, r.parse)
}

// parse will parse a value using the supplied csv header.
func (r *CSVParser) parse(value interface{}) (interface{}, error) {
var csvLine string
switch t := value.(type) {
case string:
csvLine += t
case []byte:
csvLine += string(t)
default:
return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
func (r *CSVParser) Process(ctx context.Context, e *entry.Entry) error {
if r.headerLabel != "" {
h, ok := e.Labels[r.headerLabel]
if !ok {
// TODO: returned error is not logged, so log it here
err := fmt.Errorf("failed to read dynamic header label %s", r.headerLabel)
r.Error(err)
return err
}
r.parse = generateParseFunc(h, r.headerDelimiter, r.fieldDelimiter, true)
}
return r.ParserOperator.ProcessWith(ctx, e, r.parse)
}

reader := csvparser.NewReader(strings.NewReader(csvLine))
reader.Comma = r.fieldDelimiter
reader.FieldsPerRecord = r.numFields
parsedValues := make(map[string]interface{})

for {
record, err := reader.Read()
if err == io.EOF {
break
type ParseFunc func(interface{}) (interface{}, error)

// generateParseFunc returns a parse function for a given header, allowing
// each entry to have a potentially unique set of fields when using dynamic
// field names retrieved from an entry's label
func generateParseFunc(header string, headerDelimiter, fieldDelimiter rune, isDynamic bool) ParseFunc {
headerFields := strings.Split(header, string([]rune{headerDelimiter}))

return func(value interface{}) (interface{}, error) {
var csvLine string
switch t := value.(type) {
case string:
csvLine += t
case []byte:
csvLine += string(t)
default:
return nil, fmt.Errorf("type '%T' cannot be parsed as csv", value)
}

if err != nil {
return nil, err
if isDynamic {
headerFields = strings.Split(header, string([]rune{headerDelimiter}))
}

for i, key := range strings.Split(r.header, string([]rune{r.headerDelimiter})) {
parsedValues[key] = record[i]
reader := csvparser.NewReader(strings.NewReader(csvLine))
reader.Comma = fieldDelimiter
reader.FieldsPerRecord = len(headerFields)
parsedValues := make(map[string]interface{})

for {
record, err := reader.Read()
if err == io.EOF {
break
}

if err != nil {
return nil, err
}

for i, key := range headerFields {
parsedValues[key] = record[i]
}
}
}

return parsedValues, nil
return parsedValues, nil
}
}
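The key design point in `Process` above is that when `header_label` is set, the parse function is regenerated from the entry's label on every record, while static configurations keep the closure built once at `Build` time. A simplified sketch of that flow, with a hypothetical stripped-down `entry` type and positional splitting in place of a full `encoding/csv` reader:

```go
package main

import (
	"fmt"
	"strings"
)

// entry is a hypothetical stand-in for stanza's entry type: just a labels
// map and a raw record string.
type entry struct {
	labels map[string]string
	record string
}

// newParseFunc builds a closure over the header fields, so static
// configurations split the header once, while dynamic ones get a fresh
// closure per entry.
func newParseFunc(header string) func(string) map[string]string {
	keys := strings.Split(header, ",")
	return func(line string) map[string]string {
		out := make(map[string]string, len(keys))
		for i, v := range strings.Split(line, ",") {
			if i < len(keys) {
				out[keys[i]] = v
			}
		}
		return out
	}
}

// process mirrors CSVParser.Process: if a header label is configured, look
// it up on the entry and regenerate the parse function before parsing.
func process(e entry, headerLabel string, parse func(string) map[string]string) (map[string]string, error) {
	if headerLabel != "" {
		h, ok := e.labels[headerLabel]
		if !ok {
			return nil, fmt.Errorf("failed to read dynamic header label %s", headerLabel)
		}
		parse = newParseFunc(h)
	}
	return parse(e.record), nil
}

func main() {
	e := entry{
		labels: map[string]string{"fields": "id,severity,message"},
		record: "1,debug,Hello",
	}
	parsed, err := process(e, "fields", nil)
	if err != nil {
		panic(err)
	}
	fmt.Println(parsed["severity"]) // prints: debug
}
```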