-
Notifications
You must be signed in to change notification settings - Fork 3
/
client.go
149 lines (120 loc) · 3.36 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package riago
import (
"time"
"github.com/golang/protobuf/proto"
)
// Client represents a Riak client instance.
type Client struct {
pool *Pool
retryAttempts int
retryDelay time.Duration
readTimeout time.Duration
writeTimeout time.Duration
instrumenter func(*Profile)
}
// NewClient creates a new Riago client with a given address and pool count.
func NewClient(addr string, count int) (c *Client) {
return &Client{
pool: NewPool(addr, count),
retryAttempts: 0,
retryDelay: 500 * time.Millisecond,
}
}
// SetRetryAttempts sets the number of times an operation will be retried before
// returning an error.
func (c *Client) SetRetryAttempts(n int) {
c.retryAttempts = n
}
// SetRetryDelay sets the delay between retries.
func (c *Client) SetRetryDelay(dur time.Duration) {
c.retryDelay = dur
}
// SetReadTimeout establishes a timeout deadline for all connection reads.
func (c *Client) SetReadTimeout(dur time.Duration) {
c.readTimeout = dur
}
// SetWriteTimeout establishes a timeout deadline for all connection write.
func (c *Client) SetWriteTimeout(dur time.Duration) {
c.writeTimeout = dur
}
// SetWaitTimeout establishes a timeout deadline for how long to wait for
// a connection to become available from the pool before returning an error.
func (c *Client) SetWaitTimeout(dur time.Duration) {
c.pool.waitTimeout = dur
}
// SetInstrumenter establishes an instrument function to be called after each
// operation and given a payload of operation profile data.
func (c *Client) SetInstrumenter(fn func(*Profile)) {
c.instrumenter = fn
}
// Performs a Riak Server info request.
func (c *Client) ServerInfo() (resp *RpbGetServerInfoResp, err error) {
prof := NewProfile("server_info", "")
defer c.instrument(prof, err)
resp = &RpbGetServerInfoResp{}
err = c.do(MsgRpbGetServerInfoReq, nil, resp, prof)
return
}
// Performs a single request with a single response
func (c *Client) do(code byte, req proto.Message, resp proto.Message, prof *Profile) (err error) {
err = c.with(func(conn *Conn) (e error) {
t := time.Now()
if e = conn.request(code, req); e != nil {
return
}
prof.Request = time.Now().Sub(t)
t = time.Now()
if e = conn.response(resp); e != nil {
return
}
prof.Response = time.Now().Sub(t)
return
}, prof)
return
}
// Gets and prepares a connection, yields it to the given function and returns the error.
func (c *Client) with(fn func(*Conn) error, prof *Profile) (err error) {
var conn *Conn
t := time.Now()
if conn, err = c.pool.Get(); err != nil {
return
}
prof.ConnWait = time.Now().Sub(t)
t = time.Now()
conn.lock()
prof.ConnLock = time.Now().Sub(t)
conn.readTimeout = c.readTimeout
conn.writeTimeout = c.writeTimeout
if err = fn(conn); err != nil {
conn.close()
conn.unlock()
c.pool.Fail(conn)
return
}
conn.unlock()
c.pool.Put(conn)
return
}
// Retries a function multiple times until it does not return an error.
func (c *Client) retry(fn func() error, prof *Profile) (err error) {
for i := 0; i <= c.retryAttempts; i++ {
if i > 0 {
prof.Retries += 1
}
if err = fn(); err == nil {
return
}
if c.retryDelay > 0 {
<-time.After(c.retryDelay)
}
}
return
}
// Send a profile to the instrumenter
func (c *Client) instrument(prof *Profile, err error) {
if c.instrumenter != nil {
prof.Error = err
prof.Total = time.Now().Sub(prof.start)
c.instrumenter(prof)
}
}