Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initialize request version #1579

Merged
merged 1 commit into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 13 additions & 35 deletions internal/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,25 +853,6 @@ func (h *requestHandler) doSelect(ctx context.Context, meta util.QueryMetaInto,
return err
}

func (r *requestHandler) effectiveVersion(version string) string {
switch version {
case "", Version2:
return r.version()
default:
return version
}
}

func (r *requestHandler) version() string {
if r.forceVersion != "" {
return r.forceVersion
}
if r.versionDice != nil {
return r.versionDice()
}
return Version3
}

func (h *Handler) getMetricNameWithNamespace(metricID int32) (string, error) {
if metricID == format.TagValueIDUnspecified {
return format.CodeTagValue(format.TagValueIDUnspecified), nil
Expand Down Expand Up @@ -1799,7 +1780,7 @@ func (h *requestHandler) handleGetMetricTagValues(ctx context.Context, req getMe
return nil, false, err
}

version := h.version()
version := h.version
err = validateQuery(metricMeta, version)
if err != nil {
return nil, false, err
Expand Down Expand Up @@ -2114,9 +2095,8 @@ func (h *requestHandler) queryBadges(ctx context.Context, req seriesRequest, met
user: h.endpointStat.user,
priority: h.endpointStat.priority,
},
debug: h.debug,
forceVersion: h.forceVersion,
versionDice: h.versionDice,
debug: h.debug,
version: h.version,
query: promql.Query{
Start: req.from.Unix(),
End: req.to.Unix(),
Expand Down Expand Up @@ -3384,18 +3364,16 @@ func pprofAccessAllowed(h *httpRequestHandler) bool {

func (h *requestHandler) init(accessToken, version string) (err error) {
switch version {
case Version1, Version3:
h.requestVersion = version
h.forceVersion = version
case Version2, "":
h.requestVersion = Version2
h.versionDice = sync.OnceValue(func() string {
if rand.Float64() < h.Handler.Version3Prob.Load() {
return Version3
} else {
return Version2
}
})
case Version1:
h.version = Version1
case Version2:
if rand.Float64() < h.Handler.Version3Prob.Load() {
h.version = Version3
} else {
h.version = Version2
}
case Version3, "":
h.version = Version3
default:
return fmt.Errorf("invalid version: %q", version)
}
Expand Down
14 changes: 6 additions & 8 deletions internal/api/http_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ type httpRoute struct {

type requestHandler struct {
*Handler
accessInfo accessInfo
endpointStat endpointStat
trace []string
debug bool
requestVersion string
forceVersion string
versionDice func() string
query promql.Query
accessInfo accessInfo
endpointStat endpointStat
trace []string
debug bool
version string
query promql.Query
}

type httpRequestHandler struct {
Expand Down
10 changes: 5 additions & 5 deletions internal/api/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func HandleInstantQuery(r *httpRequestHandler) {
q := promql.Query{
Expr: r.FormValue("query"),
Options: promql.Options{
Version: r.version(),
Version: r.version,
Mode: data_model.InstantQuery,
Compat: true,
TimeNow: time.Now().Unix(),
Expand Down Expand Up @@ -73,7 +73,7 @@ func HandleRangeQuery(r *httpRequestHandler) {
q := promql.Query{
Expr: r.FormValue("query"),
Options: promql.Options{
Version: r.version(),
Version: r.version,
Compat: true,
Namespace: r.Header.Get("X-StatsHouse-Namespace"),
},
Expand Down Expand Up @@ -140,7 +140,7 @@ func HandlePromSeriesQuery(r *httpRequestHandler) {
End: end,
Expr: expr,
Options: promql.Options{
Version: r.version(),
Version: r.version,
Version3Start: r.Version3Start.Load(),
Limit: 1000,
Mode: data_model.TagsQuery,
Expand Down Expand Up @@ -215,7 +215,7 @@ func HandlePromLabelValuesQuery(r *httpRequestHandler) {
End: end,
Expr: expr,
Options: promql.Options{
Version: r.version(),
Version: r.version,
Version3Start: r.Version3Start.Load(),
Limit: 1000,
Mode: data_model.TagsQuery,
Expand Down Expand Up @@ -459,7 +459,7 @@ func (h *requestHandler) QuerySeries(ctx context.Context, qry *promql.SeriesQuer
var tx int // time index
for _, lod := range lods {
pq := pointsQuery{
version: h.requestVersion,
version: h.version,
user: h.accessInfo.user,
metricID: qry.Metric.MetricID,
preKeyTagX: format.TagIndex(qry.Metric.PreKeyTagID),
Expand Down
2 changes: 1 addition & 1 deletion internal/api/request_parsers.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ func (r *httpRequestHandler) parseSeriesRequestS(maxTabs int) (res []seriesReque
// parse dependent paramemeters
var (
finalize = func(t *seriesRequestEx) error {
t.version = r.effectiveVersion(t.version)
t.version = r.version
numResultsMax := maxSeries
if len(t.shifts) != 0 {
numResultsMax /= len(t.shifts)
Expand Down
5 changes: 4 additions & 1 deletion internal/api/rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func (h *rpcRequestHandler) rawGetQueryPoint(ctx context.Context, hctx *rpc.Hand
err = fmt.Errorf("failed to deserialize statshouseApi.GetQueryPoint request: %w", err)
return err
}
if err = h.init(args.AccessToken, args.Query.Version); err != nil {
return err
}
qry := seriesRequestRPC{
filter: args.Query.Filter,
function: args.Query.Function,
Expand Down Expand Up @@ -349,7 +352,7 @@ func (h *rpcRequestHandler) rawReleaseChunks(ctx context.Context, hctx *rpc.Hand

func (qry *seriesRequestRPC) toSeriesRequest(h *rpcRequestHandler) (seriesRequest, error) {
req := seriesRequest{
version: h.version(),
version: h.version,
numResults: int(qry.topN),
metricName: qry.metricName,
from: time.Unix(qry.timeFrom, 0),
Expand Down
2 changes: 1 addition & 1 deletion internal/api/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (h *requestHandler) getTableFromLODs(ctx context.Context, lods []data_model
continue
}
pq := pointsQuery{
version: h.requestVersion,
version: h.version,
user: tableReqParams.user,
metricID: metricMeta.MetricID,
preKeyTagX: format.TagIndex(metricMeta.PreKeyTagID),
Expand Down
1 change: 1 addition & 0 deletions internal/api/tscache.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ func (g *tsCacheGroup) changeMaxSize(newSize int) {

func (g *tsCacheGroup) Invalidate(lodLevel int64, times []int64) {
g.pointCaches[Version2][lodLevel].invalidate(times)
g.pointCaches[Version3][lodLevel].invalidate(times)
}

func (g *tsCacheGroup) Get(ctx context.Context, h *requestHandler, pq *pointsQuery, lod data_model.LOD, avoidCache bool) ([][]tsSelectRow, error) {
Expand Down