Skip to content

Commit

Permalink
autoclaim messages for dead consumers
Browse files Browse the repository at this point in the history
  • Loading branch information
covrom committed Nov 28, 2022
1 parent 7afd82e commit d80fc76
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 19 deletions.
86 changes: 68 additions & 18 deletions subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/go-redis/redis/v9"
"gocloud.dev/gcerrors"
Expand All @@ -20,18 +21,21 @@ var recvBatcherOpts = &batcher.Options{
}

type subscription struct {
broker *redis.Client
group string
topic string
opts SubscriptionOptions
args *redis.XReadGroupArgs
broker *redis.Client
group string
topic string
opts SubscriptionOptions
args *redis.XReadGroupArgs
args0 *redis.XReadGroupArgs
autoclaim *redis.XAutoClaimArgs
}

// SubscriptionOptions contains configuration for subscriptions.
type SubscriptionOptions struct {
From string // starting id ($ after tail of stream), 0 by default (from head of stream)
Consumer string // unique consumer name
NoAck bool
From string // starting id ($ after tail of stream), 0 by default (from head of stream)
Consumer string // unique consumer name
NoAck bool
AutoClaimIdleTime time.Duration
}

// OpenSubscription creates a pubsub.Subscription that joins group, receiving
Expand All @@ -54,6 +58,9 @@ func openSubscription(broker *redis.Client, group, topic string, opts *Subscript
if opts.From == "" {
opts.From = "0"
}
if opts.AutoClaimIdleTime == 0 {
opts.AutoClaimIdleTime = 30 * time.Minute
}
// Create a consumer group eater on the stream, and start consuming from
// the latest message (represented by $) or From id
_, err := broker.XGroupCreateMkStream(context.Background(), topic, group, opts.From).Result()
Expand All @@ -69,46 +76,85 @@ func openSubscription(broker *redis.Client, group, topic string, opts *Subscript
Streams: []string{topic, ">"}, // stream
Block: 0, // infinite waiting
NoAck: opts.NoAck, // Confirmation required
Count: 1,
}

xReadGroupArgs0 := &redis.XReadGroupArgs{
Group: group, // consumer group
Consumer: opts.Consumer, // Consumer, created on-the-fly
Streams: []string{topic, "0"}, // stream
Block: 0, // infinite waiting
NoAck: opts.NoAck, // Confirmation required
Count: 1,
}

xAutoClaimArgs := &redis.XAutoClaimArgs{
Start: "0-0",
Stream: topic,
Group: group,
MinIdle: opts.AutoClaimIdleTime,
Count: 1,
Consumer: opts.Consumer,
}

ds := &subscription{
broker: broker,
opts: *opts,
args: xReadGroupArgs,
group: group,
topic: topic,
broker: broker,
opts: *opts,
args: xReadGroupArgs,
args0: xReadGroupArgs0,
autoclaim: xAutoClaimArgs,
group: group,
topic: topic,
}
return ds, nil
}

// ReceiveBatch implements driver.Subscription.ReceiveBatch.
func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) {
args := *s.args
// if maxMessages > 0 {
// args.Count = int64(maxMessages)
// }

// XAUTOCLAIM identifies idle pending messages, captured by dead consumers,
// and transfers ownership of them to a consumer.
if dm, err := s.receiveAutoClaimMessage(ctx, s.autoclaim); dm != nil && err == nil {
return []*driver.Message{dm}, nil
}

// What will happen if we crash in the middle of processing messages,
// is that our messages will remain in the pending entries list,
// so we can access our history by giving XREADGROUP initially an ID of 0,
// and performing the same loop. Once providing an ID of 0 the reply
// is an empty set of messages, we know that we processed and acknowledged
// all the pending messages.
args0 := args
args0.Streams = []string{args.Streams[0], "0"}
if dm, err := s.receiveNextMessage(ctx, &args0); dm != nil && err == nil {
if dm, err := s.receiveNextMessage(ctx, s.args0); dm != nil && err == nil {
return []*driver.Message{dm}, nil
}

// We can start to use > as ID, in order to get the new messages
// and rejoin the consumers that are processing new things.
dm, err := s.receiveNextMessage(ctx, &args)
dm, err := s.receiveNextMessage(ctx, s.args)
if err != nil {
return nil, err
}
return []*driver.Message{dm}, nil
}

func (s *subscription) receiveAutoClaimMessage(ctx context.Context, args *redis.XAutoClaimArgs) (*driver.Message, error) {
msgs, _, err := s.broker.XAutoClaim(ctx, args).Result()
if err != nil || ctx.Err() != nil {
if err == nil {
err = ctx.Err()
}
return nil, err
}
if len(msgs) == 0 {
return nil, nil
}
msg := msgs[0]
return driverMsgFromRedisMsg(msg)
}

func (s *subscription) receiveNextMessage(ctx context.Context, args *redis.XReadGroupArgs) (*driver.Message, error) {
xStreamSlice, err := s.broker.XReadGroup(ctx, args).Result()
if err != nil || ctx.Err() != nil {
Expand All @@ -121,6 +167,10 @@ func (s *subscription) receiveNextMessage(ctx context.Context, args *redis.XRead
return nil, nil
}
msg := xStreamSlice[0].Messages[0]
return driverMsgFromRedisMsg(msg)
}

func driverMsgFromRedisMsg(msg redis.XMessage) (*driver.Message, error) {
bd := []byte(msg.Values["body"].(string))
var bm map[string]string
if err := json.Unmarshal([]byte(msg.Values["headers"].(string)), &bm); err != nil {
Expand Down
12 changes: 11 additions & 1 deletion urlopener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/url"
"path"
"strconv"
"time"

"github.com/go-redis/redis/v9"
"gocloud.dev/pubsub"
Expand Down Expand Up @@ -53,7 +54,7 @@ func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic

// OpenSubscriptionURL opens a pubsub.Subscription based on u.
func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) {
var topic, consumer string
var topic, consumer, autoclaim string
var noack bool
from := ""
for param, value := range u.Query() {
Expand All @@ -64,6 +65,8 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu
from = value[0]
case "consumer":
consumer = value[0]
case "autoclaim":
autoclaim = value[0]
case "noack":
noack = true
default:
Expand All @@ -75,6 +78,13 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu
}
o.SubscriptionOptions.From = from
o.SubscriptionOptions.NoAck = noack
if autoclaim != "" {
dur, err := time.ParseDuration(autoclaim)
if err != nil {
return nil, fmt.Errorf("open subscription %v: bad autoclaim: %w", u, err)
}
o.SubscriptionOptions.AutoClaimIdleTime = dur
}
group := path.Join(u.Host, u.Path)
if group == "" {
return nil, fmt.Errorf("open subscription %v: undefined host/path group name", u)
Expand Down

0 comments on commit d80fc76

Please sign in to comment.