From da49f017cdfa10db7211ce4421a291938ec1e2ef Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Thu, 8 Feb 2024 16:26:02 +0000 Subject: [PATCH 01/15] streamline the database interfaces and add pgx adapter --- pkg/sql/context.go | 7 +- pkg/sql/pgx/pgx_adapter.go | 122 +++++++++++++++++++++++++++++++++ pkg/sql/pubsub_test.go | 18 ++--- pkg/sql/schema_adapter_test.go | 8 +-- pkg/sql/sql.go | 32 +++++---- pkg/sql/sql_adapter.go | 41 +++++++++++ pkg/sql/subscriber.go | 4 +- pkg/sql/tx.go | 3 +- 8 files changed, 202 insertions(+), 33 deletions(-) create mode 100644 pkg/sql/pgx/pgx_adapter.go create mode 100644 pkg/sql/sql_adapter.go diff --git a/pkg/sql/context.go b/pkg/sql/context.go index 7740dee..36d33e4 100644 --- a/pkg/sql/context.go +++ b/pkg/sql/context.go @@ -2,7 +2,6 @@ package sql import ( "context" - "database/sql" ) type contextKey string @@ -11,7 +10,7 @@ const ( txContextKey contextKey = "tx" ) -func setTxToContext(ctx context.Context, tx *sql.Tx) context.Context { +func setTxToContext(ctx context.Context, tx Tx) context.Context { return context.WithValue(ctx, txContextKey, tx) } @@ -21,7 +20,7 @@ func setTxToContext(ctx context.Context, tx *sql.Tx) context.Context { // // It is useful when you want to ensure that data is updated only when the message is processed. // Example usage: https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/exactly-once-delivery-counter -func TxFromContext(ctx context.Context) (*sql.Tx, bool) { - tx, ok := ctx.Value(txContextKey).(*sql.Tx) +func TxFromContext(ctx context.Context) (Tx, bool) { + tx, ok := ctx.Value(txContextKey).(Tx) return tx, ok } diff --git a/pkg/sql/pgx/pgx_adapter.go b/pkg/sql/pgx/pgx_adapter.go new file mode 100644 index 0000000..9be831e --- /dev/null +++ b/pkg/sql/pgx/pgx_adapter.go @@ -0,0 +1,122 @@ +package pgx + +import ( + "context" + stdSQL "database/sql" + "fmt" + "time" + + "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" + + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" +) + +type Beginner struct { + *pgx.Conn +} + +type Tx struct { + pgx.Tx +} + +type Result struct { + pgconn.CommandTag +} + +type Rows struct { + pgx.Rows +} + +func (c Beginner) BeginTx(ctx context.Context, options *stdSQL.TxOptions) (sql.Tx, error) { + opts := pgx.TxOptions{} + if options != nil { + iso, err := toPgxIsolationLevel(options.Isolation) + if err != nil { + return nil, err + } + + opts = pgx.TxOptions{ + IsoLevel: iso, + AccessMode: toPgxAccessMode(options.ReadOnly), + DeferrableMode: "", + } + } + + tx, err := c.Conn.BeginTx(ctx, opts) + + return Tx{tx}, err +} + +func (c Beginner) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + res, err := c.Conn.Exec(ctx, query, args...) + + return Result{res}, err +} + +func (c Beginner) QueryContext(ctx context.Context, query string, args ...interface{}) (sql.Rows, error) { + rows, err := c.Conn.Query(ctx, query, args...) + + return Rows{rows}, err +} + +func (t Tx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { + res, err := t.Tx.Exec(ctx, query, args...) + + return Result{res}, err +} + +func (t Tx) QueryContext(ctx context.Context, query string, args ...any) (sql.Rows, error) { + rows, err := t.Tx.Query(ctx, query, args...) + + return Rows{rows}, err +} + +func (t Tx) Rollback() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return t.Tx.Rollback(ctx) +} + +func (t Tx) Commit() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return t.Tx.Commit(ctx) +} + +func (p Result) RowsAffected() (int64, error) { + return p.CommandTag.RowsAffected(), nil +} + +func (p Rows) Close() error { + p.Rows.Close() + + return nil +} + +func toPgxIsolationLevel(level stdSQL.IsolationLevel) (pgx.TxIsoLevel, error) { + switch level { + case stdSQL.LevelReadUncommitted: + return pgx.ReadUncommitted, nil + case stdSQL.LevelReadCommitted: + return pgx.ReadCommitted, nil + case stdSQL.LevelRepeatableRead: + return pgx.RepeatableRead, nil + case stdSQL.LevelSerializable: + return pgx.Serializable, nil + case stdSQL.LevelSnapshot: + return pgx.Serializable, fmt.Errorf("pgx does not support snapshot isolation") + default: + return pgx.Serializable, nil + } +} + +func toPgxAccessMode(readOnly bool) pgx.TxAccessMode { + if readOnly { + return pgx.ReadOnly + } + + return pgx.ReadWrite +} diff --git a/pkg/sql/pubsub_test.go b/pkg/sql/pubsub_test.go index c45d31c..1671608 100644 --- a/pkg/sql/pubsub_test.go +++ b/pkg/sql/pubsub_test.go @@ -22,12 +22,12 @@ import ( ) var ( - logger = watermill.NewStdLogger(true, false) + logger = watermill.NewStdLogger(false, false) ) func newPubSub(t *testing.T, db *stdSQL.DB, consumerGroup string, schemaAdapter sql.SchemaAdapter, offsetsAdapter sql.OffsetsAdapter) (message.Publisher, message.Subscriber) { publisher, err := sql.NewPublisher( - db, + &sql.StdSQLBeginner{DB: db}, sql.PublisherConfig{ SchemaAdapter: schemaAdapter, }, @@ -36,7 +36,7 @@ func newPubSub(t *testing.T, db *stdSQL.DB, consumerGroup string, schemaAdapter require.NoError(t, err) subscriber, err := sql.NewSubscriber( - db, + &sql.StdSQLBeginner{DB: db}, sql.SubscriberConfig{ ConsumerGroup: consumerGroup, @@ -293,7 +293,7 @@ func TestCtxValues(t *testing.T) { tx, ok := sql.TxFromContext(msg.Context()) assert.True(t, ok) assert.NotNil(t, t, tx) - assert.IsType(t, &stdSQL.Tx{}, tx) + assert.IsType(t, &sql.StdSQLTx{}, tx) msg.Ack() case <-time.After(time.Second * 10): t.Fatal("no message received") @@ -344,7 +344,7 @@ func TestNotMissingMessages(t *testing.T) { } sub, err := sql.NewSubscriber( - db, + &sql.StdSQLBeginner{DB: db}, sql.SubscriberConfig{ ConsumerGroup: "consumerGroup", @@ -394,7 +394,7 @@ func TestNotMissingMessages(t *testing.T) { time.Sleep(time.Millisecond * 10) pub0, err := sql.NewPublisher( - tx0, + &sql.StdSQLTx{Tx: tx0}, sql.PublisherConfig{ SchemaAdapter: pubSub.SchemaAdapter, }, @@ -405,7 +405,7 @@ func TestNotMissingMessages(t *testing.T) { require.NoError(t, err, "cannot publish message") pub1, err := sql.NewPublisher( - tx1, + &sql.StdSQLTx{Tx: tx1}, sql.PublisherConfig{ SchemaAdapter: pubSub.SchemaAdapter, }, @@ -416,7 +416,7 @@ func TestNotMissingMessages(t *testing.T) { require.NoError(t, err, "cannot publish message") pubRollback, err := sql.NewPublisher( - txRollback, + &sql.StdSQLTx{Tx: txRollback}, sql.PublisherConfig{ SchemaAdapter: pubSub.SchemaAdapter, }, @@ -427,7 +427,7 @@ func TestNotMissingMessages(t *testing.T) { require.NoError(t, err, "cannot publish message") pub2, err := sql.NewPublisher( - tx2, + &sql.StdSQLTx{Tx: tx2}, sql.PublisherConfig{ SchemaAdapter: pubSub.SchemaAdapter, }, diff --git a/pkg/sql/schema_adapter_test.go b/pkg/sql/schema_adapter_test.go index 6cde8c2..ac330c4 100644 --- a/pkg/sql/schema_adapter_test.go +++ b/pkg/sql/schema_adapter_test.go @@ -16,7 +16,7 @@ import ( // TestDefaultMySQLSchema checks if the SQL schema defined in DefaultMySQLSchema is correctly executed // and if message marshaling works as intended. func TestDefaultMySQLSchema(t *testing.T) { - db := newMySQL(t) + db := &sql.StdSQLBeginner{DB: newMySQL(t)} publisher, err := sql.NewPublisher(db, sql.PublisherConfig{ SchemaAdapter: sql.DefaultMySQLSchema{}, @@ -40,7 +40,7 @@ func TestDefaultMySQLSchema_implicit_commit_warning(t *testing.T) { require.NoError(t, err) schemaAdapter := sql.DefaultMySQLSchema{} - _, err = sql.NewPublisher(tx, sql.PublisherConfig{ + _, err = sql.NewPublisher(&sql.StdSQLTx{Tx: tx}, sql.PublisherConfig{ SchemaAdapter: schemaAdapter, AutoInitializeSchema: true, }, logger) @@ -54,7 +54,7 @@ func TestDefaultMySQLSchema_implicit_commit(t *testing.T) { require.NoError(t, err) schemaAdapter := sql.DefaultMySQLSchema{} - _, err = sql.NewPublisher(tx, sql.PublisherConfig{ + _, err = sql.NewPublisher(&sql.StdSQLTx{Tx: tx}, sql.PublisherConfig{ SchemaAdapter: schemaAdapter, AutoInitializeSchema: true, }, logger) @@ -64,7 +64,7 @@ func TestDefaultMySQLSchema_implicit_commit(t *testing.T) { // TestDefaultPostgreSQLSchema checks if the SQL schema defined in DefaultPostgreSQLSchema is correctly executed // and if message marshaling works as intended. func TestDefaultPostgreSQLSchema(t *testing.T) { - db := newPostgreSQL(t) + db := &sql.StdSQLBeginner{DB: newPostgreSQL(t)} publisher, err := sql.NewPublisher(db, sql.PublisherConfig{ SchemaAdapter: sql.DefaultPostgreSQLSchema{}, diff --git a/pkg/sql/sql.go b/pkg/sql/sql.go index 8b64613..ae00572 100644 --- a/pkg/sql/sql.go +++ b/pkg/sql/sql.go @@ -7,27 +7,35 @@ import ( "strings" ) -// interface definitions borrowed from github.com/volatiletech/sqlboiler +// A Result summarizes an executed SQL command. +type Result interface { + // RowsAffected returns the number of rows affected by an + // update, insert, or delete. Not every database or database + // driver may support this. + RowsAffected() (int64, error) +} + +type Rows interface { + Scan(dest ...any) error + Close() error + Next() bool +} -// Executor can perform SQL queries. -type Executor interface { - Exec(query string, args ...interface{}) (sql.Result, error) - Query(query string, args ...interface{}) (*sql.Rows, error) - QueryRow(query string, args ...interface{}) *sql.Row +type Tx interface { + ContextExecutor + Rollback() error + Commit() error } // ContextExecutor can perform SQL queries with context type ContextExecutor interface { - Executor - - ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) - QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) - QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row + ExecContext(ctx context.Context, query string, args ...any) (Result, error) + QueryContext(ctx context.Context, query string, args ...any) (Rows, error) } // Beginner begins transactions. type Beginner interface { - BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error) + BeginTx(context.Context, *sql.TxOptions) (Tx, error) ContextExecutor } diff --git a/pkg/sql/sql_adapter.go b/pkg/sql/sql_adapter.go new file mode 100644 index 0000000..0849125 --- /dev/null +++ b/pkg/sql/sql_adapter.go @@ -0,0 +1,41 @@ +package sql + +import ( + "context" + "database/sql" +) + +type StdSQLBeginner struct { + *sql.DB +} + +type StdSQLTx struct { + *sql.Tx +} + +// BeginTx converts the stdSQL.Tx struct to our Tx interface +func (c *StdSQLBeginner) BeginTx(ctx context.Context, options *sql.TxOptions) (Tx, error) { + tx, err := c.DB.BeginTx(ctx, options) + + return &StdSQLTx{tx}, err +} + +// ExecContext converts the stdSQL.Result struct to our Result interface +func (c *StdSQLBeginner) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) { + return c.DB.ExecContext(ctx, query, args...) +} + +// QueryContext converts the stdSQL.Rows struct to our Rows interface +func (c *StdSQLBeginner) QueryContext(ctx context.Context, query string, args ...interface{}) (Rows, error) { + return c.DB.QueryContext(ctx, query, args...) +} + +// ExecContext converts the stdSQL.Result struct to our Result interface +func (t *StdSQLTx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) { + return t.Tx.ExecContext(ctx, query, args...) +} + +// QueryContext converts the stdSQL.Rows struct to our Rows interface +func (t *StdSQLTx) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) { + return t.Tx.QueryContext(ctx, query, args...) +} diff --git a/pkg/sql/subscriber.go b/pkg/sql/subscriber.go index 8c3e67c..1ad76ff 100644 --- a/pkg/sql/subscriber.go +++ b/pkg/sql/subscriber.go @@ -183,7 +183,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (o <-chan *mes bsq := s.config.OffsetsAdapter.BeforeSubscribingQueries(topic, s.config.ConsumerGroup) if len(bsq) >= 1 { - err := runInTx(ctx, s.db, func(ctx context.Context, tx *sql.Tx) error { + err := runInTx(ctx, s.db, func(ctx context.Context, tx Tx) error { for _, q := range bsq { s.logger.Debug("Executing before subscribing query", watermill.LogFields{ "query": q, @@ -364,7 +364,7 @@ func (s *Subscriber) processMessage( ctx context.Context, topic string, row Row, - tx *sql.Tx, + tx Tx, out chan *message.Message, logger watermill.LoggerAdapter, ) (bool, error) { diff --git a/pkg/sql/tx.go b/pkg/sql/tx.go index 70e8caf..1a21404 100644 --- a/pkg/sql/tx.go +++ b/pkg/sql/tx.go @@ -2,7 +2,6 @@ package sql import ( "context" - "database/sql" "errors" "fmt" ) @@ -10,7 +9,7 @@ import ( func runInTx( ctx context.Context, db Beginner, - fn func(ctx context.Context, tx *sql.Tx) error, + fn func(ctx context.Context, tx Tx) error, ) (err error) { tx, err := db.BeginTx(ctx, nil) if err != nil { From b61bad0c20061af3fc860747b6a7c0d746954729 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Thu, 8 Feb 2024 16:55:02 +0000 Subject: [PATCH 02/15] change module name --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index f6f37b7..621bc47 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/ThreeDotsLabs/watermill-sql/v3 +module github.com/julesjcraske/watermill-sql/v3 go 1.20 From 8a953359c05b03af3380894a854383b5f884c351 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Thu, 8 Feb 2024 17:01:45 +0000 Subject: [PATCH 03/15] Add an interface for the pgx connection --- pkg/sql/pgx/pgx_adapter.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/sql/pgx/pgx_adapter.go b/pkg/sql/pgx/pgx_adapter.go index 9be831e..d82911c 100644 --- a/pkg/sql/pgx/pgx_adapter.go +++ b/pkg/sql/pgx/pgx_adapter.go @@ -6,14 +6,20 @@ import ( "fmt" "time" - "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" + "github.com/julesjcraske/watermill-sql/v3/pkg/sql" "github.com/jackc/pgconn" "github.com/jackc/pgx/v4" ) +type Conn interface { + BeginTx(ctx context.Context, options pgx.TxOptions) (pgx.Tx, error) + Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) + Query(ctx context.Context, sql string, arguments ...interface{}) (pgx.Rows, error) +} + type Beginner struct { - *pgx.Conn + Conn } type Tx struct { From e065a8c19288bd949c26fa2f3928b9ebe2a01504 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Thu, 8 Feb 2024 17:19:02 +0000 Subject: [PATCH 04/15] change the module name in tests --- pkg/sql/pubsub_test.go | 2 +- pkg/sql/schema_adapter_test.go | 2 +- pkg/sql/topic_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/sql/pubsub_test.go b/pkg/sql/pubsub_test.go index 1671608..a151f41 100644 --- a/pkg/sql/pubsub_test.go +++ b/pkg/sql/pubsub_test.go @@ -9,13 +9,13 @@ import ( "time" "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/subscriber" "github.com/ThreeDotsLabs/watermill/pubsub/tests" driver "github.com/go-sql-driver/mysql" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/stdlib" + "github.com/julesjcraske/watermill-sql/v3/pkg/sql" _ "github.com/lib/pq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/pkg/sql/schema_adapter_test.go b/pkg/sql/schema_adapter_test.go index ac330c4..ccf6bdb 100644 --- a/pkg/sql/schema_adapter_test.go +++ b/pkg/sql/schema_adapter_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/require" "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" + "github.com/julesjcraske/watermill-sql/v3/pkg/sql" ) // TestDefaultMySQLSchema checks if the SQL schema defined in DefaultMySQLSchema is correctly executed diff --git a/pkg/sql/topic_test.go b/pkg/sql/topic_test.go index a080275..7cca719 100644 --- a/pkg/sql/topic_test.go +++ b/pkg/sql/topic_test.go @@ -5,8 +5,8 @@ import ( "testing" "time" - "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" + "github.com/julesjcraske/watermill-sql/v3/pkg/sql" "github.com/pkg/errors" "github.com/stretchr/testify/assert" From e45329a973129e5c33ce7fa76882518ce00937b5 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Mon, 12 Feb 2024 11:51:36 +0000 Subject: [PATCH 05/15] change the module name back --- go.mod | 4 ++-- pkg/sql/pgx/pgx_adapter.go | 2 +- pkg/sql/pubsub_test.go | 2 +- pkg/sql/schema_adapter_test.go | 2 +- pkg/sql/topic_test.go | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/go.mod b/go.mod index 621bc47..c72995c 100644 --- a/go.mod +++ b/go.mod @@ -1,10 +1,11 @@ -module github.com/julesjcraske/watermill-sql/v3 +module github.com/ThreeDotsLabs/watermill-sql/v3 go 1.20 require ( github.com/ThreeDotsLabs/watermill v1.2.0 github.com/go-sql-driver/mysql v1.4.1 + github.com/jackc/pgconn v1.6.4 github.com/jackc/pgx/v4 v4.8.1 github.com/lib/pq v1.3.0 github.com/oklog/ulid v1.3.1 @@ -18,7 +19,6 @@ require ( github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgconn v1.6.4 // indirect github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgproto3/v2 v2.0.2 // indirect diff --git a/pkg/sql/pgx/pgx_adapter.go b/pkg/sql/pgx/pgx_adapter.go index d82911c..f07a224 100644 --- a/pkg/sql/pgx/pgx_adapter.go +++ b/pkg/sql/pgx/pgx_adapter.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - "github.com/julesjcraske/watermill-sql/v3/pkg/sql" + "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/jackc/pgconn" "github.com/jackc/pgx/v4" diff --git a/pkg/sql/pubsub_test.go b/pkg/sql/pubsub_test.go index a151f41..1671608 100644 --- a/pkg/sql/pubsub_test.go +++ b/pkg/sql/pubsub_test.go @@ -9,13 +9,13 @@ import ( "time" "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/subscriber" "github.com/ThreeDotsLabs/watermill/pubsub/tests" driver "github.com/go-sql-driver/mysql" "github.com/jackc/pgx/v4" "github.com/jackc/pgx/v4/stdlib" - "github.com/julesjcraske/watermill-sql/v3/pkg/sql" _ "github.com/lib/pq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" diff --git a/pkg/sql/schema_adapter_test.go b/pkg/sql/schema_adapter_test.go index ccf6bdb..ac330c4 100644 --- a/pkg/sql/schema_adapter_test.go +++ b/pkg/sql/schema_adapter_test.go @@ -9,8 +9,8 @@ import ( "github.com/stretchr/testify/require" "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" - "github.com/julesjcraske/watermill-sql/v3/pkg/sql" ) // TestDefaultMySQLSchema checks if the SQL schema defined in DefaultMySQLSchema is correctly executed diff --git a/pkg/sql/topic_test.go b/pkg/sql/topic_test.go index 7cca719..a080275 100644 --- a/pkg/sql/topic_test.go +++ b/pkg/sql/topic_test.go @@ -5,8 +5,8 @@ import ( "testing" "time" + "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" - "github.com/julesjcraske/watermill-sql/v3/pkg/sql" "github.com/pkg/errors" "github.com/stretchr/testify/assert" From 32cd23d84ecfb8c69cf31f70ce79b470e9e0c2b8 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Thu, 5 Sep 2024 10:47:44 +0100 Subject: [PATCH 06/15] start adjusting the tests to use the new adapters --- pkg/{sql => }/pgx/pgx_adapter.go | 0 pkg/sql/pubsub_test.go | 88 ++++++++++++++++++++++++++------ pkg/sql/schema_adapter_test.go | 12 ++--- pkg/sql/sql_adapter.go | 10 ++-- 4 files changed, 84 insertions(+), 26 deletions(-) rename pkg/{sql => }/pgx/pgx_adapter.go (100%) diff --git a/pkg/sql/pgx/pgx_adapter.go b/pkg/pgx/pgx_adapter.go similarity index 100% rename from pkg/sql/pgx/pgx_adapter.go rename to pkg/pgx/pgx_adapter.go diff --git a/pkg/sql/pubsub_test.go b/pkg/sql/pubsub_test.go index 1671608..a8c4eba 100644 --- a/pkg/sql/pubsub_test.go +++ b/pkg/sql/pubsub_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/ThreeDotsLabs/watermill" + wpgx "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/pgx" "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/subscriber" @@ -25,9 +26,9 @@ var ( logger = watermill.NewStdLogger(false, false) ) -func newPubSub(t *testing.T, db *stdSQL.DB, consumerGroup string, schemaAdapter sql.SchemaAdapter, offsetsAdapter sql.OffsetsAdapter) (message.Publisher, message.Subscriber) { +func newPubSub(t *testing.T, db sql.Beginner, consumerGroup string, schemaAdapter sql.SchemaAdapter, offsetsAdapter sql.OffsetsAdapter) (message.Publisher, message.Subscriber) { publisher, err := sql.NewPublisher( - &sql.StdSQLBeginner{DB: db}, + db, sql.PublisherConfig{ SchemaAdapter: schemaAdapter, }, @@ -36,7 +37,7 @@ func newPubSub(t *testing.T, db *stdSQL.DB, consumerGroup string, schemaAdapter require.NoError(t, err) subscriber, err := sql.NewSubscriber( - &sql.StdSQLBeginner{DB: db}, + db, sql.SubscriberConfig{ ConsumerGroup: consumerGroup, @@ -52,7 +53,7 @@ func newPubSub(t *testing.T, db *stdSQL.DB, consumerGroup string, schemaAdapter return publisher, subscriber } -func newMySQL(t *testing.T) *stdSQL.DB { +func newMySQL(t *testing.T) sql.Beginner { addr := os.Getenv("WATERMILL_TEST_MYSQL_HOST") if addr == "" { addr = "localhost" @@ -70,10 +71,10 @@ func newMySQL(t *testing.T) *stdSQL.DB { err = db.Ping() require.NoError(t, err) - return db + return sql.StdSQLBeginner{DB: db} } -func newPostgreSQL(t *testing.T) *stdSQL.DB { +func newPostgreSQL(t *testing.T) sql.Beginner { addr := os.Getenv("WATERMILL_TEST_POSTGRES_HOST") if addr == "" { addr = "localhost" @@ -86,10 +87,10 @@ func newPostgreSQL(t *testing.T) *stdSQL.DB { err = db.Ping() require.NoError(t, err) - return db + return sql.StdSQLBeginner{DB: db} } -func newPgxPostgreSQL(t *testing.T) *stdSQL.DB { +func newPgxPostgreSQL(t *testing.T) sql.Beginner { addr := os.Getenv("WATERMILL_TEST_POSTGRES_HOST") if addr == "" { addr = "localhost" @@ -104,7 +105,24 @@ func newPgxPostgreSQL(t *testing.T) *stdSQL.DB { err = db.Ping() require.NoError(t, err) - return db + return sql.StdSQLBeginner{DB: db} +} + +func newPgx(t *testing.T) sql.Beginner { + addr := os.Getenv("WATERMILL_TEST_POSTGRES_HOST") + if addr == "" { + addr = "localhost" + } + + connStr := fmt.Sprintf("postgres://watermill:password@%s/watermill?sslmode=disable", addr) + + db, err := pgx.Connect(context.Background(), connStr) + require.NoError(t, err) + + err = db.Ping(context.Background()) + require.NoError(t, err) + + return wpgx.Beginner{Conn: db} } func createMySQLPubSubWithConsumerGroup(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) { @@ -187,6 +205,24 @@ func createPgxPostgreSQLPubSubWithConsumerGroup(t *testing.T, consumerGroup stri return newPubSub(t, newPgxPostgreSQL(t), consumerGroup, schemaAdapter, offsetsAdapter) } +func createPgxPubSubWithConsumerGroup(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) { + schemaAdapter := &testPostgreSQLSchema{ + sql.DefaultPostgreSQLSchema{ + GenerateMessagesTableName: func(topic string) string { + return fmt.Sprintf(`"test_pgx_%s"`, topic) + }, + }, + } + + offsetsAdapter := sql.DefaultPostgreSQLOffsetsAdapter{ + GenerateMessagesOffsetsTableName: func(topic string) string { + return fmt.Sprintf(`"test_pgx_offsets_%s"`, topic) + }, + } + + return newPubSub(t, newPgx(t), consumerGroup, schemaAdapter, offsetsAdapter) +} + func createPostgreSQLPubSub(t *testing.T) (message.Publisher, message.Subscriber) { return createPostgreSQLPubSubWithConsumerGroup(t, "test") } @@ -195,6 +231,10 @@ func createPgxPostgreSQLPubSub(t *testing.T) (message.Publisher, message.Subscri return createPgxPostgreSQLPubSubWithConsumerGroup(t, "test") } +func createPgxPubSub(t *testing.T) (message.Publisher, message.Subscriber) { + return createPgxPubSubWithConsumerGroup(t, "test") +} + func TestMySQLPublishSubscribe(t *testing.T) { t.Parallel() @@ -249,6 +289,24 @@ func TestPgxPostgreSQLPublishSubscribe(t *testing.T) { ) } +func TestPgxPublishSubscribe(t *testing.T) { + t.Parallel() + + features := tests.Features{ + ConsumerGroups: true, + ExactlyOnceDelivery: false, + GuaranteedOrder: true, + Persistent: true, + } + + tests.TestPubSub( + t, + features, + createPgxPubSub, + createPgxPubSubWithConsumerGroup, + ) +} + func TestCtxValues(t *testing.T) { pubSubConstructors := []struct { Name string @@ -309,7 +367,7 @@ func TestNotMissingMessages(t *testing.T) { pubSubs := []struct { Name string - DbConstructor func(t *testing.T) *stdSQL.DB + DbConstructor func(t *testing.T) sql.Beginner SchemaAdapter sql.SchemaAdapter OffsetsAdapter sql.OffsetsAdapter }{ @@ -344,7 +402,7 @@ func TestNotMissingMessages(t *testing.T) { } sub, err := sql.NewSubscriber( - &sql.StdSQLBeginner{DB: db}, + db, sql.SubscriberConfig{ ConsumerGroup: "consumerGroup", @@ -394,7 +452,7 @@ func TestNotMissingMessages(t *testing.T) { time.Sleep(time.Millisecond * 10) pub0, err := sql.NewPublisher( - &sql.StdSQLTx{Tx: tx0}, + tx0, sql.PublisherConfig{ SchemaAdapter: pubSub.SchemaAdapter, }, @@ -405,7 +463,7 @@ func TestNotMissingMessages(t *testing.T) { require.NoError(t, err, "cannot publish message") pub1, err := sql.NewPublisher( - &sql.StdSQLTx{Tx: tx1}, + tx1, sql.PublisherConfig{ SchemaAdapter: pubSub.SchemaAdapter, }, @@ -416,7 +474,7 @@ func TestNotMissingMessages(t *testing.T) { require.NoError(t, err, "cannot publish message") pubRollback, err := sql.NewPublisher( - &sql.StdSQLTx{Tx: txRollback}, + txRollback, sql.PublisherConfig{ SchemaAdapter: pubSub.SchemaAdapter, }, @@ -427,7 +485,7 @@ func TestNotMissingMessages(t *testing.T) { require.NoError(t, err, "cannot publish message") pub2, err := sql.NewPublisher( - &sql.StdSQLTx{Tx: tx2}, + tx2, sql.PublisherConfig{ SchemaAdapter: pubSub.SchemaAdapter, }, diff --git a/pkg/sql/schema_adapter_test.go b/pkg/sql/schema_adapter_test.go index ac330c4..8438721 100644 --- a/pkg/sql/schema_adapter_test.go +++ b/pkg/sql/schema_adapter_test.go @@ -16,7 +16,7 @@ import ( // TestDefaultMySQLSchema checks if the SQL schema defined in DefaultMySQLSchema is correctly executed // and if message marshaling works as intended. func TestDefaultMySQLSchema(t *testing.T) { - db := &sql.StdSQLBeginner{DB: newMySQL(t)} + db := newMySQL(t) publisher, err := sql.NewPublisher(db, sql.PublisherConfig{ SchemaAdapter: sql.DefaultMySQLSchema{}, @@ -36,11 +36,11 @@ func TestDefaultMySQLSchema(t *testing.T) { func TestDefaultMySQLSchema_implicit_commit_warning(t *testing.T) { db := newMySQL(t) - tx, err := db.Begin() + tx, err := db.BeginTx(context.Background(), nil) require.NoError(t, err) schemaAdapter := sql.DefaultMySQLSchema{} - _, err = sql.NewPublisher(&sql.StdSQLTx{Tx: tx}, sql.PublisherConfig{ + _, err = sql.NewPublisher(tx, sql.PublisherConfig{ SchemaAdapter: schemaAdapter, AutoInitializeSchema: true, }, logger) @@ -50,11 +50,11 @@ func TestDefaultMySQLSchema_implicit_commit_warning(t *testing.T) { func TestDefaultMySQLSchema_implicit_commit(t *testing.T) { db := newMySQL(t) - tx, err := db.Begin() + tx, err := db.BeginTx(context.Background(), nil) require.NoError(t, err) schemaAdapter := sql.DefaultMySQLSchema{} - _, err = sql.NewPublisher(&sql.StdSQLTx{Tx: tx}, sql.PublisherConfig{ + _, err = sql.NewPublisher(tx, sql.PublisherConfig{ SchemaAdapter: schemaAdapter, AutoInitializeSchema: true, }, logger) @@ -64,7 +64,7 @@ func TestDefaultMySQLSchema_implicit_commit(t *testing.T) { // TestDefaultPostgreSQLSchema checks if the SQL schema defined in DefaultPostgreSQLSchema is correctly executed // and if message marshaling works as intended. func TestDefaultPostgreSQLSchema(t *testing.T) { - db := &sql.StdSQLBeginner{DB: newPostgreSQL(t)} + db := newPostgreSQL(t) publisher, err := sql.NewPublisher(db, sql.PublisherConfig{ SchemaAdapter: sql.DefaultPostgreSQLSchema{}, diff --git a/pkg/sql/sql_adapter.go b/pkg/sql/sql_adapter.go index 0849125..03ca058 100644 --- a/pkg/sql/sql_adapter.go +++ b/pkg/sql/sql_adapter.go @@ -14,28 +14,28 @@ type StdSQLTx struct { } // BeginTx converts the stdSQL.Tx struct to our Tx interface -func (c *StdSQLBeginner) BeginTx(ctx context.Context, options *sql.TxOptions) (Tx, error) { +func (c StdSQLBeginner) BeginTx(ctx context.Context, options *sql.TxOptions) (Tx, error) { tx, err := c.DB.BeginTx(ctx, options) return &StdSQLTx{tx}, err } // ExecContext converts the stdSQL.Result struct to our Result interface -func (c *StdSQLBeginner) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) { +func (c StdSQLBeginner) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) { return c.DB.ExecContext(ctx, query, args...) } // QueryContext converts the stdSQL.Rows struct to our Rows interface -func (c *StdSQLBeginner) QueryContext(ctx context.Context, query string, args ...interface{}) (Rows, error) { +func (c StdSQLBeginner) QueryContext(ctx context.Context, query string, args ...interface{}) (Rows, error) { return c.DB.QueryContext(ctx, query, args...) } // ExecContext converts the stdSQL.Result struct to our Result interface -func (t *StdSQLTx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) { +func (t StdSQLTx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) { return t.Tx.ExecContext(ctx, query, args...) } // QueryContext converts the stdSQL.Rows struct to our Rows interface -func (t *StdSQLTx) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) { +func (t StdSQLTx) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) { return t.Tx.QueryContext(ctx, query, args...) } From 64a15bde05d523d4ee0a3b0e003c0406680fe6e5 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Mon, 14 Oct 2024 10:22:31 +0100 Subject: [PATCH 07/15] use pgx v5, try and resolve concurrent table creates --- go.mod | 23 ++-- go.sum | 160 +++----------------------- pkg/pgx/pgx_adapter.go | 5 +- pkg/sql/offsets_adapter_postgresql.go | 19 ++- pkg/sql/pubsub_test.go | 20 ++-- pkg/sql/schema.go | 14 ++- pkg/sql/schema_adapter_postgresql.go | 13 ++- 7 files changed, 80 insertions(+), 174 deletions(-) diff --git a/go.mod b/go.mod index c72995c..0f97e07 100644 --- a/go.mod +++ b/go.mod @@ -1,13 +1,14 @@ module github.com/ThreeDotsLabs/watermill-sql/v3 -go 1.20 +go 1.21 + +toolchain go1.22.1 require ( github.com/ThreeDotsLabs/watermill v1.2.0 github.com/go-sql-driver/mysql v1.4.1 - github.com/jackc/pgconn v1.6.4 - github.com/jackc/pgx/v4 v4.8.1 - github.com/lib/pq v1.3.0 + github.com/jackc/pgx/v5 v5.7.1 + github.com/lib/pq v1.10.9 github.com/oklog/ulid v1.3.1 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.1 @@ -18,18 +19,16 @@ require ( github.com/google/uuid v1.3.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgproto3/v2 v2.0.2 // indirect - github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect - github.com/jackc/pgtype v1.4.2 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/kr/text v0.1.0 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rogpeppe/go-internal v1.10.0 // indirect - golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 // indirect - golang.org/x/text v0.3.3 // indirect - golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/text v0.18.0 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index b66df85..25ba7fa 100644 --- a/go.sum +++ b/go.sum @@ -1,21 +1,11 @@ -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/ThreeDotsLabs/watermill v1.2.0 h1:TU3TML1dnQ/ifK09F2+4JQk2EKhmhXe7Qv7eb5ZpTS8= github.com/ThreeDotsLabs/watermill v1.2.0/go.mod h1:IuVxGk/kgCN0cex2S94BLglUiB0PwOm8hbUhm6g2Nx4= -github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= -github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= -github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= -github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -24,171 +14,57 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= -github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= -github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= -github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= -github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= -github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= -github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= -github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= -github.com/jackc/pgconn v1.4.0/go.mod h1:Y2O3ZDF0q4mMacyWV3AstPJpeHXWGEetiFttmq5lahk= -github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= -github.com/jackc/pgconn v1.5.1-0.20200601181101-fa742c524853/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI= -github.com/jackc/pgconn v1.6.4 h1:S7T6cx5o2OqmxdHaXLH1ZeD1SbI8jBznyYE9Ec0RCQ8= -github.com/jackc/pgconn v1.6.4/go.mod h1:w2pne1C2tZgP+TvjqLpOigGzNqjBgQW9dUw/4Chex78= -github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= -github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= -github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye4717ITLaNwV9mWbJx0dLCpcRzdA= -github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= -github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= -github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= -github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= -github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= -github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgproto3/v2 v2.0.2 h1:q1Hsy66zh4vuNsajBUF2PNqfAMMfxU5mk594lPE9vjY= -github.com/jackc/pgproto3/v2 v2.0.2/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= -github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= -github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= -github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= -github.com/jackc/pgtype v1.2.0/go.mod h1:5m2OfMh1wTK7x+Fk952IDmI4nw3nPrvtQdM0ZT4WpC0= -github.com/jackc/pgtype v1.3.1-0.20200510190516-8cd94a14c75a/go.mod h1:vaogEUkALtxZMCH411K+tKzNpwzCKU+AnPzBKZ+I+Po= -github.com/jackc/pgtype v1.3.1-0.20200606141011-f6355165a91c/go.mod h1:cvk9Bgu/VzJ9/lxTO5R5sf80p0DiucVtN7ZxvaC4GmQ= -github.com/jackc/pgtype v1.4.2 h1:t+6LWm5eWPLX1H5Se702JSBcirq6uWa4jiG4wV1rAWY= -github.com/jackc/pgtype v1.4.2/go.mod h1:JCULISAZBFGrHaOXIIFiyfzW5VY0GRitRr8NeJsrdig= -github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= -github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= -github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= -github.com/jackc/pgx/v4 v4.5.0/go.mod h1:EpAKPLdnTorwmPUUsqrPxy5fphV18j9q3wrfRXgo+kA= -github.com/jackc/pgx/v4 v4.6.1-0.20200510190926-94ba730bb1e9/go.mod h1:t3/cdRQl6fOLDxqtlyhe9UWgfIi9R8+8v8GKV5TRA/o= -github.com/jackc/pgx/v4 v4.6.1-0.20200606145419-4e5062306904/go.mod h1:ZDaNWkt9sW1JMiNn0kdYBaLelIhw7Pg4qd+Vk6tw7Hg= -github.com/jackc/pgx/v4 v4.8.1 h1:SUbCLP2pXvf/Sr/25KsuI4aTxiFYIvpfk4l6aTSdyCw= -github.com/jackc/pgx/v4 v4.8.1/go.mod h1:4HOLxrl8wToZJReD04/yB20GDwf4KBYETvlHciCnwW0= -github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v1.1.1/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= -github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU= -github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= -github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= -github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= -github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= -github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= -github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= -github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc h1:jUIKcSPO9MoMJBbEoyE/RJoE8vz7Mb8AjvifMMwSyvY= -github.com/shopspring/decimal v0.0.0-20200227202807-02e2044944cc/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= -github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= -go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= -go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= -go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= -go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= -golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190911031432-227b76d455e7/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9 h1:psW17arqaxU48Z5kZ0CQnkZWQJsqcURM6tKiBApRjXI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3 h1:cokOdA+Jmi5PJGXLlLllQSgYigAEfHXJAERHVMaCc2k= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= -golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/pkg/pgx/pgx_adapter.go b/pkg/pgx/pgx_adapter.go index f07a224..c23f074 100644 --- a/pkg/pgx/pgx_adapter.go +++ b/pkg/pgx/pgx_adapter.go @@ -7,9 +7,8 @@ import ( "time" "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" - - "github.com/jackc/pgconn" - "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" ) type Conn interface { diff --git a/pkg/sql/offsets_adapter_postgresql.go b/pkg/sql/offsets_adapter_postgresql.go index 05db880..6f1d82d 100644 --- a/pkg/sql/offsets_adapter_postgresql.go +++ b/pkg/sql/offsets_adapter_postgresql.go @@ -16,20 +16,29 @@ import ( type DefaultPostgreSQLOffsetsAdapter struct { // GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated. GenerateMessagesOffsetsTableName func(topic string) string + + // AdvisoryXActLock if greater than zero will use pg_advisory_xact_lock to lock the transaction which is needed + // to concurrently create tables. + AdvisoryXActLock int } func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(topic string) []Query { - return []Query{ - { - Query: ` + createOffsetsTableQuery := ` CREATE TABLE IF NOT EXISTS ` + a.MessagesOffsetsTable(topic) + ` ( consumer_group VARCHAR(255) NOT NULL, offset_acked BIGINT, last_processed_transaction_id xid8 NOT NULL, PRIMARY KEY(consumer_group) - )`, - }, + )` + + queries := []Query{{Query: createOffsetsTableQuery}} + if a.AdvisoryXActLock > 0 { + queries = append([]Query{ + {Query: fmt.Sprintf("SELECT pg_advisory_xact_lock(%d);", a.AdvisoryXActLock)}, + }, queries...) } + + return queries } func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) Query { diff --git a/pkg/sql/pubsub_test.go b/pkg/sql/pubsub_test.go index a8c4eba..0158fab 100644 --- a/pkg/sql/pubsub_test.go +++ b/pkg/sql/pubsub_test.go @@ -15,8 +15,9 @@ import ( "github.com/ThreeDotsLabs/watermill/message/subscriber" "github.com/ThreeDotsLabs/watermill/pubsub/tests" driver "github.com/go-sql-driver/mysql" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/stdlib" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/stdlib" _ "github.com/lib/pq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -108,15 +109,17 @@ func newPgxPostgreSQL(t *testing.T) sql.Beginner { return sql.StdSQLBeginner{DB: db} } -func newPgx(t *testing.T) sql.Beginner { +func newPgx(t *testing.T) wpgx.Beginner { addr := os.Getenv("WATERMILL_TEST_POSTGRES_HOST") if addr == "" { addr = "localhost" } connStr := fmt.Sprintf("postgres://watermill:password@%s/watermill?sslmode=disable", addr) + conf, err := pgxpool.ParseConfig(connStr) + require.NoError(t, err) - db, err := pgx.Connect(context.Background(), connStr) + db, err := pgxpool.NewWithConfig(context.Background(), conf) require.NoError(t, err) err = db.Ping(context.Background()) @@ -206,11 +209,10 @@ func createPgxPostgreSQLPubSubWithConsumerGroup(t *testing.T, consumerGroup stri } func createPgxPubSubWithConsumerGroup(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) { - schemaAdapter := &testPostgreSQLSchema{ - sql.DefaultPostgreSQLSchema{ - GenerateMessagesTableName: func(topic string) string { - return fmt.Sprintf(`"test_pgx_%s"`, topic) - }, + schemaAdapter := &sql.DefaultPostgreSQLSchema{ + AdvisoryXActLock: 1, + GenerateMessagesTableName: func(topic string) string { + return fmt.Sprintf(`"test_pgx_%s"`, topic) }, } diff --git a/pkg/sql/schema.go b/pkg/sql/schema.go index 661fa4c..e66b456 100644 --- a/pkg/sql/schema.go +++ b/pkg/sql/schema.go @@ -11,7 +11,7 @@ func initializeSchema( ctx context.Context, topic string, logger watermill.LoggerAdapter, - db ContextExecutor, + db Beginner, schemaAdapter SchemaAdapter, offsetsAdapter OffsetsAdapter, ) error { @@ -29,12 +29,22 @@ func initializeSchema( "query": initializingQueries, }) + tx, err := db.BeginTx(ctx, nil) + if err != nil { + return errors.Wrap(err, "could not start transaction") + } + for _, q := range initializingQueries { - _, err := db.ExecContext(ctx, q.Query, q.Args...) + _, err := tx.ExecContext(ctx, q.Query, q.Args...) if err != nil { return errors.Wrap(err, "could not initialize schema") } } + err = tx.Commit() + if err != nil { + return errors.Wrap(err, "could not commit transaction") + } + return nil } diff --git a/pkg/sql/schema_adapter_postgresql.go b/pkg/sql/schema_adapter_postgresql.go index 9b40bf0..e036b26 100644 --- a/pkg/sql/schema_adapter_postgresql.go +++ b/pkg/sql/schema_adapter_postgresql.go @@ -22,6 +22,10 @@ type DefaultPostgreSQLSchema struct { // // Default value is 100. SubscribeBatchSize int + + // AdvisoryXActLock if greater than zero will use pg_advisory_xact_lock to lock the transaction which is needed + // to concurrently create tables. + AdvisoryXActLock int } func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(topic string) []Query { @@ -37,7 +41,14 @@ func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(topic string) []Query ); ` - return []Query{{Query: createMessagesTable}} + queries := []Query{{Query: createMessagesTable}} + if s.AdvisoryXActLock > 0 { + queries = append([]Query{ + {Query: fmt.Sprintf("SELECT pg_advisory_xact_lock(%d);", s.AdvisoryXActLock)}, + }, queries...) + } + + return queries } func (s DefaultPostgreSQLSchema) InsertQuery(topic string, msgs message.Messages) (Query, error) { From 4c6668ca33e667411ca4f0d213bcd70817da7785 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Mon, 14 Oct 2024 10:35:37 +0100 Subject: [PATCH 08/15] revert and comment lock --- pkg/sql/offsets_adapter_postgresql.go | 18 +++--------------- pkg/sql/schema_adapter_postgresql.go | 2 +- 2 files changed, 4 insertions(+), 16 deletions(-) diff --git a/pkg/sql/offsets_adapter_postgresql.go b/pkg/sql/offsets_adapter_postgresql.go index 6f1d82d..3103337 100644 --- a/pkg/sql/offsets_adapter_postgresql.go +++ b/pkg/sql/offsets_adapter_postgresql.go @@ -16,29 +16,17 @@ import ( type DefaultPostgreSQLOffsetsAdapter struct { // GenerateMessagesOffsetsTableName may be used to override how the messages/offsets table name is generated. GenerateMessagesOffsetsTableName func(topic string) string - - // AdvisoryXActLock if greater than zero will use pg_advisory_xact_lock to lock the transaction which is needed - // to concurrently create tables. - AdvisoryXActLock int } func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(topic string) []Query { - createOffsetsTableQuery := ` + return []Query{{Query: ` CREATE TABLE IF NOT EXISTS ` + a.MessagesOffsetsTable(topic) + ` ( consumer_group VARCHAR(255) NOT NULL, offset_acked BIGINT, last_processed_transaction_id xid8 NOT NULL, PRIMARY KEY(consumer_group) - )` - - queries := []Query{{Query: createOffsetsTableQuery}} - if a.AdvisoryXActLock > 0 { - queries = append([]Query{ - {Query: fmt.Sprintf("SELECT pg_advisory_xact_lock(%d);", a.AdvisoryXActLock)}, - }, queries...) - } - - return queries + )`, + }} } func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(topic, consumerGroup string) Query { diff --git a/pkg/sql/schema_adapter_postgresql.go b/pkg/sql/schema_adapter_postgresql.go index e036b26..a0651df 100644 --- a/pkg/sql/schema_adapter_postgresql.go +++ b/pkg/sql/schema_adapter_postgresql.go @@ -24,7 +24,7 @@ type DefaultPostgreSQLSchema struct { SubscribeBatchSize int // AdvisoryXActLock if greater than zero will use pg_advisory_xact_lock to lock the transaction which is needed - // to concurrently create tables. + // to concurrently create tables. https://stackoverflow.com/questions/74261789/postgres-create-table-if-not-exists-%E2%87%92-23505 AdvisoryXActLock int } From 96dc97f869b7d0bf7dba4453b268bcd0a7b07a08 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Wed, 16 Oct 2024 09:47:51 +0100 Subject: [PATCH 09/15] initialise in transaction, add to all tests --- pkg/sql/pubsub_test.go | 57 ++++++++++++++++++++++++---- pkg/sql/schema.go | 39 +++++++++++++++---- pkg/sql/schema_adapter_postgresql.go | 4 ++ 3 files changed, 86 insertions(+), 14 deletions(-) diff --git a/pkg/sql/pubsub_test.go b/pkg/sql/pubsub_test.go index 0158fab..f3960a9 100644 --- a/pkg/sql/pubsub_test.go +++ b/pkg/sql/pubsub_test.go @@ -311,20 +311,29 @@ func TestPgxPublishSubscribe(t *testing.T) { func TestCtxValues(t *testing.T) { pubSubConstructors := []struct { - Name string - Constructor func(t *testing.T) (message.Publisher, message.Subscriber) + Name string + Constructor func(t *testing.T) (message.Publisher, message.Subscriber) + ExpectedType interface{} }{ { - Name: "mysql", - Constructor: createMySQLPubSub, + Name: "mysql", + Constructor: createMySQLPubSub, + ExpectedType: &sql.StdSQLTx{}, + }, + { + Name: "postgresql", + Constructor: createPostgreSQLPubSub, + ExpectedType: &sql.StdSQLTx{}, }, { - Name: "postgresql", - Constructor: createPostgreSQLPubSub, + Name: "pgx", + Constructor: createPgxPubSub, + ExpectedType: wpgx.Tx{}, }, } for _, constructor := range pubSubConstructors { + constructor := constructor pub, sub := constructor.Constructor(t) t.Run(constructor.Name, func(t *testing.T) { @@ -353,7 +362,7 @@ func TestCtxValues(t *testing.T) { tx, ok := sql.TxFromContext(msg.Context()) assert.True(t, ok) assert.NotNil(t, t, tx) - assert.IsType(t, &sql.StdSQLTx{}, tx) + assert.IsType(t, constructor.ExpectedType, tx) msg.Ack() case <-time.After(time.Second * 10): t.Fatal("no message received") @@ -385,6 +394,14 @@ func TestNotMissingMessages(t *testing.T) { SchemaAdapter: newPostgresSchemaAdapter(0), OffsetsAdapter: newPostgresOffsetsAdapter(), }, + { + Name: "pgx", + DbConstructor: func(t *testing.T) sql.Beginner { + return newPgx(t) + }, + SchemaAdapter: newPostgresSchemaAdapter(0), + OffsetsAdapter: newPostgresOffsetsAdapter(), + }, } for _, pubSub := range pubSubs { @@ -574,6 +591,32 @@ func TestConcurrentSubscribe_different_bulk_sizes(t *testing.T) { }, Test: tests.TestConcurrentSubscribe, }, + { + Name: "TestConcurrentSubscribe_pgx_1", + Constructor: func(t *testing.T) (message.Publisher, message.Subscriber) { + return newPubSub( + t, + newPgx(t), + "test", + newPostgresSchemaAdapter(1), + newPostgresOffsetsAdapter(), + ) + }, + Test: tests.TestPublishSubscribe, + }, + { + Name: "TestConcurrentSubscribe_pgx_5", + Constructor: func(t *testing.T) (message.Publisher, message.Subscriber) { + return newPubSub( + t, + newPgx(t), + "test", + newPostgresSchemaAdapter(5), + newPostgresOffsetsAdapter(), + ) + }, + Test: tests.TestConcurrentSubscribe, + }, } for i := range testCases { tc := testCases[i] diff --git a/pkg/sql/schema.go b/pkg/sql/schema.go index e66b456..e90d5cd 100644 --- a/pkg/sql/schema.go +++ b/pkg/sql/schema.go @@ -7,11 +7,16 @@ import ( "github.com/pkg/errors" ) +type RequiresTransaction interface { + // RequiresTransaction returns true if the schema adapter requires a transaction to be started before executing queries. + RequiresTransaction() bool +} + func initializeSchema( ctx context.Context, topic string, logger watermill.LoggerAdapter, - db Beginner, + db ContextExecutor, schemaAdapter SchemaAdapter, offsetsAdapter OffsetsAdapter, ) error { @@ -29,21 +34,41 @@ func initializeSchema( "query": initializingQueries, }) - tx, err := db.BeginTx(ctx, nil) - if err != nil { - return errors.Wrap(err, "could not start transaction") + if rt, ok := schemaAdapter.(RequiresTransaction); ok && rt.RequiresTransaction() { + err = initialiseInTx(ctx, db, initializingQueries) + if err != nil { + return errors.Wrap(err, "could not initialize schema in transaction") + } } for _, q := range initializingQueries { - _, err := tx.ExecContext(ctx, q.Query, q.Args...) + _, err := db.ExecContext(ctx, q.Query, q.Args...) if err != nil { return errors.Wrap(err, "could not initialize schema") } } - err = tx.Commit() + return nil +} + +func initialiseInTx(ctx context.Context, db ContextExecutor, initializingQueries []Query) error { + beginner, ok := db.(Beginner) + if !ok { + return errors.New("db is not a Beginner") + } + + err := runInTx(ctx, beginner, func(ctx context.Context, tx Tx) error { + for _, q := range initializingQueries { + _, err := tx.ExecContext(ctx, q.Query, q.Args...) + if err != nil { + return errors.Wrap(err, "could not initialize schema") + } + } + + return nil + }) if err != nil { - return errors.Wrap(err, "could not commit transaction") + return errors.Wrap(err, "run in tx") } return nil diff --git a/pkg/sql/schema_adapter_postgresql.go b/pkg/sql/schema_adapter_postgresql.go index a0651df..4af3c17 100644 --- a/pkg/sql/schema_adapter_postgresql.go +++ b/pkg/sql/schema_adapter_postgresql.go @@ -154,3 +154,7 @@ func (s DefaultPostgreSQLSchema) SubscribeIsolationLevel() sql.IsolationLevel { // For Postgres Repeatable Read is enough. return sql.LevelRepeatableRead } + +func (s DefaultPostgreSQLSchema) RequiresTransaction() bool { + return s.AdvisoryXActLock > 0 +} From d43f6af70c12fae8599b5f9a3f8e3c53a5e4f2e7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Mon, 18 Nov 2024 16:47:36 +0100 Subject: [PATCH 10/15] Try running CI on pr --- .github/workflows/pr.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 3a19c48..aa4ed0c 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -1,5 +1,6 @@ name: pr on: + pr: push: branches-ignore: - master From 8dcc57e90580ff47901dc2426715bb556a5510b5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Mon, 18 Nov 2024 16:54:34 +0100 Subject: [PATCH 11/15] Fix? --- .github/workflows/pr.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index aa4ed0c..7ef3227 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -1,6 +1,6 @@ name: pr on: - pr: + pull_request: push: branches-ignore: - master From d170a6861237704bcfcddffeb40ebf8185bbc396 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Mon, 18 Nov 2024 17:01:55 +0100 Subject: [PATCH 12/15] go mod tidy --- go.mod | 4 ++-- go.sum | 41 ++++++++++++++++++++++++++--------------- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/go.mod b/go.mod index fdce5df..7d71bb3 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ toolchain go1.23.0 require ( github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 + github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.0 github.com/go-sql-driver/mysql v1.4.1 github.com/jackc/pgx/v5 v5.7.1 github.com/lib/pq v1.10.9 @@ -22,11 +23,10 @@ require ( github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/kr/text v0.1.0 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rogpeppe/go-internal v1.10.0 // indirect + github.com/sony/gobreaker v1.0.0 // indirect golang.org/x/crypto v0.27.0 // indirect golang.org/x/sync v0.8.0 // indirect golang.org/x/text v0.18.0 // indirect diff --git a/go.sum b/go.sum index 024413f..f0024ed 100644 --- a/go.sum +++ b/go.sum @@ -1,10 +1,12 @@ -github.com/ThreeDotsLabs/watermill v1.2.0 h1:TU3TML1dnQ/ifK09F2+4JQk2EKhmhXe7Qv7eb5ZpTS8= -github.com/ThreeDotsLabs/watermill v1.2.0/go.mod h1:IuVxGk/kgCN0cex2S94BLglUiB0PwOm8hbUhm6g2Nx4= +github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 h1:K62uIAKOkCHTXtAwY+Nj95vyLR0y25UMBsbf/FuWCeQ= +github.com/ThreeDotsLabs/watermill v1.4.0-rc.2/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= +github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.0 h1:g4uE5Nm3Z6LVB3m+uMgHlN4ne4bDpwf3RJmXYRgMv94= +github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.0/go.mod h1:G8/otZYWLTCeYL2Ww3ujQ7gQ/3+jw5Bj0UtyKn7bBjA= +github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= +github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= -github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -16,19 +18,30 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= +github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= +github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= +github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= +github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= -github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= @@ -41,15 +54,13 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= +github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= -github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= -github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= From 05ee6530f38bce3fda5043c01fd745bff7a18de6 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Wed, 27 Nov 2024 09:34:33 +0000 Subject: [PATCH 13/15] extract initialise method for schema --- pkg/sql/schema.go | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/pkg/sql/schema.go b/pkg/sql/schema.go index d6ef936..269e42c 100644 --- a/pkg/sql/schema.go +++ b/pkg/sql/schema.go @@ -53,6 +53,10 @@ func initializeSchema( } } + return initialise(ctx, db, initializingQueries) +} + +func initialise(ctx context.Context, db ContextExecutor, initializingQueries []Query) error { for _, q := range initializingQueries { _, err = db.ExecContext(ctx, q.Query, q.Args...) if err != nil { @@ -70,14 +74,7 @@ func initialiseInTx(ctx context.Context, db ContextExecutor, initializingQueries } err := runInTx(ctx, beginner, func(ctx context.Context, tx Tx) error { - for _, q := range initializingQueries { - _, err := tx.ExecContext(ctx, q.Query, q.Args...) - if err != nil { - return errors.Wrap(err, "could not initialize schema") - } - } - - return nil + return initialise(ctx, tx, initializingQueries) }) if err != nil { return errors.Wrap(err, "run in tx") From d4bf3496836d042ff25640bb0a1b35fd815437c9 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Wed, 27 Nov 2024 09:34:55 +0000 Subject: [PATCH 14/15] add backward compatibility for NewSubscriber --- pkg/sql/sql.go | 7 +++++++ pkg/sql/sql_adapter.go | 2 +- pkg/sql/subscriber.go | 11 ++++++++++- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/pkg/sql/sql.go b/pkg/sql/sql.go index ae00572..3c6c31d 100644 --- a/pkg/sql/sql.go +++ b/pkg/sql/sql.go @@ -39,6 +39,13 @@ type Beginner interface { ContextExecutor } +// SQLBeginner matches the standard library sql.DB and sql.Tx interfaces. +type SQLBeginner interface { + BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error) + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) +} + // sqlArgsToLog is used for "lazy" generating sql args strings to logger type sqlArgsToLog []interface{} diff --git a/pkg/sql/sql_adapter.go b/pkg/sql/sql_adapter.go index 03ca058..75833be 100644 --- a/pkg/sql/sql_adapter.go +++ b/pkg/sql/sql_adapter.go @@ -6,7 +6,7 @@ import ( ) type StdSQLBeginner struct { - *sql.DB + DB SQLBeginner } type StdSQLTx struct { diff --git a/pkg/sql/subscriber.go b/pkg/sql/subscriber.go index 7a02c09..37bb0c5 100644 --- a/pkg/sql/subscriber.go +++ b/pkg/sql/subscriber.go @@ -119,7 +119,16 @@ type Subscriber struct { logger watermill.LoggerAdapter } -func NewSubscriber(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) { +func NewSubscriber(db interface{}, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) { + sqlDB, ok := db.(SQLBeginner) + if !ok { + return nil, errors.New("db is not a sql.DB") + } + + return NewSubscriberV2(StdSQLBeginner{sqlDB}, config, logger) +} + +func NewSubscriberV2(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) { if db == nil { return nil, errors.New("db is nil") } From c39c4f39359d049e09fbc28595ab8b415be0e166 Mon Sep 17 00:00:00 2001 From: Julian Craske Date: Wed, 27 Nov 2024 10:06:23 +0000 Subject: [PATCH 15/15] don't use interface in NewSubscriber --- pkg/sql/subscriber.go | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/pkg/sql/subscriber.go b/pkg/sql/subscriber.go index 37bb0c5..e0fb9d7 100644 --- a/pkg/sql/subscriber.go +++ b/pkg/sql/subscriber.go @@ -119,13 +119,8 @@ type Subscriber struct { logger watermill.LoggerAdapter } -func NewSubscriber(db interface{}, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) { - sqlDB, ok := db.(SQLBeginner) - if !ok { - return nil, errors.New("db is not a sql.DB") - } - - return NewSubscriberV2(StdSQLBeginner{sqlDB}, config, logger) +func NewSubscriber(db SQLBeginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) { + return NewSubscriberV2(StdSQLBeginner{db}, config, logger) } func NewSubscriberV2(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) {