Skip to content

Commit

Permalink
feat(cascadingprocessor): Added status code filtering to cascading fi…
Browse files Browse the repository at this point in the history
…lter processor (#1600)

* Added status code filtering to Cascading filter processor
---------

Co-authored-by: Tim Chan <[email protected]>
  • Loading branch information
chan-tim-sumo and Tim Chan authored Jun 13, 2024
1 parent 37331d1 commit 76db872
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 3 deletions.
1 change: 1 addition & 0 deletions .changelog/1600.added.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
feat(cascadingfilter): Added status code filtering to cascading filter processor
19 changes: 18 additions & 1 deletion pkg/processor/cascadingfilterprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ Each of the specified drop rules has several properties:

- `name` (required): identifies the rule
- `name_pattern: <regex>`: selects the span if its operation name matches the provided regular expression
- `status_code: <string>`: selects the trace if its root span has the given status code; supported string values: "Ok", "Unset", or "Error"
- `attributes: <list of attributes>`: list of attribute-level filters (both span level and resource level is being evaluated).
When several elements are specified, conditions for each of them must be met. Each entry might contain a number of fields:
- `key: <name>`: name of the attribute key
Expand Down Expand Up @@ -119,7 +120,7 @@ processors:
use_regex: true
```
### Filtering out healhtchecks and traffic shaping
### Filtering out healthchecks and traffic shaping
In the following example, a few more conditions are added:
Expand Down Expand Up @@ -153,6 +154,22 @@ cascadingfilter:
spans_per_second: 500 # <- adjust the output traffic level
```
### Just filtering out status code
The following example drops every trace that meets this criterion:
- its ROOT span has a status code of "Error" (the valid status codes are "Error", "Ok", and "Unset")
(status code doc: https://opentelemetry-python.readthedocs.io/en/latest/api/trace.status.html)
```yaml
processors:
cascading_filter:
trace_reject_filters:
- name: remove-all-traces-with-error-status-code
status_code: "Error"
```
### Advanced configuration
It is additionally possible to use adaptive sampling, which splits the total spans-per-second budget evenly across all the rules (up to the specified limit). It can also be configured so that any remaining budget is filled with random traces.
Expand Down
24 changes: 24 additions & 0 deletions pkg/processor/cascadingfilterprocessor/cascade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
// Package-level fixtures for the cascading filter tests.
var (
	testValue                  = 10 * time.Millisecond
	probabilisticFilteringRate = int32(10)

	// healthCheckPattern is the NamePattern of the "health-check" reject rules.
	healthCheckPattern = "health"

	// statusCode is the string form of the Error span status; the
	// "health-check" reject rules point at it as their StatusCode.
	statusCode = ptrace.StatusCodeError.String()
)

var cfg = cfconfig.Config{
CollectorInstances: 1,
DecisionWait: 2 * time.Second,
Expand All @@ -56,6 +58,7 @@ var cfg = cfconfig.Config{
{
Name: "health-check",
NamePattern: &healthCheckPattern,
StatusCode: &statusCode,
},
},
}
Expand All @@ -68,6 +71,7 @@ var cfgJustDropping = cfconfig.Config{
{
Name: "health-check",
NamePattern: &healthCheckPattern,
StatusCode: &statusCode,
},
},
}
Expand All @@ -90,6 +94,7 @@ var cfgAutoRate = cfconfig.Config{
{
Name: "health-check",
NamePattern: &healthCheckPattern,
StatusCode: &statusCode,
},
},
}
Expand All @@ -101,6 +106,7 @@ func fillSpan(span *ptrace.Span, durationMicros int64) {
span.Attributes().PutInt("foo", 55)
span.SetStartTimestamp(pcommon.Timestamp(startTime))
span.SetEndTimestamp(pcommon.Timestamp(nowTs))
span.Status().SetCode(ptrace.StatusCodeError)
}

