-
Notifications
You must be signed in to change notification settings - Fork 1
/
future.go
101 lines (85 loc) · 2.58 KB
/
future.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package raft
import (
"time"
"github.com/relab/raft/commonpb"
)
// Result contains the index of the committed entry and the accompanied
// response.
type Result struct {
Index uint64
Value interface{}
}
// Future allows a result to be read after the operation who created it has
// completed.
type Future interface {
ResultCh() <-chan Result
}
// PromiseEntry is a promise to either write some entry to the log, or read some
// result from the state machine. Invoking Write turns it into a promise to
// write to the log, invoking read turns it into the other. Respond can be used
// to respond early, if we cannot proceed with the request. Respond is
// non-blocking but must only be invoked once.
type PromiseEntry interface {
Write(uint64) PromiseLogEntry
Read() PromiseLogEntry
Respond(interface{})
}
// PromiseLogEntry is a promise to execute some request, usually committing an
// entry to the log. Respond is used to inform a listener (Future) about the
// result of the promise. Respond is non-blocking but must only be invoked once.
type PromiseLogEntry interface {
Entry() *commonpb.Entry
Duration() time.Duration
Respond(interface{})
}
// NewPromiseNoFuture returns a struct implementing the PromiseLogEntry but does
// nothing when Respond is called.
func NewPromiseNoFuture(entry *commonpb.Entry) PromiseLogEntry {
return &promiseEntry{
entry: entry,
created: time.Now(),
}
}
// NewPromiseEntry returns a PromiseEntry and a Future which can be used to get
// the response from the promise at a later time.
func NewPromiseEntry(entry *commonpb.Entry) (PromiseEntry, Future) {
promise := &promiseEntry{
entry: entry,
created: time.Now(),
res: make(chan Result, 1),
}
return promise, promise
}
type promiseEntry struct {
entry *commonpb.Entry
created time.Time
res chan Result
}
// Write implements the PromiseEntry interface.
func (f *promiseEntry) Write(index uint64) PromiseLogEntry {
f.entry.Index = index
return f
}
// Read implements the PromiseEntry interface.
func (f *promiseEntry) Read() PromiseLogEntry {
return f
}
// Entry implements PromiseLogEntry interface.
func (f *promiseEntry) Entry() *commonpb.Entry {
return f.entry
}
// Duration implements PromiseLogEntry interface.
func (f *promiseEntry) Duration() time.Duration {
return time.Since(f.created)
}
// Respond implements PromiseEntry and PromiseLogEntry interface.
func (f *promiseEntry) Respond(value interface{}) {
if f.res == nil {
return
}
f.res <- Result{f.entry.Index, value}
}
// ResultCh implements Future interface.
func (f *promiseEntry) ResultCh() <-chan Result {
return f.res
}