Skip to content

Commit

Permalink
v2 init
Browse files Browse the repository at this point in the history
  • Loading branch information
millken committed Jun 13, 2023
1 parent 3c2715d commit 0daa740
Show file tree
Hide file tree
Showing 12 changed files with 283 additions and 552 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ jobs:
test:
strategy:
matrix:
go-version: [1.16.x, 1.17.x]
go-version: [1.18.x, 1.19.x, 1.20.x]
os: [ubuntu-latest, macos-latest, windows-latest]
runs-on: ${{ matrix.os }}
steps:
Expand Down
46 changes: 16 additions & 30 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"path/filepath"
"sync"

art "github.com/WenyXu/sync-adaptive-radix-tree"
"github.com/pkg/errors"
)

Expand All @@ -34,7 +35,7 @@ var (
type DB struct {
path string
opts *option
index *index
index art.Tree[*index]
segments []*segment
mu sync.RWMutex
}
Expand All @@ -58,9 +59,6 @@ func Open(path string, options ...Option) (db *DB, err error) {
}
}

if db.index, err = openIndex(db.IndexPath()); err != nil {
return nil, errors.Wrap(err, "open index")
}
// Open components.
if err := func() (err error) {
if err = db.openSegments(); err != nil {
Expand Down Expand Up @@ -138,19 +136,19 @@ func (db *DB) createSegment() (*segment, error) {
// IndexPath returns the index path, formed by joining the DB's path with "index".
func (db *DB) IndexPath() string { return filepath.Join(db.path, "index") }

//Put put the value of the key to the db
// Put stores the value for the key in the db.
func (db *DB) Put(key, value []byte) error {
if err := validateKey(key); err != nil {
return err
}
if len(value) > int(MaxValueSize) {
return ErrValueTooLarge
}
return db.set(key, value, EntryInsertFlag)
return db.set(key, value, flagEntryPut)
}

//Put put the value of the key to the db
func (db *DB) set(key, value []byte, flag uint8) error {
// set writes an entry for the key with the given flag and records its location in the index.
func (db *DB) set(key, value []byte, flag flag) error {
var err error
db.mu.Lock()
defer db.mu.Unlock()
Expand All @@ -164,45 +162,38 @@ func (db *DB) set(key, value []byte, flag uint8) error {
if err = segment.WriteEntry(entry); err != nil {
return err
}
hashKey := db.opts.hashFunc(key)
offset := segment.Size() - entry.Size()
if err = db.index.Insert(hashKey, segment.ID(), offset); err != nil {
return err
}
db.index.Insert(key, &index{
seg: segment.id,
off: offset,
})
if db.opts.fsync {
if err := segment.Flush(); err != nil {
return err
} else if err := db.index.Flush(); err != nil {
return err
}
}
return nil
}

//Get gets the value of the key
// Get gets the value of the key
func (db *DB) Get(key []byte) ([]byte, error) {
db.mu.RLock()
defer db.mu.RUnlock()
if err := validateKey(key); err != nil {
return nil, err
}
hashKey := db.opts.hashFunc(key)
item, ok := db.index.Get(hashKey)
if !ok {
idx, found := db.index.Search(key)
if !found {
return nil, ErrKeyNotFound
}
segment := db.segments[item.ID()]
segment := db.segments[idx.seg]
if segment == nil {
return nil, ErrSegmentNotFound
}
entry, err := segment.ReadEntry(item.Offset())
entry, err := segment.ReadEntry(idx.off)
if err != nil {
return nil, err
}
if entry.hdr.Flag == EntryDeleteFlag {
return nil, ErrKeyDeleted
}

if err := entry.verify(key); err != nil {
return nil, err
}
Expand All @@ -214,7 +205,7 @@ func (db *DB) Delete(key []byte) error {
if err := validateKey(key); err != nil {
return err
}
return db.set(key, nil, EntryDeleteFlag)
return db.set(key, nil, flagEntryDelete)
}

// Close closes the DB
Expand All @@ -225,11 +216,6 @@ func (db *DB) Close() error {
err = e
}
}
if db.index != nil {
if e := db.index.Close(); e != nil && err == nil {
err = e
}
}
return err
}

Expand Down
49 changes: 45 additions & 4 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package archivedb
import (
"bytes"
"fmt"
"hash/adler32"
"hash/crc32"
"io/ioutil"
"math/rand"
"os"
Expand All @@ -22,6 +24,49 @@ type benchmarkTestCase struct {
size int
}

// BenchmarkHashFunc compares DefaultHashFunc against the CRC-32 (IEEE) and
// Adler-32 checksums, each converted to uint64, over payloads ranging from
// 128 bytes to 128 KiB. Sub-benchmarks are nested as "<size>/<hash>".
func BenchmarkHashFunc(b *testing.B) {
	type hashCase struct {
		hash HashFunc
		name string
	}
	type sizeCase struct {
		name string
		size int
	}

	hashes := []hashCase{
		{DefaultHashFunc, "DefaultHashFunc"},
		{func(data []byte) uint64 { return uint64(adler32.Checksum(data)) }, "adler32"},
		{func(data []byte) uint64 { return uint64(crc32.ChecksumIEEE(data)) }, "crc32"},
	}
	sizes := []sizeCase{
		{"128B", 128},
		{"256B", 256},
		{"1K", 1024},
		{"2K", 2048},
		{"4K", 4096},
		{"8K", 8192},
		{"16K", 16384},
		{"32K", 32768},
		{"64K", 65536},
		{"128K", 131072},
	}

	for _, sc := range sizes {
		sc := sc
		b.Run(sc.name, func(b *testing.B) {
			// One payload per size, shared by every hash under test.
			payload := bytes.Repeat([]byte{' '}, sc.size)
			for _, hc := range hashes {
				hc := hc
				b.Run(hc.name, func(b *testing.B) {
					for i := 0; i < b.N; i++ {
						hc.hash(payload)
					}
				})
			}
		})
	}
}
func TestDB(t *testing.T) {
require := require.New(t)
dir, cleanup := MustTempDir()
Expand Down Expand Up @@ -55,10 +100,6 @@ func TestDB(t *testing.T) {
for _, test := range tests {
err := db.Delete(test.key)
require.NoError(err)
v, err = db.Get([]byte("foo"))
require.Error(err)
require.ErrorIs(err, ErrKeyDeleted)
require.Nil(v)
}
require.NoError(db.Close())
}
Expand Down
84 changes: 15 additions & 69 deletions entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,7 @@ import (
)

const (
EntryMaxVersion = math.MaxUint8
EntryHeaderSize = 16
EntryFlagSize = 1
EntryInsertFlag uint8 = 1
EntryDeleteFlag uint8 = 2
EntryMaxVersion = math.MaxUint8
)

var CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli)
Expand All @@ -26,93 +22,43 @@ var CastagnoliCrcTable = crc32.MakeTable(crc32.Castagnoli)
+----------+---------------+---------------+---------------+
*
*/
type EntryHeader struct {
ValueSize uint32
Checksum uint32
KeySize uint8
Flag uint8
_ [6]byte // padding
}

type entry struct {
key []byte
value []byte
hdr EntryHeader
}

func (hdr EntryHeader) EntrySize() uint32 {
return EntryHeaderSize + uint32(hdr.KeySize) + hdr.ValueSize
}

func (e *EntryHeader) Encode() []byte {
var b [EntryHeaderSize]byte
intconv.PutUint32(b[0:4], e.ValueSize)
intconv.PutUint32(b[4:8], e.Checksum)
b[9] = byte(e.KeySize)
b[10] = byte(e.Flag)

return b[:]
}

func (e *EntryHeader) String() string {
return fmt.Sprintf("Flag: %d, KeySize: %d, ValueSize: %d, Checksum: %d",
e.Flag, e.KeySize, e.ValueSize, e.Checksum)
}

func (e *entry) Size() uint32 {
return EntryHeaderSize + uint32(e.hdr.KeySize) + e.hdr.ValueSize
hdr *hdr
}

//go:noinline
func readEntryHeader(b []byte) (EntryHeader, error) {
if len(b) < EntryHeaderSize {
return EntryHeader{}, errors.Wrapf(ErrInvalidEntryHeader, "read entry header length %d", len(b))
}
return EntryHeader{
ValueSize: intconv.Uint32(b[0:4]),
Checksum: intconv.Uint32(b[4:8]),
KeySize: uint8(b[9]),
Flag: uint8(b[10]),
}, nil
}

func createEntry(flag uint8, key, value []byte) entry {
return entry{
func createEntry(flag flag, key, value []byte) *entry {
h := hdr{}
return &entry{
key: key,
value: value,
hdr: EntryHeader{
ValueSize: uint32(len(value)),
Checksum: crc32.Checksum(value, CastagnoliCrcTable),
KeySize: uint8(len(key)),
Flag: flag,
},
hdr: h.setFlag(flag).
setKeySize(uint8(len(key))).
setValueSize(uint32(len(value))).
setChecksum(crc32.Checksum(value, CastagnoliCrcTable)),
}
}

func (e *entry) verify(key []byte) error {
if e.hdr.KeySize != uint8(len(e.key)) || e.hdr.ValueSize != uint32(len(e.value)) {
if e.hdr.getKeySize() != uint8(len(e.key)) || e.hdr.getValueSize() != uint32(len(e.value)) {
return ErrLengthMismatch
}
if !bytes.Equal(e.key, key) {
return errors.Wrap(ErrKeyMismatch, "verify entry key")
}
if e.hdr.Checksum != crc32.Checksum(e.value, CastagnoliCrcTable) {
if e.hdr.getChecksum() != crc32.Checksum(e.value, CastagnoliCrcTable) {
return ErrChecksumFailed
}
return nil
}

// Size returns the entry's size as reported by its header's entrySize method.
func (e *entry) Size() uint32 {
return e.hdr.entrySize()
}

// String renders the entry's key, value and header in a human-readable form.
func (e *entry) String() string {
	key, value := string(e.key), string(e.value)
	header := e.hdr.String()
	return fmt.Sprintf("Key: %s, Value: %s, Header: %s", key, value, header)
}

// isValidEntryFlag returns true if flag is valid.
func isValidEntryFlag(flag uint8) bool {
switch flag {
case EntryInsertFlag, EntryDeleteFlag:
return true
default:
return false
}
}
22 changes: 8 additions & 14 deletions entry_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
package archivedb

import (
"testing"
// func TestSafeEntry(t *testing.T) {
// require := require.New(t)
// k, v := []byte("foo"), []byte("bar")
// e := createEntry(flagEntryPut, k, v)

"github.com/stretchr/testify/require"
)
// require.Equal(e.key, k)
// require.Equal(e.value, v)
// require.Equal(e.Size(), uint32(len(k)+len(v)+EntryHeaderSize), "size mismatch")

func TestSafeEntry(t *testing.T) {
require := require.New(t)
k, v := []byte("foo"), []byte("bar")
e := createEntry(EntryInsertFlag, k, v)

require.Equal(e.key, k)
require.Equal(e.value, v)
require.Equal(e.Size(), uint32(len(k)+len(v)+EntryHeaderSize), "size mismatch")

}
// }
11 changes: 6 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
module github.com/millken/archivedb

go 1.17
go 1.20

require (
github.com/WenyXu/sync-adaptive-radix-tree v0.0.0-20221020123713-1ae3c4a8dd92
github.com/cespare/xxhash/v2 v2.1.2
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.7.0
golang.org/x/sys v0.0.0-20220403020550-483a9cbc67c0
github.com/stretchr/testify v1.8.0
golang.org/x/sys v0.0.0-20221010170243-090e33056c14
)

require (
github.com/davecgh/go-spew v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit 0daa740

Please sign in to comment.