diff --git a/bmt/bmt.go b/bmt/bmt.go
index 18eab5a2bc..1013fee0fd 100644
--- a/bmt/bmt.go
+++ b/bmt/bmt.go
@@ -18,11 +18,17 @@
 package bmt

 import (
+	"context"
+	"encoding/binary"
+	"errors"
 	"fmt"
 	"hash"
 	"strings"
 	"sync"
 	"sync/atomic"
+
+	"github.com/ethersphere/swarm/file"
+	"github.com/ethersphere/swarm/log"
 )

 /*
@@ -60,6 +66,10 @@ const (
 	PoolSize = 8
 )

+var (
+	ZeroSpan = make([]byte, 8)
+)
+
 // BaseHasherFunc is a hash.Hash constructor function used for the base hash of the BMT.
 // implemented by Keccak256 SHA3 sha3.NewLegacyKeccak256
 type BaseHasherFunc func() hash.Hash
@@ -75,8 +85,13 @@ type BaseHasherFunc func() hash.Hash
 // the tree and itself in a state reusable for hashing a new chunk
 // - generates and verifies segment inclusion proofs (TODO:)
 type Hasher struct {
-	pool *TreePool // BMT resource pool
-	bmt  *tree     // prebuilt BMT resource for flowcontrol and proofs
+	mtx     sync.Mutex // protects Hasher.size increments (temporary solution)
+	pool    *TreePool  // BMT resource pool
+	bmt     *tree      // prebuilt BMT resource for flowcontrol and proofs
+	size    int        // bytes written to Hasher since last Reset()
+	cursor  int        // cursor to write to on next Write() call
+	errFunc func(error)
+	ctx     context.Context
 }

 // New creates a reusable BMT Hasher that
@@ -276,48 +291,77 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree {
 	}
 }

-// methods needed to implement hash.Hash
+// SetWriter implements file.SectionWriter
+func (h *Hasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter {
+	log.Warn("SyncHasher does not currently support SectionWriter chaining")
+	return h
+}

-// Size returns the size
+// SectionSize implements file.SectionWriter
+func (h *Hasher) SectionSize() int {
+	return h.pool.SegmentSize
+}
+
+// SetSpan implements file.SectionWriter
+func (h *Hasher) SetSpan(length int) {
+	span := LengthToSpan(length)
+	h.getTree().span = span
+}
+
+// SetSpanBytes implements storage.SwarmHash
+func (h *Hasher) SetSpanBytes(b []byte) {
+	t := h.getTree()
+	t.span = make([]byte, 8)
+	copy(t.span, b)
+}
+
+// Branches implements file.SectionWriter
+func (h *Hasher) Branches() int {
+	return h.pool.SegmentCount
+}
+
+// Size implements hash.Hash and file.SectionWriter
 func (h *Hasher) Size() int {
 	return h.pool.SegmentSize
 }

-// BlockSize returns the block size
+// BlockSize implements hash.Hash and file.SectionWriter
 func (h *Hasher) BlockSize() int {
 	return 2 * h.pool.SegmentSize
 }

 // Sum returns the BMT root hash of the buffer
 // using Sum presupposes sequential synchronous writes (io.Writer interface)
-// hash.Hash interface Sum method appends the byte slice to the underlying
-// data before it calculates and returns the hash of the chunk
-// caller must make sure Sum is not called concurrently with Write, writeSection
+// Implements hash.Hash and file.SectionWriter
 func (h *Hasher) Sum(b []byte) (s []byte) {
 	t := h.getTree()
+	if h.size == 0 && t.offset == 0 {
+		h.releaseTree()
+		//return h.pool.zerohashes[h.pool.Depth]
+		return h.GetZeroHash()
+	}
 	// write the last section with final flag set to true
-	go h.writeSection(t.cursor, t.section, true, true)
+	go h.WriteSection(t.cursor, t.section, true, true)
 	// wait for the result
 	s = <-t.result
+	if t.span == nil {
+		t.span = LengthToSpan(h.size)
+	}
 	span := t.span
 	// release the tree resource back to the pool
 	h.releaseTree()
-	// b + sha3(span + BMT(pure_chunk))
-	if len(span) == 0 {
-		return append(b, s...)
-	}
 	return doSum(h.pool.hasher(), b, span, s)
 }
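Reviewer note: after this change Sum always produces the span-prefixed digest — keccak256(span || BMT(chunk)) — and derives the span from the written byte count when none was set. A standalone sketch of that final step, illustrative only and not part of the changeset (`finalDigest` is a hypothetical helper mirroring what doSum and LengthToSpan do):

```go
package main

import (
	"encoding/binary"
	"fmt"

	"golang.org/x/crypto/sha3"
)

// finalDigest mirrors the doSum step above: the chunk digest is
// keccak256(span || bmtRoot), where span is the 8-byte little-endian
// encoding of the data length subsumed under the hash (LengthToSpan).
func finalDigest(dataLength int, bmtRoot []byte) []byte {
	span := make([]byte, 8)
	binary.LittleEndian.PutUint64(span, uint64(dataLength))
	h := sha3.NewLegacyKeccak256()
	h.Write(span)
	h.Write(bmtRoot)
	return h.Sum(nil)
}

func main() {
	root := make([]byte, 32) // placeholder BMT root, for illustration only
	fmt.Printf("%x\n", finalDigest(4096, root))
}
```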

-// methods needed to implement the SwarmHash and the io.Writer interfaces
-
 // Write calls sequentially add to the buffer to be hashed,
-// with every full segment calls writeSection in a go routine
+// with every full segment calls WriteSection in a go routine
+// Implements hash.Hash and file.SectionWriter
 func (h *Hasher) Write(b []byte) (int, error) {
 	l := len(b)
 	if l == 0 || l > h.pool.Size {
 		return 0, nil
 	}
+	h.size += len(b)
 	t := h.getTree()
 	secsize := 2 * h.pool.SegmentSize
 	// calculate length of missing bit to complete current open section
@@ -344,7 +388,7 @@ func (h *Hasher) Write(b []byte) (int, error) {
 	// read full sections and the last possibly partial section from the input buffer
 	for smax < l {
 		// section complete; push to tree asynchronously
-		go h.writeSection(t.cursor, t.section, true, false)
+		go h.WriteSection(t.cursor, t.section, true, false)
 		// reset section
 		t.section = make([]byte, secsize)
 		// copy from input buffer at smax to right half of section
@@ -358,21 +402,13 @@ func (h *Hasher) Write(b []byte) (int, error) {
 	return l, nil
 }

-// Reset needs to be called before writing to the hasher
+// Reset implements hash.Hash and file.SectionWriter
 func (h *Hasher) Reset() {
+	h.cursor = 0
+	h.size = 0
 	h.releaseTree()
 }

-// methods needed to implement the SwarmHash interface
-
-// ResetWithLength needs to be called before writing to the hasher
-// the argument is supposed to be the byte slice binary representation of
-// the length of the data subsumed under the hash, i.e., span
-func (h *Hasher) ResetWithLength(span []byte) {
-	h.Reset()
-	h.getTree().span = span
-}
-
 // releaseTree gives back the Tree to the pool whereby it unlocks
 // it resets tree, segment and index
 func (h *Hasher) releaseTree() {
@@ -394,34 +430,31 @@ func (h *Hasher) releaseTree() {
 	}()
 }

-// NewAsyncWriter extends Hasher with an interface for concurrent segment/section writes
-func (h *Hasher) NewAsyncWriter(double bool) *AsyncHasher {
-	secsize := h.pool.SegmentSize
+// NewAsyncHasher extends Hasher with an interface for concurrent segment/section writes
+// TODO: instead of explicitly setting the double size of the segment, it should be dynamic and chunked internally. If not, we have to keep different bmt hasher generation functions for different purposes in the same instance, or cope with the added complexity of bmt hasher generation functions having to receive parameters
+func NewAsyncHasher(ctx context.Context, h *Hasher, double bool, errFunc func(error)) *AsyncHasher {
+	secsize := h.SectionSize()
 	if double {
 		secsize *= 2
 	}
-	write := func(i int, section []byte, final bool) {
-		h.writeSection(i, section, double, final)
+	seccount := h.Branches()
+	if double {
+		seccount /= 2
 	}
 	return &AsyncHasher{
-		Hasher:  h,
-		double:  double,
-		secsize: secsize,
-		write:   write,
+		Hasher:   h,
+		double:   double,
+		secsize:  secsize,
+		seccount: seccount,
+		ctx:      ctx,
+		errFunc:  errFunc,
 	}
 }

-// SectionWriter is an asynchronous segment/section writer interface
-type SectionWriter interface {
-	Reset()                                       // standard init to be called before reuse
-	Write(index int, data []byte)                 // write into section of index
-	Sum(b []byte, length int, span []byte) []byte // returns the hash of the buffer
-	SectionSize() int                             // size of the async section unit to use
-}
-
-// AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface
-// AsyncHasher is unsafe and does not check indexes and section data lengths
-// it must be used with the right indexes and length and the right number of sections
+// AsyncHasher extends BMT Hasher with an asynchronous segment/section writer interface
+// AsyncHasher cannot be used as a hash.Hash: it must be used with the
+// right indexes and length and the right number of sections
+// It is unsafe and does not check indexes and section data lengths
 //
 // behaviour is undefined if
 // * non-final sections are shorter or longer than secsize
@@ -434,53 +467,77 @@ type SectionWriter interface {
 // * it will not leak processes if not all sections are written but it blocks
 // and keeps the resource which can be released calling Reset()
 type AsyncHasher struct {
-	*Hasher            // extends the Hasher
-	mtx     sync.Mutex // to lock the cursor access
-	double  bool       // whether to use double segments (call Hasher.writeSection)
-	secsize int        // size of base section (size of hash or double)
-	write   func(i int, section []byte, final bool)
+	*Hasher              // extends the Hasher
+	mtx      sync.Mutex  // to lock the cursor access
+	double   bool        // whether to use double segments (call Hasher.writeSection)
+	secsize  int         // size of base section (size of hash or double)
+	seccount int         // base section count
+	write    func(i int, section []byte, final bool)
+	errFunc  func(error)
+	ctx      context.Context
+	all      bool // if all written in one go, temporary workaround
+}
+
+func (sw *AsyncHasher) raiseError(err string) {
+	if sw.errFunc != nil {
+		sw.errFunc(errors.New(err))
+	}
 }

-// methods needed to implement AsyncWriter
+// Reset implements file.SectionWriter
+func (sw *AsyncHasher) Reset() {
+	sw.all = false
+	sw.Hasher.Reset()
+}

-// SectionSize returns the size of async section unit to use
+// SectionSize implements file.SectionWriter
 func (sw *AsyncHasher) SectionSize() int {
 	return sw.secsize
 }

-// Write writes the i-th section of the BMT base
+// Branches implements file.SectionWriter
+func (sw *AsyncHasher) Branches() int {
+	return sw.seccount
+}
+
+// WriteIndexed writes the i-th section of the BMT base
 // this function can and is meant to be called concurrently
 // it sets max segment threadsafely
-func (sw *AsyncHasher) Write(i int, section []byte) {
+func (sw *AsyncHasher) WriteIndexed(i int, section []byte) {
 	sw.mtx.Lock()
 	defer sw.mtx.Unlock()
-	t := sw.getTree()
+	t := sw.GetTree()
 	// cursor keeps track of the rightmost section written so far
 	// if index is lower than cursor then just write non-final section as is
-	if i < t.cursor {
+	if i < sw.Hasher.GetCursor() {
 		// if index is not the rightmost, safe to write section
-		go sw.write(i, section, false)
+		go sw.WriteSection(i, section, sw.double, false)
 		return
 	}
 	// if there is a previous rightmost section safe to write section
-	if t.offset > 0 {
-		if i == t.cursor {
+	if t.GetOffset() > 0 {
+		if i == sw.Hasher.GetCursor() {
 			// i==cursor implies cursor was set by Hash call so we can write section as final one
 			// since it can be shorter, first we copy it to the padded buffer
-			t.section = make([]byte, sw.secsize)
-			copy(t.section, section)
-			go sw.write(i, t.section, true)
+			//t.section = make([]byte, sw.secsize)
+			//copy(t.section, section)
+			// TODO: consider whether the section here needs to be copied; perhaps we can require that callers not change the original slice
+			copySection := make([]byte, sw.secsize)
+			copy(copySection, section)
+			t.SetSection(copySection)
+			go sw.Hasher.WriteSection(i, t.GetSection(), sw.double, true)
 			return
 		}
 		// the rightmost section just changed, so we write the previous one as non-final
-		go sw.write(t.cursor, t.section, false)
+		go sw.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, false)
 	}
-	// set i as the index of the righmost section written so far
-	// set t.offset to cursor*secsize+1
-	t.cursor = i
-	t.offset = i*sw.secsize + 1
-	t.section = make([]byte, sw.secsize)
-	copy(t.section, section)
+	// set i as the index of the rightmost section written so far
+	// set the tree offset to cursor*secsize+1
+	sw.Hasher.SetCursor(i)
+	t.SetOffset(i*sw.secsize + 1)
+	copySection := make([]byte, sw.secsize)
+	copy(copySection, section)
+	t.SetSection(copySection)
 }

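Reviewer note: the ordering contract above is easiest to see in use. A minimal sketch under the post-patch API from this diff — sections written out of order, with the rightmost write held back until SumIndexed (documented below) fixes the final length:

```go
package main

import (
	"context"
	"fmt"

	"github.com/ethersphere/swarm/bmt"
	"golang.org/x/crypto/sha3"
)

func main() {
	pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, 128, bmt.PoolSize)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sw := bmt.NewAsyncHasher(ctx, bmt.New(pool), false, nil)

	secsize := sw.SectionSize()
	sections := make([][]byte, 3)
	for i := range sections {
		sections[i] = make([]byte, secsize)
	}

	// Sections may arrive in any order; the rightmost one seen so far is
	// held back, since it may turn out to be the final (possibly short) one.
	sw.WriteIndexed(2, sections[2])
	sw.WriteIndexed(0, sections[0]) // below cursor: hashed immediately
	sw.WriteIndexed(1, sections[1]) // below cursor: hashed immediately

	length := 3 * secsize
	sw.SetSpan(length)
	fmt.Printf("%x\n", sw.SumIndexed(nil, length))
}
```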
 // Sum can be called any time once the length and the span is known
@@ -492,35 +549,49 @@ func (sw *AsyncHasher) Write(i int, section []byte) {
 // length: known length of the input (unsafe; undefined if out of range)
 // meta: metadata to hash together with BMT root for the final digest
 // e.g., span for protection against existential forgery
-func (sw *AsyncHasher) Sum(b []byte, length int, meta []byte) (s []byte) {
+func (sw *AsyncHasher) SumIndexed(b []byte, length int) (s []byte) {
 	sw.mtx.Lock()
-	t := sw.getTree()
+	t := sw.GetTree()
 	if length == 0 {
+		sw.ReleaseTree()
 		sw.mtx.Unlock()
-		s = sw.pool.zerohashes[sw.pool.Depth]
+		s = sw.Hasher.GetZeroHash()
+		return
 	} else {
 		// for non-zero input the rightmost section is written to the tree asynchronously
-		// if the actual last section has been written (t.cursor == length/t.secsize)
+		// if the actual last section has been written (cursor == length/secsize)
 		maxsec := (length - 1) / sw.secsize
-		if t.offset > 0 {
-			go sw.write(t.cursor, t.section, maxsec == t.cursor)
+		if t.GetOffset() > 0 {
+			go sw.Hasher.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, maxsec == sw.Hasher.GetCursor())
 		}
 		// set cursor to maxsec so final section is written when it arrives
-		t.cursor = maxsec
-		t.offset = length
-		result := t.result
+		sw.Hasher.SetCursor(maxsec)
+		t.SetOffset(length)
+		// TODO: must this read of t.result be within the lock?
+		result := t.GetResult()
 		sw.mtx.Unlock()
 		// wait for the result or reset
 		s = <-result
 	}
 	// relesase the tree back to the pool
-	sw.releaseTree()
-	// if no meta is given just append digest to b
-	if len(meta) == 0 {
-		return append(b, s...)
-	}
+	meta := t.GetSpan()
+	sw.ReleaseTree()
 	// hash together meta and BMT root hash using the pools
-	return doSum(sw.pool.hasher(), b, meta, s)
+	hsh := sw.Hasher.GetHasher()
+	hsh.Reset()
+	hsh.Write(meta)
+	hsh.Write(s)
+	return hsh.Sum(b)
 }
+
+// WriteSection writes data to the data level in the section at index i.
+// Setting final to true tells the hasher no further data will be written and prepares the data for h.Sum()
+// TODO: remove double as argument, push responsibility for handling data context to caller
+func (h *Hasher) WriteSection(i int, section []byte, double bool, final bool) {
+	h.mtx.Lock()
+	h.size += len(section)
+	h.mtx.Unlock()
+	h.writeSection(i, section, double, final)
+}

 // writeSection writes the hash of i-th section into level 1 node of the BMT tree
@@ -688,3 +759,75 @@ func calculateDepthFor(n int) (d int) {
 	}
 	return d + 1
 }
+
+// LengthToSpan creates a binary data span size representation
+// It is required for calculating the BMT hash
+func LengthToSpan(length int) []byte {
+	spanBytes := make([]byte, 8)
+	binary.LittleEndian.PutUint64(spanBytes, uint64(length))
+	return spanBytes
+}
+
+// ASYNCHASHER ACCESSORS
+// All methods below here are exported to enable access for AsyncHasher
+//
+
+// GetHasher returns a new instance of the underlying hasher
+func (h *Hasher) GetHasher() hash.Hash {
+	return h.pool.hasher()
+}
+
+// GetZeroHash returns the zero hash of the full depth of the Hasher instance
+func (h *Hasher) GetZeroHash() []byte {
+	return h.pool.zerohashes[h.pool.Depth]
+}
+
+// GetTree gets the underlying tree in use by the Hasher
+func (h *Hasher) GetTree() *tree {
+	return h.getTree()
+}
+
+// ReleaseTree releases the underlying tree in use by the Hasher
+func (h *Hasher) ReleaseTree() {
+	h.releaseTree()
+}
+
+// GetCursor returns the current write cursor for the Hasher
+func (h *Hasher) GetCursor() int {
+	return h.cursor
+}
+
+// SetCursor assigns the value of the current write cursor for the Hasher
+func (h *Hasher) SetCursor(c int) {
+	h.cursor = c
+}
+
+// GetOffset returns the write offset within the current section of the Hasher
+func (t *tree) GetOffset() int {
+	return t.offset
+}
+
+// SetOffset assigns the value of the write offset within the current section of the Hasher
+func (t *tree) SetOffset(offset int) {
+	t.offset = offset
+}
+
+// GetSection returns the current section Hasher is operating on
+func (t *tree) GetSection() []byte {
+	return t.section
+}
+
+// SetSection assigns the current section Hasher is operating on
+func (t *tree) SetSection(b []byte) {
+	t.section = b
+}
+
+// GetResult returns the result channel of the Hasher
+func (t *tree) GetResult() <-chan []byte {
+	return t.result
+}
+
+// GetSpan returns the span set by SetSpan
+func (t *tree) GetSpan() []byte {
+	return t.span
+}
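Reviewer note on the new LengthToSpan helper: the span is the little-endian uint64 encoding of the data length, which is what the BigEndian-to-LittleEndian test change further down depends on. A quick illustrative sketch (not part of the changeset):

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/ethersphere/swarm/bmt"
)

func main() {
	// LengthToSpan encodes the data length as a little-endian uint64;
	// 4096 = 0x1000, so the first two bytes are 0x00 0x10.
	fmt.Printf("%x\n", bmt.LengthToSpan(4096)) // 0010000000000000

	// The package-level ZeroSpan is simply the encoding of length zero.
	fmt.Println(bytes.Equal(bmt.LengthToSpan(0), bmt.ZeroSpan)) // true
}
```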
diff --git a/bmt/bmt_bench_test.go b/bmt/bmt_bench_test.go
new file mode 100644
index 0000000000..05b8308344
--- /dev/null
+++ b/bmt/bmt_bench_test.go
@@ -0,0 +1,13 @@
+package bmt
+
+import (
+	"fmt"
+	"testing"
+)
+
+func BenchmarkBMTUsed(t *testing.B) {
+	size := 4096
+	t.Run(fmt.Sprintf("%v_size_%v", "BMT", size), func(t *testing.B) {
+		benchmarkBMT(t, size)
+	})
+}
diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go
index fc020eb7c2..2662359ba4 100644
--- a/bmt/bmt_test.go
+++ b/bmt/bmt_test.go
@@ -18,6 +18,7 @@ package bmt

 import (
 	"bytes"
+	"context"
 	"encoding/binary"
 	"fmt"
 	"math/rand"
@@ -30,8 +31,12 @@ import (
 	"golang.org/x/crypto/sha3"
 )

+func init() {
+	testutil.Init()
+}
+
 // the actual data length generated (could be longer than max datalength of the BMT)
-const BufferSize = 4128
+const bufferSize = 4128

 const (
 	// segmentCount is the maximum number of segments of the underlying chunk
@@ -42,6 +47,8 @@ const (

 var counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128}

+var benchmarkBMTResult []byte
+
 // calculates the Keccak256 SHA3 hash of the data
 func sha3hash(data ...[]byte) []byte {
 	h := sha3.NewLegacyKeccak256()
@@ -141,10 +148,10 @@ func TestHasherEmptyData(t *testing.T) {
 			defer pool.Drain(0)
 			bmt := New(pool)
 			rbmt := NewRefHasher(hasher, count)
-			refHash := rbmt.Hash(data)
-			expHash := syncHash(bmt, nil, data)
-			if !bytes.Equal(expHash, refHash) {
-				t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
+			expHash := rbmt.Hash(data)
+			resHash := syncHash(bmt, 0, data)
+			if !bytes.Equal(expHash, resHash) {
+				t.Fatalf("hash mismatch with reference. expected %x, got %x", expHash, resHash)
 			}
 		})
 	}
@@ -152,7 +159,7 @@

 // tests sequential write with entire max size written in one go
 func TestSyncHasherCorrectness(t *testing.T) {
-	data := testutil.RandomBytes(1, BufferSize)
+	data := testutil.RandomBytes(1, bufferSize)
 	hasher := sha3.NewLegacyKeccak256
 	size := hasher().Size()

@@ -178,7 +185,7 @@

 // tests order-neutral concurrent writes with entire max size written in one go
 func TestAsyncCorrectness(t *testing.T) {
-	data := testutil.RandomBytes(1, BufferSize)
+	data := testutil.RandomBytes(1, bufferSize)
 	hasher := sha3.NewLegacyKeccak256
 	size := hasher().Size()
 	whs := []whenHash{first, last, random}
@@ -194,18 +201,20 @@
 				defer pool.Drain(0)
 				for n := 1; n <= max; n += incr {
 					incr = 1 + rand.Intn(5)
-					bmt := New(pool)
+					bmtobj := New(pool)
 					d := data[:n]
-					rbmt := NewRefHasher(hasher, count)
-					exp := rbmt.Hash(d)
-					got := syncHash(bmt, nil, d)
+					rbmtobj := NewRefHasher(hasher, count)
+					expNoMeta := rbmtobj.Hash(d)
+					h := hasher()
+					h.Write(ZeroSpan)
+					h.Write(expNoMeta)
+					exp := h.Sum(nil)
+					ctx, cancel := context.WithCancel(context.Background())
+					defer cancel()
+					sw := NewAsyncHasher(ctx, bmtobj, double, nil)
+					got := asyncHashRandom(sw, 0, d, wh)
 					if !bytes.Equal(got, exp) {
-						t.Fatalf("wrong sync hash for datalength %v: expected %x (ref), got %x", n, exp, got)
-					}
-					sw := bmt.NewAsyncWriter(double)
-					got = asyncHashRandom(sw, nil, d, wh)
-					if !bytes.Equal(got, exp) {
-						t.Fatalf("wrong async hash for datalength %v: expected %x, got %x", n, exp, got)
+						t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got)
 					}
 				}
 			})
@@ -232,7 +241,7 @@ func testHasherReuse(poolsize int, t *testing.T) {

 	bmt := New(pool)
 	for i := 0; i < 100; i++ {
-		data := testutil.RandomBytes(1, BufferSize)
+		data := testutil.RandomBytes(1, bufferSize)
 		n := rand.Intn(bmt.Size())
 		err := testHasherCorrectness(bmt, hasher, data, n, segmentCount)
 		if err != nil {
@@ -252,7 +261,7 @@ func TestBMTConcurrentUse(t *testing.T) {
 	for i := 0; i < cycles; i++ {
 		go func() {
 			bmt := New(pool)
-			data := testutil.RandomBytes(1, BufferSize)
+			data := testutil.RandomBytes(1, bufferSize)
 			n := rand.Intn(bmt.Size())
 			errc <- testHasherCorrectness(bmt, hasher, data, n, 128)
 		}()
 	}
@@ -288,8 +297,12 @@ func TestBMTWriterBuffers(t *testing.T) {
 			bmt := New(pool)
 			data := testutil.RandomBytes(1, n)
 			rbmt := NewRefHasher(hasher, count)
-			refHash := rbmt.Hash(data)
-			expHash := syncHash(bmt, nil, data)
+			refNoMetaHash := rbmt.Hash(data)
+			h := hasher()
+			h.Write(ZeroSpan)
+			h.Write(refNoMetaHash)
+			refHash := h.Sum(nil)
+			expHash := syncHash(bmt, 0, data)
 			if !bytes.Equal(expHash, refHash) {
 				t.Fatalf("hash mismatch with reference. expected %x, got %x", refHash, expHash)
 			}
@@ -308,6 +321,7 @@
 					return fmt.Errorf("incorrect read. expected %v bytes, got %v", buflen, read)
 				}
 			}
+			bmt.SetSpan(0)
 			hash := bmt.Sum(nil)
 			if !bytes.Equal(hash, expHash) {
 				return fmt.Errorf("hash mismatch. expected %x, got %x", hash, expHash)
@@ -346,11 +360,16 @@
 	if len(d) < n {
 		n = len(d)
 	}
-	binary.BigEndian.PutUint64(span, uint64(n))
+	binary.LittleEndian.PutUint64(span, uint64(n))
 	data := d[:n]
 	rbmt := NewRefHasher(hasher, count)
-	exp := sha3hash(span, rbmt.Hash(data))
-	got := syncHash(bmt, span, data)
+	var exp []byte
+	if n == 0 {
+		exp = bmt.pool.zerohashes[bmt.pool.Depth]
+	} else {
+		exp = sha3hash(span, rbmt.Hash(data))
+	}
+	got := syncHash(bmt, n, data)
 	if !bytes.Equal(got, exp) {
 		return fmt.Errorf("wrong hash: expected %x, got %x", exp, got)
 	}
@@ -456,12 +475,14 @@ func benchmarkBMT(t *testing.B, n int) {
 	hasher := sha3.NewLegacyKeccak256
 	pool := NewTreePool(hasher, segmentCount, PoolSize)
 	bmt := New(pool)
+	var r []byte

 	t.ReportAllocs()
 	t.ResetTimer()
 	for i := 0; i < t.N; i++ {
-		syncHash(bmt, nil, data)
+		r = syncHash(bmt, 0, data)
 	}
+	benchmarkBMTResult = r
 }

 // benchmarks BMT hasher with asynchronous concurrent segment/section writes
@@ -469,8 +490,11 @@ func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) {
 	data := testutil.RandomBytes(1, n)
 	hasher := sha3.NewLegacyKeccak256
 	pool := NewTreePool(hasher, segmentCount, PoolSize)
-	bmt := New(pool).NewAsyncWriter(double)
-	idxs, segments := splitAndShuffle(bmt.SectionSize(), data)
+	bmth := New(pool)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	bmtobj := NewAsyncHasher(ctx, bmth, double, nil)
+	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
 	rand.Shuffle(len(idxs), func(i int, j int) {
 		idxs[i], idxs[j] = idxs[j], idxs[i]
 	})
@@ -478,7 +502,7 @@

 	t.ReportAllocs()
 	t.ResetTimer()
 	for i := 0; i < t.N; i++ {
-		asyncHash(bmt, nil, n, wh, idxs, segments)
+		asyncHash(bmtobj, 0, n, wh, idxs, segments)
 	}
 }

@@ -498,7 +522,7 @@ func benchmarkPool(t *testing.B, poolsize, n int) {
 		go func() {
 			defer wg.Done()
 			bmt := New(pool)
-			syncHash(bmt, nil, data)
+			syncHash(bmt, 0, data)
 		}()
 	}
 	wg.Wait()
@@ -519,8 +543,9 @@ func benchmarkRefHasher(t *testing.B, n int) {
 }

 // Hash hashes the data and the span using the bmt hasher
-func syncHash(h *Hasher, span, data []byte) []byte {
-	h.ResetWithLength(span)
+func syncHash(h *Hasher, spanLength int, data []byte) []byte {
+	h.Reset()
+	h.SetSpan(spanLength)
 	h.Write(data)
 	return h.Sum(nil)
 }
@@ -547,23 +572,25 @@ func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) {
 }

 // splits the input data performs a random shuffle to mock async section writes
-func asyncHashRandom(bmt SectionWriter, span []byte, data []byte, wh whenHash) (s []byte) {
-	idxs, segments := splitAndShuffle(bmt.SectionSize(), data)
-	return asyncHash(bmt, span, len(data), wh, idxs, segments)
+func asyncHashRandom(bmtobj *AsyncHasher, spanLength int, data []byte, wh whenHash) (s []byte) {
+	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
+	return asyncHash(bmtobj, spanLength, len(data), wh, idxs, segments)
 }

-// mock for async section writes for BMT SectionWriter
+// mock for async section writes for file.SectionWriter
 // requires a permutation (a random shuffle) of list of all indexes of segments
 // and writes them in order to the appropriate section
 // the Sum function is called according to the wh parameter (first, last, random [relative to segment writes])
-func asyncHash(bmt SectionWriter, span []byte, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) {
-	bmt.Reset()
+func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) {
+	bmtobj.Reset()
 	if l == 0 {
-		return bmt.Sum(nil, l, span)
+		bmtobj.SetSpan(spanLength)
+		return bmtobj.SumIndexed(nil, l)
 	}
 	c := make(chan []byte, 1)
 	hashf := func() {
-		c <- bmt.Sum(nil, l, span)
+		bmtobj.SetSpan(spanLength)
+		c <- bmtobj.SumIndexed(nil, l)
 	}
 	maxsize := len(idxs)
 	var r int
@@ -571,13 +598,57 @@
 		r = rand.Intn(maxsize)
 	}
 	for i, idx := range idxs {
-		bmt.Write(idx, segments[idx])
+		bmtobj.WriteIndexed(idx, segments[idx])
 		if (wh == first || wh == random) && i == r {
 			go hashf()
 		}
 	}
 	if wh == last {
-		return bmt.Sum(nil, l, span)
+		bmtobj.SetSpan(spanLength)
+		return bmtobj.SumIndexed(nil, l)
 	}
 	return <-c
 }
+
+// TestUseSyncAsOrdinaryHasher verifies that the bmt.Hasher can be used with the hash.Hash interface
+func TestUseSyncAsOrdinaryHasher(t *testing.T) {
+	hasher := sha3.NewLegacyKeccak256
+	pool := NewTreePool(hasher, segmentCount, PoolSize)
+	bmt := New(pool)
+	bmt.SetSpan(3)
+	bmt.Write([]byte("foo"))
+	res := bmt.Sum(nil)
+	refh := NewRefHasher(hasher, 128)
+	resh := refh.Hash([]byte("foo"))
+	hsub := hasher()
+	span := LengthToSpan(3)
+	hsub.Write(span)
+	hsub.Write(resh)
+	refRes := hsub.Sum(nil)
+	if !bytes.Equal(res, refRes) {
+		t.Fatalf("normalhash; expected %x, got %x", refRes, res)
+	}
+}
+
+// TestUseAsyncAsOrdinaryHasher verifies that the bmt.AsyncHasher can be used with the hash.Hash interface
+func TestUseAsyncAsOrdinaryHasher(t *testing.T) {
+	hasher := sha3.NewLegacyKeccak256
+	pool := NewTreePool(hasher, segmentCount, PoolSize)
+	sbmt := New(pool)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	abmt := NewAsyncHasher(ctx, sbmt, false, nil)
+	abmt.SetSpan(3)
+	abmt.Write([]byte("foo"))
+	res := abmt.Sum(nil)
+	refh := NewRefHasher(hasher, 128)
+	resh := refh.Hash([]byte("foo"))
+	hsub := hasher()
+	span := LengthToSpan(3)
+	hsub.Write(span)
+	hsub.Write(resh)
+	refRes := hsub.Sum(nil)
+	if !bytes.Equal(res, refRes) {
+		t.Fatalf("normalhash; expected %x, got %x", refRes, res)
+	}
+}
diff --git a/file/types.go b/file/types.go
new file mode 100644
index 0000000000..5ad84fce90
--- /dev/null
+++ b/file/types.go
@@ -0,0 +1,16 @@
+package file
+
+import (
+	"context"
+	"hash"
+)
+
+type SectionWriterFunc func(ctx context.Context) SectionWriter
+
+type SectionWriter interface {
+	hash.Hash                                           // Write, Sum, Reset, Size, BlockSize
+	SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter to the current instance
+	SetSpan(length int)                                 // set data span of chunk
+	SectionSize() int                                   // section size of this SectionWriter
+	Branches() int                                      // branch factor of this SectionWriter
+}
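Reviewer note: a sketch of a typical consumer of the new interface (illustrative; `chunkCapacity` is a hypothetical helper, not part of this changeset). Any component holding a file.SectionWriter can derive the data capacity of one chunk from SectionSize and Branches:

```go
package main

import (
	"fmt"

	"github.com/ethersphere/swarm/bmt"
	"github.com/ethersphere/swarm/file"
	"golang.org/x/crypto/sha3"
)

// chunkCapacity derives the number of data bytes one chunk covers,
// knowing nothing about the writer beyond the file.SectionWriter interface.
func chunkCapacity(w file.SectionWriter) int {
	return w.SectionSize() * w.Branches()
}

func main() {
	pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, 128, bmt.PoolSize)
	var w file.SectionWriter = bmt.New(pool) // Hasher satisfies file.SectionWriter
	fmt.Println(chunkCapacity(w))            // 32 * 128 = 4096 for this pool
}
```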
diff --git a/storage/chunker_test.go b/storage/chunker_test.go
index fd1af937f2..3e1158d13f 100644
--- a/storage/chunker_test.go
+++ b/storage/chunker_test.go
@@ -151,7 +151,8 @@ func TestSha3ForCorrectness(t *testing.T) {
 	rawSha3Output := rawSha3.Sum(nil)

 	sha3FromMakeFunc := MakeHashFunc(SHA3Hash)()
-	sha3FromMakeFunc.ResetWithLength(input[:8])
+	sha3FromMakeFunc.Reset()
+	sha3FromMakeFunc.SetSpanBytes(input[:8])
 	sha3FromMakeFunc.Write(input[8:])
 	sha3FromMakeFuncOutput := sha3FromMakeFunc.Sum(nil)
diff --git a/storage/common_test.go b/storage/common_test.go
index a65a686943..e625cd8091 100644
--- a/storage/common_test.go
+++ b/storage/common_test.go
@@ -151,7 +151,8 @@ func testStoreCorrect(m ChunkStore, n int, t *testing.T) {
 		}
 		hasher := MakeHashFunc(DefaultHash)()
 		data := chunk.Data()
-		hasher.ResetWithLength(data[:8])
+		hasher.Reset()
+		hasher.SetSpanBytes(data[:8])
 		hasher.Write(data[8:])
 		exp := hasher.Sum(nil)
 		if !bytes.Equal(h, exp) {
diff --git a/storage/hasherstore.go b/storage/hasherstore.go
index 4890219a15..d81ffba5aa 100644
--- a/storage/hasherstore.go
+++ b/storage/hasherstore.go
@@ -184,8 +184,9 @@ func (h *hasherStore) startWait(ctx context.Context) {

 func (h *hasherStore) createHash(chunkData ChunkData) Address {
 	hasher := h.hashFunc()
-	hasher.ResetWithLength(chunkData[:8]) // 8 bytes of length
-	hasher.Write(chunkData[8:])           // minus 8 []byte length
+	hasher.Reset()
+	hasher.SetSpanBytes(chunkData[:8]) // 8 bytes of length
+	hasher.Write(chunkData[8:])        // minus 8 []byte length
 	return hasher.Sum(nil)
 }
diff --git a/storage/swarmhasher.go b/storage/swarmhasher.go
index fae03f0c72..0cbc12556c 100644
--- a/storage/swarmhasher.go
+++ b/storage/swarmhasher.go
@@ -28,14 +28,14 @@ const (

 type SwarmHash interface {
 	hash.Hash
-	ResetWithLength([]byte)
+	SetSpanBytes([]byte)
 }

 type HashWithLength struct {
 	hash.Hash
 }

-func (h *HashWithLength) ResetWithLength(length []byte) {
+func (h *HashWithLength) SetSpanBytes(length []byte) {
 	h.Reset()
 	h.Write(length)
 }
diff --git a/storage/types.go b/storage/types.go
index a4b102a62c..9fa258495d 100644
--- a/storage/types.go
+++ b/storage/types.go
@@ -93,7 +93,8 @@ func GenerateRandomChunk(dataSize int64) Chunk {
 	sdata := make([]byte, dataSize+8)
 	rand.Read(sdata[8:])
 	binary.LittleEndian.PutUint64(sdata[:8], uint64(dataSize))
-	hasher.ResetWithLength(sdata[:8])
+	hasher.Reset()
+	hasher.SetSpanBytes(sdata[:8])
 	hasher.Write(sdata[8:])
 	return NewChunk(hasher.Sum(nil), sdata)
 }
@@ -202,7 +203,8 @@ func (v *ContentAddressValidator) Validate(ch Chunk) bool {
 	}

 	hasher := v.Hasher()
-	hasher.ResetWithLength(data[:8])
+	hasher.Reset()
+	hasher.SetSpanBytes(data[:8])
 	hasher.Write(data[8:])
 	hash := hasher.Sum(nil)
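Reviewer note: all storage-side call sites above follow the same migration — the old single ResetWithLength(data[:8]) call becomes the explicit Reset/SetSpanBytes pair. A consolidated sketch of computing a chunk address under the new call sequence (illustrative only; assumes the exported storage identifiers used in the tests above):

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/ethersphere/swarm/storage"
)

func main() {
	// Chunk data carries an 8-byte little-endian span prefix before the payload.
	data := make([]byte, 4096+8)
	binary.LittleEndian.PutUint64(data[:8], 4096)

	hasher := storage.MakeHashFunc(storage.DefaultHash)()
	hasher.Reset()
	hasher.SetSpanBytes(data[:8]) // span prefix
	hasher.Write(data[8:])        // payload
	fmt.Printf("%x\n", hasher.Sum(nil))
}
```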