-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.go
106 lines (88 loc) · 2.5 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
package azqlite
import (
"context"
"fmt"
"time"
"github.com/Azure/azure-storage-queue-go/azqueue"
)
var _ Queue = &queue{}
type queue struct {
queueURL azqueue.QueueURL
messagesURL azqueue.MessagesURL
}
func newQueue(queueURL azqueue.QueueURL) *queue {
messagesURL := queueURL.NewMessagesURL()
return &queue{queueURL, messagesURL}
}
func (q *queue) MessageCount(ctx context.Context) (int, error) {
props, err := q.queueURL.GetProperties(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get queue properties: %w", err)
}
return int(props.ApproximateMessagesCount()), nil
}
type Message struct {
ID string
PopReceipt string
DequeueCount int
Body string
}
func (q *queue) Dequeue(ctx context.Context, count int, timeout time.Duration) ([]*Message, error) {
dequeue, err := q.messagesURL.Dequeue(ctx, int32(count), timeout)
if err != nil {
return nil, fmt.Errorf("failed to dequeue messages: %w", err)
}
numMessages := dequeue.NumMessages()
if numMessages == 0 {
return nil, nil
}
messages := make([]*Message, numMessages)
for i := int32(0); i < numMessages; i++ {
msg := dequeue.Message(i)
messages[i] = &Message{
ID: string(msg.ID),
PopReceipt: string(msg.PopReceipt),
DequeueCount: int(msg.DequeueCount),
Body: msg.Text,
}
}
return messages, nil
}
func (q *queue) Enqueue(ctx context.Context, message string, timeout, ttl time.Duration) (*Message, error) {
msg, err := q.messagesURL.Enqueue(ctx, message, timeout, ttl)
if err != nil {
return nil, fmt.Errorf("failed to enqueue message: %w", err)
}
return &Message{
ID: string(msg.MessageID),
PopReceipt: string(msg.PopReceipt),
DequeueCount: 0,
Body: message,
}, nil
}
func (q *queue) Peek(ctx context.Context, count int) ([]*Message, error) {
peek, err := q.messagesURL.Peek(ctx, int32(count))
if err != nil {
return nil, fmt.Errorf("failed to peek messages: %w", err)
}
numMessages := peek.NumMessages()
if numMessages == 0 {
return nil, nil
}
messages := make([]*Message, numMessages)
for i := int32(0); i < numMessages; i++ {
msg := peek.Message(i)
messages[i] = &Message{
ID: string(msg.ID),
PopReceipt: "",
DequeueCount: int(msg.DequeueCount),
Body: msg.Text,
}
}
return messages, nil
}
func (q *queue) Delete(ctx context.Context, m *Message) error {
msgIDURL := q.messagesURL.NewMessageIDURL(azqueue.MessageID(m.ID))
_, err := msgIDURL.Delete(ctx, azqueue.PopReceipt(m.PopReceipt))
return err
}