-
Notifications
You must be signed in to change notification settings - Fork 0
/
rabbitmq.go
118 lines (103 loc) · 2.1 KB
/
rabbitmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package rabbitmq
import (
"fmt"
"github.com/gozelle/amqp"
"github.com/gozelle/rabbitmq/consumer"
"github.com/gozelle/rabbitmq/producer"
)
func NewRabbitMQ[T any](addr string) *RabbitMQ[T] {
return &RabbitMQ[T]{addr: addr}
}
type RabbitMQ[T any] struct {
addr string
}
func (r RabbitMQ[T]) openConn() (conn *amqp.Connection, err error) {
conn, err = amqp.Dial(r.addr)
if err != nil {
return
}
return
}
func (r RabbitMQ[T]) openChannel(conn *amqp.Connection) (ch *amqp.Channel, err error) {
ch, err = conn.Channel()
if err != nil {
err = fmt.Errorf("open channel error: %s", err)
return
}
return
}
func (r RabbitMQ[T]) NewConsumer(options ...consumer.Option) (p *consumer.Consumer[T], err error) {
c := &consumer.Config{}
for _, v := range options {
v(c)
}
conn, err := r.openConn()
if err != nil {
return
}
ch, err := r.openChannel(conn)
if err != nil {
return
}
p = consumer.NewConsumer[T](conn, ch, c)
return
}
func (r RabbitMQ[T]) NewProducer(options ...producer.Option) (p *producer.Producer, err error) {
c := &producer.Config{}
for _, v := range options {
v(c)
}
conn, err := r.openConn()
if err != nil {
return
}
ch, err := r.openChannel(conn)
if err != nil {
return
}
if c.Exchange() == nil {
err = fmt.Errorf("expect exchange define use WithExchange")
return
}
err = ch.ExchangeDeclare(
c.Exchange().Name(),
c.Exchange().Kind(),
c.Exchange().Durable(),
c.Exchange().AutoDelete(),
c.Exchange().Internal(),
c.Exchange().NoWait(),
c.Exchange().Args(),
)
if err != nil {
err = fmt.Errorf("exchange declare error: %s", err)
return
}
for _, v := range c.Queues() {
var vv amqp.Queue
vv, err = ch.QueueDeclare(
v.Name(),
v.Durable(),
v.AutoDelete(),
v.Exclusive(),
v.NoWait(),
v.Args(),
)
if err != nil {
err = fmt.Errorf("queue declare error: %s", err)
return
}
err = ch.QueueBind(
vv.Name,
v.BindKey(),
c.Exchange().Name(),
v.NoWait(),
v.Args(),
)
if err != nil {
err = fmt.Errorf("queue bind error: %s", err)
return
}
}
p = producer.NewProducer(conn, ch, c.Exchange().Name())
return
}