This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

bmt, param: Introduce SectionHasher interface, implement in bmt #2021

Merged · 16 commits · Feb 8, 2020
325 changes: 234 additions & 91 deletions bmt/bmt.go

Large diffs are not rendered by default.

149 changes: 108 additions & 41 deletions bmt/bmt_test.go
@@ -18,6 +18,7 @@ package bmt

 import (
 	"bytes"
+	"context"
 	"encoding/binary"
 	"fmt"
 	"math/rand"
@@ -30,8 +31,12 @@ import (
"golang.org/x/crypto/sha3"
)

func init() {
testutil.Init()
}

// the actual data length generated (could be longer than max datalength of the BMT)
const BufferSize = 4128
const bufferSize = 4128

const (
// segmentCount is the maximum number of segments of the underlying chunk
@@ -141,18 +146,18 @@ func TestHasherEmptyData(t *testing.T) {
 			defer pool.Drain(0)
 			bmt := New(pool)
 			rbmt := NewRefHasher(hasher, count)
-			refHash := rbmt.Hash(data)
-			expHash := syncHash(bmt, nil, data)
-			if !bytes.Equal(expHash, refHash) {
-				t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
+			expHash := rbmt.Hash(data)
+			resHash := syncHash(bmt, 0, data)
+			if !bytes.Equal(expHash, resHash) {
+				t.Fatalf("hash mismatch with reference. expected %x, got %x", expHash, resHash)
 			}
 		})
 	}
 }

 // tests sequential write with entire max size written in one go
 func TestSyncHasherCorrectness(t *testing.T) {
-	data := testutil.RandomBytes(1, BufferSize)
+	data := testutil.RandomBytes(1, bufferSize)
 	hasher := sha3.NewLegacyKeccak256
 	size := hasher().Size()

@@ -178,7 +183,7 @@ func TestSyncHasherCorrectness(t *testing.T) {

 // tests order-neutral concurrent writes with entire max size written in one go
 func TestAsyncCorrectness(t *testing.T) {
-	data := testutil.RandomBytes(1, BufferSize)
+	data := testutil.RandomBytes(1, bufferSize)
 	hasher := sha3.NewLegacyKeccak256
 	size := hasher().Size()
 	whs := []whenHash{first, last, random}
@@ -194,18 +199,20 @@ func TestAsyncCorrectness(t *testing.T) {
 			defer pool.Drain(0)
 			for n := 1; n <= max; n += incr {
 				incr = 1 + rand.Intn(5)
-				bmt := New(pool)
+				bmtobj := New(pool)
 				d := data[:n]
-				rbmt := NewRefHasher(hasher, count)
-				exp := rbmt.Hash(d)
-				got := syncHash(bmt, nil, d)
+				rbmtobj := NewRefHasher(hasher, count)
+				expNoMeta := rbmtobj.Hash(d)
+				h := hasher()
+				h.Write(ZeroSpan)
+				h.Write(expNoMeta)
+				exp := h.Sum(nil)
+				ctx, cancel := context.WithCancel(context.Background())
+				defer cancel()
+				sw := NewAsyncHasher(ctx, bmtobj, double, nil)
+				got := asyncHashRandom(sw, 0, d, wh)
 				if !bytes.Equal(got, exp) {
-					t.Fatalf("wrong sync hash for datalength %v: expected %x (ref), got %x", n, exp, got)
-				}
-				sw := bmt.NewAsyncWriter(double)
-				got = asyncHashRandom(sw, nil, d, wh)
-				if !bytes.Equal(got, exp) {
-					t.Fatalf("wrong async hash for datalength %v: expected %x, got %x", n, exp, got)
+					t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got)
 				}
 			}
 		})
@@ -232,7 +239,7 @@ func testHasherReuse(poolsize int, t *testing.T) {
 	bmt := New(pool)
 
 	for i := 0; i < 100; i++ {
-		data := testutil.RandomBytes(1, BufferSize)
+		data := testutil.RandomBytes(1, bufferSize)
 		n := rand.Intn(bmt.Size())
 		err := testHasherCorrectness(bmt, hasher, data, n, segmentCount)
 		if err != nil {
@@ -252,7 +259,7 @@ func TestBMTConcurrentUse(t *testing.T) {
 	for i := 0; i < cycles; i++ {
 		go func() {
 			bmt := New(pool)
-			data := testutil.RandomBytes(1, BufferSize)
+			data := testutil.RandomBytes(1, bufferSize)
 			n := rand.Intn(bmt.Size())
 			errc <- testHasherCorrectness(bmt, hasher, data, n, 128)
 		}()
@@ -288,8 +295,12 @@ func TestBMTWriterBuffers(t *testing.T) {
 			bmt := New(pool)
 			data := testutil.RandomBytes(1, n)
 			rbmt := NewRefHasher(hasher, count)
-			refHash := rbmt.Hash(data)
-			expHash := syncHash(bmt, nil, data)
+			refNoMetaHash := rbmt.Hash(data)
+			h := hasher()
+			h.Write(ZeroSpan)
+			h.Write(refNoMetaHash)
+			refHash := h.Sum(nil)
+			expHash := syncHash(bmt, 0, data)
 			if !bytes.Equal(expHash, refHash) {
 				t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
 			}
@@ -308,6 +319,7 @@ func TestBMTWriterBuffers(t *testing.T) {
 					return fmt.Errorf("incorrect read. expected %v bytes, got %v", buflen, read)
 				}
 			}
+			bmt.SetSpan(0)
 			hash := bmt.Sum(nil)
 			if !bytes.Equal(hash, expHash) {
 				return fmt.Errorf("hash mismatch. expected %x, got %x", hash, expHash)
@@ -346,11 +358,16 @@ func testHasherCorrectness(bmt *Hasher, hasher BaseHasherFunc, d []byte, n, count int) (err error) {
 	if len(d) < n {
 		n = len(d)
 	}
-	binary.BigEndian.PutUint64(span, uint64(n))
+	binary.LittleEndian.PutUint64(span, uint64(n))
Review comment (Member): why LittleEndian suddenly?
Reply (Contributor Author): I can't remember off the top of my head, but at least it's same as in storage/types.go?
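For reference, a minimal sketch of the little-endian span encoding as used in storage/types.go (n stands for the data length being encoded):

	span := make([]byte, 8)
	binary.LittleEndian.PutUint64(span, uint64(n)) // same byte order as sdata[:8] in GenerateRandomChunk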

 	data := d[:n]
 	rbmt := NewRefHasher(hasher, count)
-	exp := sha3hash(span, rbmt.Hash(data))
-	got := syncHash(bmt, span, data)
+	var exp []byte
+	if n == 0 {
+		exp = bmt.pool.zerohashes[bmt.pool.Depth]
zelig marked this conversation as resolved.
+	} else {
+		exp = sha3hash(span, rbmt.Hash(data))
+	}
+	got := syncHash(bmt, n, data)
 	if !bytes.Equal(got, exp) {
 		return fmt.Errorf("wrong hash: expected %x, got %x", exp, got)
 	}
@@ -460,7 +477,7 @@ func benchmarkBMT(t *testing.B, n int) {
 	t.ReportAllocs()
 	t.ResetTimer()
 	for i := 0; i < t.N; i++ {
-		syncHash(bmt, nil, data)
+		syncHash(bmt, 0, data)
 	}
 }

@@ -469,16 +486,19 @@ func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) {
 	data := testutil.RandomBytes(1, n)
 	hasher := sha3.NewLegacyKeccak256
 	pool := NewTreePool(hasher, segmentCount, PoolSize)
-	bmt := New(pool).NewAsyncWriter(double)
-	idxs, segments := splitAndShuffle(bmt.SectionSize(), data)
+	bmth := New(pool)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	bmtobj := NewAsyncHasher(ctx, bmth, double, nil)
+	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
 	rand.Shuffle(len(idxs), func(i int, j int) {
 		idxs[i], idxs[j] = idxs[j], idxs[i]
 	})
 
 	t.ReportAllocs()
 	t.ResetTimer()
 	for i := 0; i < t.N; i++ {
-		asyncHash(bmt, nil, n, wh, idxs, segments)
+		asyncHash(bmtobj, 0, n, wh, idxs, segments)
 	}
 }

@@ -498,7 +518,7 @@ func benchmarkPool(t *testing.B, poolsize, n int) {
 		go func() {
 			defer wg.Done()
 			bmt := New(pool)
-			syncHash(bmt, nil, data)
+			syncHash(bmt, 0, data)
 		}()
 	}
 	wg.Wait()
@@ -519,8 +539,9 @@ func benchmarkRefHasher(t *testing.B, n int) {
 }
 
 // Hash hashes the data and the span using the bmt hasher
-func syncHash(h *Hasher, span, data []byte) []byte {
-	h.ResetWithLength(span)
+func syncHash(h *Hasher, spanLength int, data []byte) []byte {
+	h.Reset()
+	h.SetSpan(spanLength)
 	h.Write(data)
 	return h.Sum(nil)
 }
@@ -547,37 +568,83 @@ func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) {
 }
 
 // splits the input data and performs a random shuffle to mock async section writes
-func asyncHashRandom(bmt SectionWriter, span []byte, data []byte, wh whenHash) (s []byte) {
-	idxs, segments := splitAndShuffle(bmt.SectionSize(), data)
-	return asyncHash(bmt, span, len(data), wh, idxs, segments)
+func asyncHashRandom(bmtobj *AsyncHasher, spanLength int, data []byte, wh whenHash) (s []byte) {
+	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
+	return asyncHash(bmtobj, spanLength, len(data), wh, idxs, segments)
 }
 
-// mock for async section writes for BMT SectionWriter
+// mock for async section writes for file.SectionWriter
 // requires a permutation (a random shuffle) of list of all indexes of segments
 // and writes them in order to the appropriate section
 // the Sum function is called according to the wh parameter (first, last, random [relative to segment writes])
-func asyncHash(bmt SectionWriter, span []byte, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) {
-	bmt.Reset()
+func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) {
+	bmtobj.Reset()
 	if l == 0 {
-		return bmt.Sum(nil, l, span)
+		bmtobj.SetSpan(spanLength)
+		return bmtobj.SumIndexed(nil, l)
 	}
 	c := make(chan []byte, 1)
 	hashf := func() {
-		c <- bmt.Sum(nil, l, span)
+		bmtobj.SetSpan(spanLength)
+		c <- bmtobj.SumIndexed(nil, l)
 	}
 	maxsize := len(idxs)
 	var r int
 	if wh == random {
 		r = rand.Intn(maxsize)
 	}
 	for i, idx := range idxs {
-		bmt.Write(idx, segments[idx])
+		bmtobj.WriteIndexed(idx, segments[idx])
 		if (wh == first || wh == random) && i == r {
 			go hashf()
 		}
 	}
 	if wh == last {
-		return bmt.Sum(nil, l, span)
+		bmtobj.SetSpan(spanLength)
+		return bmtobj.SumIndexed(nil, l)
 	}
 	return <-c
 }

+// TestUseSyncAsOrdinaryHasher verifies that the bmt.Hasher can be used with the hash.Hash interface
+func TestUseSyncAsOrdinaryHasher(t *testing.T) {
+	hasher := sha3.NewLegacyKeccak256
+	pool := NewTreePool(hasher, segmentCount, PoolSize)
+	bmt := New(pool)
+	bmt.SetSpan(3)
+	bmt.Write([]byte("foo"))
+	res := bmt.Sum(nil)
+	refh := NewRefHasher(hasher, 128)
+	resh := refh.Hash([]byte("foo"))
+	hsub := hasher()
+	span := LengthToSpan(3)
+	hsub.Write(span)
+	hsub.Write(resh)
+	refRes := hsub.Sum(nil)
+	if !bytes.Equal(res, refRes) {
+		t.Fatalf("normalhash; expected %x, got %x", refRes, res)
+	}
+}
+
+// TestUseAsyncAsOrdinaryHasher verifies that the bmt.AsyncHasher can be used with the hash.Hash interface
+func TestUseAsyncAsOrdinaryHasher(t *testing.T) {
+	hasher := sha3.NewLegacyKeccak256
+	pool := NewTreePool(hasher, segmentCount, PoolSize)
+	sbmt := New(pool)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	abmt := NewAsyncHasher(ctx, sbmt, false, nil)
+	abmt.SetSpan(3)
+	abmt.Write([]byte("foo"))
+	res := abmt.Sum(nil)
+	refh := NewRefHasher(hasher, 128)
+	resh := refh.Hash([]byte("foo"))
+	hsub := hasher()
+	span := LengthToSpan(3)
+	hsub.Write(span)
+	hsub.Write(resh)
+	refRes := hsub.Sum(nil)
+	if !bytes.Equal(res, refRes) {
+		t.Fatalf("normalhash; expected %x, got %x", refRes, res)
+	}
+}
16 changes: 16 additions & 0 deletions file/types.go
@@ -0,0 +1,16 @@
+package file
+
+import (
+	"context"
+	"hash"
+)
+
+type SectionWriterFunc func(ctx context.Context) SectionWriter
+
+type SectionWriter interface {
+	hash.Hash                                           // Write, Sum, Reset, Size, BlockSize
+	SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter to the current instance
+	SetSpan(length int)                                 // set data span of chunk
+	SectionSize() int                                   // section size of this SectionWriter
+	Branches() int                                      // branch factor of this SectionWriter
+}
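A minimal sketch of how a SectionWriter might be driven, assuming a concrete implementation obtained from some SectionWriterFunc (writerFunc, ctx and payload are illustrative names, not part of this PR):

	w := writerFunc(ctx) // writerFunc is a SectionWriterFunc
	w.Reset()            // hash.Hash semantics: clear any previous state
	w.SetSpan(len(payload))
	w.Write(payload)
	ref := w.Sum(nil) // digest reflecting both the payload and the span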
3 changes: 2 additions & 1 deletion storage/chunker_test.go
@@ -151,7 +151,8 @@ func TestSha3ForCorrectness(t *testing.T) {
 	rawSha3Output := rawSha3.Sum(nil)
 
 	sha3FromMakeFunc := MakeHashFunc(SHA3Hash)()
Review comment (Contributor): 🌷 I know it's not part of this PR, but why not use constructor for Hasher instead of a func? If the func is needed maybe the builder can be extracted instead of this?
Reply (Contributor Author, @nolash, Feb 7, 2020): @pradovic The SwarmHash needs the length of the data it represents prepended as a 64-bit value. The BMT hash has this builtin, and we extend the other types with HashWithLength to allow setting the length (SetSpanBytes, see storage/swarmhasher.go)
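A sketch of the span-prefixed digest this reply describes, mirroring the pattern in TestUseSyncAsOrdinaryHasher above (payload and bmtRoot are illustrative names):

	span := make([]byte, 8)
	binary.LittleEndian.PutUint64(span, uint64(len(payload)))
	h := sha3.NewLegacyKeccak256()
	h.Write(span)    // 64-bit data length, prepended
	h.Write(bmtRoot) // BMT root hash over the payload
	swarmHash := h.Sum(nil)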

-	sha3FromMakeFunc.ResetWithLength(input[:8])
+	sha3FromMakeFunc.Reset()
+	sha3FromMakeFunc.SetSpanBytes(input[:8])
 	sha3FromMakeFunc.Write(input[8:])
 	sha3FromMakeFuncOutput := sha3FromMakeFunc.Sum(nil)

3 changes: 2 additions & 1 deletion storage/common_test.go
@@ -151,7 +151,8 @@ func testStoreCorrect(m ChunkStore, n int, t *testing.T) {
 	}
 	hasher := MakeHashFunc(DefaultHash)()
 	data := chunk.Data()
-	hasher.ResetWithLength(data[:8])
+	hasher.Reset()
+	hasher.SetSpanBytes(data[:8])
 	hasher.Write(data[8:])
 	exp := hasher.Sum(nil)
 	if !bytes.Equal(h, exp) {
5 changes: 3 additions & 2 deletions storage/hasherstore.go
@@ -184,8 +184,9 @@ func (h *hasherStore) startWait(ctx context.Context) {

 func (h *hasherStore) createHash(chunkData ChunkData) Address {
 	hasher := h.hashFunc()
-	hasher.ResetWithLength(chunkData[:8]) // 8 bytes of length
-	hasher.Write(chunkData[8:])           // minus 8 []byte length
+	hasher.Reset()
+	hasher.SetSpanBytes(chunkData[:8]) // 8 bytes of length
+	hasher.Write(chunkData[8:])        // minus 8 []byte length
 	return hasher.Sum(nil)
 }

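For orientation, a minimal sketch of the chunk layout createHash expects: an 8-byte little-endian span followed by the payload, matching GenerateRandomChunk in storage/types.go below (payload and store are illustrative names):

	chunkData := make([]byte, 8+len(payload))
	binary.LittleEndian.PutUint64(chunkData[:8], uint64(len(payload)))
	copy(chunkData[8:], payload)
	addr := store.createHash(ChunkData(chunkData)) // store is a *hasherStore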
4 changes: 2 additions & 2 deletions storage/swarmhasher.go
@@ -28,14 +28,14 @@ const (

 type SwarmHash interface {
 	hash.Hash
-	ResetWithLength([]byte)
+	SetSpanBytes([]byte)
 }
 
 type HashWithLength struct {
 	hash.Hash
 }
 
-func (h *HashWithLength) ResetWithLength(length []byte) {
+func (h *HashWithLength) SetSpanBytes(length []byte) {
 	h.Reset()
 	h.Write(length)
 }
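A minimal usage sketch for HashWithLength, assuming a keccak256 base hash (payload is an illustrative name):

	h := &HashWithLength{sha3.NewLegacyKeccak256()}
	span := make([]byte, 8)
	binary.LittleEndian.PutUint64(span, uint64(len(payload)))
	h.SetSpanBytes(span) // resets the underlying hash, then writes the 8-byte span
	h.Write(payload)
	sum := h.Sum(nil)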
6 changes: 4 additions & 2 deletions storage/types.go
@@ -93,7 +93,8 @@ func GenerateRandomChunk(dataSize int64) Chunk {
 	sdata := make([]byte, dataSize+8)
 	rand.Read(sdata[8:])
 	binary.LittleEndian.PutUint64(sdata[:8], uint64(dataSize))
-	hasher.ResetWithLength(sdata[:8])
+	hasher.Reset()
Review comment (Member): now this is called twice
Reply (Contributor Author, @nolash, Feb 3, 2020): I'm sorry I don't understand what you mean? You mean since we actually construct the hasher then Reset is redundant?
Review comment (Member): yes
+	hasher.SetSpanBytes(sdata[:8])
 	hasher.Write(sdata[8:])
 	return NewChunk(hasher.Sum(nil), sdata)
 }
@@ -202,7 +203,8 @@ func (v *ContentAddressValidator) Validate(ch Chunk) bool {
 	}
 
 	hasher := v.Hasher()
-	hasher.ResetWithLength(data[:8])
+	hasher.Reset()
+	hasher.SetSpanBytes(data[:8])
 	hasher.Write(data[8:])
 	hash := hasher.Sum(nil)
 