-
Notifications
You must be signed in to change notification settings - Fork 0
/
publisher.go
224 lines (192 loc) · 7.63 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
package grabbit
import (
"context"
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
)
type ConfirmationOutcome int
//go:generate stringer -type=ConfirmationOutcome -linecomment
const (
ConfirmationTimeOut ConfirmationOutcome = iota // no timely response
ConfirmationClosed // data confirmation channel is closed
ConfirmationDisabled // base channel has not been put into confirm mode
ConfirmationPrevious // lower sequence number than expected
ConfirmationACK // ACK (publish confirmed)
ConfirmationNAK // NAK (publish negative acknowledgement)
)
// DeferredConfirmation wraps [amqp.DeferredConfirmation] with additional data.
// It inherits (by embedding) all original fields and functonality from the amqp object.
type DeferredConfirmation struct {
*amqp.DeferredConfirmation // wrapped low level confirmation
Outcome ConfirmationOutcome // acknowledgment received stats
RequestSequence uint64 // sequence of the original request (GetNextPublishSeqNo)
ChannelName string // channel name of the publisher
Queue string // queue name of the publisher
}
// Publisher implements an object allowing calling applications
// to publish messages on already established connections.
// Create a publisher instance by calling [NewPublisher].
type Publisher struct {
channel *Channel // assigned channel
opt PublisherOptions // specific options
}
// defaultNotifyPublish provides a base implementation of [CallbackNotifyPublish] which can be
// overwritten with [WithChannelOptionNotifyPublish]. If confirm.Ack is false
// it sends an [EventMessagePublished] kind of event over the notification channel
// (see [WithChannelOptionNotification]) with a literal error containing the delivery tag.
func defaultNotifyPublish(confirm amqp.Confirmation, ch *Channel) {
// FIXME this may get too noisy! Perhaps restrict via some on/off flag?
Event{
SourceType: CliChannel,
SourceName: ch.opt.name,
Kind: EventMessagePublished,
Err: SomeErrFromString(
fmt.Sprintf("delivery tag %d confirmation %v", confirm.DeliveryTag, confirm.Ack),
),
}.raise(ch.opt.notifier)
}
// defaultNotifyReturn provides a base implementation of [CallbackNotifyReturn] which can be
// overwritten with [WithChannelOptionNotifyReturn].
// It sends an [EventMessageReturned] kind of event over the notification channel
// (see [WithChannelOptionNotification]) with a literal error containing the return message ID.
func defaultNotifyReturn(msg amqp.Return, ch *Channel) {
Event{
SourceType: CliChannel,
SourceName: ch.opt.name,
Kind: EventMessageReturned,
Err: SomeErrFromString(
fmt.Sprintf("message %s returned", msg.MessageId),
),
}.raise(ch.opt.notifier)
}
// Channel returns the managed [Channel] which can be further used to extract [SafeBaseChan]
func (p *Publisher) Channel() *Channel {
return p.channel
}
// NewPublisher creates a publisher with the desired options.
// It creates and opens a new dedicated [Channel] using the passed shared connection.
func NewPublisher(conn *Connection, opt PublisherOptions, optionFuncs ...func(*ChannelOptions)) *Publisher {
useParams := ChanUsageParameters{
PublisherUsageOptions: opt.PublisherUsageOptions,
}
chanOpt := append(optionFuncs, WithChannelOptionUsageParams(useParams))
return &Publisher{
channel: NewChannel(conn, chanOpt...),
opt: opt,
}
}
// AwaitDeferredConfirmation waits for the confirmation of a deferred action and updates its outcome.
//
// It takes in a deferred confirmation object and a time duration for the timeout.
// It returns the updated deferred confirmation object.
func (p *Publisher) AwaitDeferredConfirmation(d *DeferredConfirmation, tmr time.Duration) *DeferredConfirmation {
if d.DeferredConfirmation == nil {
d.Outcome = ConfirmationDisabled
return d
}
select {
case <-time.After(tmr):
d.Outcome = ConfirmationTimeOut
case <-p.opt.Context.Done():
d.Outcome = ConfirmationClosed
// FIXME: could this be triggered by recovery before the channel's context?
case <-d.Done():
if d.RequestSequence > d.DeliveryTag {
d.Outcome = ConfirmationPrevious
} else if d.Acked() {
d.Outcome = ConfirmationACK
} else {
d.Outcome = ConfirmationNAK
}
}
return d
}
// Publish wraps the amqp.PublishWithContext using the internal [PublisherOptions]
// cached when the publisher was created.
func (p *Publisher) Publish(msg amqp.Publishing) error {
if p.channel.IsClosed() {
return amqp.ErrClosed
}
return p.channel.PublishWithContext(
p.opt.Context, p.opt.Exchange, p.opt.Key, p.opt.Mandatory, p.opt.Immediate,
msg)
}
// PublishDeferredConfirm wraps the amqp.PublishWithDeferredConfirmWithContext using the internal [PublisherOptions]
// cached when the publisher was created.
func (p *Publisher) PublishDeferredConfirm(msg amqp.Publishing) (*DeferredConfirmation, error) {
if p.channel.IsClosed() {
return nil, amqp.ErrClosed
}
var err error
confirmation := &DeferredConfirmation{
Outcome: ConfirmationClosed,
ChannelName: p.channel.Name(),
Queue: p.channel.Queue(),
RequestSequence: p.channel.GetNextPublishSeqNo(),
}
confirmation.DeferredConfirmation, err = p.channel.PublishWithDeferredConfirmWithContext(
p.opt.Context, p.opt.Exchange, p.opt.Key, p.opt.Mandatory, p.opt.Immediate, msg)
return confirmation, err
}
// PublishWithOptions wraps the amqp.PublishWithContext using the passed options.
func (p *Publisher) PublishWithOptions(opt PublisherOptions, msg amqp.Publishing) error {
if p.channel.IsClosed() {
return amqp.ErrClosed
}
return p.channel.PublishWithContext(
opt.Context, opt.Exchange, opt.Key, opt.Mandatory, opt.Immediate,
msg)
}
// PublishDeferredConfirmWithOptions wraps the amqp.PublishWithDeferredConfirmWithContext using the passed options.
func (p *Publisher) PublishDeferredConfirmWithOptions(opt PublisherOptions, msg amqp.Publishing) (*DeferredConfirmation, error) {
if p.channel.IsClosed() {
return nil, amqp.ErrClosed
}
var err error
confirmation := &DeferredConfirmation{
Outcome: ConfirmationClosed,
ChannelName: p.channel.Name(),
Queue: p.channel.Queue(),
RequestSequence: p.channel.GetNextPublishSeqNo(),
}
confirmation.DeferredConfirmation, err = p.channel.PublishWithDeferredConfirmWithContext(
opt.Context, opt.Exchange, opt.Key, opt.Mandatory, opt.Immediate, msg)
return confirmation, err
}
// Available returns the status of both the underlying connection and channel.
func (p *Publisher) Available() (bool, bool) {
return !p.channel.conn.IsClosed(), !p.channel.IsClosed()
}
// AwaitAvailable waits till the publisher infrastructure is ready or timeout expires.
// Useful when the connections and channels are about being created or recovering.
// When passing zero value parameter the defaults used are 7500ms for timeout and
// 330 ms for polling frequency.
func (p *Publisher) AwaitAvailable(timeout, pollFreq time.Duration) bool {
if timeout == 0 {
timeout = 7500 * time.Millisecond
}
if pollFreq == 0 {
pollFreq = 330 * time.Millisecond
}
d := time.Now().Add(timeout)
ctxLocal, cancel := context.WithDeadline(p.channel.opt.ctx, d)
defer cancel()
// status polling
ticker := time.NewTicker(pollFreq)
defer ticker.Stop()
for {
select {
case <-ctxLocal.Done():
return false
case <-ticker.C:
if connUp, chanUp := p.Available(); connUp && chanUp {
return true
}
}
}
}
// Close shuts down cleanly the publisher channel.
func (p *Publisher) Close() error {
return p.channel.Close()
}