diff --git a/subscribe.go b/subscribe.go index 4374bda..a4e4dd6 100644 --- a/subscribe.go +++ b/subscribe.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/go-redis/redis/v9" "gocloud.dev/gcerrors" @@ -20,18 +21,21 @@ var recvBatcherOpts = &batcher.Options{ } type subscription struct { - broker *redis.Client - group string - topic string - opts SubscriptionOptions - args *redis.XReadGroupArgs + broker *redis.Client + group string + topic string + opts SubscriptionOptions + args *redis.XReadGroupArgs + args0 *redis.XReadGroupArgs + autoclaim *redis.XAutoClaimArgs } // SubscriptionOptions contains configuration for subscriptions. type SubscriptionOptions struct { - From string // starting id ($ after tail of stream), 0 by default (from head of stream) - Consumer string // unique consumer name - NoAck bool + From string // starting id ($ after tail of stream), 0 by default (from head of stream) + Consumer string // unique consumer name + NoAck bool + AutoClaimIdleTime time.Duration } // OpenSubscription creates a pubsub.Subscription that joins group, receiving @@ -54,6 +58,9 @@ func openSubscription(broker *redis.Client, group, topic string, opts *Subscript if opts.From == "" { opts.From = "0" } + if opts.AutoClaimIdleTime == 0 { + opts.AutoClaimIdleTime = 30 * time.Minute + } // Create a consumer group eater on the stream, and start consuming from // the latest message (represented by $) or From id _, err := broker.XGroupCreateMkStream(context.Background(), topic, group, opts.From).Result() @@ -69,46 +76,85 @@ func openSubscription(broker *redis.Client, group, topic string, opts *Subscript Streams: []string{topic, ">"}, // stream Block: 0, // infinite waiting NoAck: opts.NoAck, // Confirmation required + Count: 1, + } + + xReadGroupArgs0 := &redis.XReadGroupArgs{ + Group: group, // consumer group + Consumer: opts.Consumer, // Consumer, created on-the-fly + Streams: []string{topic, "0"}, // stream + Block: 0, // infinite waiting + NoAck: opts.NoAck, // Confirmation required + Count: 1, + } + + xAutoClaimArgs := &redis.XAutoClaimArgs{ + Start: "0-0", + Stream: topic, + Group: group, + MinIdle: opts.AutoClaimIdleTime, + Count: 1, + Consumer: opts.Consumer, } ds := &subscription{ - broker: broker, - opts: *opts, - args: xReadGroupArgs, - group: group, - topic: topic, + broker: broker, + opts: *opts, + args: xReadGroupArgs, + args0: xReadGroupArgs0, + autoclaim: xAutoClaimArgs, + group: group, + topic: topic, } return ds, nil } // ReceiveBatch implements driver.Subscription.ReceiveBatch. func (s *subscription) ReceiveBatch(ctx context.Context, maxMessages int) ([]*driver.Message, error) { - args := *s.args // if maxMessages > 0 { // args.Count = int64(maxMessages) // } + // XAUTOCLAIM identifies idle pending messages, captured by dead consumers, + // and transfers ownership of them to a consumer. + if dm, err := s.receiveAutoClaimMessage(ctx, s.autoclaim); dm != nil && err == nil { + return []*driver.Message{dm}, nil + } + // What will happen if we crash in the middle of processing messages, // is that our messages will remain in the pending entries list, // so we can access our history by giving XREADGROUP initially an ID of 0, // and performing the same loop. Once providing an ID of 0 the reply // is an empty set of messages, we know that we processed and acknowledged // all the pending messages. - args0 := args - args0.Streams = []string{args.Streams[0], "0"} - if dm, err := s.receiveNextMessage(ctx, &args0); dm != nil && err == nil { + if dm, err := s.receiveNextMessage(ctx, s.args0); dm != nil && err == nil { return []*driver.Message{dm}, nil } // We can start to use > as ID, in order to get the new messages // and rejoin the consumers that are processing new things. - dm, err := s.receiveNextMessage(ctx, &args) + dm, err := s.receiveNextMessage(ctx, s.args) if err != nil { return nil, err } return []*driver.Message{dm}, nil } +func (s *subscription) receiveAutoClaimMessage(ctx context.Context, args *redis.XAutoClaimArgs) (*driver.Message, error) { + msgs, _, err := s.broker.XAutoClaim(ctx, args).Result() + if err != nil || ctx.Err() != nil { + if err == nil { + err = ctx.Err() + } + return nil, err + } + if len(msgs) == 0 { + return nil, nil + } + msg := msgs[0] + return driverMsgFromRedisMsg(msg) +} + func (s *subscription) receiveNextMessage(ctx context.Context, args *redis.XReadGroupArgs) (*driver.Message, error) { xStreamSlice, err := s.broker.XReadGroup(ctx, args).Result() if err != nil || ctx.Err() != nil { @@ -121,6 +167,10 @@ func (s *subscription) receiveNextMessage(ctx context.Context, args *redis.XRead return nil, nil } msg := xStreamSlice[0].Messages[0] + return driverMsgFromRedisMsg(msg) +} + +func driverMsgFromRedisMsg(msg redis.XMessage) (*driver.Message, error) { bd := []byte(msg.Values["body"].(string)) var bm map[string]string if err := json.Unmarshal([]byte(msg.Values["headers"].(string)), &bm); err != nil { diff --git a/urlopener.go b/urlopener.go index 668c916..1377f33 100644 --- a/urlopener.go +++ b/urlopener.go @@ -6,6 +6,7 @@ import ( "net/url" "path" "strconv" + "time" "github.com/go-redis/redis/v9" "gocloud.dev/pubsub" @@ -53,7 +54,7 @@ func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic // OpenSubscriptionURL opens a pubsub.Subscription based on u. func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error) { - var topic, consumer string + var topic, consumer, autoclaim string var noack bool from := "" for param, value := range u.Query() { @@ -64,6 +65,8 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu from = value[0] case "consumer": consumer = value[0] + case "autoclaim": + autoclaim = value[0] case "noack": noack = true default: @@ -75,6 +78,13 @@ func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsu } o.SubscriptionOptions.From = from o.SubscriptionOptions.NoAck = noack + if autoclaim != "" { + dur, err := time.ParseDuration(autoclaim) + if err != nil { + return nil, fmt.Errorf("open subscription %v: bad autoclaim: %w", u, err) + } + o.SubscriptionOptions.AutoClaimIdleTime = dur + } group := path.Join(u.Host, u.Path) if group == "" { return nil, fmt.Errorf("open subscription %v: undefined host/path group name", u)