Skip to content

Commit

Permalink
add MySQL support
Browse files Browse the repository at this point in the history
  • Loading branch information
def committed Jun 28, 2024
1 parent 809b176 commit 8c407bb
Show file tree
Hide file tree
Showing 8 changed files with 592 additions and 1 deletion.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ require (
)

require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.1 // indirect
Expand All @@ -47,6 +48,7 @@ require (
github.com/go-openapi/jsonpointer v0.20.0 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.4 // indirect
github.com/go-sql-driver/mysql v1.8.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1 h1:lGlwhPtrX6EVml1hO0ivjkUxsSyl4dsiw9qcA1k/3IQ=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.1/go.mod h1:RKUqNu35KJYcVG/fqTRqmuXJZYNhYkBrnC/hX7yGbTA=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+6GTssUXdANk6aJ7T1ZxnsQ=
Expand Down Expand Up @@ -164,6 +166,8 @@ github.com/go-openapi/swag v0.22.4 h1:QLMzNJnMGPRNDCbySlcj1x01tzU8/9LTTL9hZZZogB
github.com/go-openapi/swag v0.22.4/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14=
github.com/go-resty/resty/v2 v2.11.0 h1:i7jMfNOJYMp69lq7qozJP+bjgzfAzeOhuGlyDrqxT/8=
github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5BGXiVdTu+A=
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
Expand Down
21 changes: 20 additions & 1 deletion metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/coroot/coroot-cluster-agent/flags"
"github.com/coroot/coroot-cluster-agent/metrics/aws"
"github.com/coroot/coroot-cluster-agent/metrics/mongo"
"github.com/coroot/coroot-cluster-agent/metrics/mysql"
postgres "github.com/coroot/coroot-pg-agent/collector"
"github.com/coroot/coroot/model"
"github.com/coroot/logger"
Expand Down Expand Up @@ -147,7 +148,25 @@ func (ms *Metrics) createCollector(config ExporterConfig) (prometheus.Collector,
}
query.Set("sslmode", sslmode)
dsn := fmt.Sprintf("postgresql://%s@%s/postgres?%s", userPass, config.Address(), query.Encode())
collector, err := postgres.New(dsn, ms.scrapeTimeout, logger.NewKlog(config.Address()))
collector, err := postgres.New(dsn, ms.scrapeInterval, logger.NewKlog(config.Address()))
if err != nil {
return nil, nil, err
}
return collector, func() { _ = collector.Close() }, nil

case model.ApplicationTypeMysql:
klog.Infoln(config)

userPass := url.UserPassword(config.Credentials.Username, config.Credentials.Password)
query := url.Values{}
query.Set("timeout", fmt.Sprintf("%dms", ms.scrapeTimeout.Milliseconds()))
tls := config.Params["tls"]
if tls == "" {
tls = "false"
}
query.Set("tls", tls)
dsn := fmt.Sprintf("%s@tcp(%s)/mysql?%s", userPass, config.Address(), query.Encode())
collector, err := mysql.New(dsn, logger.NewKlog(config.Address()), ms.scrapeInterval)
if err != nil {
return nil, nil, err
}
Expand Down
208 changes: 208 additions & 0 deletions metrics/mysql/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package mysql

import (
"context"
"database/sql"
"github.com/coroot/coroot-cluster-agent/common"
"strconv"
"sync"
"time"

"github.com/coroot/logger"
_ "github.com/go-sql-driver/mysql"
"github.com/prometheus/client_golang/prometheus"
)

const (
	// MySQL performance_schema timers are reported in picoseconds;
	// divide by this to convert accumulated timer values to seconds.
	picoSeconds = 1e12
)

// Prometheus metric descriptors emitted by this collector.
// Variadic trailing strings are the label names for each metric.
var (
	dUp          = common.Desc("mysql_up", "")
	dScrapeError = common.Desc("mysql_scrape_error", "", "error", "warning")
	dInfo        = common.Desc("mysql_info", "", "server_version", "server_id", "server_uuid")

	// Per-statement rates from performance_schema (top-N queries only).
	dQueryCalls     = common.Desc("mysql_top_query_calls_per_second", "", "schema", "query")
	dQueryTotalTime = common.Desc("mysql_top_query_time_per_second", "", "schema", "query")
	dQueryLockTime  = common.Desc("mysql_top_query_lock_time_per_second", "", "schema", "query")

	// Replication health and lag, labeled by the replication source.
	dReplicationIORunning  = common.Desc("mysql_replication_io_status", "", "source_server_id", "source_server_uuid", "state", "last_error")
	dReplicationSQLRunning = common.Desc("mysql_replication_sql_status", "", "source_server_id", "source_server_uuid", "state", "last_error")
	dReplicationLag        = common.Desc("mysql_replication_lag_seconds", "", "source_server_id", "source_server_uuid")

	// Connection counters sourced from SHOW GLOBAL VARIABLES / STATUS.
	dConnectionsMax     = common.Desc("mysql_connections_max", "")
	dConnectionsCurrent = common.Desc("mysql_connections_current", "")
	dConnectionsTotal   = common.Desc("mysql_connections_total", "")
	dConnectionsAborted = common.Desc("mysql_connections_aborted_total", "")

	dBytesReceived = common.Desc("mysql_traffic_received_bytes_total", "")
	dBytesSent     = common.Desc("mysql_traffic_sent_bytes_total", "")

	dQueries     = common.Desc("mysql_queries_total", "")
	dSlowQueries = common.Desc("mysql_slow_queries_total", "")

	// Per-table read/write I/O wait time rates (top-N tables only).
	dIOTime = common.Desc("mysql_top_table_io_wait_time_per_second", "", "schema", "table", "operation")
)

// Collector gathers MySQL server metrics for Prometheus. A background
// goroutine started by New periodically refreshes the cached state below
// via snapshot(); Collect serves metrics from that cache under lock.
type Collector struct {
	host         string // NOTE(review): not referenced in this file — verify usage elsewhere
	db           *sql.DB
	logger       logger.Logger
	topN         int                // NOTE(review): not referenced in this file — Collect passes a literal 20
	cancelFunc   context.CancelFunc // stops the background snapshot goroutine
	lock         sync.RWMutex       // guards all cached state below
	scrapeErrors map[string]bool    // error messages recorded by the last snapshot attempt

	globalVariables map[string]string          // latest SHOW GLOBAL VARIABLES values
	globalStatus    map[string]string          // latest SHOW GLOBAL STATUS values
	perfschemaPrev  *statementsSummarySnapshot // previous statements snapshot (for computing rates)
	perfschemaCurr  *statementsSummarySnapshot // most recent statements snapshot
	replicaStatuses []*ReplicaStatus
	ioByTablePrev   *ioByTableSnapshot // previous per-table I/O snapshot (for computing rates)
	ioByTableCurr   *ioByTableSnapshot // most recent per-table I/O snapshot

	invalidQueries map[string]bool
}

// New opens a MySQL connection for the given DSN and starts a background
// goroutine that refreshes the collector's cached snapshots every
// scrapeInterval. The returned Collector must be released with Close.
//
// A failed initial Ping is logged but not fatal: the server may become
// reachable later, and Collect reports mysql_up=0 in the meantime.
func New(dsn string, logger logger.Logger, scrapeInterval time.Duration) (*Collector, error) {
	ctx, cancelFunc := context.WithCancel(context.Background())
	c := &Collector{
		logger:          logger,
		cancelFunc:      cancelFunc,
		globalStatus:    map[string]string{},
		globalVariables: map[string]string{},
		invalidQueries:  map[string]bool{},
	}
	var err error
	c.db, err = sql.Open("mysql", dsn)
	if err != nil {
		return nil, err
	}
	// A single connection keeps the agent's footprint on the server minimal.
	c.db.SetMaxOpenConns(1)
	if err := c.db.Ping(); err != nil {
		c.logger.Warning("probe failed:", err)
	}
	go func() {
		ticker := time.NewTicker(scrapeInterval)
		// Fix: the original never stopped the ticker, leaking its
		// resources after Close cancels the context.
		defer ticker.Stop()
		c.snapshot()
		for {
			select {
			case <-ticker.C:
				c.snapshot()
			case <-ctx.Done():
				c.logger.Info("stopping mysql collector")
				return
			}
		}
	}()

	return c, nil
}

// Close stops the background snapshot goroutine and closes the
// underlying database handle, returning any error from closing it.
func (c *Collector) Close() error {
	c.cancelFunc()
	err := c.db.Close()
	return err
}

// Collect implements prometheus.Collector. It probes the server first
// (reporting mysql_up and a scrape error on failure), then emits metrics
// from the cached snapshot state under the read lock.
func (c *Collector) Collect(ch chan<- prometheus.Metric) {
	err := c.db.Ping()
	if err != nil {
		c.logger.Warning("probe failed:", err)
		ch <- common.Gauge(dUp, 0)
		ch <- common.Gauge(dScrapeError, 1, err.Error(), "")
		return
	}
	ch <- common.Gauge(dUp, 1)

	c.lock.RLock()
	defer c.lock.RUnlock()

	version := c.globalVariables["version"]
	if version != "" {
		ch <- common.Gauge(dInfo, 1, version, c.globalVariables["server_id"], c.globalVariables["server_uuid"])
	}

	// Report every warning recorded by the last snapshot, or an explicit
	// zero so the absence of errors is observable.
	if len(c.scrapeErrors) == 0 {
		ch <- common.Gauge(dScrapeError, 0, "", "")
	} else {
		for e := range c.scrapeErrors {
			ch <- common.Gauge(dScrapeError, 1, "", e)
		}
	}

	c.queryMetrics(ch, 20)
	c.ioMetrics(ch, 20)
	c.replicationMetrics(ch)

	// Simple counters/gauges translated 1:1 from server variables.
	simple := []struct {
		desc *prometheus.Desc
		name string
		typ  prometheus.ValueType
		vars map[string]string
	}{
		{dConnectionsMax, "max_connections", prometheus.GaugeValue, c.globalVariables},
		{dConnectionsCurrent, "Threads_connected", prometheus.GaugeValue, c.globalStatus},
		{dConnectionsTotal, "Connections", prometheus.CounterValue, c.globalStatus},
		{dConnectionsAborted, "Aborted_connects", prometheus.CounterValue, c.globalStatus},
		{dBytesReceived, "Bytes_received", prometheus.CounterValue, c.globalStatus},
		{dBytesSent, "Bytes_sent", prometheus.CounterValue, c.globalStatus},
		{dQueries, "Questions", prometheus.CounterValue, c.globalStatus},
		{dSlowQueries, "Slow_queries", prometheus.CounterValue, c.globalStatus},
	}
	for _, m := range simple {
		metricFromVariable(ch, m.desc, m.name, m.typ, m.vars)
	}
}

// snapshot refreshes all cached server state under the write lock.
// The first failing step is logged, recorded in scrapeErrors, and aborts
// the refresh, leaving the remaining caches at their previous values.
func (c *Collector) snapshot() {
	c.lock.Lock()
	defer c.lock.Unlock()

	c.scrapeErrors = map[string]bool{}

	fail := func(err error) {
		c.logger.Warning(err)
		c.scrapeErrors[err.Error()] = true
	}

	if err := c.updateVariables("SHOW GLOBAL VARIABLES", c.globalVariables); err != nil {
		fail(err)
		return
	}
	if err := c.updateVariables("SHOW GLOBAL STATUS", c.globalStatus); err != nil {
		fail(err)
		return
	}
	if err := c.updateReplicationStatus(); err != nil {
		fail(err)
		return
	}

	// Rotate the statement and table-I/O snapshots: the previous one is
	// kept so per-second rates can be derived from the delta.
	var err error
	c.perfschemaPrev = c.perfschemaCurr
	if c.perfschemaCurr, err = c.queryStatementsSummary(c.perfschemaPrev); err != nil {
		fail(err)
		return
	}
	c.ioByTablePrev = c.ioByTableCurr
	if c.ioByTableCurr, err = c.queryTableIOWaits(); err != nil {
		fail(err)
		return
	}
}

// Describe implements prometheus.Collector by sending every metric
// descriptor this collector can emit.
func (c *Collector) Describe(ch chan<- *prometheus.Desc) {
	descs := []*prometheus.Desc{
		dUp, dScrapeError, dInfo,
		dQueryCalls, dQueryTotalTime, dQueryLockTime,
		dReplicationIORunning, dReplicationSQLRunning, dReplicationLag,
		dConnectionsMax, dConnectionsCurrent, dConnectionsTotal, dConnectionsAborted,
		dBytesReceived, dBytesSent,
		dQueries, dSlowQueries,
		dIOTime,
	}
	for _, d := range descs {
		ch <- d
	}
}

// metricFromVariable emits a single const metric built from the named
// entry of a SHOW VARIABLES / SHOW STATUS map. Missing or non-numeric
// values are silently skipped.
func metricFromVariable(ch chan<- prometheus.Metric, desc *prometheus.Desc, name string, typ prometheus.ValueType, variables map[string]string) {
	raw, found := variables[name]
	if !found {
		return
	}
	value, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return
	}
	ch <- prometheus.MustNewConstMetric(desc, typ, value)
}
101 changes: 101 additions & 0 deletions metrics/mysql/perfschema_io_waits_summary_by_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package mysql

