From 21fddb0070fa01ab286823785db45b511da07aa5 Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 24 Feb 2020 09:35:18 +0100 Subject: [PATCH 1/9] file: WIP halfway adding underlying job component --- file/hasher/job.go | 327 ++++++++++++++++ file/hasher/job_test.go | 654 +++++++++++++++++++++++++++++++ file/testutillocal/cache.go | 106 +++++ file/testutillocal/cache_test.go | 53 +++ file/testutillocal/hash.go | 24 ++ 5 files changed, 1164 insertions(+) create mode 100644 file/hasher/job.go create mode 100644 file/hasher/job_test.go create mode 100644 file/testutillocal/cache.go create mode 100644 file/testutillocal/cache_test.go create mode 100644 file/testutillocal/hash.go diff --git a/file/hasher/job.go b/file/hasher/job.go new file mode 100644 index 0000000000..66252f952a --- /dev/null +++ b/file/hasher/job.go @@ -0,0 +1,327 @@ +package hasher + +import ( + "fmt" + "sync" + "sync/atomic" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethersphere/swarm/file" + "github.com/ethersphere/swarm/log" +) + +// necessary metadata across asynchronous input +type jobUnit struct { + index int + data []byte + count int +} + +// encapsulates one single intermediate chunk to be hashed +type job struct { + target *target + params *treeParams + index *jobIndex + + level int // level in tree + dataSection int // data section index + cursorSection int32 // next write position in job + endCount int32 // number of writes to be written to this job (0 means write to capacity) + lastSectionSize int // data size on the last data section write + firstSectionData []byte // store first section of data written to solve the dangling chunk edge case + + writeC chan jobUnit + writer file.SectionWriter // underlying data processor + doneC chan struct{} // pointer to target doneC channel, set to nil in process() when closed + + mu sync.Mutex +} + +func newJob(params *treeParams, tgt *target, jobIndex *jobIndex, lvl int, dataSection int) *job { + jb := &job{ + params: params, + 
index: jobIndex, + level: lvl, + dataSection: dataSection, + writeC: make(chan jobUnit), + target: tgt, + doneC: nil, + } + if jb.index == nil { + jb.index = newJobIndex(9) + } + targetLevel := tgt.Level() + if targetLevel == 0 { + log.Trace("target not set", "level", lvl) + jb.doneC = tgt.doneC + + } else { + targetCount := tgt.Count() + jb.endCount = int32(jb.targetCountToEndCount(targetCount)) + } + log.Trace("target count", "level", lvl, "count", tgt.Count()) + + jb.index.Add(jb) + return jb +} + +func (jb *job) start() { + jb.writer = jb.params.GetWriter() + go jb.process() +} + +// implements Stringer interface +func (jb *job) String() string { + return fmt.Sprintf("job: l:%d,s:%d", jb.level, jb.dataSection) +} + +// atomically increments the write counter of the job +func (jb *job) inc() int { + return int(atomic.AddInt32(&jb.cursorSection, 1)) +} + +// atomically returns the write counter of the job +func (jb *job) count() int { + return int(atomic.LoadInt32(&jb.cursorSection)) +} + +// size returns the byte size of the span the job represents +// if job is last index in a level and writes have been finalized, it will return the target size +// otherwise, regardless of job index, it will return the size according to the current write count +// TODO: returning expected size in one case and actual size in another can lead to confusion +func (jb *job) size() int { + jb.mu.Lock() + count := int(jb.cursorSection) //jb.count() + endCount := int(jb.endCount) //int(atomic.LoadInt32(&jb.endCount)) + jb.mu.Unlock() + if endCount%jb.params.Branches == 0 { + return count * jb.params.SectionSize * jb.params.Spans[jb.level] + } + log.Trace("size", "sections", jb.target.sections, "size", jb.target.Size(), "endcount", endCount, "level", jb.level) + return jb.target.Size() % (jb.params.Spans[jb.level] * jb.params.SectionSize * jb.params.Branches) +} + +// add data to job +// does no checking for data length or index validity +// TODO: rename index param not to confuse with 
index object +func (jb *job) write(index int, data []byte) { + + jb.inc() + + // if a write is received at the first datasection of a level we need to store this hash + // in case of a balanced tree and we need to send it to resultC later + // at the time of hashing of a balanced tree we have no way of knowing for sure whether + // that is the end of the job or not + if jb.dataSection == 0 && index == 0 { + topHashLevel := jb.index.GetTopHashLevel() + if topHashLevel < jb.level { + log.Trace("have tophash", "level", jb.level, "ref", hexutil.Encode(data)) + jb.index.AddTopHash(data) + } + } + jb.writeC <- jobUnit{ + index: index, + data: data, + } +} + +// runs in loop until: +// - sectionSize number of job writes have occurred (one full chunk) +// - data write is finalized and targetcount for this chunk was already reached +// - data write is finalized and targetcount is reached on a subsequent job write +func (jb *job) process() { + + log.Trace("starting job process", "level", jb.level, "sec", jb.dataSection) + + var processCount int + defer jb.destroy() + + // is set when data write is finished, AND + // the final data section falls within the span of this job + // if not, loop will only exit on Branches writes +OUTER: + for { + select { + + // enter here if new data is written to the job + // TODO: Error if calculated write count exceed chunk + case entry := <-jb.writeC: + + // split the contents to fit the underlying SectionWriter + entrySections := len(entry.data) / jb.writer.SectionSize() + jb.mu.Lock() + endCount := int(jb.endCount) + oldProcessCount := processCount + processCount += entrySections + jb.mu.Unlock() + if entry.index == 0 { + jb.firstSectionData = entry.data + } + log.Trace("job entry", "datasection", jb.dataSection, "num sections", entrySections, "level", jb.level, "processCount", oldProcessCount, "endcount", endCount, "index", entry.index, "data", hexutil.Encode(entry.data)) + + // TODO: this write is superfluous when the received data is the 
root hash + var offset int + for i := 0; i < entrySections; i++ { + idx := entry.index + i + data := entry.data[offset : offset+jb.writer.SectionSize()] + log.Trace("job write", "datasection", jb.dataSection, "level", jb.level, "processCount", oldProcessCount+i, "endcount", endCount, "index", entry.index+i, "data", hexutil.Encode(data)) + jb.writer.SeekSection(idx) + jb.writer.Write(data) + offset += jb.writer.SectionSize() + } + + // since newcount is incremented above it can only equal endcount if this has been set in the case below, + // which means data write has been completed + // otherwise if we reached the chunk limit we also continue to hashing + if processCount == endCount { + log.Trace("quitting writec - endcount", "c", processCount, "level", jb.level) + break OUTER + } + if processCount == jb.writer.Branches() { + log.Trace("quitting writec - branches") + break OUTER + } + + // enter here if data writes have been completed + // TODO: this case currently executes for all cycles after data write is complete for which writes to this job do not happen. 
perhaps it can be improved + case <-jb.doneC: + jb.mu.Lock() + jb.doneC = nil + log.Trace("doneloop", "level", jb.level, "processCount", processCount, "endcount", jb.endCount) + //count := jb.count() + + // if the target count falls within the span of this job + // set the endcount so we know we have to do extra calculations for + // determining span in case of unbalanced tree + targetCount := jb.target.Count() + jb.endCount = int32(jb.targetCountToEndCount(targetCount)) + log.Trace("doneloop done", "level", jb.level, "targetcount", jb.target.Count(), "endcount", jb.endCount) + + // if we have reached the end count for this chunk, we proceed to hashing + // this case is important when writes to the level happen after this goroutine + // registers that data writes have been completed + if processCount > 0 && processCount == int(jb.endCount) { + log.Trace("quitting donec", "level", jb.level, "processcount", processCount) + jb.mu.Unlock() + break OUTER + } + jb.mu.Unlock() + } + } + + jb.sum() +} + +func (jb *job) sum() { + + targetLevel := jb.target.Level() + if targetLevel == jb.level { + jb.target.resultC <- jb.index.GetTopHash(jb.level) + return + } + + // get the size of the span and execute the hash digest of the content + size := jb.size() + //span := bmt.LengthToSpan(size) + refSize := jb.count() * jb.params.SectionSize + jb.writer.SetLength(refSize) + jb.writer.SetSpan(size) + log.Trace("job sum", "count", jb.count(), "refsize", refSize, "size", size, "datasection", jb.dataSection, "level", jb.level, "targetlevel", targetLevel, "endcount", jb.endCount) + ref := jb.writer.Sum(nil) + + // endCount > 0 means this is the last chunk on the level + // the hash from the level below the target level will be the result + belowRootLevel := targetLevel - 1 + if jb.endCount > 0 && jb.level == belowRootLevel { + jb.target.resultC <- ref + return + } + + // retrieve the parent and the corresponding section in it to write to + parent := jb.parent() + log.Trace("have parent", 
"level", jb.level, "jb p", fmt.Sprintf("%p", jb), "jbp p", fmt.Sprintf("%p", parent)) + nextLevel := jb.level + 1 + parentSection := dataSectionToLevelSection(jb.params, nextLevel, jb.dataSection) + + // in the event that we have a balanced tree and a chunk with single reference below the target level + // we move the single reference up to the penultimate level + if jb.endCount == 1 { + ref = jb.firstSectionData + for parent.level < belowRootLevel { + log.Trace("parent write skip", "level", parent.level) + oldParent := parent + parent = parent.parent() + oldParent.destroy() + nextLevel += 1 + parentSection = dataSectionToLevelSection(jb.params, nextLevel, jb.dataSection) + } + } + parent.write(parentSection, ref) + +} + +// determine whether the given data section count falls within the span of the current job +func (jb *job) targetWithinJob(targetSection int) (int, bool) { + var endIndex int + var ok bool + + // span one level above equals the data size of 128 units of one section on this level + // using the span table saves one multiplication + //dataBoundary := dataSectionToLevelBoundary(jb.params, jb.level, jb.dataSection) + dataBoundary := dataSectionToLevelBoundary(jb.params, jb.level, jb.dataSection) + upperLimit := dataBoundary + jb.params.Spans[jb.level+1] + + // the data section is the data section index where the span of this job starts + if targetSection >= dataBoundary && targetSection < upperLimit { + + // data section index must be divided by corresponding section size on the job's level + // then wrap on branch period to find the correct section within this job + endIndex = (targetSection / jb.params.Spans[jb.level]) % jb.params.Branches + + ok = true + } + return endIndex, ok +} + +// if last data index falls within the span, return the appropriate end count for the level +// otherwise return 0 (which means job write until limit) +func (jb *job) targetCountToEndCount(targetCount int) int { + endIndex, ok := jb.targetWithinJob(targetCount - 1) + 
if !ok { + return 0 + } + return endIndex + 1 +} + +// returns the parent job of the receiver job +// a new parent job is created if none exists for the slot +func (jb *job) parent() *job { + jb.index.mu.Lock() + defer jb.index.mu.Unlock() + newLevel := jb.level + 1 + // Truncate to even quotient which is the actual logarithmic boundary of the data section under the span + newDataSection := dataSectionToLevelBoundary(jb.params, jb.level+1, jb.dataSection) + parent := jb.index.Get(newLevel, newDataSection) + if parent != nil { + return parent + } + jbp := newJob(jb.params, jb.target, jb.index, jb.level+1, newDataSection) + jbp.start() + return jbp +} + +// Next creates the job for the next data section span on the same level as the receiver job +// this is only meant to be called once for each job, consecutive calls will overwrite index with new empty job +func (jb *job) Next() *job { + jbn := newJob(jb.params, jb.target, jb.index, jb.level, jb.dataSection+jb.params.Spans[jb.level+1]) + jbn.start() + return jbn +} + +// cleans up the job; reset hasher and remove pointer to job from index +func (jb *job) destroy() { + if jb.writer != nil { + jb.params.PutWriter(jb.writer) + } + jb.index.Delete(jb) +} diff --git a/file/hasher/job_test.go b/file/hasher/job_test.go new file mode 100644 index 0000000000..e4d1084260 --- /dev/null +++ b/file/hasher/job_test.go @@ -0,0 +1,654 @@ +package hasher + +import ( + "context" + "fmt" + "math/rand" + "strconv" + "strings" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethersphere/swarm/bmt" + "github.com/ethersphere/swarm/file" + "github.com/ethersphere/swarm/file/testutillocal" + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/testutil" + "golang.org/x/crypto/sha3" +) + +const ( + zeroHex = "0000000000000000000000000000000000000000000000000000000000000000" +) + +// TestTreeParams verifies that params are set correctly by the param constructor +func TestTreeParams(t 
*testing.T) { + + params := newTreeParams(dummyHashFunc) + + if params.SectionSize != 32 { + t.Fatalf("section: expected %d, got %d", sectionSize, params.SectionSize) + } + + if params.Branches != 128 { + t.Fatalf("branches: expected %d, got %d", branches, params.SectionSize) + } + + if params.Spans[2] != branches*branches { + t.Fatalf("span %d: expected %d, got %d", 2, branches*branches, params.Spans[1]) + } + +} + +// TestTarget verifies that params are set correctly by the target constructor +func TestTarget(t *testing.T) { + + tgt := newTarget() + tgt.Set(32, 1, 2) + + if tgt.size != 32 { + t.Fatalf("target size expected %d, got %d", 32, tgt.size) + } + + if tgt.sections != 1 { + t.Fatalf("target sections expected %d, got %d", 1, tgt.sections) + } + + if tgt.level != 2 { + t.Fatalf("target level expected %d, got %d", 2, tgt.level) + } +} + +// TestJobTargetWithinDefault verifies the calculation of whether a final data section index +// falls within a particular job's span without regard to differing SectionSize +func TestJobTargetWithinDefault(t *testing.T) { + params := newTreeParams(dummyHashFunc) + index := newJobIndex(9) + tgt := newTarget() + + jb := newJob(params, tgt, index, 1, branches*branches) + defer jb.destroy() + + finalSize := chunkSize*branches + chunkSize*2 + finalCount := dataSizeToSectionCount(finalSize, sectionSize) + log.Trace("within test", "size", finalSize, "count", finalCount) + c, ok := jb.targetWithinJob(finalCount - 1) + if !ok { + t.Fatalf("target %d within %d: expected true", finalCount, jb.level) + } + if c != 1 { + t.Fatalf("target %d within %d: expected %d, got %d", finalCount, jb.level, 2, c) + } +} + +// TestJobTargetWithinDifferentSections does the same as TestJobTargetWithinDefault but +// with SectionSize/Branches settings differing between client target and underlying writer +func TestJobTargetWithinDifferentSections(t *testing.T) { + dummyHashDoubleFunc := func(_ context.Context) file.SectionWriter { + return 
newDummySectionWriter(chunkSize, sectionSize*2, sectionSize*2, branches/2) + } + params := newTreeParams(dummyHashDoubleFunc) + index := newJobIndex(9) + tgt := newTarget() + + //jb := newJob(params, tgt, index, 1, branches*branches) + jb := newJob(params, tgt, index, 1, 0) + defer jb.destroy() + + //finalSize := chunkSize*branches + chunkSize*2 + finalSize := chunkSize + finalCount := dataSizeToSectionCount(finalSize, sectionSize) + log.Trace("within test", "size", finalSize, "count", finalCount) + c, ok := jb.targetWithinJob(finalCount - 1) + if !ok { + t.Fatalf("target %d within %d: expected true", finalCount, jb.level) + } + if c != 1 { + t.Fatalf("target %d within %d: expected %d, got %d", finalCount, jb.level, 1, c) + } +} + +// TestNewJob verifies that a job is initialized with the correct values +func TestNewJob(t *testing.T) { + + params := newTreeParams(dummyHashFunc) + params.Debug = true + + tgt := newTarget() + jb := newJob(params, tgt, nil, 1, branches*branches+1) + if jb.level != 1 { + t.Fatalf("job level expected 1, got %d", jb.level) + } + if jb.dataSection != branches*branches+1 { + t.Fatalf("datasectionindex: expected %d, got %d", branches+1, jb.dataSection) + } + tgt.Set(0, 0, 0) + jb.destroy() +} + +// TestJobSize verifies the data size calculation used for calculating the span of data +// under a particular level reference +// it tests both a balanced and an unbalanced tree +func TestJobSize(t *testing.T) { + params := newTreeParams(dummyHashFunc) + params.Debug = true + index := newJobIndex(9) + + tgt := newTarget() + jb := newJob(params, tgt, index, 3, 0) + jb.cursorSection = 1 + jb.endCount = 1 + size := chunkSize*branches + chunkSize + sections := dataSizeToSectionIndex(size, sectionSize) + 1 + tgt.Set(size, sections, 3) + jobSize := jb.size() + if jobSize != size { + t.Fatalf("job size: expected %d, got %d", size, jobSize) + } + jb.destroy() + + tgt = newTarget() + jb = newJob(params, tgt, index, 3, 0) + jb.cursorSection = 1 + jb.endCount 
= 1 + size = chunkSize * branches * branches + sections = dataSizeToSectionIndex(size, sectionSize) + 1 + tgt.Set(size, sections, 3) + jobSize = jb.size() + if jobSize != size { + t.Fatalf("job size: expected %d, got %d", size, jobSize) + } + jb.destroy() + +} + +// TestJobTarget verifies that the underlying calculation for determining whether +// a data section index is within a level's span is correct +func TestJobTarget(t *testing.T) { + tgt := newTarget() + params := newTreeParams(dummyHashFunc) + params.Debug = true + index := newJobIndex(9) + + jb := newJob(params, tgt, index, 1, branches*branches) + + // this is less than chunksize * 128 + // it will not be in the job span + finalSize := chunkSize + sectionSize + 1 + finalSection := dataSizeToSectionIndex(finalSize, sectionSize) + c, ok := jb.targetWithinJob(finalSection) + if ok { + t.Fatalf("targetwithinjob: expected false") + } + jb.destroy() + + // chunkSize*128+chunkSize*2 (532480) is within chunksize*128 (524288) and chunksize*128*2 (1048576) + // it will be within the job span + finalSize = chunkSize*branches + chunkSize*2 + finalSection = dataSizeToSectionIndex(finalSize, sectionSize) + c, ok = jb.targetWithinJob(finalSection) + if !ok { + t.Fatalf("targetwithinjob section %d: expected true", branches*branches) + } + if c != 1 { + t.Fatalf("targetwithinjob section %d: expected %d, got %d", branches*branches, 1, c) + } + c = jb.targetCountToEndCount(finalSection + 1) + if c != 2 { + t.Fatalf("targetcounttoendcount section %d: expected %d, got %d", branches*branches, 2, c) + } + jb.destroy() +} + +// TestJobIndex verifies that the job constructor adds the job to the job index +// and removes it on job destruction +func TestJobIndex(t *testing.T) { + tgt := newTarget() + params := newTreeParams(dummyHashFunc) + + jb := newJob(params, tgt, nil, 1, branches) + jobIndex := jb.index + jbGot := jobIndex.Get(1, branches) + if jb != jbGot { + t.Fatalf("jbIndex get: expect %p, got %p", jb, jbGot) + } + 
jbGot.destroy() + if jobIndex.Get(1, branches) != nil { + t.Fatalf("jbIndex delete: expected nil") + } +} + +// TestJobGetNext verifies that the new job constructed through the job.Next() method +// has the correct level and data section index +func TestJobGetNext(t *testing.T) { + tgt := newTarget() + params := newTreeParams(dummyHashFunc) + params.Debug = true + + jb := newJob(params, tgt, nil, 1, branches*branches) + jbn := jb.Next() + if jbn == nil { + t.Fatalf("parent: nil") + } + if jbn.level != 1 { + t.Fatalf("nextjob level: expected %d, got %d", 2, jbn.level) + } + if jbn.dataSection != jb.dataSection+branches*branches { + t.Fatalf("nextjob section: expected %d, got %d", jb.dataSection+branches*branches, jbn.dataSection) + } +} + +// TestJobWriteTwoAndFinish writes two references to a job and sets the job target to two chunks +// it verifies that the job count after the writes is two, and the hash is correct +func TestJobWriteTwoAndFinish(t *testing.T) { + + tgt := newTarget() + params := newTreeParams(dummyHashFunc) + + jb := newJob(params, tgt, nil, 1, 0) + jb.start() + _, data := testutil.SerialData(sectionSize*2, 255, 0) + jb.write(0, data[:sectionSize]) + jb.write(1, data[sectionSize:]) + + finalSize := chunkSize * 2 + finalSection := dataSizeToSectionIndex(finalSize, sectionSize) + tgt.Set(finalSize, finalSection-1, 2) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*199) + defer cancel() + select { + case ref := <-tgt.Done(): + correctRefHex := "0xe1553e1a3a6b73f96e6fc48318895e401e7db2972962ee934633fa8b3eaaf78b" + refHex := hexutil.Encode(ref) + if refHex != correctRefHex { + t.Fatalf("job write two and finish: expected %s, got %s", correctRefHex, refHex) + } + case <-ctx.Done(): + t.Fatalf("timeout: %v", ctx.Err()) + } + + if jb.count() != 2 { + t.Fatalf("jobcount: expected %d, got %d", 2, jb.count()) + } +} + +// TestJobGetParent verifies that the parent returned from two jobs' parent() calls +// that are within the 
same span as the parent chunk of references is the same +// BUG: not guaranteed to return same parent when run with eg -count 100 +func TestJobGetParent(t *testing.T) { + tgt := newTarget() + params := newTreeParams(dummyHashFunc) + + jb := newJob(params, tgt, nil, 1, branches*branches) + jb.start() + jbp := jb.parent() + if jbp == nil { + t.Fatalf("parent: nil") + } + if jbp.level != 2 { + t.Fatalf("parent level: expected %d, got %d", 2, jbp.level) + } + if jbp.dataSection != 0 { + t.Fatalf("parent data section: expected %d, got %d", 0, jbp.dataSection) + } + jbGot := jb.index.Get(2, 0) + if jbGot == nil { + t.Fatalf("index get: nil") + } + + jbNext := jb.Next() + jbpNext := jbNext.parent() + if jbpNext != jbp { + t.Fatalf("next parent: expected %p, got %p", jbp, jbpNext) + } +} + +// TestJobWriteParentSection verifies that a data write translates to a write +// in the correct section of its parent +func TestJobWriteParentSection(t *testing.T) { + tgt := newTarget() + params := newTreeParams(dummyHashFunc) + index := newJobIndex(9) + + jb := newJob(params, tgt, index, 1, 0) + jbn := jb.Next() + _, data := testutil.SerialData(sectionSize*2, 255, 0) + jbn.write(0, data[:sectionSize]) + jbn.write(1, data[sectionSize:]) + + finalSize := chunkSize*branches + chunkSize*2 + finalSection := dataSizeToSectionIndex(finalSize, sectionSize) + tgt.Set(finalSize, finalSection, 3) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + select { + case <-tgt.Done(): + t.Fatalf("unexpected done") + case <-ctx.Done(): + } + jbnp := jbn.parent() + if jbnp.count() != 1 { + t.Fatalf("parent count: expected %d, got %d", 1, jbnp.count()) + } + correctRefHex := "0xe1553e1a3a6b73f96e6fc48318895e401e7db2972962ee934633fa8b3eaaf78b" + + // extract data in section 2 from the writer + // TODO: overload writer to provide a get method to extract data to improve clarity + w := jbnp.writer.(*dummySectionWriter) + w.mu.Lock() + parentRef := w.data[32:64] 
+ w.mu.Unlock() + parentRefHex := hexutil.Encode(parentRef) + if parentRefHex != correctRefHex { + t.Fatalf("parent data: expected %s, got %s", correctRefHex, parentRefHex) + } +} + +// TestJobWriteFull verifies the hashing result of the write of a balanced tree +// where the simulated tree is chunkSize*branches worth of data +func TestJobWriteFull(t *testing.T) { + + tgt := newTarget() + params := newTreeParams(dummyHashFunc) + + jb := newJob(params, tgt, nil, 1, 0) + jb.start() + _, data := testutil.SerialData(chunkSize, 255, 0) + for i := 0; i < chunkSize; i += sectionSize { + jb.write(i/sectionSize, data[i:i+sectionSize]) + } + + tgt.Set(chunkSize, branches, 2) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + select { + case ref := <-tgt.Done(): + correctRefHex := "0x8ace4673563b86281778b943aa60481fc4ede9f238dd98f1b3a5df4cb54ee79b" + refHex := hexutil.Encode(ref) + if refHex != correctRefHex { + t.Fatalf("job write full: expected %s, got %s", correctRefHex, refHex) + } + case <-ctx.Done(): + t.Fatalf("timeout: %v", ctx.Err()) + } + if jb.count() != branches { + t.Fatalf("jobcount: expected %d, got %d", 32, jb.count()) + } +} + +// TestJobWriteSpan uses the bmt asynchronous hasher +// it verifies that a result can be attained at chunkSize+sectionSize*2 references +// which translates to chunkSize*branches+chunkSize*2 bytes worth of data +func TestJobWriteSpan(t *testing.T) { + + tgt := newTarget() + hashFunc := testutillocal.NewBMTHasherFunc(0) + params := newTreeParams(hashFunc) + + jb := newJob(params, tgt, nil, 1, 0) + jb.start() + _, data := testutil.SerialData(chunkSize+sectionSize*2, 255, 0) + + for i := 0; i < chunkSize; i += sectionSize { + jb.write(i/sectionSize, data[i:i+sectionSize]) + } + jbn := jb.Next() + jbn.write(0, data[chunkSize:chunkSize+sectionSize]) + jbn.write(1, data[chunkSize+sectionSize:]) + finalSize := chunkSize*branches + chunkSize*2 + finalSection := dataSizeToSectionIndex(finalSize, 
sectionSize) + tgt.Set(finalSize, finalSection, 3) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000) + defer cancel() + select { + case ref := <-tgt.Done(): + // TODO: double check that this hash is correct!! + refCorrectHex := "0xee56134cab34a5a612648dcc22d88b7cb543081bd144906dfc4fa93802c9addf" + refHex := hexutil.Encode(ref) + if refHex != refCorrectHex { + t.Fatalf("writespan sequential: expected %s, got %s", refCorrectHex, refHex) + } + case <-ctx.Done(): + t.Fatalf("timeout: %v", ctx.Err()) + } + + sz := jb.size() + if sz != chunkSize*branches { + t.Fatalf("job 1 size: expected %d, got %d", chunkSize, sz) + } + + sz = jbn.size() + if sz != chunkSize*2 { + t.Fatalf("job 2 size: expected %d, got %d", sectionSize, sz) + } +} + +// TestJobWriteSpanShuffle does the same as TestJobWriteSpan but +// shuffles the indices of the first chunk write +// verifying that sequential use of the underlying hasher is not required +func TestJobWriteSpanShuffle(t *testing.T) { + + tgt := newTarget() + hashFunc := testutillocal.NewBMTHasherFunc(0) + params := newTreeParams(hashFunc) + + jb := newJob(params, tgt, nil, 1, 0) + jb.start() + _, data := testutil.SerialData(chunkSize+sectionSize*2, 255, 0) + + var idxs []int + for i := 0; i < branches; i++ { + idxs = append(idxs, i) + } + rand.Shuffle(branches, func(i int, j int) { + idxs[i], idxs[j] = idxs[j], idxs[i] + }) + for _, idx := range idxs { + jb.write(idx, data[idx*sectionSize:idx*sectionSize+sectionSize]) + } + + jbn := jb.Next() + jbn.write(0, data[chunkSize:chunkSize+sectionSize]) + jbn.write(1, data[chunkSize+sectionSize:]) + finalSize := chunkSize*branches + chunkSize*2 + finalSection := dataSizeToSectionIndex(finalSize, sectionSize) + tgt.Set(finalSize, finalSection, 3) + + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + select { + case ref := <-tgt.Done(): + refCorrectHex := 
"0xee56134cab34a5a612648dcc22d88b7cb543081bd144906dfc4fa93802c9addf" + refHex := hexutil.Encode(ref) + jbparent := jb.parent() + jbnparent := jbn.parent() + log.Info("succeeding", "jb count", jb.count(), "jbn count", jbn.count(), "jb parent count", jbparent.count(), "jbn parent count", jbnparent.count()) + if refHex != refCorrectHex { + t.Fatalf("writespan sequential: expected %s, got %s", refCorrectHex, refHex) + } + case <-ctx.Done(): + + jbparent := jb.parent() + jbnparent := jbn.parent() + log.Error("failing", "jb count", jb.count(), "jbn count", jbn.count(), "jb parent count", jbparent.count(), "jbn parent count", jbnparent.count(), "jb parent p", fmt.Sprintf("%p", jbparent), "jbn parent p", fmt.Sprintf("%p", jbnparent)) + t.Fatalf("timeout: %v", ctx.Err()) + } + + sz := jb.size() + if sz != chunkSize*branches { + t.Fatalf("job size: expected %d, got %d", chunkSize*branches, sz) + } + + sz = jbn.size() + if sz != chunkSize*2 { + t.Fatalf("job size: expected %d, got %d", chunkSize*branches, sz) + } +} + +func TestJobWriteDoubleSection(t *testing.T) { + writeSize := sectionSize * 2 + dummyHashDoubleFunc := func(_ context.Context) file.SectionWriter { + return newDummySectionWriter(chunkSize, sectionSize*2, sectionSize*2, branches/2) + } + params := newTreeParams(dummyHashDoubleFunc) + + tgt := newTarget() + jb := newJob(params, tgt, nil, 1, 0) + jb.start() + _, data := testutil.SerialData(chunkSize, 255, 0) + + for i := 0; i < chunkSize; i += writeSize { + jb.write(i/writeSize, data[i:i+writeSize]) + } + tgt.Set(chunkSize, branches/2-1, 2) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) + defer cancel() + select { + case refLong := <-tgt.Done(): + refLongHex := hexutil.Encode(refLong) + correctRefLongHex := "0x8ace4673563b86281778b943aa60481fc4ede9f238dd98f1b3a5df4cb54ee79b" + zeroHex + if refLongHex != correctRefLongHex { + t.Fatalf("section long: expected %s, got %s", correctRefLongHex, refLongHex) + } + case <-ctx.Done(): + 
t.Fatalf("timeout: %v", ctx.Err()) + } + +} + +// TestVectors executes the barebones functionality of the hasher +// and verifies against source of truth results generated from the reference hasher +// for the same data +// TODO: vet dynamically against the referencefilehasher instead of expect vector +func TestJobVector(t *testing.T) { + poolSync := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) + dataHash := bmt.New(poolSync) + hashFunc := testutillocal.NewBMTHasherFunc(0) + params := newTreeParams(hashFunc) + var mismatch int + + for i := start; i < end; i++ { + tgt := newTarget() + dataLength := dataLengths[i] + _, data := testutil.SerialData(dataLength, 255, 0) + jb := newJob(params, tgt, nil, 1, 0) + jb.start() + count := 0 + log.Info("test vector", "length", dataLength) + for i := 0; i < dataLength; i += chunkSize { + ie := i + chunkSize + if ie > dataLength { + ie = dataLength + } + writeSize := ie - i + dataHash.Reset() + dataHash.SetLength(writeSize) + c, err := dataHash.Write(data[i:ie]) + if err != nil { + jb.destroy() + t.Fatalf("data ref fail: %v", err) + } + if c != ie-i { + jb.destroy() + t.Fatalf("data ref short write: expect %d, got %d", ie-i, c) + } + ref := dataHash.Sum(nil) + log.Debug("data ref", "i", i, "ie", ie, "data", hexutil.Encode(ref)) + jb.write(count, ref) + count += 1 + if ie%(chunkSize*branches) == 0 { + jb = jb.Next() + count = 0 + } + } + dataSections := dataSizeToSectionIndex(dataLength, params.SectionSize) + tgt.Set(dataLength, dataSections, getLevelsFromLength(dataLength, params.SectionSize, params.Branches)) + eq := true + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000) + defer cancel() + select { + case ref := <-tgt.Done(): + refCorrectHex := "0x" + expected[i] + refHex := hexutil.Encode(ref) + if refHex != refCorrectHex { + mismatch++ + eq = false + } + t.Logf("[%7d+%4d]\t%v\tref: %x\texpect: %s", dataLength/chunkSize, dataLength%chunkSize, eq, ref, expected[i]) + case 
<-ctx.Done(): + t.Fatalf("timeout: %v", ctx.Err()) + } + + } + if mismatch > 0 { + t.Fatalf("mismatches: %d/%d", mismatch, end-start) + } +} + +// BenchmarkVector generates benchmarks that are comparable to the pyramid hasher +func BenchmarkJob(b *testing.B) { + for i := start; i < end; i++ { + b.Run(fmt.Sprintf("%d/%d", i, dataLengths[i]), benchmarkJob) + } +} + +func benchmarkJob(b *testing.B) { + params := strings.Split(b.Name(), "/") + dataLengthParam, err := strconv.ParseInt(params[2], 10, 64) + if err != nil { + b.Fatal(err) + } + dataLength := int(dataLengthParam) + + poolSync := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) + dataHash := bmt.New(poolSync) + hashFunc := testutillocal.NewBMTHasherFunc(0) + treeParams := newTreeParams(hashFunc) + _, data := testutil.SerialData(dataLength, 255, 0) + + for j := 0; j < b.N; j++ { + tgt := newTarget() + jb := newJob(treeParams, tgt, nil, 1, 0) + jb.start() + count := 0 + //log.Info("test vector", "length", dataLength) + for i := 0; i < dataLength; i += chunkSize { + ie := i + chunkSize + if ie > dataLength { + ie = dataLength + } + writeSize := ie - i + dataHash.Reset() + dataHash.SetLength(writeSize) + c, err := dataHash.Write(data[i:ie]) + if err != nil { + jb.destroy() + b.Fatalf("data ref fail: %v", err) + } + if c != ie-i { + jb.destroy() + b.Fatalf("data ref short write: expect %d, got %d", ie-i, c) + } + ref := dataHash.Sum(nil) + jb.write(count, ref) + count += 1 + if ie%(chunkSize*branches) == 0 { + jb = jb.Next() + count = 0 + } + } + dataSections := dataSizeToSectionIndex(dataLength, treeParams.SectionSize) + tgt.Set(dataLength, dataSections, getLevelsFromLength(dataLength, treeParams.SectionSize, treeParams.Branches)) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*1000) + defer cancel() + select { + case <-tgt.Done(): + case <-ctx.Done(): + b.Fatalf("timeout: %v", ctx.Err()) + } + } +} diff --git a/file/testutillocal/cache.go b/file/testutillocal/cache.go 
new file mode 100644 index 0000000000..5fe94295fe --- /dev/null +++ b/file/testutillocal/cache.go @@ -0,0 +1,106 @@ +package testutillocal + +import ( + "context" + + "github.com/ethersphere/swarm/file" +) + +var ( + defaultSectionSize = 32 + defaultBranches = 128 +) + +type Cache struct { + data map[int][]byte + index int + w file.SectionWriter +} + +func NewCache() *Cache { + return &Cache{ + data: make(map[int][]byte), + } +} + +func (c *Cache) Init(_ context.Context, _ func(error)) { +} + +func (c *Cache) SetWriter(writeFunc file.SectionWriterFunc) file.SectionWriter { + c.w = writeFunc(nil) + return c +} + +func (c *Cache) SetSpan(length int) { + if c.w != nil { + c.w.SetSpan(length) + } +} + +func (c *Cache) SetLength(length int) { + if c.w != nil { + c.w.SetLength(length) + } +} + +func (c *Cache) SeekSection(offset int) { + c.index = offset + if c.w != nil { + c.w.SeekSection(offset) + } +} + +func (c *Cache) Write(b []byte) (int, error) { + c.data[c.index] = b + if c.w != nil { + return c.w.Write(b) + } + return len(b), nil +} + +func (c *Cache) Sum(b []byte) []byte { + if c.w == nil { + return nil + } + return c.w.Sum(b) +} + +func (c *Cache) Reset() { + if c.w == nil { + return + } + c.w.Reset() +} + +func (c *Cache) SectionSize() int { + if c.w != nil { + return c.w.SectionSize() + } + return defaultSectionSize +} + +func (c *Cache) BlockSize() int { + return c.SectionSize() +} + +func (c *Cache) Size() int { + if c.w != nil { + return c.w.Size() + } + return defaultSectionSize +} + +func (c *Cache) Branches() int { + if c.w != nil { + return c.w.Branches() + } + return defaultBranches +} + +func (c *Cache) Get(index int) []byte { + return c.data[index] +} + +func (c *Cache) Delete(index int) { + delete(c.data, index) +} diff --git a/file/testutillocal/cache_test.go b/file/testutillocal/cache_test.go new file mode 100644 index 0000000000..cf43f0d3cb --- /dev/null +++ b/file/testutillocal/cache_test.go @@ -0,0 +1,53 @@ +package testutillocal + +import ( 
+ "bytes" + "context" + "testing" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethersphere/swarm/testutil" +) + +const ( + sectionSize = 32 + chunkSize = 4096 +) + +func init() { + testutil.Init() +} + +func TestCache(t *testing.T) { + c := NewCache() + c.Init(context.Background(), func(error) {}) + _, data := testutil.SerialData(chunkSize, 255, 0) + c.Write(data) + cachedData := c.Get(0) + if !bytes.Equal(cachedData, data) { + t.Fatalf("cache data; expected %x, got %x", data, cachedData) + } +} + +func TestCacheLink(t *testing.T) { + + hashFunc := NewBMTHasherFunc(0) + + c := NewCache() + c.Init(context.Background(), func(error) {}) + c.SetWriter(hashFunc) + _, data := testutil.SerialData(chunkSize, 255, 0) + c.SeekSection(-1) + c.Write(data) + ref := c.Sum(nil) + refHex := hexutil.Encode(ref) + correctRefHex := "0xc10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef" + if refHex != correctRefHex { + t.Fatalf("cache link; expected %s, got %s", correctRefHex, refHex) + } + + c.Delete(0) + if _, ok := c.data[0]; ok { + t.Fatalf("delete; expected not found") + } +} diff --git a/file/testutillocal/hash.go b/file/testutillocal/hash.go new file mode 100644 index 0000000000..03f8c69bcc --- /dev/null +++ b/file/testutillocal/hash.go @@ -0,0 +1,24 @@ +package testutillocal + +import ( + "context" + + "github.com/ethersphere/swarm/bmt" + "github.com/ethersphere/swarm/file" + "golang.org/x/crypto/sha3" +) + +var ( + branches = 128 +) + +func NewBMTHasherFunc(poolSize int) file.SectionWriterFunc { + if poolSize == 0 { + poolSize = bmt.PoolSize + } + poolAsync := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, poolSize) + refHashFunc := func(_ context.Context) file.SectionWriter { + return bmt.New(poolAsync).NewAsyncWriter(false) + } + return refHashFunc +} From 780a5fc486105b70bb30a9705ed7acce8478789c Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 24 Feb 2020 11:44:19 +0100 Subject: [PATCH 2/9] file: Add dependency files to job unit --- 
file/hasher/common_test.go | 202 ++++++++++++++++++++++++++++++++++++ file/hasher/index.go | 82 +++++++++++++++ file/hasher/target.go | 61 +++++++++++ file/hasher/util.go | 27 +++++ file/testutillocal/cache.go | 24 ++--- file/testutillocal/hash.go | 6 +- 6 files changed, 388 insertions(+), 14 deletions(-) create mode 100644 file/hasher/index.go create mode 100644 file/hasher/target.go diff --git a/file/hasher/common_test.go b/file/hasher/common_test.go index bad3556420..84d2fdf33b 100644 --- a/file/hasher/common_test.go +++ b/file/hasher/common_test.go @@ -1,7 +1,18 @@ package hasher import ( + "bytes" + "context" + "encoding/binary" + "hash" + "sync" + "testing" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethersphere/swarm/file" + "github.com/ethersphere/swarm/log" "github.com/ethersphere/swarm/testutil" + "golang.org/x/crypto/sha3" ) const ( @@ -64,3 +75,194 @@ var ( func init() { testutil.Init() } + +var ( + dummyHashFunc = func(_ context.Context) file.SectionWriter { + return newDummySectionWriter(chunkSize*branches, sectionSize, sectionSize, branches) + } + + // placeholder for cases where a hasher is not necessary + noHashFunc = func(_ context.Context) file.SectionWriter { + return nil + } + + logErrFunc = func(err error) { + log.Error("SectionWriter pipeline error", "err", err) + } +) + +// simple file.SectionWriter hasher that keeps the data written to it +// for later inspection +// TODO: see if this can be replaced with the fake hasher from storage module +type dummySectionWriter struct { + sectionSize int + digestSize int + branches int + data []byte + digest []byte + size int + span []byte + summed bool + index int + writer hash.Hash + mu sync.Mutex + wg sync.WaitGroup +} + +func newDummySectionWriter(cp int, sectionSize int, digestSize int, branches int) *dummySectionWriter { + return &dummySectionWriter{ + sectionSize: sectionSize, + digestSize: digestSize, + branches: branches, + data: make([]byte, cp), + writer: 
sha3.NewLegacyKeccak256(), + digest: make([]byte, digestSize), + } +} + +func (d *dummySectionWriter) Init(_ context.Context, _ func(error)) { +} + +func (d *dummySectionWriter) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { + log.Error("dummySectionWriter does not support SectionWriter chaining") + return d +} + +// implements file.SectionWriter +func (d *dummySectionWriter) SeekSection(offset int) { + d.index = offset * d.SectionSize() +} + +// implements file.SectionWriter +func (d *dummySectionWriter) SetLength(length int) { + d.size = length +} + +// implements file.SectionWriter +func (d *dummySectionWriter) SetSpan(length int) { + d.span = make([]byte, 8) + binary.LittleEndian.PutUint64(d.span, uint64(length)) +} + +// implements file.SectionWriter +func (d *dummySectionWriter) Write(data []byte) (int, error) { + d.mu.Lock() + copy(d.data[d.index:], data) + d.size += len(data) + log.Trace("dummywriter write", "index", d.index, "size", d.size, "threshold", d.sectionSize*d.branches) + if d.isFull() { + d.summed = true + d.mu.Unlock() + d.sum() + } else { + d.mu.Unlock() + } + return len(data), nil +} + +// implements file.SectionWriter +func (d *dummySectionWriter) Sum(_ []byte) []byte { + log.Trace("dummy Sumcall", "size", d.size) + d.mu.Lock() + if !d.summed { + d.summed = true + d.mu.Unlock() + d.sum() + } else { + d.mu.Unlock() + } + return d.digest +} + +func (d *dummySectionWriter) sum() { + d.mu.Lock() + defer d.mu.Unlock() + d.writer.Write(d.span) + log.Trace("dummy sum writing span", "span", d.span) + for i := 0; i < d.size; i += d.writer.Size() { + sectionData := d.data[i : i+d.writer.Size()] + log.Trace("dummy sum write", "i", i/d.writer.Size(), "data", hexutil.Encode(sectionData), "size", d.size) + d.writer.Write(sectionData) + } + copy(d.digest, d.writer.Sum(nil)) + log.Trace("dummy sum result", "ref", hexutil.Encode(d.digest)) +} + +// implements file.SectionWriter +func (d *dummySectionWriter) Reset() { + d.mu.Lock() + defer 
d.mu.Unlock() + d.data = make([]byte, len(d.data)) + d.digest = make([]byte, d.digestSize) + d.size = 0 + d.summed = false + d.span = nil + d.writer.Reset() +} + +// implements file.SectionWriter +func (d *dummySectionWriter) BlockSize() int { + return d.sectionSize +} + +// implements file.SectionWriter +func (d *dummySectionWriter) SectionSize() int { + return d.sectionSize +} + +// implements file.SectionWriter +func (d *dummySectionWriter) Size() int { + return d.sectionSize +} + +// implements file.SectionWriter +func (d *dummySectionWriter) Branches() int { + return d.branches +} + +func (d *dummySectionWriter) isFull() bool { + return d.size == d.sectionSize*d.branches +} + +// TestDummySectionWriter +func TestDummySectionWriter(t *testing.T) { + + w := newDummySectionWriter(chunkSize*2, sectionSize, sectionSize, branches) + w.Reset() + + _, data := testutil.SerialData(sectionSize*2, 255, 0) + + w.SeekSection(branches) + w.Write(data[:sectionSize]) + w.SeekSection(branches + 1) + w.Write(data[sectionSize:]) + if !bytes.Equal(w.data[chunkSize:chunkSize+sectionSize*2], data) { + t.Fatalf("Write double pos %d: expected %x, got %x", chunkSize, w.data[chunkSize:chunkSize+sectionSize*2], data) + } + + correctDigestHex := "0x52eefd0c37895a8845d4a6cf6c6b56980e448376e55eb45717663ab7b3fc8d53" + w.SetLength(chunkSize * 2) + w.SetSpan(chunkSize * 2) + digest := w.Sum(nil) + digestHex := hexutil.Encode(digest) + if digestHex != correctDigestHex { + t.Fatalf("Digest: 2xsectionSize*1; expected %s, got %s", correctDigestHex, digestHex) + } + + w = newDummySectionWriter(chunkSize*2, sectionSize*2, sectionSize*2, branches/2) + w.Reset() + w.SeekSection(branches / 2) + w.Write(data) + if !bytes.Equal(w.data[chunkSize:chunkSize+sectionSize*2], data) { + t.Fatalf("Write double pos %d: expected %x, got %x", chunkSize, w.data[chunkSize:chunkSize+sectionSize*2], data) + } + + correctDigestHex += zeroHex + w.SetLength(chunkSize * 2) + w.SetSpan(chunkSize * 2) + digest = w.Sum(nil) + 
digestHex = hexutil.Encode(digest) + if digestHex != correctDigestHex { + t.Fatalf("Digest 1xsectionSize*2; expected %s, got %s", correctDigestHex, digestHex) + } +} diff --git a/file/hasher/index.go b/file/hasher/index.go new file mode 100644 index 0000000000..9e8ab21071 --- /dev/null +++ b/file/hasher/index.go @@ -0,0 +1,82 @@ +package hasher + +import ( + "fmt" + "sync" +) + +// keeps an index of all the existing jobs for a file hashing operation +// sorted by level +// +// it also keeps all the "top hashes", ie hashes on first data section index of every level +// these are needed in case of balanced tree results, since the hashing result would be +// lost otherwise, due to the job not having any intermediate storage of any data +type jobIndex struct { + maxLevels int + jobs []sync.Map + topHashes [][]byte + mu sync.Mutex +} + +func newJobIndex(maxLevels int) *jobIndex { + ji := &jobIndex{ + maxLevels: maxLevels, + } + for i := 0; i < maxLevels; i++ { + ji.jobs = append(ji.jobs, sync.Map{}) + } + return ji +} + +// implements Stringer interface +func (ji *jobIndex) String() string { + return fmt.Sprintf("%p", ji) +} + +// Add adds a job to the index at the level +// and data section index specified in the job +func (ji *jobIndex) Add(jb *job) { + //log.Trace("adding job", "job", jb) + ji.jobs[jb.level].Store(jb.dataSection, jb) +} + +// Get retrieves a job from the job index +// based on the level of the job and its data section index +// if a job for the level and section index does not exist this method returns nil +func (ji *jobIndex) Get(lvl int, section int) *job { + jb, ok := ji.jobs[lvl].Load(section) + if !ok { + return nil + } + return jb.(*job) +} + +// Delete removes a job from the job index +// leaving it to be garbage collected when +// the reference in the main code is relinquished +func (ji *jobIndex) Delete(jb *job) { + ji.jobs[jb.level].Delete(jb.dataSection) +} + +// AddTopHash should be called by a job when a hash is written to the first 
index of a level +// since the job doesn't store any data written to it (just passing it through to the underlying writer) +// this is needed for the edge case of balanced trees +func (ji *jobIndex) AddTopHash(ref []byte) { + ji.mu.Lock() + defer ji.mu.Unlock() + ji.topHashes = append(ji.topHashes, ref) + //log.Trace("added top hash", "length", len(ji.topHashes), "index", ji) +} + +// GetJobHash gets the current top hash for a particular level set by AddTopHash +func (ji *jobIndex) GetTopHash(lvl int) []byte { + ji.mu.Lock() + defer ji.mu.Unlock() + return ji.topHashes[lvl-1] +} + +func (ji *jobIndex) GetTopHashLevel() int { + ji.mu.Lock() + defer ji.mu.Unlock() + return len(ji.topHashes) +} diff --git a/file/hasher/target.go b/file/hasher/target.go new file mode 100644 index 0000000000..9d566fa490 --- /dev/null +++ b/file/hasher/target.go @@ -0,0 +1,61 @@ +package hasher + +import ( + "sync" + + "github.com/ethersphere/swarm/log" +) + +// passed to a job to determine at which data lengths and levels a job should terminate +type target struct { + size int32 // bytes written + sections int32 // sections written + level int32 // target level calculated from bytes written against branching factor and sector size + resultC chan []byte // channel to receive root hash + doneC chan struct{} // when this channel is closed all jobs will calculate their end write count + mu sync.Mutex +} + +func newTarget() *target { + return &target{ + resultC: make(chan []byte), + doneC: make(chan struct{}), + } +} + +// Set is called when the final length of the data to be written is known +// TODO: method can be simplified to calculate sections and level internally +func (t *target) Set(size int, sections int, level int) { + t.mu.Lock() + defer t.mu.Unlock() + t.size = int32(size) + t.sections = int32(sections) + t.level = int32(level) + log.Trace("target set", "size", t.size, "section", t.sections, "level", t.level) + close(t.doneC) +} + +// Count returns the total section count for the 
target +// it should only be called after Set() +func (t *target) Count() int { + t.mu.Lock() + defer t.mu.Unlock() + return int(t.sections) + 1 +} + +func (t *target) Level() int { + t.mu.Lock() + defer t.mu.Unlock() + return int(t.level) +} + +func (t *target) Size() int { + t.mu.Lock() + defer t.mu.Unlock() + return int(t.size) +} + +// Done returns the channel in which the root hash will be sent +func (t *target) Done() <-chan []byte { + return t.resultC +} diff --git a/file/hasher/util.go b/file/hasher/util.go index 141fd1d114..da23a03957 100644 --- a/file/hasher/util.go +++ b/file/hasher/util.go @@ -29,3 +29,30 @@ func getLevelsFromLength(l int, sectionSize int, branches int) int { return int(math.Log(float64(c))/math.Log(float64(branches)) + 1) } + +// calculates the section index of the given byte size +func dataSizeToSectionIndex(length int, sectionSize int) int { + return (length - 1) / sectionSize +} + +// calculates the section count of the given byte size +func dataSizeToSectionCount(length int, sectionSize int) int { + return dataSizeToSectionIndex(length, sectionSize) + 1 +} + +// calculates the corresponding level section for a data section +func dataSectionToLevelSection(p *treeParams, lvl int, sections int) int { + span := p.Spans[lvl] + return sections / span +} + +// calculates the lower data section boundary of a level for which a data section is contained +// the higher level use is to determine whether the final data section written falls within +// a certain level's span +func dataSectionToLevelBoundary(p *treeParams, lvl int, section int) int { + span := p.Spans[lvl+1] + spans := section / span + spanBytes := spans * span + //log.Trace("levelboundary", "spans", spans, "section", section, "span", span) + return spanBytes +} diff --git a/file/testutillocal/cache.go b/file/testutillocal/cache.go index 5fe94295fe..ff88b23fc2 100644 --- a/file/testutillocal/cache.go +++ b/file/testutillocal/cache.go @@ -37,18 +37,18 @@ func (c *Cache) 
SetSpan(length int) { } } -func (c *Cache) SetLength(length int) { - if c.w != nil { - c.w.SetLength(length) - } -} - -func (c *Cache) SeekSection(offset int) { - c.index = offset - if c.w != nil { - c.w.SeekSection(offset) - } -} +//func (c *Cache) SetLength(length int) { +// if c.w != nil { +// c.w.SetLength(length) +// } +//} + +//func (c *Cache) SeekSection(offset int) { +// c.index = offset +// if c.w != nil { +// c.w.SeekSection(offset) +// } +//} func (c *Cache) Write(b []byte) (int, error) { c.data[c.index] = b diff --git a/file/testutillocal/hash.go b/file/testutillocal/hash.go index 03f8c69bcc..79dc767a60 100644 --- a/file/testutillocal/hash.go +++ b/file/testutillocal/hash.go @@ -5,6 +5,7 @@ import ( "github.com/ethersphere/swarm/bmt" "github.com/ethersphere/swarm/file" + asyncbmt "github.com/ethersphere/swarm/file/bmt" "golang.org/x/crypto/sha3" ) @@ -17,8 +18,9 @@ func NewBMTHasherFunc(poolSize int) file.SectionWriterFunc { poolSize = bmt.PoolSize } poolAsync := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, poolSize) - refHashFunc := func(_ context.Context) file.SectionWriter { - return bmt.New(poolAsync).NewAsyncWriter(false) + refHashFunc := func(ctx context.Context) file.SectionWriter { + bmtHasher := bmt.New(poolAsync) + return asyncbmt.NewAsyncHasher(ctx, bmtHasher, false, nil) } return refHashFunc } From 26f9d5197cfc8029aceb3ef303683d92fe9eacfa Mon Sep 17 00:00:00 2001 From: nolash Date: Mon, 24 Feb 2020 12:27:57 +0100 Subject: [PATCH 3/9] file: Make job unit compile --- bmt/bmt.go | 14 ++-- file/bmt/async.go | 8 ++ file/hasher/common_test.go | 9 +++ file/hasher/job.go | 12 +-- file/hasher/job_test.go | 10 +-- file/hasher/reference.go | 145 ---------------------------------- file/hasher/reference_test.go | 140 -------------------------------- file/testutillocal/cache.go | 8 ++ file/types.go | 2 + 9 files changed, 44 insertions(+), 304 deletions(-) delete mode 100644 file/hasher/reference.go delete mode 100644 file/hasher/reference_test.go 
diff --git a/bmt/bmt.go b/bmt/bmt.go index 1753e7a941..5a632b7e3e 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -25,9 +25,6 @@ import ( "strings" "sync" "sync/atomic" - - "github.com/ethersphere/swarm/file" - "github.com/ethersphere/swarm/log" ) /* @@ -290,11 +287,12 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree { } } -// SetWriter implements file.SectionWriter -func (h *Hasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { - log.Warn("Synchasher does not currently support SectionWriter chaining") - return h -} +// +//// SetWriter implements file.SectionWriter +//func (h *Hasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { +// log.Warn("Synchasher does not currently support SectionWriter chaining") +// return h +//} // SectionSize implements file.SectionWriter func (h *Hasher) SectionSize() int { diff --git a/file/bmt/async.go b/file/bmt/async.go index 2b8a6331ad..4076121c13 100644 --- a/file/bmt/async.go +++ b/file/bmt/async.go @@ -22,6 +22,8 @@ import ( "sync" "github.com/ethersphere/swarm/bmt" + "github.com/ethersphere/swarm/file" + "github.com/ethersphere/swarm/log" ) // NewAsyncWriter extends Hasher with an interface for concurrent segment.GetSection() writes @@ -177,3 +179,9 @@ func (sw *AsyncHasher) SumIndexed(b []byte, length int) (s []byte) { hsh.Write(s) return hsh.Sum(b) } + +// SetWriter implements file.SectionWriter +func (sw *AsyncHasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { + log.Warn("BMT hasher does not currently support SectionWriter chaining") + return sw +} diff --git a/file/hasher/common_test.go b/file/hasher/common_test.go index 84d2fdf33b..c387d286e6 100644 --- a/file/hasher/common_test.go +++ b/file/hasher/common_test.go @@ -224,6 +224,15 @@ func (d *dummySectionWriter) isFull() bool { return d.size == d.sectionSize*d.branches } +func (d *dummySectionWriter) SumIndexed(b []byte, i int) []byte { + return d.writer.Sum(b) +} + +func (d *dummySectionWriter) WriteIndexed(_ int, 
b []byte) { + log.Warn("index in WriteIndexed ignored for dummyWriter") + d.writer.Write(b) +} + // TestDummySectionWriter func TestDummySectionWriter(t *testing.T) { diff --git a/file/hasher/job.go b/file/hasher/job.go index 66252f952a..c908749472 100644 --- a/file/hasher/job.go +++ b/file/hasher/job.go @@ -31,8 +31,8 @@ type job struct { firstSectionData []byte // store first section of data written to solve the dangling chunk edge case writeC chan jobUnit - writer file.SectionWriter // underlying data processor - doneC chan struct{} // pointer to target doneC channel, set to nil in process() when closed + writer file.SectionWriter + doneC chan struct{} // pointer to target doneC channel, set to nil in process() when closed mu sync.Mutex } @@ -165,8 +165,8 @@ OUTER: idx := entry.index + i data := entry.data[offset : offset+jb.writer.SectionSize()] log.Trace("job write", "datasection", jb.dataSection, "level", jb.level, "processCount", oldProcessCount+i, "endcount", endCount, "index", entry.index+i, "data", hexutil.Encode(data)) - jb.writer.SeekSection(idx) - jb.writer.Write(data) + //jb.writer.SeekSection(idx) + jb.writer.WriteIndexed(idx, data) offset += jb.writer.SectionSize() } @@ -224,10 +224,10 @@ func (jb *job) sum() { size := jb.size() //span := bmt.LengthToSpan(size) refSize := jb.count() * jb.params.SectionSize - jb.writer.SetLength(refSize) + //jb.writer.SetLength(refSize) jb.writer.SetSpan(size) log.Trace("job sum", "count", jb.count(), "refsize", refSize, "size", size, "datasection", jb.dataSection, "level", jb.level, "targetlevel", targetLevel, "endcount", jb.endCount) - ref := jb.writer.Sum(nil) + ref := jb.writer.SumIndexed(nil, refSize) // endCount > 0 means this is the last chunk on the level // the hash from the level below the target level will be the result diff --git a/file/hasher/job_test.go b/file/hasher/job_test.go index e4d1084260..cf0c2cd4c2 100644 --- a/file/hasher/job_test.go +++ b/file/hasher/job_test.go @@ -544,9 +544,9 @@ func 
TestJobVector(t *testing.T) { if ie > dataLength { ie = dataLength } - writeSize := ie - i + //writeSize := ie - i dataHash.Reset() - dataHash.SetLength(writeSize) + //dataHash.SetLength(writeSize) c, err := dataHash.Write(data[i:ie]) if err != nil { jb.destroy() @@ -556,7 +556,7 @@ func TestJobVector(t *testing.T) { jb.destroy() t.Fatalf("data ref short write: expect %d, got %d", ie-i, c) } - ref := dataHash.Sum(nil) + ref := dataHash.Sum(nil) //, writeSize) log.Debug("data ref", "i", i, "ie", ie, "data", hexutil.Encode(ref)) jb.write(count, ref) count += 1 @@ -621,9 +621,9 @@ func benchmarkJob(b *testing.B) { if ie > dataLength { ie = dataLength } - writeSize := ie - i + //writeSize := ie - i dataHash.Reset() - dataHash.SetLength(writeSize) + //dataHash.SetLength(writeSize) c, err := dataHash.Write(data[i:ie]) if err != nil { jb.destroy() diff --git a/file/hasher/reference.go b/file/hasher/reference.go deleted file mode 100644 index 0ceb570ee8..0000000000 --- a/file/hasher/reference.go +++ /dev/null @@ -1,145 +0,0 @@ -package hasher - -import ( - "github.com/ethersphere/swarm/file" -) - -// ReferenceHasher is the source-of-truth implementation of the swarm file hashing algorithm -type ReferenceHasher struct { - params *treeParams - cursors []int // section write position, indexed per level - length int // number of bytes written to the data level of the hasher - buffer []byte // keeps data and hashes, indexed by cursors - counts []int // number of sums performed, indexed per level - hasher file.SectionWriter // underlying hasher -} - -// NewReferenceHasher constructs and returns a new ReferenceHasher -// This implementation is limited to a tree of 9 levels, where level 0 is the data level -// With 32 section size and 128 branches (i.e. 
unencrypted, non erasure-coded content) this means -// a capacity of 4096 bytes * (128^(9-1)) ~ 295.148 * (10^18) bytes -func NewReferenceHasher(params *treeParams) *ReferenceHasher { - // TODO: remove when bmt interface is amended - h := params.GetWriter() - return &ReferenceHasher{ - params: params, - cursors: make([]int, 9), - counts: make([]int, 9), - buffer: make([]byte, params.ChunkSize*9), - hasher: h, - } -} - -// Hash computes and returns the root hash of arbitrary data -func (r *ReferenceHasher) Hash(data []byte) []byte { - l := r.params.ChunkSize - for i := 0; i < len(data); i += r.params.ChunkSize { - if len(data)-i < r.params.ChunkSize { - l = len(data) - i - } - r.update(0, data[i:i+l]) - } - - // if we didn't end on a chunk boundary we need to hash remaining chunks first - r.hashUnfinished() - - // if the already hashed parts tree is balanced - r.moveDanglingChunk() - - return r.digest() -} - -// write to the data buffer on the specified level -// calls sum if chunk boundary is reached and recursively calls this function for the next level with the acquired bmt hash -// adjusts cursors accordingly -func (r *ReferenceHasher) update(lvl int, data []byte) { - if lvl == 0 { - r.length += len(data) - } - copy(r.buffer[r.cursors[lvl]:r.cursors[lvl]+len(data)], data) - r.cursors[lvl] += len(data) - if r.cursors[lvl]-r.cursors[lvl+1] == r.params.ChunkSize { - ref := r.sum(lvl) - r.update(lvl+1, ref) - r.cursors[lvl] = r.cursors[lvl+1] - } -} - -// calculates and returns the bmt sum of the last written data on the level -func (r *ReferenceHasher) sum(lvl int) []byte { - r.counts[lvl]++ - spanSize := r.params.Spans[lvl] * r.params.ChunkSize - span := (r.length-1)%spanSize + 1 - - sizeToSum := r.cursors[lvl] - r.cursors[lvl+1] - - r.hasher.Reset() - r.hasher.SetSpan(span) - r.hasher.Write(r.buffer[r.cursors[lvl+1] : r.cursors[lvl+1]+sizeToSum]) - ref := r.hasher.Sum(nil) - return ref -} - -// called after all data has been written -// sums the final chunks of 
each level -// skips intermediate levels that end on span boundary -func (r *ReferenceHasher) digest() []byte { - - // the first section of the buffer will hold the root hash - return r.buffer[:r.params.SectionSize] -} - -// hashes the remaining unhashed chunks at the end of each level -func (r *ReferenceHasher) hashUnfinished() { - if r.length%r.params.ChunkSize != 0 { - ref := r.sum(0) - copy(r.buffer[r.cursors[1]:], ref) - r.cursors[1] += len(ref) - r.cursors[0] = r.cursors[1] - } -} - -// in case of a balanced tree this method concatenates the reference to the single reference -// at the highest level of the tree. -// -// Let F be full chunks (disregarding branching factor) and S be single references -// in the following scenario: -// -// S -// F F -// F F F -// F F F F S -// -// The result will be: -// -// SS -// F F -// F F F -// F F F F -// -// After which the SS will be hashed to obtain the final root hash -func (r *ReferenceHasher) moveDanglingChunk() { - - // calculate the total number of levels needed to represent the data (including the data level) - targetLevel := getLevelsFromLength(r.length, r.params.SectionSize, r.params.Branches) - - // sum every intermediate level and write to the level above it - for i := 1; i < targetLevel; i++ { - - // and if there is a single reference outside a balanced tree on this level - // don't hash it again but pass it on to the next level - if r.counts[i] > 0 { - // TODO: simplify if possible - if r.counts[i-1]-r.params.Spans[targetLevel-1-i] <= 1 { - r.cursors[i+1] = r.cursors[i] - r.cursors[i] = r.cursors[i-1] - continue - } - } - - ref := r.sum(i) - copy(r.buffer[r.cursors[i+1]:], ref) - r.cursors[i+1] += len(ref) - r.cursors[i] = r.cursors[i+1] - } -} diff --git a/file/hasher/reference_test.go b/file/hasher/reference_test.go deleted file mode 100644 index d4deef5c0b..0000000000 --- a/file/hasher/reference_test.go +++ /dev/null @@ -1,140 +0,0 @@ -package hasher - -import ( - "context" - "fmt" - "strconv" - "strings" 
- "testing" - - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethersphere/swarm/bmt" - "github.com/ethersphere/swarm/file" - "github.com/ethersphere/swarm/log" - "github.com/ethersphere/swarm/testutil" - "golang.org/x/crypto/sha3" -) - -// TestManualDanglingChunk is a test script explicitly hashing and writing every individual level in the dangling chunk edge case -// we use a balanced tree with data size of chunkSize*branches, and a single chunk of data -// this case is chosen because it produces the wrong result in the pyramid hasher at the time of writing (master commit hash 4928d989ebd0854d993c10c194e61a5a5455e4f9) -func TestManualDanglingChunk(t *testing.T) { - pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) - h := bmt.New(pool) - - // to execute the job we need buffers with the following capacities: - // level 0: chunkSize*branches+chunkSize - // level 1: chunkSize - // level 2: sectionSize * 2 - var levels [][]byte - levels = append(levels, nil) - levels = append(levels, make([]byte, chunkSize)) - levels = append(levels, make([]byte, sectionSize*2)) - - // hash the balanced tree portion of the data level and write to level 1 - _, levels[0] = testutil.SerialData(chunkSize*branches+chunkSize, 255, 0) - for i := 0; i < chunkSize*branches; i += chunkSize { - h.Reset() - h.SetSpan(chunkSize) - h.Write(levels[0][i : i+chunkSize]) - copy(levels[1][i/branches:], h.Sum(nil)) - } - refHex := hexutil.Encode(levels[1][:sectionSize]) - correctRefHex := "0xc10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef" - if refHex != correctRefHex { - t.Fatalf("manual dangling single chunk; expected %s, got %s", correctRefHex, refHex) - } - - // write the dangling chunk - // hash it and write the reference on the second section of level 2 - h.Reset() - h.SetSpan(chunkSize) - h.Write(levels[0][chunkSize*branches:]) - copy(levels[2][sectionSize:], h.Sum(nil)) - refHex = hexutil.Encode(levels[2][sectionSize:]) - correctRefHex = 
"0x81b31d9a7f6c377523e8769db021091df23edd9fd7bd6bcdf11a22f518db6006" - if refHex != correctRefHex { - t.Fatalf("manual dangling single chunk; expected %s, got %s", correctRefHex, refHex) - } - - // hash the chunk on level 1 and write into the first section of level 2 - h.Reset() - h.SetSpan(chunkSize * branches) - h.Write(levels[1]) - copy(levels[2], h.Sum(nil)) - refHex = hexutil.Encode(levels[2][:sectionSize]) - correctRefHex = "0x3047d841077898c26bbe6be652a2ec590a5d9bd7cd45d290ea42511b48753c09" - if refHex != correctRefHex { - t.Fatalf("manual dangling balanced tree; expected %s, got %s", correctRefHex, refHex) - } - - // hash the two sections on level 2 to obtain the root hash - h.Reset() - h.SetSpan(chunkSize*branches + chunkSize) - h.Write(levels[2]) - ref := h.Sum(nil) - refHex = hexutil.Encode(ref) - correctRefHex = "0xb8e1804e37a064d28d161ab5f256cc482b1423d5cd0a6b30fde7b0f51ece9199" - if refHex != correctRefHex { - t.Fatalf("manual dangling root; expected %s, got %s", correctRefHex, refHex) - } -} - -// TestReferenceFileHasherVector executes the file hasher algorithms on serial input data of periods of 0-254 -// of lengths defined in common_test.go -// -// the "expected" array in common_test.go is generated by this implementation, and test failure due to -// result mismatch is nothing else than an indication that something has changed in the reference filehasher -// or the underlying hashing algorithm -func TestReferenceHasherVector(t *testing.T) { - - hashFunc := func(_ context.Context) file.SectionWriter { - pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) - return bmt.New(pool) - } - params := newTreeParams(hashFunc) - var mismatch int - for i := start; i < end; i++ { - dataLength := dataLengths[i] - log.Info("start", "i", i, "len", dataLength) - rh := NewReferenceHasher(params) - _, data := testutil.SerialData(dataLength, 255, 0) - refHash := rh.Hash(data) - eq := true - if expected[i] != fmt.Sprintf("%x", refHash) { - 
mismatch++ - eq = false - } - t.Logf("[%7d+%4d]\t%v\tref: %x\texpect: %s", dataLength/chunkSize, dataLength%chunkSize, eq, refHash, expected[i]) - } - if mismatch > 0 { - t.Fatalf("mismatches: %d/%d", mismatch, end-start) - } -} - -// BenchmarkReferenceHasher establishes a baseline for a fully synchronous file hashing operation -// it will be vastly inefficient -func BenchmarkReferenceHasher(b *testing.B) { - for i := start; i < end; i++ { - b.Run(fmt.Sprintf("%d", dataLengths[i]), benchmarkReferenceHasher) - } -} - -func benchmarkReferenceHasher(b *testing.B) { - benchParams := strings.Split(b.Name(), "/") - dataLength, err := strconv.ParseInt(benchParams[1], 10, 64) - if err != nil { - b.Fatal(err) - } - hashFunc := func(_ context.Context) file.SectionWriter { - pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) - return bmt.New(pool) - } - params := newTreeParams(hashFunc) - b.ResetTimer() - for i := 0; i < b.N; i++ { - _, data := testutil.SerialData(int(dataLength), 255, 0) - fh := NewReferenceHasher(params) - fh.Hash(data) - } -} diff --git a/file/testutillocal/cache.go b/file/testutillocal/cache.go index ff88b23fc2..c82c5d0610 100644 --- a/file/testutillocal/cache.go +++ b/file/testutillocal/cache.go @@ -104,3 +104,11 @@ func (c *Cache) Get(index int) []byte { func (c *Cache) Delete(index int) { delete(c.data, index) } + +func (c *Cache) SumIndexed(b []byte, i int) []byte { + return c.w.SumIndexed(b, i) +} + +func (c *Cache) WriteIndexed(i int, b []byte) { + c.w.WriteIndexed(i, b) +} diff --git a/file/types.go b/file/types.go index 5ad84fce90..b8a39403ad 100644 --- a/file/types.go +++ b/file/types.go @@ -13,4 +13,6 @@ type SectionWriter interface { SetSpan(length int) // set data span of chunk SectionSize() int // section size of this SectionWriter Branches() int // branch factor of this SectionWriter + SumIndexed([]byte, int) []byte + WriteIndexed(int, []byte) } From 020e76f90db4d51c6fd6f399a73f02d45eed3786 Mon Sep 17 00:00:00 2001 
From: nolash Date: Wed, 18 Mar 2020 17:15:35 +0100 Subject: [PATCH 4/9] file: Rehabilitate job test --- file/hasher/common_test.go | 12 ++- file/hasher/job.go | 5 +- file/hasher/job_test.go | 37 +-------- file/hasher/reference.go | 145 ++++++++++++++++++++++++++++++++++ file/hasher/reference_test.go | 140 ++++++++++++++++++++++++++++++++ 5 files changed, 299 insertions(+), 40 deletions(-) create mode 100644 file/hasher/reference.go create mode 100644 file/hasher/reference_test.go diff --git a/file/hasher/common_test.go b/file/hasher/common_test.go index c387d286e6..cb513e6b45 100644 --- a/file/hasher/common_test.go +++ b/file/hasher/common_test.go @@ -110,6 +110,7 @@ type dummySectionWriter struct { } func newDummySectionWriter(cp int, sectionSize int, digestSize int, branches int) *dummySectionWriter { + log.Trace("creating dummy writer", "sectionsize", sectionSize, "digestsize", digestSize, "branches", branches) return &dummySectionWriter{ sectionSize: sectionSize, digestSize: digestSize, @@ -224,13 +225,16 @@ func (d *dummySectionWriter) isFull() bool { return d.size == d.sectionSize*d.branches } -func (d *dummySectionWriter) SumIndexed(b []byte, i int) []byte { +func (d *dummySectionWriter) SumIndexed(b []byte, l int) []byte { + //log.Trace("dummy sum indexed", "d", d.data[:l], "l", l, "b", b, "s", d.span) + d.writer.Write(d.span) + d.writer.Write(d.data[:l]) return d.writer.Sum(b) } -func (d *dummySectionWriter) WriteIndexed(_ int, b []byte) { - log.Warn("index in WriteIndexed ignored for dummyWriter") - d.writer.Write(b) +func (d *dummySectionWriter) WriteIndexed(i int, b []byte) { + //log.Trace("dummy write indexed", "i", i, "b", len(b)) + copy(d.data[i*d.sectionSize:], b) } // TestDummySectionWriter diff --git a/file/hasher/job.go b/file/hasher/job.go index c908749472..5356d57922 100644 --- a/file/hasher/job.go +++ b/file/hasher/job.go @@ -56,6 +56,7 @@ func newJob(params *treeParams, tgt *target, jobIndex *jobIndex, lvl int, dataSe jb.doneC = tgt.doneC } 
else { + log.Trace("target set", "level", lvl, "targetLevel", targetLevel) targetCount := tgt.Count() jb.endCount = int32(jb.targetCountToEndCount(targetCount)) } @@ -131,7 +132,7 @@ func (jb *job) write(index int, data []byte) { // - data write is finalized and targetcount is reached on a subsequent job write func (jb *job) process() { - log.Trace("starting job process", "level", jb.level, "sec", jb.dataSection) + log.Trace("starting job process", "level", jb.level, "sec", jb.dataSection, "target", jb.target) var processCount int defer jb.destroy() @@ -165,7 +166,6 @@ OUTER: idx := entry.index + i data := entry.data[offset : offset+jb.writer.SectionSize()] log.Trace("job write", "datasection", jb.dataSection, "level", jb.level, "processCount", oldProcessCount+i, "endcount", endCount, "index", entry.index+i, "data", hexutil.Encode(data)) - //jb.writer.SeekSection(idx) jb.writer.WriteIndexed(idx, data) offset += jb.writer.SectionSize() } @@ -185,6 +185,7 @@ OUTER: // enter here if data writes have been completed // TODO: this case currently executes for all cycles after data write is complete for which writes to this job do not happen. 
perhaps it can be improved case <-jb.doneC: + log.Trace("doneloop enter") jb.mu.Lock() jb.doneC = nil log.Trace("doneloop", "level", jb.level, "processCount", processCount, "endcount", jb.endCount) diff --git a/file/hasher/job_test.go b/file/hasher/job_test.go index cf0c2cd4c2..f99284e4bc 100644 --- a/file/hasher/job_test.go +++ b/file/hasher/job_test.go @@ -366,7 +366,7 @@ func TestJobWriteFull(t *testing.T) { defer cancel() select { case ref := <-tgt.Done(): - correctRefHex := "0x8ace4673563b86281778b943aa60481fc4ede9f238dd98f1b3a5df4cb54ee79b" + correctRefHex := "0x1ed31ae32fe570b69b01f800fbdec1f17b7ea6f0348216d6d79df91ddf28344e" refHex := hexutil.Encode(ref) if refHex != correctRefHex { t.Fatalf("job write full: expected %s, got %s", correctRefHex, refHex) @@ -489,37 +489,6 @@ func TestJobWriteSpanShuffle(t *testing.T) { } } -func TestJobWriteDoubleSection(t *testing.T) { - writeSize := sectionSize * 2 - dummyHashDoubleFunc := func(_ context.Context) file.SectionWriter { - return newDummySectionWriter(chunkSize, sectionSize*2, sectionSize*2, branches/2) - } - params := newTreeParams(dummyHashDoubleFunc) - - tgt := newTarget() - jb := newJob(params, tgt, nil, 1, 0) - jb.start() - _, data := testutil.SerialData(chunkSize, 255, 0) - - for i := 0; i < chunkSize; i += writeSize { - jb.write(i/writeSize, data[i:i+writeSize]) - } - tgt.Set(chunkSize, branches/2-1, 2) - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*10) - defer cancel() - select { - case refLong := <-tgt.Done(): - refLongHex := hexutil.Encode(refLong) - correctRefLongHex := "0x8ace4673563b86281778b943aa60481fc4ede9f238dd98f1b3a5df4cb54ee79b" + zeroHex - if refLongHex != correctRefLongHex { - t.Fatalf("section long: expected %s, got %s", correctRefLongHex, refLongHex) - } - case <-ctx.Done(): - t.Fatalf("timeout: %v", ctx.Err()) - } - -} - // TestVectors executes the barebones functionality of the hasher // and verifies against source of truth results generated from the 
reference hasher // for the same data @@ -544,9 +513,9 @@ func TestJobVector(t *testing.T) { if ie > dataLength { ie = dataLength } - //writeSize := ie - i + writeSize := ie - i dataHash.Reset() - //dataHash.SetLength(writeSize) + dataHash.SetSpan(writeSize) c, err := dataHash.Write(data[i:ie]) if err != nil { jb.destroy() diff --git a/file/hasher/reference.go b/file/hasher/reference.go new file mode 100644 index 0000000000..0ceb570ee8 --- /dev/null +++ b/file/hasher/reference.go @@ -0,0 +1,145 @@ +package hasher + +import ( + "github.com/ethersphere/swarm/file" +) + +// ReferenceHasher is the source-of-truth implementation of the swarm file hashing algorithm +type ReferenceHasher struct { + params *treeParams + cursors []int // section write position, indexed per level + length int // number of bytes written to the data level of the hasher + buffer []byte // keeps data and hashes, indexed by cursors + counts []int // number of sums performed, indexed per level + hasher file.SectionWriter // underlying hasher +} + +// NewReferenceHasher constructs and returns a new ReferenceHasher +// This implementation is limited to a tree of 9 levels, where level 0 is the data level +// With 32 section size and 128 branches (i.e. 
unencrypted, non erasure-coded content) this means +// a capacity of 4096 bytes * (128^(9-1)) ~ 295.148 * (10^18) bytes +func NewReferenceHasher(params *treeParams) *ReferenceHasher { + // TODO: remove when bmt interface is amended + h := params.GetWriter() + return &ReferenceHasher{ + params: params, + cursors: make([]int, 9), + counts: make([]int, 9), + buffer: make([]byte, params.ChunkSize*9), + hasher: h, + } +} + +// Hash computes and returns the root hash of arbitrary data +func (r *ReferenceHasher) Hash(data []byte) []byte { + l := r.params.ChunkSize + for i := 0; i < len(data); i += r.params.ChunkSize { + if len(data)-i < r.params.ChunkSize { + l = len(data) - i + } + r.update(0, data[i:i+l]) + } + + // if we didn't end on a chunk boundary we need to hash remaining chunks first + r.hashUnfinished() + + // if the already hashed parts tree is balanced + r.moveDanglingChunk() + + return r.digest() +} + +// write to the data buffer on the specified level +// calls sum if chunk boundary is reached and recursively calls this function for the next level with the acquired bmt hash +// adjusts cursors accordingly +func (r *ReferenceHasher) update(lvl int, data []byte) { + if lvl == 0 { + r.length += len(data) + } + copy(r.buffer[r.cursors[lvl]:r.cursors[lvl]+len(data)], data) + r.cursors[lvl] += len(data) + if r.cursors[lvl]-r.cursors[lvl+1] == r.params.ChunkSize { + ref := r.sum(lvl) + r.update(lvl+1, ref) + r.cursors[lvl] = r.cursors[lvl+1] + } +} + +// calculates and returns the bmt sum of the last written data on the level +func (r *ReferenceHasher) sum(lvl int) []byte { + r.counts[lvl]++ + spanSize := r.params.Spans[lvl] * r.params.ChunkSize + span := (r.length-1)%spanSize + 1 + + sizeToSum := r.cursors[lvl] - r.cursors[lvl+1] + + r.hasher.Reset() + r.hasher.SetSpan(span) + r.hasher.Write(r.buffer[r.cursors[lvl+1] : r.cursors[lvl+1]+sizeToSum]) + ref := r.hasher.Sum(nil) + return ref +} + +// called after all data has been written +// sums the final chunks of 
each level +// skips intermediate levels that end on span boundary +func (r *ReferenceHasher) digest() []byte { + + // the first section of the buffer will hold the root hash + return r.buffer[:r.params.SectionSize] +} + +// hashes the remaining unhashed chunks at the end of each level +func (r *ReferenceHasher) hashUnfinished() { + if r.length%r.params.ChunkSize != 0 { + ref := r.sum(0) + copy(r.buffer[r.cursors[1]:], ref) + r.cursors[1] += len(ref) + r.cursors[0] = r.cursors[1] + } +} + +// in case of a balanced tree this method concatenates the reference to the single reference +// at the highest level of the tree. +// +// Let F be full chunks (disregarding branching factor) and S be single references +// in the following scenario: +// +// S +// F F +// F F F +// F F F F S +// +// The result will be: +// +// SS +// F F +// F F F +// F F F F +// +// After which the SS will be hashed to obtain the final root hash +func (r *ReferenceHasher) moveDanglingChunk() { + + // calculate the total number of levels needed to represent the data (including the data level) + targetLevel := getLevelsFromLength(r.length, r.params.SectionSize, r.params.Branches) + + // sum every intermediate level and write to the level above it + for i := 1; i < targetLevel; i++ { + + // and if there is a single reference outside a balanced tree on this level + // don't hash it again but pass it on to the next level + if r.counts[i] > 0 { + // TODO: simplify if possible + if r.counts[i-1]-r.params.Spans[targetLevel-1-i] <= 1 { + r.cursors[i+1] = r.cursors[i] + r.cursors[i] = r.cursors[i-1] + continue + } + } + + ref := r.sum(i) + copy(r.buffer[r.cursors[i+1]:], ref) + r.cursors[i+1] += len(ref) + r.cursors[i] = r.cursors[i+1] + } +} diff --git a/file/hasher/reference_test.go b/file/hasher/reference_test.go new file mode 100644 index 0000000000..d4deef5c0b --- /dev/null +++ b/file/hasher/reference_test.go @@ -0,0 +1,140 @@ +package hasher + +import ( + "context" + "fmt" + "strconv" + "strings" + 
"testing" + + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethersphere/swarm/bmt" + "github.com/ethersphere/swarm/file" + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/testutil" + "golang.org/x/crypto/sha3" +) + +// TestManualDanglingChunk is a test script explicitly hashing and writing every individual level in the dangling chunk edge case +// we use a balanced tree with data size of chunkSize*branches, and a single chunk of data +// this case is chosen because it produces the wrong result in the pyramid hasher at the time of writing (master commit hash 4928d989ebd0854d993c10c194e61a5a5455e4f9) +func TestManualDanglingChunk(t *testing.T) { + pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) + h := bmt.New(pool) + + // to execute the job we need buffers with the following capacities: + // level 0: chunkSize*branches+chunkSize + // level 1: chunkSize + // level 2: sectionSize * 2 + var levels [][]byte + levels = append(levels, nil) + levels = append(levels, make([]byte, chunkSize)) + levels = append(levels, make([]byte, sectionSize*2)) + + // hash the balanced tree portion of the data level and write to level 1 + _, levels[0] = testutil.SerialData(chunkSize*branches+chunkSize, 255, 0) + for i := 0; i < chunkSize*branches; i += chunkSize { + h.Reset() + h.SetSpan(chunkSize) + h.Write(levels[0][i : i+chunkSize]) + copy(levels[1][i/branches:], h.Sum(nil)) + } + refHex := hexutil.Encode(levels[1][:sectionSize]) + correctRefHex := "0xc10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef" + if refHex != correctRefHex { + t.Fatalf("manual dangling single chunk; expected %s, got %s", correctRefHex, refHex) + } + + // write the dangling chunk + // hash it and write the reference on the second section of level 2 + h.Reset() + h.SetSpan(chunkSize) + h.Write(levels[0][chunkSize*branches:]) + copy(levels[2][sectionSize:], h.Sum(nil)) + refHex = hexutil.Encode(levels[2][sectionSize:]) + correctRefHex = 
"0x81b31d9a7f6c377523e8769db021091df23edd9fd7bd6bcdf11a22f518db6006" + if refHex != correctRefHex { + t.Fatalf("manual dangling single chunk; expected %s, got %s", correctRefHex, refHex) + } + + // hash the chunk on level 1 and write into the first section of level 2 + h.Reset() + h.SetSpan(chunkSize * branches) + h.Write(levels[1]) + copy(levels[2], h.Sum(nil)) + refHex = hexutil.Encode(levels[2][:sectionSize]) + correctRefHex = "0x3047d841077898c26bbe6be652a2ec590a5d9bd7cd45d290ea42511b48753c09" + if refHex != correctRefHex { + t.Fatalf("manual dangling balanced tree; expected %s, got %s", correctRefHex, refHex) + } + + // hash the two sections on level 2 to obtain the root hash + h.Reset() + h.SetSpan(chunkSize*branches + chunkSize) + h.Write(levels[2]) + ref := h.Sum(nil) + refHex = hexutil.Encode(ref) + correctRefHex = "0xb8e1804e37a064d28d161ab5f256cc482b1423d5cd0a6b30fde7b0f51ece9199" + if refHex != correctRefHex { + t.Fatalf("manual dangling root; expected %s, got %s", correctRefHex, refHex) + } +} + +// TestReferenceFileHasherVector executes the file hasher algorithms on serial input data of periods of 0-254 +// of lengths defined in common_test.go +// +// the "expected" array in common_test.go is generated by this implementation, and test failure due to +// result mismatch is nothing else than an indication that something has changed in the reference filehasher +// or the underlying hashing algorithm +func TestReferenceHasherVector(t *testing.T) { + + hashFunc := func(_ context.Context) file.SectionWriter { + pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) + return bmt.New(pool) + } + params := newTreeParams(hashFunc) + var mismatch int + for i := start; i < end; i++ { + dataLength := dataLengths[i] + log.Info("start", "i", i, "len", dataLength) + rh := NewReferenceHasher(params) + _, data := testutil.SerialData(dataLength, 255, 0) + refHash := rh.Hash(data) + eq := true + if expected[i] != fmt.Sprintf("%x", refHash) { + 
mismatch++ + eq = false + } + t.Logf("[%7d+%4d]\t%v\tref: %x\texpect: %s", dataLength/chunkSize, dataLength%chunkSize, eq, refHash, expected[i]) + } + if mismatch > 0 { + t.Fatalf("mismatches: %d/%d", mismatch, end-start) + } +} + +// BenchmarkReferenceHasher establishes a baseline for a fully synchronous file hashing operation +// it will be vastly inefficient +func BenchmarkReferenceHasher(b *testing.B) { + for i := start; i < end; i++ { + b.Run(fmt.Sprintf("%d", dataLengths[i]), benchmarkReferenceHasher) + } +} + +func benchmarkReferenceHasher(b *testing.B) { + benchParams := strings.Split(b.Name(), "/") + dataLength, err := strconv.ParseInt(benchParams[1], 10, 64) + if err != nil { + b.Fatal(err) + } + hashFunc := func(_ context.Context) file.SectionWriter { + pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) + return bmt.New(pool) + } + params := newTreeParams(hashFunc) + b.ResetTimer() + for i := 0; i < b.N; i++ { + _, data := testutil.SerialData(int(dataLength), 255, 0) + fh := NewReferenceHasher(params) + fh.Hash(data) + } +} From 5689206febecfea6c83de07d43954078f49a187d Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 19 Mar 2020 15:08:44 +0100 Subject: [PATCH 5/9] file: Wrap bmt.Hasher in ReferenceHasher for SectionHasher iface --- file/hasher/reference_test.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/file/hasher/reference_test.go b/file/hasher/reference_test.go index d4deef5c0b..e1c0a1d7a7 100644 --- a/file/hasher/reference_test.go +++ b/file/hasher/reference_test.go @@ -15,6 +15,23 @@ import ( "golang.org/x/crypto/sha3" ) +type referenceBmtWrapper struct { + *bmt.Hasher +} + +func (r *referenceBmtWrapper) SetWriter(hashFunc file.SectionWriterFunc) file.SectionWriter { + log.Warn("BMT hasher does not currently support SectionWriter chaining") + return r +} + +func (r *referenceBmtWrapper) SumIndexed(b []byte, _ int) []byte { + return r.Sum(b) +} + +func (r *referenceBmtWrapper) 
WriteIndexed(_ int, b []byte) { + r.Write(b) +} + // TestManualDanglingChunk is a test script explicitly hashing and writing every individual level in the dangling chunk edge case // we use a balanced tree with data size of chunkSize*branches, and a single chunk of data // this case is chosen because it produces the wrong result in the pyramid hasher at the time of writing (master commit hash 4928d989ebd0854d993c10c194e61a5a5455e4f9) @@ -90,7 +107,7 @@ func TestReferenceHasherVector(t *testing.T) { hashFunc := func(_ context.Context) file.SectionWriter { pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) - return bmt.New(pool) + return &referenceBmtWrapper{Hasher: bmt.New(pool)} } params := newTreeParams(hashFunc) var mismatch int @@ -128,7 +145,7 @@ func benchmarkReferenceHasher(b *testing.B) { } hashFunc := func(_ context.Context) file.SectionWriter { pool := bmt.NewTreePool(sha3.NewLegacyKeccak256, branches, bmt.PoolSize) - return bmt.New(pool) + return &referenceBmtWrapper{Hasher: bmt.New(pool)} } params := newTreeParams(hashFunc) b.ResetTimer() From 3e9d96b43c95480e8c3ac3e86981082beb17e1bf Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 19 Mar 2020 15:46:34 +0100 Subject: [PATCH 6/9] file: Remote prematurely added cache.go file in testutillocal --- file/testutillocal/cache.go | 114 ------------------------------- file/testutillocal/cache_test.go | 53 -------------- 2 files changed, 167 deletions(-) delete mode 100644 file/testutillocal/cache.go delete mode 100644 file/testutillocal/cache_test.go diff --git a/file/testutillocal/cache.go b/file/testutillocal/cache.go deleted file mode 100644 index c82c5d0610..0000000000 --- a/file/testutillocal/cache.go +++ /dev/null @@ -1,114 +0,0 @@ -package testutillocal - -import ( - "context" - - "github.com/ethersphere/swarm/file" -) - -var ( - defaultSectionSize = 32 - defaultBranches = 128 -) - -type Cache struct { - data map[int][]byte - index int - w file.SectionWriter -} - -func NewCache() 
*Cache { - return &Cache{ - data: make(map[int][]byte), - } -} - -func (c *Cache) Init(_ context.Context, _ func(error)) { -} - -func (c *Cache) SetWriter(writeFunc file.SectionWriterFunc) file.SectionWriter { - c.w = writeFunc(nil) - return c -} - -func (c *Cache) SetSpan(length int) { - if c.w != nil { - c.w.SetSpan(length) - } -} - -//func (c *Cache) SetLength(length int) { -// if c.w != nil { -// c.w.SetLength(length) -// } -//} - -//func (c *Cache) SeekSection(offset int) { -// c.index = offset -// if c.w != nil { -// c.w.SeekSection(offset) -// } -//} - -func (c *Cache) Write(b []byte) (int, error) { - c.data[c.index] = b - if c.w != nil { - return c.w.Write(b) - } - return len(b), nil -} - -func (c *Cache) Sum(b []byte) []byte { - if c.w == nil { - return nil - } - return c.w.Sum(b) -} - -func (c *Cache) Reset() { - if c.w == nil { - return - } - c.w.Reset() -} - -func (c *Cache) SectionSize() int { - if c.w != nil { - return c.w.SectionSize() - } - return defaultSectionSize -} - -func (c *Cache) BlockSize() int { - return c.SectionSize() -} - -func (c *Cache) Size() int { - if c.w != nil { - return c.w.Size() - } - return defaultSectionSize -} - -func (c *Cache) Branches() int { - if c.w != nil { - return c.w.Branches() - } - return defaultBranches -} - -func (c *Cache) Get(index int) []byte { - return c.data[index] -} - -func (c *Cache) Delete(index int) { - delete(c.data, index) -} - -func (c *Cache) SumIndexed(b []byte, i int) []byte { - return c.w.SumIndexed(b, i) -} - -func (c *Cache) WriteIndexed(i int, b []byte) { - c.w.WriteIndexed(i, b) -} diff --git a/file/testutillocal/cache_test.go b/file/testutillocal/cache_test.go deleted file mode 100644 index cf43f0d3cb..0000000000 --- a/file/testutillocal/cache_test.go +++ /dev/null @@ -1,53 +0,0 @@ -package testutillocal - -import ( - "bytes" - "context" - "testing" - - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethersphere/swarm/testutil" -) - -const ( - sectionSize = 32 - chunkSize = 
4096 -) - -func init() { - testutil.Init() -} - -func TestCache(t *testing.T) { - c := NewCache() - c.Init(context.Background(), func(error) {}) - _, data := testutil.SerialData(chunkSize, 255, 0) - c.Write(data) - cachedData := c.Get(0) - if !bytes.Equal(cachedData, data) { - t.Fatalf("cache data; expected %x, got %x", data, cachedData) - } -} - -func TestCacheLink(t *testing.T) { - - hashFunc := NewBMTHasherFunc(0) - - c := NewCache() - c.Init(context.Background(), func(error) {}) - c.SetWriter(hashFunc) - _, data := testutil.SerialData(chunkSize, 255, 0) - c.SeekSection(-1) - c.Write(data) - ref := c.Sum(nil) - refHex := hexutil.Encode(ref) - correctRefHex := "0xc10090961e7682a10890c334d759a28426647141213abda93b096b892824d2ef" - if refHex != correctRefHex { - t.Fatalf("cache link; expected %s, got %s", correctRefHex, refHex) - } - - c.Delete(0) - if _, ok := c.data[0]; ok { - t.Fatalf("delete; expected not found") - } -} From 3ed82d6650aa60dfa097f8bbee122d0bc015a419 Mon Sep 17 00:00:00 2001 From: nolash Date: Wed, 1 Apr 2020 10:13:15 +0200 Subject: [PATCH 7/9] file: Delint - remove unused functions --- file/hasher/common_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/file/hasher/common_test.go b/file/hasher/common_test.go index cb513e6b45..4c8f223550 100644 --- a/file/hasher/common_test.go +++ b/file/hasher/common_test.go @@ -82,13 +82,13 @@ var ( } // placeholder for cases where a hasher is not necessary - noHashFunc = func(_ context.Context) file.SectionWriter { - return nil - } - - logErrFunc = func(err error) { - log.Error("SectionWriter pipeline error", "err", err) - } +// noHashFunc = func(_ context.Context) file.SectionWriter { +// return nil +// } +// +// logErrFunc = func(err error) { +// log.Error("SectionWriter pipeline error", "err", err) +// } ) // simple file.SectionWriter hasher that keeps the data written to it From 531abebf9a9d2a2e85eb3c588382f4cecf95cbba Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 
2 Apr 2020 09:23:55 +0200 Subject: [PATCH 8/9] file: Comments improvements and cleanup --- bmt/bmt.go | 7 ------ file/hasher/common_test.go | 18 +++++--------- file/hasher/index.go | 23 ++++++++---------- file/hasher/job.go | 48 ++++++++++++++++++++++++-------------- file/hasher/target.go | 11 ++++++++- file/hasher/util.go | 12 +++++----- file/testutillocal/hash.go | 1 + file/types.go | 18 ++++++++------ go.mod | 2 ++ go.sum | 5 ++++ 10 files changed, 81 insertions(+), 64 deletions(-) diff --git a/bmt/bmt.go b/bmt/bmt.go index 5a632b7e3e..37353aeb3f 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -287,13 +287,6 @@ func newTree(segmentSize, depth int, hashfunc func() hash.Hash) *tree { } } -// -//// SetWriter implements file.SectionWriter -//func (h *Hasher) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { -// log.Warn("Synchasher does not currently support SectionWriter chaining") -// return h -//} - // SectionSize implements file.SectionWriter func (h *Hasher) SectionSize() int { return h.pool.SegmentSize diff --git a/file/hasher/common_test.go b/file/hasher/common_test.go index 4c8f223550..fba203e690 100644 --- a/file/hasher/common_test.go +++ b/file/hasher/common_test.go @@ -80,15 +80,6 @@ var ( dummyHashFunc = func(_ context.Context) file.SectionWriter { return newDummySectionWriter(chunkSize*branches, sectionSize, sectionSize, branches) } - - // placeholder for cases where a hasher is not necessary -// noHashFunc = func(_ context.Context) file.SectionWriter { -// return nil -// } -// -// logErrFunc = func(err error) { -// log.Error("SectionWriter pipeline error", "err", err) -// } ) // simple file.SectionWriter hasher that keeps the data written to it @@ -109,6 +100,7 @@ type dummySectionWriter struct { wg sync.WaitGroup } +// dummySectionWriter constructor func newDummySectionWriter(cp int, sectionSize int, digestSize int, branches int) *dummySectionWriter { log.Trace("creating dummy writer", "sectionsize", sectionSize, "digestsize", digestSize, 
"branches", branches) return &dummySectionWriter{ @@ -121,9 +113,7 @@ func newDummySectionWriter(cp int, sectionSize int, digestSize int, branches int } } -func (d *dummySectionWriter) Init(_ context.Context, _ func(error)) { -} - +// implements file.SectionWriter func (d *dummySectionWriter) SetWriter(_ file.SectionWriterFunc) file.SectionWriter { log.Error("dummySectionWriter does not support SectionWriter chaining") return d @@ -175,6 +165,7 @@ func (d *dummySectionWriter) Sum(_ []byte) []byte { return d.digest } +// invokes sum on the underlying writer func (d *dummySectionWriter) sum() { d.mu.Lock() defer d.mu.Unlock() @@ -221,10 +212,12 @@ func (d *dummySectionWriter) Branches() int { return d.branches } +// returns true if hasher is written to the capacity limit func (d *dummySectionWriter) isFull() bool { return d.size == d.sectionSize*d.branches } +// implements file.SectionWriter func (d *dummySectionWriter) SumIndexed(b []byte, l int) []byte { //log.Trace("dummy sum indexed", "d", d.data[:l], "l", l, "b", b, "s", d.span) d.writer.Write(d.span) @@ -232,6 +225,7 @@ func (d *dummySectionWriter) SumIndexed(b []byte, l int) []byte { return d.writer.Sum(b) } +// implements file.SectionWriter func (d *dummySectionWriter) WriteIndexed(i int, b []byte) { //log.Trace("dummy write indexed", "i", i, "b", len(b)) copy(d.data[i*d.sectionSize:], b) diff --git a/file/hasher/index.go b/file/hasher/index.go index 9e8ab21071..acf8033766 100644 --- a/file/hasher/index.go +++ b/file/hasher/index.go @@ -5,7 +5,7 @@ import ( "sync" ) -// keeps an index of all the existing jobs for a file hashing operation +// jobIndex keeps an index of all the existing jobs for a file hashing operation // sorted by level // // it also keeps all the "top hashes", ie hashes on first data section index of every level @@ -18,6 +18,7 @@ type jobIndex struct { mu sync.Mutex } +// jobIndex constructor func newJobIndex(maxLevels int) *jobIndex { ji := &jobIndex{ maxLevels: maxLevels, @@ -33,16 +34,14 
@@ func (ji *jobIndex) String() string { return fmt.Sprintf("%p", ji) } -// Add adds a job to the index at the level -// and data section index specified in the job +// Add adds a job to the index at the level and data section index specified in the job func (ji *jobIndex) Add(jb *job) { - //log.Trace("adding job", "job", jb) ji.jobs[jb.level].Store(jb.dataSection, jb) } -// Get retrieves a job from the job index -// based on the level of the job and its data section index -// if a job for the level and section index does not exist this method returns nil +// Get retrieves a job from the job index based on the level of the job and its data section index +// +// If a job for the level and section index does not exist this method returns nil func (ji *jobIndex) Get(lvl int, section int) *job { jb, ok := ji.jobs[lvl].Load(section) if !ok { @@ -51,21 +50,18 @@ func (ji *jobIndex) Get(lvl int, section int) *job { return jb.(*job) } -// Delete removes a job from the job index -// leaving it to be garbage collected when -// the reference in the main code is relinquished +// Delete removes a job from the job index leaving it to be garbage collected when the reference in the main code is relinquished func (ji *jobIndex) Delete(jb *job) { ji.jobs[jb.level].Delete(jb.dataSection) } // AddTopHash should be called by a job when a hash is written to the first index of a level -// since the job doesn't store any data written to it (just passing it through to the underlying writer) -// this is needed for the edge case of balanced trees +// +// Since the job doesn't store any data written to it (just passing it through to the underlying writer) this is needed for the edge case of balanced trees func (ji *jobIndex) AddTopHash(ref []byte) { ji.mu.Lock() defer ji.mu.Unlock() ji.topHashes = append(ji.topHashes, ref) - //log.Trace("added top hash", "length", len(ji.topHashes), "index", ji) } // GetJobHash gets the current top hash for a particular level set by AddTopHash @@ -75,6 +71,7 
@@ func (ji *jobIndex) GetTopHash(lvl int) []byte { return ji.topHashes[lvl-1] } +// GetTopHashLevel gets the level of the current topmost hash func (ji *jobIndex) GetTopHashLevel() int { ji.mu.Lock() defer ji.mu.Unlock() diff --git a/file/hasher/job.go b/file/hasher/job.go index 5356d57922..22aeb5c107 100644 --- a/file/hasher/job.go +++ b/file/hasher/job.go @@ -10,14 +10,14 @@ import ( "github.com/ethersphere/swarm/log" ) -// necessary metadata across asynchronous input +// jobUnit stores the necessary metadata for the asynchronous processing of a single chunk type jobUnit struct { index int data []byte count int } -// encapsulates one single intermediate chunk to be hashed +// encapsulates one single intermediate chunk to be processed type job struct { target *target params *treeParams @@ -30,13 +30,14 @@ type job struct { lastSectionSize int // data size on the last data section write firstSectionData []byte // store first section of data written to solve the dangling chunk edge case - writeC chan jobUnit - writer file.SectionWriter - doneC chan struct{} // pointer to target doneC channel, set to nil in process() when closed + writeC chan jobUnit // receives data fully processed by the underlying writer + writer file.SectionWriter // underlying data processor (eg.
hasher) + doneC chan struct{} // pointer to target doneC channel, set to nil in process() when closed mu sync.Mutex } +// job constructor func newJob(params *treeParams, tgt *target, jobIndex *jobIndex, lvl int, dataSection int) *job { jb := &job{ params: params, @@ -66,6 +67,7 @@ func newJob(params *treeParams, tgt *target, jobIndex *jobIndex, lvl int, dataSe return jb } +// starts the asynchronous chunk processor that dispatches data to the underlying writer func (jb *job) start() { jb.writer = jb.params.GetWriter() go jb.process() @@ -87,8 +89,9 @@ func (jb *job) count() int { } // size returns the byte size of the span the job represents -// if job is last index in a level and writes have been finalized, it will return the target size -// otherwise, regardless of job index, it will return the size according to the current write count +// +// If job is last index in a level and writes have been finalized, it will return the target size. Otherwise, regardless of job index, it will return the size according to the current write count +// // TODO: returning expected size in one case and actual size in another can lead to confusion func (jb *job) size() int { jb.mu.Lock() @@ -103,9 +106,11 @@ func (jb *job) size() int { } // add data to job -// does no checking for data length or index validity +// +// Note: Does no checking for data length or index validity +// // TODO: rename index param not to confuse with index object -func (jb *job) write(index int, data []byte) { +func (jb *job) write(idx int, data []byte) { jb.inc() @@ -113,7 +118,7 @@ func (jb *job) write(index int, data []byte) { // in case of a balanced tree and we need to send it to resultC later // at the time of hasing of a balanced tree we have no way of knowing for sure whether // that is the end of the job or not - if jb.dataSection == 0 && index == 0 { + if jb.dataSection == 0 && idx == 0 { topHashLevel := jb.index.GetTopHashLevel() if topHashLevel < jb.level { log.Trace("have tophash", "level", 
jb.level, "ref", hexutil.Encode(data)) @@ -121,12 +126,15 @@ func (jb *job) write(index int, data []byte) { } } jb.writeC <- jobUnit{ - index: index, + index: idx, data: data, } } +// process asynchronously handles chunk processing to the underlying writer +// // runs in loop until: +// // - sectionSize number of job writes have occurred (one full chunk) // - data write is finalized and targetcount for this chunk was already reached // - data write is finalized and targetcount is reached on a subsequent job write @@ -213,6 +221,7 @@ OUTER: jb.sum() } +// sum retrieves the sum generated by the underlying writer and writes it to the corresponding section in the level above func (jb *job) sum() { targetLevel := jb.target.Level() @@ -223,9 +232,7 @@ func (jb *job) sum() { // get the size of the span and execute the hash digest of the content size := jb.size() - //span := bmt.LengthToSpan(size) refSize := jb.count() * jb.params.SectionSize - //jb.writer.SetLength(refSize) jb.writer.SetSpan(size) log.Trace("job sum", "count", jb.count(), "refsize", refSize, "size", size, "datasection", jb.dataSection, "level", jb.level, "targetlevel", targetLevel, "endcount", jb.endCount) ref := jb.writer.SumIndexed(nil, refSize) @@ -284,8 +291,9 @@ func (jb *job) targetWithinJob(targetSection int) (int, bool) { return endIndex, ok } -// if last data index falls within the span, return the appropriate end count for the level -// otherwise return 0 (which means job write until limit) +// If last data index falls within the span, return the appropriate end count for the level +// +// Otherwise return 0, which means "job write until limit" func (jb *job) targetCountToEndCount(targetCount int) int { endIndex, ok := jb.targetWithinJob(targetCount - 1) if !ok { @@ -294,8 +302,11 @@ func (jb *job) targetCountToEndCount(targetCount int) int { return endIndex + 1 } -// returns the parent job of the receiver job -// a new parent job is created if none exists for the slot +// Returns the parent job 
of the receiver job +// +// A new parent job is created if none exists for the slot +// +// TODO: consider renaming to getOrCreateParent func (jb *job) parent() *job { jb.index.mu.Lock() defer jb.index.mu.Unlock() @@ -312,7 +323,8 @@ func (jb *job) parent() *job { } // Next creates the job for the next data section span on the same level as the receiver job -// this is only meant to be called once for each job, consecutive calls will overwrite index with new empty job +// +// This is only meant to be called once for each job, consecutive calls will overwrite index with new empty job func (jb *job) Next() *job { jbn := newJob(jb.params, jb.target, jb.index, jb.level, jb.dataSection+jb.params.Spans[jb.level+1]) jbn.start() diff --git a/file/hasher/target.go b/file/hasher/target.go index 9d566fa490..a6194308f3 100644 --- a/file/hasher/target.go +++ b/file/hasher/target.go @@ -6,7 +6,7 @@ import ( "github.com/ethersphere/swarm/log" ) -// passed to a job to determine at which data lengths and levels a job should terminate +// target is passed to a job to determine at which data lengths and levels a job should terminate type target struct { size int32 // bytes written sections int32 // sections written @@ -16,6 +16,7 @@ type target struct { mu sync.Mutex } +// target constructor func newTarget() *target { return &target{ resultC: make(chan []byte), @@ -24,6 +25,7 @@ func newTarget() *target { } // Set is called when the final length of the data to be written is known +// // TODO: method can be simplified to calculate sections and level internally func (t *target) Set(size int, sections int, level int) { t.mu.Lock() @@ -36,6 +38,7 @@ func (t *target) Set(size int, sections int, level int) { } // Count returns the total section count for the target +// // it should only be called after Set() func (t *target) Count() int { t.mu.Lock() @@ -43,12 +46,18 @@ func (t *target) Count() int { return int(t.sections) + 1 } +// Level returns the level of the target +// +// it should 
only be called after Set() func (t *target) Level() int { t.mu.Lock() defer t.mu.Unlock() return int(t.level) } +// Size returns the byte count for the target +// +// it should only be called after Set() func (t *target) Size() int { t.mu.Lock() defer t.mu.Unlock() diff --git a/file/hasher/util.go b/file/hasher/util.go index da23a03957..e55f78fa5d 100644 --- a/file/hasher/util.go +++ b/file/hasher/util.go @@ -4,8 +4,9 @@ import ( "math" ) -// TODO: level 0 should be SectionSize() not Branches() // generates a dictionary of maximum span lengths per level represented by one SectionSize() of data +// +// TODO: level 0 should be SectionSize() not Branches() func generateSpanSizes(branches int, levels int) []int { spans := make([]int, levels) span := 1 @@ -16,9 +17,9 @@ func generateSpanSizes(branches int, levels int) []int { return spans } +// calculate the last level index which a particular data section count will result in. The returned level will be the level of the root hash +// // TODO: use params instead of sectionSize, branches -// calculate the last level index which a particular data section count will result in. 
-// the returned level will be the level of the root hash func getLevelsFromLength(l int, sectionSize int, branches int) int { if l == 0 { return 0 @@ -47,12 +48,11 @@ func dataSectionToLevelSection(p *treeParams, lvl int, sections int) int { } // calculates the lower data section boundary of a level for which a data section is contained -// the higher level use is to determine whether the final data section written falls within -// a certain level's span +// +// the higher level use is to determine whether the final data section written falls within a certain level's span func dataSectionToLevelBoundary(p *treeParams, lvl int, section int) int { span := p.Spans[lvl+1] spans := section / span spanBytes := spans * span - //log.Trace("levelboundary", "spans", spans, "section", section, "span", span) return spanBytes } diff --git a/file/testutillocal/hash.go b/file/testutillocal/hash.go index 79dc767a60..f1decde683 100644 --- a/file/testutillocal/hash.go +++ b/file/testutillocal/hash.go @@ -13,6 +13,7 @@ var ( branches = 128 ) +// NewBMTHasherFunc is a test helper that creates a new asynchronous hasher with a specified poolsize func NewBMTHasherFunc(poolSize int) file.SectionWriterFunc { if poolSize == 0 { poolSize = bmt.PoolSize diff --git a/file/types.go b/file/types.go index b8a39403ad..0d711bdbdd 100644 --- a/file/types.go +++ b/file/types.go @@ -5,14 +5,18 @@ import ( "hash" ) +// SectionWriterFunc defines the required function signature to be used to create a SectionWriter type SectionWriterFunc func(ctx context.Context) SectionWriter +// SectionWriter is a chainable interface for processing of chunk data +// +// Implementations should pass data to underlying writer before performing their own sum calculations type SectionWriter interface { - hash.Hash // Write,Sum,Reset,Size,BlockSize - SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter the current instance - SetSpan(length int) // set data span of chunk - SectionSize() int // 
section size of this SectionWriter - Branches() int // branch factor of this SectionWriter - SumIndexed([]byte, int) []byte - WriteIndexed(int, []byte) + hash.Hash // Write,Sum,Reset,Size,BlockSize + SetWriter(hashFunc SectionWriterFunc) SectionWriter // chain another SectionWriter to the current instance + SetSpan(length int) // set data span of chunk + SectionSize() int // section size of this SectionWriter + Branches() int // branch factor of this SectionWriter + SumIndexed(prepended_data []byte, span_length int) []byte // Blocking call returning the sum of the data from underlying writer, setting the final data length to span_length + WriteIndexed(int, []byte) // Write to a particular data section, enabling asynchronous writing to any position of any level } diff --git a/go.mod b/go.mod index a64125cf9a..ea828c11a3 100644 --- a/go.mod +++ b/go.mod @@ -36,6 +36,7 @@ require ( github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c // indirect github.com/naoina/go-stringutil v0.1.0 // indirect github.com/naoina/toml v0.0.0-20170918210437-9fafd6967416 + github.com/nolash/mockbytes v0.0.0-20200401133213-4cadbd154ff6 // indirect github.com/opencontainers/go-digest v1.0.0-rc1 // indirect github.com/opencontainers/image-spec v1.0.1 // indirect github.com/opencontainers/runc v0.1.1 // indirect @@ -51,6 +52,7 @@ require ( github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6 github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7 // indirect github.com/vbauerster/mpb v3.4.0+incompatible + gitlab.com/nolash/go-mockbytes v0.0.2 // indirect go.uber.org/atomic v1.4.0 // indirect golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 diff --git a/go.sum b/go.sum index 3f65912b67..8ea748f65f 100644 --- a/go.sum +++ b/go.sum @@ -66,6 +66,7 @@ github.com/elastic/gosigar v0.0.0-20180330100440-37f05ff46ffa/go.mod h1:cdorVVzy github.com/elazarl/goproxy v0.0.0-20170405201442-c4fc26588b6e/go.mod
h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= github.com/ethereum/go-ethereum v1.9.2 h1:RMIHDO/diqXEgORSVzYx8xW9x2+S32PoAX5lQwya0Lw= github.com/ethereum/go-ethereum v1.9.2/go.mod h1:PwpWDrCLZrV+tfrhqqF6kPknbISMHaJv9Ln3kPCZLwY= +github.com/ethersphere/bmt v0.0.0-20200401123433-67173b18a663 h1:MXvgOFHffdlMLiwKrfiVIda9/OCBZhjus02w9z3rIds= github.com/ethersphere/go-sw3 v0.2.1 h1:+i660uWzhRbT1YO7MAeuxzn+jUeYOTc8rGRVjsKaw+4= github.com/ethersphere/go-sw3 v0.2.1/go.mod h1:HukT0aZ6QdW/d7zuD/0g5xlw6ewu9QeqHojxLDsaERQ= github.com/evanphx/json-patch v4.2.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk= @@ -186,6 +187,8 @@ github.com/naoina/go-stringutil v0.1.0 h1:rCUeRUHjBjGTSHl0VC00jUPLz8/F9dDzYI70Hz github.com/naoina/go-stringutil v0.1.0/go.mod h1:XJ2SJL9jCtBh+P9q5btrd/Ylo8XwT/h1USek5+NqSA0= github.com/naoina/toml v0.0.0-20170918210437-9fafd6967416 h1:9M852Z3gvzUmyFvy+TruhDWCwcIG3cZWg/+Eh8VkR7M= github.com/naoina/toml v0.0.0-20170918210437-9fafd6967416/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E= +github.com/nolash/mockbytes v0.0.0-20200401133213-4cadbd154ff6 h1:3VxaUbphSn8+igZjC75xdy4okZdRbE9u8QlfgrzIty4= +github.com/nolash/mockbytes v0.0.0-20200401133213-4cadbd154ff6/go.mod h1:yRWDp8wo0IXx2FReB9wTh/0E8oezBIDFdhYE1qUQMM0= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/olekukonko/tablewriter v0.0.0-20190409134802-7e037d187b0c h1:2j4kdCOg5xiOVCTQpv0SgbzndaVJKliD6oRbMxTw6v4= github.com/olekukonko/tablewriter v0.0.0-20190409134802-7e037d187b0c/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -273,6 +276,8 @@ github.com/vbauerster/mpb v3.4.0+incompatible h1:mfiiYw87ARaeRW6x5gWwYRUawxaW1tL github.com/vbauerster/mpb v3.4.0+incompatible/go.mod h1:zAHG26FUhVKETRu+MWqYXcI70POlC6N8up9p1dID7SU= github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208 h1:1cngl9mPEoITZG8s8cVcUy5CeIBYhEESkOB7m6Gmkrk= github.com/wsddn/go-ecdh v0.0.0-20161211032359-48726bab9208/go.mod 
h1:IotVbo4F+mw0EzQ08zFqg7pK3FebNXpaMsRy2RT+Ees= +gitlab.com/nolash/go-mockbytes v0.0.2 h1:qGF5eH+eBxCfNdw+6v5U1Z4ddkPOdVx01Aczp10jCEY= +gitlab.com/nolash/go-mockbytes v0.0.2/go.mod h1:3iOEPU9bRvF/FBBmiBdWCxvuPmKqQ6cPoX8Ro2yoILc= go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= From 0430576e3d791b2ad6e401fd482123b061d6cf10 Mon Sep 17 00:00:00 2001 From: nolash Date: Thu, 2 Apr 2020 09:31:13 +0200 Subject: [PATCH 9/9] file: Document wrapper for reference hasher --- file/hasher/job_test.go | 3 --- file/hasher/reference_test.go | 4 ++++ 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/file/hasher/job_test.go b/file/hasher/job_test.go index f99284e4bc..1ed4e20d48 100644 --- a/file/hasher/job_test.go +++ b/file/hasher/job_test.go @@ -584,15 +584,12 @@ func benchmarkJob(b *testing.B) { jb := newJob(treeParams, tgt, nil, 1, 0) jb.start() count := 0 - //log.Info("test vector", "length", dataLength) for i := 0; i < dataLength; i += chunkSize { ie := i + chunkSize if ie > dataLength { ie = dataLength } - //writeSize := ie - i dataHash.Reset() - //dataHash.SetLength(writeSize) c, err := dataHash.Write(data[i:ie]) if err != nil { jb.destroy() diff --git a/file/hasher/reference_test.go b/file/hasher/reference_test.go index e1c0a1d7a7..4d4c4e974e 100644 --- a/file/hasher/reference_test.go +++ b/file/hasher/reference_test.go @@ -15,19 +15,23 @@ import ( "golang.org/x/crypto/sha3" ) +// referenceBmtWrapper encapsulates the bmt hasher in order to implement the SectionWriter interface type referenceBmtWrapper struct { *bmt.Hasher } +// implements file.SectionWriter func (r *referenceBmtWrapper) SetWriter(hashFunc file.SectionWriterFunc) file.SectionWriter { log.Warn("BMT hasher does not currently support SectionWriter chaining") return r } +// implements
file.SectionWriter func (r *referenceBmtWrapper) SumIndexed(b []byte, _ int) []byte { return r.Sum(b) } +// implements file.SectionWriter func (r *referenceBmtWrapper) WriteIndexed(_ int, b []byte) { r.Write(b) }