-
Notifications
You must be signed in to change notification settings - Fork 0
/
http_consumer.go
150 lines (136 loc) · 4.38 KB
/
http_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
package rocketmq
import (
"context"
"fmt"
hmq "github.com/aliyunmq/mq-http-go-sdk"
mqc "github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/gogap/errors"
log "github.com/sirupsen/logrus"
"strings"
"sync"
"time"
)
type MQHttpConsumer struct {
md *Metadata
client hmq.MQClient
contextMap sync.Map
}
func init() {
Consumers[DefaultHttpAccessProto] = &MQHttpConsumer{}
}
// NewMQHttpConsumer
func NewMQHttpConsumer(md *Metadata) (*MQHttpConsumer, error) {
mq := &MQHttpConsumer{md: md}
return mq, mq.Init(md)
}
func (mq *MQHttpConsumer) Init(md *Metadata) error {
mq.md = md
mq.client = hmq.NewAliyunMQClientWithTimeout(md.Endpoint, md.AccessKey, md.SecretKey, "", time.Second*defaultHttpMQClientTimeoutSeconds)
if md.ConsumerBatchSize < 1 {
mq.md.ConsumerBatchSize = defaultConsumerNumOfMessages
}
return nil
}
func (mq *MQHttpConsumer) consumeMsg(ctx context.Context, mqConsumer hmq.MQConsumer, topic string, msgEntry hmq.ConsumeMessageEntry, f func(context.Context, ...*primitive.MessageExt) (mqc.ConsumeResult, error)) error {
msg := &primitive.MessageExt{}
msg.MsgId = msgEntry.MessageId
msg.BornTimestamp = msgEntry.PublishTime
msg.Topic = topic
msg.Body = []byte(msgEntry.MessageBody)
msg.WithProperties(msgEntry.Properties)
status, err := f(ctx, msg)
if err != nil {
return fmt.Errorf("consume message failed. topic:%s MessageID:%s %w", topic, msgEntry.MessageId, err)
}
if status != mqc.ConsumeSuccess {
return fmt.Errorf("status not success,topic:%s MessageID:%s", topic, msgEntry.MessageId)
}
var handles []string
handles = append(handles, msgEntry.ReceiptHandle)
ackErr := mqConsumer.AckMessage(handles)
if ackErr != nil {
for _, errAckItem := range ackErr.(errors.ErrCode).Context()["Detail"].([]hmq.ErrAckItem) {
log.Printf("[rocketmq-http] topic: %s, errorHandle:%s, errorCode:%s, errorMsg:%s",
topic, errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
}
return fmt.Errorf("ack message failed. handle:%s", msgEntry.ReceiptHandle)
}
return nil
}
func (mq *MQHttpConsumer) consumeInner(quitCh chan int, topic string, mqConsumer hmq.MQConsumer, f func(context.Context, ...*primitive.MessageExt) (mqc.ConsumeResult, error)) {
done := false
for !done {
respChan := make(chan hmq.ConsumeMessageResponse)
errChan := make(chan error)
go func() {
mqConsumer.ConsumeMessage(respChan, errChan, int32(mq.md.ConsumerBatchSize), defaultHttpConsumerWaitSeconds)
}()
var err error
select {
case <-quitCh:
quitCh <- 1
done = true
case resp := <-respChan:
if len(resp.Messages) > 1 {
wg := sync.WaitGroup{}
wg.Add(len(resp.Messages))
for _, v := range resp.Messages {
msg := v
go func() {
defer wg.Done()
err = mq.consumeMsg(context.TODO(), mqConsumer, topic, msg, f)
}()
}
wg.Wait()
} else if len(resp.Messages) == 1 {
err = mq.consumeMsg(context.TODO(), mqConsumer, topic, resp.Messages[0], f)
}
case err := <-errChan:
if !strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
log.Println(err)
time.Sleep(time.Duration(3) * time.Second)
} else {
//log.Println("[rocketmq-http] No new message, continue!")
}
case <-time.After(defaultHttpConsumerTimeoutSeconds * time.Second):
log.Printf("[rocketmq-http] topic: %s, timeout of consumer message\n", topic)
}
if err != nil {
log.Println(err)
}
}
}
// Start the PullConsumer for consuming message
func (mq *MQHttpConsumer) Start() error {
return nil
}
// Shutdown the PullConsumer
func (mq *MQHttpConsumer) Shutdown() error {
mq.contextMap.Range(func(key, value interface{}) bool {
if err := mq.Unsubscribe(key.(string)); err != nil {
return false
}
return true
})
return nil
}
// Subscribe a topic for consuming
func (mq *MQHttpConsumer) Subscribe(topic string, selector mqc.MessageSelector, f func(context.Context, ...*primitive.MessageExt) (mqc.ConsumeResult, error)) error {
mqConsumer := mq.client.GetConsumer(mq.md.InstanceId, topic, mq.md.ConsumerGroup, selector.Expression)
quitCh := make(chan int)
mq.contextMap.Store(topic, quitCh)
go mq.consumeInner(quitCh, topic, mqConsumer, f)
return nil
}
// Unsubscribe a topic
func (mq *MQHttpConsumer) Unsubscribe(topic string) error {
v, ok := mq.contextMap.LoadAndDelete(topic)
if ok {
quitCh := v.(chan int)
quitCh <- 1
<-quitCh
close(quitCh)
}
return nil
}