Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stf/branch): simplify merged iterator #22131

Merged
merged 18 commits into from
Nov 7, 2024
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
311 changes: 123 additions & 188 deletions server/v2/stf/branch/mergeiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,229 +7,164 @@ import (
corestore "cosmossdk.io/core/store"
)

var (
errInvalidIterator = errors.New("invalid iterator")
)

// mergedIterator merges a parent Iterator and a cache Iterator.
// The cache iterator may return nil keys to signal that an item
// had been deleted (but not deleted in the parent).
// If the cache iterator has the same key as the parent, the
// cache shadows (overrides) the parent.
type mergedIterator struct {
parent corestore.Iterator
cache corestore.Iterator
ascending bool

valid bool
// The cache iterator may contain items that shadow or override items in the parent iterator.
// If the cache iterator has the same key as the parent, the cache's value takes precedence.
// Deleted items in the cache (indicated by nil values) are skipped.
type mergedIterator[Parent, Cache corestore.Iterator] struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you use generics here? mergedIterator is a package private type and not likely to be extended. It should be fine to stick with the corestore.Iterator type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in theory, by fact i am not sure, they should provide a performance benefit since it does not do dynamic dispatch on every call

parent Parent // Iterator for the parent store
cache Cache // Iterator for the cache store
ascending bool // Direction of iteration
valid bool // Indicates if the iterator is in a valid state
currKey []byte // Current key pointed by the iterator
currValue []byte // Current value corresponding to currKey
err error // Error encountered during iteration
}

var _ corestore.Iterator = (*mergedIterator)(nil)
// Ensure mergedIterator implements the corestore.Iterator interface.
var _ corestore.Iterator = (*mergedIterator[corestore.Iterator, corestore.Iterator])(nil)

