Skip to content

Commit

Permalink
Add an experimental flag to enable experimental promQL functions
Browse files Browse the repository at this point in the history
Signed-off-by: SungJin1212 <[email protected]>
  • Loading branch information
SungJin1212 committed Nov 21, 2024
1 parent fb0561e commit 62a556c
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245
* [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249
* [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256
* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355
* [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333
* [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280
* [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255
Expand Down
4 changes: 4 additions & 0 deletions docs/blocks-storage/querier.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ querier:
# evaluation like at Query Frontend or Ruler.
# CLI flag: -querier.ignore-max-query-length
[ignore_max_query_length: <boolean> | default = false]

# [Experimental] If true, experimental promQL functions are enabled.
# CLI flag: -querier.enable-promql-experimental-functions
[enable_promql_experimental_functions: <boolean> | default = false]
```
### `blocks_storage_config`
Expand Down
4 changes: 4 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -3997,6 +3997,10 @@ store_gateway_client:
# like at Query Frontend or Ruler.
# CLI flag: -querier.ignore-max-query-length
[ignore_max_query_length: <boolean> | default = false]

# [Experimental] If true, experimental promQL functions are enabled.
# CLI flag: -querier.enable-promql-experimental-functions
[enable_promql_experimental_functions: <boolean> | default = false]
```
### `query_frontend_config`
Expand Down
126 changes: 126 additions & 0 deletions integration/query_fuzz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,123 @@ func init() {
}
}

func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) {
prometheusLatestImage := "quay.io/prometheus/prometheus:v2.55.1"
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Start dependencies.
consul := e2edb.NewConsulWithName("consul")
require.NoError(t, s.StartAndWaitReady(consul))

baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags())
flags := mergeFlags(
baseFlags,
map[string]string{
"-blocks-storage.tsdb.head-compaction-interval": "4m",
"-blocks-storage.tsdb.block-ranges-period": "2h",
"-blocks-storage.tsdb.ship-interval": "1h",
"-blocks-storage.bucket-store.sync-interval": "1s",
"-blocks-storage.tsdb.retention-period": "24h",
"-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory,
"-querier.query-store-for-labels-enabled": "true",
// Ingester.
"-ring.store": "consul",
"-consul.hostname": consul.NetworkHTTPEndpoint(),
// Distributor.
"-distributor.replication-factor": "1",
// Store-gateway.
"-store-gateway.sharding-enabled": "false",
// alert manager
"-alertmanager.web.external-url": "http://localhost/alertmanager",
"-frontend.query-vertical-shard-size": "1",
"-frontend.max-cache-freshness": "1m",
// enable experimental promQL funcs
"-querier.enable-promql-experimental-functions": "true",
},
)
// make alert manager config dir
require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{}))

minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(minio))

cortex := e2ecortex.NewSingleBinary("cortex", flags, "")
require.NoError(t, s.StartAndWaitReady(cortex))

// Wait until Cortex replicas have updated the ring state.
require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total"))

now := time.Now()
start := now.Add(-time.Hour * 2)
end := now.Add(-time.Hour)
numSeries := 10
numSamples := 60
lbls := make([]labels.Labels, 0, numSeries*2)
scrapeInterval := time.Minute
statusCodes := []string{"200", "400", "404", "500", "502"}
for i := 0; i < numSeries; i++ {
lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_a"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa(i % 3)},
{Name: "status_code", Value: statusCodes[i%5]},
})

lbls = append(lbls, labels.Labels{
{Name: labels.MetricName, Value: "test_series_b"},
{Name: "job", Value: "test"},
{Name: "series", Value: strconv.Itoa((i + 1) % 3)},
{Name: "status_code", Value: statusCodes[(i+1)%5]},
})
}

ctx := context.Background()
rnd := rand.New(rand.NewSource(time.Now().Unix()))

dir := filepath.Join(s.SharedDir(), "data")
err = os.MkdirAll(dir, os.ModePerm)
require.NoError(t, err)
storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, err)
bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil)
id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10)
require.NoError(t, err)
err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc)
require.NoError(t, err)

// Wait for querier and store to sync blocks.
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway"))))
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "querier"))))
require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics))

c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1")
require.NoError(t, err)

err = writeFileToSharedDir(s, "prometheus.yml", []byte(""))
require.NoError(t, err)
prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{
"--enable-feature": "promql-experimental-functions",
})
require.NoError(t, s.StartAndWaitReady(prom))

c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint())
require.NoError(t, err)

waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end)

opts := []promqlsmith.Option{
promqlsmith.WithEnableOffset(true),
promqlsmith.WithEnableAtModifier(true),
promqlsmith.WithEnabledFunctions(enabledFunctions),
promqlsmith.WithEnableExperimentalPromQLFunctions(true),
}
ps := promqlsmith.New(rnd, lbls, opts...)

runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000)
}