import (
"github.com/coroot/coroot-cluster-agent/common"
"sort"
"time"

"github.com/prometheus/client_golang/prometheus"
)

// tableKey identifies a table by schema and name; used as a map key.
type tableKey struct {
	schema string
	table  string
}

// ioSummary holds cumulative read/write timer totals for one table,
// in picoseconds (performance_schema SUM_TIMER_* units).
type ioSummary struct {
	readTotalTime  uint64
	writeTotalTime uint64
}

// ioByTableSnapshot is a point-in-time capture of per-table I/O timers;
// two snapshots are diffed to compute per-second rates.
type ioByTableSnapshot struct {
	ts   time.Time
	rows map[tableKey]ioSummary
}

// queryTableIOWaits reads cumulative per-table read/write timer totals
// (picoseconds) from performance_schema and returns them as a
// timestamped snapshot. Rows that fail to scan are logged and skipped.
func (c *Collector) queryTableIOWaits() (*ioByTableSnapshot, error) {
	snapshot := &ioByTableSnapshot{ts: time.Now(), rows: map[tableKey]ioSummary{}}
	q := `
	SELECT
		OBJECT_SCHEMA,
		OBJECT_NAME,
		SUM_TIMER_READ,
		SUM_TIMER_WRITE
	FROM performance_schema.table_io_waits_summary_by_table
	WHERE
		OBJECT_SCHEMA is not null AND
		OBJECT_NAME is not null`
	rows, err := c.db.Query(q)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var k tableKey
		var r ioSummary
		if err := rows.Scan(&k.schema, &k.table, &r.readTotalTime, &r.writeTotalTime); err != nil {
			c.logger.Warning(err)
			continue
		}
		snapshot.rows[k] = r
	}
	// Fix: surface errors encountered during iteration (e.g. a dropped
	// connection) instead of silently returning a partial snapshot.
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return snapshot, nil
}