func createTrace(c *cascade, numSpans int, durationMicros int64) *sampling.TraceData {
Expand Down Expand Up @@ -195,6 +201,24 @@ func TestDropTraces(t *testing.T) {
require.True(t, cascading.shouldBeDropped(pcommon.TraceID([16]byte{0}), trace2))
}

// TestDropTracesWithDifferentStatusCode builds three single-span traces whose
// span names all contain "health" and which differ only in span status code:
// the first is set to Unset, the second to Ok, and the third keeps whatever
// status createTrace assigned (presumably Error via fillSpan; confirm against
// createTrace). Only the third trace is expected to be dropped.
func TestDropTracesWithDifferentStatusCode(t *testing.T) {
	cascading := createCascade(t)

	// Create all traces up front, in the same order as they are asserted on.
	unsetTrace := createTrace(cascading, 1, 1000000)
	okTrace := createTrace(cascading, 1, 1000000)
	untouchedTrace := createTrace(cascading, 1, 1000000)

	unsetSpan := unsetTrace.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
	unsetSpan.SetName("health-check-trace-1")
	unsetSpan.Status().SetCode(ptrace.StatusCodeUnset)

	okSpan := okTrace.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
	okSpan.SetName("health-check-trace-2")
	okSpan.Status().SetCode(ptrace.StatusCodeOk)

	// The third span is only renamed; its status code is left as created.
	untouchedSpan := untouchedTrace.ReceivedBatches[0].ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0)
	untouchedSpan.SetName("health-check-trace-3")

	traceID := pcommon.TraceID([16]byte{0})

	require.False(t, cascading.shouldBeDropped(traceID, unsetTrace))
	require.False(t, cascading.shouldBeDropped(traceID, okTrace))
	require.True(t, cascading.shouldBeDropped(traceID, untouchedTrace))
}

