diff --git a/.github/workflows/pr.yml b/.github/workflows/pr.yml index 3a19c48..7ef3227 100644 --- a/.github/workflows/pr.yml +++ b/.github/workflows/pr.yml @@ -1,5 +1,6 @@ name: pr on: + pull_request: push: branches-ignore: - master diff --git a/go.mod b/go.mod index 7921173..7d71bb3 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,10 @@ toolchain go1.23.0 require ( github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 + github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.0 github.com/go-sql-driver/mysql v1.4.1 - github.com/jackc/pgx/v4 v4.18.2 - github.com/lib/pq v1.10.2 + github.com/jackc/pgx/v5 v5.7.1 + github.com/lib/pq v1.10.9 github.com/oklog/ulid v1.3.1 github.com/stretchr/testify v1.9.0 ) @@ -19,19 +20,16 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/jackc/chunkreader/v2 v2.0.1 // indirect - github.com/jackc/pgconn v1.14.3 // indirect - github.com/jackc/pgio v1.0.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect - github.com/jackc/pgproto3/v2 v2.3.3 // indirect - github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect - github.com/jackc/pgtype v1.14.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/lithammer/shortuuid/v3 v3.0.7 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/sony/gobreaker v1.0.0 // indirect - golang.org/x/crypto v0.20.0 // indirect - golang.org/x/text v0.14.0 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/text v0.18.0 // indirect google.golang.org/appengine v1.6.7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 76ef0bf..f0024ed 100644 --- a/go.sum +++ b/go.sum @@ -1,27 +1,15 @@ -github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Masterminds/semver/v3 v3.1.1 h1:hLg3sBzpNErnxhQtUy/mmLR2I9foDujNK030IGemrRc= -github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 h1:K62uIAKOkCHTXtAwY+Nj95vyLR0y25UMBsbf/FuWCeQ= github.com/ThreeDotsLabs/watermill v1.4.0-rc.2/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= +github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.0 h1:g4uE5Nm3Z6LVB3m+uMgHlN4ne4bDpwf3RJmXYRgMv94= +github.com/ThreeDotsLabs/watermill-sql/v3 v3.1.0/go.mod h1:G8/otZYWLTCeYL2Ww3ujQ7gQ/3+jw5Bj0UtyKn7bBjA= github.com/cenkalti/backoff/v3 v3.2.2 h1:cfUAAO3yvKMYKPrvhDuHSwQnhZNk/RMHKdZqKTxfm6M= github.com/cenkalti/backoff/v3 v3.2.2/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= -github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= -github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= -github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= -github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= -github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= -github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= -github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= -github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= @@ -30,180 +18,66 @@ github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= -github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= -github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= -github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= -github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= -github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= -github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= -github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= -github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= -github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= -github.com/jackc/pgmock v0.0.0-20201204152224-4fe30f7445fd/go.mod h1:hrBW0Enj2AZTNpt/7Y5rr2xe/9Mn757Wtb2xeBzPv2c= -github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= -github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= -github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= -github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= -github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= -github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= -github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= -github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= -github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a h1:bbPeKD0xmW/Y25WS6cokEszi5g+S0QxI/d45PkRi7Nk= -github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= -github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= -github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= -github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= -github.com/jackc/pgtype v1.8.1-0.20210724151600-32e20a603178/go.mod h1:C516IlIV9NKqfsMCXTdChteoXmwgUceqaLfjg2e3NlM= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= -github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= -github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= -github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= -github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= github.com/jackc/pgx/v4 v4.18.2 h1:xVpYkNR5pk5bMCZGfClbO962UIqVABcAGt7ha1s/FeU= github.com/jackc/pgx/v4 v4.18.2/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= -github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= -github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= -github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= -github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= -github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= -github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= +github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= -github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= -github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= -github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= -github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= -github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= -github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= -github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= -github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= -github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= -github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= -github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ= -github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= -github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= -github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= github.com/sony/gobreaker v1.0.0 h1:feX5fGGXSl3dYd4aHZItw+FpHLvvoaqkawKjVNiFMNQ= github.com/sony/gobreaker v1.0.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= -go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= -go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= -go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= -go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= -go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= -go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= -go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= -golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= -golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.20.0 h1:jmAMJJZXr5KiCw05dfYK9QnqaqKLYXijU23lsEdcQqg= -golang.org/x/crypto v0.20.0/go.mod h1:Xwo95rrVNIoSMx9wa1JroENMToLWn3RNVrTBpLHgZPQ= -golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= -golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= -golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= -golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= -golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= -golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= -golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= -golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= -golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= -golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= -golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= -golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= -golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= -golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/pkg/pgx/pgx_adapter.go b/pkg/pgx/pgx_adapter.go new file mode 100644 index 0000000..c23f074 --- /dev/null +++ b/pkg/pgx/pgx_adapter.go @@ -0,0 +1,127 @@ +package pgx + +import ( + "context" + stdSQL "database/sql" + "fmt" + "time" + + "github.com/ThreeDotsLabs/watermill-sql/v3/pkg/sql" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" +) + +type Conn interface { + BeginTx(ctx context.Context, options pgx.TxOptions) (pgx.Tx, error) + Exec(ctx context.Context, sql string, arguments ...interface{}) (pgconn.CommandTag, error) + Query(ctx context.Context, sql string, arguments ...interface{}) (pgx.Rows, error) +} + +type Beginner struct { + Conn +} + +type Tx struct { + pgx.Tx +} + +type Result struct { + pgconn.CommandTag +} + +type Rows struct { + pgx.Rows +} + +func (c Beginner) BeginTx(ctx context.Context, options *stdSQL.TxOptions) (sql.Tx, error) { + opts := pgx.TxOptions{} + if options != nil { + iso, err := toPgxIsolationLevel(options.Isolation) + if err != nil { + return nil, err + } + + opts = pgx.TxOptions{ + IsoLevel: iso, + AccessMode: toPgxAccessMode(options.ReadOnly), + DeferrableMode: "", + } + } + + tx, err := c.Conn.BeginTx(ctx, opts) + + return Tx{tx}, err +} + +func (c Beginner) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) { + res, err := c.Conn.Exec(ctx, query, args...) + + return Result{res}, err +} + +func (c Beginner) QueryContext(ctx context.Context, query string, args ...interface{}) (sql.Rows, error) { + rows, err := c.Conn.Query(ctx, query, args...) + + return Rows{rows}, err +} + +func (t Tx) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) { + res, err := t.Tx.Exec(ctx, query, args...) + + return Result{res}, err +} + +func (t Tx) QueryContext(ctx context.Context, query string, args ...any) (sql.Rows, error) { + rows, err := t.Tx.Query(ctx, query, args...) + + return Rows{rows}, err +} + +func (t Tx) Rollback() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return t.Tx.Rollback(ctx) +} + +func (t Tx) Commit() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + return t.Tx.Commit(ctx) +} + +func (p Result) RowsAffected() (int64, error) { + return p.CommandTag.RowsAffected(), nil +} + +func (p Rows) Close() error { + p.Rows.Close() + + return nil +} + +func toPgxIsolationLevel(level stdSQL.IsolationLevel) (pgx.TxIsoLevel, error) { + switch level { + case stdSQL.LevelReadUncommitted: + return pgx.ReadUncommitted, nil + case stdSQL.LevelReadCommitted: + return pgx.ReadCommitted, nil + case stdSQL.LevelRepeatableRead: + return pgx.RepeatableRead, nil + case stdSQL.LevelSerializable: + return pgx.Serializable, nil + case stdSQL.LevelSnapshot: + return pgx.Serializable, fmt.Errorf("pgx does not support snapshot isolation") + default: + return pgx.Serializable, nil + } +} + +func toPgxAccessMode(readOnly bool) pgx.TxAccessMode { + if readOnly { + return pgx.ReadOnly + } + + return pgx.ReadWrite +} diff --git a/pkg/sql/context.go b/pkg/sql/context.go index 7740dee..36d33e4 100644 --- a/pkg/sql/context.go +++ b/pkg/sql/context.go @@ -2,7 +2,6 @@ package sql import ( "context" - "database/sql" ) type contextKey string @@ -11,7 +10,7 @@ const ( txContextKey contextKey = "tx" ) -func setTxToContext(ctx context.Context, tx *sql.Tx) context.Context { +func setTxToContext(ctx context.Context, tx Tx) context.Context { return context.WithValue(ctx, txContextKey, tx) } @@ -21,7 +20,7 @@ func setTxToContext(ctx context.Context, tx *sql.Tx) context.Context { // // It is useful when you want to ensure that data is updated only when the message is processed. // Example usage: https://github.com/ThreeDotsLabs/watermill/tree/master/_examples/real-world-examples/exactly-once-delivery-counter -func TxFromContext(ctx context.Context) (*sql.Tx, bool) { - tx, ok := ctx.Value(txContextKey).(*sql.Tx) +func TxFromContext(ctx context.Context) (Tx, bool) { + tx, ok := ctx.Value(txContextKey).(Tx) return tx, ok } diff --git a/pkg/sql/pubsub_test.go b/pkg/sql/pubsub_test.go index a5cfacf..e086158 100644 --- a/pkg/sql/pubsub_test.go +++ b/pkg/sql/pubsub_test.go @@ -11,23 +11,25 @@ import ( "time" "github.com/ThreeDotsLabs/watermill" + wpgx "github.com/ThreeDotsLabs/watermill-sql/v4/pkg/pgx" "github.com/ThreeDotsLabs/watermill-sql/v4/pkg/sql" "github.com/ThreeDotsLabs/watermill/message" "github.com/ThreeDotsLabs/watermill/message/subscriber" "github.com/ThreeDotsLabs/watermill/pubsub/tests" driver "github.com/go-sql-driver/mysql" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/stdlib" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgx/v5/stdlib" _ "github.com/lib/pq" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) var ( - logger = watermill.NewStdLogger(true, false) + logger = watermill.NewStdLogger(false, false) ) -func newPubSub(t *testing.T, db *stdSQL.DB, consumerGroup string, schemaAdapter sql.SchemaAdapter, offsetsAdapter sql.OffsetsAdapter) (message.Publisher, message.Subscriber) { +func newPubSub(t *testing.T, db sql.Beginner, consumerGroup string, schemaAdapter sql.SchemaAdapter, offsetsAdapter sql.OffsetsAdapter) (message.Publisher, message.Subscriber) { publisher, err := sql.NewPublisher( db, sql.PublisherConfig{ @@ -54,7 +56,7 @@ func newPubSub(t *testing.T, db *stdSQL.DB, consumerGroup string, schemaAdapter return publisher, subscriber } -func newMySQL(t *testing.T) *stdSQL.DB { +func newMySQL(t *testing.T) sql.Beginner { addr := os.Getenv("WATERMILL_TEST_MYSQL_HOST") if addr == "" { addr = "localhost" @@ -72,10 +74,10 @@ func newMySQL(t *testing.T) *stdSQL.DB { err = db.Ping() require.NoError(t, err) - return db + return sql.StdSQLBeginner{DB: db} } -func newPostgreSQL(t *testing.T) *stdSQL.DB { +func newPostgreSQL(t *testing.T) sql.Beginner { addr := os.Getenv("WATERMILL_TEST_POSTGRES_HOST") if addr == "" { addr = "localhost" @@ -88,10 +90,10 @@ func newPostgreSQL(t *testing.T) *stdSQL.DB { err = db.Ping() require.NoError(t, err) - return db + return sql.StdSQLBeginner{DB: db} } -func newPgxPostgreSQL(t *testing.T) *stdSQL.DB { +func newPgxPostgreSQL(t *testing.T) sql.Beginner { addr := os.Getenv("WATERMILL_TEST_POSTGRES_HOST") if addr == "" { addr = "localhost" @@ -106,7 +108,26 @@ func newPgxPostgreSQL(t *testing.T) *stdSQL.DB { err = db.Ping() require.NoError(t, err) - return db + return sql.StdSQLBeginner{DB: db} +} + +func newPgx(t *testing.T) wpgx.Beginner { + addr := os.Getenv("WATERMILL_TEST_POSTGRES_HOST") + if addr == "" { + addr = "localhost" + } + + connStr := fmt.Sprintf("postgres://watermill:password@%s/watermill?sslmode=disable", addr) + conf, err := pgxpool.ParseConfig(connStr) + require.NoError(t, err) + + db, err := pgxpool.NewWithConfig(context.Background(), conf) + require.NoError(t, err) + + err = db.Ping(context.Background()) + require.NoError(t, err) + + return wpgx.Beginner{Conn: db} } func createMySQLPubSubWithConsumerGroup(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) { @@ -193,6 +214,23 @@ func createPgxPostgreSQLPubSubWithConsumerGroup(t *testing.T, consumerGroup stri return newPubSub(t, newPgxPostgreSQL(t), consumerGroup, schemaAdapter, offsetsAdapter) } +func createPgxPubSubWithConsumerGroup(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) { + schemaAdapter := &sql.DefaultPostgreSQLSchema{ + AdvisoryXActLock: 1, + GenerateMessagesTableName: func(topic string) string { + return fmt.Sprintf(`"test_pgx_%s"`, topic) + }, + } + + offsetsAdapter := sql.DefaultPostgreSQLOffsetsAdapter{ + GenerateMessagesOffsetsTableName: func(topic string) string { + return fmt.Sprintf(`"test_pgx_offsets_%s"`, topic) + }, + } + + return newPubSub(t, newPgx(t), consumerGroup, schemaAdapter, offsetsAdapter) +} + func createPostgreSQLPubSub(t *testing.T) (message.Publisher, message.Subscriber) { return createPostgreSQLPubSubWithConsumerGroup(t, "test") } @@ -201,6 +239,10 @@ func createPgxPostgreSQLPubSub(t *testing.T) (message.Publisher, message.Subscri return createPgxPostgreSQLPubSubWithConsumerGroup(t, "test") } +func createPgxPubSub(t *testing.T) (message.Publisher, message.Subscriber) { + return createPgxPubSubWithConsumerGroup(t, "test") +} + func createPostgreSQLQueue(t *testing.T, db *stdSQL.DB) (message.Publisher, message.Subscriber) { schemaAdapter := sql.PostgreSQLQueueSchema{ GeneratePayloadType: func(topic string) string { @@ -294,6 +336,24 @@ func TestPgxPostgreSQLPublishSubscribe(t *testing.T) { ) } +func TestPgxPublishSubscribe(t *testing.T) { + t.Parallel() + + features := tests.Features{ + ConsumerGroups: true, + ExactlyOnceDelivery: false, + GuaranteedOrder: true, + Persistent: true, + } + + tests.TestPubSub( + t, + features, + createPgxPubSub, + createPgxPubSubWithConsumerGroup, + ) +} + func TestPostgreSQLQueue(t *testing.T) { t.Parallel() @@ -336,20 +396,29 @@ func TestPgxPostgreSQLQueue(t *testing.T) { func TestCtxValues(t *testing.T) { pubSubConstructors := []struct { - Name string - Constructor func(t *testing.T) (message.Publisher, message.Subscriber) + Name string + Constructor func(t *testing.T) (message.Publisher, message.Subscriber) + ExpectedType interface{} }{ { - Name: "mysql", - Constructor: createMySQLPubSub, + Name: "mysql", + Constructor: createMySQLPubSub, + ExpectedType: &sql.StdSQLTx{}, + }, + { + Name: "postgresql", + Constructor: createPostgreSQLPubSub, + ExpectedType: &sql.StdSQLTx{}, }, { - Name: "postgresql", - Constructor: createPostgreSQLPubSub, + Name: "pgx", + Constructor: createPgxPubSub, + ExpectedType: wpgx.Tx{}, }, } for _, constructor := range pubSubConstructors { + constructor := constructor pub, sub := constructor.Constructor(t) t.Run(constructor.Name, func(t *testing.T) { @@ -378,7 +447,7 @@ func TestCtxValues(t *testing.T) { tx, ok := sql.TxFromContext(msg.Context()) assert.True(t, ok) assert.NotNil(t, t, tx) - assert.IsType(t, &stdSQL.Tx{}, tx) + assert.IsType(t, constructor.ExpectedType, tx) msg.Ack() case <-time.After(time.Second * 10): t.Fatal("no message received") @@ -394,7 +463,7 @@ func TestNotMissingMessages(t *testing.T) { pubSubs := []struct { Name string - DbConstructor func(t *testing.T) *stdSQL.DB + DbConstructor func(t *testing.T) sql.Beginner SchemaAdapter sql.SchemaAdapter OffsetsAdapter sql.OffsetsAdapter }{ @@ -410,6 +479,14 @@ func TestNotMissingMessages(t *testing.T) { SchemaAdapter: newPostgresSchemaAdapter(0), OffsetsAdapter: newPostgresOffsetsAdapter(), }, + { + Name: "pgx", + DbConstructor: func(t *testing.T) sql.Beginner { + return newPgx(t) + }, + SchemaAdapter: newPostgresSchemaAdapter(0), + OffsetsAdapter: newPostgresOffsetsAdapter(), + }, } for _, pubSub := range pubSubs { @@ -599,6 +676,32 @@ func TestConcurrentSubscribe_different_bulk_sizes(t *testing.T) { }, Test: tests.TestConcurrentSubscribe, }, + { + Name: "TestConcurrentSubscribe_pgx_1", + Constructor: func(t *testing.T) (message.Publisher, message.Subscriber) { + return newPubSub( + t, + newPgx(t), + "test", + newPostgresSchemaAdapter(1), + newPostgresOffsetsAdapter(), + ) + }, + Test: tests.TestPublishSubscribe, + }, + { + Name: "TestConcurrentSubscribe_pgx_5", + Constructor: func(t *testing.T) (message.Publisher, message.Subscriber) { + return newPubSub( + t, + newPgx(t), + "test", + newPostgresSchemaAdapter(5), + newPostgresOffsetsAdapter(), + ) + }, + Test: tests.TestConcurrentSubscribe, + }, } for i := range testCases { tc := testCases[i] diff --git a/pkg/sql/schema.go b/pkg/sql/schema.go index 1283281..269e42c 100644 --- a/pkg/sql/schema.go +++ b/pkg/sql/schema.go @@ -7,6 +7,11 @@ import ( "github.com/ThreeDotsLabs/watermill" ) +type RequiresTransaction interface { + // RequiresTransaction returns true if the schema adapter requires a transaction to be started before executing queries. + RequiresTransaction() bool +} + func initializeSchema( ctx context.Context, topic string, @@ -41,6 +46,17 @@ func initializeSchema( "query": initializingQueries, }) + if rt, ok := schemaAdapter.(RequiresTransaction); ok && rt.RequiresTransaction() { + err = initialiseInTx(ctx, db, initializingQueries) + if err != nil { + return errors.Wrap(err, "could not initialize schema in transaction") + } + } + + return initialise(ctx, db, initializingQueries) +} + +func initialise(ctx context.Context, db ContextExecutor, initializingQueries []Query) error { for _, q := range initializingQueries { _, err = db.ExecContext(ctx, q.Query, q.Args...) if err != nil { @@ -50,3 +66,19 @@ func initializeSchema( return nil } + +func initialiseInTx(ctx context.Context, db ContextExecutor, initializingQueries []Query) error { + beginner, ok := db.(Beginner) + if !ok { + return errors.New("db is not a Beginner") + } + + err := runInTx(ctx, beginner, func(ctx context.Context, tx Tx) error { + return initialise(ctx, tx, initializingQueries) + }) + if err != nil { + return errors.Wrap(err, "run in tx") + } + + return nil +} diff --git a/pkg/sql/schema_adapter_postgresql.go b/pkg/sql/schema_adapter_postgresql.go index e426067..5d6f9ce 100644 --- a/pkg/sql/schema_adapter_postgresql.go +++ b/pkg/sql/schema_adapter_postgresql.go @@ -25,6 +25,10 @@ type DefaultPostgreSQLSchema struct { // // Default value is 100. SubscribeBatchSize int + + // AdvisoryXActLock if greater than zero will use pg_advisory_xact_lock to lock the transaction which is needed + // to concurrently create tables. https://stackoverflow.com/questions/74261789/postgres-create-table-if-not-exists-%E2%87%92-23505 + AdvisoryXActLock int } func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error) { @@ -46,7 +50,14 @@ func (s DefaultPostgreSQLSchema) SchemaInitializingQueries(params SchemaInitiali ); ` - return []Query{{Query: createMessagesTable}}, nil + queries := []Query{{Query: createMessagesTable}} + if s.AdvisoryXActLock > 0 { + queries = append([]Query{ + {Query: fmt.Sprintf("SELECT pg_advisory_xact_lock(%d);", s.AdvisoryXActLock)}, + }, queries...) + } + + return queries, nil } func (s DefaultPostgreSQLSchema) InsertQuery(params InsertQueryParams) (Query, error) { @@ -222,3 +233,7 @@ func (s DefaultPostgreSQLSchema) SubscribeIsolationLevel() sql.IsolationLevel { // For Postgres Repeatable Read is enough. return sql.LevelRepeatableRead } + +func (s DefaultPostgreSQLSchema) RequiresTransaction() bool { + return s.AdvisoryXActLock > 0 +} diff --git a/pkg/sql/schema_adapter_test.go b/pkg/sql/schema_adapter_test.go index f6a3d0d..b1ae583 100644 --- a/pkg/sql/schema_adapter_test.go +++ b/pkg/sql/schema_adapter_test.go @@ -35,7 +35,7 @@ func TestDefaultMySQLSchema(t *testing.T) { func TestDefaultMySQLSchema_implicit_commit_warning(t *testing.T) { db := newMySQL(t) - tx, err := db.Begin() + tx, err := db.BeginTx(context.Background(), nil) require.NoError(t, err) schemaAdapter := sql.DefaultMySQLSchema{} @@ -49,7 +49,7 @@ func TestDefaultMySQLSchema_implicit_commit_warning(t *testing.T) { func TestDefaultMySQLSchema_implicit_commit(t *testing.T) { db := newMySQL(t) - tx, err := db.Begin() + tx, err := db.BeginTx(context.Background(), nil) require.NoError(t, err) schemaAdapter := sql.DefaultMySQLSchema{} diff --git a/pkg/sql/sql.go b/pkg/sql/sql.go index 8b64613..3c6c31d 100644 --- a/pkg/sql/sql.go +++ b/pkg/sql/sql.go @@ -7,30 +7,45 @@ import ( "strings" ) -// interface definitions borrowed from github.com/volatiletech/sqlboiler +// A Result summarizes an executed SQL command. +type Result interface { + // RowsAffected returns the number of rows affected by an + // update, insert, or delete. Not every database or database + // driver may support this. + RowsAffected() (int64, error) +} -// Executor can perform SQL queries. -type Executor interface { - Exec(query string, args ...interface{}) (sql.Result, error) - Query(query string, args ...interface{}) (*sql.Rows, error) - QueryRow(query string, args ...interface{}) *sql.Row +type Rows interface { + Scan(dest ...any) error + Close() error + Next() bool +} + +type Tx interface { + ContextExecutor + Rollback() error + Commit() error } // ContextExecutor can perform SQL queries with context type ContextExecutor interface { - Executor - - ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) - QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) - QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row + ExecContext(ctx context.Context, query string, args ...any) (Result, error) + QueryContext(ctx context.Context, query string, args ...any) (Rows, error) } // Beginner begins transactions. type Beginner interface { - BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error) + BeginTx(context.Context, *sql.TxOptions) (Tx, error) ContextExecutor } +// SQLBeginner matches the standard library sql.DB and sql.Tx interfaces. +type SQLBeginner interface { + BeginTx(context.Context, *sql.TxOptions) (*sql.Tx, error) + ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) + QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) +} + // sqlArgsToLog is used for "lazy" generating sql args strings to logger type sqlArgsToLog []interface{} diff --git a/pkg/sql/sql_adapter.go b/pkg/sql/sql_adapter.go new file mode 100644 index 0000000..75833be --- /dev/null +++ b/pkg/sql/sql_adapter.go @@ -0,0 +1,41 @@ +package sql + +import ( + "context" + "database/sql" +) + +type StdSQLBeginner struct { + DB SQLBeginner +} + +type StdSQLTx struct { + *sql.Tx +} + +// BeginTx converts the stdSQL.Tx struct to our Tx interface +func (c StdSQLBeginner) BeginTx(ctx context.Context, options *sql.TxOptions) (Tx, error) { + tx, err := c.DB.BeginTx(ctx, options) + + return &StdSQLTx{tx}, err +} + +// ExecContext converts the stdSQL.Result struct to our Result interface +func (c StdSQLBeginner) ExecContext(ctx context.Context, query string, args ...interface{}) (Result, error) { + return c.DB.ExecContext(ctx, query, args...) +} + +// QueryContext converts the stdSQL.Rows struct to our Rows interface +func (c StdSQLBeginner) QueryContext(ctx context.Context, query string, args ...interface{}) (Rows, error) { + return c.DB.QueryContext(ctx, query, args...) +} + +// ExecContext converts the stdSQL.Result struct to our Result interface +func (t StdSQLTx) ExecContext(ctx context.Context, query string, args ...any) (Result, error) { + return t.Tx.ExecContext(ctx, query, args...) +} + +// QueryContext converts the stdSQL.Rows struct to our Rows interface +func (t StdSQLTx) QueryContext(ctx context.Context, query string, args ...any) (Rows, error) { + return t.Tx.QueryContext(ctx, query, args...) +} diff --git a/pkg/sql/subscriber.go b/pkg/sql/subscriber.go index 9522ad7..e0fb9d7 100644 --- a/pkg/sql/subscriber.go +++ b/pkg/sql/subscriber.go @@ -119,7 +119,11 @@ type Subscriber struct { logger watermill.LoggerAdapter } -func NewSubscriber(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) { +func NewSubscriber(db SQLBeginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) { + return NewSubscriberV2(StdSQLBeginner{db}, config, logger) +} + +func NewSubscriberV2(db Beginner, config SubscriberConfig, logger watermill.LoggerAdapter) (*Subscriber, error) { if db == nil { return nil, errors.New("db is nil") } @@ -189,7 +193,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, topic string) (o <-chan *mes } if len(bsq) >= 1 { - err := runInTx(ctx, s.db, func(ctx context.Context, tx *sql.Tx) error { + err := runInTx(ctx, s.db, func(ctx context.Context, tx Tx) error { for _, q := range bsq { s.logger.Debug("Executing before subscribing query", watermill.LogFields{ "query": q, @@ -383,7 +387,7 @@ func (s *Subscriber) processMessage( ctx context.Context, topic string, row Row, - tx *sql.Tx, + tx Tx, out chan *message.Message, logger watermill.LoggerAdapter, ) (bool, error) { diff --git a/pkg/sql/tx.go b/pkg/sql/tx.go index 70e8caf..1a21404 100644 --- a/pkg/sql/tx.go +++ b/pkg/sql/tx.go @@ -2,7 +2,6 @@ package sql import ( "context" - "database/sql" "errors" "fmt" ) @@ -10,7 +9,7 @@ import ( func runInTx( ctx context.Context, db Beginner, - fn func(ctx context.Context, tx *sql.Tx) error, + fn func(ctx context.Context, tx Tx) error, ) (err error) { tx, err := db.BeginTx(ctx, nil) if err != nil {