Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

Commit

Permalink
bmt: Introduce SectionWriter, implement hash.Hash in bmt (#2021)
Browse files Browse the repository at this point in the history
  • Loading branch information
nolash authored Feb 8, 2020
1 parent a836208 commit ac0845d
Show file tree
Hide file tree
Showing 9 changed files with 388 additions and 140 deletions.
325 changes: 234 additions & 91 deletions bmt/bmt.go

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions bmt/bmt_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package bmt

import (
"fmt"
"testing"
)

// BenchmarkBMTUsed benchmarks hashing of a single full-size (4096-byte)
// chunk with the BMT hasher, the dominant hashing path in practice.
func BenchmarkBMTUsed(b *testing.B) {
	size := 4096
	b.Run(fmt.Sprintf("%v_size_%v", "BMT", size), func(b *testing.B) {
		benchmarkBMT(b, size)
	})
}
153 changes: 112 additions & 41 deletions bmt/bmt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package bmt

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math/rand"
Expand All @@ -30,8 +31,12 @@ import (
"golang.org/x/crypto/sha3"
)

// init runs the shared testutil setup before any test in this package
// executes (presumably flag/verbosity initialization — see the testutil
// package for the specifics).
func init() {
	testutil.Init()
}

// bufferSize is the actual data length generated for tests
// (could be longer than max datalength of the BMT).
const bufferSize = 4128

const (
// segmentCount is the maximum number of segments of the underlying chunk
Expand All @@ -42,6 +47,8 @@ const (

// counts is the set of segment counts exercised by the correctness tests,
// covering powers of two, their neighbors, and assorted odd sizes.
var counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128}

// benchmarkBMTResult sinks the benchmark output so the compiler cannot
// eliminate the hashing work as dead code.
var benchmarkBMTResult []byte

// calculates the Keccak256 SHA3 hash of the data
func sha3hash(data ...[]byte) []byte {
h := sha3.NewLegacyKeccak256()
Expand Down Expand Up @@ -141,18 +148,18 @@ func TestHasherEmptyData(t *testing.T) {
defer pool.Drain(0)
bmt := New(pool)
rbmt := NewRefHasher(hasher, count)
refHash := rbmt.Hash(data)
expHash := syncHash(bmt, nil, data)
if !bytes.Equal(expHash, refHash) {
t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
expHash := rbmt.Hash(data)
resHash := syncHash(bmt, 0, data)
if !bytes.Equal(expHash, resHash) {
t.Fatalf("hash mismatch with reference. expected %x, got %x", resHash, expHash)
}
})
}
}

// tests sequential write with entire max size written in one go
func TestSyncHasherCorrectness(t *testing.T) {
data := testutil.RandomBytes(1, BufferSize)
data := testutil.RandomBytes(1, bufferSize)
hasher := sha3.NewLegacyKeccak256
size := hasher().Size()

Expand All @@ -178,7 +185,7 @@ func TestSyncHasherCorrectness(t *testing.T) {

// tests order-neutral concurrent writes with entire max size written in one go
func TestAsyncCorrectness(t *testing.T) {
data := testutil.RandomBytes(1, BufferSize)
data := testutil.RandomBytes(1, bufferSize)
hasher := sha3.NewLegacyKeccak256
size := hasher().Size()
whs := []whenHash{first, last, random}
Expand All @@ -194,18 +201,20 @@ func TestAsyncCorrectness(t *testing.T) {
defer pool.Drain(0)
for n := 1; n <= max; n += incr {
incr = 1 + rand.Intn(5)
bmt := New(pool)
bmtobj := New(pool)
d := data[:n]
rbmt := NewRefHasher(hasher, count)
exp := rbmt.Hash(d)
got := syncHash(bmt, nil, d)
rbmtobj := NewRefHasher(hasher, count)
expNoMeta := rbmtobj.Hash(d)
h := hasher()
h.Write(ZeroSpan)
h.Write(expNoMeta)
exp := h.Sum(nil)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sw := NewAsyncHasher(ctx, bmtobj, double, nil)
got := asyncHashRandom(sw, 0, d, wh)
if !bytes.Equal(got, exp) {
t.Fatalf("wrong sync hash for datalength %v: expected %x (ref), got %x", n, exp, got)
}
sw := bmt.NewAsyncWriter(double)
got = asyncHashRandom(sw, nil, d, wh)
if !bytes.Equal(got, exp) {
t.Fatalf("wrong async hash for datalength %v: expected %x, got %x", n, exp, got)
t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got)
}
}
})
Expand All @@ -232,7 +241,7 @@ func testHasherReuse(poolsize int, t *testing.T) {
bmt := New(pool)

for i := 0; i < 100; i++ {
data := testutil.RandomBytes(1, BufferSize)
data := testutil.RandomBytes(1, bufferSize)
n := rand.Intn(bmt.Size())
err := testHasherCorrectness(bmt, hasher, data, n, segmentCount)
if err != nil {
Expand All @@ -252,7 +261,7 @@ func TestBMTConcurrentUse(t *testing.T) {
for i := 0; i < cycles; i++ {
go func() {
bmt := New(pool)
data := testutil.RandomBytes(1, BufferSize)
data := testutil.RandomBytes(1, bufferSize)
n := rand.Intn(bmt.Size())
errc <- testHasherCorrectness(bmt, hasher, data, n, 128)
}()
Expand Down Expand Up @@ -288,8 +297,12 @@ func TestBMTWriterBuffers(t *testing.T) {
bmt := New(pool)
data := testutil.RandomBytes(1, n)
rbmt := NewRefHasher(hasher, count)
refHash := rbmt.Hash(data)
expHash := syncHash(bmt, nil, data)
refNoMetaHash := rbmt.Hash(data)
h := hasher()
h.Write(ZeroSpan)
h.Write(refNoMetaHash)
refHash := h.Sum(nil)
expHash := syncHash(bmt, 0, data)
if !bytes.Equal(expHash, refHash) {
t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
}
Expand All @@ -308,6 +321,7 @@ func TestBMTWriterBuffers(t *testing.T) {
return fmt.Errorf("incorrect read. expected %v bytes, got %v", buflen, read)
}
}
bmt.SetSpan(0)
hash := bmt.Sum(nil)
if !bytes.Equal(hash, expHash) {
return fmt.Errorf("hash mismatch. expected %x, got %x", hash, expHash)
Expand Down Expand Up @@ -346,11 +360,16 @@ func testHasherCorrectness(bmt *Hasher, hasher BaseHasherFunc, d []byte, n, coun
if len(d) < n {
n = len(d)
}
binary.BigEndian.PutUint64(span, uint64(n))
binary.LittleEndian.PutUint64(span, uint64(n))
data := d[:n]
rbmt := NewRefHasher(hasher, count)
exp := sha3hash(span, rbmt.Hash(data))
got := syncHash(bmt, span, data)
var exp []byte
if n == 0 {
exp = bmt.pool.zerohashes[bmt.pool.Depth]
} else {
exp = sha3hash(span, rbmt.Hash(data))
}
got := syncHash(bmt, n, data)
if !bytes.Equal(got, exp) {
return fmt.Errorf("wrong hash: expected %x, got %x", exp, got)
}
Expand Down Expand Up @@ -456,29 +475,34 @@ func benchmarkBMT(t *testing.B, n int) {
hasher := sha3.NewLegacyKeccak256
pool := NewTreePool(hasher, segmentCount, PoolSize)
bmt := New(pool)
var r []byte

t.ReportAllocs()
t.ResetTimer()
for i := 0; i < t.N; i++ {
syncHash(bmt, nil, data)
r = syncHash(bmt, 0, data)
}
benchmarkBMTResult = r
}

// benchmarks BMT hasher with asynchronous concurrent segment/section writes
// (the scraped diff fused the pre- and post-refactor bodies; this is the
// post-refactor version using NewAsyncHasher).
func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) {
	data := testutil.RandomBytes(1, n)
	hasher := sha3.NewLegacyKeccak256
	pool := NewTreePool(hasher, segmentCount, PoolSize)
	bmth := New(pool)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	bmtobj := NewAsyncHasher(ctx, bmth, double, nil)
	// pre-compute a shuffled write order outside the timed region
	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
	rand.Shuffle(len(idxs), func(i int, j int) {
		idxs[i], idxs[j] = idxs[j], idxs[i]
	})

	t.ReportAllocs()
	t.ResetTimer()
	for i := 0; i < t.N; i++ {
		asyncHash(bmtobj, 0, n, wh, idxs, segments)
	}
}

Expand All @@ -498,7 +522,7 @@ func benchmarkPool(t *testing.B, poolsize, n int) {
go func() {
defer wg.Done()
bmt := New(pool)
syncHash(bmt, nil, data)
syncHash(bmt, 0, data)
}()
}
wg.Wait()
Expand All @@ -519,8 +543,9 @@ func benchmarkRefHasher(t *testing.B, n int) {
}

// Hash hashes the data and the span using the bmt hasher
func syncHash(h *Hasher, span, data []byte) []byte {
h.ResetWithLength(span)
func syncHash(h *Hasher, spanLength int, data []byte) []byte {
h.Reset()
h.SetSpan(spanLength)
h.Write(data)
return h.Sum(nil)
}
Expand All @@ -547,37 +572,83 @@ func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) {
}

// splits the input data performs a random shuffle to mock async section writes
func asyncHashRandom(bmt SectionWriter, span []byte, data []byte, wh whenHash) (s []byte) {
idxs, segments := splitAndShuffle(bmt.SectionSize(), data)
return asyncHash(bmt, span, len(data), wh, idxs, segments)
func asyncHashRandom(bmtobj *AsyncHasher, spanLength int, data []byte, wh whenHash) (s []byte) {
idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
return asyncHash(bmtobj, spanLength, len(data), wh, idxs, segments)
}

// mock for async section writes for BMT SectionWriter
// mock for async section writes for file.SectionWriter
// requires a permutation (a random shuffle) of list of all indexes of segments
// and writes them in order to the appropriate section
// the Sum function is called according to the wh parameter (first, last, random [relative to segment writes])
func asyncHash(bmt SectionWriter, span []byte, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) {
bmt.Reset()
func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) {
bmtobj.Reset()
if l == 0 {
return bmt.Sum(nil, l, span)
bmtobj.SetSpan(spanLength)
return bmtobj.SumIndexed(nil, l)
}
c := make(chan []byte, 1)
hashf := func() {
c <- bmt.Sum(nil, l, span)
bmtobj.SetSpan(spanLength)
c <- bmtobj.SumIndexed(nil, l)
}
maxsize := len(idxs)
var r int
if wh == random {
r = rand.Intn(maxsize)
}
for i, idx := range idxs {
bmt.Write(idx, segments[idx])
bmtobj.WriteIndexed(idx, segments[idx])
if (wh == first || wh == random) && i == r {
go hashf()
}
}
if wh == last {
return bmt.Sum(nil, l, span)
bmtobj.SetSpan(spanLength)
return bmtobj.SumIndexed(nil, l)
}
return <-c
}

// TestUseSyncAsOrdinaryHasher verifies that the bmt.Hasher can be used with the hash.Hash interface
func TestUseSyncAsOrdinaryHasher(t *testing.T) {
	input := []byte("foo")
	baseHasher := sha3.NewLegacyKeccak256
	treePool := NewTreePool(baseHasher, segmentCount, PoolSize)

	// hash the input through the plain hash.Hash surface of the Hasher
	h := New(treePool)
	h.SetSpan(3)
	h.Write(input)
	res := h.Sum(nil)

	// reference digest: keccak(span || refhash(data))
	inner := NewRefHasher(baseHasher, 128).Hash(input)
	outer := baseHasher()
	outer.Write(LengthToSpan(3))
	outer.Write(inner)
	refRes := outer.Sum(nil)

	if !bytes.Equal(res, refRes) {
		t.Fatalf("normalhash; expected %x, got %x", refRes, res)
	}
}

// TestUseAsyncAsOrdinaryHasher verifies that the AsyncHasher can be used with the hash.Hash interface
func TestUseAsyncAsOrdinaryHasher(t *testing.T) {
	hasher := sha3.NewLegacyKeccak256
	pool := NewTreePool(hasher, segmentCount, PoolSize)
	sbmt := New(pool)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	abmt := NewAsyncHasher(ctx, sbmt, false, nil)
	abmt.SetSpan(3)
	abmt.Write([]byte("foo"))
	res := abmt.Sum(nil)

	// reference digest: keccak(span || refhash(data))
	refh := NewRefHasher(hasher, 128)
	resh := refh.Hash([]byte("foo"))
	hsub := hasher()
	span := LengthToSpan(3)
	hsub.Write(span)
	hsub.Write(resh)
	refRes := hsub.Sum(nil)
	if !bytes.Equal(res, refRes) {
		// message fixed: previously said "normalhash", copy-pasted from the sync test
		t.Fatalf("asynchash; expected %x, got %x", refRes, res)
	}
}
16 changes: 16 additions & 0 deletions file/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package file

import (
"context"
"hash"
)

// SectionWriterFunc returns a new SectionWriter instance bound to the given context.
type SectionWriterFunc func(ctx context.Context) SectionWriter

// SectionWriter is the interface for hashers that accept data in fixed-size
// sections and can be chained together.
type SectionWriter interface {
	hash.Hash                                           // Write,Sum,Reset,Size,BlockSize
	SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter to the current instance
	SetSpan(length int)                                 // set data span of chunk
	SectionSize() int                                   // section size of this SectionWriter
	Branches() int                                      // branch factor of this SectionWriter
}
3 changes: 2 additions & 1 deletion storage/chunker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func TestSha3ForCorrectness(t *testing.T) {
rawSha3Output := rawSha3.Sum(nil)

sha3FromMakeFunc := MakeHashFunc(SHA3Hash)()
sha3FromMakeFunc.ResetWithLength(input[:8])
sha3FromMakeFunc.Reset()
sha3FromMakeFunc.SetSpanBytes(input[:8])
sha3FromMakeFunc.Write(input[8:])
sha3FromMakeFuncOutput := sha3FromMakeFunc.Sum(nil)

Expand Down
3 changes: 2 additions & 1 deletion storage/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,8 @@ func testStoreCorrect(m ChunkStore, n int, t *testing.T) {
}
hasher := MakeHashFunc(DefaultHash)()
data := chunk.Data()
hasher.ResetWithLength(data[:8])
hasher.Reset()
hasher.SetSpanBytes(data[:8])
hasher.Write(data[8:])
exp := hasher.Sum(nil)
if !bytes.Equal(h, exp) {
Expand Down
5 changes: 3 additions & 2 deletions storage/hasherstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,9 @@ func (h *hasherStore) startWait(ctx context.Context) {

// createHash computes the content address of a chunk: the first 8 bytes of
// chunkData are the span (length prefix), the remainder is the payload.
func (h *hasherStore) createHash(chunkData ChunkData) Address {
	hasher := h.hashFunc()
	hasher.Reset()
	hasher.SetSpanBytes(chunkData[:8]) // 8 bytes of length
	hasher.Write(chunkData[8:])        // minus 8 []byte length
	return hasher.Sum(nil)
}

Expand Down
4 changes: 2 additions & 2 deletions storage/swarmhasher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ const (

type SwarmHash interface {
hash.Hash
ResetWithLength([]byte)
SetSpanBytes([]byte)
}

type HashWithLength struct {
hash.Hash
}

func (h *HashWithLength) ResetWithLength(length []byte) {
func (h *HashWithLength) SetSpanBytes(length []byte) {
h.Reset()
h.Write(length)
}
Loading

0 comments on commit ac0845d

Please sign in to comment.