Skip to content

Commit

Permalink
Extend interfaces with errors
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 committed Oct 23, 2024
1 parent c611c62 commit ed30aa8
Show file tree
Hide file tree
Showing 12 changed files with 92 additions and 62 deletions.
9 changes: 6 additions & 3 deletions pkg/sql/delayed_postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ type delayedPostgreSQLSchemaAdapter struct {
PostgreSQLQueueSchema
}

func (a delayedPostgreSQLSchemaAdapter) SchemaInitializingQueries(params SchemaInitializingQueriesParams) []Query {
queries := a.PostgreSQLQueueSchema.SchemaInitializingQueries(params)
func (a delayedPostgreSQLSchemaAdapter) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error) {
queries, err := a.PostgreSQLQueueSchema.SchemaInitializingQueries(params)
if err != nil {
return nil, err
}

table := a.MessagesTable(params.Topic)
index := fmt.Sprintf(`"%s_delayed_until_idx"`, strings.ReplaceAll(table, `"`, ""))
Expand All @@ -121,5 +124,5 @@ func (a delayedPostgreSQLSchemaAdapter) SchemaInitializingQueries(params SchemaI
Query: fmt.Sprintf(`CREATE INDEX IF NOT EXISTS %s ON %s ((metadata->>'%s'))`, index, table, delay.DelayedUntilKey),
})

return queries
return queries, nil
}
10 changes: 5 additions & 5 deletions pkg/sql/offsets_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,22 @@ type BeforeSubscribingQueriesParams struct {

type OffsetsAdapter interface {
// AckMessageQuery the SQL query and arguments that will mark a message as read for a given consumer group.
AckMessageQuery(params AckMessageQueryParams) Query
AckMessageQuery(params AckMessageQueryParams) (Query, error)

// ConsumedMessageQuery will return the SQL query and arguments which be executed after consuming message,
// but before ack.
//
// ConsumedMessageQuery is optional, and will be not executed if query is empty.
ConsumedMessageQuery(params ConsumedMessageQueryParams) Query
ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error)

// NextOffsetQuery returns the SQL query and arguments which should return offset of next message to consume.
NextOffsetQuery(params NextOffsetQueryParams) Query
NextOffsetQuery(params NextOffsetQueryParams) (Query, error)

// SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS)
// that the appropriate tables exist to write messages to the given topic.
SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) []Query
SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error)

// BeforeSubscribingQueries returns queries which will be executed before subscribing to a topic.
// All queries will be executed in a single transaction.
BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) []Query
BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error)
}
20 changes: 10 additions & 10 deletions pkg/sql/offsets_adapter_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type DefaultMySQLOffsetsAdapter struct {
GenerateMessagesOffsetsTableName func(topic string) string
}

func (a DefaultMySQLOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) []Query {
func (a DefaultMySQLOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error) {
return []Query{
{
Query: `
Expand All @@ -29,25 +29,25 @@ func (a DefaultMySQLOffsetsAdapter) SchemaInitializingQueries(params OffsetsSche
PRIMARY KEY(consumer_group)
)`,
},
}
}, nil
}

