Skip to content

Commit

Permalink
feat(ytstorage): implement line filter offloading for YQL querier
Browse files Browse the repository at this point in the history
  • Loading branch information
tdakkota committed Sep 27, 2023
1 parent 1ded6bc commit c0e3a6c
Showing 1 changed file with 34 additions and 2 deletions.
36 changes: 34 additions & 2 deletions internal/ytstorage/yql_querier_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ var _ logqlengine.Querier = (*YQLQuerier)(nil)
// Сapabilities defines storage capabilities.
func (q *YQLQuerier) Сapabilities() (caps logqlengine.QuerierСapabilities) {
caps.Label.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe)
caps.Line.Add(logql.OpEq, logql.OpNotEq, logql.OpRe, logql.OpNotRe)
return caps
}

Expand Down Expand Up @@ -59,7 +60,15 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time
//
// See https://ytsaurus.tech/docs/en/yql/udf/list/pire#match
// See https://ytsaurus.tech/docs/en/yql/udf/list/re2#match
fmt.Fprintf(&query, "$matcher_%d = Re2::Match(%q);\n", matcherIdx, m.Re)
fmt.Fprintf(&query, "$label_matcher_%d = Re2::Match(%q);\n", matcherIdx, m.Re)
}
}
for matcherIdx, m := range params.Line {
if m.Op.IsRegex() {
// Note that Re2::Grep is used.
//
// Line filter is looking for substring, not for exact match.
fmt.Fprintf(&query, "$line_matcher_%d = Re2::Grep(%q);\n", matcherIdx, m.Re)
}
}

Expand Down Expand Up @@ -97,14 +106,37 @@ func (q *YQLQuerier) SelectLogs(ctx context.Context, start, end otelstorage.Time
case logql.OpEq, logql.OpNotEq:
fmt.Fprintf(&query, "Yson::ConvertToString(Yson::YPath(%s, %q)) = %q", column, yp, m.Value)
case logql.OpRe, logql.OpNotRe:
fmt.Fprintf(&query, "$matcher_%d(Yson::ConvertToString(Yson::YPath(%s, %q)))", matcherIdx, column, yp)
fmt.Fprintf(&query, "$label_matcher_%d(Yson::ConvertToString(Yson::YPath(%s, %q)))", matcherIdx, column, yp)
default:
return nil, errors.Errorf("unexpected op %q", m.Op)
}
query.WriteString(" )\n")
}
query.WriteString("\t)\n")
}
for matcherIdx, m := range params.Line {
query.WriteString("\t")
switch m.Op {
case logql.OpEq, logql.OpRe:
query.WriteString("AND ")
case logql.OpNotEq, logql.OpNotRe:
query.WriteString("AND NOT ")
default:
return nil, errors.Errorf("unexpected op %q", m.Op)
}

// Note: predicate negated above.
switch m.Op {
case logql.OpEq, logql.OpNotEq:
fmt.Fprintf(&query, "String::Contains(body, %q)", m.Value)
case logql.OpRe, logql.OpNotRe:
fmt.Fprintf(&query, "$line_matcher_%d(body)", matcherIdx)
default:
return nil, errors.Errorf("unexpected op %q", m.Op)
}
query.WriteString("\n")
}

query.WriteString("ORDER BY `timestamp`")

return yqlclient.YQLQuery[logstorage.Record](ctx, q.client, query.String())
Expand Down

0 comments on commit c0e3a6c

Please sign in to comment.