forked from libp2p/go-libp2p-raft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
fsm.go
180 lines (156 loc) · 4.83 KB
/
fsm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
package libp2praft
import (
"errors"
"io"
"sync"
consensus "github.com/libp2p/go-libp2p-consensus"
raft "github.com/hashicorp/raft"
)
// Package-level settings and sentinel errors of the FSM.
var (
	// MaxSubscriberCh is the buffer size used when the subscriber
	// notification channel is created.
	MaxSubscriberCh = 128

	// ErrNoState is the error reported while the consensus protocol has
	// not agreed upon any state yet.
	ErrNoState = errors.New("no state has been agreed upon yet")
)
// FSM implements a minimal raft.FSM that holds a generic consensus.State
// and applies generic Ops to it. The state can be serialized/deserialized,
// snapshotted and restored.
// FSM is used by Consensus to keep track of the state of an OpLog.
// Please use the value returned by Consensus.FSM() to initialize Raft.
// Do not use this object directly.
type FSM struct {
// state is the current state. Apply replaces it, Restore decodes a
// snapshot into it, getState returns it and fsmSnapshot.Persist encodes it.
state consensus.State
// op is the operation value that Apply decodes incoming log entries into
// and whose ApplyTo method produces the next state.
op consensus.Op
// initialized becomes true after the first successful Apply or Restore.
// Snapshot and getState return an error while it is false.
initialized bool
// inconsistent is set when Apply can neither decode nor apply a log entry.
// It is cleared by a successfully decoded rollback entry or by Restore.
inconsistent bool
// mux is held by Apply, Restore, getState and fsmSnapshot.Persist while
// they access the fields above.
mux sync.Mutex
// subscriberCh is created lazily by subscribe (buffered with
// MaxSubscriberCh), receives an empty struct after every successful Apply,
// and is closed and reset to nil by unsubscribe.
subscriberCh chan struct{}
// chMux is held by subscribe, unsubscribe and updateSubscribers while they
// access subscriberCh.
chMux sync.Mutex
}
// Apply is invoked by Raft once a log entry is committed. Do not use directly.
//
// The entry's data is first decoded as a regular operation (into fsm.op) and
// that operation is applied to the current state. If the data cannot be
// decoded as an operation, it is decoded as a stateOp instead, i.e. a
// rollback entry that carries a full replacement state.
//
// On success the resulting state is stored, the FSM is flagged as
// initialized, subscribers are notified and the new state is returned.
//
// If the entry can be decoded neither way, or if applying the operation
// fails, the FSM is flagged as inconsistent and nil is returned. Note that a
// regular operation is applied even when the FSM is already flagged as
// inconsistent, and that applying it successfully does not clear the flag:
// in this method only a successfully decoded rollback entry does.
func (fsm *FSM) Apply(rlog *raft.Log) interface{} {
	fsm.mux.Lock()
	defer fsm.mux.Unlock()

	var newState consensus.State
	if err := decodeOp(rlog.Data, fsm.op); err != nil {
		// Not a regular operation: maybe it is a standard rollback.
		rollbackOp := consensus.Op(&stateOp{fsm.state})
		err2 := decodeOp(rlog.Data, rollbackOp)
		if err2 != nil {
			// Report both failures, starting with the original one, and
			// dump the raw entry to help debugging.
			logger.Error("error decoding op: ", err)
			logger.Error("error decoding rollback: ", err2)
			logger.Errorf("%+v", rlog.Data)
			fsm.inconsistent = true
			return nil
		}
		castedRollback := rollbackOp.(*stateOp)
		newState = castedRollback.State
		// A rollback replaces the whole state, so it is consistent again.
		fsm.inconsistent = false
	} else {
		newState, err = fsm.op.ApplyTo(fsm.state)
		if err != nil {
			// Include the cause: without it the failure cannot be diagnosed.
			logger.Error("error applying Op to State: ", err)
			fsm.inconsistent = true
			return nil
		}
	}

	fsm.state = newState
	fsm.initialized = true
	fsm.updateSubscribers()
	return fsm.state
}
// Snapshot returns a raft.FSMSnapshot that can later persist the FSM's state.
//
// It returns an error when no state has been set yet (no successful Apply or
// Restore so far) or when the state is flagged as inconsistent.
//
// The state is not copied or encoded here: the returned snapshot keeps a
// reference to the FSM and encodes whatever state it holds at the time its
// Persist method runs.
func (fsm *FSM) Snapshot() (raft.FSMSnapshot, error) {
	// initialized and inconsistent are written under mux by Apply and
	// Restore, so they are read under the same lock here, as getState does.
	// The lock is released on return, before Persist can be called on the
	// returned snapshot.
	fsm.mux.Lock()
	defer fsm.mux.Unlock()

	if !fsm.initialized {
		logger.Error("tried to snapshot uninitialized state")
		return nil, errors.New("cannot snapshot unitialized state")
	}
	if fsm.inconsistent {
		logger.Error("tried to snapshot inconsistent state")
		return nil, errors.New("cannot snapshot inconsistent state")
	}
	return &fsmSnapshot{fsm: fsm}, nil
}
// Restore loads the FSM's state from a snapshot.
//
// The reader and the FSM's current state value are handed to DecodeSnapshot.
// When decoding succeeds the FSM is flagged as initialized and consistent.
// When it fails the error is logged and returned unchanged, and the flags
// are left as they were. The reader is closed in every case.
func (fsm *FSM) Restore(reader io.ReadCloser) error {
	defer reader.Close()

	fsm.mux.Lock()
	defer fsm.mux.Unlock()

	if decodeErr := DecodeSnapshot(fsm.state, reader); decodeErr != nil {
		logger.Errorf("error decoding snapshot: %s", decodeErr)
		return decodeErr
	}

	fsm.inconsistent = false
	fsm.initialized = true
	return nil
}
// subscribe returns the notification channel of the FSM, creating it with a
// buffer of MaxSubscriberCh elements on first use. The channel carries empty
// structs only: each one signals that a new state has been applied. Repeated
// calls return the same channel until unsubscribe is called.
func (fsm *FSM) subscribe() <-chan struct{} {
	fsm.chMux.Lock()
	defer fsm.chMux.Unlock()

	if existing := fsm.subscriberCh; existing != nil {
		return existing
	}
	fsm.subscriberCh = make(chan struct{}, MaxSubscriberCh)
	return fsm.subscriberCh
}
// unsubscribe closes the channel handed out by subscribe and forgets it, so
// that a later subscribe call creates a fresh one. It does nothing when there
// is no channel.
func (fsm *FSM) unsubscribe() {
	fsm.chMux.Lock()
	defer fsm.chMux.Unlock()

	if fsm.subscriberCh == nil {
		return
	}
	close(fsm.subscriberCh)
	fsm.subscriberCh = nil
}
// getState returns the state currently held by the FSM.
//
// It returns ErrNoState when no state has been set yet, and a dedicated error
// when the local state is flagged as inconsistent.
func (fsm *FSM) getState() (consensus.State, error) {
	fsm.mux.Lock()
	defer fsm.mux.Unlock()

	switch {
	case !fsm.initialized:
		return nil, ErrNoState
	case fsm.inconsistent:
		return nil, errors.New("the state on this node is not consistent")
	default:
		return fsm.state, nil
	}
}
// updateSubscribers signals the subscriber channel, if there is one, that a
// new state is available. The send never blocks: when the channel's buffer is
// full the notification is dropped and an error is logged.
func (fsm *FSM) updateSubscribers() {
	fsm.chMux.Lock()
	defer fsm.chMux.Unlock()

	ch := fsm.subscriberCh
	if ch == nil {
		// Nobody has subscribed.
		return
	}

	select {
	case ch <- struct{}{}:
	default:
		logger.Error("subscriber channel is full. Discarding update!")
	}
}
// fsmSnapshot is the value FSM.Snapshot returns as a raft.FSMSnapshot. Its
// Persist method writes the serialized state of the FSM to a
// raft.SnapshotSink so that it can be used as a snapshot of the system.
type fsmSnapshot struct {
// fsm is the FSM whose state is encoded. The state is read when Persist
// runs, not when the fsmSnapshot is created.
fsm *FSM
}
// Persist encodes the FSM's current state into the given raft.SnapshotSink
// while holding the FSM's state lock. If encoding fails the sink is cancelled
// and the encoding error is returned; otherwise the sink is closed and the
// result of closing it is returned.
func (snap *fsmSnapshot) Persist(sink raft.SnapshotSink) error {
	owner := snap.fsm
	owner.mux.Lock()
	defer owner.mux.Unlock()

	if encodeErr := EncodeSnapshot(owner.state, sink); encodeErr != nil {
		sink.Cancel()
		return encodeErr
	}
	return sink.Close()
}
// Release is a no-op: the snapshot holds only a reference to its FSM.
func (snap *fsmSnapshot) Release() {
	// Intentionally empty.
}