func TestDropTracesAndNotLimitOthers(t *testing.T) {
cascading := createCascadeWithConfig(t, cfgJustDropping)

Expand Down
2 changes: 2 additions & 0 deletions pkg/processor/cascadingfilterprocessor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ type TraceRejectCfg struct {
AttributeCfg []AttributeCfg `mapstructure:"attributes"`
// NamePattern (optional) describes a regular expression that must be met by any span operation name
NamePattern *string `mapstructure:"name_pattern"`
// StatusCode (optional) selects traces whose root span has the given status code; must be one of "Ok", "Error", or "Unset"
StatusCode *string `mapstructure:"status_code"`
}

// Config holds the configuration for cascading-filter-based sampling.
Expand Down
4 changes: 4 additions & 0 deletions pkg/processor/cascadingfilterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/otelcol/otelcoltest"
"go.opentelemetry.io/collector/pdata/ptrace"

cfconfig "github.com/SumoLogic/sumologic-otel-collector/pkg/processor/cascadingfilterprocessor/config"
)
Expand All @@ -45,6 +46,7 @@ func TestLoadConfig(t *testing.T) {
probFilteringRate := int32(100)
namePatternValue := "foo.*"
healthCheckNamePatternValue := "health.*"
statusCode := ptrace.StatusCodeError.String()

id1 := component.NewIDWithName(Type, "1")
assert.Equal(t, cfg.Processors[id1],
Expand All @@ -58,6 +60,7 @@ func TestLoadConfig(t *testing.T) {
{
Name: "healthcheck-rule",
NamePattern: &healthCheckNamePatternValue,
StatusCode: &statusCode,
},
},
TraceAcceptCfgs: []cfconfig.TraceAcceptCfg{
Expand Down Expand Up @@ -114,6 +117,7 @@ func TestLoadConfig(t *testing.T) {
{
Name: "healthcheck-rule",
NamePattern: &healthCheckNamePatternValue,
StatusCode: &statusCode,
},
{
Name: "remove-all-traces-with-healthcheck-service",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package sampling

import (
"errors"
"regexp"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.uber.org/zap"

"github.com/SumoLogic/sumologic-otel-collector/pkg/processor/cascadingfilterprocessor/config"
Expand All @@ -28,10 +30,31 @@ type dropTraceEvaluator struct {
stringAttr *stringAttributeFilter
attrs []attributeFilter
operationRe *regexp.Regexp
statusCode *string

logger *zap.Logger
}

// validateStatusCode checks that an optional status code filter names a known
// span status code.
//
// A nil statusCode means that no status code filter is configured and is
// accepted. Otherwise the value must be exactly equal (case-sensitive) to the
// String() form of ptrace.StatusCodeError, ptrace.StatusCodeOk or
// ptrace.StatusCodeUnset; any other value yields a non-nil error.
func validateStatusCode(statusCode *string) error {
	if statusCode == nil {
		return nil
	}

	switch *statusCode {
	case ptrace.StatusCodeError.String(),
		ptrace.StatusCodeOk.String(),
		ptrace.StatusCodeUnset.String():
		return nil
	}

	// No trailing whitespace or punctuation, so the message composes cleanly
	// when it is wrapped or logged by callers.
	return errors.New("invalid status code: must be one of 'Error', 'Ok', or 'Unset'")
}

var _ DropTraceEvaluator = (*dropTraceEvaluator)(nil)

// NewDropTraceEvaluator creates a drop trace evaluator that checks if trace should be dropped
Expand All @@ -55,11 +78,16 @@ func NewDropTraceEvaluator(logger *zap.Logger, cfg config.TraceRejectCfg) (DropT
}
}

if err := validateStatusCode(cfg.StatusCode); err != nil {
return nil, err
}

return &dropTraceEvaluator{
stringAttr: stringAttrFilter,
numericAttr: numericAttrFilter,
attrs: attrsFilter,
operationRe: operationRe,
statusCode: cfg.StatusCode,
logger: logger,
}, nil
}
Expand All @@ -74,6 +102,7 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b
matchingStringAttrFound := false
matchingNumericAttrFound := false
matchingAttrsFound := false
matchingStatusCodeFound := false

for _, batch := range batches {
rs := batch.ResourceSpans()
Expand Down Expand Up @@ -104,6 +133,13 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b
matchingNumericAttrFound = checkIfNumericAttrFound(span.Attributes(), dte.numericAttr)
}

if !matchingStatusCodeFound && dte.statusCode != nil && span.ParentSpanID().IsEmpty() {
statusCode := span.Status().Code()
if statusCode.String() == *dte.statusCode {
matchingStatusCodeFound = true
}
}

if dte.operationRe != nil && !matchingOperationFound {
if dte.operationRe.MatchString(span.Name()) {
matchingOperationFound = true
Expand All @@ -115,12 +151,13 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b
}

conditionMet := struct {
operationName, stringAttr, numericAttr, attrs bool
operationName, stringAttr, numericAttr, attrs, statusCode bool
}{
operationName: true,
stringAttr: true,
numericAttr: true,
attrs: true,
statusCode: true,
}

if dte.operationRe != nil {
Expand All @@ -136,5 +173,9 @@ func (dte *dropTraceEvaluator) ShouldDrop(_ pcommon.TraceID, trace *TraceData) b
conditionMet.attrs = matchingAttrsFound
}

return conditionMet.operationName && conditionMet.numericAttr && conditionMet.stringAttr && conditionMet.attrs
if dte.statusCode != nil {
conditionMet.statusCode = matchingStatusCodeFound
}

return conditionMet.operationName && conditionMet.numericAttr && conditionMet.stringAttr && conditionMet.attrs && conditionMet.statusCode
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ processors:
trace_reject_filters:
- name: healthcheck-rule
name_pattern: "health.*"
status_code: "Error"
trace_accept_filters:
- name: include-errors
spans_per_second: 200
Expand Down Expand Up @@ -42,6 +43,7 @@ processors:
trace_reject_filters:
- name: healthcheck-rule
name_pattern: "health.*"
status_code: "Error"
- name: remove-all-traces-with-healthcheck-service
string_attribute: {key: service.name, values: [healthcheck.*], use_regex: true}
trace_accept_filters:
Expand Down

0 comments on commit 76db872

Please sign in to comment.