diff --git a/CHANGELOG.md b/CHANGELOG.md index c8c3f95425..356789350f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ * [FEATURE] Store Gateway: Add an in-memory chunk cache. #6245 * [FEATURE] Chunk Cache: Support multi level cache and add metrics. #6249 * [FEATURE] Distributor: Accept multiple HA Tracker pairs in the same request. #6256 +* [ENHANCEMENT] Query Frontend/Querier: Add an experimental flag `-querier.enable-promql-experimental-functions` to enable experimental promQL functions. #6355 * [ENHANCEMENT] OTLP: Add `-distributor.otlp-max-recv-msg-size` flag to limit OTLP request size in bytes. #6333 * [ENHANCEMENT] S3 Bucket Client: Add a list objects version configs to configure list api object version. #6280 * [ENHANCEMENT] OpenStack Swift: Add application credential configs for Openstack swift object storage backend. #6255 diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md index 4fc1958ee0..9c78746070 100644 --- a/docs/blocks-storage/querier.md +++ b/docs/blocks-storage/querier.md @@ -253,6 +253,10 @@ querier: # evaluation like at Query Frontend or Ruler. # CLI flag: -querier.ignore-max-query-length [ignore_max_query_length: | default = false] + + # [Experimental] If true, experimental promQL functions are enabled. + # CLI flag: -querier.enable-promql-experimental-functions + [enable_promql_experimental_functions: | default = false] ``` ### `blocks_storage_config` diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md index deb79957b7..67ae897e02 100644 --- a/docs/configuration/config-file-reference.md +++ b/docs/configuration/config-file-reference.md @@ -3997,6 +3997,10 @@ store_gateway_client: # like at Query Frontend or Ruler. # CLI flag: -querier.ignore-max-query-length [ignore_max_query_length: | default = false] + +# [Experimental] If true, experimental promQL functions are enabled. +# CLI flag: -querier.enable-promql-experimental-functions +[enable_promql_experimental_functions: | default = false] ``` ### `query_frontend_config` diff --git a/integration/query_fuzz_test.go b/integration/query_fuzz_test.go index fb97189250..3f7e47c3ae 100644 --- a/integration/query_fuzz_test.go +++ b/integration/query_fuzz_test.go @@ -52,6 +52,123 @@ func init() { } } +func TestExperimentalPromQLFuncsWithPrometheus(t *testing.T) { + prometheusLatestImage := "quay.io/prometheus/prometheus:v2.55.1" + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Start dependencies. + consul := e2edb.NewConsulWithName("consul") + require.NoError(t, s.StartAndWaitReady(consul)) + + baseFlags := mergeFlags(AlertmanagerLocalFlags(), BlocksStorageFlags()) + flags := mergeFlags( + baseFlags, + map[string]string{ + "-blocks-storage.tsdb.head-compaction-interval": "4m", + "-blocks-storage.tsdb.block-ranges-period": "2h", + "-blocks-storage.tsdb.ship-interval": "1h", + "-blocks-storage.bucket-store.sync-interval": "1s", + "-blocks-storage.tsdb.retention-period": "24h", + "-blocks-storage.bucket-store.index-cache.backend": tsdb.IndexCacheBackendInMemory, + "-querier.query-store-for-labels-enabled": "true", + // Ingester. + "-ring.store": "consul", + "-consul.hostname": consul.NetworkHTTPEndpoint(), + // Distributor. + "-distributor.replication-factor": "1", + // Store-gateway. + "-store-gateway.sharding-enabled": "false", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + "-frontend.query-vertical-shard-size": "1", + "-frontend.max-cache-freshness": "1m", + // enable experimental promQL funcs + "-querier.enable-promql-experimental-functions": "true", + }, + ) + // make alert manager config dir + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs", []byte{})) + + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(minio)) + + cortex := e2ecortex.NewSingleBinary("cortex", flags, "") + require.NoError(t, s.StartAndWaitReady(cortex)) + + // Wait until Cortex replicas have updated the ring state. + require.NoError(t, cortex.WaitSumMetrics(e2e.Equals(float64(512)), "cortex_ring_tokens_total")) + + now := time.Now() + start := now.Add(-time.Hour * 2) + end := now.Add(-time.Hour) + numSeries := 10 + numSamples := 60 + lbls := make([]labels.Labels, 0, numSeries*2) + scrapeInterval := time.Minute + statusCodes := []string{"200", "400", "404", "500", "502"} + for i := 0; i < numSeries; i++ { + lbls = append(lbls, labels.Labels{ + {Name: labels.MetricName, Value: "test_series_a"}, + {Name: "job", Value: "test"}, + {Name: "series", Value: strconv.Itoa(i % 3)}, + {Name: "status_code", Value: statusCodes[i%5]}, + }) + + lbls = append(lbls, labels.Labels{ + {Name: labels.MetricName, Value: "test_series_b"}, + {Name: "job", Value: "test"}, + {Name: "series", Value: strconv.Itoa((i + 1) % 3)}, + {Name: "status_code", Value: statusCodes[(i+1)%5]}, + }) + } + + ctx := context.Background() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + + dir := filepath.Join(s.SharedDir(), "data") + err = os.MkdirAll(dir, os.ModePerm) + require.NoError(t, err) + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10) + require.NoError(t, err) + err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + // Wait for querier and store to sync blocks. + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "store-gateway")))) + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_blocks_meta_synced"}, e2e.WaitMissingMetrics, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "component", "querier")))) + require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(float64(1)), []string{"cortex_bucket_store_blocks_loaded"}, e2e.WaitMissingMetrics)) + + c1, err := e2ecortex.NewClient("", cortex.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + err = writeFileToSharedDir(s, "prometheus.yml", []byte("")) + require.NoError(t, err) + prom := e2edb.NewPrometheus(prometheusLatestImage, map[string]string{ + "--enable-feature": "promql-experimental-functions", + }) + require.NoError(t, s.StartAndWaitReady(prom)) + + c2, err := e2ecortex.NewPromQueryClient(prom.HTTPEndpoint()) + require.NoError(t, err) + + waitUntilReady(t, ctx, c1, c2, `{job="test"}`, start, end) + + opts := []promqlsmith.Option{ + promqlsmith.WithEnableOffset(true), + promqlsmith.WithEnableAtModifier(true), + promqlsmith.WithEnabledFunctions(enabledFunctions), + promqlsmith.WithEnableExperimentalPromQLFunctions(true), + } + ps := promqlsmith.New(rnd, lbls, opts...) + + runQueryFuzzTestCases(t, ps, c1, c2, end, start, end, scrapeInterval, 1000) +} + func TestDisableChunkTrimmingFuzz(t *testing.T) { s, err := e2e.NewScenario(networkName) require.NoError(t, err) @@ -1410,6 +1527,15 @@ func runQueryFuzzTestCases(t *testing.T, ps *promqlsmith.PromQLSmith, c1, c2 *e2 func isValidQuery(generatedQuery parser.Expr, maxDepth int) bool { isValid := true currentDepth := 0 + // TODO(SungJin1212): Test limitk, limit_ratio + if strings.Contains(generatedQuery.String(), "limitk") { + // current skip the limitk + return false + } + if strings.Contains(generatedQuery.String(), "limit_ratio") { + // current skip the limit_ratio + return false + } parser.Inspect(generatedQuery, func(node parser.Node, path []parser.Node) error { if currentDepth > maxDepth { isValid = false diff --git a/pkg/cortex/modules.go b/pkg/cortex/modules.go index 00f0b10a20..222d321168 100644 --- a/pkg/cortex/modules.go +++ b/pkg/cortex/modules.go @@ -508,6 +508,7 @@ func (t *Cortex) initQueryFrontendTripperware() (serv services.Service, err erro t.Cfg.Querier.DefaultEvaluationInterval, t.Cfg.Querier.MaxSubQuerySteps, t.Cfg.Querier.LookbackDelta, + t.Cfg.Querier.EnablePromQLExperimentalFunctions, ) return services.NewIdleService(nil, func(_ error) error { diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 34d1f8b19e..784aef3bee 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -16,6 +16,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql" + "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/tsdb/chunkenc" "github.com/prometheus/prometheus/util/annotations" @@ -89,7 +90,8 @@ type Config struct { ThanosEngine bool `yaml:"thanos_engine"` // Ignore max query length check at Querier. - IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"` + IgnoreMaxQueryLength bool `yaml:"ignore_max_query_length"` + EnablePromQLExperimentalFunctions bool `yaml:"enable_promql_experimental_functions"` } var ( @@ -132,6 +134,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ThanosEngine, "querier.thanos-engine", false, "Experimental. Use Thanos promql engine https://github.com/thanos-io/promql-engine rather than the Prometheus promql engine.") f.Int64Var(&cfg.MaxSubQuerySteps, "querier.max-subquery-steps", 0, "Max number of steps allowed for every subquery expression in query. Number of steps is calculated using subquery range / step. A value > 0 enables it.") f.BoolVar(&cfg.IgnoreMaxQueryLength, "querier.ignore-max-query-length", false, "If enabled, ignore max query length check at Querier select method. Users can choose to ignore it since the validation can be done before Querier evaluation like at Query Frontend or Ruler.") + f.BoolVar(&cfg.EnablePromQLExperimentalFunctions, "querier.enable-promql-experimental-functions", false, "[Experimental] If true, experimental promQL functions are enabled.") } // Validate the config @@ -204,6 +207,9 @@ func New(cfg Config, limits *validation.Overrides, distributor Distributor, stor }) maxConcurrentMetric.Set(float64(cfg.MaxConcurrent)) + // set EnableExperimentalFunctions + parser.EnableExperimentalFunctions = cfg.EnablePromQLExperimentalFunctions + var queryEngine promql.QueryEngine opts := promql.EngineOpts{ Logger: logger, diff --git a/pkg/querier/tripperware/merge.go b/pkg/querier/tripperware/merge.go index e1d376ac6d..a85725c541 100644 --- a/pkg/querier/tripperware/merge.go +++ b/pkg/querier/tripperware/merge.go @@ -288,9 +288,9 @@ func sortPlanForQuery(q string) (sortPlan, error) { if err != nil { return 0, err } - // Check if the root expression is topk or bottomk + // Check if the root expression is topk, bottomk, limitk, or limit_ratio if aggr, ok := expr.(*promqlparser.AggregateExpr); ok { - if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK { + if aggr.Op == promqlparser.TOPK || aggr.Op == promqlparser.BOTTOMK || aggr.Op == promqlparser.LIMITK || aggr.Op == promqlparser.LIMIT_RATIO { return mergeOnly, nil } } @@ -303,6 +303,12 @@ func sortPlanForQuery(q string) (sortPlan, error) { if n.Func.Name == "sort_desc" { sortDesc = true } + if n.Func.Name == "sort_by_label" { + sortAsc = true + } + if n.Func.Name == "sort_by_label_desc" { + sortDesc = true + } } } return sortAsc, sortDesc diff --git a/pkg/querier/tripperware/merge_test.go b/pkg/querier/tripperware/merge_test.go index 6a51ca7072..7ee5b0cbbd 100644 --- a/pkg/querier/tripperware/merge_test.go +++ b/pkg/querier/tripperware/merge_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/prometheus/prometheus/model/labels" + promqlparser "github.com/prometheus/prometheus/promql/parser" "github.com/stretchr/testify/assert" "github.com/cortexproject/cortex/pkg/cortexpb" @@ -618,6 +619,16 @@ func Test_sortPlanForQuery(t *testing.T) { expectedPlan: mergeOnly, err: false, }, + { + query: "limitk(10, up)", + expectedPlan: mergeOnly, + err: false, + }, + { + query: "limit_ratio(0.1, up)", + expectedPlan: mergeOnly, + err: false, + }, { query: "1 + topk(10, up)", expectedPlan: sortByLabels, @@ -633,6 +644,16 @@ func Test_sortPlanForQuery(t *testing.T) { expectedPlan: sortByValuesAsc, err: false, }, + { + query: "1 + sort_by_label_desc(sum by (job) (up) )", + expectedPlan: sortByValuesDesc, + err: false, + }, + { + query: "sort_by_label(topk by (job) (10, up))", + expectedPlan: sortByValuesAsc, + err: false, + }, { query: "topk(5, up) by (job) + sort_desc(up)", expectedPlan: sortByValuesDesc, @@ -652,6 +673,7 @@ func Test_sortPlanForQuery(t *testing.T) { for _, tc := range tc { t.Run(tc.query, func(t *testing.T) { + promqlparser.EnableExperimentalFunctions = true p, err := sortPlanForQuery(tc.query) if tc.err { assert.Error(t, err) diff --git a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go index 8ba197865f..5acb18d87e 100644 --- a/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go +++ b/pkg/querier/tripperware/queryrange/query_range_middlewares_test.go @@ -80,6 +80,7 @@ func TestRoundTrip(t *testing.T) { time.Minute, 0, 0, + false, ) for i, tc := range []struct { diff --git a/pkg/querier/tripperware/roundtrip.go b/pkg/querier/tripperware/roundtrip.go index 4edcd51cc9..69b46dd66b 100644 --- a/pkg/querier/tripperware/roundtrip.go +++ b/pkg/querier/tripperware/roundtrip.go @@ -27,6 +27,7 @@ import ( "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/promql/parser" "github.com/thanos-io/thanos/pkg/querysharding" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" @@ -104,7 +105,12 @@ func NewQueryTripperware( defaultSubQueryInterval time.Duration, maxSubQuerySteps int64, lookbackDelta time.Duration, + enablePromQLExperimentalFunctions bool, ) Tripperware { + + // set EnableExperimentalFunctions + parser.EnableExperimentalFunctions = enablePromQLExperimentalFunctions + // Per tenant query metrics. queriesPerTenant := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{ Name: "cortex_query_frontend_queries_total", diff --git a/pkg/querier/tripperware/roundtrip_test.go b/pkg/querier/tripperware/roundtrip_test.go index 7e6cf474a0..263c9a9b0e 100644 --- a/pkg/querier/tripperware/roundtrip_test.go +++ b/pkg/querier/tripperware/roundtrip_test.go @@ -221,6 +221,7 @@ func TestRoundTrip(t *testing.T) { time.Minute, tc.maxSubQuerySteps, 0, + false, ) resp, err := tw(downstream).RoundTrip(req) if tc.expectedErr == nil {