func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) Query {
func (a DefaultMySQLOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) (Query, error) {
ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(params.Topic) + ` (offset_consumed, offset_acked, consumer_group)
VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE offset_consumed=VALUES(offset_consumed), offset_acked=VALUES(offset_acked)`

return Query{ackQuery, []any{params.LastRow.Offset, params.LastRow.Offset, params.ConsumerGroup}}
return Query{ackQuery, []any{params.LastRow.Offset, params.LastRow.Offset, params.ConsumerGroup}}, nil
}

func (a DefaultMySQLOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) Query {
func (a DefaultMySQLOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) (Query, error) {
return Query{
Query: `SELECT COALESCE(
(SELECT offset_acked
FROM ` + a.MessagesOffsetsTable(params.Topic) + `
WHERE consumer_group=? FOR UPDATE
), 0)`,
Args: []any{params.ConsumerGroup},
}
}, nil
}

func (a DefaultMySQLOffsetsAdapter) MessagesOffsetsTable(topic string) string {
Expand All @@ -57,13 +57,13 @@ func (a DefaultMySQLOffsetsAdapter) MessagesOffsetsTable(topic string) string {
return fmt.Sprintf("`watermill_offsets_%s`", topic)
}

func (a DefaultMySQLOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) Query {
func (a DefaultMySQLOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error) {
// offset_consumed is not queried anywhere, it's used only to detect race conditions with NextOffsetQuery.
ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(params.Topic) + ` (offset_consumed, consumer_group)
VALUES (?, ?) ON DUPLICATE KEY UPDATE offset_consumed=VALUES(offset_consumed)`
return Query{ackQuery, []interface{}{params.Row.Offset, params.ConsumerGroup}}
return Query{ackQuery, []interface{}{params.Row.Offset, params.ConsumerGroup}}, nil
}

func (a DefaultMySQLOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) []Query {
return nil
func (a DefaultMySQLOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error) {
return nil, nil
}
20 changes: 10 additions & 10 deletions pkg/sql/offsets_adapter_postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type DefaultPostgreSQLOffsetsAdapter struct {
GenerateMessagesOffsetsTableName func(topic string) string
}

func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) []Query {
func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error) {
return []Query{
{
Query: `
Expand All @@ -29,10 +29,10 @@ func (a DefaultPostgreSQLOffsetsAdapter) SchemaInitializingQueries(params Offset
PRIMARY KEY(consumer_group)
)`,
},
}
}, nil
}

func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) Query {
func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) (Query, error) {
return Query{
Query: `
SELECT
Expand All @@ -43,10 +43,10 @@ func (a DefaultPostgreSQLOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryP
FOR UPDATE
`,
Args: []any{params.ConsumerGroup},
}
}, nil
}

func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) Query {
func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) (Query, error) {
ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(params.Topic) + `(offset_acked, last_processed_transaction_id, consumer_group)
VALUES
($1, $2, $3)
Expand All @@ -56,7 +56,7 @@ func (a DefaultPostgreSQLOffsetsAdapter) AckMessageQuery(params AckMessageQueryP
offset_acked = excluded.offset_acked,
last_processed_transaction_id = excluded.last_processed_transaction_id`

return Query{ackQuery, []any{params.LastRow.Offset, params.LastRow.ExtraData["transaction_id"], params.ConsumerGroup}}
return Query{ackQuery, []any{params.LastRow.Offset, params.LastRow.ExtraData["transaction_id"], params.ConsumerGroup}}, nil
}

