Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support iterator #152

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions bptree.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lotusdb

import (
"bytes"
"context"
"fmt"
"path/filepath"
Expand Down Expand Up @@ -198,3 +199,70 @@ func (bt *BPTree) Sync() error {
}
return nil
}

// bptreeIterator implement IteratorI
type bptreeIterator struct {
k []byte
v []byte
tx *bbolt.Tx
cursor *bbolt.Cursor
options IteratorOptions
}

// NewBptreeIterator
func NewBptreeIterator(tx *bbolt.Tx, options IteratorOptions) (*bptreeIterator, error) {
b := tx.Bucket(indexBucketName)
c := b.Cursor()
return &bptreeIterator{
cursor: c,
options: options,
tx: tx,
}, nil
}

// Rewind seek the first key in the iterator.
func (bi *bptreeIterator) Rewind() {
if bi.options.Reverse {
bi.k, bi.v = bi.cursor.Last()
} else {
bi.k, bi.v = bi.cursor.First()
}
}

// Seek move the iterator to the key which is
// greater(less when reverse is true) than or equal to the specified key.
func (bi *bptreeIterator) Seek(key []byte) {
bi.k, bi.v = bi.cursor.Seek(key)
if !bytes.Equal(bi.k, key) && bi.options.Reverse {
bi.k, bi.v = bi.cursor.Prev()
}
}

// Next moves the iterator to the next key.
func (bi *bptreeIterator) Next() {
if bi.options.Reverse {
bi.k, bi.v = bi.cursor.Prev()
} else {
bi.k, bi.v = bi.cursor.Next()
}
}

// Key get the current key.
func (bi *bptreeIterator) Key() []byte {
return bi.k
}

// Value get the current value.
func (ci *bptreeIterator) Value() any {
return ci.v
}

// Valid returns whether the iterator is exhausted.
func (ci *bptreeIterator) Valid() bool {
return ci.k != nil
}

// Close the iterator.
func (ci *bptreeIterator) Close() error {
return ci.tx.Rollback()
}
119 changes: 119 additions & 0 deletions bptree_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lotusdb

