-
Notifications
You must be signed in to change notification settings - Fork 266
/
unsaved_fast_iterator.go
228 lines (188 loc) · 6.44 KB
/
unsaved_fast_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
package iavl
import (
"bytes"
"errors"
"sort"
"sync"
"cosmossdk.io/core/store"
"github.com/cosmos/iavl/fastnode"
ibytes "github.com/cosmos/iavl/internal/bytes"
)
// Sentinel errors recorded on an UnsavedFastIterator by NewUnsavedFastIterator
// when one of the required unsaved-change maps is nil. They are exposed to
// callers through the iterator's Error method.
var (
// errUnsavedFastIteratorNilAdditionsGiven is set when the unsaved additions map is nil.
errUnsavedFastIteratorNilAdditionsGiven = errors.New("unsaved fast iterator must be created with unsaved additions but they were nil")
// errUnsavedFastIteratorNilRemovalsGiven is set when the unsaved removals map is nil.
errUnsavedFastIteratorNilRemovalsGiven = errors.New("unsaved fast iterator must be created with unsaved removals but they were nil")
)
// UnsavedFastIterator is a store.Iterator that iterates over the latest state
// via fast nodes. It merges the fast nodes already persisted on disk (which
// are read in sorted order through fastIterator) with the unsaved fast node
// additions, and skips disk nodes whose keys appear in the unsaved removals.
type UnsavedFastIterator struct {
// start and end are the bounds passed to the constructor and returned by
// Domain. Unsaved keys are kept only when start <= key < end; a nil bound
// is treated as unbounded on that side.
start, end []byte
// valid is assigned false on the error paths and in Close.
// NOTE(review): in the code visible here it is never set to true and the
// Valid method does not read it.
valid bool
// ascending selects the sort direction of the unsaved keys and the
// comparison used when merging them with the disk keys.
ascending bool
// err is the error reported by Error.
err error
// ndb is the node database; a nil value is treated as an error by the
// constructor and by Next.
ndb *nodeDB
// nextKey and nextVal hold the entry currently exposed by Key and Value.
// Both are nil once the iterator is exhausted.
nextKey []byte
nextVal []byte
// fastIterator walks the fast nodes on disk; it is created with NewFastIterator.
fastIterator store.Iterator
// nextUnsavedNodeIdx is the index in unsavedFastNodesToSort of the next
// unsaved key that has not been emitted yet.
nextUnsavedNodeIdx int
// unsavedFastNodeAdditions maps a key to its unsaved fast node.
unsavedFastNodeAdditions *sync.Map // map[string]*fastnode.Node
// unsavedFastNodeRemovals holds the keys of disk nodes that must be skipped.
unsavedFastNodeRemovals *sync.Map // map[string]interface{}
// unsavedFastNodesToSort holds the in-range keys of unsavedFastNodeAdditions,
// sorted according to ascending.
unsavedFastNodesToSort []string
}
// Compile-time check that *UnsavedFastIterator satisfies store.Iterator.
var _ store.Iterator = (*UnsavedFastIterator)(nil)
// NewUnsavedFastIterator builds an iterator over [start, end) that merges the
// fast nodes on disk with the given unsaved additions and removals, in
// ascending or descending key order.
//
// If ndb, unsavedFastNodeAdditions or unsavedFastNodeRemovals is nil, the
// returned iterator carries the corresponding error (see Error) and has not
// been positioned on any entry. Otherwise the iterator is returned already
// advanced to its first entry.
func NewUnsavedFastIterator(start, end []byte, ascending bool, ndb *nodeDB, unsavedFastNodeAdditions, unsavedFastNodeRemovals *sync.Map) *UnsavedFastIterator {
	iter := &UnsavedFastIterator{
		start:                    start,
		end:                      end,
		ascending:                ascending,
		ndb:                      ndb,
		fastIterator:             NewFastIterator(start, end, ascending, ndb),
		unsavedFastNodeAdditions: unsavedFastNodeAdditions,
		unsavedFastNodeRemovals:  unsavedFastNodeRemovals,
	}

	// Reject missing dependencies, checking them in a fixed order so the
	// reported error is deterministic when several are nil.
	var initErr error
	switch {
	case ndb == nil:
		initErr = errFastIteratorNilNdbGiven
	case unsavedFastNodeAdditions == nil:
		initErr = errUnsavedFastIteratorNilAdditionsGiven
	case unsavedFastNodeRemovals == nil:
		initErr = errUnsavedFastIteratorNilRemovalsGiven
	}
	if initErr != nil {
		iter.err = initErr
		iter.valid = false
		return iter
	}

	// Saved and unsaved state must be visited in one global order. The fast
	// nodes on disk are already sorted, so only the unsaved keys need sorting;
	// Next then walks both sequences with one cursor each.
	withinDomain := func(key []byte) bool {
		if start != nil && bytes.Compare(key, start) < 0 {
			return false
		}
		return end == nil || bytes.Compare(key, end) < 0
	}
	unsavedFastNodeAdditions.Range(func(rawKey, rawNode interface{}) bool {
		node := rawNode.(*fastnode.Node)
		if withinDomain(node.GetKey()) {
			// The map key is expected to be a string; a failed assertion
			// should not happen in practice.
			iter.unsavedFastNodesToSort = append(iter.unsavedFastNodesToSort, rawKey.(string))
		}
		return true
	})

	if ascending {
		sort.Strings(iter.unsavedFastNodesToSort)
	} else {
		sort.Sort(sort.Reverse(sort.StringSlice(iter.unsavedFastNodesToSort)))
	}

	// Position the iterator on its first entry.
	iter.Next()
	return iter
}
// Domain implements store.Iterator.
// It returns the start and end bounds the iterator was created with.
func (iter *UnsavedFastIterator) Domain() ([]byte, []byte) {
	lower, upper := iter.start, iter.end
	return lower, upper
}
// Valid implements store.Iterator.
// It returns false when the iterator has recorded an error, or when both
// bounds are set and end is not strictly greater than start. Otherwise it
// returns true while the disk iterator is valid, unsaved keys remain to be
// emitted, or an entry is currently held in nextKey/nextVal.
func (iter *UnsavedFastIterator) Valid() bool {
	// An iterator that recorded an error (for example one created without a
	// nodeDB or without the unsaved additions/removals maps) was never
	// positioned on an entry. Reporting it as valid would expose a nil Key and
	// let callers invoke Next on a nil removals map.
	if iter.err != nil {
		return false
	}

	if iter.start != nil && iter.end != nil && bytes.Compare(iter.end, iter.start) <= 0 {
		return false
	}

	hasCurrent := iter.nextKey != nil && iter.nextVal != nil
	hasUnsaved := iter.nextUnsavedNodeIdx < len(iter.unsavedFastNodesToSort)
	return iter.fastIterator.Valid() || hasUnsaved || hasCurrent
}
// Key implements store.Iterator.
// It returns the key of the entry selected by the most recent Next call, or
// nil when no entry is selected.
func (iter *UnsavedFastIterator) Key() []byte {
	currentKey := iter.nextKey
	return currentKey
}
// Value implements store.Iterator.
// It returns the value of the entry selected by the most recent Next call, or
// nil when no entry is selected.
func (iter *UnsavedFastIterator) Value() []byte {
	currentValue := iter.nextVal
	return currentValue
}
// Next implements store.Iterator.
// It performs one step of a constant-space merge of two sorted sequences: the
// fast nodes on disk (fastIterator) and the unsaved additions
// (unsavedFastNodesToSort). The selected entry is stored in nextKey/nextVal.
//
//   - Disk nodes whose key is present in unsavedFastNodeRemovals are skipped.
//   - When both sequences are at the same key, the unsaved node wins and the
//     disk copy is skipped.
//   - When both sequences are exhausted, nextKey and nextVal are set to nil.
//
// If the iterator has no nodeDB, errFastIteratorNilNdbGiven is recorded and
// the iterator is not advanced.
func (iter *UnsavedFastIterator) Next() {
	if iter.ndb == nil {
		iter.err = errFastIteratorNilNdbGiven
		iter.valid = false
		return
	}

	// Drop leading disk nodes that are pending removal. This is a loop rather
	// than a recursive call so that a long run of removed keys cannot grow the
	// stack. Key is only read while the disk iterator reports Valid, because
	// the interface does not promise Key is safe on an exhausted iterator.
	var (
		diskKeyStr string
		diskValid  bool
	)
	for iter.fastIterator.Valid() {
		diskKeyStr = ibytes.UnsafeBytesToStr(iter.fastIterator.Key())
		if value, ok := iter.unsavedFastNodeRemovals.Load(diskKeyStr); !ok || value == nil {
			diskValid = true
			break
		}
		iter.fastIterator.Next()
	}

	unsavedLeft := iter.nextUnsavedNodeIdx < len(iter.unsavedFastNodesToSort)

	switch {
	case diskValid && unsavedLeft:
		nextUnsavedKey := iter.unsavedFastNodesToSort[iter.nextUnsavedNodeIdx]
		// The key was collected from unsavedFastNodeAdditions when the iterator
		// was built, so the entry is expected to be present and the ok result
		// is not checked; a missing entry panics in the type assertion below.
		nextUnsavedNodeVal, _ := iter.unsavedFastNodeAdditions.Load(nextUnsavedKey)
		nextUnsavedNode := nextUnsavedNodeVal.(*fastnode.Node)

		isUnsavedNext := diskKeyStr <= nextUnsavedKey
		if iter.ascending {
			isUnsavedNext = diskKeyStr >= nextUnsavedKey
		}

		if !isUnsavedNext {
			// Disk node is next.
			iter.nextKey = iter.fastIterator.Key()
			iter.nextVal = iter.fastIterator.Value()
			iter.fastIterator.Next()
			return
		}

		// Unsaved node is next.
		if diskKeyStr == nextUnsavedKey {
			// Unsaved update prevails over the saved copy, so skip the copy from disk.
			iter.fastIterator.Next()
		}
		iter.nextKey = nextUnsavedNode.GetKey()
		iter.nextVal = nextUnsavedNode.GetValue()
		iter.nextUnsavedNodeIdx++

	case diskValid:
		// Only nodes on disk are left.
		iter.nextKey = iter.fastIterator.Key()
		iter.nextVal = iter.fastIterator.Value()
		iter.fastIterator.Next()

	case unsavedLeft:
		// Only unsaved nodes are left.
		nextUnsavedKey := iter.unsavedFastNodesToSort[iter.nextUnsavedNodeIdx]
		// See the note above: the entry is expected to exist.
		nextUnsavedNodeVal, _ := iter.unsavedFastNodeAdditions.Load(nextUnsavedKey)
		nextUnsavedNode := nextUnsavedNodeVal.(*fastnode.Node)
		iter.nextKey = nextUnsavedNode.GetKey()
		iter.nextVal = nextUnsavedNode.GetValue()
		iter.nextUnsavedNodeIdx++

	default:
		// Both sequences are exhausted.
		iter.nextKey = nil
		iter.nextVal = nil
	}
}
// Close implements store.Iterator.
// It clears the valid flag and closes the underlying disk iterator, returning
// whatever error that close reports.
func (iter *UnsavedFastIterator) Close() error {
	iter.valid = false
	closeErr := iter.fastIterator.Close()
	return closeErr
}
// Error implements store.Iterator.
// It returns the error recorded on the iterator, or nil if none was recorded.
func (iter *UnsavedFastIterator) Error() error {
	recorded := iter.err
	return recorded
}