forked from armon/go-chord
-
Notifications
You must be signed in to change notification settings - Fork 0
/
obj_client.go
280 lines (230 loc) · 7.88 KB
/
obj_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
package buddystore
import (
"fmt"
"math/rand"
"time"
"github.com/golang/glog"
)
const RETRY_WAIT = 1 * time.Millisecond
type KVStoreClient interface {
Get(key string, retry bool) ([]byte, error)
Set(key string, val []byte) error
GetForSet(key string, retry bool) ([]byte, uint, error)
SetVersion(key string, version uint, val []byte) error
}
type KVStoreClientImpl struct {
ring RingIntf
lm LMClientIntf
// Implements: KVStoreClient
}
var _ KVStoreClient = &KVStoreClientImpl{}
func NewKVStoreClient(ring *Ring) *KVStoreClientImpl {
lm := &LManagerClient{Ring: ring, RLocks: make(map[string]*RLockVal), WLocks: make(map[string]*WLockVal)}
return &KVStoreClientImpl{ring: ring, lm: lm}
}
func NewKVStoreClientWithLM(ringIntf RingIntf, lm LMClientIntf) *KVStoreClientImpl {
return &KVStoreClientImpl{ring: ringIntf, lm: lm}
}
// Inform the lock manager we're interested in reading the value for key.
// Expected return value:
// Current version number associated with key
// Expected error conditions:
// Network failure => Retryable failure
// Key does not exist => Fail immediately
//
// Once current version number has been successfully read, contact nodes
// KV Store to read value at expected version.
// Expected error conditions:
// Key/version does not exist => Retry with another node
// All nodes returned error => Fail
//
// Optimization:
// Prioritize reading from local vnode if one of them may contain this data.
func (kv KVStoreClientImpl) Get(key string, retry bool) ([]byte, error) {
var err error = fmt.Errorf("DUMMY")
var val []byte
for err != nil {
val, err = kv.getWithoutRetry(key)
// glog.Infof("Get(key) => %s [Err: %s]", val, err)
if !retry || !isRetryable(err) {
return val, err
}
// TODO: Use some kind of backoff mechanism, like in
// https://github.com/cenkalti/backoff
time.Sleep(RETRY_WAIT)
}
return val, nil
}
func (kv KVStoreClientImpl) getWithoutRetry(key string) ([]byte, error) {
v, err := kv.lm.RLock(key, false)
if err != nil {
glog.Errorf("Error acquiring RLock in Get(%q): %s", key, err)
return nil, err
}
succVnodesTemp, err := kv.ring.Lookup(kv.ring.GetNumSuccessors(), []byte(key))
if err != nil {
glog.Errorf("Error listing successors in Get(%q): %q", key, err)
return nil, err
}
if len(succVnodesTemp) == 0 {
glog.Errorf("No successors found during Lookup in Get(%q)", key)
return nil, fmt.Errorf("No Successors found")
}
succVnodes := make([]*Vnode, len(succVnodesTemp))
copy(succVnodes,succVnodesTemp)
// Logic to throw away all local vnodes from the successors list.
for i, vnode := range succVnodes {
if kv.ring.Transport().IsLocalVnode(vnode) {
succVnodes = append(succVnodes[:i], succVnodes[i+1:]...)
value, err := kv.ring.Transport().Get(vnode, key, v)
// fmt.Printf("GetSubLocal(key, vnode) => %s [Err: %s]\n", value, err)
// If operation failed, try another node
if err == nil {
return value, nil
}
}
}
// TODO: Performance optimization:
// Make parallel calls and take the fastest successful response.
for len(succVnodes) > 0 {
// Pick a random node in the list of possible replicas
randval := rand.Intn(len(succVnodes))
node := succVnodes[randval]
succVnodes = append(succVnodes[:randval], succVnodes[randval+1:]...)
// Perform read operation on the random node
value, err := kv.ring.Transport().Get(node, key, v)
// fmt.Printf("GetSub(key, vnode) => %s [Err: %s]\n", value, err)
// If operation failed, try another node
if err == nil {
return value, nil
}
}
return nil, fmt.Errorf("All read replicas failed")
}
// Inform the lock manager we're interested in setting the value for key.
// Expected return value:
// Next available version number to write value to
// Expected error conditions:
// Network failure => Retryable failure
// Key does not exist => Fail immediately
// Access permissions? => Fail immediately
//
// Once next version number has been successfully read, contact master
// KV Store to write value at new version.
// Expected error conditions:
// Key/version too old => TODO: Inform lock manager
// Transient error => TODO: Retry
//
// If write operation succeeded without errors, send a commit message to
// the lock manager to finalize the operation. Until the commit returns
// successfully, the new version of this value will not be advertised.
// Expected error conditions:
// Lock not found => TODO: Return
// Transient error => TODO: Retry
//
// If write operation failed, send an abort message to the lock manager to
// cancel the operation. This is simply to speed up the lock release operation
// instead of waiting for a timeout to happen.
// Expected error conditions:
// Lock not found => TODO: Return
// Transient error => TODO: Retry
func (kv *KVStoreClientImpl) Set(key string, value []byte) error {
var err error = fmt.Errorf("DUMMY")
var v uint
for err != nil {
v, err = kv.lm.WLock(key, 0, 10)
if err == nil {
break
}
if !isRetryable(err) {
return err
}
// TODO: Use some kind of backoff mechanism, like in
// https://github.com/cenkalti/backoff
time.Sleep(RETRY_WAIT)
}
return kv.SetVersion(key, v, value)
}
// Similar to KVStore.Set, but useful for transactional read-update-write
// operations along with KVStore.GetForSet.
//
// Use the version number from the write lease acquired in KVStore.GetForSet.
// Perform regular Set operation with commit/abort.
func (kv *KVStoreClientImpl) SetVersion(key string, version uint, value []byte) error {
var err error = fmt.Errorf("DUMMY")
for err != nil {
err = kv.setVersionWithoutRetry(key, version, value)
if err == nil {
return nil
}
if !isRetryable(err) {
return err
}
// TODO: Use some kind of backoff mechanism, like in
// https://github.com/cenkalti/backoff
time.Sleep(RETRY_WAIT)
}
// TODO: Unreachable code
return nil
}
func (kv *KVStoreClientImpl) setVersionWithoutRetry(key string, version uint, value []byte) error {
succVnodes, err := kv.ring.Lookup(kv.ring.GetNumSuccessors(), []byte(key))
if err != nil {
glog.Errorf("Error listing successors in Set(%q): %q", key, err)
return err
}
if len(succVnodes) == 0 {
glog.Errorf("No successors found during Lookup in Get(%q)", key)
return fmt.Errorf("No Successors found")
}
// This request should always go to the master node.
// Replication happens at the master.
err = kv.ring.Transport().Set(succVnodes[0], key, version, value)
if err != nil {
glog.Errorf("Aborting Set(%q, %d) due to error: %q", key, version, err)
// Best-effort Abort
kv.lm.AbortWLock(key, version)
// Even if the Abort command failed, we can safely return
// because the lock manager will timeout and abort for us.
return err
}
err = kv.lm.CommitWLock(key, version)
return err
}
// Similar to KVStore.Get, but useful for transactional read-update-write
// operations along with KVStore.SetVersion.
//
// First, get a write lease from the lock manager. This prevents any
// further write operations on the same key. Proceed to read the latest
// version of the key and get its data, which is returned.
func (kv KVStoreClientImpl) GetForSet(key string, retry bool) ([]byte, uint, error) {
var err error = fmt.Errorf("DUMMY")
var val []byte
var version uint
for err != nil {
version, err = kv.lm.WLock(key, 0, 60)
if err == nil {
break
}
if !retry || !isRetryable(err) {
return nil, version, err
}
// TODO: Use some kind of backoff mechanism, like in
// https://github.com/cenkalti/backoff
time.Sleep(RETRY_WAIT)
}
err = fmt.Errorf("DUMMY")
for err != nil {
val, err = kv.getWithoutRetry(key)
if err == nil {
return val, version, nil
}
if !retry || !isRetryable(err) {
return nil, version, err
}
// TODO: Use some kind of backoff mechanism, like in
// https://github.com/cenkalti/backoff
time.Sleep(RETRY_WAIT)
}
return nil, 0, fmt.Errorf("Code should note have reached here")
}