-
Notifications
You must be signed in to change notification settings - Fork 0
/
channel_consume.go
112 lines (100 loc) · 3.18 KB
/
channel_consume.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package grabbit
import (
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
// consumer sets up the consumer feed for the given channel.
//
// It takes a pointer to a Channel as a parameter.
// There is no return value.
func (ch *Channel) consumer() <-chan amqp.Delivery {
if err := ch.baseChan.super.Qos(ch.opt.implParams.PrefetchCount, ch.opt.implParams.PrefetchSize, ch.opt.implParams.QosGlobal); err != nil {
Event{
SourceType: CliChannel,
SourceName: ch.opt.name,
Kind: EventQos,
Err: SomeErrFromError(err, true),
}.raise(ch.opt.notifier)
}
// overwrite the passed queue to consume with the server assigned value
qName := ch.opt.implParams.ConsumerQueue
if len(ch.queue) != 0 {
qName = ch.queue // only when IsDestination
}
consumer, err := ch.baseChan.super.Consume(qName,
ch.opt.implParams.ConsumerName,
ch.opt.implParams.ConsumerAutoAck,
ch.opt.implParams.ConsumerExclusive,
ch.opt.implParams.ConsumerNoLocal,
ch.opt.implParams.ConsumerNoWait,
ch.opt.implParams.ConsumerArgs)
if err != nil {
Event{
SourceType: CliChannel,
SourceName: ch.opt.name,
Kind: EventConsume,
Err: SomeErrFromError(err, true),
}.raise(ch.opt.notifier)
}
return consumer
}
// gobble runs the consumer function.
//
// It consumes messages from the given channel and processes them.
// When messages are received, they are stored in a slice called messages and processed when
// the number of messages reaches a certain count or the prefetch timeout is reached.
//
// Parameters:
// - consumer: a channel of amqp.Delivery for receiving messages.
func (ch *Channel) gobble(consumer <-chan amqp.Delivery) {
var props DeliveriesProperties
mustAck := !ch.opt.implParams.ConsumerAutoAck
messages := make([]DeliveryData, 0, ch.opt.implParams.PrefetchCount)
for {
select {
case <-ch.opt.ctx.Done(): // main chan and notifiers.Consumers should also be gone
ch.Cancel(ch.opt.implParams.ConsumerName, true)
if len(messages) != 0 {
// conn/chan are gone, cannot ACK/NAK anyways
mustAck = false
ch.opt.cbProcessMessages(&props, messages, mustAck, ch)
}
return
case msg, ok := <-consumer: // notifiers data
if !ok {
ch.Cancel(ch.opt.implParams.ConsumerName, true)
if len(messages) != 0 {
// conn/chan are gone, cannot ACK/NAK anyways
mustAck = false
ch.opt.cbProcessMessages(&props, messages, mustAck, ch)
}
return
}
// set props
if len(messages) == 0 {
props = DeliveryPropsFrom(&msg)
}
// set data payload
messages = append(messages, DeliveryDataFrom(&msg))
// process
if len(messages) == ch.opt.implParams.PrefetchCount {
if len(messages) != 0 {
ch.opt.cbProcessMessages(&props, messages, mustAck, ch)
}
messages = make([]DeliveryData, 0, ch.opt.implParams.PrefetchCount)
}
case <-time.After(ch.opt.implParams.PrefetchTimeout):
kind := EventDataExhausted
if len(messages) != 0 {
kind = EventDataPartial
ch.opt.cbProcessMessages(&props, messages, mustAck, ch)
messages = make([]DeliveryData, 0, ch.opt.implParams.PrefetchCount)
}
Event{
SourceType: CliChannel,
SourceName: ch.opt.name,
Kind: kind,
}.raise(ch.opt.notifier)
}
}
}