Skip to content

Commit

Permalink
updated tests, added compression to BFiles
Browse files Browse the repository at this point in the history
  • Loading branch information
PaulSnow committed May 24, 2024
1 parent 546f927 commit e034ff0
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 65 deletions.
50 changes: 46 additions & 4 deletions internal/database/blockchainDB/bfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ type BFile struct {
EOB int // End of the current buffer... Where to put the next data 'write'
}


// Get
// Get the value for a given DBKeyFull
func (b *BFile) Get(Key [32]byte) (value []byte, err error) {
Expand Down Expand Up @@ -134,6 +133,7 @@ func (b *BFile) Close() {
panic(err)
}
}
b.Keys = nil // Drop the reference to the Keys map
b.bfWriter.Close(b.Buffer, b.EOB, eod) // Close that file
b.bfWriter = nil // kill any reference to the bfWriter
b.Buffer = nil // Close writes the buffer, and the file is closed. clear the buffer
Expand All @@ -160,8 +160,9 @@ func (b *BFile) CreateBuffers() {

// NewBFile
// Creates a new Buffered file. The caller is responsible for writing the header
func NewBFile( Filename string, BufferCnt int) (bFile *BFile, err error) {
func NewBFile(Filename string, BufferCnt int) (bFile *BFile, err error) {
bFile = new(BFile) // create a new BFile
bFile.FileName = Filename //
bFile.Keys = make(map[[32]byte]DBBKey) // Allocate the Keys map
bFile.BufferCnt = BufferCnt // How many buffers we are going to use
if bFile.File, err = os.Create(Filename); err != nil {
Expand Down Expand Up @@ -222,8 +223,6 @@ func (b *BFile) Write(Data []byte) error {
return b.Write(Data) // Write out the remaining data
}



// OpenBFile
// Open a DBBlock file at a given height for read/write access
// The only legitimate writes to a BFile would be to add/update keys
Expand Down Expand Up @@ -272,3 +271,46 @@ func OpenBFile(BufferCnt int, Filename string) (bFile *BFile, err error) {
return b, err
}

// Compress
// Reads all live values of the BFile into memory, then rewrites the file so
// that stale space (left behind when a key was written more than once) is
// reclaimed. The BFile is closed. The new compressed BFile is returned,
// along with an error. If an error is reported, the state of the BFile is
// undetermined.
func (b *BFile) Compress() (newBFile *BFile, err error) {

	// Capture the state needed after Close() clears the BFile's fields.
	keys := b.Keys           // These are the keys so far
	EOD := b.EOD             // The length of the values
	filename := b.FileName   // The filename
	bufferCnt := b.BufferCnt // Reuse the configured buffer count (was hard-coded to 5)

	b.Close() // Close the BFile to force all its contents to disk
	b.Block() // Block here to ensure all writes and the close completes

	// Now read the values into a values buffer
	values := make([]byte, EOD)    // EOD provides the byte length of all values
	file, err := os.Open(filename) // Open the file
	if err != nil {
		return nil, err
	}
	cnt, err := file.Read(values)
	// Close the handle on every path; it must be closed before NewBFile
	// recreates (truncates) the same filename below.
	if closeErr := file.Close(); err == nil {
		err = closeErr
	}
	if err != nil {
		return nil, err
	}
	// NOTE(review): a single Read may legally return fewer than EOD bytes;
	// this guard turns a short read into an error rather than corrupt output.
	if cnt != int(EOD) {
		return nil, fmt.Errorf("read %d bytes, tried to read %d bytes", cnt, EOD)
	}

	// At this point, the keys have the offsets and lengths to each value.
	// Open a new BFile, and write all the keys and values. Now no gaps
	// remain in the BFile for keys that were written multiple times.

	if b, err = NewBFile(filename, bufferCnt); err != nil { // Create a new BFile
		return nil, err
	}
	for k, v := range keys { // Write all the key value pairs
		value := values[v.Offset : v.Offset+v.Length]
		if err = b.Put(k, value); err != nil { // Don't drop write failures
			return nil, err
		}
	}

	return b, nil
}
80 changes: 37 additions & 43 deletions internal/database/blockchainDB/bfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@ package blockchainDB

import (
"bytes"
"crypto/sha256"
"fmt"
"os"
"path/filepath"
"testing"
"time"

"github.com/dgraph-io/badger"
"github.com/stretchr/testify/assert"
"gitlab.com/accumulatenetwork/accumulate/internal/database/smt/common"
)

const (
Expand All @@ -24,13 +21,13 @@ const (
MaxBadgerSize = 10_000 // Maximum size of Badger tx
)

var Directory = filepath.Join(os.TempDir(), "DBBlock")
var Directory = filepath.Join(os.TempDir(), "ShardDBTest")
var KeySliceDir = filepath.Join(Directory, "keySlice")

func TestWriteSmallKeys(t *testing.T) {

filename := filepath.Join(os.TempDir(), "BFileTest.dat")
bFile, err := NewBFile( filename,5)
bFile, err := NewBFile(filename, 5)
assert.NoError(t, err, "expected no error creating BBFile")

getKey := func(v byte) (r [32]byte) {
Expand All @@ -56,7 +53,7 @@ func TestWriteKeys(t *testing.T) {

start := time.Now()
filename := filepath.Join(os.TempDir(), "BFileTest.dat")
bFile, err := NewBFile(filename,5)
bFile, err := NewBFile(filename, 5)
assert.NoError(t, err, "expected no error creating BBFile")

fr := NewFastRandom([32]byte{}) // Make a random number generator
Expand All @@ -79,7 +76,7 @@ func TestReadKeys(t *testing.T) {
os.Mkdir(Directory, os.ModePerm)

filename := filepath.Join(os.TempDir(), "BFileTest.dat")
bFile, err := NewBFile(filename,5)
bFile, err := NewBFile(filename, 5)
assert.NoError(t, err, "expected no error creating BBFile")

fr := NewFastRandom([32]byte{}) // Make a random number generator
Expand Down Expand Up @@ -110,46 +107,43 @@ func TestReadKeys(t *testing.T) {

}

func TestBadger(t *testing.T) {
fmt.Println("TestBadger 1141.7 t/s")
func TestCompress(t *testing.T) {

os.RemoveAll(Directory)
os.Mkdir(Directory, os.ModePerm)

badgerFile := filepath.Join(Directory, "badger")
DB, err := badger.Open(badger.DefaultOptions(badgerFile))
defer func() {
_ = DB.Close()
}()
assert.NoError(t, err, "failed to open badger db")
tx := DB.NewTransaction(true)
txSize := 0

start := time.Now()

var rh common.RandHash
for i := 0; i < Writes; i++ {
value := rh.GetRandBuff(rh.GetIntN(MaxSize-MinSize) + MinSize)
key := sha256.Sum256(value)

if txSize+len(value) > MaxBadgerSize {
err = tx.Commit()
assert.NoError(t, err, "fail to commit")
tx.Discard()
tx = DB.NewTransaction(true)
}
err = tx.Set(key[:], value)
assert.NoError(t, err, "badger Set fail")
txSize += len(value) + 64 // figure txSize is value + key + overhead. Not exact.
fr := NewFastRandom([32]byte{1, 2, 3, 4, 5}) // Random generator

TotalKeys := 100000 // Allocate some keys
Keys := make(map[int][32]byte) // put into a map
for i := 0; i < TotalKeys; i++ {
Keys[i] = fr.NextHash()
}
err = tx.Commit()
assert.NoError(t, err, "fail to commit")
tx.Discard()

for i := 0; i < Writes; i++ {

TotalCompressions := 10 // Compress this many times after writing
TotalWrites := 10000 // so many keys

Directory := filepath.Join(os.TempDir(), "dynamic")
os.Mkdir(Directory,os.ModePerm)
defer os.RemoveAll(Directory)
filename := filepath.Join(Directory, "shard.dat")

BFile,err := NewBFile(filename, 5)
assert.NoError(t,err,"failed to create BFile")

for i := 0; i < TotalCompressions; i++ {
newLen :=0
for i := 0; i < TotalWrites; i++ {
key := Keys[int(fr.UintN(uint(TotalKeys)))]
value := fr.RandBuff(200, 1024)
err := BFile.Put(key,value)
newLen += len(value)
assert.NoError(t,err,"failed to write value")
}
fmt.Printf("compress %3d ",i)
BFile, err = BFile.Compress()
fmt.Print("!\n")
if err != nil {
panic(err)
}
assert.NoError(t,err,"failed to compress")
}

fmt.Printf("%10.1f t/s", float64(Writes)/time.Since(start).Seconds())
}
55 changes: 50 additions & 5 deletions internal/database/blockchainDB/shard.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
package blockchainDB

import (
"math/rand"
"os"
"sync"
"time"
)

// Note: this code relies on Go 1.20+, where math/rand's global source is
// automatically seeded. On earlier Go versions rand.Intn() is deterministic
// unless rand.Seed() is called explicitly. See:
// https://stackoverflow.com/questions/75597325/rand-seedseed-is-deprecated-how-to-use-newrandnewseed

// Shard
// Holds the stuff required to access a shard: the backing BFile, a
// read-through cache of written values, and counters used to decide
// when background compression is worthwhile.
type Shard struct {
	BufferCnt int                 // Number of buffers to use
	Filename  string              // The file with the BFile
	BFile     *BFile              // The BFile
	Cache     map[[32]byte][]byte // Values written to the shard; Get checks here before the BFile
	KeyCount  int                 // How many keys in the BFile
	KeyWrites int                 // How many writes since compression
	Mutex     sync.Mutex          // Keeps compression from conflict with access
}

Expand All @@ -25,9 +33,34 @@ func NewShard(BufferCnt int, Filename string) (shard *Shard, err error) {
if shard.BFile, err = NewBFile(Filename, BufferCnt); err != nil {
return nil, err
}
go shard.process()
return shard, err
}


// process
// Background loop that periodically compresses the shard. Sleeps a
// randomized 15-19 seconds between attempts so that many shards do not
// all compress at the same moment.
// NOTE(review): this goroutine has no stop channel/context, so it runs for
// the life of the process even after the Shard is closed — consider wiring
// a cancellation signal through NewShard.
func (s *Shard) process() {
	for {
		r := time.Duration(rand.Intn(5))
		// BUG FIX: the original slept r seconds + 15 *nanoseconds*
		// (r*time.Second + 15); the base pause is clearly meant to be
		// 15 seconds.
		time.Sleep((r + 15) * time.Second)
		s.compress()
	}
}

// compress
// Compress the shard's BFile if enough writes have accumulated to make the
// rewrite worthwhile. Holds the Mutex so Put/Get cannot race the rewrite.
func (s *Shard) compress() {
	s.Mutex.Lock()
	defer s.Mutex.Unlock()
	if s.KeyWrites <= 1000 { // Not enough churn yet to justify a rewrite
		return
	}
	newBFile, err := s.BFile.Compress()
	if err != nil {
		// NOTE(review): panicking in a background goroutine takes down the
		// whole process; consider surfacing the error instead.
		panic(err)
	}
	s.BFile = newBFile
	s.KeyCount = len(s.BFile.Keys) // Refresh the key count after the rewrite
	// BUG FIX: the original never reset KeyWrites, so once it exceeded 1000
	// the shard recompressed on every tick of process() forever.
	s.KeyWrites = 0
}

// Close
// Clean up and close the Shard
func (s *Shard) Close() {
Expand All @@ -42,6 +75,9 @@ func (s *Shard) Close() {
// Open
// Open an existing Shard. If the BFile does not exist, create it
func (s *Shard) Open() (err error) {
s.Mutex.Lock()
defer s.Mutex.Unlock()

if s.BFile != nil {
return
}
Expand All @@ -51,21 +87,30 @@ func (s *Shard) Open() (err error) {
}
return err
}
s.KeyCount = len(s.BFile.Keys)
s.KeyWrites = 0
return nil
}

// Put
// Put a key value pair into the shard
func (s *Shard) Put (key [32]byte, value []byte) error {
s.Cache[key]=value
return s.BFile.Put(key,value)
func (s *Shard) Put(key [32]byte, value []byte) error {
s.Mutex.Lock()
defer s.Mutex.Unlock()

s.KeyWrites++
s.Cache[key] = value
return s.BFile.Put(key, value)
}

// Get
// Get the value for the key out of the shard
func (s *Shard) Get (key [32]byte) ( value []byte, err error) {
func (s *Shard) Get(key [32]byte) (value []byte, err error) {
s.Mutex.Lock()
defer s.Mutex.Unlock()

if value, ok := s.Cache[key]; ok {
return value, nil
}
return s.BFile.Get(key)
}
}
18 changes: 11 additions & 7 deletions internal/database/blockchainDB/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -21,22 +22,25 @@ func TestShard(t *testing.T) {
entries := make(map[[32]byte][]byte)
fr := NewFastRandom([32]byte{1, 2, 3, 4, 5})

for i := 0; i < 100; i++ {
if i%10 == 0 && i != 0 {
fmt.Printf("%d ",i)
if i%100==0 {
fmt.Println()
}
writes :=0
reads :=0
start := time.Now()
for i := 0; i < 10000; i++ {
if i%100 == 0 && i != 0 {
fmt.Printf("Writes: %10d Reads %10d %13.0f/s \n",writes,reads,
float64(writes+reads)/time.Since(start).Seconds())
}
for i := 0; i < 100; i++ {
for i := 0; i < 10; i++ {
entries[fr.NextHash()] = fr.RandBuff(100, 500)
}
for k := range entries {
nv := fr.RandBuff(100, 500)
writes++
shard.Put(k, nv)
entries[k] = nv
}
for k, v := range entries {
reads++
v2, err := shard.Get(k)
assert.NoError(t, err, err)
assert.Equal(t, v, v2, "Didn't get the right value back")
Expand Down
6 changes: 3 additions & 3 deletions internal/database/blockchainDB/sharddb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
)

const (
ShardBits = 9
Shards = 512 // Number of shards in bits
ShardBits = 8
Shards = 256 // Number of shards in bits
)

// ShardDB
Expand All @@ -22,7 +22,7 @@ type ShardDB struct {
Shards [Shards]*Shard // List of all the Shards
}

func CreateShardDB(Directory string, Partition, BufferCnt int) (SDB *ShardDB, err error) {
func NewShardDB(Directory string, Partition, BufferCnt int) (SDB *ShardDB, err error) {
_, err = os.Stat(Directory)
if err == nil {
return nil, fmt.Errorf("cannot create ShardDB; directory %s exists", Directory)
Expand Down
Loading

0 comments on commit e034ff0

Please sign in to comment.