-
Notifications
You must be signed in to change notification settings - Fork 3
/
conn.go
178 lines (139 loc) · 3.29 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
package riago
import (
"io"
"net"
"sync"
"time"
"github.com/golang/protobuf/proto"
)
// Conn represents an individual connection to a Riak host.
type Conn struct {
addr string
conn *net.TCPConn
ok bool
padlock int32
mutex sync.Mutex
readTimeout time.Duration
writeTimeout time.Duration
}
// Create a new Conn instance for the given address
func NewConn(addr string) *Conn {
return &Conn{
addr: addr,
}
}
// Closes the connection, closing the socket (if open) and marking
// the connection as down.
func (c *Conn) Close() error {
c.lock()
defer c.unlock()
return c.close()
}
// Attempts to ping the remote server. Returns an error if the
// request/response fails.
func (c *Conn) Ping() (err error) {
c.lock()
defer c.unlock()
if err = c.request(MsgRpbPingReq, nil); err != nil {
return
}
err = c.response(nil)
return
}
// Attempts to recover a downed connection by re-dialing and marking
// the connection as up in the case of success.
func (c *Conn) Recover() error {
c.lock()
defer c.unlock()
return c.dial()
}
// Attempts to connect to the Riak server. Must be called from within a lock.
func (c *Conn) dial() (err error) {
var tcpAddr *net.TCPAddr
if tcpAddr, err = net.ResolveTCPAddr("tcp", c.addr); err != nil {
return
}
if c.conn, err = net.DialTCP("tcp", nil, tcpAddr); err != nil {
return
}
if err != nil {
c.conn = nil
return
}
c.conn.SetKeepAlive(true)
c.ok = true
return
}
// Closes the connection, closing the socket (if open) and marking
// the connection as down. Must be called from within a lock.
func (c *Conn) close() (err error) {
c.ok = false
if c.conn != nil {
err = c.conn.Close()
c.conn = nil
}
return
}
// Encode and write a request to the Riak server. Must be called from
// within a lock.
func (c *Conn) request(code byte, req proto.Message) (err error) {
var buf []byte
if c.conn == nil || !c.ok {
if err = c.dial(); err != nil {
return
}
}
if buf, err = encode(code, req); err != nil {
return
}
err = c.write(buf)
return
}
// Read and decode a response from the Riak server. Must be called from
// within a lock.
func (c *Conn) response(resp proto.Message) (err error) {
var buf []byte
if buf, err = c.read(); err != nil {
return
}
if err = decode(buf, resp); err != nil {
return
}
return
}
// Write a fully encoded buffer to the connection, establishing a deadline if
// a timeout is set.
func (c *Conn) write(buf []byte) (err error) {
if c.writeTimeout > 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
_, err = c.conn.Write(buf)
return
}
// Read a length-prefixed buffer from the connection, establishing a deadline
// if a timeout is set.
func (c *Conn) read() (buf []byte, err error) {
var sizebuf []byte
var size int
if c.readTimeout > 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
sizebuf = make([]byte, 4)
if _, err = io.ReadFull(c.conn, sizebuf); err != nil {
return
}
size = int(sizebuf[0])<<24 + int(sizebuf[1])<<16 + int(sizebuf[2])<<8 + int(sizebuf[3])
buf = make([]byte, size)
if _, err = io.ReadFull(c.conn, buf); err != nil {
return
}
return
}
// Obtain a big lock on everything scary
func (c *Conn) lock() {
c.mutex.Lock()
}
// Release a big lock on everything scary
func (c *Conn) unlock() {
c.mutex.Unlock()
}