func (a DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable(topic string) string {
Expand All @@ -66,11 +66,11 @@ func (a DefaultPostgreSQLOffsetsAdapter) MessagesOffsetsTable(topic string) stri
return fmt.Sprintf(`"watermill_offsets_%s"`, topic)
}

func (a DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) Query {
return Query{}
func (a DefaultPostgreSQLOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error) {
return Query{}, nil
}

func (a DefaultPostgreSQLOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) []Query {
func (a DefaultPostgreSQLOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error) {
return []Query{
{
// It's required for exactly-once-delivery guarantee.
Expand All @@ -84,5 +84,5 @@ func (a DefaultPostgreSQLOffsetsAdapter) BeforeSubscribingQueries(params BeforeS
Query: `INSERT INTO ` + a.MessagesOffsetsTable(params.Topic) + ` (consumer_group, offset_acked, last_processed_transaction_id) VALUES ($1, 0, '0') ON CONFLICT DO NOTHING;`,
Args: []any{params.ConsumerGroup},
},
}
}, nil
}
3 changes: 2 additions & 1 deletion pkg/sql/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,11 +665,12 @@ func TestDefaultPostgreSQLSchema_planner_mis_estimate_regression(t *testing.T) {
<-messages // wait for the subscriber to finish

schemAdapterBatch1 := newPostgresSchemaAdapter(1)
q := schemAdapterBatch1.SelectQuery(sql.SelectQueryParams{
q, err := schemAdapterBatch1.SelectQuery(sql.SelectQueryParams{
Topic: topicName,
ConsumerGroup: "",
OffsetsAdapter: offsetsAdapter,
})
require.NoError(t, err)

var analyseResult string

Expand Down
20 changes: 10 additions & 10 deletions pkg/sql/queue_offsets_adapter_postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ type PostgreSQLQueueOffsetsAdapter struct {
GenerateMessagesTableName func(topic string) string
}

func (a PostgreSQLQueueOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) []Query {
return []Query{}
func (a PostgreSQLQueueOffsetsAdapter) SchemaInitializingQueries(params OffsetsSchemaInitializingQueriesParams) ([]Query, error) {
return []Query{}, nil
}

func (a PostgreSQLQueueOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) Query {
return Query{}
func (a PostgreSQLQueueOffsetsAdapter) NextOffsetQuery(params NextOffsetQueryParams) (Query, error) {
return Query{}, nil
}

func (a PostgreSQLQueueOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) Query {
func (a PostgreSQLQueueOffsetsAdapter) AckMessageQuery(params AckMessageQueryParams) (Query, error) {
if params.ConsumerGroup != "" {
panic("consumer groups are not supported in PostgreSQLQueueOffsetsAdapter")
}
Expand All @@ -44,7 +44,7 @@ func (a PostgreSQLQueueOffsetsAdapter) AckMessageQuery(params AckMessageQueryPar
offsets[i] = row.Offset
}

return Query{ackQuery, []any{pq.Array(offsets)}}
return Query{ackQuery, []any{pq.Array(offsets)}}, nil
}

func (a PostgreSQLQueueOffsetsAdapter) MessagesTable(topic string) string {
Expand All @@ -54,10 +54,10 @@ func (a PostgreSQLQueueOffsetsAdapter) MessagesTable(topic string) string {
return fmt.Sprintf(`"watermill_%s"`, topic)
}

func (a PostgreSQLQueueOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) Query {
return Query{}
func (a PostgreSQLQueueOffsetsAdapter) ConsumedMessageQuery(params ConsumedMessageQueryParams) (Query, error) {
return Query{}, nil
}

func (a PostgreSQLQueueOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) []Query {
return []Query{}
func (a PostgreSQLQueueOffsetsAdapter) BeforeSubscribingQueries(params BeforeSubscribingQueriesParams) ([]Query, error) {
return []Query{}, nil
}
11 changes: 6 additions & 5 deletions pkg/sql/queue_schema_adapter_postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sql
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -38,7 +39,7 @@ type PostgreSQLQueueSchema struct {
SubscribeBatchSize int
}

func (s PostgreSQLQueueSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) []Query {
func (s PostgreSQLQueueSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error) {
createMessagesTable := `
CREATE TABLE IF NOT EXISTS ` + s.MessagesTable(params.Topic) + ` (
"offset" SERIAL PRIMARY KEY,
Expand All @@ -50,7 +51,7 @@ func (s PostgreSQLQueueSchema) SchemaInitializingQueries(params SchemaInitializi
);
`

return []Query{{Query: createMessagesTable}}
return []Query{{Query: createMessagesTable}}, nil
}

func (s PostgreSQLQueueSchema) InsertQuery(params InsertQueryParams) (Query, error) {
Expand Down Expand Up @@ -88,9 +89,9 @@ func (s PostgreSQLQueueSchema) batchSize() int {
return s.SubscribeBatchSize
}

func (s PostgreSQLQueueSchema) SelectQuery(params SelectQueryParams) Query {
func (s PostgreSQLQueueSchema) SelectQuery(params SelectQueryParams) (Query, error) {
if params.ConsumerGroup != "" {
panic("consumer groups are not supported in PostgreSQLQueueSchema")
return Query{}, errors.New("consumer groups are not supported in PostgreSQLQueueSchema")
}

whereParams := GenerateWhereClauseParams{
Expand All @@ -115,7 +116,7 @@ func (s PostgreSQLQueueSchema) SelectQuery(params SelectQueryParams) Query {
LIMIT ` + fmt.Sprintf("%d", s.batchSize()) + `
FOR UPDATE`

return Query{selectQuery, args}
return Query{selectQuery, args}, nil
}

func (s PostgreSQLQueueSchema) UnmarshalMessage(params UnmarshalMessageParams) (Row, error) {
Expand Down
11 changes: 9 additions & 2 deletions pkg/sql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ func initializeSchema(
return err
}

initializingQueries := schemaAdapter.SchemaInitializingQueries(SchemaInitializingQueriesParams{
initializingQueries, err := schemaAdapter.SchemaInitializingQueries(SchemaInitializingQueriesParams{
Topic: topic,
})
if err != nil {
return fmt.Errorf("could not generate schema initializing queries: %w", err)
}

if offsetsAdapter != nil {
queries := offsetsAdapter.SchemaInitializingQueries(OffsetsSchemaInitializingQueriesParams{
queries, err := offsetsAdapter.SchemaInitializingQueries(OffsetsSchemaInitializingQueriesParams{
Topic: topic,
})
if err != nil {
return fmt.Errorf("could not generate offset adapter's schema initializing queries: %w", err)
}
initializingQueries = append(initializingQueries, queries...)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/schema_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ type SchemaAdapter interface {

// SelectQuery returns the SQL query and arguments
// that returns the next unread message for a given consumer group.
SelectQuery(params SelectQueryParams) Query
SelectQuery(params SelectQueryParams) (Query, error)

// UnmarshalMessage transforms the Row obtained SelectQuery a Watermill message.
// It also returns the offset of the last read message, for the purpose of acking.
UnmarshalMessage(params UnmarshalMessageParams) (Row, error)

// SchemaInitializingQueries returns SQL queries which will make sure (CREATE IF NOT EXISTS)
// that the appropriate tables exist to write messages to the given topic.
SchemaInitializingQueries(params SchemaInitializingQueriesParams) []Query
SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error)

// SubscribeIsolationLevel returns the isolation level that will be used when subscribing.
SubscribeIsolationLevel() sql.IsolationLevel
Expand Down
13 changes: 8 additions & 5 deletions pkg/sql/schema_adapter_mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type DefaultMySQLSchema struct {
SubscribeBatchSize int
}

func (s DefaultMySQLSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) []Query {
func (s DefaultMySQLSchema) SchemaInitializingQueries(params SchemaInitializingQueriesParams) ([]Query, error) {
createMessagesTable := strings.Join([]string{
"CREATE TABLE IF NOT EXISTS " + s.MessagesTable(params.Topic) + " (",
"`offset` BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,",
Expand All @@ -63,7 +63,7 @@ func (s DefaultMySQLSchema) SchemaInitializingQueries(params SchemaInitializingQ
");",
}, "\n")

return []Query{{Query: createMessagesTable}}
return []Query{{Query: createMessagesTable}}, nil
}

func (s DefaultMySQLSchema) InsertQuery(params InsertQueryParams) (Query, error) {
Expand All @@ -89,20 +89,23 @@ func (s DefaultMySQLSchema) batchSize() int {
return s.SubscribeBatchSize
}

func (s DefaultMySQLSchema) SelectQuery(params SelectQueryParams) Query {
func (s DefaultMySQLSchema) SelectQuery(params SelectQueryParams) (Query, error) {
nextOffsetParams := NextOffsetQueryParams{
Topic: params.Topic,
ConsumerGroup: params.ConsumerGroup,
}
nextOffsetQuery := params.OffsetsAdapter.NextOffsetQuery(nextOffsetParams)
nextOffsetQuery, err := params.OffsetsAdapter.NextOffsetQuery(nextOffsetParams)
if err != nil {
return Query{}, err
}

// It's important to wrap offset with "`" for MariaDB.
// See https://github.com/ThreeDotsLabs/watermill/issues/377
selectQuery := "SELECT `offset`, `uuid`, `payload`, `metadata` FROM " + s.MessagesTable(params.Topic) +
" WHERE `offset` > (" + nextOffsetQuery.Query + ") ORDER BY `offset` ASC" +
` LIMIT ` + fmt.Sprintf("%d", s.batchSize())

return Query{Query: selectQuery, Args: nextOffsetQuery.Args}
return Query{Query: selectQuery, Args: nextOffsetQuery.Args}, nil
}

func (s DefaultMySQLSchema) UnmarshalMessage(params UnmarshalMessageParams) (Row, error) {
Expand Down
Loading

0 comments on commit ed30aa8

Please sign in to comment.