This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

file: Introduce filehasher subcomponent #2116

Open · wants to merge 9 commits into base: master
Showing changes from all commits
9 changes: 0 additions & 9 deletions bmt/bmt.go
@@ -25,9 +25,6 @@ import (
"strings"
"sync"
"sync/atomic"

"github.com/ethersphere/swarm/file"
"github.com/ethersphere/swarm/log"
)

/*
@@ -290,12 +287,6 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree {
}
}

// SetWriter implements file.SectionWriter
func (h *Hasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter {
log.Warn("Synchasher does not currently support SectionWriter chaining")
return h
}

// SectionSize implements file.SectionWriter
func (h *Hasher) SectionSize() int {
return h.pool.SegmentSize
8 changes: 8 additions & 0 deletions file/bmt/async.go
@@ -22,6 +22,8 @@ import (
"sync"

"github.com/ethersphere/swarm/bmt"
"github.com/ethersphere/swarm/file"
"github.com/ethersphere/swarm/log"
)

// NewAsyncWriter extends Hasher with an interface for concurrent segment.GetSection() writes
@@ -177,3 +179,9 @@ func (sw *AsyncHasher) SumIndexed(b []byte, length int) (s []byte) {
hsh.Write(s)
return hsh.Sum(b)
}

// SetWriter implements file.SectionWriter
func (sw *AsyncHasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter {
log.Warn("BMT hasher does not currently support SectionWriter chaining")
return sw
}
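
With this change SetWriter moves off the synchronous bmt.Hasher and onto the AsyncHasher, where it remains a no-op that only logs a warning: neither BMT hasher supports chaining another SectionWriter behind it. The sketch below illustrates the caller-side contract such a method implies; it is not part of the diff, and it assumes file.SectionWriterFunc has the shape func(context.Context) file.SectionWriter, as suggested by dummyHashFunc in the test file further down.

// illustrative sketch only; assumes a package that imports "context" and
// github.com/ethersphere/swarm/file, and that hashFunc is a file.SectionWriterFunc
func newChainedWriter(ctx context.Context, hashFunc file.SectionWriterFunc) file.SectionWriter {
	w := hashFunc(ctx)
	// a writer that supports chaining would typically install hashFunc as the
	// constructor for the next writer and return the result; the BMT hashers
	// just log a warning and return themselves, so the call is safe either way
	return w.SetWriter(hashFunc)
}
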
209 changes: 209 additions & 0 deletions file/hasher/common_test.go
@@ -1,7 +1,18 @@
package hasher

import (
"bytes"
"context"
"encoding/binary"
"hash"
"sync"
"testing"

"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethersphere/swarm/file"
"github.com/ethersphere/swarm/log"
"github.com/ethersphere/swarm/testutil"
"golang.org/x/crypto/sha3"
)

const (
@@ -64,3 +75,201 @@ var (
func init() {
testutil.Init()
}

var (
dummyHashFunc = func(_ context.Context) file.SectionWriter {
return newDummySectionWriter(chunkSize*branches, sectionSize, sectionSize, branches)
}
)

// simple file.SectionWriter hasher that keeps the data written to it
// for later inspection
// TODO: see if this can be replaced with the fake hasher from storage module
type dummySectionWriter struct {
sectionSize int
digestSize int
branches int
data []byte
digest []byte
size int
span []byte
summed bool
index int
writer hash.Hash
mu sync.Mutex
wg sync.WaitGroup
}

// dummySectionWriter constructor
func newDummySectionWriter(cp int, sectionSize int, digestSize int, branches int) *dummySectionWriter {
log.Trace("creating dummy writer", "sectionsize", sectionSize, "digestsize", digestSize, "branches", branches)
return &dummySectionWriter{
sectionSize: sectionSize,
digestSize: digestSize,
branches: branches,
data: make([]byte, cp),
writer: sha3.NewLegacyKeccak256(),
digest: make([]byte, digestSize),
}
}

// implements file.SectionWriter
func (d *dummySectionWriter) SetWriter(_ file.SectionWriterFunc) file.SectionWriter {
log.Error("dummySectionWriter does not support SectionWriter chaining")
return d
}

// implements file.SectionWriter
func (d *dummySectionWriter) SeekSection(offset int) {
d.index = offset * d.SectionSize()
}

// implements file.SectionWriter
func (d *dummySectionWriter) SetLength(length int) {
d.size = length
}

// implements file.SectionWriter
func (d *dummySectionWriter) SetSpan(length int) {
d.span = make([]byte, 8)
binary.LittleEndian.PutUint64(d.span, uint64(length))
}

// implements file.SectionWriter
func (d *dummySectionWriter) Write(data []byte) (int, error) {
d.mu.Lock()
copy(d.data[d.index:], data)
d.size += len(data)
log.Trace("dummywriter write", "index", d.index, "size", d.size, "threshold", d.sectionSize*d.branches)
if d.isFull() {
d.summed = true
d.mu.Unlock()
d.sum()
} else {
d.mu.Unlock()
}
return len(data), nil
}

// implements file.SectionWriter
func (d *dummySectionWriter) Sum(_ []byte) []byte {
log.Trace("dummy Sumcall", "size", d.size)
d.mu.Lock()
if !d.summed {
d.summed = true
d.mu.Unlock()
d.sum()
} else {
d.mu.Unlock()
}
return d.digest
}

// invokes sum on the underlying writer
func (d *dummySectionWriter) sum() {
d.mu.Lock()
defer d.mu.Unlock()
d.writer.Write(d.span)
log.Trace("dummy sum writing span", "span", d.span)
for i := 0; i < d.size; i += d.writer.Size() {
sectionData := d.data[i : i+d.writer.Size()]
log.Trace("dummy sum write", "i", i/d.writer.Size(), "data", hexutil.Encode(sectionData), "size", d.size)
d.writer.Write(sectionData)
}
copy(d.digest, d.writer.Sum(nil))
log.Trace("dummy sum result", "ref", hexutil.Encode(d.digest))
}

// implements file.SectionWriter
func (d *dummySectionWriter) Reset() {
d.mu.Lock()
defer d.mu.Unlock()
d.data = make([]byte, len(d.data))
d.digest = make([]byte, d.digestSize)
d.size = 0
d.summed = false
d.span = nil
d.writer.Reset()
}

// implements file.SectionWriter
func (d *dummySectionWriter) BlockSize() int {
return d.sectionSize
}

// implements file.SectionWriter
func (d *dummySectionWriter) SectionSize() int {
return d.sectionSize
}

// implements file.SectionWriter
func (d *dummySectionWriter) Size() int {
return d.sectionSize
}

// implements file.SectionWriter
func (d *dummySectionWriter) Branches() int {
return d.branches
}

// returns true if hasher is written to the capacity limit
func (d *dummySectionWriter) isFull() bool {
return d.size == d.sectionSize*d.branches
}

// implements file.SectionWriter
func (d *dummySectionWriter) SumIndexed(b []byte, l int) []byte {
//log.Trace("dummy sum indexed", "d", d.data[:l], "l", l, "b", b, "s", d.span)
d.writer.Write(d.span)
d.writer.Write(d.data[:l])
return d.writer.Sum(b)
}

// implements file.SectionWriter
func (d *dummySectionWriter) WriteIndexed(i int, b []byte) {
//log.Trace("dummy write indexed", "i", i, "b", len(b))
copy(d.data[i*d.sectionSize:], b)
}

// TestDummySectionWriter verifies that written sections land at the expected offsets and that Sum produces the expected digests
func TestDummySectionWriter(t *testing.T) {

w := newDummySectionWriter(chunkSize*2, sectionSize, sectionSize, branches)
w.Reset()

_, data := testutil.SerialData(sectionSize*2, 255, 0)

w.SeekSection(branches)
w.Write(data[:sectionSize])
w.SeekSection(branches + 1)
w.Write(data[sectionSize:])
if !bytes.Equal(w.data[chunkSize:chunkSize+sectionSize*2], data) {
t.Fatalf("Write double pos %d: expected %x, got %x", chunkSize, w.data[chunkSize:chunkSize+sectionSize*2], data)
}

correctDigestHex := "0x52eefd0c37895a8845d4a6cf6c6b56980e448376e55eb45717663ab7b3fc8d53"
w.SetLength(chunkSize * 2)
w.SetSpan(chunkSize * 2)
digest := w.Sum(nil)
digestHex := hexutil.Encode(digest)
if digestHex != correctDigestHex {
t.Fatalf("Digest: 2xsectionSize*1; expected %s, got %s", correctDigestHex, digestHex)
}

w = newDummySectionWriter(chunkSize*2, sectionSize*2, sectionSize*2, branches/2)
w.Reset()
w.SeekSection(branches / 2)
w.Write(data)
if !bytes.Equal(w.data[chunkSize:chunkSize+sectionSize*2], data) {
t.Fatalf("Write double pos %d: expected %x, got %x", chunkSize, w.data[chunkSize:chunkSize+sectionSize*2], data)
}

correctDigestHex += zeroHex
w.SetLength(chunkSize * 2)
w.SetSpan(chunkSize * 2)
digest = w.Sum(nil)
digestHex = hexutil.Encode(digest)
if digestHex != correctDigestHex {
t.Fatalf("Digest 1xsectionSize*2; expected %s, got %s", correctDigestHex, digestHex)
}
}
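
The test above exercises only the streaming SeekSection/Write/Sum path. The dummy writer also exposes the indexed path used by the hasher jobs, where WriteIndexed places a section at a given section index and SumIndexed hashes the span followed by the first l bytes of data. A minimal sketch, assumed to live in the same test package so that the constants and testutil helpers are in scope; it is not part of the diff.

// sketch of the indexed API; not part of the diff
func sumIndexedSketch() []byte {
	w := newDummySectionWriter(chunkSize, sectionSize, sectionSize, branches)
	_, data := testutil.SerialData(sectionSize*2, 255, 0)
	w.SetSpan(sectionSize * 2)              // span is prepended to the digest input
	w.WriteIndexed(0, data[:sectionSize])   // first section at byte offset 0
	w.WriteIndexed(1, data[sectionSize:])   // second section at byte offset sectionSize
	return w.SumIndexed(nil, sectionSize*2) // keccak256(span || first two sections)
}
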
79 changes: 79 additions & 0 deletions file/hasher/index.go
@@ -0,0 +1,79 @@
package hasher

import (
"fmt"
"sync"
)

// jobIndex keeps an index of all the existing jobs for a file hashing operation
// sorted by level
//
// it also keeps all the "top hashes", ie hashes on first data section index of every level
// these are needed in case of balanced tree results, since the hashing result would be
// lost otherwise, due to the job not having any intermediate storage of any data
type jobIndex struct {
maxLevels int
jobs []sync.Map
topHashes [][]byte
mu sync.Mutex
}

// jobIndex constructor
func newJobIndex(maxLevels int) *jobIndex {
ji := &jobIndex{
maxLevels: maxLevels,
}
for i := 0; i < maxLevels; i++ {
ji.jobs = append(ji.jobs, sync.Map{})
}
return ji
}

// implements Stringer interface
func (ji *jobIndex) String() string {
return fmt.Sprintf("%p", ji)
}

// Add adds a job to the index at the level and data section index specified in the job
func (ji *jobIndex) Add(jb *job) {
ji.jobs[jb.level].Store(jb.dataSection, jb)
}

// Get retrieves a job from the job index based on the level of the job and its data section index
//
// If a job for the level and section index does not exist this method returns nil
func (ji *jobIndex) Get(lvl int, section int) *job {
jb, ok := ji.jobs[lvl].Load(section)
if !ok {
return nil
}
return jb.(*job)
}

// Delete removes a job from the job index leaving it to be garbage collected when the reference in the main code is relinquished
func (ji *jobIndex) Delete(jb *job) {
ji.jobs[jb.level].Delete(jb.dataSection)
}

// AddTopHash should be called by a job when a hash is written to the first index of a level
//
// Since the job doesn't store any data written to it (just passing it through to the underlying writer) this is needed for the edge case of balanced trees
func (ji *jobIndex) AddTopHash(ref []byte) {
ji.mu.Lock()
defer ji.mu.Unlock()
ji.topHashes = append(ji.topHashes, ref)
}

// GetTopHash gets the current top hash for a particular level set by AddTopHash
func (ji *jobIndex) GetTopHash(lvl int) []byte {
ji.mu.Lock()
defer ji.mu.Unlock()
return ji.topHashes[lvl-1]
}

// GetTopHashLevel gets the level of the current topmost hash
func (ji *jobIndex) GetTopHashLevel() int {
ji.mu.Lock()
defer ji.mu.Unlock()
return len(ji.topHashes)
}
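
The job type referenced by Add, Get and Delete is defined elsewhere in this PR, so a complete usage example cannot be built from this file alone. The top-hash bookkeeping can be sketched on its own, though: each level's job reports the hash written to its first section, and for a balanced tree the topmost reported hash becomes the root reference. A minimal sketch under that assumption; the function and parameter names are illustrative only.

// minimal sketch of the top-hash accessors; refLevel1 and refLevel2 stand in
// for hashes reported by the level 1 and level 2 jobs at data section 0
func topHashSketch(refLevel1, refLevel2 []byte) []byte {
	ji := newJobIndex(9)
	ji.AddTopHash(refLevel1)
	ji.AddTopHash(refLevel2)
	lvl := ji.GetTopHashLevel() // 2: two levels have reported so far
	return ji.GetTopHash(lvl)   // refLevel2, the candidate root for a balanced tree
}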