func TestDisableChunkTrimmingFuzz(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
Expand Down Expand Up @@ -1410,6 +1527,15 @@ func runQueryFuzzTestCases(t *testing.T, ps *promqlsmith.PromQLSmith, c1, c2 *e2
func isValidQuery(generatedQuery parser.Expr, maxDepth int) bool {
isValid := true
currentDepth := 0
// TODO(SungJin1212): Test limitk, limit_ratio
if strings.Contains(generatedQuery.String(), "limitk") {
// current skip the limitk
return false
}
if strings.Contains(generatedQuery.String(), "limit_ratio") {
// current skip the limit_ratio
return false
}
parser.Inspect(generatedQuery, func(node parser.Node, path []parser.Node) error {
if currentDepth > maxDepth {
isValid = false
Expand Down
1 change: 1 addition & 0 deletions pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro
t.Cfg.Querier.DefaultEvaluationInterval,
t.Cfg.Querier.MaxSubQuerySteps,
t.Cfg.Querier.LookbackDelta,
t.Cfg.Querier.EnablePromQLExperimentalFunctions,
)

return services.NewIdleService(nil, func(_ error) error {
Expand Down
8 changes: 7 additions & 1 deletion pkg/querier/querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/util/annotations"
Expand Down Expand Up @@ -89,7 +90,8 @@ type Config struct {
ThanosEngine bool `yaml:"thanos_engine"`

// Ignore max query length check at Querier.
IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"`
IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"`
EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"`
}

var (
Expand Down Expand Up @@ -132,6 +134,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.")
f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.")
f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.")
f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.")
}

// Validate the config
Expand Down Expand Up @@ -204,6 +207,9 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor
})
maxConcurrentMetric.Set(float64(cfg.MaxConcurrent))

// set EnableExperimentalFunctions
parser.EnableExperimentalFunctions = cfg.EnablePromQLExperimentalFunctions

var queryEngine promql.QueryEngine
opts := promql.EngineOpts{
Logger: logger,
Expand Down
10 changes: 8 additions & 2 deletions pkg/querier/tripperware/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,9 @@ func sortPlanForQuery(q string) (sortPlan, error) {
if err != nil {
return 0, err
}
// Check if the root expression is topk or bottomk
// Check if the root expression is topk, bottomk, limitk, or limit_ratio
if aggr, ok := expr.(*promqlparser.AggregateExpr); ok {
if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK {
if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK || aggr.Op == promqlparser.LIMITK || aggr.Op == promqlparser.LIMIT_RATIO {
return mergeOnly, nil
}
}
Expand All @@ -303,6 +303,12 @@ func sortPlanForQuery(q string) (sortPlan, error) {
if n.Func.Name == "sort_desc" {
sortDesc = true
}
if n.Func.Name == "sort_by_label" {
sortAsc = true
}
if n.Func.Name == "sort_by_label_desc" {
sortDesc = true
}
}
}
return sortAsc, sortDesc
Expand Down
22 changes: 22 additions & 0 deletions pkg/querier/tripperware/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"testing"

"github.com/prometheus/prometheus/model/labels"
promqlparser "github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/assert"

"github.com/cortexproject/cortex/pkg/cortexpb"
Expand Down Expand Up @@ -618,6 +619,16 @@ func Test_sortPlanForQuery(t *testing.T) {
expectedPlan: mergeOnly,
err: false,
},
{
query: "limitk(10, up)",
expectedPlan: mergeOnly,
err: false,
},
{
query: "limit_ratio(0.1, up)",
expectedPlan: mergeOnly,
err: false,
},
{
query: "1 + topk(10, up)",
expectedPlan: sortByLabels,
Expand All @@ -633,6 +644,16 @@ func Test_sortPlanForQuery(t *testing.T) {
expectedPlan: sortByValuesAsc,
err: false,
},
{
query: "1 + sort_by_label_desc(sum by (job) (up) )",
expectedPlan: sortByValuesDesc,
err: false,
},
{
query: "sort_by_label(topk by (job) (10, up))",
expectedPlan: sortByValuesAsc,
err: false,
},
{
query: "topk(5, up) by (job) + sort_desc(up)",
expectedPlan: sortByValuesDesc,
Expand All @@ -652,6 +673,7 @@ func Test_sortPlanForQuery(t *testing.T) {

for _, tc := range tc {
t.Run(tc.query, func(t *testing.T) {
promqlparser.EnableExperimentalFunctions = true
p, err := sortPlanForQuery(tc.query)
if tc.err {
assert.Error(t, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestRoundTrip(t *testing.T) {
time.Minute,
0,
0,
false,
)

for i, tc := range []struct {
Expand Down
6 changes: 6 additions & 0 deletions pkg/querier/tripperware/roundtrip.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/promql/parser"
"github.com/thanos-io/thanos/pkg/querysharding"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
Expand Down Expand Up @@ -104,7 +105,12 @@ func NewQueryTripperware(
defaultSubQueryInterval time.Duration,
maxSubQuerySteps int64,
lookbackDelta time.Duration,
enablePromQLExperimentalFunctions bool,
) Tripperware {

// set EnableExperimentalFunctions
parser.EnableExperimentalFunctions = enablePromQLExperimentalFunctions

// Per tenant query metrics.
queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_queries_total",
Expand Down
1 change: 1 addition & 0 deletions pkg/querier/tripperware/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ func TestRoundTrip(t *testing.T) {
time.Minute,
tc.maxSubQuerySteps,
0,
false,
)
resp, err := tw(downstream).RoundTrip(req)
if tc.expectedErr == nil {
Expand Down

0 comments on commit 62a556c

Please sign in to comment.