Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamline the database interfaces and add pgx adapter #29

Draft
wants to merge 16 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
name: pr
on:
pull_request:
push:
branches-ignore:
- master
Expand Down
18 changes: 8 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ toolchain go1.23.0

require (
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2
github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.0
github.com/go-sql-driver/mysql v1.4.1
github.com/jackc/pgx/v4 v4.18.2
github.com/lib/pq v1.10.2
github.com/jackc/pgx/v5 v5.7.1
github.com/lib/pq v1.10.9
github.com/oklog/ulid v1.3.1
github.com/stretchr/testify v1.9.0
)
Expand All @@ -19,19 +20,16 @@ require (
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.14.3 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.3 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/pgtype v1.14.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect
github.com/jackc/puddle/v2 v2.2.2 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sony/gobreaker v1.0.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/text v0.18.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
170 changes: 22 additions & 148 deletions go.sum

Large diffs are not rendered by default.

127 changes: 127 additions & 0 deletions pkg/pgx/pgx_adapter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package pgx

import (
"context"
stdSQL "database/sql"
"fmt"
"time"

"github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
)

type Conn interface {
BeginTx(ctx context.Context, options pgx.TxOptions) (pgx.Tx, error)
Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error)
Query(ctx context.Context, sql string, arguments ...interface{}) (pgx.Rows, error)
}

type Beginner struct {
Conn
}

type Tx struct {
pgx.Tx
}

type Result struct {
pgconn.CommandTag
}

type Rows struct {
pgx.Rows
}

func (c Beginner) BeginTx(ctx context.Context, options *stdSQL.TxOptions) (sql.Tx, error) {
opts := pgx.TxOptions{}
if options != nil {
iso, err := toPgxIsolationLevel(options.Isolation)
if err != nil {
return nil, err
}

opts = pgx.TxOptions{
IsoLevel: iso,
AccessMode: toPgxAccessMode(options.ReadOnly),
DeferrableMode: "",
}
}

tx, err := c.Conn.BeginTx(ctx, opts)

return Tx{tx}, err
}

func (c Beginner) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
res, err := c.Conn.Exec(ctx, query, args...)

return Result{res}, err
}

func (c Beginner) QueryContext(ctx context.Context, query string, args ...interface{}) (sql.Rows, error) {
rows, err := c.Conn.Query(ctx, query, args...)

return Rows{rows}, err
}

func (t Tx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
res, err := t.Tx.Exec(ctx, query, args...)

return Result{res}, err
}

func (t Tx) QueryContext(ctx context.Context, query string, args ...any) (sql.Rows, error) {
rows, err := t.Tx.Query(ctx, query, args...)

return Rows{rows}, err
}

func (t Tx) Rollback() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

return t.Tx.Rollback(ctx)
}

func (t Tx) Commit() error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

return t.Tx.Commit(ctx)
}

func (p Result) RowsAffected() (int64, error) {
return p.CommandTag.RowsAffected(), nil
}

func (p Rows) Close() error {
p.Rows.Close()

return nil
}

func toPgxIsolationLevel(level stdSQL.IsolationLevel) (pgx.TxIsoLevel, error) {
switch level {
case stdSQL.LevelReadUncommitted:
return pgx.ReadUncommitted, nil
case stdSQL.LevelReadCommitted:
return pgx.ReadCommitted, nil
case stdSQL.LevelRepeatableRead:
return pgx.RepeatableRead, nil
case stdSQL.LevelSerializable:
return pgx.Serializable, nil
case stdSQL.LevelSnapshot:
return pgx.Serializable, fmt.Errorf("pgx does not support snapshot isolation")
default:
return pgx.Serializable, nil
}
}

func toPgxAccessMode(readOnly bool) pgx.TxAccessMode {
if readOnly {
return pgx.ReadOnly
}

return pgx.ReadWrite
}
7 changes: 3 additions & 4 deletions pkg/sql/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sql

import (
"context"
"database/sql"
)

type contextKey string
Expand All @@ -11,7 +10,7 @@ const (
txContextKey contextKey = "tx"
)

func setTxToContext(ctx context.Context, tx *sql.Tx) context.Context {
func setTxToContext(ctx context.Context, tx Tx) context.Context {
return context.WithValue(ctx, txContextKey, tx)
}

Expand All @@ -21,7 +20,7 @@ func setTxToContext(ctx context.Context, tx *sql.Tx) context.Context {
//
// It is useful when you want to ensure that data is updated only when the message is processed.
// Example usage: https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/exactly-once-delivery-counter
func TxFromContext(ctx context.Context) (*sql.Tx, bool) {
tx, ok := ctx.Value(txContextKey).(*sql.Tx)
func TxFromContext(ctx context.Context) (Tx, bool) {
tx, ok := ctx.Value(txContextKey).(Tx)
return tx, ok
}
Loading