import (
"bytes"
"os"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -303,3 +304,121 @@ func testbptreeSync(t *testing.T, partitionNum int) {
err = bt.Sync()
assert.Nil(t, err)
}

func Test_bptreeIterator(t *testing.T) {
options := indexOptions{
indexType: BTree,
dirPath: filepath.Join(os.TempDir(), "bptree-cursorIterator"+strconv.Itoa(1)),
partitionNum: 1,
keyHashFunction: xxhash.Sum64,
}

err := os.MkdirAll(options.dirPath, os.ModePerm)
assert.Nil(t, err)
defer func() {
_ = os.RemoveAll(options.dirPath)
}()
bt, err := openBTreeIndex(options)
assert.Nil(t, err)
m := map[string]*wal.ChunkPosition{
"key 0": {SegmentId: 0, BlockNumber: 0, ChunkOffset: 0, ChunkSize: 0},
"key 1": {SegmentId: 1, BlockNumber: 1, ChunkOffset: 1, ChunkSize: 1},
"key 2": {SegmentId: 2, BlockNumber: 2, ChunkOffset: 2, ChunkSize: 2},
}
var keyPositions []*KeyPosition
keyPositions = append(keyPositions, &KeyPosition{
key: []byte("key 0"),
partition: 0,
position: &wal.ChunkPosition{SegmentId: 0, BlockNumber: 0, ChunkOffset: 0, ChunkSize: 0},
}, &KeyPosition{
key: []byte("key 1"),
partition: 0,
position: &wal.ChunkPosition{SegmentId: 1, BlockNumber: 1, ChunkOffset: 1, ChunkSize: 1},
}, &KeyPosition{
key: []byte("key 2"),
partition: 0,
position: &wal.ChunkPosition{SegmentId: 2, BlockNumber: 2, ChunkOffset: 2, ChunkSize: 2},
},
)

err = bt.PutBatch(keyPositions)
assert.Nil(t, err)

tree := bt.trees[0]
tx, err := tree.Begin(true)
assert.Nil(t, err)
iteratorOptions := IteratorOptions{
Reverse: false,
}

itr, err := NewBptreeIterator(tx, iteratorOptions)
assert.Nil(t, err)
var prev []byte
itr.Rewind()
for itr.Valid() {
currKey := itr.Key()
assert.True(t, prev == nil || bytes.Compare(prev, currKey) == -1)
assert.Equal(t, m[string(itr.Key())].Encode(), itr.Value())
prev = currKey
itr.Next()
}
err = itr.Close()
assert.Nil(t, err)

tx, err = tree.Begin(true)
assert.Nil(t, err)
iteratorOptions = IteratorOptions{
Reverse: true,
}
prev = nil

itr, err = NewBptreeIterator(tx, iteratorOptions)
assert.Nil(t, err)
itr.Rewind()
for itr.Valid() {
currKey := itr.Key()
assert.True(t, prev == nil || bytes.Compare(prev, currKey) == 1)
assert.Equal(t, m[string(itr.Key())].Encode(), itr.Value())
prev = currKey
itr.Next()
}
itr.Seek([]byte("key 4"))
assert.Equal(t, []byte("key 2"), itr.Key())

itr.Seek([]byte("key 2"))
assert.Equal(t, []byte("key 2"), itr.Key())

itr.Seek([]byte("aye 2"))
assert.False(t, itr.Valid())
err = itr.Close()
assert.Nil(t, err)

tx, err = tree.Begin(true)
assert.Nil(t, err)
iteratorOptions = IteratorOptions{
Reverse: false,
}
prev = nil

itr, err = NewBptreeIterator(tx, iteratorOptions)
assert.Nil(t, err)
itr.Rewind()
for itr.Valid() {
currKey := itr.Key()
assert.True(t, prev == nil || bytes.Compare(prev, currKey) == -1)
assert.Equal(t, m[string(itr.Key())].Encode(), itr.Value())
prev = currKey
itr.Next()
}

itr.Seek([]byte("key 0"))
assert.Equal(t, []byte("key 0"), itr.Key())
itr.Seek([]byte("key 4"))
assert.False(t, itr.Valid())

itr.Seek([]byte("aye 2"))
assert.Equal(t, []byte("key 0"), itr.Key())
err = itr.Close()
assert.Nil(t, err)

}
90 changes: 90 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package lotusdb

import (
"container/heap"
"context"
"fmt"
"io"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/gofrs/flock"
"github.com/rosedblabs/diskhash"
"github.com/rosedblabs/wal"
"go.etcd.io/bbolt"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -571,3 +573,91 @@ func (db *DB) rewriteValidRecords(walFile *wal.WAL, validRecords []*ValueLogReco
}
return db.index.PutBatch(positions, matchKeys...)
}

func (db *DB) NewIterator(options IteratorOptions) (*MergeIterator, error) {
db.mu.Lock()
defer func() {
if r := recover(); r != nil {
db.mu.Unlock()
}
}()
itrs := make([]*SingleIter, 0, db.options.PartitionNum+len(db.immuMems)+1)
rank := 0
txs := make([]*bbolt.Tx, db.options.PartitionNum)
index := db.index.(*BPTree)
for i := 0; i < db.options.PartitionNum; i++ {
tx, err := index.trees[i].Begin(false)
if err != nil {
return nil, err
}
txs[i] = tx
itr, err := NewBptreeIterator(
tx,
options,
)
if err != nil {
return nil, err
}
itr.Rewind()
// is empty
if !itr.Valid() {
itr.Close()
continue
}
itrs = append(itrs, &SingleIter{
iType: BptreeItr,
options: options,
rank: rank,
idx: rank,
iter: itr,
})

rank++
}

for i := 0; i < len(db.immuMems); i++ {
itr, err := NewMemtableIterator(options, db.immuMems[i])
if err != nil {
return nil, err
}
itr.Rewind()
// is empty
if !itr.Valid() {
itr.Close()
continue
}
itrs = append(itrs, &SingleIter{
iType: MemItr,
options: options,
rank: rank,
idx: rank,
iter: itr,
})
rank++
}

itr, err := NewMemtableIterator(options, db.activeMem)
if err != nil {
return nil, err
}
itr.Rewind()
if itr.Valid() {
itrs = append(itrs, &SingleIter{
iType: MemItr,
options: options,
rank: rank,
idx: rank,
iter: itr,
})
} else {
itr.Close()
}
h := IterHeap(itrs)
heap.Init(&h)

return &MergeIterator{
h: h,
itrs: itrs,
db: db,
}, nil
}
Loading
Loading