From d40bc5e5bf06113a73771ef8efa325996d44e5f9 Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 12:46:48 +0800 Subject: [PATCH 01/12] Implement atomic operations for various metrics using Prometheus as the backend storage - Implemented atomic operations for metrics like read/write counts, session counts, etc. - Integrated Prometheus as the backend storage for these metrics. - Note: Metrics are not yet instrumented for telemetry. --- pkg/config/model.go | 17 +- pkg/metrics/manager/manager.go | 395 +++++++++++++++++ pkg/metrics/manager/manager_test.go | 42 ++ pkg/metrics/stats/counter.go | 155 +++++++ pkg/metrics/stats/counter_test.go | 74 ++++ pkg/metrics/stats/counters.go | 414 ++++++++++++++++++ pkg/metrics/stats/duration.go | 136 ++++++ pkg/metrics/stats/export.go | 315 +++++++++++++ pkg/metrics/stats/export_test.go | 171 ++++++++ pkg/metrics/stats/histogram.go | 172 ++++++++ pkg/metrics/stats/histogram_test.go | 90 ++++ pkg/metrics/stats/kebab_case_converter.go | 64 +++ .../stats/kebab_case_converter_test.go | 53 +++ pkg/metrics/stats/prometheus/collectors.go | 365 +++++++++++++++ .../stats/prometheus/prometheus_backend.go | 122 ++++++ .../prometheus/prometheus_backend_test.go | 351 +++++++++++++++ pkg/metrics/stats/snake_case_converter.go | 65 +++ pkg/metrics/stats/timings.go | 246 +++++++++++ pkg/metrics/stats/timings_test.go | 80 ++++ pkg/metrics/stats/util.go | 26 ++ pkg/metrics/stats/variable_interface.go | 30 ++ pkg/util/sync2/atomic.go | 187 ++++++++ pkg/util/sync2/atomic_test.go | 60 +++ 23 files changed, 3624 insertions(+), 6 deletions(-) create mode 100644 pkg/metrics/manager/manager.go create mode 100644 pkg/metrics/manager/manager_test.go create mode 100644 pkg/metrics/stats/counter.go create mode 100644 pkg/metrics/stats/counter_test.go create mode 100644 pkg/metrics/stats/counters.go create mode 100644 pkg/metrics/stats/duration.go create mode 100644 pkg/metrics/stats/export.go create mode 100644 pkg/metrics/stats/export_test.go create mode 100644 pkg/metrics/stats/histogram.go create mode 100644 pkg/metrics/stats/histogram_test.go create mode 100644 pkg/metrics/stats/kebab_case_converter.go create mode 100644 pkg/metrics/stats/kebab_case_converter_test.go create mode 100644 pkg/metrics/stats/prometheus/collectors.go create mode 100644 pkg/metrics/stats/prometheus/prometheus_backend.go create mode 100644 pkg/metrics/stats/prometheus/prometheus_backend_test.go create mode 100644 pkg/metrics/stats/snake_case_converter.go create mode 100644 pkg/metrics/stats/timings.go create mode 100644 pkg/metrics/stats/timings_test.go create mode 100644 pkg/metrics/stats/util.go create mode 100644 pkg/metrics/stats/variable_interface.go create mode 100644 pkg/util/sync2/atomic.go create mode 100644 pkg/util/sync2/atomic_test.go diff --git a/pkg/config/model.go b/pkg/config/model.go index d52c81f5..265741a2 100644 --- a/pkg/config/model.go +++ b/pkg/config/model.go @@ -82,14 +82,19 @@ type ( BootOptions struct { Spec `yaml:",inline"` - Config *Options `yaml:"config" json:"config"` - Listeners []*Listener `validate:"required,dive" yaml:"listeners" json:"listeners"` - Registry *Registry `yaml:"registry" json:"registry"` - Trace *Trace `yaml:"trace" json:"trace"` - Supervisor *User `validate:"required,dive" yaml:"supervisor" json:"supervisor"` - Logging *log.Config `validate:"required,dive" yaml:"logging" json:"logging"` + Config *Options `yaml:"config" json:"config"` + Listeners []*Listener `validate:"required,dive" yaml:"listeners" json:"listeners"` + Registry *Registry `yaml:"registry" json:"registry"` + Trace *Trace `yaml:"trace" json:"trace"` + Supervisor *User `validate:"required,dive" yaml:"supervisor" json:"supervisor"` + Logging *log.Config `validate:"required,dive" yaml:"logging" json:"logging"` + Stats *StatsConfig `validate:"required,dive" yaml:"stats" json:"stats"` } + StatsConfig struct { + Service string `yaml:"service" json:"service"` + StatsEnabled bool `yaml:"stats_enabled" json:"stats_enabled"` + } // Configuration represents an Arana configuration. Configuration struct { Spec `yaml:",inline"` diff --git a/pkg/metrics/manager/manager.go b/pkg/metrics/manager/manager.go new file mode 100644 index 00000000..395144a3 --- /dev/null +++ b/pkg/metrics/manager/manager.go @@ -0,0 +1,395 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package manager + +import ( + "fmt" + "net/http" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +import ( + "go.uber.org/atomic" +) + +import ( + "github.com/arana-db/arana/pkg/config" + "github.com/arana-db/arana/pkg/metrics/stats" + "github.com/arana-db/arana/pkg/metrics/stats/prometheus" + "github.com/arana-db/arana/pkg/util/log" +) + +const ( + SQLExecTimeSize = 5000 +) + +const ( + statsLabelCluster = "Cluster" + statsLabelOperation = "Operation" + statsLabelNamespace = "Namespace" + statsLabelFingerprint = "Fingerprint" + statsLabelFlowDirection = "FlowDirection" + statsLabelSlice = "Slice" + statsLabelIPAddr = "IPAddr" + statsLabelRole = "role" +) + +// SQLResponse record one namespace SQL response like P99/P95 +type SQLResponse struct { + ns string + sqlExecTimeRecordSwitch bool + sQLExecTimeChan chan string + sQLTimeList []float64 + response99Max map[string]float64 // map[backendAddr]P99MaxValue + response99Avg map[string]float64 // map[backendAddr]P99AvgValue + response95Max map[string]float64 // map[backendAddr]P95MaxValue + response95Avg map[string]float64 // map[backendAddr]P95AvgValue +} + +func NewSQLResponse(name string) *SQLResponse { + return &SQLResponse{ + ns: name, + sqlExecTimeRecordSwitch: false, + sQLExecTimeChan: make(chan string, SQLExecTimeSize), + sQLTimeList: []float64{}, + response99Max: make(map[string]float64), + response99Avg: make(map[string]float64), + response95Max: make(map[string]float64), + response95Avg: make(map[string]float64), + } + +} + +// StatisticManager statistics manager +type StatisticManager struct { + bootstrapPath string + options *config.BootOptions + + clusterName string + startTime int64 + + statsType string // 监控后端类型 + handlers map[string]http.Handler + generalLogger log.Logger + + sqlTimings *stats.MultiTimings // SQL耗时统计 + sqlErrorCounts *stats.CountersWithMultiLabels // SQL错误数统计 + + flowCounts *stats.CountersWithMultiLabels // 业务流量统计 + sessionCounts *stats.GaugesWithMultiLabels // 前端会话数统计 + clientConnecions sync.Map // 等同于sessionCounts, 用于限制前端连接 + + backendSQLTimings *stats.MultiTimings // 后端SQL耗时统计 + backendSQLFingerprintSlowCounts *stats.CountersWithMultiLabels // 后端慢SQL指纹数量统计 + backendSQLErrorCounts *stats.CountersWithMultiLabels // 后端SQL错误数统计 + backendConnectPoolIdleCounts *stats.GaugesWithMultiLabels //后端空闲连接数统计 + backendConnectPoolInUseCounts *stats.GaugesWithMultiLabels //后端正在使用连接数统计 + backendConnectPoolWaitCounts *stats.GaugesWithMultiLabels //后端等待队列统计 + backendInstanceCounts *stats.GaugesWithMultiLabels //后端实例状态统计 + uptimeCounts *stats.GaugesWithMultiLabels // 启动时间记录 + backendSQLResponse99MaxCounts *stats.GaugesWithMultiLabels // 后端 SQL 耗时 P99 最大响应时间 + backendSQLResponse99AvgCounts *stats.GaugesWithMultiLabels // 后端 SQL 耗时 P99 平均响应时间 + backendSQLResponse95MaxCounts *stats.GaugesWithMultiLabels // 后端 SQL 耗时 P95 最大响应时间 + backendSQLResponse95AvgCounts *stats.GaugesWithMultiLabels // 后端 SQL 耗时 P95 平均响应时间 + + SQLResponsePercentile map[string]*SQLResponse // 用于记录 P99/P95 Max/AVG 响应时间 + slowSQLTime int64 + closeChan chan bool +} + +// Init init StatisticManager +func (s *StatisticManager) Init() error { + s.startTime = time.Now().Unix() + s.closeChan = make(chan bool, 0) + s.handlers = make(map[string]http.Handler) + bootOptions, err := config.LoadBootOptions(s.bootstrapPath) + if err != nil { + return err + } + if bootOptions.Stats == nil { + return fmt.Errorf("bootOptions.Stats is nil") + } + if err := s.initBackend(bootOptions.Stats); err != nil { + return err + } + + s.sqlTimings = stats.NewMultiTimings("SqlTimings", + "arana proxy sql sqlTimings", []string{statsLabelCluster, statsLabelNamespace, statsLabelOperation}) + s.sqlErrorCounts = stats.NewCountersWithMultiLabels("SqlErrorCounts", + "arana proxy sql error counts per error type", []string{statsLabelCluster, statsLabelNamespace, statsLabelOperation}) + + s.flowCounts = stats.NewCountersWithMultiLabels("FlowCounts", + "arana proxy flow counts", []string{statsLabelCluster, statsLabelNamespace, statsLabelFlowDirection}) + s.sessionCounts = stats.NewGaugesWithMultiLabels("SessionCounts", + "arana proxy session counts", []string{statsLabelCluster, statsLabelNamespace}) + s.backendSQLTimings = stats.NewMultiTimings("BackendSqlTimings", + "arana proxy backend sql sqlTimings", []string{statsLabelCluster, statsLabelNamespace, statsLabelOperation}) + s.backendSQLFingerprintSlowCounts = stats.NewCountersWithMultiLabels("BackendSqlFingerprintSlowCounts", + "arana proxy backend sql fingerprint slow counts", []string{statsLabelCluster, statsLabelNamespace, statsLabelFingerprint}) + s.backendSQLErrorCounts = stats.NewCountersWithMultiLabels("BackendSqlErrorCounts", + "arana proxy backend sql error counts per error type", []string{statsLabelCluster, statsLabelNamespace, statsLabelOperation}) + s.backendConnectPoolIdleCounts = stats.NewGaugesWithMultiLabels("backendConnectPoolIdleCounts", + "arana proxy backend idle connect counts", []string{statsLabelCluster, statsLabelNamespace, statsLabelSlice, statsLabelIPAddr}) + s.backendConnectPoolInUseCounts = stats.NewGaugesWithMultiLabels("backendConnectPoolInUseCounts", + "arana proxy backend in-use connect counts", []string{statsLabelCluster, statsLabelNamespace, statsLabelSlice, statsLabelIPAddr}) + s.backendConnectPoolWaitCounts = stats.NewGaugesWithMultiLabels("backendConnectPoolWaitCounts", + "arana proxy backend wait connect counts", []string{statsLabelCluster, statsLabelNamespace, statsLabelSlice, statsLabelIPAddr}) + s.backendInstanceCounts = stats.NewGaugesWithMultiLabels("backendInstanceCounts", + "arana proxy backend DB status counts", []string{statsLabelCluster, statsLabelNamespace, statsLabelSlice, statsLabelIPAddr, statsLabelRole}) + s.backendSQLResponse99MaxCounts = stats.NewGaugesWithMultiLabels("backendSQLResponse99MaxCounts", + "arana proxy backend sql sqlTimings P99 max", []string{statsLabelCluster, statsLabelNamespace, statsLabelIPAddr}) + s.backendSQLResponse99AvgCounts = stats.NewGaugesWithMultiLabels("backendSQLResponse99AvgCounts", + "arana proxy backend sql sqlTimings P99 avg", []string{statsLabelCluster, statsLabelNamespace, statsLabelIPAddr}) + s.backendSQLResponse95MaxCounts = stats.NewGaugesWithMultiLabels("backendSQLResponse95MaxCounts", + "arana proxy backend sql sqlTimings P95 max", []string{statsLabelCluster, statsLabelNamespace, statsLabelIPAddr}) + s.backendSQLResponse95AvgCounts = stats.NewGaugesWithMultiLabels("backendSQLResponse95AvgCounts", + "arana proxy backend sql sqlTimings P95 avg", []string{statsLabelCluster, statsLabelNamespace, statsLabelIPAddr}) + s.uptimeCounts = stats.NewGaugesWithMultiLabels("UptimeCounts", + "arana proxy uptime counts", []string{statsLabelCluster}) + s.clientConnecions = sync.Map{} + s.startClearTask() + return nil +} + +// Close close proxy stats +func (s *StatisticManager) Close() { + close(s.closeChan) +} + +// GetHandlers return specific handler of stats +func (s *StatisticManager) GetHandlers() map[string]http.Handler { + return s.handlers +} + +func (s *StatisticManager) initBackend(cfg *config.StatsConfig) error { + prometheus.Init(cfg.Service) + s.handlers = prometheus.GetHandlers() + return nil +} + +// clear data to prevent +func (s *StatisticManager) startClearTask() { + go func() { + t := time.NewTicker(time.Hour) + for { + select { + case <-s.closeChan: + return + case <-t.C: + s.clearLargeCounters() + } + } + }() +} + +func (s *StatisticManager) clearLargeCounters() { + s.sqlErrorCounts.ResetAll() + + s.backendSQLErrorCounts.ResetAll() + s.backendSQLFingerprintSlowCounts.ResetAll() +} + +func (s *StatisticManager) recordSessionErrorSQLFingerprint(namespace string, operation string, md5 string) { + operationStatsKey := []string{s.clusterName, namespace, operation} + s.sqlErrorCounts.Add(operationStatsKey, 1) +} + +func (s *StatisticManager) recordSessionSQLTiming(namespace string, operation string, startTime time.Time) { + operationStatsKey := []string{s.clusterName, namespace, operation} + s.sqlTimings.Record(operationStatsKey, startTime) +} + +// millisecond duration +func (s *StatisticManager) isBackendSlowSQL(startTime time.Time) bool { + duration := time.Since(startTime).Nanoseconds() / int64(time.Millisecond) + return duration > s.slowSQLTime || s.slowSQLTime == 0 +} + +func (s *StatisticManager) recordBackendSlowSQLFingerprint(namespace string, md5 string) { + fingerprintStatsKey := []string{s.clusterName, namespace, md5} + s.backendSQLFingerprintSlowCounts.Add(fingerprintStatsKey, 1) +} + +func (s *StatisticManager) recordBackendErrorSQLFingerprint(namespace string, operation string, md5 string) { + operationStatsKey := []string{s.clusterName, namespace, operation} + s.backendSQLErrorCounts.Add(operationStatsKey, 1) +} + +func (s *StatisticManager) recordBackendSQLTiming(namespace string, operation string, sliceName, backendAddr string, startTime time.Time) { + operationStatsKey := []string{s.clusterName, namespace, operation} + s.backendSQLTimings.Record(operationStatsKey, startTime) + + if !s.SQLResponsePercentile[namespace].sqlExecTimeRecordSwitch { + return + } + execTime := float64(time.Since(startTime).Milliseconds()) + select { + case s.SQLResponsePercentile[namespace].sQLExecTimeChan <- fmt.Sprintf(sliceName + "__" + backendAddr + "__" + fmt.Sprintf("%f", execTime)): + case <-time.After(time.Millisecond): + s.SQLResponsePercentile[namespace].sqlExecTimeRecordSwitch = false + } +} + +// IncrSessionCount incr session count +func (s *StatisticManager) IncrSessionCount(namespace string) { + statsKey := []string{s.clusterName, namespace} + s.sessionCounts.Add(statsKey, 1) +} + +func (s *StatisticManager) IncrConnectionCount(namespace string) { + if value, ok := s.clientConnecions.Load(namespace); !ok { + s.clientConnecions.Store(namespace, atomic.NewInt32(1)) + } else { + lastNum := value.(*atomic.Int32) + lastNum.Inc() + } +} + +// DescSessionCount decr session count +func (s *StatisticManager) DescSessionCount(namespace string) { + statsKey := []string{s.clusterName, namespace} + s.sessionCounts.Add(statsKey, -1) +} + +func (s *StatisticManager) DescConnectionCount(namespace string) { + if value, ok := s.clientConnecions.Load(namespace); !ok { + log.Warn("namespace: '%v' maxClientConnections should in map", namespace) + } else { + lastNum := value.(*atomic.Int32) + lastNum.Dec() + } +} + +// AddReadFlowCount add read flow count +func (s *StatisticManager) AddReadFlowCount(namespace string, byteCount int) { + statsKey := []string{s.clusterName, namespace, "read"} + s.flowCounts.Add(statsKey, int64(byteCount)) +} + +// AddWriteFlowCount add write flow count +func (s *StatisticManager) AddWriteFlowCount(namespace string, byteCount int) { + statsKey := []string{s.clusterName, namespace, "write"} + s.flowCounts.Add(statsKey, int64(byteCount)) +} + +// record idle connect count +func (s *StatisticManager) recordConnectPoolIdleCount(namespace string, slice string, addr string, count int64) { + statsKey := []string{s.clusterName, namespace, slice, addr} + s.backendConnectPoolIdleCounts.Set(statsKey, count) +} + +// record in-use connect count +func (s *StatisticManager) recordConnectPoolInuseCount(namespace string, slice string, addr string, count int64) { + statsKey := []string{s.clusterName, namespace, slice, addr} + s.backendConnectPoolInUseCounts.Set(statsKey, count) +} + +// record wait queue length +func (s *StatisticManager) recordConnectPoolWaitCount(namespace string, slice string, addr string, count int64) { + statsKey := []string{s.clusterName, namespace, slice, addr} + s.backendConnectPoolWaitCounts.Set(statsKey, count) +} + +// record wait queue length +func (s *StatisticManager) recordInstanceCount(namespace string, slice string, addr string, count int64, role string) { + statsKey := []string{s.clusterName, namespace, slice, addr, role} + s.backendInstanceCounts.Set(statsKey, count) +} + +// record wait queue length +func (s *StatisticManager) recordBackendSQLTimingP99Max(namespace, backendAddr string, count int64) { + statsKey := []string{s.clusterName, namespace, backendAddr} + s.backendSQLResponse99MaxCounts.Set(statsKey, count) +} + +func (s *StatisticManager) recordBackendSQLTimingP99Avg(namespace, backendAddr string, count int64) { + statsKey := []string{s.clusterName, namespace, backendAddr} + s.backendSQLResponse99AvgCounts.Set(statsKey, count) +} + +func (s *StatisticManager) recordBackendSQLTimingP95Max(namespace, backendAddr string, count int64) { + statsKey := []string{s.clusterName, namespace, backendAddr} + s.backendSQLResponse95MaxCounts.Set(statsKey, count) +} + +func (s *StatisticManager) recordBackendSQLTimingP95Avg(namespace, backendAddr string, count int64) { + statsKey := []string{s.clusterName, namespace, backendAddr} + s.backendSQLResponse95AvgCounts.Set(statsKey, count) +} + +// AddUptimeCount add uptime count +func (s *StatisticManager) AddUptimeCount(count int64) { + statsKey := []string{s.clusterName} + s.uptimeCounts.Set(statsKey, count) +} + +func (s *StatisticManager) CalcAvgSQLTimes() { + for ns, sQLResponse := range s.SQLResponsePercentile { + sqlTimes := make([]float64, 0) + quit := false + backendAddr := "" + for !quit { + select { + case tmp := <-sQLResponse.sQLExecTimeChan: + if len(sqlTimes) >= SQLExecTimeSize { + quit = true + } + sqlTimeSplit := strings.Split(tmp, "__") + if len(sqlTimeSplit) < 3 { + log.Warnf("sql time format error.get:%s", tmp) + quit = true + } + backendAddr = sqlTimeSplit[1] + etime, _ := strconv.ParseFloat(sqlTimeSplit[2], 64) + sqlTimes = append(sqlTimes, etime) + case <-time.After(time.Millisecond): + quit = true + } + sort.Float64s(sqlTimes) + + sum := 0.0 + p99sum := 0.0 + p95sum := 0.0 + if len(sqlTimes) != 0 { + s.SQLResponsePercentile[ns].response99Max[backendAddr] = sqlTimes[(len(sqlTimes)-1)*99/100] + s.SQLResponsePercentile[ns].response95Max[backendAddr] = sqlTimes[(len(sqlTimes)-1)*95/100] + } + for k, _ := range sqlTimes { + sum += sqlTimes[k] + if k < len(sqlTimes)*95/100 { + p95sum += sqlTimes[k] + } + if k < len(sqlTimes)*99/100 { + p99sum += sqlTimes[k] + } + } + if len(sqlTimes)*99/100 > 0 { + s.SQLResponsePercentile[ns].response99Avg[backendAddr] = p99sum / float64(len(sqlTimes)*99/100) + } + if len(sqlTimes)*95/100 > 0 { + s.SQLResponsePercentile[ns].response95Avg[backendAddr] = p95sum / float64(len(sqlTimes)*95/100) + } + + s.SQLResponsePercentile[ns].sqlExecTimeRecordSwitch = true + } + } +} diff --git a/pkg/metrics/manager/manager_test.go b/pkg/metrics/manager/manager_test.go new file mode 100644 index 00000000..84637563 --- /dev/null +++ b/pkg/metrics/manager/manager_test.go @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package manager + +import ( + "net/http" + "testing" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "github.com/arana-db/arana/pkg/config" +) + +func TestInitBackend(t *testing.T) { + manager := &StatisticManager{} + cfg := &config.StatsConfig{} + + err := manager.initBackend(cfg) + + assert.Nil(t, err) + assert.IsType(t, map[string]http.Handler{}, manager.handlers) + assert.Contains(t, manager.handlers, "/metrics") +} diff --git a/pkg/metrics/stats/counter.go b/pkg/metrics/stats/counter.go new file mode 100644 index 00000000..3bb84565 --- /dev/null +++ b/pkg/metrics/stats/counter.go @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "strconv" +) + +import ( + "github.com/arana-db/arana/pkg/util/log" + "github.com/arana-db/arana/pkg/util/sync2" +) + +// Counter tracks a cumulative count of a metric. +// For a one-dimensional or multi-dimensional counter, please use +// CountersWithSingleLabel or CountersWithMultiLabels instead. +type Counter struct { + i sync2.AtomicInt64 + help string +} + +// NewCounter returns a new Counter. +func NewCounter(name string, help string) *Counter { + v := &Counter{help: help} + if name != "" { + publish(name, v) + } + return v +} + +// Add adds the provided value to the Counter. +func (v *Counter) Add(delta int64) { + if delta < 0 { + log.Warn("[stats] Adding a negative value to a counter, %v should be a gauge instead", v) + } + v.i.Add(delta) +} + +// Reset resets the counter value to 0. +func (v *Counter) Reset() { + v.i.Set(int64(0)) +} + +// Get returns the value. +func (v *Counter) Get() int64 { + return v.i.Get() +} + +// String implements the expvar.Var interface. +func (v *Counter) String() string { + return strconv.FormatInt(v.i.Get(), 10) +} + +// Help returns the help string. +func (v *Counter) Help() string { + return v.help +} + +// CounterFunc allows to provide the counter value via a custom function. +// For implementations that differentiate between Counters/Gauges, +// CounterFunc's values only go up (or are reset to 0). +type CounterFunc struct { + F func() int64 + help string +} + +// NewCounterFunc creates a new CounterFunc instance and publishes it if name is +// set. +func NewCounterFunc(name string, help string, f func() int64) *CounterFunc { + c := &CounterFunc{ + F: f, + help: help, + } + + if name != "" { + publish(name, c) + } + return c +} + +// Help returns the help string. +func (cf CounterFunc) Help() string { + return cf.help +} + +// String implements expvar.Var. +func (cf CounterFunc) String() string { + return strconv.FormatInt(cf.F(), 10) +} + +// Gauge tracks the current value of an integer metric. +// The emphasis here is on *current* i.e. this is not a cumulative counter. +// For a one-dimensional or multi-dimensional gauge, please use +// GaugeWithSingleLabel or GaugesWithMultiLabels instead. +type Gauge struct { + Counter +} + +// NewGauge creates a new Gauge and publishes it if name is set. +func NewGauge(name string, help string) *Gauge { + v := &Gauge{Counter: Counter{help: help}} + + if name != "" { + publish(name, v) + } + return v +} + +// Set overwrites the current value. +func (v *Gauge) Set(value int64) { + v.Counter.i.Set(value) +} + +// Add adds the provided value to the Gauge. +func (v *Gauge) Add(delta int64) { + v.Counter.i.Add(delta) +} + +// GaugeFunc is the same as CounterFunc but meant for gauges. +// It's a wrapper around CounterFunc for values that go up/down for +// implementations (like Prometheus) that need to differ between Counters and +// Gauges. +type GaugeFunc struct { + CounterFunc +} + +// NewGaugeFunc creates a new GaugeFunc instance and publishes it if name is +// set. +func NewGaugeFunc(name string, help string, f func() int64) *GaugeFunc { + i := &GaugeFunc{ + CounterFunc: CounterFunc{ + F: f, + help: help, + }} + + if name != "" { + publish(name, i) + } + return i +} diff --git a/pkg/metrics/stats/counter_test.go b/pkg/metrics/stats/counter_test.go new file mode 100644 index 00000000..2dd65e3e --- /dev/null +++ b/pkg/metrics/stats/counter_test.go @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "expvar" + "testing" +) + +func TestCounter(t *testing.T) { + var gotname string + var gotv *Counter + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*Counter) + }) + v := NewCounter("Int", "help") + if gotname != "Int" { + t.Errorf("want Int, got %s", gotname) + } + if gotv != v { + t.Errorf("want %#v, got %#v", v, gotv) + } + v.Add(1) + if v.Get() != 1 { + t.Errorf("want 1, got %v", v.Get()) + } + if v.String() != "1" { + t.Errorf("want 1, got %v", v.Get()) + } + v.Reset() + if v.Get() != 0 { + t.Errorf("want 0, got %v", v.Get()) + } +} + +func TestGaugeFunc(t *testing.T) { + var gotname string + var gotv *GaugeFunc + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*GaugeFunc) + }) + + v := NewGaugeFunc("name", "help", func() int64 { + return 1 + }) + if gotname != "name" { + t.Errorf("want name, got %s", gotname) + } + if gotv != v { + t.Errorf("want %#v, got %#v", v, gotv) + } + if v.String() != "1" { + t.Errorf("want 1, got %v", v.String()) + } +} diff --git a/pkg/metrics/stats/counters.go b/pkg/metrics/stats/counters.go new file mode 100644 index 00000000..eba9403e --- /dev/null +++ b/pkg/metrics/stats/counters.go @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "bytes" + "fmt" + "sync" + "sync/atomic" +) + +import ( + "github.com/arana-db/arana/pkg/util/log" +) + +// counters is similar to expvar.Map, except that it doesn't allow floats. +// It is used to build CountersWithSingleLabel and GaugesWithSingleLabel. +type counters struct { + // mu only protects adding and retrieving the value (*int64) from the + // map. + // The modification to the actual number (int64) must be done with + // atomic funcs. + // If a value for a given name already exists in the map, we only have + // to use a read-lock to retrieve it. This is an important performance + // optimizations because it allows to concurrently increment a counter. + mu sync.RWMutex + counts map[string]*int64 + help string +} + +// String implements the expvar.Var interface. +func (c *counters) String() string { + b := bytes.NewBuffer(make([]byte, 0, 4096)) + + c.mu.RLock() + defer c.mu.RUnlock() + + fmt.Fprintf(b, "{") + firstValue := true + for k, a := range c.counts { + if firstValue { + firstValue = false + } else { + fmt.Fprintf(b, ", ") + } + fmt.Fprintf(b, "%q: %v", k, atomic.LoadInt64(a)) + } + fmt.Fprintf(b, "}") + return b.String() +} + +func (c *counters) getValueAddr(name string) *int64 { + c.mu.RLock() + a, ok := c.counts[name] + c.mu.RUnlock() + + if ok { + return a + } + + c.mu.Lock() + defer c.mu.Unlock() + // we need to check the existence again + // as it may be created by other goroutine. + a, ok = c.counts[name] + if ok { + return a + } + a = new(int64) + c.counts[name] = a + return a +} + +// Add adds a value to a named counter. +func (c *counters) Add(name string, value int64) { + a := c.getValueAddr(name) + atomic.AddInt64(a, value) +} + +// ResetAll resets all counter values and clears all keys. +func (c *counters) ResetAll() { + c.mu.Lock() + defer c.mu.Unlock() + c.counts = make(map[string]*int64) +} + +// ZeroAll resets all counter values to zero +func (c *counters) ZeroAll() { + c.mu.Lock() + defer c.mu.Unlock() + for _, a := range c.counts { + atomic.StoreInt64(a, int64(0)) + } +} + +// Reset resets a specific counter value to 0. +func (c *counters) Reset(name string) { + a := c.getValueAddr(name) + atomic.StoreInt64(a, int64(0)) +} + +// Counts returns a copy of the Counters' map. +func (c *counters) Counts() map[string]int64 { + c.mu.RLock() + defer c.mu.RUnlock() + + counts := make(map[string]int64, len(c.counts)) + for k, a := range c.counts { + counts[k] = atomic.LoadInt64(a) + } + return counts +} + +// Help returns the help string. +func (c *counters) Help() string { + return c.help +} + +// CountersWithSingleLabel tracks multiple counter values for a single +// dimension ("label"). +// It provides a Counts method which can be used for tracking rates. +type CountersWithSingleLabel struct { + counters + label string +} + +// NewCountersWithSingleLabel create a new Counters instance. +// If name is set, the variable gets published. +// The function also accepts an optional list of tags that pre-creates them +// initialized to 0. +// label is a category name used to organize the tags. It is currently only +// used by Prometheus, but not by the expvar package. +func NewCountersWithSingleLabel(name, help, label string, tags ...string) *CountersWithSingleLabel { + c := &CountersWithSingleLabel{ + counters: counters{ + counts: make(map[string]*int64), + help: help, + }, + label: label, + } + + for _, tag := range tags { + c.counts[tag] = new(int64) + } + if name != "" { + publish(name, c) + } + return c +} + +// Label returns the label name. +func (c *CountersWithSingleLabel) Label() string { + return c.label +} + +// Add adds a value to a named counter. +func (c *CountersWithSingleLabel) Add(name string, value int64) { + if value < 0 { + log.Warn("[stats] Adding a negative value to a counter, %v should be a gauge instead", c) + } + a := c.getValueAddr(name) + atomic.AddInt64(a, value) +} + +// CountersWithMultiLabels is a multidimensional counters implementation. +// Internally, each tuple of dimensions ("labels") is stored as a single +// label value where all label values are joined with ".". +type CountersWithMultiLabels struct { + counters + labels []string +} + +// NewCountersWithMultiLabels creates a new CountersWithMultiLabels +// instance, and publishes it if name is set. +func NewCountersWithMultiLabels(name, help string, labels []string) *CountersWithMultiLabels { + t := &CountersWithMultiLabels{ + counters: counters{ + counts: make(map[string]*int64), + help: help}, + labels: labels, + } + if name != "" { + publish(name, t) + } + + return t +} + +// Labels returns the list of labels. +func (mc *CountersWithMultiLabels) Labels() []string { + return mc.labels +} + +// Add adds a value to a named counter. +// len(names) must be equal to len(Labels) +func (mc *CountersWithMultiLabels) Add(names []string, value int64) { + if len(names) != len(mc.labels) { + panic("CountersWithMultiLabels: wrong number of values in Add") + } + if value < 0 { + log.Warn("[stats] Adding a negative value to a counter, %v should be a gauge instead", mc) + } + + mc.counters.Add(safeJoinLabels(names), value) +} + +// Reset resets the value of a named counter back to 0. +// len(names) must be equal to len(Labels). +func (mc *CountersWithMultiLabels) Reset(names []string) { + if len(names) != len(mc.labels) { + panic("CountersWithMultiLabels: wrong number of values in Reset") + } + + mc.counters.Reset(safeJoinLabels(names)) +} + +// Counts returns a copy of the Counters' map. +// The key is a single string where all labels are joined by a "." e.g. +// "label1.label2". +func (mc *CountersWithMultiLabels) Counts() map[string]int64 { + return mc.counters.Counts() +} + +// CountersFuncWithMultiLabels is a multidimensional counters implementation +// where names of categories are compound names made with joining +// multiple strings with '.'. Since the map is returned by the +// function, we assume it's in the right format (meaning each key is +// of the form 'aaa.bbb.ccc' with as many elements as there are in +// Labels). +// +// Note that there is no CountersFuncWithSingleLabel object. That this +// because such an object would be identical to this one because these +// function-based counters have no Add() or Set() method which are different +// for the single vs. multiple labels cases. +// If you have only a single label, pass an array with a single element. +type CountersFuncWithMultiLabels struct { + f func() map[string]int64 + help string + labels []string +} + +// Labels returns the list of labels. +func (c CountersFuncWithMultiLabels) Labels() []string { + return c.labels +} + +// Help returns the help string. +func (c CountersFuncWithMultiLabels) Help() string { + return c.help +} + +// NewCountersFuncWithMultiLabels creates a new CountersFuncWithMultiLabels +// mapping to the provided function. +func NewCountersFuncWithMultiLabels(name, help string, labels []string, f func() map[string]int64) *CountersFuncWithMultiLabels { + t := &CountersFuncWithMultiLabels{ + f: f, + help: help, + labels: labels, + } + if name != "" { + publish(name, t) + } + + return t +} + +// Counts returns a copy of the counters' map. +func (c CountersFuncWithMultiLabels) Counts() map[string]int64 { + return c.f() +} + +// String implements the expvar.Var interface. +func (c CountersFuncWithMultiLabels) String() string { + m := c.f() + if m == nil { + return "{}" + } + b := bytes.NewBuffer(make([]byte, 0, 4096)) + fmt.Fprintf(b, "{") + firstValue := true + for k, v := range m { + if firstValue { + firstValue = false + } else { + fmt.Fprintf(b, ", ") + } + fmt.Fprintf(b, "%q: %v", k, v) + } + fmt.Fprintf(b, "}") + return b.String() +} + +// GaugesWithSingleLabel is similar to CountersWithSingleLabel, except its +// meant to track the current value and not a cumulative count. +type GaugesWithSingleLabel struct { + CountersWithSingleLabel +} + +// NewGaugesWithSingleLabel creates a new GaugesWithSingleLabel and +// publishes it if the name is set. +func NewGaugesWithSingleLabel(name, help, label string, tags ...string) *GaugesWithSingleLabel { + g := &GaugesWithSingleLabel{ + CountersWithSingleLabel: CountersWithSingleLabel{ + counters: counters{ + counts: make(map[string]*int64), + help: help, + }, + label: label, + }, + } + + for _, tag := range tags { + g.counts[tag] = new(int64) + } + if name != "" { + publish(name, g) + } + return g +} + +// Set sets the value of a named gauge. +func (g *GaugesWithSingleLabel) Set(name string, value int64) { + a := g.getValueAddr(name) + atomic.StoreInt64(a, value) +} + +// Add adds a value to a named gauge. +func (g *GaugesWithSingleLabel) Add(name string, value int64) { + a := g.getValueAddr(name) + atomic.AddInt64(a, value) +} + +// GaugesWithMultiLabels is a CountersWithMultiLabels implementation where +// the values can go up and down. +type GaugesWithMultiLabels struct { + CountersWithMultiLabels +} + +// NewGaugesWithMultiLabels creates a new GaugesWithMultiLabels instance, +// and publishes it if name is set. +func NewGaugesWithMultiLabels(name, help string, labels []string) *GaugesWithMultiLabels { + t := &GaugesWithMultiLabels{ + CountersWithMultiLabels: CountersWithMultiLabels{ + counters: counters{ + counts: make(map[string]*int64), + help: help, + }, + labels: labels, + }} + if name != "" { + publish(name, t) + } + + return t +} + +// Set sets the value of a named counter. +// len(names) must be equal to len(Labels). +func (mg *GaugesWithMultiLabels) Set(names []string, value int64) { + if len(names) != len(mg.CountersWithMultiLabels.labels) { + panic("GaugesWithMultiLabels: wrong number of values in Set") + } + a := mg.getValueAddr(safeJoinLabels(names)) + atomic.StoreInt64(a, value) +} + +// Add adds a value to a named gauge. +// len(names) must be equal to len(Labels). +func (mg *GaugesWithMultiLabels) Add(names []string, value int64) { + if len(names) != len(mg.labels) { + panic("CountersWithMultiLabels: wrong number of values in Add") + } + + mg.counters.Add(safeJoinLabels(names), value) +} + +// GaugesFuncWithMultiLabels is a wrapper around CountersFuncWithMultiLabels +// for values that go up/down for implementations (like Prometheus) that +// need to differ between Counters and Gauges. +type GaugesFuncWithMultiLabels struct { + CountersFuncWithMultiLabels +} + +// NewGaugesFuncWithMultiLabels creates a new GaugesFuncWithMultiLabels +// mapping to the provided function. +func NewGaugesFuncWithMultiLabels(name, help string, labels []string, f func() map[string]int64) *GaugesFuncWithMultiLabels { + t := &GaugesFuncWithMultiLabels{ + CountersFuncWithMultiLabels: CountersFuncWithMultiLabels{ + f: f, + help: help, + labels: labels, + }} + + if name != "" { + publish(name, t) + } + + return t +} diff --git a/pkg/metrics/stats/duration.go b/pkg/metrics/stats/duration.go new file mode 100644 index 00000000..dc9c2a58 --- /dev/null +++ b/pkg/metrics/stats/duration.go @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "strconv" + "time" +) + +import ( + "github.com/arana-db/arana/pkg/util/sync2" +) + +// CounterDuration exports a time.Duration as counter. +type CounterDuration struct { + i sync2.AtomicDuration + help string +} + +// NewCounterDuration returns a new CounterDuration. +func NewCounterDuration(name, help string) *CounterDuration { + cd := &CounterDuration{ + help: help, + } + publish(name, cd) + return cd +} + +// Help implements the Variable interface. +func (cd CounterDuration) Help() string { + return cd.help +} + +// String is the implementation of expvar.var. +func (cd CounterDuration) String() string { + return strconv.FormatInt(int64(cd.i.Get()), 10) +} + +// Add adds the provided value to the CounterDuration. +func (cd *CounterDuration) Add(delta time.Duration) { + cd.i.Add(delta) +} + +// Get returns the value. +func (cd *CounterDuration) Get() time.Duration { + return cd.i.Get() +} + +// GaugeDuration exports a time.Duration as gauge. +// In addition to CounterDuration, it also has Set() which allows overriding +// the current value. +type GaugeDuration struct { + CounterDuration +} + +// NewGaugeDuration returns a new GaugeDuration. +func NewGaugeDuration(name, help string) *GaugeDuration { + gd := &GaugeDuration{ + CounterDuration: CounterDuration{ + help: help, + }, + } + publish(name, gd) + return gd +} + +// Set sets the value. +func (gd *GaugeDuration) Set(value time.Duration) { + gd.i.Set(value) +} + +// CounterDurationFunc allows to provide the value via a custom function. +type CounterDurationFunc struct { + F func() time.Duration + help string +} + +// NewCounterDurationFunc creates a new CounterDurationFunc instance and +// publishes it if name is set. +func NewCounterDurationFunc(name string, help string, f func() time.Duration) *CounterDurationFunc { + cf := &CounterDurationFunc{ + F: f, + help: help, + } + + if name != "" { + publish(name, cf) + } + return cf +} + +// Help implements the Variable interface. +func (cf CounterDurationFunc) Help() string { + return cf.help +} + +// String is the implementation of expvar.var. +func (cf CounterDurationFunc) String() string { + return strconv.FormatInt(int64(cf.F()), 10) +} + +// GaugeDurationFunc allows to provide the value via a custom function. +type GaugeDurationFunc struct { + CounterDurationFunc +} + +// NewGaugeDurationFunc creates a new GaugeDurationFunc instance and +// publishes it if name is set. +func NewGaugeDurationFunc(name string, help string, f func() time.Duration) *GaugeDurationFunc { + gf := &GaugeDurationFunc{ + CounterDurationFunc: CounterDurationFunc{ + F: f, + help: help, + }, + } + + if name != "" { + publish(name, gf) + } + return gf +} diff --git a/pkg/metrics/stats/export.go b/pkg/metrics/stats/export.go new file mode 100644 index 00000000..1493bdb1 --- /dev/null +++ b/pkg/metrics/stats/export.go @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "bytes" + "expvar" + "fmt" + "strconv" + "sync" + "time" +) + +import ( + "github.com/arana-db/arana/pkg/util/log" +) + +const defaultEmitPeriod = 60 * time.Second + +// NewVarHook is the type of a hook to export variables in a different way +type NewVarHook func(name string, v expvar.Var) + +type varGroup struct { + sync.Mutex + vars map[string]expvar.Var + newVarHook NewVarHook +} + +func (vg *varGroup) register(nvh NewVarHook) { + vg.Lock() + defer vg.Unlock() + if vg.newVarHook != nil { + panic("You've already registered a function") + } + if nvh == nil { + panic("nil not allowed") + } + vg.newVarHook = nvh + // Call hook on existing vars because some might have been + // created before the call to register + for k, v := range vg.vars { + nvh(k, v) + } + vg.vars = nil +} + +func (vg *varGroup) publish(name string, v expvar.Var) { + vg.Lock() + defer vg.Unlock() + + expvar.Publish(name, v) + if vg.newVarHook != nil { + vg.newVarHook(name, v) + } else { + vg.vars[name] = v + } +} + +var defaultVarGroup = varGroup{vars: make(map[string]expvar.Var)} + +// Register allows you to register a callback function +// that will be called whenever a new stats variable gets +// created. This can be used to build alternate methods +// of exporting stats variables. +func Register(nvh NewVarHook) { + defaultVarGroup.register(nvh) +} + +// Publish is expvar.Publish+hook +func Publish(name string, v expvar.Var) { + publish(name, v) +} + +func publish(name string, v expvar.Var) { + defaultVarGroup.publish(name, v) +} + +// PushBackend is an interface for any stats/metrics backend that requires data +// to be pushed to it. It's used to support push-based metrics backends, as expvar +// by default only supports pull-based ones. +type PushBackend interface { + // PushAll pushes all stats from expvar to the backend + PushAll() error +} + +var pushBackends = make(map[string]PushBackend) +var pushBackendsLock sync.Mutex +var once sync.Once + +// GetPushBackend return push backends +func GetPushBackend() map[string]PushBackend { + return pushBackends +} + +// RegisterPushBackend allows modules to register PushBackend implementations. +// Should be called on init(). +func RegisterPushBackend(name string, backend PushBackend, emitPeriod time.Duration) { + pushBackendsLock.Lock() + defer pushBackendsLock.Unlock() + if _, ok := pushBackends[name]; ok { + log.Fatal(fmt.Sprintf("PushBackend %s already exists; can't register the same name multiple times", name)) + } + pushBackends[name] = backend + + if emitPeriod <= 0 { + log.Warn("[stats] push backend got invalid emitPeriod: %v, use default emitPeriod instead: %v", emitPeriod, defaultEmitPeriod) + emitPeriod = defaultEmitPeriod + } + // Start a single goroutine to emit stats periodically + once.Do(func() { + go emitToBackend(name, emitPeriod) + }) +} + +// emitToBackend does a periodic emit to the selected PushBackend. If a push fails, +// it will be logged as a warning (but things will otherwise proceed as normal). +func emitToBackend(name string, emitPeriod time.Duration) { + ticker := time.NewTicker(emitPeriod) + defer ticker.Stop() + for range ticker.C { + backend, ok := pushBackends[name] + if !ok { + log.Fatal(fmt.Sprintf("No PushBackend registered with name %s", name)) + return + } + err := backend.PushAll() + if err != nil { + log.Warn("Pushing stats to backend %v failed: %v", name, err) + } + } +} + +// Float is expvar.Float+Get+hook +type Float struct { + mu sync.Mutex + f float64 +} + +// NewFloat creates a new Float and exports it. +func NewFloat(name string) *Float { + v := new(Float) + publish(name, v) + return v +} + +// Add adds the provided value to the Float +func (v *Float) Add(delta float64) { + v.mu.Lock() + v.f += delta + v.mu.Unlock() +} + +// Set sets the value +func (v *Float) Set(value float64) { + v.mu.Lock() + v.f = value + v.mu.Unlock() +} + +// Get returns the value +func (v *Float) Get() float64 { + v.mu.Lock() + f := v.f + v.mu.Unlock() + return f +} + +// String is the implementation of expvar.var +func (v *Float) String() string { + return strconv.FormatFloat(v.Get(), 'g', -1, 64) +} + +// FloatFunc converts a function that returns +// a float64 as an expvar. +type FloatFunc func() float64 + +// String is the implementation of expvar.var +func (f FloatFunc) String() string { + return strconv.FormatFloat(f(), 'g', -1, 64) +} + +// String is expvar.String+Get+hook +type String struct { + mu sync.Mutex + s string +} + +// NewString returns a new String +func NewString(name string) *String { + v := new(String) + publish(name, v) + return v +} + +// Set sets the value +func (v *String) Set(value string) { + v.mu.Lock() + v.s = value + v.mu.Unlock() +} + +// Get returns the value +func (v *String) Get() string { + v.mu.Lock() + s := v.s + v.mu.Unlock() + return s +} + +// String is the implementation of expvar.var +func (v *String) String() string { + return strconv.Quote(v.Get()) +} + +// StringFunc converts a function that returns +// an string as an expvar. +type StringFunc func() string + +// String is the implementation of expvar.var +func (f StringFunc) String() string { + return strconv.Quote(f()) +} + +// JSONFunc is the public type for a single function that returns json directly. +type JSONFunc func() string + +// String is the implementation of expvar.var +func (f JSONFunc) String() string { + return f() +} + +// PublishJSONFunc publishes any function that returns +// a JSON string as a variable. The string is sent to +// expvar as is. +func PublishJSONFunc(name string, f func() string) { + publish(name, JSONFunc(f)) +} + +// StringMap is a map of string -> string +type StringMap struct { + mu sync.Mutex + values map[string]string +} + +// NewStringMap returns a new StringMap +func NewStringMap(name string) *StringMap { + v := &StringMap{values: make(map[string]string)} + publish(name, v) + return v +} + +// Set will set a value (existing or not) +func (v *StringMap) Set(name, value string) { + v.mu.Lock() + v.values[name] = value + v.mu.Unlock() +} + +// Get will return the value, or "" f not set. +func (v *StringMap) Get(name string) string { + v.mu.Lock() + s := v.values[name] + v.mu.Unlock() + return s +} + +// String is the implementation of expvar.Var +func (v *StringMap) String() string { + v.mu.Lock() + defer v.mu.Unlock() + return stringMapToString(v.values) +} + +// StringMapFunc is the function equivalent of StringMap +type StringMapFunc func() map[string]string + +// String is used by expvar. +func (f StringMapFunc) String() string { + m := f() + if m == nil { + return "{}" + } + return stringMapToString(m) +} + +func stringMapToString(m map[string]string) string { + b := bytes.NewBuffer(make([]byte, 0, 4096)) + fmt.Fprintf(b, "{") + firstValue := true + for k, v := range m { + if firstValue { + firstValue = false + } else { + fmt.Fprintf(b, ", ") + } + fmt.Fprintf(b, "\"%v\": %v", k, strconv.Quote(v)) + } + fmt.Fprintf(b, "}") + return b.String() +} diff --git a/pkg/metrics/stats/export_test.go b/pkg/metrics/stats/export_test.go new file mode 100644 index 00000000..3aa0fc1a --- /dev/null +++ b/pkg/metrics/stats/export_test.go @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "expvar" + "testing" +) + +func clear() { + defaultVarGroup.vars = make(map[string]expvar.Var) + defaultVarGroup.newVarHook = nil +} + +func TestNoHook(t *testing.T) { + clear() + v := NewCounter("plainint", "help") + v.Add(1) + if v.String() != "1" { + t.Errorf("want 1, got %s", v.String()) + } +} + +func TestFloat(t *testing.T) { + var gotname string + var gotv *Float + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*Float) + }) + v := NewFloat("Float") + if gotname != "Float" { + t.Errorf("want Float, got %s", gotname) + } + if gotv != v { + t.Errorf("want %#v, got %#v", v, gotv) + } + v.Set(5.1) + if v.Get() != 5.1 { + t.Errorf("want 5.1, got %v", v.Get()) + } + v.Add(1.0) + if v.Get() != 6.1 { + t.Errorf("want 6.1, got %v", v.Get()) + } + if v.String() != "6.1" { + t.Errorf("want 6.1, got %v", v.Get()) + } + + f := FloatFunc(func() float64 { + return 1.234 + }) + if f.String() != "1.234" { + t.Errorf("want 1.234, got %v", f.String()) + } +} + +func TestString(t *testing.T) { + var gotname string + var gotv *String + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*String) + }) + v := NewString("String") + if gotname != "String" { + t.Errorf("want String, got %s", gotname) + } + if gotv != v { + t.Errorf("want %#v, got %#v", v, gotv) + } + v.Set("a\"b") + if v.Get() != "a\"b" { + t.Errorf("want \"a\"b\", got %#v", gotv) + } + if v.String() != "\"a\\\"b\"" { + t.Errorf("want \"\"a\\\"b\"\", got %#v", gotv) + } + + f := StringFunc(func() string { + return "a" + }) + if f.String() != "\"a\"" { + t.Errorf("want \"a\", got %v", f.String()) + } +} + +type Mystr string + +func (m *Mystr) String() string { + return string(*m) +} + +func TestPublish(t *testing.T) { + var gotname string + var gotv expvar.Var + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*Mystr) + }) + v := Mystr("abcd") + Publish("Mystr", &v) + if gotname != "Mystr" { + t.Errorf("want Mystr, got %s", gotname) + } + if gotv != &v { + t.Errorf("want %#v, got %#v", &v, gotv) + } +} + +func f() string { + return "abcd" +} + +func TestPublishFunc(t *testing.T) { + var gotname string + var gotv JSONFunc + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(JSONFunc) + }) + PublishJSONFunc("Myfunc", f) + if gotname != "Myfunc" { + t.Errorf("want Myfunc, got %s", gotname) + } + if gotv.String() != f() { + t.Errorf("want %v, got %#v", f(), gotv()) + } +} + +func TestStringMap(t *testing.T) { + clear() + c := NewStringMap("stringmap1") + c.Set("c1", "val1") + c.Set("c2", "val2") + c.Set("c2", "val3") + want1 := `{"c1": "val1", "c2": "val3"}` + want2 := `{"c2": "val3", "c1": "val1"}` + if s := c.String(); s != want1 && s != want2 { + t.Errorf("want %s or %s, got %s", want1, want2, s) + } + + f := StringMapFunc(func() map[string]string { + return map[string]string{ + "c1": "val1", + "c2": "val3", + } + }) + if s := f.String(); s != want1 && s != want2 { + t.Errorf("want %s or %s, got %s", want1, want2, s) + } +} diff --git a/pkg/metrics/stats/histogram.go b/pkg/metrics/stats/histogram.go new file mode 100644 index 00000000..42761b5e --- /dev/null +++ b/pkg/metrics/stats/histogram.go @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "bytes" + "fmt" +) + +import ( + "github.com/arana-db/arana/pkg/util/sync2" +) + +// Histogram tracks counts and totals while +// splitting the counts under different buckets +// using specified cutoffs. +type Histogram struct { + help string + cutoffs []int64 + labels []string + countLabel string + totalLabel string + hook func(int64) + + buckets []sync2.AtomicInt64 + total sync2.AtomicInt64 +} + +// NewHistogram creates a histogram with auto-generated labels +// based on the cutoffs. The buckets are categorized using the +// following criterion: cutoff[i-1] < value <= cutoff[i]. Anything +// higher than the highest cutoff is labeled as "inf". +func NewHistogram(name, help string, cutoffs []int64) *Histogram { + labels := make([]string, len(cutoffs)+1) + for i, v := range cutoffs { + labels[i] = fmt.Sprintf("%d", v) + } + labels[len(labels)-1] = "inf" + return NewGenericHistogram(name, help, cutoffs, labels, "Count", "Total") +} + +// NewGenericHistogram creates a histogram where all the labels are +// supplied by the caller. The number of labels has to be one more than +// the number of cutoffs because the last label captures everything that +// exceeds the highest cutoff. +func NewGenericHistogram(name, help string, cutoffs []int64, labels []string, countLabel, totalLabel string) *Histogram { + if len(cutoffs) != len(labels)-1 { + panic("mismatched cutoff and label lengths") + } + h := &Histogram{ + help: help, + cutoffs: cutoffs, + labels: labels, + countLabel: countLabel, + totalLabel: totalLabel, + buckets: make([]sync2.AtomicInt64, len(labels)), + } + if name != "" { + publish(name, h) + } + return h +} + +// Add adds a new measurement to the Histogram. +func (h *Histogram) Add(value int64) { + for i := range h.labels { + if i == len(h.labels)-1 || value <= h.cutoffs[i] { + h.buckets[i].Add(1) + h.total.Add(value) + break + } + } + if h.hook != nil { + h.hook(value) + } +} + +// String returns a string representation of the Histogram. +// Note that sum of all buckets may not be equal to the total temporarily, +// because Add() increments bucket and total with two atomic operations. +func (h *Histogram) String() string { + b, _ := h.MarshalJSON() + return string(b) +} + +// MarshalJSON returns a JSON representation of the Histogram. +// Note that sum of all buckets may not be equal to the total temporarily, +// because Add() increments bucket and total with two atomic operations. +func (h *Histogram) MarshalJSON() ([]byte, error) { + b := bytes.NewBuffer(make([]byte, 0, 4096)) + fmt.Fprintf(b, "{") + totalCount := int64(0) + for i, label := range h.labels { + totalCount += h.buckets[i].Get() + fmt.Fprintf(b, "\"%v\": %v, ", label, totalCount) + } + fmt.Fprintf(b, "\"%s\": %v, ", h.countLabel, totalCount) + fmt.Fprintf(b, "\"%s\": %v", h.totalLabel, h.total.Get()) + fmt.Fprintf(b, "}") + return b.Bytes(), nil +} + +// Counts returns a map from labels to the current count in the Histogram for that label. +func (h *Histogram) Counts() map[string]int64 { + counts := make(map[string]int64, len(h.labels)) + for i, label := range h.labels { + counts[label] = h.buckets[i].Get() + } + return counts +} + +// CountLabel returns the count label that was set when this Histogram was created. +func (h *Histogram) CountLabel() string { + return h.countLabel +} + +// Count returns the number of times Add has been called. +func (h *Histogram) Count() (count int64) { + for i := range h.buckets { + count += h.buckets[i].Get() + } + return +} + +// TotalLabel returns the total label that was set when this Histogram was created. +func (h *Histogram) TotalLabel() string { + return h.totalLabel +} + +// Total returns the sum of all values that have been added to this Histogram. +func (h *Histogram) Total() (total int64) { + return h.total.Get() +} + +// Labels returns the labels that were set when this Histogram was created. +func (h *Histogram) Labels() []string { + return h.labels +} + +// Cutoffs returns the cutoffs that were set when this Histogram was created. +func (h *Histogram) Cutoffs() []int64 { + return h.cutoffs +} + +// Buckets returns a snapshot of the current values in all buckets. +func (h *Histogram) Buckets() []int64 { + buckets := make([]int64, len(h.buckets)) + for i := range h.buckets { + buckets[i] = h.buckets[i].Get() + } + return buckets +} + +// Help returns the help string. +func (h *Histogram) Help() string { + return h.help +} diff --git a/pkg/metrics/stats/histogram_test.go b/pkg/metrics/stats/histogram_test.go new file mode 100644 index 00000000..5263b7fc --- /dev/null +++ b/pkg/metrics/stats/histogram_test.go @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "expvar" + "testing" +) + +func TestHistogram(t *testing.T) { + clear() + h := NewHistogram("hist1", "help", []int64{1, 5}) + for i := 0; i < 10; i++ { + h.Add(int64(i)) + } + want := `{"1": 2, "5": 6, "inf": 10, "Count": 10, "Total": 45}` + if h.String() != want { + t.Errorf("got %v, want %v", h.String(), want) + } + counts := h.Counts() + counts["Count"] = h.Count() + counts["Total"] = h.Total() + for k, want := range map[string]int64{ + "1": 2, + "5": 4, + "inf": 4, + "Count": 10, + "Total": 45, + } { + if got := counts[k]; got != want { + t.Errorf("histogram counts [%v]: got %d, want %d", k, got, want) + } + } + if got, want := h.CountLabel(), "Count"; got != want { + t.Errorf("got %v, want %v", got, want) + } + if got, want := h.TotalLabel(), "Total"; got != want { + t.Errorf("got %v, want %v", got, want) + } +} + +func TestGenericHistogram(t *testing.T) { + clear() + h := NewGenericHistogram( + "histgen", + "help", + []int64{1, 5}, + []string{"one", "five", "max"}, + "count", + "total", + ) + want := `{"one": 0, "five": 0, "max": 0, "count": 0, "total": 0}` + if got := h.String(); got != want { + t.Errorf("got %v, want %v", got, want) + } +} + +func TestHistogramHook(t *testing.T) { + var gotname string + var gotv *Histogram + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*Histogram) + }) + + name := "hist2" + v := NewHistogram(name, "help", []int64{1}) + if gotname != name { + t.Errorf("got %v; want %v", gotname, name) + } + if gotv != v { + t.Errorf("got %#v, want %#v", gotv, v) + } +} diff --git a/pkg/metrics/stats/kebab_case_converter.go b/pkg/metrics/stats/kebab_case_converter.go new file mode 100644 index 00000000..bcdfd4a1 --- /dev/null +++ b/pkg/metrics/stats/kebab_case_converter.go @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "regexp" + "strings" + "sync" +) + +// toKebabCase produces a monitoring compliant name from the +// original. It converts CamelCase to camel-case, +// and CAMEL_CASE to camel-case. For numbers, it +// converts 0.5 to v0_5. +func toKebabCase(name string) (hyphenated string) { + memoizer.Lock() + defer memoizer.Unlock() + if hyphenated = memoizer.memo[name]; hyphenated != "" { + return hyphenated + } + hyphenated = name + for _, converter := range kebabConverters { + hyphenated = converter.re.ReplaceAllString(hyphenated, converter.repl) + } + hyphenated = strings.ToLower(hyphenated) + memoizer.memo[name] = hyphenated + return +} + +var kebabConverters = []struct { + re *regexp.Regexp + repl string +}{ + // example: LC -> L-C (e.g. CamelCase -> Camel-Case). + {regexp.MustCompile("([a-z])([A-Z])"), "$1-$2"}, + // example: CCa -> C-Ca (e.g. CCamel -> C-Camel). + {regexp.MustCompile("([A-Z])([A-Z][a-z])"), "$1-$2"}, + {regexp.MustCompile("_"), "-"}, + {regexp.MustCompile("\\."), "_"}, +} + +var memoizer = memoizerType{ + memo: make(map[string]string), +} + +type memoizerType struct { + sync.Mutex + memo map[string]string +} diff --git a/pkg/metrics/stats/kebab_case_converter_test.go b/pkg/metrics/stats/kebab_case_converter_test.go new file mode 100644 index 00000000..18324301 --- /dev/null +++ b/pkg/metrics/stats/kebab_case_converter_test.go @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import "testing" + +func TestToKebabCase(t *testing.T) { + var kebabCaseTest = []struct{ input, output string }{ + {"Camel", "camel"}, + {"Camel", "camel"}, + {"CamelCase", "camel-case"}, + {"CamelCaseAgain", "camel-case-again"}, + {"CCamel", "c-camel"}, + {"CCCamel", "cc-camel"}, + {"CAMEL_CASE", "camel-case"}, + {"camel-case", "camel-case"}, + {"0", "0"}, + {"0.0", "0_0"}, + {"JSON", "json"}, + } + + for _, tt := range kebabCaseTest { + if got, want := toKebabCase(tt.input), tt.output; got != want { + t.Errorf("want '%s', got '%s'", want, got) + } + } +} + +func TestMemoize(t *testing.T) { + key := "Test" + if memoizer.memo[key] != "" { + t.Errorf("want '', got '%s'", memoizer.memo[key]) + } + toKebabCase(key) + if memoizer.memo[key] != "test" { + t.Errorf("want 'test', got '%s'", memoizer.memo[key]) + } +} diff --git a/pkg/metrics/stats/prometheus/collectors.go b/pkg/metrics/stats/prometheus/collectors.go new file mode 100644 index 00000000..a5e3e9d6 --- /dev/null +++ b/pkg/metrics/stats/prometheus/collectors.go @@ -0,0 +1,365 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package prometheus + +import ( + "strings" +) + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +import ( + "github.com/arana-db/arana/pkg/metrics/stats" +) + +type metricFuncCollector struct { + // f returns the floating point value of the metric. + f func() float64 + desc *prometheus.Desc + vt prometheus.ValueType +} + +func newMetricFuncCollector(v stats.Variable, name string, vt prometheus.ValueType, f func() float64) { + collector := &metricFuncCollector{ + f: f, + desc: prometheus.NewDesc( + name, + v.Help(), + nil, + nil), + vt: vt} + + prometheus.MustRegister(collector) +} + +// Describe implements Collector. +func (mc *metricFuncCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- mc.desc +} + +// Collect implements Collector. +func (mc *metricFuncCollector) Collect(ch chan<- prometheus.Metric) { + ch <- prometheus.MustNewConstMetric(mc.desc, mc.vt, float64(mc.f())) +} + +// countersWithSingleLabelCollector collects stats.CountersWithSingleLabel. +type countersWithSingleLabelCollector struct { + counters *stats.CountersWithSingleLabel + desc *prometheus.Desc + vt prometheus.ValueType +} + +func newCountersWithSingleLabelCollector(c *stats.CountersWithSingleLabel, name string, labelName string, vt prometheus.ValueType) { + collector := &countersWithSingleLabelCollector{ + counters: c, + desc: prometheus.NewDesc( + name, + c.Help(), + []string{labelName}, + nil), + vt: vt} + + prometheus.MustRegister(collector) +} + +// Describe implements Collector. +func (c *countersWithSingleLabelCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect implements Collector. +func (c *countersWithSingleLabelCollector) Collect(ch chan<- prometheus.Metric) { + for tag, val := range c.counters.Counts() { + ch <- prometheus.MustNewConstMetric( + c.desc, + c.vt, + float64(val), + tag) + } +} + +// gaugesWithSingleLabelCollector collects stats.GaugesWithSingleLabel. +type gaugesWithSingleLabelCollector struct { + gauges *stats.GaugesWithSingleLabel + desc *prometheus.Desc + vt prometheus.ValueType +} + +func newGaugesWithSingleLabelCollector(g *stats.GaugesWithSingleLabel, name string, labelName string, vt prometheus.ValueType) { + collector := &gaugesWithSingleLabelCollector{ + gauges: g, + desc: prometheus.NewDesc( + name, + g.Help(), + []string{labelName}, + nil), + vt: vt} + + prometheus.MustRegister(collector) +} + +// Describe implements Collector. +func (g *gaugesWithSingleLabelCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- g.desc +} + +// Collect implements Collector. +func (g *gaugesWithSingleLabelCollector) Collect(ch chan<- prometheus.Metric) { + for tag, val := range g.gauges.Counts() { + ch <- prometheus.MustNewConstMetric( + g.desc, + g.vt, + float64(val), + tag) + } +} + +type metricWithMultiLabelsCollector struct { + cml *stats.CountersWithMultiLabels + desc *prometheus.Desc +} + +func newMetricWithMultiLabelsCollector(cml *stats.CountersWithMultiLabels, name string) { + c := &metricWithMultiLabelsCollector{ + cml: cml, + desc: prometheus.NewDesc( + name, + cml.Help(), + labelsToSnake(cml.Labels()), + nil), + } + + prometheus.MustRegister(c) +} + +// Describe implements Collector. +func (c *metricWithMultiLabelsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect implements Collector. +func (c *metricWithMultiLabelsCollector) Collect(ch chan<- prometheus.Metric) { + for lvs, val := range c.cml.Counts() { + labelValues := strings.Split(lvs, ".") + value := float64(val) + ch <- prometheus.MustNewConstMetric(c.desc, prometheus.CounterValue, value, labelValues...) + } +} + +type gaugesWithMultiLabelsCollector struct { + gml *stats.GaugesWithMultiLabels + desc *prometheus.Desc +} + +func newGaugesWithMultiLabelsCollector(gml *stats.GaugesWithMultiLabels, name string) { + c := &gaugesWithMultiLabelsCollector{ + gml: gml, + desc: prometheus.NewDesc( + name, + gml.Help(), + labelsToSnake(gml.Labels()), + nil), + } + + prometheus.MustRegister(c) +} + +// Describe implements Collector. +func (c *gaugesWithMultiLabelsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect implements Collector. +func (c *gaugesWithMultiLabelsCollector) Collect(ch chan<- prometheus.Metric) { + for lvs, val := range c.gml.Counts() { + labelValues := strings.Split(lvs, ".") + value := float64(val) + ch <- prometheus.MustNewConstMetric(c.desc, prometheus.GaugeValue, value, labelValues...) + } +} + +type metricsFuncWithMultiLabelsCollector struct { + cfml *stats.CountersFuncWithMultiLabels + desc *prometheus.Desc + vt prometheus.ValueType +} + +func newMetricsFuncWithMultiLabelsCollector(cfml *stats.CountersFuncWithMultiLabels, name string, vt prometheus.ValueType) { + collector := &metricsFuncWithMultiLabelsCollector{ + cfml: cfml, + desc: prometheus.NewDesc( + name, + cfml.Help(), + labelsToSnake(cfml.Labels()), + nil), + vt: vt, + } + + prometheus.MustRegister(collector) +} + +// Describe implements Collector. +func (c *metricsFuncWithMultiLabelsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect implements Collector. +func (c *metricsFuncWithMultiLabelsCollector) Collect(ch chan<- prometheus.Metric) { + for lvs, val := range c.cfml.Counts() { + labelValues := strings.Split(lvs, ".") + value := float64(val) + ch <- prometheus.MustNewConstMetric(c.desc, c.vt, value, labelValues...) + } +} + +type timingsCollector struct { + t *stats.Timings + cutoffs []float64 + desc *prometheus.Desc +} + +func newTimingsCollector(t *stats.Timings, name string) { + cutoffs := make([]float64, len(t.Cutoffs())) + for i, val := range t.Cutoffs() { + cutoffs[i] = float64(val) / 1000000000 + } + + collector := &timingsCollector{ + t: t, + cutoffs: cutoffs, + desc: prometheus.NewDesc( + name, + t.Help(), + []string{t.Label()}, + nil), + } + + prometheus.MustRegister(collector) +} + +// Describe implements Collector. +func (c *timingsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect implements Collector. +func (c *timingsCollector) Collect(ch chan<- prometheus.Metric) { + for cat, his := range c.t.Histograms() { + ch <- prometheus.MustNewConstHistogram( + c.desc, + uint64(his.Count()), + float64(his.Total())/1000000000, + makeCumulativeBuckets(c.cutoffs, his.Buckets()), + cat) + } +} + +func makeCumulativeBuckets(cutoffs []float64, buckets []int64) map[float64]uint64 { + output := make(map[float64]uint64) + last := uint64(0) + for i, key := range cutoffs { + output[key] = uint64(buckets[i]) + last + last = output[key] + } + return output +} + +type multiTimingsCollector struct { + mt *stats.MultiTimings + cutoffs []float64 + desc *prometheus.Desc +} + +func newMultiTimingsCollector(mt *stats.MultiTimings, name string) { + cutoffs := make([]float64, len(mt.Cutoffs())) + for i, val := range mt.Cutoffs() { + cutoffs[i] = float64(val) / 1000000000 + } + + collector := &multiTimingsCollector{ + mt: mt, + cutoffs: cutoffs, + desc: prometheus.NewDesc( + name, + mt.Help(), + labelsToSnake(mt.Labels()), + nil), + } + + prometheus.MustRegister(collector) +} + +// Describe implements Collector. +func (c *multiTimingsCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect implements Collector. +func (c *multiTimingsCollector) Collect(ch chan<- prometheus.Metric) { + for cat, his := range c.mt.Timings.Histograms() { + labelValues := strings.Split(cat, ".") + ch <- prometheus.MustNewConstHistogram( + c.desc, + uint64(his.Count()), + float64(his.Total())/1000000000, + makeCumulativeBuckets(c.cutoffs, his.Buckets()), + labelValues...) + } +} + +type histogramCollector struct { + h *stats.Histogram + cutoffs []float64 + desc *prometheus.Desc +} + +func newHistogramCollector(h *stats.Histogram, name string) { + cutoffs := make([]float64, len(h.Cutoffs())) + for i, val := range h.Cutoffs() { + cutoffs[i] = float64(val) + } + + collector := &histogramCollector{ + h: h, + cutoffs: cutoffs, + desc: prometheus.NewDesc( + name, + h.Help(), + []string{}, + nil), + } + + prometheus.MustRegister(collector) +} + +// Describe implements Collector. +func (c *histogramCollector) Describe(ch chan<- *prometheus.Desc) { + ch <- c.desc +} + +// Collect implements Collector. +func (c *histogramCollector) Collect(ch chan<- prometheus.Metric) { + ch <- prometheus.MustNewConstHistogram( + c.desc, + uint64(c.h.Count()), + float64(c.h.Total()), + makeCumulativeBuckets(c.cutoffs, c.h.Buckets()), + ) +} diff --git a/pkg/metrics/stats/prometheus/prometheus_backend.go b/pkg/metrics/stats/prometheus/prometheus_backend.go new file mode 100644 index 00000000..5a57efbd --- /dev/null +++ b/pkg/metrics/stats/prometheus/prometheus_backend.go @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package prometheus + +import ( + "expvar" + "net/http" + "strings" +) + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +import ( + "github.com/arana-db/arana/pkg/metrics/stats" + "github.com/arana-db/arana/pkg/util/log" +) + +// PromBackend implements PullBackend using Prometheus as the backing metrics storage. +type PromBackend struct { + namespace string +} + +var ( + be PromBackend +) + +// Init initializes the Prometheus be with the given namespace. +func Init(namespace string) { + be.namespace = namespace + stats.Register(be.publishPrometheusMetric) +} + +// GetHandlers return registered handlers +func GetHandlers() map[string]http.Handler { + return map[string]http.Handler{ + "/metrics": promhttp.Handler(), + } +} + +// PublishPromMetric is used to publish the metric to Prometheus. +func (be PromBackend) publishPrometheusMetric(name string, v expvar.Var) { + switch st := v.(type) { + case *stats.Counter: + newMetricFuncCollector(st, be.buildPromName(name), prometheus.CounterValue, func() float64 { return float64(st.Get()) }) + case *stats.CounterFunc: + newMetricFuncCollector(st, be.buildPromName(name), prometheus.CounterValue, func() float64 { return float64(st.F()) }) + case *stats.Gauge: + newMetricFuncCollector(st, be.buildPromName(name), prometheus.GaugeValue, func() float64 { return float64(st.Get()) }) + case *stats.GaugeFunc: + newMetricFuncCollector(st, be.buildPromName(name), prometheus.GaugeValue, func() float64 { return float64(st.F()) }) + case *stats.CountersWithSingleLabel: + newCountersWithSingleLabelCollector(st, be.buildPromName(name), st.Label(), prometheus.CounterValue) + case *stats.CountersWithMultiLabels: + newMetricWithMultiLabelsCollector(st, be.buildPromName(name)) + case *stats.CountersFuncWithMultiLabels: + newMetricsFuncWithMultiLabelsCollector(st, be.buildPromName(name), prometheus.CounterValue) + case *stats.GaugesFuncWithMultiLabels: + newMetricsFuncWithMultiLabelsCollector(&st.CountersFuncWithMultiLabels, be.buildPromName(name), prometheus.GaugeValue) + case *stats.GaugesWithSingleLabel: + newGaugesWithSingleLabelCollector(st, be.buildPromName(name), st.Label(), prometheus.GaugeValue) + case *stats.GaugesWithMultiLabels: + newGaugesWithMultiLabelsCollector(st, be.buildPromName(name)) + case *stats.CounterDuration: + newMetricFuncCollector(st, be.buildPromName(name), prometheus.CounterValue, func() float64 { return st.Get().Seconds() }) + case *stats.CounterDurationFunc: + newMetricFuncCollector(st, be.buildPromName(name), prometheus.CounterValue, func() float64 { return st.F().Seconds() }) + case *stats.GaugeDuration: + newMetricFuncCollector(st, be.buildPromName(name), prometheus.GaugeValue, func() float64 { return st.Get().Seconds() }) + case *stats.GaugeDurationFunc: + newMetricFuncCollector(st, be.buildPromName(name), prometheus.GaugeValue, func() float64 { return st.F().Seconds() }) + case *stats.Timings: + newTimingsCollector(st, be.buildPromName(name)) + case *stats.MultiTimings: + newMultiTimingsCollector(st, be.buildPromName(name)) + case *stats.Histogram: + newHistogramCollector(st, be.buildPromName(name)) + default: + log.Warn("[prometheus] Not exporting to Prometheus an unsupported metric type of %T: %s", st, name) + } +} + +// buildPromName specifies the namespace as a prefix to the metric name +func (be PromBackend) buildPromName(name string) string { + s := strings.TrimPrefix(normalizeMetric(name), be.namespace+"_") + return prometheus.BuildFQName("", be.namespace, s) +} + +func labelsToSnake(labels []string) []string { + output := make([]string, len(labels)) + for i, l := range labels { + output[i] = normalizeMetric(l) + } + return output +} + +// normalizeMetricForPrometheus produces a compliant name by applying +// special case conversions and then applying a camel case to snake case converter. +func normalizeMetric(name string) string { + // Special cases + r := strings.NewReplacer("VSchema", "vschema", "VtGate", "vtgate") + name = r.Replace(name) + + return stats.GetSnakeName(name) +} diff --git a/pkg/metrics/stats/prometheus/prometheus_backend_test.go b/pkg/metrics/stats/prometheus/prometheus_backend_test.go new file mode 100644 index 00000000..44afc4ce --- /dev/null +++ b/pkg/metrics/stats/prometheus/prometheus_backend_test.go @@ -0,0 +1,351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package prometheus + +import ( + "fmt" + "net/http" + "net/http/httptest" + "os" + "strings" + "testing" + "time" +) + +import ( + "github.com/arana-db/arana/pkg/metrics/stats" +) + +import ( + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const namespace = "namespace" + +func TestPrometheusCounter(t *testing.T) { + name := "blah" + c := stats.NewCounter(name, "blah") + c.Add(1) + checkHandlerForMetrics(t, name, 1) + + c.Reset() + checkHandlerForMetrics(t, name, 0) +} + +func TestPrometheusGauge(t *testing.T) { + name := "blah_gauge" + c := stats.NewGauge(name, "help") + c.Add(1) + checkHandlerForMetrics(t, name, 1) + c.Add(-1) + checkHandlerForMetrics(t, name, 0) + c.Set(-5) + checkHandlerForMetrics(t, name, -5) + c.Reset() + checkHandlerForMetrics(t, name, 0) +} + +func TestPrometheusCounterFunc(t *testing.T) { + name := "blah_counterfunc" + stats.NewCounterFunc(name, "help", func() int64 { + return 2 + }) + + checkHandlerForMetrics(t, name, 2) +} + +func TestPrometheusGaugeFunc(t *testing.T) { + name := "blah_gaugefunc" + + stats.NewGaugeFunc(name, "help", func() int64 { + return -3 + }) + + checkHandlerForMetrics(t, name, -3) +} + +func TestPrometheusCounterDuration(t *testing.T) { + name := "blah_counterduration" + + d := stats.NewCounterDuration(name, "help") + d.Add(1 * time.Second) + + checkHandlerForMetrics(t, name, 1) +} + +func TestPrometheusCounterDurationFunc(t *testing.T) { + name := "blah_counterdurationfunc" + + stats.NewCounterDurationFunc(name, "help", func() time.Duration { return 1 * time.Second }) + + checkHandlerForMetrics(t, name, 1) +} + +func TestPrometheusGaugeDuration(t *testing.T) { + name := "blah_gaugeduration" + + d := stats.NewGaugeDuration(name, "help") + d.Set(1 * time.Second) + + checkHandlerForMetrics(t, name, 1) +} + +func TestPrometheusGaugeDurationFunc(t *testing.T) { + name := "blah_gaugedurationfunc" + + stats.NewGaugeDurationFunc(name, "help", func() time.Duration { return 1 * time.Second }) + + checkHandlerForMetrics(t, name, 1) +} + +func checkHandlerForMetrics(t *testing.T, metric string, value int) { + response := testMetricsHandler(t) + + expected := fmt.Sprintf("%s_%s %d", namespace, metric, value) + + if !strings.Contains(response.Body.String(), expected) { + t.Fatalf("Expected %s got %s", expected, response.Body.String()) + } +} + +func TestPrometheusCountersWithSingleLabel(t *testing.T) { + name := "blah_counterswithsinglelabel" + c := stats.NewCountersWithSingleLabel(name, "help", "label", "tag1", "tag2") + c.Add("tag1", 1) + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag1", 1) + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag2", 0) + c.Add("tag2", 41) + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag1", 1) + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag2", 41) + c.Reset("tag2") + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag1", 1) + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag2", 0) +} + +func TestPrometheusGaugesWithSingleLabel(t *testing.T) { + name := "blah_gaugeswithsinglelabel" + c := stats.NewGaugesWithSingleLabel(name, "help", "label", "tag1", "tag2") + c.Add("tag1", 1) + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag1", 1) + + c.Add("tag2", 1) + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag2", 1) + + c.Set("tag1", -1) + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag1", -1) + + c.Reset("tag2") + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag1", -1) + checkHandlerForMetricWithSingleLabel(t, name, "label", "tag2", 0) +} + +func checkHandlerForMetricWithSingleLabel(t *testing.T, metric, label, tag string, value int) { + response := testMetricsHandler(t) + + expected := fmt.Sprintf("%s_%s{%s=\"%s\"} %d", namespace, metric, label, tag, value) + + if !strings.Contains(response.Body.String(), expected) { + t.Fatalf("Expected %s got %s", expected, response.Body.String()) + } +} + +func TestPrometheusCountersWithMultiLabels(t *testing.T) { + name := "blah_counterswithmultilabels" + labels := []string{"label1", "label2"} + labelValues := []string{"foo", "bar"} + c := stats.NewCountersWithMultiLabels(name, "help", labels) + c.Add(labelValues, 1) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues, 1) + labelValues2 := []string{"baz", "bazbar"} + c.Add(labelValues2, 1) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues, 1) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues2, 1) + c.Reset(labelValues) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues, 0) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues2, 1) +} + +func TestPrometheusGaugesWithMultiLabels(t *testing.T) { + name := "blah_gaugeswithmultilabels" + labels := []string{"label1", "label2"} + labelValues := []string{"foo", "bar"} + c := stats.NewGaugesWithMultiLabels(name, "help", labels) + c.Add(labelValues, 1) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues, 1) + + c.Set(labelValues, -1) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues, -1) + + labelValues2 := []string{"baz", "bazbar"} + c.Add(labelValues2, 1) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues, -1) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues2, 1) + + c.Reset(labelValues) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues, 0) + checkHandlerForMetricWithMultiLabels(t, name, labels, labelValues2, 1) +} + +func TestPrometheusCountersWithMultiLabels_AddPanic(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("The code did not panic when adding to inequal label lengths") + } + }() + + name := "blah_counterswithmultilabels_inequallength" + c := stats.NewCountersWithMultiLabels(name, "help", []string{"label1", "label2"}) + c.Add([]string{"label1"}, 1) +} + +func TestPrometheusCountersFuncWithMultiLabels(t *testing.T) { + name := "blah_countersfuncwithmultilabels" + labels := []string{"label1", "label2"} + + stats.NewCountersFuncWithMultiLabels(name, "help", labels, func() map[string]int64 { + m := make(map[string]int64) + m["foo.bar"] = 1 + m["bar.baz"] = 1 + return m + }) + + checkHandlerForMetricWithMultiLabels(t, name, labels, []string{"foo", "bar"}, 1) + checkHandlerForMetricWithMultiLabels(t, name, labels, []string{"bar", "baz"}, 1) +} + +func checkHandlerForMetricWithMultiLabels(t *testing.T, metric string, labels []string, labelValues []string, value int64) { + response := testMetricsHandler(t) + + expected := fmt.Sprintf("%s_%s{%s=\"%s\",%s=\"%s\"} %d", namespace, metric, labels[0], labelValues[0], labels[1], labelValues[1], value) + + if !strings.Contains(response.Body.String(), expected) { + t.Fatalf("Expected %s got %s", expected, response.Body.String()) + } +} + +func TestPrometheusTimings(t *testing.T) { + name := "blah_timings" + cats := []string{"cat1", "cat2"} + timing := stats.NewTimings(name, "help", "category", cats...) + timing.Add("cat1", time.Duration(30*time.Millisecond)) + timing.Add("cat1", time.Duration(200*time.Millisecond)) + timing.Add("cat1", time.Duration(1*time.Second)) + + response := testMetricsHandler(t) + var s []string + + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"0.0005\"} %d", namespace, name, cats[0], 0)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"0.001\"} %d", namespace, name, cats[0], 0)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"0.005\"} %d", namespace, name, cats[0], 0)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"0.01\"} %d", namespace, name, cats[0], 0)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"0.05\"} %d", namespace, name, cats[0], 1)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"0.1\"} %d", namespace, name, cats[0], 1)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"0.5\"} %d", namespace, name, cats[0], 2)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"1\"} %d", namespace, name, cats[0], 3)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"5\"} %d", namespace, name, cats[0], 3)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"10\"} %d", namespace, name, cats[0], 3)) + s = append(s, fmt.Sprintf("%s_%s_bucket{category=\"%s\",le=\"+Inf\"} %d", namespace, name, cats[0], 3)) + s = append(s, fmt.Sprintf("%s_%s_sum{category=\"%s\"} %s", namespace, name, cats[0], "1.23")) + s = append(s, fmt.Sprintf("%s_%s_count{category=\"%s\"} %d", namespace, name, cats[0], 3)) + + for _, line := range s { + if !strings.Contains(response.Body.String(), line) { + t.Fatalf("Expected result to contain %s, got %s", line, response.Body.String()) + } + } +} + +func TestPrometheusMultiTimings(t *testing.T) { + name := "blah_multitimings" + cats := []string{"cat1", "cat2"} + catLabels := []string{"foo", "bar"} + timing := stats.NewMultiTimings(name, "help", cats) + timing.Add(catLabels, time.Duration(30*time.Millisecond)) + timing.Add(catLabels, time.Duration(200*time.Millisecond)) + timing.Add(catLabels, time.Duration(1*time.Second)) + + response := testMetricsHandler(t) + var s []string + + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"0.0005\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 0)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"0.001\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 0)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"0.005\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 0)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"0.01\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 0)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"0.05\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 1)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"0.1\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 1)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"0.5\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 2)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"1\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 3)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"5\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 3)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"10\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 3)) + s = append(s, fmt.Sprintf("%s_%s_bucket{%s=\"%s\",%s=\"%s\",le=\"+Inf\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 3)) + s = append(s, fmt.Sprintf("%s_%s_sum{%s=\"%s\",%s=\"%s\"} %s", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], "1.23")) + s = append(s, fmt.Sprintf("%s_%s_count{%s=\"%s\",%s=\"%s\"} %d", namespace, name, cats[0], catLabels[0], cats[1], catLabels[1], 3)) + + for _, line := range s { + if !strings.Contains(response.Body.String(), line) { + t.Fatalf("Expected result to contain %s, got %s", line, response.Body.String()) + } + } +} + +func TestPrometheusMultiTimings_PanicWrongLength(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Errorf("The code did not panic when adding to inequal label lengths") + } + }() + + c := stats.NewMultiTimings("name", "help", []string{"label1", "label2"}) + c.Add([]string{"label1"}, time.Duration(100000000)) +} + +func TestPrometheusHistogram(t *testing.T) { + name := "blah_hist" + hist := stats.NewHistogram(name, "help", []int64{1, 5, 10}) + hist.Add(2) + hist.Add(3) + hist.Add(6) + + response := testMetricsHandler(t) + var s []string + + s = append(s, fmt.Sprintf("%s_%s_bucket{le=\"1\"} %d", namespace, name, 0)) + s = append(s, fmt.Sprintf("%s_%s_bucket{le=\"5\"} %d", namespace, name, 2)) + s = append(s, fmt.Sprintf("%s_%s_bucket{le=\"10\"} %d", namespace, name, 3)) + s = append(s, fmt.Sprintf("%s_%s_sum %d", namespace, name, 1)) + s = append(s, fmt.Sprintf("%s_%s_count %d", namespace, name, 3)) + + for _, line := range s { + if !strings.Contains(response.Body.String(), line) { + t.Fatalf("Expected result to contain %s, got %s", line, response.Body.String()) + } + } +} + +func testMetricsHandler(t *testing.T) *httptest.ResponseRecorder { + req, _ := http.NewRequest("GET", "/metrics", nil) + response := httptest.NewRecorder() + + promhttp.Handler().ServeHTTP(response, req) + return response +} + +func TestMain(m *testing.M) { + Init(namespace) + os.Exit(m.Run()) +} diff --git a/pkg/metrics/stats/snake_case_converter.go b/pkg/metrics/stats/snake_case_converter.go new file mode 100644 index 00000000..227c0687 --- /dev/null +++ b/pkg/metrics/stats/snake_case_converter.go @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "regexp" + "strings" +) + +// GetSnakeName calls toSnakeName on the passed in string. It produces +// a snake-cased name from the provided camel-cased name. +// It memoizes the transformation and returns the stored result if available. +func GetSnakeName(name string) string { + return toSnakeCase(name) +} + +// toSnakeCase produces a monitoring compliant name from the original. +// For systems (like Prometheus) that ask for snake-case names. +// It converts CamelCase to camel_case, and CAMEL_CASE to camel_case. +// For numbers, it converts 0.5 to v0_5. +func toSnakeCase(name string) (hyphenated string) { + snakeMemoizer.Lock() + defer snakeMemoizer.Unlock() + if hyphenated = snakeMemoizer.memo[name]; hyphenated != "" { + return hyphenated + } + hyphenated = name + for _, converter := range snakeConverters { + hyphenated = converter.re.ReplaceAllString(hyphenated, converter.repl) + } + hyphenated = strings.ToLower(hyphenated) + snakeMemoizer.memo[name] = hyphenated + return +} + +var snakeConverters = []struct { + re *regexp.Regexp + repl string +}{ + // example: LC -> L_C (e.g. CamelCase -> Camel_Case). + {regexp.MustCompile("([a-z])([A-Z])"), "${1}_${2}"}, + // example: CCa -> C_Ca (e.g. CCamel -> C_Camel). + {regexp.MustCompile("([A-Z])([A-Z][a-z])"), "${1}_${2}"}, + {regexp.MustCompile("\\."), "_"}, + {regexp.MustCompile("-"), "_"}, +} + +var snakeMemoizer = memoizerType{ + memo: make(map[string]string), +} diff --git a/pkg/metrics/stats/timings.go b/pkg/metrics/stats/timings.go new file mode 100644 index 00000000..799ed5db --- /dev/null +++ b/pkg/metrics/stats/timings.go @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "encoding/json" + "fmt" + "strings" + "sync" + "time" +) + +import ( + "github.com/arana-db/arana/pkg/util/sync2" +) + +// Timings is meant to tracks timing data +// by named categories as well as histograms. +type Timings struct { + totalCount sync2.AtomicInt64 + totalTime sync2.AtomicInt64 + + // mu protects get and set of hook and the map. + // Modification to the value in the map is not protected. + mu sync.RWMutex + histograms map[string]*Histogram + hook func(string, time.Duration) + help string + label string +} + +// NewTimings creates a new Timings object, and publishes it if name is set. +// categories is an optional list of categories to initialize to 0. +// Categories that aren't initialized will be missing from the map until the +// first time they are updated. +func NewTimings(name, help, label string, categories ...string) *Timings { + t := &Timings{ + histograms: make(map[string]*Histogram), + help: help, + label: label, + } + for _, cat := range categories { + t.histograms[cat] = NewGenericHistogram("", "", bucketCutoffs, bucketLabels, "Count", "Time") + } + if name != "" { + publish(name, t) + } + + return t +} + +// Add will add a new value to the named histogram. +func (t *Timings) Add(name string, elapsed time.Duration) { + // Get existing Histogram. + t.mu.RLock() + hist, ok := t.histograms[name] + hook := t.hook + t.mu.RUnlock() + + // Create Histogram if it does not exist. + if !ok { + t.mu.Lock() + hist, ok = t.histograms[name] + if !ok { + hist = NewGenericHistogram("", "", bucketCutoffs, bucketLabels, "Count", "Time") + t.histograms[name] = hist + } + t.mu.Unlock() + } + + elapsedNs := int64(elapsed) + hist.Add(elapsedNs) + t.totalCount.Add(1) + t.totalTime.Add(elapsedNs) + if hook != nil { + hook(name, elapsed) + } +} + +// Record is a convenience function that records completion +// timing data based on the provided start time of an event. +func (t *Timings) Record(name string, startTime time.Time) { + t.Add(name, time.Now().Sub(startTime)) +} + +// String is for expvar. +func (t *Timings) String() string { + t.mu.RLock() + defer t.mu.RUnlock() + + tm := struct { + TotalCount int64 + TotalTime int64 + Histograms map[string]*Histogram + }{ + t.totalCount.Get(), + t.totalTime.Get(), + t.histograms, + } + + data, err := json.Marshal(tm) + if err != nil { + data, _ = json.Marshal(err.Error()) + } + return string(data) +} + +// Histograms returns a map pointing at the histograms. +func (t *Timings) Histograms() (h map[string]*Histogram) { + t.mu.RLock() + defer t.mu.RUnlock() + h = make(map[string]*Histogram, len(t.histograms)) + for k, v := range t.histograms { + h[k] = v + } + return +} + +// Count returns the total count for all values. +func (t *Timings) Count() int64 { + return t.totalCount.Get() +} + +// Time returns the total time elapsed for all values. +func (t *Timings) Time() int64 { + return t.totalTime.Get() +} + +// Counts returns the total count for each value. +func (t *Timings) Counts() map[string]int64 { + t.mu.RLock() + defer t.mu.RUnlock() + + counts := make(map[string]int64, len(t.histograms)+1) + for k, v := range t.histograms { + counts[k] = v.Count() + } + counts["All"] = t.totalCount.Get() + return counts +} + +// Cutoffs returns the cutoffs used in the component histograms. +// Do not change the returned slice. +func (t *Timings) Cutoffs() []int64 { + return bucketCutoffs +} + +// Help returns the help string. +func (t *Timings) Help() string { + return t.help +} + +// Label returns the label name. +func (t *Timings) Label() string { + return t.label +} + +var bucketCutoffs = []int64{5e5, 1e6, 5e6, 1e7, 5e7, 1e8, 5e8, 1e9, 5e9, 1e10} + +var bucketLabels []string + +func init() { + bucketLabels = make([]string, len(bucketCutoffs)+1) + for i, v := range bucketCutoffs { + bucketLabels[i] = fmt.Sprintf("%d", v) + } + bucketLabels[len(bucketLabels)-1] = "inf" +} + +// MultiTimings is meant to tracks timing data by categories as well +// as histograms. The names of the categories are compound names made +// with joining multiple strings with '.'. +type MultiTimings struct { + Timings + labels []string +} + +// NewMultiTimings creates a new MultiTimings object. +func NewMultiTimings(name string, help string, labels []string) *MultiTimings { + t := &MultiTimings{ + Timings: Timings{ + histograms: make(map[string]*Histogram), + help: help, + }, + labels: labels, + } + if name != "" { + publish(name, t) + } + + return t +} + +// Labels returns descriptions of the parts of each compound category name. +func (mt *MultiTimings) Labels() []string { + return mt.labels +} + +// safeJoinLabels joins the label values with ".", but first replaces any existing +// "." characters in the labels with the proper replacement, to avoid issues parsing +// them apart later. +func safeJoinLabels(labels []string) string { + sanitizedLabels := make([]string, len(labels)) + for idx, label := range labels { + sanitizedLabels[idx] = safeLabel(label) + } + return strings.Join(sanitizedLabels, ".") +} + +// Add will add a new value to the named histogram. +func (mt *MultiTimings) Add(names []string, elapsed time.Duration) { + if len(names) != len(mt.labels) { + panic("MultiTimings: wrong number of values in Add") + } + mt.Timings.Add(safeJoinLabels(names), elapsed) +} + +// Record is a convenience function that records completion +// timing data based on the provided start time of an event. +func (mt *MultiTimings) Record(names []string, startTime time.Time) { + if len(names) != len(mt.labels) { + panic("MultiTimings: wrong number of values in Record") + } + mt.Timings.Record(safeJoinLabels(names), startTime) +} + +// Cutoffs returns the cutoffs used in the component histograms. +// Do not change the returned slice. +func (mt *MultiTimings) Cutoffs() []int64 { + return bucketCutoffs +} diff --git a/pkg/metrics/stats/timings_test.go b/pkg/metrics/stats/timings_test.go new file mode 100644 index 00000000..594a6718 --- /dev/null +++ b/pkg/metrics/stats/timings_test.go @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "expvar" + "strings" + "testing" + "time" +) + +func TestTimings(t *testing.T) { + clear() + tm := NewTimings("timings1", "help", "category") + tm.Add("tag1", 500*time.Microsecond) + tm.Add("tag1", 1*time.Millisecond) + tm.Add("tag2", 1*time.Millisecond) + want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1":{"500000":1,"1000000":2,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2":{"500000":0,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}` + if got := tm.String(); got != want { + t.Errorf("got %s, want %s", got, want) + } +} + +func TestMultiTimings(t *testing.T) { + clear() + mtm := NewMultiTimings("maptimings1", "help", []string{"dim1", "dim2"}) + mtm.Add([]string{"tag1a", "tag1b"}, 500*time.Microsecond) + mtm.Add([]string{"tag1a", "tag1b"}, 1*time.Millisecond) + mtm.Add([]string{"tag2a", "tag2b"}, 1*time.Millisecond) + want := `{"TotalCount":3,"TotalTime":2500000,"Histograms":{"tag1a.tag1b":{"500000":1,"1000000":2,"5000000":2,"10000000":2,"50000000":2,"100000000":2,"500000000":2,"1000000000":2,"5000000000":2,"10000000000":2,"inf":2,"Count":2,"Time":1500000},"tag2a.tag2b":{"500000":0,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":1000000}}}` + if got := mtm.String(); got != want { + t.Errorf("got %s, want %s", got, want) + } +} + +func TestMultiTimingsDot(t *testing.T) { + clear() + mtm := NewMultiTimings("maptimings2", "help", []string{"label"}) + mtm.Add([]string{"value.dot"}, 500*time.Microsecond) + safe := safeLabel("value.dot") + safeJSON := strings.Replace(safe, "\\", "\\\\", -1) + want := `{"TotalCount":1,"TotalTime":500000,"Histograms":{"` + safeJSON + `":{"500000":1,"1000000":1,"5000000":1,"10000000":1,"50000000":1,"100000000":1,"500000000":1,"1000000000":1,"5000000000":1,"10000000000":1,"inf":1,"Count":1,"Time":500000}}}` + if got := mtm.String(); got != want { + t.Errorf("got %s, want %s", got, want) + } +} + +func TestTimingsHook(t *testing.T) { + var gotname string + var gotv *Timings + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*Timings) + }) + + name := "timings2" + v := NewTimings(name, "help", "") + if gotname != name { + t.Errorf("got %q, want %q", gotname, name) + } + if gotv != v { + t.Errorf("got %#v, want %#v", gotv, v) + } +} diff --git a/pkg/metrics/stats/util.go b/pkg/metrics/stats/util.go new file mode 100644 index 00000000..af1f2be9 --- /dev/null +++ b/pkg/metrics/stats/util.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import "strings" + +// safeLabel turns a label into a safe label for stats export. +// It is in its own file so it can be customized. +func safeLabel(label string) string { + return strings.Replace(label, ".", "_", -1) +} diff --git a/pkg/metrics/stats/variable_interface.go b/pkg/metrics/stats/variable_interface.go new file mode 100644 index 00000000..2c015b6e --- /dev/null +++ b/pkg/metrics/stats/variable_interface.go @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +// Variable is the minimal interface which each type in this "stats" package +// must implement. +// When integrating the Vitess stats types ("variables") with the different +// monitoring systems, you can rely on this interface. +type Variable interface { + // Help returns the description of the variable. + Help() string + + // String must implement String() from the expvar.Var interface. + String() string +} diff --git a/pkg/util/sync2/atomic.go b/pkg/util/sync2/atomic.go new file mode 100644 index 00000000..d5aaeda4 --- /dev/null +++ b/pkg/util/sync2/atomic.go @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sync2 + +import ( + "sync" + "sync/atomic" + "time" +) + +// AtomicInt32 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int32 functions. +type AtomicInt32 struct { + int32 +} + +// NewAtomicInt32 initializes a new AtomicInt32 with a given value. +func NewAtomicInt32(n int32) AtomicInt32 { + return AtomicInt32{n} +} + +// Add atomically adds n to the value. +func (i *AtomicInt32) Add(n int32) int32 { + return atomic.AddInt32(&i.int32, n) +} + +// Set atomically sets n as new value. +func (i *AtomicInt32) Set(n int32) { + atomic.StoreInt32(&i.int32, n) +} + +// Get atomically returns the current value. +func (i *AtomicInt32) Get() int32 { + return atomic.LoadInt32(&i.int32) +} + +// CompareAndSwap atomatically swaps the old with the new value. +func (i *AtomicInt32) CompareAndSwap(oldval, newval int32) (swapped bool) { + return atomic.CompareAndSwapInt32(&i.int32, oldval, newval) +} + +// AtomicInt64 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions. +type AtomicInt64 struct { + int64 +} + +// NewAtomicInt64 initializes a new AtomicInt64 with a given value. +func NewAtomicInt64(n int64) AtomicInt64 { + return AtomicInt64{n} +} + +// Add atomically adds n to the value. +func (i *AtomicInt64) Add(n int64) int64 { + return atomic.AddInt64(&i.int64, n) +} + +// Set atomically sets n as new value. +func (i *AtomicInt64) Set(n int64) { + atomic.StoreInt64(&i.int64, n) +} + +// Get atomically returns the current value. +func (i *AtomicInt64) Get() int64 { + return atomic.LoadInt64(&i.int64) +} + +// CompareAndSwap atomatically swaps the old with the new value. +func (i *AtomicInt64) CompareAndSwap(oldval, newval int64) (swapped bool) { + return atomic.CompareAndSwapInt64(&i.int64, oldval, newval) +} + +// AtomicDuration is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions. +type AtomicDuration struct { + int64 +} + +// NewAtomicDuration initializes a new AtomicDuration with a given value. +func NewAtomicDuration(duration time.Duration) AtomicDuration { + return AtomicDuration{int64(duration)} +} + +// Add atomically adds duration to the value. +func (d *AtomicDuration) Add(duration time.Duration) time.Duration { + return time.Duration(atomic.AddInt64(&d.int64, int64(duration))) +} + +// Set atomically sets duration as new value. +func (d *AtomicDuration) Set(duration time.Duration) { + atomic.StoreInt64(&d.int64, int64(duration)) +} + +// Get atomically returns the current value. +func (d *AtomicDuration) Get() time.Duration { + return time.Duration(atomic.LoadInt64(&d.int64)) +} + +// CompareAndSwap atomatically swaps the old with the new value. +func (d *AtomicDuration) CompareAndSwap(oldval, newval time.Duration) (swapped bool) { + return atomic.CompareAndSwapInt64(&d.int64, int64(oldval), int64(newval)) +} + +// AtomicBool gives an atomic boolean variable. +type AtomicBool struct { + int32 +} + +// NewAtomicBool initializes a new AtomicBool with a given value. +func NewAtomicBool(n bool) AtomicBool { + if n { + return AtomicBool{1} + } + return AtomicBool{0} +} + +// Set atomically sets n as new value. +func (i *AtomicBool) Set(n bool) { + if n { + atomic.StoreInt32(&i.int32, 1) + } else { + atomic.StoreInt32(&i.int32, 0) + } +} + +// Get atomically returns the current value. +func (i *AtomicBool) Get() bool { + return atomic.LoadInt32(&i.int32) != 0 +} + +// CompareAndSwap atomatically swaps the old with the new value. +func (i *AtomicBool) CompareAndSwap(o, n bool) bool { + var old, new int32 + if o { + old = 1 + } + if n { + new = 1 + } + return atomic.CompareAndSwapInt32(&i.int32, old, new) +} + +// AtomicString gives you atomic-style APIs for string, but +// it's only a convenience wrapper that uses a mutex. So, it's +// not as efficient as the rest of the atomic types. +type AtomicString struct { + mu sync.Mutex + str string +} + +// Set atomically sets str as new value. +func (s *AtomicString) Set(str string) { + s.mu.Lock() + s.str = str + s.mu.Unlock() +} + +// Get atomically returns the current value. +func (s *AtomicString) Get() string { + s.mu.Lock() + str := s.str + s.mu.Unlock() + return str +} + +// CompareAndSwap atomatically swaps the old with the new value. +func (s *AtomicString) CompareAndSwap(oldval, newval string) (swqpped bool) { + s.mu.Lock() + defer s.mu.Unlock() + if s.str == oldval { + s.str = newval + return true + } + return false +} diff --git a/pkg/util/sync2/atomic_test.go b/pkg/util/sync2/atomic_test.go new file mode 100644 index 00000000..6dfc2e9d --- /dev/null +++ b/pkg/util/sync2/atomic_test.go @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sync2 + +import ( + "testing" +) + +func TestAtomicString(t *testing.T) { + var s AtomicString + if s.Get() != "" { + t.Errorf("want empty, got %s", s.Get()) + } + s.Set("a") + if s.Get() != "a" { + t.Errorf("want a, got %s", s.Get()) + } + if s.CompareAndSwap("b", "c") { + t.Errorf("want false, got true") + } + if s.Get() != "a" { + t.Errorf("want a, got %s", s.Get()) + } + if !s.CompareAndSwap("a", "c") { + t.Errorf("want true, got false") + } + if s.Get() != "c" { + t.Errorf("want c, got %s", s.Get()) + } +} + +func TestAtomicBool(t *testing.T) { + b := NewAtomicBool(true) + if !b.Get() { + t.Error("b.Get: false, want true") + } + b.Set(false) + if b.Get() { + t.Error("b.Get: true, want false") + } + b.Set(true) + if !b.Get() { + t.Error("b.Get: false, want true") + } +} From 33048c8f44f9e55d9beff28a6e3cfd7ae0802a13 Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 12:55:32 +0800 Subject: [PATCH 02/12] fix: Change field properties name --- pkg/metrics/manager/manager.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/pkg/metrics/manager/manager.go b/pkg/metrics/manager/manager.go index 395144a3..a64d098b 100644 --- a/pkg/metrics/manager/manager.go +++ b/pkg/metrics/manager/manager.go @@ -57,8 +57,8 @@ const ( type SQLResponse struct { ns string sqlExecTimeRecordSwitch bool - sQLExecTimeChan chan string - sQLTimeList []float64 + sqlExecTimeChan chan string + sqlTimeList []float64 response99Max map[string]float64 // map[backendAddr]P99MaxValue response99Avg map[string]float64 // map[backendAddr]P99AvgValue response95Max map[string]float64 // map[backendAddr]P95MaxValue @@ -69,8 +69,8 @@ func NewSQLResponse(name string) *SQLResponse { return &SQLResponse{ ns: name, sqlExecTimeRecordSwitch: false, - sQLExecTimeChan: make(chan string, SQLExecTimeSize), - sQLTimeList: []float64{}, + sqlExecTimeChan: make(chan string, SQLExecTimeSize), + sqlTimeList: []float64{}, response99Max: make(map[string]float64), response99Avg: make(map[string]float64), response95Max: make(map[string]float64), @@ -243,7 +243,7 @@ func (s *StatisticManager) recordBackendSQLTiming(namespace string, operation st } execTime := float64(time.Since(startTime).Milliseconds()) select { - case s.SQLResponsePercentile[namespace].sQLExecTimeChan <- fmt.Sprintf(sliceName + "__" + backendAddr + "__" + fmt.Sprintf("%f", execTime)): + case s.SQLResponsePercentile[namespace].sqlExecTimeChan <- fmt.Sprintf(sliceName + "__" + backendAddr + "__" + fmt.Sprintf("%f", execTime)): case <-time.After(time.Millisecond): s.SQLResponsePercentile[namespace].sqlExecTimeRecordSwitch = false } @@ -349,7 +349,7 @@ func (s *StatisticManager) CalcAvgSQLTimes() { backendAddr := "" for !quit { select { - case tmp := <-sQLResponse.sQLExecTimeChan: + case tmp := <-sQLResponse.sqlExecTimeChan: if len(sqlTimes) >= SQLExecTimeSize { quit = true } From 29d569e3933da16cd63f61993364b9edf282b398 Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 13:18:47 +0800 Subject: [PATCH 03/12] Fix: Resolve test failures due to BootOptions field changes This commit addresses the issue of failing tests caused by modifications to the BootOptions fields. The tests have been updated to align with the new structure, ensuring they pass as expected. --- conf/bootstrap.docker.yaml | 4 + conf/bootstrap.local-etcd.yaml | 4 + pkg/config/misc_test.go | 5 +- pkg/metrics/stats/duration_test.go | 126 +++++++++++++++++++++++++++++ testdata/fake_bootstrap.yaml | 4 + 5 files changed, 141 insertions(+), 2 deletions(-) create mode 100644 pkg/metrics/stats/duration_test.go diff --git a/conf/bootstrap.docker.yaml b/conf/bootstrap.docker.yaml index a19b28e2..e79e1ad8 100644 --- a/conf/bootstrap.docker.yaml +++ b/conf/bootstrap.docker.yaml @@ -43,3 +43,7 @@ logging: max_age: 7 compress: true console: true + +stats: + service: "TestService" + stats_enable: false \ No newline at end of file diff --git a/conf/bootstrap.local-etcd.yaml b/conf/bootstrap.local-etcd.yaml index bc45a261..1ab22586 100644 --- a/conf/bootstrap.local-etcd.yaml +++ b/conf/bootstrap.local-etcd.yaml @@ -39,3 +39,7 @@ logging: max_age: 30 compress: false console: true + +stats: + service: "TestService" + stats_enable: false \ No newline at end of file diff --git a/pkg/config/misc_test.go b/pkg/config/misc_test.go index 68a6e6a8..1a486888 100644 --- a/pkg/config/misc_test.go +++ b/pkg/config/misc_test.go @@ -21,9 +21,7 @@ package config import ( "os" "testing" -) -import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -58,6 +56,9 @@ logging: max_age: 7 compress: true console: true +stats: + service: "TestService" + stats_enable: false ` _, err = tmpfile.WriteString(text) require.NoError(t, err) diff --git a/pkg/metrics/stats/duration_test.go b/pkg/metrics/stats/duration_test.go new file mode 100644 index 00000000..7e1921e6 --- /dev/null +++ b/pkg/metrics/stats/duration_test.go @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import ( + "expvar" + "testing" + "time" +) + +func TestCounterDuration(t *testing.T) { + var gotname string + var gotv *CounterDuration + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*CounterDuration) + }) + v := NewCounterDuration("CounterDuration", "help") + if gotname != "CounterDuration" { + t.Errorf("want CounterDuration, got %s", gotname) + } + if gotv != v { + t.Errorf("want %#v, got %#v", v, gotv) + } + if v.Get() != 0 { + t.Errorf("want 0, got %v", v.Get()) + } + v.Add(time.Duration(1)) + if v.Get() != 1 { + t.Errorf("want 1, got %v", v.Get()) + } + if v.String() != "1" { + t.Errorf("want 1, got %v", v.Get()) + } +} + +func TestCounterDurationFunc(t *testing.T) { + var gotname string + var gotv *CounterDurationFunc + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*CounterDurationFunc) + }) + + v := NewCounterDurationFunc("CounterDurationFunc", "help", func() time.Duration { + return time.Duration(1) + }) + if gotname != "CounterDurationFunc" { + t.Errorf("want CounterDurationFunc, got %s", gotname) + } + if gotv != v { + t.Errorf("want %#v, got %#v", v, gotv) + } + if v.String() != "1" { + t.Errorf("want 1, got %v", v.String()) + } +} + +func TestGaugeDuration(t *testing.T) { + var gotname string + var gotv *GaugeDuration + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*GaugeDuration) + }) + v := NewGaugeDuration("GaugeDuration", "help") + if gotname != "GaugeDuration" { + t.Errorf("want GaugeDuration, got %s", gotname) + } + if gotv != v { + t.Errorf("want %#v, got %#v", v, gotv) + } + v.Set(time.Duration(5)) + if v.Get() != 5 { + t.Errorf("want 5, got %v", v.Get()) + } + v.Add(time.Duration(1)) + if v.Get() != 6 { + t.Errorf("want 6, got %v", v.Get()) + } + if v.String() != "6" { + t.Errorf("want 6, got %v", v.Get()) + } +} + +func TestGaugeDurationFunc(t *testing.T) { + var gotname string + var gotv *GaugeDurationFunc + clear() + Register(func(name string, v expvar.Var) { + gotname = name + gotv = v.(*GaugeDurationFunc) + }) + + v := NewGaugeDurationFunc("GaugeDurationFunc", "help", func() time.Duration { + return time.Duration(1) + }) + + if gotname != "GaugeDurationFunc" { + t.Errorf("want GaugeDurationFunc, got %s", gotname) + } + if gotv != v { + t.Errorf("want %#v, got %#v", v, gotv) + } + if v.String() != "1" { + t.Errorf("want 1, got %v", v.String()) + } +} diff --git a/testdata/fake_bootstrap.yaml b/testdata/fake_bootstrap.yaml index c7bad58f..b2c16166 100644 --- a/testdata/fake_bootstrap.yaml +++ b/testdata/fake_bootstrap.yaml @@ -37,3 +37,7 @@ logging: max_age: 7 compress: true console: true + +stats: + service: "TestService" + stats_enable: false \ No newline at end of file From 0efbc1bd66eb0c73981e295dc4df6030c57bff36 Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 13:25:00 +0800 Subject: [PATCH 04/12] fix:change import style --- pkg/config/misc_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/config/misc_test.go b/pkg/config/misc_test.go index 1a486888..231843a4 100644 --- a/pkg/config/misc_test.go +++ b/pkg/config/misc_test.go @@ -21,7 +21,9 @@ package config import ( "os" "testing" +) +import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) From 0f0100fcfa0fc11c83b4f174e6ba03a7d7ad90b2 Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 13:33:59 +0800 Subject: [PATCH 05/12] fix:Use raw string when regexp.MustCompile to avoid unnecessary escaping. --- pkg/metrics/stats/kebab_case_converter.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/metrics/stats/kebab_case_converter.go b/pkg/metrics/stats/kebab_case_converter.go index bcdfd4a1..058316af 100644 --- a/pkg/metrics/stats/kebab_case_converter.go +++ b/pkg/metrics/stats/kebab_case_converter.go @@ -47,11 +47,11 @@ var kebabConverters = []struct { repl string }{ // example: LC -> L-C (e.g. CamelCase -> Camel-Case). - {regexp.MustCompile("([a-z])([A-Z])"), "$1-$2"}, + {regexp.MustCompile(`([a-z])([A-Z])`), "$1-$2"}, // example: CCa -> C-Ca (e.g. CCamel -> C-Camel). - {regexp.MustCompile("([A-Z])([A-Z][a-z])"), "$1-$2"}, - {regexp.MustCompile("_"), "-"}, - {regexp.MustCompile("\\."), "_"}, + {regexp.MustCompile(`([A-Z])([A-Z][a-z])`), "$1-$2"}, + {regexp.MustCompile(`_`), "-"}, + {regexp.MustCompile(`\.`), "_"}, } var memoizer = memoizerType{ From 0f2cab0d4b8e25f254f04f9f9bb708b4d1303281 Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 13:37:22 +0800 Subject: [PATCH 06/12] fix:use time.Since instead of time.Now().Sub --- pkg/metrics/stats/timings.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/metrics/stats/timings.go b/pkg/metrics/stats/timings.go index 799ed5db..105099f3 100644 --- a/pkg/metrics/stats/timings.go +++ b/pkg/metrics/stats/timings.go @@ -95,7 +95,7 @@ func (t *Timings) Add(name string, elapsed time.Duration) { // Record is a convenience function that records completion // timing data based on the provided start time of an event. func (t *Timings) Record(name string, startTime time.Time) { - t.Add(name, time.Now().Sub(startTime)) + t.Add(name, time.Since(startTime)) } // String is for expvar. From 49a02056885a2b7a487c9a1b9a16d17c10a675a8 Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 13:40:06 +0800 Subject: [PATCH 07/12] fix:add a newline to .yaml from the command line --- conf/bootstrap.docker.yaml | 2 +- conf/bootstrap.local-etcd.yaml | 2 +- testdata/fake_bootstrap.yaml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/conf/bootstrap.docker.yaml b/conf/bootstrap.docker.yaml index e79e1ad8..91689825 100644 --- a/conf/bootstrap.docker.yaml +++ b/conf/bootstrap.docker.yaml @@ -46,4 +46,4 @@ logging: stats: service: "TestService" - stats_enable: false \ No newline at end of file + stats_enable: false diff --git a/conf/bootstrap.local-etcd.yaml b/conf/bootstrap.local-etcd.yaml index 1ab22586..716c5bf2 100644 --- a/conf/bootstrap.local-etcd.yaml +++ b/conf/bootstrap.local-etcd.yaml @@ -42,4 +42,4 @@ logging: stats: service: "TestService" - stats_enable: false \ No newline at end of file + stats_enable: false diff --git a/testdata/fake_bootstrap.yaml b/testdata/fake_bootstrap.yaml index b2c16166..a0cb4d72 100644 --- a/testdata/fake_bootstrap.yaml +++ b/testdata/fake_bootstrap.yaml @@ -40,4 +40,4 @@ logging: stats: service: "TestService" - stats_enable: false \ No newline at end of file + stats_enable: false From 80f9681209308ca4e3b0b3ba2daacef8d2b1eb6e Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 13:50:21 +0800 Subject: [PATCH 08/12] add: snakecese converter --- pkg/metrics/stats/snake_case_converter.go | 10 ++-- .../stats/snake_case_converter_test.go | 53 +++++++++++++++++++ 2 files changed, 57 insertions(+), 6 deletions(-) create mode 100644 pkg/metrics/stats/snake_case_converter_test.go diff --git a/pkg/metrics/stats/snake_case_converter.go b/pkg/metrics/stats/snake_case_converter.go index 227c0687..f2653db5 100644 --- a/pkg/metrics/stats/snake_case_converter.go +++ b/pkg/metrics/stats/snake_case_converter.go @@ -52,12 +52,10 @@ var snakeConverters = []struct { re *regexp.Regexp repl string }{ - // example: LC -> L_C (e.g. CamelCase -> Camel_Case). - {regexp.MustCompile("([a-z])([A-Z])"), "${1}_${2}"}, - // example: CCa -> C_Ca (e.g. CCamel -> C_Camel). - {regexp.MustCompile("([A-Z])([A-Z][a-z])"), "${1}_${2}"}, - {regexp.MustCompile("\\."), "_"}, - {regexp.MustCompile("-"), "_"}, + {regexp.MustCompile(`([a-z])([A-Z])`), "${1}_${2}"}, + {regexp.MustCompile(`([A-Z])([A-Z][a-z])`), "${1}_${2}"}, + {regexp.MustCompile(`\.`), "_"}, + {regexp.MustCompile(`-`), "_"}, } var snakeMemoizer = memoizerType{ diff --git a/pkg/metrics/stats/snake_case_converter_test.go b/pkg/metrics/stats/snake_case_converter_test.go new file mode 100644 index 00000000..f3d16459 --- /dev/null +++ b/pkg/metrics/stats/snake_case_converter_test.go @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package stats + +import "testing" + +func TestToSnakeCase(t *testing.T) { + var snakeCaseTest = []struct{ input, output string }{ + {"Camel", "camel"}, + {"Camel", "camel"}, + {"CamelCase", "camel_case"}, + {"CamelCaseAgain", "camel_case_again"}, + {"CCamel", "c_camel"}, + {"CCCamel", "cc_camel"}, + {"CAMEL_CASE", "camel_case"}, + {"camel-case", "camel_case"}, + {"0", "0"}, + {"0.0", "0_0"}, + {"JSON", "json"}, + } + + for _, tt := range snakeCaseTest { + if got, want := toSnakeCase(tt.input), tt.output; got != want { + t.Errorf("want '%s', got '%s'", want, got) + } + } +} + +func TestSnakeMemoize(t *testing.T) { + key := "Test" + if snakeMemoizer.memo[key] != "" { + t.Errorf("want '', got '%s'", snakeMemoizer.memo[key]) + } + toSnakeCase(key) + if snakeMemoizer.memo[key] != "test" { + t.Errorf("want 'test', got '%s'", snakeMemoizer.memo[key]) + } +} From e2f96fbb393426e0fb29e73bc73349dce6b75ede Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 13:59:02 +0800 Subject: [PATCH 09/12] fix:use raw string () with regexp.MustCompile to avoid having to escape twice --- pkg/metrics/stats/snake_case_converter.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/metrics/stats/snake_case_converter.go b/pkg/metrics/stats/snake_case_converter.go index f2653db5..11d8da26 100644 --- a/pkg/metrics/stats/snake_case_converter.go +++ b/pkg/metrics/stats/snake_case_converter.go @@ -52,6 +52,7 @@ var snakeConverters = []struct { re *regexp.Regexp repl string }{ + // example: LC -> L_C (e.g. CamelCase -> Camel_Case). {regexp.MustCompile(`([a-z])([A-Z])`), "${1}_${2}"}, {regexp.MustCompile(`([A-Z])([A-Z][a-z])`), "${1}_${2}"}, {regexp.MustCompile(`\.`), "_"}, From 7cda02dedcd85e4d8c54ae53e3fd5d0fdad5d5fa Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 14:11:02 +0800 Subject: [PATCH 10/12] fix:Unnecessary assignment to blank identifier --- pkg/metrics/manager/manager.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/pkg/metrics/manager/manager.go b/pkg/metrics/manager/manager.go index a64d098b..ee7e1ada 100644 --- a/pkg/metrics/manager/manager.go +++ b/pkg/metrics/manager/manager.go @@ -25,17 +25,12 @@ import ( "strings" "sync" "time" -) - -import ( - "go.uber.org/atomic" -) -import ( "github.com/arana-db/arana/pkg/config" "github.com/arana-db/arana/pkg/metrics/stats" "github.com/arana-db/arana/pkg/metrics/stats/prometheus" "github.com/arana-db/arana/pkg/util/log" + "go.uber.org/atomic" ) const ( @@ -119,7 +114,8 @@ type StatisticManager struct { // Init init StatisticManager func (s *StatisticManager) Init() error { s.startTime = time.Now().Unix() - s.closeChan = make(chan bool, 0) + s.closeChan = make(chan bool) + s.handlers = make(map[string]http.Handler) bootOptions, err := config.LoadBootOptions(s.bootstrapPath) if err != nil { @@ -373,7 +369,7 @@ func (s *StatisticManager) CalcAvgSQLTimes() { s.SQLResponsePercentile[ns].response99Max[backendAddr] = sqlTimes[(len(sqlTimes)-1)*99/100] s.SQLResponsePercentile[ns].response95Max[backendAddr] = sqlTimes[(len(sqlTimes)-1)*95/100] } - for k, _ := range sqlTimes { + for k := range sqlTimes { sum += sqlTimes[k] if k < len(sqlTimes)*95/100 { p95sum += sqlTimes[k] From 49dbf00b60e35bb9aa8d4802fdd031862cefb331 Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Thu, 5 Oct 2023 14:33:32 +0800 Subject: [PATCH 11/12] fix: change fmt style --- conf/bootstrap.yaml | 4 ++++ test/integration_test.go | 9 +++++---- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/conf/bootstrap.yaml b/conf/bootstrap.yaml index 14f6664a..32a665c0 100644 --- a/conf/bootstrap.yaml +++ b/conf/bootstrap.yaml @@ -79,3 +79,7 @@ logging_config: sql_log_enabled: false sql_log_name: sql.log physical_sql_log_name: physql.log + +stats: + service: "TestService" + stats_enable: false diff --git a/test/integration_test.go b/test/integration_test.go index 3ff21944..65cd62cf 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -32,10 +32,7 @@ import ( ) import ( - "github.com/arana-db/parser" - _ "github.com/go-sql-driver/mysql" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" ) @@ -45,6 +42,7 @@ import ( "github.com/arana-db/arana/pkg/runtime" "github.com/arana-db/arana/pkg/util/rand2" utils "github.com/arana-db/arana/pkg/util/tableprint" + "github.com/arana-db/parser" ) type IntegrationSuite struct { @@ -1250,7 +1248,10 @@ func (s *IntegrationSuite) TestCompat80() { } { t.Run(it.sql, func(t *testing.T) { rows, err := db.Query(it.sql) - assert.NoError(t, err) + if err != nil { + t.Fatalf("Failed to execute query: %v", err) // 这里使用t.Fatalf停止当前的测试,并报告错误。 + return + } defer rows.Close() }) } From 1f1106d11d7e665c17bf9ec7c053478bf0125802 Mon Sep 17 00:00:00 2001 From: gongna <2036479155@qq.com> Date: Mon, 30 Oct 2023 09:13:09 +0800 Subject: [PATCH 12/12] fixed an issue where integration tests failed:ambulance: --- pkg/config/model.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/pkg/config/model.go b/pkg/config/model.go index 265741a2..67d15be9 100644 --- a/pkg/config/model.go +++ b/pkg/config/model.go @@ -28,23 +28,15 @@ import ( "strings" "time" "unicode" -) -import ( + "github.com/arana-db/arana/pkg/util/log" "github.com/go-playground/validator/v10" - "github.com/pkg/errors" - "golang.org/x/text/cases" "golang.org/x/text/language" - "gopkg.in/yaml.v3" ) -import ( - "github.com/arana-db/arana/pkg/util/log" -) - type ( DataRevision interface { Revision() string @@ -88,7 +80,7 @@ type ( Trace *Trace `yaml:"trace" json:"trace"` Supervisor *User `validate:"required,dive" yaml:"supervisor" json:"supervisor"` Logging *log.Config `validate:"required,dive" yaml:"logging" json:"logging"` - Stats *StatsConfig `validate:"required,dive" yaml:"stats" json:"stats"` + Stats *StatsConfig `yaml:"stats" json:"stats"` } StatsConfig struct {