
Commit

Merge pull request #7 from 1and1/bun-migration
migrated from go-pg to bun
tbe authored Jun 15, 2022
2 parents: 3dbe2ce + e357078 · commit 36246b5
Showing 752 changed files with 63,671 additions and 65,383 deletions.
8 changes: 4 additions & 4 deletions HACKING.md
@@ -4,8 +4,8 @@

### define your model in collector/models

Models are defined by creating a struct like in the most ORM's. As pg-exporter uses [go-pg/v9][1],
the same `sql` tags are used.
Models are defined by creating a struct like in the most ORM's. As pg-exporter uses [bun][1],
the same `bun` tags are used.

In addition, the following pg-exporter specific tags are available and required for code-generation:

@@ -23,8 +23,8 @@ run `go generate .` in the root folder of the project

- create a new, empty, struct in the `collector` package, that implements the `Scraper` interface.
- Inside the `Scrape` function, fetch your data from the database with the given database connection.
- see [go-pg/v9][1] for details how to do that
- see [bun][1] for details how to do that
- use the (generated) function `ToMetrics` on your result to provide the metrics to the prometheus handler


[1]: https://github.com/go-pg/pg/tree/v9
[1]: https://bun.uptrace.dev/
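
For orientation, here is a minimal sketch of what a post-migration model and scraper might look like, following the HACKING.md steps above. All names (`PgStatExample`, `ScrapeExample`, the table and columns) are illustrative, the pg-exporter-specific code-generation tags are omitted, and the real `Scraper` interface may carry additional methods beyond `Scrape`:

```go
package collector

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/uptrace/bun"
)

// PgStatExample is an illustrative model; real models live in collector/models
// and additionally carry the pg-exporter specific tags required by `go generate .`.
type PgStatExample struct {
	bun.BaseModel `bun:"table:pg_stat_example"`

	Datname string `bun:"datname"`
	Calls   int64  `bun:"calls"`
}

// ScrapeExample is an empty struct implementing the Scrape method of the
// Scraper interface (other interface methods are omitted in this sketch).
type ScrapeExample struct{}

// Scrape fetches the rows with the given bun connection; the real code would
// then hand the result to the generated ToMetrics function.
func (ScrapeExample) Scrape(ctx context.Context, db *bun.DB, ch chan<- prometheus.Metric) error {
	var res []PgStatExample
	if err := db.NewSelect().Model(&res).Scan(ctx); err != nil {
		return err
	}
	// for a generated model, res.ToMetrics(namespace, subsystem, ch) would be called here
	return nil
}
```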
10 changes: 7 additions & 3 deletions collector/activity.go
@@ -4,8 +4,8 @@ import (
"context"
"fmt"

"github.com/go-pg/pg/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/1and1/pg-exporter/collector/models"
@@ -62,7 +62,7 @@ func (ScrapeActivity) Type() ScrapeType {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeActivity) Scrape(ctx context.Context, db *pg.DB, ch chan<- prometheus.Metric) error {
func (ScrapeActivity) Scrape(ctx context.Context, db *bun.DB, ch chan<- prometheus.Metric) error {
// we create a query based on the given commandline flags
columns := ""
if *withUsername {
@@ -94,7 +94,11 @@ func (ScrapeActivity) Scrape(ctx context.Context, db *pg.DB, ch chan<- prometheu
columns, columns)

var statActivity models.PgStatActivitySlice
if _, err := db.QueryContext(ctx, &statActivity, qs, pg.In(collectDatabases)); err != nil {
rows, err := db.QueryContext(ctx, qs, bun.In(collectDatabases))
if err != nil {
return err
}
if err := db.ScanRows(ctx, rows, &statActivity); err != nil {
return err
}
return statActivity.ToMetrics(namespace, activity, ch)
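
The activity hunk above shows the general raw-SQL pattern after the migration: go-pg's combined `db.QueryContext(ctx, &dest, query, args...)` becomes a plain query through the embedded `database/sql` layer followed by `db.ScanRows`, with `bun.In` expanding the slice for the `IN` clause. A self-contained sketch of that pattern with a simplified query; `queryActivity` and `activityRow` are illustrative and not part of the repository:

```go
package collector

import (
	"context"

	"github.com/uptrace/bun"
)

// activityRow is an illustrative scan target; the real scraper uses
// models.PgStatActivitySlice and builds its column list from command-line flags.
type activityRow struct {
	Datname string `bun:"datname"`
	Count   int64  `bun:"count"`
}

// queryActivity condenses the post-migration raw-query pattern: execute the
// SQL with bun.In expanding the database list, then scan the rows into a slice.
func queryActivity(ctx context.Context, db *bun.DB, dbnames []string) ([]activityRow, error) {
	const qs = `SELECT datname, count(*) AS count
	            FROM pg_stat_activity
	            WHERE datname IN (?)
	            GROUP BY datname`

	rows, err := db.QueryContext(ctx, qs, bun.In(dbnames))
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []activityRow
	if err := db.ScanRows(ctx, rows, &result); err != nil {
		return nil, err
	}
	return result, nil
}
```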
6 changes: 3 additions & 3 deletions collector/archiver.go
@@ -3,8 +3,8 @@ package collector
import (
"context"

"github.com/go-pg/pg/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"

"github.com/1and1/pg-exporter/collector/models"
)
@@ -38,9 +38,9 @@ func (ScrapeArchiver) Type() ScrapeType {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeArchiver) Scrape(ctx context.Context, db *pg.DB, ch chan<- prometheus.Metric) error {
func (ScrapeArchiver) Scrape(ctx context.Context, db *bun.DB, ch chan<- prometheus.Metric) error {
statArchiver := &models.PgStatArchiver{}
if err := db.ModelContext(ctx, statArchiver).Select(); err != nil {
if err := db.NewSelect().Model(statArchiver).Scan(ctx); err != nil {
return err
}

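
The archiver change above is the core ORM-call translation used throughout the commit: go-pg's `db.ModelContext(ctx, m).Select()` becomes bun's `db.NewSelect().Model(m).Scan(ctx)`. A minimal, self-contained sketch of that mapping (`fetchArchiverStats` is an illustrative helper, not part of the repository); the bg_writer scraper below applies the same pattern:

```go
package collector

import (
	"context"

	"github.com/uptrace/bun"

	"github.com/1and1/pg-exporter/collector/models"
)

// fetchArchiverStats loads the single pg_stat_archiver row.
// go-pg (v9):  err := db.ModelContext(ctx, statArchiver).Select()
// bun:         err := db.NewSelect().Model(statArchiver).Scan(ctx)
func fetchArchiverStats(ctx context.Context, db *bun.DB) (*models.PgStatArchiver, error) {
	statArchiver := &models.PgStatArchiver{}
	if err := db.NewSelect().Model(statArchiver).Scan(ctx); err != nil {
		return nil, err
	}
	return statArchiver, nil
}
```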
6 changes: 3 additions & 3 deletions collector/bg_writer.go
@@ -3,8 +3,8 @@ package collector
import (
"context"

"github.com/go-pg/pg/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"

"github.com/1and1/pg-exporter/collector/models"
)
@@ -38,9 +38,9 @@ func (ScrapeBgWriter) Type() ScrapeType {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeBgWriter) Scrape(ctx context.Context, db *pg.DB, ch chan<- prometheus.Metric) error {
func (ScrapeBgWriter) Scrape(ctx context.Context, db *bun.DB, ch chan<- prometheus.Metric) error {
statBgwriter := &models.PgStatBgWriter{}
if err := db.ModelContext(ctx, statBgwriter).Select(); err != nil {
if err := db.NewSelect().Model(statBgwriter).Scan(ctx); err != nil {
return err
}

11 changes: 8 additions & 3 deletions collector/current_xact.go
@@ -3,8 +3,8 @@ package collector
import (
"context"

"github.com/go-pg/pg/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"

"github.com/1and1/pg-exporter/collector/models"
)
@@ -38,11 +38,16 @@ func (ScrapeTXID) Type() ScrapeType {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeTXID) Scrape(ctx context.Context, db *pg.DB, ch chan<- prometheus.Metric) error {
func (ScrapeTXID) Scrape(ctx context.Context, db *bun.DB, ch chan<- prometheus.Metric) error {
qs := `SELECT CASE WHEN pg_is_in_recovery() THEN NULL ELSE txid_current() END AS current`
ctxid := &models.PgTxid{}
if _, err := db.QueryContext(ctx, ctxid, qs); err != nil {
rows, err := db.QueryContext(ctx, qs)
if err != nil {
return err
}
if err := db.ScanRows(ctx, rows, ctxid); err != nil {
return err
}

return ctxid.ToMetrics(namespace, txid, ch)
}
8 changes: 4 additions & 4 deletions collector/database_conflict.go
@@ -3,8 +3,8 @@ package collector
import (
"context"

"github.com/go-pg/pg/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"

"github.com/1and1/pg-exporter/collector/models"
)
@@ -38,10 +38,10 @@ func (ScrapeDatabaseConflicts) Type() ScrapeType {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeDatabaseConflicts) Scrape(ctx context.Context, db *pg.DB, ch chan<- prometheus.Metric) error {
func (ScrapeDatabaseConflicts) Scrape(ctx context.Context, db *bun.DB, ch chan<- prometheus.Metric) error {
var databaseConflict models.PgStatDatabaseConflictsSlice
if err := db.ModelContext(ctx, &databaseConflict).Where("datname IN (?)", pg.In(collectDatabases)).
Select(); err != nil {
if err := db.NewSelect().Model(&databaseConflict).Where("datname IN (?)", bun.In(collectDatabases)).
Scan(ctx); err != nil {
return err
}

16 changes: 7 additions & 9 deletions collector/databases.go
@@ -3,8 +3,8 @@ package collector
import (
"context"

"github.com/go-pg/pg/v9"
"github.com/prometheus/common/log"
"github.com/uptrace/bun"
"gopkg.in/alecthomas/kingpin.v2"

"github.com/1and1/pg-exporter/collector/models"
@@ -30,7 +30,7 @@ var (
collectDatabases []string
)

func updateDatabaseList(ctx context.Context, db *pg.DB) error {
func updateDatabaseList(ctx context.Context, db *bun.DB) error {
// prepare lookup maps
prepareLookup := func(in *[]string) map[string]bool {
lookup := make(map[string]bool)
@@ -43,8 +43,12 @@ func updateDatabaseList(ctx context.Context, db *pg.DB) error {
excludeLookup := prepareLookup(excludeDatabases)

var dblist []string
var databases []models.PgDatabase
if err := db.NewSelect().Model(&databases).Scan(ctx); err != nil {
return err
}
// iterate over all databases and check if we include or exclude them
err := db.ModelContext(ctx, (*models.PgDatabase)(nil)).ForEach(func(pgdb *models.PgDatabase) error {
for _, pgdb := range databases {
if len(includeLookup) > 0 {
if includeLookup[pgdb.Datname] {
dblist = append(dblist, pgdb.Datname)
@@ -54,16 +58,10 @@ func updateDatabaseList(ctx context.Context, db *pg.DB) error {
dblist = append(dblist, pgdb.Datname)
}
}
return nil
})

if err != nil {
return err
}

collectDatabases = dblist

log.Debugf("effective database list: %v", collectDatabases)

return nil
}
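
Because the databases.go hunks interleave old and new lines, here is a consolidated sketch of the post-migration flow: the database list is loaded into a slice with `NewSelect().Model().Scan()` and filtered in a plain `range` loop instead of go-pg's `ForEach` callback. `filterDatabases` is an illustrative helper; the real code keeps this logic inline in `updateDatabaseList` and assigns the result to `collectDatabases`:

```go
package collector

import (
	"context"

	"github.com/uptrace/bun"

	"github.com/1and1/pg-exporter/collector/models"
)

// filterDatabases returns the database names to scrape, given the include and
// exclude lookup maps prepared from the command-line flags.
func filterDatabases(ctx context.Context, db *bun.DB, includeLookup, excludeLookup map[string]bool) ([]string, error) {
	var databases []models.PgDatabase
	if err := db.NewSelect().Model(&databases).Scan(ctx); err != nil {
		return nil, err
	}

	var dblist []string
	for _, pgdb := range databases {
		if len(includeLookup) > 0 {
			// an explicit include list wins: keep only the listed databases
			if includeLookup[pgdb.Datname] {
				dblist = append(dblist, pgdb.Datname)
			}
		} else if !excludeLookup[pgdb.Datname] {
			// otherwise keep everything that is not explicitly excluded
			dblist = append(dblist, pgdb.Datname)
		}
	}
	return dblist, nil
}
```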
76 changes: 42 additions & 34 deletions collector/exporter.go
@@ -2,12 +2,15 @@ package collector

import (
"context"
"database/sql"
"sync"
"time"

"github.com/go-pg/pg/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/log"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"
"github.com/uptrace/bun/driver/pgdriver"
)

// Metric name parts.
@@ -40,30 +43,13 @@ var _ prometheus.Collector = (*Exporter)(nil)
type Exporter struct {
ctx context.Context
dsn string
pgoptions *pg.Options
pgoptions []pgdriver.Option
scrapers []Scraper
metrics Metrics
}

// New returns a PostgreSQL exporter for the provided DSN.
func New(ctx context.Context, pgoptions *pg.Options, metrics Metrics, scrapers []Scraper) *Exporter {
// Database exporters should only open one connection
pgoptions.PoolSize = 1
// set the lock timeout, to minimise possible production impact

// TODO: this triggers a segfault in some cases, we need to investigate why later
/*
oldOnconnect := pgoptions.OnConnect
newOnconnect := func(conn *pg.Conn) error {
conn.ExecContext(ctx, "SET lock_timeout = 5")
if oldOnconnect != nil {
return oldOnconnect(conn)
}
return nil
}
pgoptions.OnConnect = newOnconnect
*/
func New(ctx context.Context, pgoptions []pgdriver.Option, metrics Metrics, scrapers []Scraper) *Exporter {
return &Exporter{
ctx: ctx,
pgoptions: pgoptions,
@@ -95,17 +81,31 @@ func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) {

scrapeTime := time.Now()

db := pg.Connect(e.pgoptions)
defer db.Close()
// create a new connection
pgconn := pgdriver.NewConnector(e.pgoptions...)
sqldb := sql.OpenDB(pgconn)
// Only use one connection
sqldb.SetMaxOpenConns(1)
db := bun.NewDB(sqldb, pgdialect.New())

// get the database version
if _, err := db.QueryContext(ctx, pg.Scan(&pgversion), "SHOW server_version_num"); err != nil {
log.Errorln("Error getting PostgreSQL version:", err)
e.metrics.PgSQLUp.Set(0)
e.metrics.Error.Set(1)
return
}
{
rows, err := db.QueryContext(ctx, "SHOW server_version_num")
if err != nil {
log.Errorln("Error getting PostgreSQL version:", err)
e.metrics.PgSQLUp.Set(0)
e.metrics.Error.Set(1)
return
}

if err := db.ScanRows(ctx, rows, &pgversion); err != nil {
log.Errorln("Error parsing PostgreSQL version:", err)
e.metrics.PgSQLUp.Set(0)
e.metrics.Error.Set(1)
return
}
rows.Close()
}
// update our requested db list
if err := updateDatabaseList(ctx, db); err != nil {
log.Errorln("error updating database list:", err)
@@ -147,14 +147,22 @@ func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) {
// collect the database scoped statistics
for _, dbname := range collectDatabases {
// copy the contents of pgoptions
// we don't require a deep copy, as we only change a string value at the top level
dboptions := *e.pgoptions
dboptions.Database = dbname
var dboptions []pgdriver.Option
dboptions = append(dboptions, e.pgoptions...)

// set our database name
dboptions = append(dboptions, pgdriver.WithDatabase(dbname))

// TODO: the exporter should open only one connection to the database instance,
// as we need one connection per database, this can increase here to n + 1 where n is the number of databases
// to scrape
localdb := pg.Connect(&dboptions)
defer localdb.Close()
// create a new connection
localconn := pgdriver.NewConnector(e.pgoptions...)
localsqldb := sql.OpenDB(localconn)
// Only use one connection
localsqldb.SetMaxOpenConns(1)
localdb := bun.NewDB(localsqldb, pgdialect.New())

for _, scraper := range e.scrapers {
if scraper.Type() != SCRAPELOCAL {
continue
@@ -164,7 +172,7 @@ func (e *Exporter) scrape(ctx context.Context, ch chan<- prometheus.Metric) {
}

wg.Add(1)
go func(scraper Scraper, dbname string, db *pg.DB) {
go func(scraper Scraper, dbname string, db *bun.DB) {
defer wg.Done()
label := "collect." + scraper.Name() + "." + dbname
scrapeTime := time.Now()
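
The exporter change replaces `pg.Connect` with a three-step bun setup: build a `pgdriver` connector from the option slice, wrap it with `database/sql`, cap the pool at a single connection, and attach the PostgreSQL dialect via `bun.NewDB`. A condensed, illustrative sketch of that setup (`openBunDB` is not part of the repository):

```go
package collector

import (
	"database/sql"

	"github.com/uptrace/bun"
	"github.com/uptrace/bun/dialect/pgdialect"
	"github.com/uptrace/bun/driver/pgdriver"
)

// openBunDB captures the connection setup used twice in exporter.go: a
// pgdriver connector built from the option list, wrapped by database/sql,
// limited to one connection, and handed to bun with the PostgreSQL dialect.
// A non-empty dbname is applied via pgdriver.WithDatabase, as done for the
// per-database scrapers.
func openBunDB(opts []pgdriver.Option, dbname string) *bun.DB {
	// copy the option slice so the caller's options are not modified
	connOpts := append([]pgdriver.Option{}, opts...)
	if dbname != "" {
		connOpts = append(connOpts, pgdriver.WithDatabase(dbname))
	}

	sqldb := sql.OpenDB(pgdriver.NewConnector(connOpts...))
	// database exporters should only hold a single connection
	sqldb.SetMaxOpenConns(1)

	return bun.NewDB(sqldb, pgdialect.New())
}
```

Note that in the per-database loop above, the connector is created from `e.pgoptions` while the freshly assembled `dboptions` (which carries `pgdriver.WithDatabase(dbname)`) goes unused; that may be an oversight in this commit, and the sketch applies the database name to a copy of the option list instead.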
6 changes: 3 additions & 3 deletions collector/info.go
@@ -3,8 +3,8 @@ package collector
import (
"context"

"github.com/go-pg/pg/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"
)

const (
@@ -36,10 +36,10 @@ func (ScrapeInfo) Type() ScrapeType {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeInfo) Scrape(ctx context.Context, db *pg.DB, ch chan<- prometheus.Metric) error {
func (ScrapeInfo) Scrape(ctx context.Context, db *bun.DB, ch chan<- prometheus.Metric) error {
// we reuse the definition from pg_settings
var settingsRes []pgSetting
if err := db.ModelContext(ctx, &settingsRes).WhereIn("name IN (?)", []string{"server_version"}).Select(); err != nil {
if err := db.NewSelect().Model(&settingsRes).Where("name IN (?)", bun.In([]string{"server_version"})).Scan(ctx); err != nil {
return err
}
labels := make(map[string]string)
10 changes: 7 additions & 3 deletions collector/locks.go
@@ -3,8 +3,8 @@ package collector
import (
"context"

"github.com/go-pg/pg/v9"
"github.com/prometheus/client_golang/prometheus"
"github.com/uptrace/bun"

"github.com/1and1/pg-exporter/collector/models"
)
@@ -38,7 +38,7 @@ func (ScrapeLocks) Type() ScrapeType {
}

// Scrape collects data from database connection and sends it over channel as prometheus metric.
func (ScrapeLocks) Scrape(ctx context.Context, db *pg.DB, ch chan<- prometheus.Metric) error {
func (ScrapeLocks) Scrape(ctx context.Context, db *bun.DB, ch chan<- prometheus.Metric) error {
qs := `WITH locks AS (
SELECT l.locktype,
CASE
@@ -60,7 +60,7 @@ func (ScrapeLocks) Scrape(ctx context.Context, db *pg.DB, ch chan<- prometheus.M
GROUP BY locktype, scope_type, database, "mode", "granted";`

var dblocks models.PgLocksSlice
if _, err := db.QueryContext(ctx, &dblocks, qs, pg.In(collectDatabases)); err != nil {
rows, err := db.QueryContext(ctx, qs, bun.In(collectDatabases))
if err != nil {
return err
}
if err := db.ScanRows(ctx, rows, &dblocks); err != nil {
return err
}
return dblocks.ToMetrics(namespace, locks, ch)
(remaining changed files not shown)
