From c0e3a6c36dbb07ef8fe72c985f0681f3fe545a26 Mon Sep 17 00:00:00 2001 From: tdakkota Date: Tue, 26 Sep 2023 13:52:02 +0300 Subject: [PATCH] feat(ytstorage): implement line filter offloading for YQL querier --- internal/ytstorage/yql_querier_logs.go | 36 ++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/internal/ytstorage/yql_querier_logs.go b/internal/ytstorage/yql_querier_logs.go index 9365cc1a..f2e125db 100644 --- a/internal/ytstorage/yql_querier_logs.go +++ b/internal/ytstorage/yql_querier_logs.go @@ -22,6 +22,7 @@ var _ logqlengine.Querier = (*YQLQuerier)(nil) // Сapabilities defines storage capabilities. func (q *YQLQuerier) Сapabilities() (caps logqlengine.QuerierСapabilities) { caps.Label.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe) + caps.Line.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe) return caps } @@ -59,7 +60,15 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time // // See https://ytsaurus.tech/docs/en/yql/udf/list/pire#match // See https://ytsaurus.tech/docs/en/yql/udf/list/re2#match - fmt.Fprintf(&query, "$matcher_%d = Re2::Match(%q);\n", matcherIdx, m.Re) + fmt.Fprintf(&query, "$label_matcher_%d = Re2::Match(%q);\n", matcherIdx, m.Re) + } + } + for matcherIdx, m := range params.Line { + if m.Op.IsRegex() { + // Note that Re2::Grep is used. + // + // Line filter is looking for substring, not for exact match. + fmt.Fprintf(&query, "$line_matcher_%d = Re2::Grep(%q);\n", matcherIdx, m.Re) } } @@ -97,7 +106,7 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time case logql.OpEq, logql.OpNotEq: fmt.Fprintf(&query, "Yson::ConvertToString(Yson::YPath(%s, %q)) = %q", column, yp, m.Value) case logql.OpRe, logql.OpNotRe: - fmt.Fprintf(&query, "$matcher_%d(Yson::ConvertToString(Yson::YPath(%s, %q)))", matcherIdx, column, yp) + fmt.Fprintf(&query, "$label_matcher_%d(Yson::ConvertToString(Yson::YPath(%s, %q)))", matcherIdx, column, yp) default: return nil, errors.Errorf("unexpected op %q", m.Op) } @@ -105,6 +114,29 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time } query.WriteString("\t)\n") } + for matcherIdx, m := range params.Line { + query.WriteString("\t") + switch m.Op { + case logql.OpEq, logql.OpRe: + query.WriteString("AND ") + case logql.OpNotEq, logql.OpNotRe: + query.WriteString("AND NOT ") + default: + return nil, errors.Errorf("unexpected op %q", m.Op) + } + + // Note: predicate negated above. + switch m.Op { + case logql.OpEq, logql.OpNotEq: + fmt.Fprintf(&query, "String::Contains(body, %q)", m.Value) + case logql.OpRe, logql.OpNotRe: + fmt.Fprintf(&query, "$line_matcher_%d(body)", matcherIdx) + default: + return nil, errors.Errorf("unexpected op %q", m.Op) + } + query.WriteString("\n") + } + query.WriteString("ORDER BY `timestamp`") return yqlclient.YQLQuery[logstorage.Record](ctx, q.client, query.String())