diff --git a/integration/lokie2e/common_test.go b/integration/lokie2e/common_test.go index c80d5286..0da1a890 100644 --- a/integration/lokie2e/common_test.go +++ b/integration/lokie2e/common_test.go @@ -147,6 +147,10 @@ func runTest( {`{http_method=~".+"} |= "\"method\": \"HEAD\"" |= "\"status\":500"`, 2}, {`{http_method=~".+"} |~ "\"method\":\\s*\"DELETE\""`, 20}, {`{http_method=~".+"} |~ "\"method\":\\s*\"HEAD\"" |= "\"status\":500"`, 2}, + // Try to not use offloading. + {`{http_method=~".+"} | line_format "{{ __line__ }}" |= "\"method\": \"DELETE\""`, 20}, + {`{http_method=~".+"} | line_format "{{ __line__ }}" |= "\"method\": \"HEAD\"" |= "\"status\":500"`, 2}, + {`{http_method=~".+"} |= "\"method\": \"HEAD\"" | line_format "{{ __line__ }}" |= "\"status\":500"`, 2}, // Negative line matcher. {`{http_method=~".+"} != "\"method\": \"HEAD\""`, len(set.Records) - 22}, {`{http_method=~".+"} !~ "\"method\":\\s*\"HEAD\""`, len(set.Records) - 22}, diff --git a/internal/logql/logqlengine/eval_streams.go b/internal/logql/logqlengine/eval_streams.go index ebdf671d..dae46b6a 100644 --- a/internal/logql/logqlengine/eval_streams.go +++ b/internal/logql/logqlengine/eval_streams.go @@ -80,7 +80,7 @@ func (e *Engine) selectLogs(ctx context.Context, sel logql.Selector, stages []lo params.Start = addDuration(params.Start, e.lookbackDuration) } - cond, err := extractQueryConditions(e.querierCaps, sel) + cond, err := extractQueryConditions(e.querierCaps, sel, stages) if err != nil { return nil, errors.Wrap(err, "extract preconditions") } diff --git a/internal/logql/logqlengine/precondition.go b/internal/logql/logqlengine/precondition.go index ac65dbe1..bb8a8489 100644 --- a/internal/logql/logqlengine/precondition.go +++ b/internal/logql/logqlengine/precondition.go @@ -9,7 +9,7 @@ type queryConditions struct { params SelectLogsParams } -func extractQueryConditions(caps QuerierСapabilities, sel logql.Selector) (cond queryConditions, _ error) { +func extractQueryConditions(caps QuerierСapabilities, sel logql.Selector, stages []logql.PipelineStage) (cond queryConditions, _ error) { var prefilters []Processor for _, lm := range sel.Matchers { @@ -36,5 +36,35 @@ func extractQueryConditions(caps QuerierСapabilities, sel logql.Selector) (cond cond.prefilter = &Pipeline{Stages: prefilters} } +stageLoop: + for _, stage := range stages { + switch stage := stage.(type) { + case *logql.LineFilter: + if stage.IP { + // Do not offload IP line filter. + continue + } + if !caps.Line.Supports(stage.Op) { + continue + } + cond.params.Line = append(cond.params.Line, *stage) + case *logql.JSONExpressionParser, + *logql.LogfmtExpressionParser, + *logql.RegexpLabelParser, + *logql.PatternLabelParser, + *logql.LabelFilter, + *logql.LabelFormatExpr, + *logql.DropLabelsExpr, + *logql.KeepLabelsExpr, + *logql.DistinctFilter: + // Do nothing on line, just skip. + case *logql.LineFormat, + *logql.DecolorizeExpr, + *logql.UnpackLabelParser: + // Stage modify the line, can't offload line filters after this stage. + break stageLoop + } + } + return cond, nil } diff --git a/internal/logql/logqlengine/precondition_test.go b/internal/logql/logqlengine/precondition_test.go index ab259258..fa02b153 100644 --- a/internal/logql/logqlengine/precondition_test.go +++ b/internal/logql/logqlengine/precondition_test.go @@ -10,7 +10,7 @@ import ( "github.com/go-faster/oteldb/internal/logql" ) -func Test_extractQueryConditions(t *testing.T) { +func TestExtractLabelQueryConditions(t *testing.T) { tests := []struct { sel logql.Selector labelCaps []logql.BinOp @@ -70,7 +70,7 @@ func Test_extractQueryConditions(t *testing.T) { var caps QuerierСapabilities caps.Label.Add(tt.labelCaps...) - conds, err := extractQueryConditions(caps, tt.sel) + conds, err := extractQueryConditions(caps, tt.sel, nil) if tt.wantErr { require.Error(t, err) return @@ -86,3 +86,74 @@ func Test_extractQueryConditions(t *testing.T) { }) } } + +func TestExtractLineQueryConditions(t *testing.T) { + tests := []struct { + stages []logql.PipelineStage + lineCaps []logql.BinOp + conds SelectLogsParams + wantErr bool + }{ + { + []logql.PipelineStage{ + &logql.DropLabelsExpr{}, + &logql.LineFilter{Op: logql.OpEq, Value: "first"}, + &logql.LineFilter{Op: logql.OpRe, Value: "regular.+", Re: regexp.MustCompile(`regular.+`)}, + &logql.DecolorizeExpr{}, + // These would not be offloaded. + &logql.LineFilter{Op: logql.OpEq, Value: "second"}, + &logql.LineFilter{Op: logql.OpRe, Value: "no+", Re: regexp.MustCompile(`no.+`)}, + }, + []logql.BinOp{ + logql.OpEq, + logql.OpRe, + }, + SelectLogsParams{ + Line: []logql.LineFilter{ + {Op: logql.OpEq, Value: "first"}, + {Op: logql.OpRe, Value: "regular.+", Re: regexp.MustCompile(`regular.+`)}, + }, + }, + false, + }, + { + []logql.PipelineStage{ + &logql.LineFilter{Op: logql.OpRe, Value: "a.+", Re: regexp.MustCompile(`a.+`)}, + &logql.DecolorizeExpr{}, + &logql.LineFilter{Op: logql.OpRe, Value: "b+", Re: regexp.MustCompile(`b.+`)}, + }, + []logql.BinOp{ + logql.OpEq, + }, + SelectLogsParams{}, + false, + }, + { + []logql.PipelineStage{ + &logql.LineFilter{Op: logql.OpEq, Value: "127.0.0.1", IP: true}, + }, + []logql.BinOp{ + logql.OpEq, + }, + SelectLogsParams{}, + false, + }, + } + for i, tt := range tests { + tt := tt + t.Run(fmt.Sprintf("Test%d", i+1), func(t *testing.T) { + var caps QuerierСapabilities + caps.Line.Add(tt.lineCaps...) + + conds, err := extractQueryConditions(caps, logql.Selector{}, tt.stages) + if tt.wantErr { + require.Error(t, err) + return + } + require.NoError(t, err) + + require.Equal(t, NopProcessor, conds.prefilter) + require.Equal(t, tt.conds, conds.params) + }) + } +} diff --git a/internal/logql/logqlengine/storage.go b/internal/logql/logqlengine/storage.go index 8c1aa88f..84c67b46 100644 --- a/internal/logql/logqlengine/storage.go +++ b/internal/logql/logqlengine/storage.go @@ -28,6 +28,7 @@ func (caps SupportedOps) Supports(op logql.BinOp) bool { // QuerierСapabilities defines what operations storage can do. type QuerierСapabilities struct { Label SupportedOps + Line SupportedOps } // Querier does queries to storage. @@ -44,4 +45,5 @@ type Querier interface { // SelectLogsParams is a storage query params. type SelectLogsParams struct { Labels []logql.LabelMatcher + Line []logql.LineFilter } diff --git a/internal/ytstorage/yql_querier_logs.go b/internal/ytstorage/yql_querier_logs.go index 9365cc1a..f2e125db 100644 --- a/internal/ytstorage/yql_querier_logs.go +++ b/internal/ytstorage/yql_querier_logs.go @@ -22,6 +22,7 @@ var _ logqlengine.Querier = (*YQLQuerier)(nil) // Сapabilities defines storage capabilities. func (q *YQLQuerier) Сapabilities() (caps logqlengine.QuerierСapabilities) { caps.Label.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe) + caps.Line.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe) return caps } @@ -59,7 +60,15 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time // // See https://ytsaurus.tech/docs/en/yql/udf/list/pire#match // See https://ytsaurus.tech/docs/en/yql/udf/list/re2#match - fmt.Fprintf(&query, "$matcher_%d = Re2::Match(%q);\n", matcherIdx, m.Re) + fmt.Fprintf(&query, "$label_matcher_%d = Re2::Match(%q);\n", matcherIdx, m.Re) + } + } + for matcherIdx, m := range params.Line { + if m.Op.IsRegex() { + // Note that Re2::Grep is used. + // + // Line filter is looking for substring, not for exact match. + fmt.Fprintf(&query, "$line_matcher_%d = Re2::Grep(%q);\n", matcherIdx, m.Re) } } @@ -97,7 +106,7 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time case logql.OpEq, logql.OpNotEq: fmt.Fprintf(&query, "Yson::ConvertToString(Yson::YPath(%s, %q)) = %q", column, yp, m.Value) case logql.OpRe, logql.OpNotRe: - fmt.Fprintf(&query, "$matcher_%d(Yson::ConvertToString(Yson::YPath(%s, %q)))", matcherIdx, column, yp) + fmt.Fprintf(&query, "$label_matcher_%d(Yson::ConvertToString(Yson::YPath(%s, %q)))", matcherIdx, column, yp) default: return nil, errors.Errorf("unexpected op %q", m.Op) } @@ -105,6 +114,29 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time } query.WriteString("\t)\n") } + for matcherIdx, m := range params.Line { + query.WriteString("\t") + switch m.Op { + case logql.OpEq, logql.OpRe: + query.WriteString("AND ") + case logql.OpNotEq, logql.OpNotRe: + query.WriteString("AND NOT ") + default: + return nil, errors.Errorf("unexpected op %q", m.Op) + } + + // Note: predicate negated above. + switch m.Op { + case logql.OpEq, logql.OpNotEq: + fmt.Fprintf(&query, "String::Contains(body, %q)", m.Value) + case logql.OpRe, logql.OpNotRe: + fmt.Fprintf(&query, "$line_matcher_%d(body)", matcherIdx) + default: + return nil, errors.Errorf("unexpected op %q", m.Op) + } + query.WriteString("\n") + } + query.WriteString("ORDER BY `timestamp`") return yqlclient.YQLQuery[logstorage.Record](ctx, q.client, query.String()) diff --git a/internal/ytstorage/ytql_querier_logs.go b/internal/ytstorage/ytql_querier_logs.go index b45f95c1..2903da85 100644 --- a/internal/ytstorage/ytql_querier_logs.go +++ b/internal/ytstorage/ytql_querier_logs.go @@ -82,6 +82,7 @@ func (q *YTQLQuerier) Сapabilities() (caps logqlengine.QuerierСapabilities) { // FIXME(tdakkota): we don't add OpRe and OpNotRe because YT QL query executer throws an exception // when regexp function are used. caps.Label.Add(logql.OpEq, logql.OpNotEq) + caps.Line.Add(logql.OpEq, logql.OpNotEq) return caps } @@ -131,6 +132,19 @@ func (q *YTQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Tim } query.WriteByte(')') } + for _, m := range params.Line { + switch m.Op { + case logql.OpEq: + query.WriteString(" AND ") + case logql.OpNotEq: + query.WriteString(" AND NOT ") + default: + return nil, errors.Errorf("unexpected op %q", m.Op) + } + + // Line filter checks if line contains given value. + fmt.Fprintf(&query, "is_substr(%q, body)", m.Value) + } r, err := q.yc.SelectRows(ctx, query.String(), nil) if err != nil {