// ioStats holds derived per-second I/O wait time rates for one table.
type ioStats struct {
	readTimePerSecond  float64
	writeTimePerSecond float64
	totalTimePerSecond float64 // read + write; used for top-N ranking
}

// ioStatsWithKey pairs a table's rates with its identity for sorting.
type ioStatsWithKey struct {
	k tableKey
	s *ioStats
}

// ioMetrics emits per-table I/O wait time rates for the n busiest tables,
// derived from the two most recent performance_schema snapshots. It is a
// no-op until two snapshots exist. Callers must hold c.lock (Collect
// holds the read lock).
func (c *Collector) ioMetrics(ch chan<- prometheus.Metric, n int) {
	if c.ioByTablePrev == nil || c.ioByTableCurr == nil {
		return
	}
	interval := c.ioByTableCurr.ts.Sub(c.ioByTablePrev.ts).Seconds()
	// Fix: guard against a zero/negative interval, which would otherwise
	// divide to ±Inf rates.
	if interval <= 0 {
		return
	}

	// Fix: the original pre-sized withKeys from an always-empty map
	// (len(res) == 0); size it from the current snapshot instead.
	withKeys := make([]ioStatsWithKey, 0, len(c.ioByTableCurr.rows))
	for k, s := range c.ioByTableCurr.rows {
		prev := c.ioByTablePrev.rows[k]
		stats := &ioStats{}
		// Fix: counters are cumulative uint64s; compare before subtracting
		// so a server restart (counter reset) doesn't wrap around to an
		// absurdly large rate.
		if s.readTotalTime > prev.readTotalTime {
			stats.readTimePerSecond = float64(s.readTotalTime-prev.readTotalTime) / picoSeconds / interval
		}
		if s.writeTotalTime > prev.writeTotalTime {
			stats.writeTimePerSecond = float64(s.writeTotalTime-prev.writeTotalTime) / picoSeconds / interval
		}
		stats.totalTimePerSecond = stats.readTimePerSecond + stats.writeTimePerSecond
		if stats.totalTimePerSecond > 0 {
			withKeys = append(withKeys, ioStatsWithKey{k: k, s: stats})
		}
	}
	// Rank tables by total I/O time and keep the top n.
	sort.Slice(withKeys, func(i, j int) bool {
		return withKeys[i].s.totalTimePerSecond > withKeys[j].s.totalTimePerSecond
	})
	if n > len(withKeys) {
		n = len(withKeys)
	}
	for _, i := range withKeys[:n] {
		ch <- common.Gauge(dIOTime, i.s.readTimePerSecond, i.k.schema, i.k.table, "read")
		ch <- common.Gauge(dIOTime, i.s.writeTimePerSecond, i.k.schema, i.k.table, "write")
	}
}
Loading

0 comments on commit 8c407bb

Please sign in to comment.