From 8abcc599b17591711f4fbe7896b57be68dc31a91 Mon Sep 17 00:00:00 2001 From: millken Date: Sat, 1 Jul 2023 21:36:44 +0800 Subject: [PATCH] use radixtree instead of art --- .gitignore | 1 + db.go | 26 +- db_test.go | 66 ++--- go.mod | 2 - go.sum | 15 -- index.go | 35 +++ internal/radixtree/tree.go | 455 ++++++++++++++++++++++++++++++++ internal/radixtree/tree_test.go | 23 ++ option.go | 20 -- 9 files changed, 553 insertions(+), 90 deletions(-) create mode 100644 internal/radixtree/tree.go create mode 100644 internal/radixtree/tree_test.go diff --git a/.gitignore b/.gitignore index cd05d0e..31d55ae 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ *.dll *.so *.dylib +*.pprof .vscode _bench* # Test binary, built with `go test -c` diff --git a/db.go b/db.go index 65c4ca9..7c66a8a 100644 --- a/db.go +++ b/db.go @@ -6,9 +6,10 @@ import ( "math" "os" "path/filepath" + "sort" "sync" - art "github.com/WenyXu/sync-adaptive-radix-tree" + "github.com/millken/archivedb/internal/radixtree" "github.com/pkg/errors" ) @@ -35,19 +36,19 @@ var ( type DB struct { path string opts *option - index art.Tree[*index] + index *radixtree.Tree[*index] segments []*segment mu sync.RWMutex } func Open(path string, options ...Option) (db *DB, err error) { opts := &option{ - fsync: false, - hashFunc: DefaultHashFunc, + fsync: false, } db = &DB{ - path: path, - opts: opts, + path: path, + opts: opts, + index: radixtree.New[*index](), } // Create path if it doesn't exist. if err := os.MkdirAll(filepath.Join(path), 0777); err != nil { @@ -65,9 +66,9 @@ func Open(path string, options ...Option) (db *DB, err error) { return err } - // if err := db.index.Recover(db.segments); err != nil { - // return err - // } + if err := loadIndexes(db.index, db.segments); err != nil { + return err + } return nil }(); err != nil { @@ -94,6 +95,9 @@ func (db *DB) openSegments() error { return err } db.segments = append(db.segments, segment) + sort.Slice(db.segments, func(i, j int) bool { + return db.segments[i].id < db.segments[j].id + }) } // Create initial segment if none exist. if len(db.segments) == 0 { @@ -163,7 +167,7 @@ func (db *DB) set(key, value []byte, flag flag) error { return err } offset := segment.Size() - entry.Size() - db.index.Insert(key, &index{ + db.index.Put(key, &index{ seg: segment.id, off: offset, }) @@ -182,7 +186,7 @@ func (db *DB) Get(key []byte) ([]byte, error) { if err := validateKey(key); err != nil { return nil, err } - idx, found := db.index.Search(key) + idx, found := db.index.Get(key) if !found { return nil, ErrKeyNotFound } diff --git a/db_test.go b/db_test.go index 554e0d6..9f8e8b7 100644 --- a/db_test.go +++ b/db_test.go @@ -3,8 +3,6 @@ package archivedb import ( "bytes" "fmt" - "hash/adler32" - "hash/crc32" "io/ioutil" "math/rand" "os" @@ -24,49 +22,33 @@ type benchmarkTestCase struct { size int } -func BenchmarkHashFunc(b *testing.B) { - crc32Hash := func(data []byte) uint64 { - return uint64(crc32.ChecksumIEEE(data)) - } - adler32Hash := func(data []byte) uint64 { - return uint64(adler32.Checksum(data)) - } - testSize := []struct { - name string - size int - }{ - {"128B", 128}, - {"256B", 256}, - {"1K", 1024}, - {"2K", 2048}, - {"4K", 4096}, - {"8K", 8192}, - {"16K", 16384}, - {"32K", 32768}, - {"64K", 65536}, - {"128K", 131072}, - } - tests := []struct { - hash HashFunc - name string - }{ - {DefaultHashFunc, "DefaultHashFunc"}, - {adler32Hash, "adler32"}, - {crc32Hash, "crc32"}, +func TestDB_Reopen(t *testing.T) { + require := require.New(t) + dir, cleanup := MustTempDir() + defer cleanup() + db, err := Open(dir) + require.NoError(err) + require.NotNil(db) + + numKeys := 1000000 + for i := 0; i < numKeys; i++ { + key := []byte(fmt.Sprintf("%016d", i)) + value := []byte(fmt.Sprintf("%d", i)) + err = db.Put(key, value) + require.NoError(err) } - for _, size := range testSize { - b.Run(size.name, func(b *testing.B) { - val := bytes.Repeat([]byte{' '}, size.size) - for _, test := range tests { - b.Run(test.name, func(b *testing.B) { - for i := 0; i < b.N; i++ { - test.hash(val) - } - }) - } - }) + require.NoError(db.Close()) + db, err = Open(dir) + require.NoError(err) + require.NotNil(db) + for i := 0; i < numKeys; i++ { + key := []byte(fmt.Sprintf("%016d", i)) + value, err := db.Get(key) + require.NoError(err, "key: %s", key) + require.Equal([]byte(fmt.Sprintf("%d", i)), value) } } + func TestDB(t *testing.T) { require := require.New(t) dir, cleanup := MustTempDir() diff --git a/go.mod b/go.mod index f42fd91..21d69a1 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,6 @@ module github.com/millken/archivedb go 1.20 require ( - github.com/WenyXu/sync-adaptive-radix-tree v0.0.0-20221020123713-1ae3c4a8dd92 - github.com/cespare/xxhash/v2 v2.1.2 github.com/pkg/errors v0.9.1 github.com/stretchr/testify v1.8.0 golang.org/x/sys v0.0.0-20221010170243-090e33056c14 diff --git a/go.sum b/go.sum index 976d4ed..afb865b 100644 --- a/go.sum +++ b/go.sum @@ -1,19 +1,6 @@ -github.com/WenyXu/sync-adaptive-radix-tree v0.0.0-20221020123713-1ae3c4a8dd92 h1:G28oV60J2+xbzC5M3NSaPPHZMjJ1K/9v4wzCzl3adbc= -github.com/WenyXu/sync-adaptive-radix-tree v0.0.0-20221020123713-1ae3c4a8dd92/go.mod h1:DssQPBoTAUboJ5fFinarazaksPe8p/u4wG19wnzcxrk= -github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= -github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/dgraph-io/badger/v3 v3.2103.3 h1:s63J1pisDhKpzWslXFe+ChuthuZptpwTE6qEKoczPb4= -github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= -github.com/dshulyak/art v0.0.0-20200731100216-8869b840fedc h1:LW4aUpRQdTqQbell1egWpq4YjlHq+cdV/hZjRyihip4= -github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= -github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= -github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= -github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= -github.com/klauspost/compress v1.12.3 h1:G5AfA94pHPysR56qqrkO2pxEexdDzrpFJ6yt/VqWxVU= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -23,8 +10,6 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/tidwall/btree v1.4.4 h1:tOsRz2Upq6BEJz8T++C6THzNh9xGWymBOOSfA7ffNXY= -golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= golang.org/x/sys v0.0.0-20221010170243-090e33056c14 h1:k5II8e6QD8mITdi+okbbmR/cIyEbeXLBhy5Ha4nevyc= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/index.go b/index.go index 4cf7d89..f7d669a 100644 --- a/index.go +++ b/index.go @@ -1,6 +1,41 @@ package archivedb +import "github.com/millken/archivedb/internal/radixtree" + type index struct { seg uint16 off uint32 } + +func loadIndexes(idx *radixtree.Tree[*index], segments []*segment) error { + for _, segment := range segments { + for size := uint32(SegmentHeaderSize); size < segment.Size(); { + buf, err := segment.mmap.ReadOff(int(size), hdrSize) + if err != nil { + return err + } + h := hdr(buf) + if !h.getFlag().isEntryValid() { + break + } + off := int(size) + hdrSize + key, err := segment.mmap.ReadOff(off, int(h.getKeySize())) + if err != nil { + return err + } + + switch h.getFlag() { + case flagEntryPut: + idx.Put(key, &index{ + seg: segment.id, + off: uint32(size), + }) + case flagEntryDelete: + idx.Delete(key) + } + size += h.entrySize() + + } + } + return nil +} diff --git a/internal/radixtree/tree.go b/internal/radixtree/tree.go new file mode 100644 index 0000000..cf29212 --- /dev/null +++ b/internal/radixtree/tree.go @@ -0,0 +1,455 @@ +package radixtree + +import ( + "bytes" +) + +// Tree is a radix tree of bytes keys and any values. +type Tree[T any] struct { + root radixNode[T] + size int +} + +// New creates a new bytes-based radix tree +func New[T any]() *Tree[T] { + return new(Tree[T]) +} + +type radixNode[T any] struct { + // prefix is the edge label between this node and the parent, minus the key + // segment used in the parent to index this child. + prefix []byte + edges []edge[T] + leaf *leaf[T] +} + +// WalkFunc is the type of the function called for each value visited by Walk +// or WalkPath. The key argument contains the elements of the key at which the +// value is stored. +// +// If the function returns true Walk stops immediately and returns. This +// applies to WalkPath as well. +type WalkFunc func(key []byte, value any) bool + +// InspectFunc is the type of the function called for each node visited by +// Inspect. The key argument contains the key at which the node is located, the +// depth is the distance from the root of the tree, and children is the number +// of children the node has. +// +// If the function returns true Inspect stops immediately and returns. +type InspectFunc[T any] func(link, prefix, key []byte, depth, children int, hasValue bool, value T) bool + +type leaf[T any] struct { + key []byte + value T +} + +type edge[T any] struct { + radix byte + node *radixNode[T] +} + +// Len returns the number of values stored in the tree. +func (t *Tree[T]) Len() int { + return t.size +} + +// Get returns the value stored at the given key. Returns false if there is no +// value present for the key. +func (t *Tree[T]) Get(key []byte) (value T, ok bool) { + node := &t.root + // Consume key data while mathcing edge and prefix; return if remaining key + // data matches nothing. + for len(key) != 0 { + // Find edge for radix. + node = node.getEdge(key[0]) + if node == nil { + return value, false + } + + // Consume key data. + key = key[1:] + if !bytes.HasPrefix(key, node.prefix) { + return value, false + } + key = key[len(node.prefix):] + } + if node.leaf != nil { + return node.leaf.value, true + } + return value, false +} + +// Put inserts the value into the tree at the given key, replacing any existing +// items. It returns true if it adds a new value, false if it replaces an +// existing value. +func (t *Tree[T]) Put(key []byte, value T) bool { + var ( + p int + isNewValue bool + newEdge edge[T] + hasNewEdge bool + ) + node := &t.root + + for i := 0; i < len(key); i++ { + radix := key[i] + if p < len(node.prefix) { + if radix == node.prefix[p] { + p++ + continue + } + } else if child := node.getEdge(radix); child != nil { + node = child + p = 0 + continue + } + // Descended as far as prefixes and edges match key, and still have key + // data, so add child that has a prefix of the unmatched key data and + // set its value to the new value. + newChild := &radixNode[T]{ + leaf: &leaf[T]{ + key: key, + value: value, + }, + } + if i < len(key)-1 { + newChild.prefix = key[i+1:] + } + newEdge = edge[T]{radix, newChild} + hasNewEdge = true + break + } + // Key has been consumed by traversing prefixes and/or edges, or has been + // put into new child. + + // If key partially matches node's prefix, then need to split node. + if p < len(node.prefix) { + node.split(p) + isNewValue = true + } + + if hasNewEdge { + node.addEdge(newEdge) + isNewValue = true + t.size++ + } else { + // Store key at existing child + if node.leaf == nil { + isNewValue = true + t.size++ + } + node.leaf = &leaf[T]{ + key: key, + value: value, + } + } + + return isNewValue +} + +// Delete removes the value associated with the given key. Returns true if +// there was a value stored for the key. If the node or any of its ancestors +// becomes childless as a result, they are removed from the tree. +func (t *Tree[T]) Delete(key []byte) bool { + node := &t.root + var ( + parents []*radixNode[T] + links []byte + ) + for len(key) != 0 { + parents = append(parents, node) + + // Find edge for radix. + node = node.getEdge(key[0]) + if node == nil { + // node does not exist. + return false + } + links = append(links, key[0]) + + // Consume key data. + key = key[1:] + if !bytes.HasPrefix(key, node.prefix) { + return false + } + key = key[len(node.prefix):] + } + + if node.leaf == nil { + return false + } + + // delete the node value, indicate that value was deleted. + node.leaf = nil + t.size-- + + // If node is leaf, remove from parent. If parent becomes leaf, repeat. + node = node.prune(parents, links) + + // If node has become compressible, compress it. + if node != &t.root { + node.compress() + } + + return true +} + +// DeletePrefix removes all values whose key is prefixed by the given prefix. +// Returns true if any values were removed. +func (t *Tree[T]) DeletePrefix(prefix []byte) bool { + node := &t.root + var ( + parents []*radixNode[T] + links []byte + ) + for len(prefix) != 0 { + parents = append(parents, node) + + // Find edge for radix. + node = node.getEdge(prefix[0]) + if node == nil { + // Node does not exist. + return false + } + links = append(links, prefix[0]) + + // Consume prefix. + prefix = prefix[1:] + if !bytes.HasPrefix(prefix, node.prefix) { + if bytes.HasPrefix(node.prefix, prefix) { + // Prefix consumed, so it prefixes every key from node down. + break + } + return false + } + prefix = prefix[len(node.prefix):] + } + + if node.edges != nil { + var count int + node.walk(func(k []byte, _ any) bool { + count++ + return false + }) + t.size -= count + node.edges = nil + } else { + t.size-- + } + node.leaf = nil + + // If node is leaf, remove from parent. If parent becomes leaf, repeat. + node = node.prune(parents, links) + + // If node has become compressible, compress it. + if node != &t.root { + node.compress() + } + + return true +} + +// Walk visits all nodes whose keys match or are prefixed by the specified key, +// calling walkFn for each value found. If walkFn returns true, Walk returns. +// Use empty key "" to visit all nodes starting from the root or the Tree. +// +// The tree is traversed in lexical order, making the output deterministic. +func (t *Tree[T]) Walk(key []byte, walkFn WalkFunc) { + node := &t.root + for len(key) != 0 { + if node = node.getEdge(key[0]); node == nil { + return + } + + // Consume key data + key = key[1:] + if !bytes.HasPrefix(key, node.prefix) { + if bytes.HasPrefix(node.prefix, key) { + break + } + return + } + key = key[len(node.prefix):] + } + + // Walk down tree starting at node located at key. + node.walk(walkFn) +} + +// WalkPath walks each node along the path from the root to the node at the +// given key, calling walkFn for each node that has a value. If walkFn returns +// true, WalkPath returns. +// +// The tree is traversed in lexical order, making the output deterministic. +func (t *Tree[T]) WalkPath(key []byte, walkFn WalkFunc) { + node := &t.root + for { + if node.leaf != nil && walkFn(node.leaf.key, node.leaf.value) { + return + } + + if len(key) == 0 { + return + } + + if node = node.getEdge(key[0]); node == nil { + return + } + + key = key[1:] + if !bytes.HasPrefix(key, node.prefix) { + return + } + key = key[len(node.prefix):] + } +} + +// Inspect walks every node of the tree, whether or not it holds a value, +// calling inspectFn with information about each node. This allows the +// structure of the tree to be examined and detailed statistics to be +// collected. +// +// If inspectFn returns false, the traversal is stopped and Inspect returns. +// +// The tree is traversed in lexical order, making the output deterministic. +func (t *Tree[T]) Inspect(inspectFn InspectFunc[T]) { + t.root.inspect([]byte{}, []byte{}, 0, inspectFn) +} + +// split splits a node such that a node: +// +// ("prefix", leaf, edges[]) +// +// is split into parent branching node, and a child leaf node: +// +// ("pre", nil, edges[f])--->("ix", leaf, edges[]) +func (node *radixNode[T]) split(p int) { + split := &radixNode[T]{ + edges: node.edges, + leaf: node.leaf, + } + if p < len(node.prefix)-1 { + split.prefix = node.prefix[p+1:] + } + node.edges = nil + node.addEdge(edge[T]{node.prefix[p], split}) + if p == 0 { + node.prefix = []byte{} + } else { + node.prefix = node.prefix[:p] + } + node.leaf = nil +} + +func (node *radixNode[T]) prune(parents []*radixNode[T], links []byte) *radixNode[T] { + if node.edges != nil { + return node + } + // iterate parents towards root of tree, removing the empty leaf. + for i := len(links) - 1; i >= 0; i-- { + node = parents[i] + node.delEdge(links[i]) + if len(node.edges) != 0 { + // parent has other edges, stop. + break + } + node.edges = nil + if node.leaf != nil { + // parent has a value, stop. + break + } + } + return node +} + +func (node *radixNode[T]) compress() { + if len(node.edges) != 1 || node.leaf != nil { + return + } + edge := node.edges[0] + var b bytes.Buffer + b.Grow(len(node.prefix) + 1 + len(edge.node.prefix)) + b.Write(node.prefix) + b.WriteByte(edge.radix) + b.Write(edge.node.prefix) + node.prefix = b.Bytes() + node.leaf = edge.node.leaf + node.edges = edge.node.edges +} + +func (node *radixNode[T]) walk(walkFn WalkFunc) bool { + if node.leaf != nil && walkFn(node.leaf.key, node.leaf.value) { + return true + } + for _, edge := range node.edges { + if edge.node.walk(walkFn) { + return true + } + } + return false +} + +func (node *radixNode[T]) inspect(link, key []byte, depth int, inspectFn InspectFunc[T]) bool { + key = append(key, link...) + key = append(key, node.prefix...) + var val T + var hasVal bool + if node.leaf != nil { + val = node.leaf.value + hasVal = true + } + if inspectFn(link, node.prefix, key, depth, len(node.edges), hasVal, val) { + return true + } + for _, edge := range node.edges { + if edge.node.inspect([]byte{edge.radix}, key, depth+1, inspectFn) { + return true + } + } + return false +} + +// indexEdge binary searches for the edge index. +// +// This is faster then going through sort.Interface for repeated searches. +func (node *radixNode[T]) indexEdge(radix byte) int { + n := len(node.edges) + i, j := 0, n + for i < j { + h := int(uint(i+j) >> 1) // avoid overflow when computing h + if node.edges[h].radix < radix { + i = h + 1 + } else { + j = h + } + } + return i +} + +// getEdge binary searches for edge. +func (node *radixNode[T]) getEdge(radix byte) *radixNode[T] { + idx := node.indexEdge(radix) + if idx < len(node.edges) && node.edges[idx].radix == radix { + return node.edges[idx].node + } + return nil +} + +// addEdge binary searches to find where to insert edge, and inserts at. +func (node *radixNode[T]) addEdge(e edge[T]) { + idx := node.indexEdge(e.radix) + node.edges = append(node.edges, edge[T]{}) + copy(node.edges[idx+1:], node.edges[idx:]) + node.edges[idx] = e +} + +// delEdge binary searches for edge and removes it. +func (node *radixNode[T]) delEdge(radix byte) { + idx := node.indexEdge(radix) + if idx < len(node.edges) && node.edges[idx].radix == radix { + copy(node.edges[idx:], node.edges[idx+1:]) + node.edges[len(node.edges)-1] = edge[T]{} + node.edges = node.edges[:len(node.edges)-1] + } +} diff --git a/internal/radixtree/tree_test.go b/internal/radixtree/tree_test.go new file mode 100644 index 0000000..31e6877 --- /dev/null +++ b/internal/radixtree/tree_test.go @@ -0,0 +1,23 @@ +package radixtree + +//https://github.com/gammazero/radixtree/blob/master/tree_test.go +import ( + "encoding/binary" + "testing" +) + +func BenchmarkTree(b *testing.B) { + tree := New[int]() + key := make([]byte, 16) + for i := 0; i < b.N; i++ { + binary.BigEndian.PutUint64(key, uint64(i)) + tree.Put(key, i) + + n, _ := tree.Get(key) + + if n != i { + b.Fatalf("unexpected value: %v", n) + } + } + +} diff --git a/option.go b/option.go index 8b02427..183c3fb 100644 --- a/option.go +++ b/option.go @@ -1,33 +1,13 @@ package archivedb -import "github.com/cespare/xxhash/v2" - // Option sets parameters for archiveDB construction parameter type Option func(*option) error -// HashFunc defines a function to generate the hash which will be used as key in db -type HashFunc func([]byte) uint64 - -// DefaultHashFunc implements a default hash function -func DefaultHashFunc(b []byte) uint64 { - return xxhash.Sum64(b) -} - type option struct { - // hashFunc is used to generate the hash which will be used as key in db - hashFunc HashFunc // fsync is used to sync the data to disk fsync bool } -// HashFuncOption sets the hash func for the database -func HashFuncOption(h HashFunc) Option { - return func(db *option) error { - db.hashFunc = h - return nil - } -} - func FsyncOption(fsync bool) Option { return func(db *option) error { db.fsync = fsync