forked from wagslane/go-rabbitmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection.go
90 lines (74 loc) · 2.5 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package rabbitmq
import (
"math/rand"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/wagslane/go-rabbitmq/internal/connectionmanager"
)
// Conn manages the connection to a rabbit cluster
// it is intended to be shared across publishers and consumers
type Conn struct {
connectionManager *connectionmanager.ConnectionManager
reconnectErrCh <-chan error
closeConnectionToManagerCh chan<- struct{}
options ConnectionOptions
}
// Config wraps amqp.Config
// Config is used in DialConfig and Open to specify the desired tuning
// parameters used during a connection open handshake. The negotiated tuning
// will be stored in the returned connection's Config field.
type Config amqp.Config
type Resolver = connectionmanager.Resolver
type StaticResolver struct {
urls []string
shuffle bool
}
func (r *StaticResolver) Resolve() ([]string, error) {
// TODO: move to slices.Clone when supported Go versions > 1.21
var urls []string
urls = append(urls, r.urls...)
if r.shuffle {
rand.Shuffle(len(urls), func(i, j int) {
urls[i], urls[j] = urls[j], urls[i]
})
}
return urls, nil
}
func NewStaticResolver(urls []string, shuffle bool) *StaticResolver {
return &StaticResolver{urls: urls, shuffle: shuffle}
}
// NewConn creates a new connection manager
func NewConn(url string, opts ...func(*ConnectionOptions)) (*Conn, error) {
return NewClusterConn(NewStaticResolver([]string{url}, false), opts...)
}
func NewClusterConn(resolver Resolver, opts ...func(*ConnectionOptions)) (*Conn, error) {
defaultOptions := getDefaultConnectionOptions()
options := &defaultOptions
for _, optFn := range opts {
optFn(options)
}
manager, err := connectionmanager.NewConnectionManager(resolver, amqp.Config(options.Config), options.Logger, options.ReconnectInterval)
if err != nil {
return nil, err
}
reconnectErrCh, closeCh := manager.NotifyReconnect()
conn := &Conn{
connectionManager: manager,
reconnectErrCh: reconnectErrCh,
closeConnectionToManagerCh: closeCh,
options: *options,
}
go conn.handleRestarts()
return conn, nil
}
func (conn *Conn) handleRestarts() {
for err := range conn.reconnectErrCh {
conn.options.Logger.Infof("successful connection recovery from: %v", err)
}
}
// Close closes the connection, it's not safe for re-use.
// You should also close any consumers and publishers before
// closing the connection
func (conn *Conn) Close() error {
conn.closeConnectionToManagerCh <- struct{}{}
return conn.connectionManager.Close()
}