Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PostgreSQLQueue and DelayedPostgreSQL Pub/Sub #34

Merged
merged 40 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
6fc81e8
Add ConditionalPostgreSQLSchema
m110 Aug 22, 2024
484cc85
words
m110 Aug 22, 2024
0c06a8f
Merge branch 'master' into conditional-adapters
m110 Aug 23, 2024
0298e72
Merge branch 'master' into conditional-adapters
m110 Sep 6, 2024
b00f99a
quotes
m110 Sep 6, 2024
3853813
Update
m110 Sep 6, 2024
f42a4cb
Update
m110 Sep 6, 2024
62f9353
Update
m110 Sep 6, 2024
aaf911c
Quotes
m110 Sep 6, 2024
e0de57a
acked = false
m110 Sep 6, 2024
187e31f
last row
m110 Sep 6, 2024
1cebeee
Update interfaces
m110 Oct 10, 2024
8e47d88
Update
m110 Oct 10, 2024
ff969a3
Add index and custom config
m110 Oct 11, 2024
33bf3e3
continued
m110 Oct 11, 2024
3759873
Merge branch 'master' into conditional-adapters
m110 Oct 11, 2024
48a74fe
names
m110 Oct 11, 2024
9b207ae
Fix build
m110 Oct 11, 2024
c23706b
Tests
m110 Oct 14, 2024
97d3b48
update
m110 Oct 14, 2024
0ab660d
Generate name
m110 Oct 14, 2024
f1163f8
Update
m110 Oct 15, 2024
862d5f3
enable tests
m110 Oct 15, 2024
d36774f
Add tests for delayed postgresql
m110 Oct 15, 2024
705f41f
Add tests
m110 Oct 15, 2024
034fb2a
simplify
m110 Oct 15, 2024
c6f1cc2
Add DelayedRequeuer
m110 Oct 16, 2024
8d10327
tidy
m110 Oct 16, 2024
7fb34fb
Tests
m110 Oct 17, 2024
6613fe2
Add tests
m110 Oct 22, 2024
c611c62
Rename to queue
m110 Oct 23, 2024
ed30aa8
Extend interfaces with errors
m110 Oct 23, 2024
a90aa47
Bump to v4
m110 Oct 23, 2024
f261164
AllowNoDelay
m110 Oct 23, 2024
75257d7
Bump version
m110 Oct 24, 2024
584a6f7
.
m110 Oct 24, 2024
5d175d5
Bump max connections
m110 Oct 24, 2024
1f44b96
Trigger CI
m110 Oct 25, 2024
4cd3b32
Add internal wait-for
m110 Oct 25, 2024
ccdce62
Bump watermill
m110 Oct 25, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ test_reconnect:

wait:
go run github.com/ThreeDotsLabs/wait-for@latest localhost:3306 localhost:5432
go run ./internal/wait-for

build:
go build ./...
Expand Down
4 changes: 2 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
mysql:
image: mysql:8.0
restart: unless-stopped
command: [ "--max_connections=500" ]
command: [ "--max_connections=5000" ]
ports:
- 3306:3306
environment:
Expand All @@ -16,7 +16,7 @@ services:
postgres:
image: postgres:15.3
restart: unless-stopped
command: postgres -c 'max_connections=500'
command: postgres -c 'max_connections=5000'
ports:
- 5432:5432
environment:
Expand Down
15 changes: 9 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
module github.com/ThreeDotsLabs/watermill-sql/v3
module github.com/ThreeDotsLabs/watermill-sql/v4

go 1.20
go 1.21

toolchain go1.23.0

require (
github.com/ThreeDotsLabs/watermill v1.2.0
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2
github.com/go-sql-driver/mysql v1.4.1
github.com/jackc/pgx/v4 v4.18.2
github.com/lib/pq v1.10.2
github.com/oklog/ulid v1.3.1
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.9.0
)