// mergeIterators merges two iterators.
func mergeIterators(parent, cache corestore.Iterator, ascending bool) corestore.Iterator {
iter := &mergedIterator{
// mergeIterators creates a new merged iterator from parent and cache iterators.
// The 'ascending' parameter determines the direction of iteration.
func mergeIterators[Parent, Cache corestore.Iterator](parent Parent, cache Cache, ascending bool) *mergedIterator[Parent, Cache] {
iter := &mergedIterator[Parent, Cache]{
parent: parent,
cache: cache,
ascending: ascending,
}

iter.valid = iter.skipUntilExistsOrInvalid()
iter.advance() // Initialize the iterator by advancing to the first valid item
return iter
}

// Domain implements Iterator.
// Returns parent domain because cache and parent domains are the same.
func (iter *mergedIterator) Domain() (start, end []byte) {
return iter.parent.Domain()
}

// Valid implements Iterator.
func (iter *mergedIterator) Valid() bool {
return iter.valid
// Domain returns the start and end range of the iterator.
// It delegates to the parent iterator as both iterators share the same domain.
func (i *mergedIterator[Parent, Cache]) Domain() (start, end []byte) {
return i.parent.Domain()
}

// Next implements Iterator
func (iter *mergedIterator) Next() {
iter.assertValid()

switch {
case !iter.parent.Valid():
// If parent is invalid, get the next cache item.
iter.cache.Next()
case !iter.cache.Valid():
// If cache is invalid, get the next parent item.
iter.parent.Next()
default:
// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()
switch iter.compare(keyP, keyC) {
case -1: // parent < cache
iter.parent.Next()
case 0: // parent == cache
iter.parent.Next()
iter.cache.Next()
case 1: // parent > cache
iter.cache.Next()
}
}
iter.valid = iter.skipUntilExistsOrInvalid()
}

// Key implements Iterator
func (iter *mergedIterator) Key() []byte {
iter.assertValid()

// If parent is invalid, get the cache key.
if !iter.parent.Valid() {
return iter.cache.Key()
}

// If cache is invalid, get the parent key.
if !iter.cache.Valid() {
return iter.parent.Key()
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()

cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
return keyP
case 0: // parent == cache
return keyP
case 1: // parent > cache
return keyC
default:
panic("invalid compare result")
}
}

// Value implements Iterator
func (iter *mergedIterator) Value() []byte {
iter.assertValid()

// If parent is invalid, get the cache value.
if !iter.parent.Valid() {
return iter.cache.Value()
}

// If cache is invalid, get the parent value.
if !iter.cache.Valid() {
return iter.parent.Value()
}

// Both are valid. Compare keys.
keyP, keyC := iter.parent.Key(), iter.cache.Key()

cmp := iter.compare(keyP, keyC)
switch cmp {
case -1: // parent < cache
return iter.parent.Value()
case 0: // parent == cache
return iter.cache.Value()
case 1: // parent > cache
return iter.cache.Value()
default:
panic("invalid comparison result")
}
// Valid checks if the iterator is in a valid state.
// It returns true if the iterator has not reached the end.
func (i *mergedIterator[Parent, Cache]) Valid() bool {
return i.valid
}

// Close implements Iterator
func (iter *mergedIterator) Close() error {
err1 := iter.cache.Close()
if err := iter.parent.Close(); err != nil {
return err
// Next advances the iterator to the next valid item.
// It skips over deleted items (with nil values) and updates the current key and value.
func (i *mergedIterator[Parent, Cache]) Next() {
if !i.valid {
i.err = errInvalidIterator
return
}

return err1
i.advance()
}

var errInvalidIterator = errors.New("invalid merged iterator")

// Error returns an error if the mergedIterator is invalid defined by the
// Valid method.
func (iter *mergedIterator) Error() error {
if !iter.Valid() {
return errInvalidIterator
// Key returns the current key pointed by the iterator.
// If the iterator is invalid, it returns nil.
func (i *mergedIterator[Parent, Cache]) Key() []byte {
if !i.valid {
return nil
testinginprod marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
return i.currKey
}

// If not valid, panics.
// NOTE: May have side-effect of iterating over cache.
func (iter *mergedIterator) assertValid() {
if err := iter.Error(); err != nil {
panic(err)
// Value returns the current value corresponding to the current key.
// If the iterator is invalid, it returns nil.
func (i *mergedIterator[Parent, Cache]) Value() []byte {
if !i.valid {
return nil
testinginprod marked this conversation as resolved.
Show resolved Hide resolved
}
return i.currValue
}

// Like bytes.Compare but opposite if not ascending.
func (iter *mergedIterator) compare(a, b []byte) int {
if iter.ascending {
return bytes.Compare(a, b)
// Close closes both the parent and cache iterators.
// It returns any error encountered during the closing of the iterators.
func (i *mergedIterator[Parent, Cache]) Close() error {
err1 := i.parent.Close()
err2 := i.cache.Close()
testinginprod marked this conversation as resolved.
Show resolved Hide resolved
if err1 != nil {
testinginprod marked this conversation as resolved.
Show resolved Hide resolved
return err1
}

return bytes.Compare(a, b) * -1
return err2
}

// Skip all delete-items from the cache w/ `key < until`. After this function,
// current cache item is a non-delete-item, or `until <= key`.
// If the current cache item is not a delete item, does nothing.
// If `until` is nil, there is no limit, and cache may end up invalid.
// CONTRACT: cache is valid.
func (iter *mergedIterator) skipCacheDeletes(until []byte) {
for iter.cache.Valid() &&
iter.cache.Value() == nil &&
(until == nil || iter.compare(iter.cache.Key(), until) < 0) {
iter.cache.Next()
}
// Error returns any error that occurred during iteration.
// If the iterator is valid, it returns nil.
func (i *mergedIterator[Parent, Cache]) Error() error {
return i.err
}

// Fast forwards cache (or parent+cache in case of deleted items) until current
// item exists, or until iterator becomes invalid.
// Returns whether the iterator is valid.
func (iter *mergedIterator) skipUntilExistsOrInvalid() bool {
// advance moves the iterator to the next valid (non-deleted) item.
// It handles merging logic between the parent and cache iterators.
func (i *mergedIterator[Parent, Cache]) advance() {
for {
// If parent is invalid, fast-forward cache.
if !iter.parent.Valid() {
iter.skipCacheDeletes(nil)
return iter.cache.Valid()
// Check if both iterators have reached the end
if !i.parent.Valid() && !i.cache.Valid() {
i.valid = false
return
}
// Parent is valid.

if !iter.cache.Valid() {
return true
var key, value []byte

// If parent iterator is exhausted, use the cache iterator
if !i.parent.Valid() {
key = i.cache.Key()
value = i.cache.Value()
i.cache.Next()
} else if !i.cache.Valid() {
// If cache iterator is exhausted, use the parent iterator
key = i.parent.Key()
value = i.parent.Value()
i.parent.Next()
} else {
// Both iterators are valid; compare keys
keyP, keyC := i.parent.Key(), i.cache.Key()
switch cmp := i.compare(keyP, keyC); {
case cmp < 0:
// Parent key is less than cache key
key = keyP
value = i.parent.Value()
i.parent.Next()
case cmp == 0:
// Keys are equal; cache overrides parent
key = keyC
value = i.cache.Value()
i.parent.Next()
i.cache.Next()
case cmp > 0:
// Cache key is less than parent key
key = keyC
value = i.cache.Value()
i.cache.Next()
}
}
// Parent is valid, cache is valid.

// Compare parent and cache.
keyP := iter.parent.Key()
keyC := iter.cache.Key()

switch iter.compare(keyP, keyC) {
case -1: // parent < cache.
return true
// Skip deleted items (value is nil)
if value == nil {
continue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personal preference: a recursion could be used instead of the for loop. But this may be easier to read

}

case 0: // parent == cache.
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.parent.Next()
iter.cache.Next()
// Update the current key and value, and mark iterator as valid
i.currKey = key
i.currValue = value
i.valid = true
return
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Reset i.err in the advance method to avoid stale errors

In the advance() method, when advancing the iterator, i.err is not reset. This could result in the Error() method returning an outdated error even after the iterator has recovered from a previous invalid state. To ensure that i.err accurately reflects the current state of the iterator, consider resetting it at the beginning of the advance() method.

Apply this diff to reset i.err appropriately:

 func (i *mergedIterator[Parent, Cache]) advance() {
+    i.err = nil
     for {
         if !i.parent.Valid() && !i.cache.Valid() {
             i.valid = false
             return
         }
         // Rest of the code...
     }
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// advance moves the iterator to the next valid (non-deleted) item.
// It handles merging logic between the parent and cache iterators.
func (i *mergedIterator[Parent, Cache]) advance() {
for {
// If parent is invalid, fast-forward cache.
if !iter.parent.Valid() {
iter.skipCacheDeletes(nil)
return iter.cache.Valid()
// Check if both iterators have reached the end
if !i.parent.Valid() && !i.cache.Valid() {
i.valid = false
return
}
// Parent is valid.
if !iter.cache.Valid() {
return true
var key, value []byte
// If parent iterator is exhausted, use the cache iterator
if !i.parent.Valid() {
key = i.cache.Key()
value = i.cache.Value()
i.cache.Next()
} else if !i.cache.Valid() {
// If cache iterator is exhausted, use the parent iterator
key = i.parent.Key()
value = i.parent.Value()
i.parent.Next()
} else {
// Both iterators are valid; compare keys
keyP, keyC := i.parent.Key(), i.cache.Key()
switch cmp := i.compare(keyP, keyC); {
case cmp < 0:
// Parent key is less than cache key
key = keyP
value = i.parent.Value()
i.parent.Next()
case cmp == 0:
// Keys are equal; cache overrides parent
key = keyC
value = i.cache.Value()
i.parent.Next()
i.cache.Next()
case cmp > 0:
// Cache key is less than parent key
key = keyC
value = i.cache.Value()
i.cache.Next()
}
}
// Parent is valid, cache is valid.
// Compare parent and cache.
keyP := iter.parent.Key()
keyC := iter.cache.Key()
switch iter.compare(keyP, keyC) {
case -1: // parent < cache.
return true
// Skip deleted items (value is nil)
if value == nil {
continue
}
case 0: // parent == cache.
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.parent.Next()
iter.cache.Next()
// Update the current key and value, and mark iterator as valid
i.currKey = key
i.currValue = value
i.valid = true
return
}
}
// advance moves the iterator to the next valid (non-deleted) item.
// It handles merging logic between the parent and cache iterators.
func (i *mergedIterator[Parent, Cache]) advance() {
i.err = nil
for {
// Check if both iterators have reached the end
if !i.parent.Valid() && !i.cache.Valid() {
i.valid = false
return
}
var key, value []byte
// If parent iterator is exhausted, use the cache iterator
if !i.parent.Valid() {
key = i.cache.Key()
value = i.cache.Value()
i.cache.Next()
} else if !i.cache.Valid() {
// If cache iterator is exhausted, use the parent iterator
key = i.parent.Key()
value = i.parent.Value()
i.parent.Next()
} else {
// Both iterators are valid; compare keys
keyP, keyC := i.parent.Key(), i.cache.Key()
switch cmp := i.compare(keyP, keyC); {
case cmp < 0:
// Parent key is less than cache key
key = keyP
value = i.parent.Value()
i.parent.Next()
case cmp == 0:
// Keys are equal; cache overrides parent
key = keyC
value = i.cache.Value()
i.parent.Next()
i.cache.Next()
case cmp > 0:
// Cache key is less than parent key
key = keyC
value = i.cache.Value()
i.cache.Next()
}
}
// Skip deleted items (value is nil)
if value == nil {
continue
}
// Update the current key and value, and mark iterator as valid
i.currKey = key
i.currValue = value
i.valid = true
return
}
}


continue
}
// Cache is not a delete.

return true // cache exists.
case 1: // cache < parent
// Skip over if cache item is a delete.
valueC := iter.cache.Value()
if valueC == nil {
iter.skipCacheDeletes(keyP)
continue
}
// Cache is not a delete.
return true // cache exists.
}
// compare compares two byte slices a and b.
// It returns an integer comparing a and b:
// - Negative if a < b
// - Zero if a == b
// - Positive if a > b
//
// The comparison respects the iterator's direction (ascending or descending).
func (i *mergedIterator[Parent, Cache]) compare(a, b []byte) int {
if i.ascending {
return bytes.Compare(a, b)
}
return bytes.Compare(b, a)
}
Loading