require (
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
Expand All @@ -26,7 +29,7 @@ require (
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rogpeppe/go-internal v1.10.0 // indirect
github.com/sony/gobreaker v1.0.0 // indirect
golang.org/x/crypto v0.20.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
Expand Down
27 changes: 13 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc=
github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs=
github.com/ThreeDotsLabs/watermill v1.2.0 h1:TU3TML1dnQ/ifK09F2+4JQk2EKhmhXe7Qv7eb5ZpTS8=
github.com/ThreeDotsLabs/watermill v1.2.0/go.mod h1:IuVxGk/kgCN0cex2S94BLglUiB0PwOm8hbUhm6g2Nx4=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 h1:K62uIAKOkCHTXtAwY+Nj95vyLR0y25UMBsbf/FuWCeQ=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M=
github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
Expand All @@ -21,8 +23,8 @@ github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRx
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -78,12 +80,13 @@ github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dv
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
Expand All @@ -104,8 +107,6 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
Expand All @@ -115,20 +116,18 @@ github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXY
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ=
github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
Expand Down
84 changes: 84 additions & 0 deletions internal/wait-for/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package main

import (
stdSQL "database/sql"
"fmt"
"os"
"time"

driver "github.com/go-sql-driver/mysql"
_ "github.com/lib/pq"
)

func main() {
for i := 0; i < 10; i++ {
err := tryConnecting()
if err == nil {
os.Exit(0)
}

time.Sleep(1 * time.Second)
}

fmt.Println("Failed to connect")
os.Exit(1)
}

func tryConnecting() error {
err := connectToMySQL()
if err != nil {
return err
}

err = connectToPostgreSQL()
if err != nil {
return err
}

return nil
}

func connectToMySQL() error {
addr := os.Getenv("WATERMILL_TEST_MYSQL_HOST")
if addr == "" {
addr = "localhost"
}
conf := driver.NewConfig()
conf.Net = "tcp"
conf.User = "root"
conf.Addr = addr

conf.DBName = "watermill"

db, err := stdSQL.Open("mysql", conf.FormatDSN())
if err != nil {
return err
}

err = db.Ping()
if err != nil {
return err
}

return nil
}

func connectToPostgreSQL() error {
addr := os.Getenv("WATERMILL_TEST_POSTGRES_HOST")
if addr == "" {
addr = "localhost"
}

connStr := fmt.Sprintf("postgres://watermill:password@%s/watermill?sslmode=disable", addr)
db, err := stdSQL.Open("postgres", connStr)
if err != nil {
return err
}

err = db.Ping()
if err != nil {
return err
}

return nil
}
143 changes: 143 additions & 0 deletions pkg/sql/delayed_postgresql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package sql

import (
"database/sql"
"fmt"
"strings"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/components/delay"
"github.com/ThreeDotsLabs/watermill/message"
)

type DelayedPostgreSQLPublisherConfig struct {
// DelayPublisherConfig is a configuration for the delay.Publisher.
DelayPublisherConfig delay.PublisherConfig

// OverridePublisherConfig allows overriding the default PublisherConfig.
OverridePublisherConfig func(config *PublisherConfig) error

Logger watermill.LoggerAdapter
}

func (c *DelayedPostgreSQLPublisherConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}

// NewDelayedPostgreSQLPublisher creates a new Publisher that stores messages in PostgreSQL with a delay.
// The delay can be set per message with the Watermill's components/delay metadata.
func NewDelayedPostgreSQLPublisher(db *sql.DB, config DelayedPostgreSQLPublisherConfig) (message.Publisher, error) {
config.setDefaults()

publisherConfig := PublisherConfig{
SchemaAdapter: PostgreSQLQueueSchema{},
AutoInitializeSchema: true,
}

if config.OverridePublisherConfig != nil {
err := config.OverridePublisherConfig(&publisherConfig)
if err != nil {
return nil, err
}
}

var publisher message.Publisher
var err error

publisher, err = NewPublisher(db, publisherConfig, config.Logger)
if err != nil {
return nil, err
}

publisher, err = delay.NewPublisher(publisher, config.DelayPublisherConfig)
if err != nil {
return nil, err
}

return publisher, nil
}

type DelayedPostgreSQLSubscriberConfig struct {
// OverrideSubscriberConfig allows overriding the default SubscriberConfig.
OverrideSubscriberConfig func(config *SubscriberConfig) error

// DeleteOnAck deletes the message from the queue when it's acknowledged.
DeleteOnAck bool

// AllowNoDelay allows receiving messages without the delay metadata.
// By default, such messages will be skipped.
// If set to true, messages without delay metadata will be received immediately.
AllowNoDelay bool

Logger watermill.LoggerAdapter
}

func (c *DelayedPostgreSQLSubscriberConfig) setDefaults() {
if c.Logger == nil {
c.Logger = watermill.NopLogger{}
}
}

// NewDelayedPostgreSQLSubscriber creates a new Subscriber that reads messages from PostgreSQL with a delay.
// The delay can be set per message with the Watermill's components/delay metadata.
func NewDelayedPostgreSQLSubscriber(db *sql.DB, config DelayedPostgreSQLSubscriberConfig) (message.Subscriber, error) {
config.setDefaults()

where := fmt.Sprintf("(metadata->>'%v')::timestamptz < NOW() AT TIME ZONE 'UTC'", delay.DelayedUntilKey)

if config.AllowNoDelay {
where += fmt.Sprintf(` OR (metadata->>'%s') IS NULL`, delay.DelayedUntilKey)
}

schemaAdapter := delayedPostgreSQLSchemaAdapter{
PostgreSQLQueueSchema: PostgreSQLQueueSchema{
GenerateWhereClause: func(params GenerateWhereClauseParams) (string, []any) {
return where, nil
},
},
}

subscriberConfig := SubscriberConfig{
SchemaAdapter: schemaAdapter,
OffsetsAdapter: PostgreSQLQueueOffsetsAdapter{
DeleteOnAck: config.DeleteOnAck,
},
InitializeSchema: true,
}

if config.OverrideSubscriberConfig != nil {
err := config.OverrideSubscriberConfig(&subscriberConfig)
if err != nil {
return nil, err
}
}

sub, err := NewSubscriber(db, subscriberConfig, config.Logger)
if err != nil {
return nil, err
}

return sub, nil
}

type delayedPostgreSQLSchemaAdapter struct {
PostgreSQLQueueSchema
}

func (a delayedPostgreSQLSchemaAdapter) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error) {
queries, err := a.PostgreSQLQueueSchema.SchemaInitializingQueries(params)
if err != nil {
return nil, err
}

table := a.MessagesTable(params.Topic)
index := fmt.Sprintf(`"%s_delayed_until_idx"`, strings.ReplaceAll(table, `"`, ""))

queries = append(queries, Query{
Query: fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s ON %s ((metadata->>'%s'))`, index, table, delay.DelayedUntilKey),
})

return queries, nil
}
Loading
Loading