From 5e67c668d2949e326e26b262d2d0b19c642a134a Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Tue, 28 Nov 2023 22:29:45 +0100 Subject: [PATCH 1/8] Change algorithm to precalculate hashes --- app/ldiff/diff.go | 93 +++++++++-------- app/ldiff/diff_test.go | 153 +++++++++++++++++++++++++--- app/ldiff/hashrange.go | 221 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 406 insertions(+), 61 deletions(-) create mode 100644 app/ldiff/hashrange.go diff --git a/app/ldiff/diff.go b/app/ldiff/diff.go index 03f3c567..6bf0df50 100644 --- a/app/ldiff/diff.go +++ b/app/ldiff/diff.go @@ -9,11 +9,12 @@ import ( "context" "encoding/hex" "errors" + "math" + "sync" + "github.com/cespare/xxhash" "github.com/huandu/skiplist" "github.com/zeebo/blake3" - "math" - "sync" ) // New creates new Diff container @@ -42,6 +43,9 @@ func New(divideFactor, compareThreshold int) Diff { compareThreshold: compareThreshold, } d.sl = skiplist.New(d) + d.ranges = newHashRanges(divideFactor, compareThreshold, d.sl) + d.ranges.dirty[d.ranges.topRange] = struct{}{} + d.ranges.recalculateHashes() return d } @@ -62,7 +66,7 @@ type Element struct { // Range request to get RangeResult type Range struct { From, To uint64 - Limit int + Elements bool } // RangeResult response for Range @@ -108,6 +112,7 @@ type diff struct { sl *skiplist.SkipList divideFactor int compareThreshold int + ranges *hashRanges mu sync.RWMutex } @@ -140,10 +145,13 @@ func (d *diff) Set(elements ...Element) { d.mu.Lock() defer d.mu.Unlock() for _, e := range elements { - el := &element{Element: e, hash: xxhash.Sum64([]byte(e.Id))} + hash := xxhash.Sum64([]byte(e.Id)) + el := &element{Element: e, hash: hash} d.sl.Remove(el) d.sl.Set(el, nil) + d.ranges.addElement(hash) } + d.ranges.recalculateHashes() } func (d *diff) Ids() (ids []string) { @@ -198,51 +206,42 @@ func (d *diff) Element(id string) (Element, error) { func (d *diff) Hash() string { d.mu.RLock() defer d.mu.RUnlock() - res := d.getRange(Range{To: math.MaxUint64}) - return hex.EncodeToString(res.Hash) + return hex.EncodeToString(d.ranges.hash()) } // RemoveId removes element by id func (d *diff) RemoveId(id string) error { d.mu.Lock() defer d.mu.Unlock() + hash := xxhash.Sum64([]byte(id)) el := &element{Element: Element{ Id: id, - }, hash: xxhash.Sum64([]byte(id))} + }, hash: hash} if d.sl.Remove(el) == nil { return ErrElementNotFound } + d.ranges.removeElement(hash) + d.ranges.recalculateHashes() return nil } func (d *diff) getRange(r Range) (rr RangeResult) { - hasher := hashersPool.Get().(*blake3.Hasher) - defer hashersPool.Put(hasher) - hasher.Reset() + rng := d.ranges.getRange(r.From, r.To) + rr.Count = rng.elements + if rng != nil { + rr.Hash = rng.hash + if !r.Elements && rng.isDivided { + return + } + } el := d.sl.Find(&element{hash: r.From}) - rr.Elements = make([]Element, 0, r.Limit) - var overfill bool + rr.Elements = make([]Element, 0, d.divideFactor) for el != nil && el.Key().(*element).hash <= r.To { elem := el.Key().(*element).Element el = el.Next() - - hasher.WriteString(elem.Id) - hasher.WriteString(elem.Head) - rr.Count++ - if !overfill { - if len(rr.Elements) < r.Limit { - rr.Elements = append(rr.Elements, elem) - } - if len(rr.Elements) == r.Limit && el != nil { - overfill = true - } - } + rr.Elements = append(rr.Elements, elem) } - if overfill { - rr.Elements = nil - } - rr.Hash = hasher.Sum(nil) return } @@ -271,9 +270,8 @@ var errMismatched = errors.New("query and results mismatched") func (d *diff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) { dctx := &diffCtx{} dctx.toSend = append(dctx.toSend, Range{ - From: 0, - To: math.MaxUint64, - Limit: d.compareThreshold, + From: 0, + To: math.MaxUint64, }) for len(dctx.toSend) > 0 { select { @@ -307,26 +305,25 @@ func (d *diff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResul return } - // both has elements - if len(myRes.Elements) == myRes.Count && len(otherRes.Elements) == otherRes.Count { - d.compareElements(dctx, myRes.Elements, otherRes.Elements) + // other has elements + if len(otherRes.Elements) == otherRes.Count { + if len(myRes.Elements) == myRes.Count { + d.compareElements(dctx, myRes.Elements, otherRes.Elements) + } else { + r.Elements = true + d.compareElements(dctx, d.getRange(r).Elements, otherRes.Elements) + } return } - - // make more queries - divideFactor := uint64(d.divideFactor) - perRange := (r.To - r.From) / divideFactor - align := ((r.To-r.From)%divideFactor + 1) % divideFactor - if align == 0 { - perRange += 1 + // request all elements from other, because we don't have enough + if len(myRes.Elements) == myRes.Count { + r.Elements = true + dctx.prepare = append(dctx.prepare, r) + return } - var j = r.From - for i := 0; i < d.divideFactor; i++ { - if i == d.divideFactor-1 { - perRange += align - } - dctx.prepare = append(dctx.prepare, Range{From: j, To: j + perRange - 1, Limit: r.Limit}) - j += perRange + rangeTuples := genTupleRanges(r.From, r.To, d.divideFactor) + for _, tuple := range rangeTuples { + dctx.prepare = append(dctx.prepare, Range{From: tuple.from, To: tuple.to}) } return } diff --git a/app/ldiff/diff_test.go b/app/ldiff/diff_test.go index e21c151e..ff6cde7e 100644 --- a/app/ldiff/diff_test.go +++ b/app/ldiff/diff_test.go @@ -3,12 +3,13 @@ package ldiff import ( "context" "fmt" - "github.com/google/uuid" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "math" "sort" "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestDiff_fillRange(t *testing.T) { @@ -23,17 +24,10 @@ func TestDiff_fillRange(t *testing.T) { t.Log(d.sl.Len()) t.Run("elements", func(t *testing.T) { - r := Range{From: 0, To: math.MaxUint64, Limit: 10} - res := d.getRange(r) - assert.NotNil(t, res.Hash) - assert.Len(t, res.Elements, 10) - }) - t.Run("hash", func(t *testing.T) { - r := Range{From: 0, To: math.MaxUint64, Limit: 9} + r := Range{From: 0, To: math.MaxUint64} res := d.getRange(r) - t.Log(len(res.Elements)) assert.NotNil(t, res.Hash) - assert.Nil(t, res.Elements) + assert.Equal(t, res.Count, 10) }) } @@ -77,6 +71,73 @@ func TestDiff_Diff(t *testing.T) { assert.Len(t, changedIds, 1) assert.Len(t, removedIds, 1) }) + t.Run("complex", func(t *testing.T) { + d1 := New(16, 128) + d2 := New(16, 128) + length := 10000 + for i := 0; i < length; i++ { + id := fmt.Sprint(i) + head := uuid.NewString() + d1.Set(Element{ + Id: id, + Head: head, + }) + } + + newIds, changedIds, removedIds, err := d1.Diff(ctx, d2) + require.NoError(t, err) + assert.Len(t, newIds, 0) + assert.Len(t, changedIds, 0) + assert.Len(t, removedIds, length) + + for i := 0; i < length; i++ { + id := fmt.Sprint(i) + head := uuid.NewString() + d2.Set(Element{ + Id: id, + Head: head, + }) + } + + newIds, changedIds, removedIds, err = d1.Diff(ctx, d2) + require.NoError(t, err) + assert.Len(t, newIds, 0) + assert.Len(t, changedIds, length) + assert.Len(t, removedIds, 0) + + for i := 0; i < length; i++ { + id := fmt.Sprint(i) + head := uuid.NewString() + d2.Set(Element{ + Id: id, + Head: head, + }) + } + + res, err := d1.Ranges( + context.Background(), + []Range{{From: 0, To: math.MaxUint64, Elements: true}}, + nil) + require.NoError(t, err) + require.Len(t, res, 1) + for i, el := range res[0].Elements { + if i < length/2 { + continue + } + id := el.Id + head := el.Head + d2.Set(Element{ + Id: id, + Head: head, + }) + } + + newIds, changedIds, removedIds, err = d1.Diff(ctx, d2) + require.NoError(t, err) + assert.Len(t, newIds, 0) + assert.Len(t, changedIds, length/2) + assert.Len(t, removedIds, 0) + }) t.Run("empty", func(t *testing.T) { d1 := New(16, 16) d2 := New(16, 16) @@ -133,7 +194,7 @@ func BenchmarkDiff_Ranges(b *testing.B) { b.ResetTimer() b.ReportAllocs() var resBuf []RangeResult - var ranges = []Range{{From: 0, To: math.MaxUint64, Limit: 10}} + var ranges = []Range{{From: 0, To: math.MaxUint64}} for i := 0; i < b.N; i++ { d.Ranges(ctx, ranges, resBuf) resBuf = resBuf[:0] @@ -197,3 +258,69 @@ func TestDiff_Elements(t *testing.T) { }) assert.Equal(t, els, gotEls) } + +func TestRangesAddRemove(t *testing.T) { + length := 10000 + divideFactor := 4 + compareThreshold := 4 + addTwice := func() string { + d := New(divideFactor, compareThreshold) + var els []Element + for i := 0; i < length; i++ { + if i < length/20 { + continue + } + els = append(els, Element{ + Id: fmt.Sprint(i), + Head: fmt.Sprint("h", i), + }) + } + d.Set(els...) + els = els[:0] + for i := 0; i < length/20; i++ { + els = append(els, Element{ + Id: fmt.Sprint(i), + Head: fmt.Sprint("h", i), + }) + } + d.Set(els...) + return d.Hash() + } + addOnce := func() string { + d := New(divideFactor, compareThreshold) + var els []Element + for i := 0; i < length; i++ { + els = append(els, Element{ + Id: fmt.Sprint(i), + Head: fmt.Sprint("h", i), + }) + } + d.Set(els...) + return d.Hash() + } + addRemove := func() string { + d := New(divideFactor, compareThreshold) + var els []Element + for i := 0; i < length; i++ { + els = append(els, Element{ + Id: fmt.Sprint(i), + Head: fmt.Sprint("h", i), + }) + } + d.Set(els...) + for i := 0; i < length/20; i++ { + err := d.RemoveId(fmt.Sprint(i)) + require.NoError(t, err) + } + els = els[:0] + for i := 0; i < length/20; i++ { + els = append(els, Element{ + Id: fmt.Sprint(i), + Head: fmt.Sprint("h", i), + }) + } + d.Set(els...) + return d.Hash() + } + require.Equal(t, addTwice(), addOnce(), addRemove()) +} diff --git a/app/ldiff/hashrange.go b/app/ldiff/hashrange.go new file mode 100644 index 00000000..ef47197b --- /dev/null +++ b/app/ldiff/hashrange.go @@ -0,0 +1,221 @@ +package ldiff + +import ( + "math" + + "github.com/huandu/skiplist" + "github.com/zeebo/blake3" + "golang.org/x/exp/slices" +) + +type hashRange struct { + from, to uint64 + parent *hashRange + isDivided bool + elements int + level int + hash []byte +} + +type rangeTuple struct { + from, to uint64 +} + +type hashRanges struct { + ranges map[rangeTuple]*hashRange + topRange *hashRange + sl *skiplist.SkipList + dirty map[*hashRange]struct{} + divideFactor int + compareThreshold int +} + +func newHashRanges(divideFactor, compareThreshold int, sl *skiplist.SkipList) *hashRanges { + h := &hashRanges{ + ranges: make(map[rangeTuple]*hashRange), + dirty: make(map[*hashRange]struct{}), + divideFactor: divideFactor, + compareThreshold: compareThreshold, + sl: sl, + } + h.topRange = &hashRange{ + from: 0, + to: math.MaxUint64, + isDivided: true, + level: 0, + } + h.ranges[rangeTuple{from: 0, to: math.MaxUint64}] = h.topRange + h.makeBottomRanges(h.topRange) + return h +} + +func (h *hashRanges) hash() []byte { + return h.topRange.hash +} + +func (h *hashRanges) addElement(elHash uint64) { + rng := h.topRange + rng.elements++ + for rng.isDivided { + rng = h.getBottomRange(rng, elHash) + rng.elements++ + } + h.dirty[rng] = struct{}{} + if rng.elements > h.compareThreshold { + rng.isDivided = true + h.makeBottomRanges(rng) + } + if rng.parent != nil { + if _, ok := h.dirty[rng.parent]; ok { + delete(h.dirty, rng.parent) + } + } +} + +func (h *hashRanges) removeElement(elHash uint64) { + rng := h.topRange + rng.elements-- + for rng.isDivided { + rng = h.getBottomRange(rng, elHash) + rng.elements-- + } + parent := rng.parent + if parent.elements <= h.compareThreshold && parent != h.topRange { + ranges := genTupleRanges(parent.from, parent.to, h.divideFactor) + for _, tuple := range ranges { + child := h.ranges[tuple] + delete(h.ranges, tuple) + delete(h.dirty, child) + } + parent.isDivided = false + h.dirty[parent] = struct{}{} + } else { + h.dirty[rng] = struct{}{} + } +} + +func (h *hashRanges) recalculateHashes() { + for len(h.dirty) > 0 { + var slDirty []*hashRange + for rng := range h.dirty { + slDirty = append(slDirty, rng) + } + slices.SortFunc(slDirty, func(a, b *hashRange) int { + if a.level < b.level { + return -1 + } else if a.level > b.level { + return 1 + } else { + return 0 + } + }) + for _, rng := range slDirty { + if rng.isDivided { + rng.hash = h.calcDividedHash(rng) + } else { + rng.hash, rng.elements = h.calcElementsHash(rng.from, rng.to) + } + delete(h.dirty, rng) + if rng.parent != nil { + h.dirty[rng.parent] = struct{}{} + } + } + } +} + +func (h *hashRanges) getRange(from, to uint64) *hashRange { + return h.ranges[rangeTuple{from: from, to: to}] +} + +func (h *hashRanges) getBottomRange(rng *hashRange, elHash uint64) *hashRange { + df := uint64(h.divideFactor) + perRange := (rng.to - rng.from) / df + align := ((rng.to-rng.from)%df + 1) % df + if align == 0 { + perRange++ + } + bucket := (elHash - rng.from) / perRange + tuple := rangeTuple{from: rng.from + bucket*perRange, to: rng.from - 1 + (bucket+1)*perRange} + if bucket == df-1 { + tuple.to += align + } + return h.ranges[tuple] +} + +func (h *hashRanges) makeBottomRanges(rng *hashRange) { + ranges := genTupleRanges(rng.from, rng.to, h.divideFactor) + for _, tuple := range ranges { + newRange := h.makeRange(tuple, rng) + h.ranges[tuple] = newRange + if newRange.elements > h.compareThreshold { + if _, ok := h.dirty[rng]; ok { + delete(h.dirty, rng) + } + h.dirty[newRange] = struct{}{} + newRange.isDivided = true + h.makeBottomRanges(newRange) + } + } +} + +func (h *hashRanges) makeRange(tuple rangeTuple, parent *hashRange) *hashRange { + newRange := &hashRange{ + from: tuple.from, + to: tuple.to, + parent: parent, + } + hash, els := h.calcElementsHash(tuple.from, tuple.to) + newRange.hash = hash + newRange.level = parent.level + 1 + newRange.elements = els + return newRange +} + +func (h *hashRanges) calcDividedHash(rng *hashRange) (hash []byte) { + hasher := hashersPool.Get().(*blake3.Hasher) + defer hashersPool.Put(hasher) + hasher.Reset() + ranges := genTupleRanges(rng.from, rng.to, h.divideFactor) + for _, tuple := range ranges { + child := h.ranges[tuple] + hasher.Write(child.hash) + } + hash = hasher.Sum(nil) + return +} + +func genTupleRanges(from, to uint64, divideFactor int) (prepare []rangeTuple) { + df := uint64(divideFactor) + perRange := (to - from) / df + align := ((to-from)%df + 1) % df + if align == 0 { + perRange++ + } + var j = from + for i := 0; i < divideFactor; i++ { + if i == divideFactor-1 { + perRange += align + } + prepare = append(prepare, rangeTuple{from: j, to: j + perRange - 1}) + j += perRange + } + return +} + +func (h *hashRanges) calcElementsHash(from, to uint64) (hash []byte, els int) { + hasher := hashersPool.Get().(*blake3.Hasher) + defer hashersPool.Put(hasher) + hasher.Reset() + + el := h.sl.Find(&element{hash: from}) + for el != nil && el.Key().(*element).hash <= to { + elem := el.Key().(*element).Element + el = el.Next() + + hasher.WriteString(elem.Id) + hasher.WriteString(elem.Head) + els++ + } + hash = hasher.Sum(nil) + return +} From a8a62b092308acceabf675dbf635dd937ec9e1de Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 00:23:24 +0100 Subject: [PATCH 2/8] Diff container --- app/ldiff/diff.go | 21 +- app/ldiff/diff_test.go | 28 +- app/ldiff/diffcontainer.go | 54 +++ app/ldiff/mock_ldiff/mock_ldiff.go | 116 ++++++- app/ldiff/olddiff.go | 311 ++++++++++++++++++ commonspace/headsync/diffsyncer.go | 39 ++- commonspace/headsync/diffsyncer_test.go | 33 +- commonspace/headsync/headsync.go | 22 +- commonspace/headsync/headsync_test.go | 22 +- commonspace/headsync/remotediff.go | 58 +++- commonspace/spacestorage/inmemorystorage.go | 17 +- .../mock_spacestorage/mock_spacestorage.go | 29 ++ commonspace/spacestorage/spacestorage.go | 2 + .../spacesyncproto/protos/spacesync.proto | 9 + commonspace/spacesyncproto/spacesync.pb.go | 304 ++++++++++++----- 15 files changed, 933 insertions(+), 132 deletions(-) create mode 100644 app/ldiff/diffcontainer.go create mode 100644 app/ldiff/olddiff.go diff --git a/app/ldiff/diff.go b/app/ldiff/diff.go index 6bf0df50..388e796b 100644 --- a/app/ldiff/diff.go +++ b/app/ldiff/diff.go @@ -1,7 +1,7 @@ // Package ldiff provides a container of elements with fixed id and changeable content. // Diff can calculate the difference with another diff container (you can make it remote) with minimum hops and traffic. // -//go:generate mockgen -destination mock_ldiff/mock_ldiff.go github.com/anyproto/any-sync/app/ldiff Diff,Remote +//go:generate mockgen -destination mock_ldiff/mock_ldiff.go github.com/anyproto/any-sync/app/ldiff Diff,Remote,DiffContainer package ldiff import ( @@ -15,9 +15,11 @@ import ( "github.com/cespare/xxhash" "github.com/huandu/skiplist" "github.com/zeebo/blake3" + + "github.com/anyproto/any-sync/commonspace/spacesyncproto" ) -// New creates new Diff container +// New creates precalculated Diff container // // divideFactor - means how many hashes you want to ask for once // @@ -32,6 +34,10 @@ import ( // // Less threshold and divideFactor - less traffic but more requests func New(divideFactor, compareThreshold int) Diff { + return newDiff(divideFactor, compareThreshold) +} + +func newDiff(divideFactor, compareThreshold int) *diff { if divideFactor < 2 { divideFactor = 2 } @@ -67,6 +73,7 @@ type Element struct { type Range struct { From, To uint64 Elements bool + Limit int } // RangeResult response for Range @@ -100,6 +107,8 @@ type Diff interface { Hash() string // Len returns count of elements in the diff Len() int + // DiffType returns type of diff + DiffType() spacesyncproto.DiffType } // Remote interface for using in the Diff @@ -227,9 +236,10 @@ func (d *diff) RemoveId(id string) error { func (d *diff) getRange(r Range) (rr RangeResult) { rng := d.ranges.getRange(r.From, r.To) - rr.Count = rng.elements + // if we have the division for this range if rng != nil { rr.Hash = rng.hash + rr.Count = rng.elements if !r.Elements && rng.isDivided { return } @@ -242,6 +252,7 @@ func (d *diff) getRange(r Range) (rr RangeResult) { el = el.Next() rr.Elements = append(rr.Elements, elem) } + rr.Count = len(rr.Elements) return } @@ -266,6 +277,10 @@ type diffCtx struct { var errMismatched = errors.New("query and results mismatched") +func (d *diff) DiffType() spacesyncproto.DiffType { + return spacesyncproto.DiffType_Precalculated +} + // Diff makes diff with remote container func (d *diff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) { dctx := &diffCtx{} diff --git a/app/ldiff/diff_test.go b/app/ldiff/diff_test.go index ff6cde7e..f9501b34 100644 --- a/app/ldiff/diff_test.go +++ b/app/ldiff/diff_test.go @@ -150,19 +150,41 @@ func TestDiff_Diff(t *testing.T) { t.Run("one empty", func(t *testing.T) { d1 := New(4, 4) d2 := New(4, 4) - for i := 0; i < 10; i++ { + length := 10000 + for i := 0; i < length; i++ { d2.Set(Element{ Id: fmt.Sprint(i), Head: uuid.NewString(), }) } - newIds, changedIds, removedIds, err := d1.Diff(ctx, d2) require.NoError(t, err) - assert.Len(t, newIds, 10) + assert.Len(t, newIds, length) assert.Len(t, changedIds, 0) assert.Len(t, removedIds, 0) }) + t.Run("not intersecting", func(t *testing.T) { + d1 := New(16, 16) + d2 := New(16, 16) + length := 10000 + for i := 0; i < length; i++ { + d1.Set(Element{ + Id: fmt.Sprint(i), + Head: uuid.NewString(), + }) + } + for i := length; i < length*2; i++ { + d2.Set(Element{ + Id: fmt.Sprint(i), + Head: uuid.NewString(), + }) + } + newIds, changedIds, removedIds, err := d1.Diff(ctx, d2) + require.NoError(t, err) + assert.Len(t, newIds, length) + assert.Len(t, changedIds, 0) + assert.Len(t, removedIds, length) + }) t.Run("context cancel", func(t *testing.T) { d1 := New(4, 4) d2 := New(4, 4) diff --git a/app/ldiff/diffcontainer.go b/app/ldiff/diffcontainer.go new file mode 100644 index 00000000..d6b43b34 --- /dev/null +++ b/app/ldiff/diffcontainer.go @@ -0,0 +1,54 @@ +package ldiff + +import "context" + +type RemoteTypeChecker interface { + DiffTypeCheck(ctx context.Context, diffContainer DiffContainer) (needsSync bool, diff Diff, err error) +} + +type DiffContainer interface { + DiffTypeCheck(ctx context.Context, typeChecker RemoteTypeChecker) (needsSync bool, diff Diff, err error) + InitialDiff() Diff + PrecalculatedDiff() Diff + Set(elements ...Element) + RemoveId(id string) error +} + +type diffContainer struct { + initial *olddiff + precalculated *diff +} + +func (d *diffContainer) InitialDiff() Diff { + return d.initial +} + +func (d *diffContainer) PrecalculatedDiff() Diff { + return d.precalculated +} + +func (d *diffContainer) Set(elements ...Element) { + d.initial.mu.Lock() + defer d.initial.mu.Unlock() + d.precalculated.Set(elements...) +} + +func (d *diffContainer) RemoveId(id string) error { + d.initial.mu.Lock() + defer d.initial.mu.Unlock() + return d.precalculated.RemoveId(id) +} + +func (d *diffContainer) DiffTypeCheck(ctx context.Context, typeChecker RemoteTypeChecker) (needsSync bool, diff Diff, err error) { + return typeChecker.DiffTypeCheck(ctx, d) +} + +func NewDiffContainer(divideFactor, compareThreshold int) DiffContainer { + newDiff := newDiff(divideFactor, compareThreshold) + // this was for old diffs + oldDiff := newOldDiff(16, 16, newDiff.sl) + return &diffContainer{ + initial: oldDiff, + precalculated: newDiff, + } +} diff --git a/app/ldiff/mock_ldiff/mock_ldiff.go b/app/ldiff/mock_ldiff/mock_ldiff.go index f26cba47..f06cf74d 100644 --- a/app/ldiff/mock_ldiff/mock_ldiff.go +++ b/app/ldiff/mock_ldiff/mock_ldiff.go @@ -1,9 +1,9 @@ // Code generated by MockGen. DO NOT EDIT. -// Source: github.com/anyproto/any-sync/app/ldiff (interfaces: Diff,Remote) +// Source: github.com/anyproto/any-sync/app/ldiff (interfaces: Diff,Remote,DiffContainer) // // Generated by this command: // -// mockgen -destination mock_ldiff/mock_ldiff.go github.com/anyproto/any-sync/app/ldiff Diff,Remote +// mockgen -destination mock_ldiff/mock_ldiff.go github.com/anyproto/any-sync/app/ldiff Diff,Remote,DiffContainer // // Package mock_ldiff is a generated GoMock package. package mock_ldiff @@ -13,6 +13,7 @@ import ( reflect "reflect" ldiff "github.com/anyproto/any-sync/app/ldiff" + spacesyncproto "github.com/anyproto/any-sync/commonspace/spacesyncproto" gomock "go.uber.org/mock/gomock" ) @@ -56,6 +57,20 @@ func (mr *MockDiffMockRecorder) Diff(arg0, arg1 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Diff", reflect.TypeOf((*MockDiff)(nil).Diff), arg0, arg1) } +// DiffType mocks base method. +func (m *MockDiff) DiffType() spacesyncproto.DiffType { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DiffType") + ret0, _ := ret[0].(spacesyncproto.DiffType) + return ret0 +} + +// DiffType indicates an expected call of DiffType. +func (mr *MockDiffMockRecorder) DiffType() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiffType", reflect.TypeOf((*MockDiff)(nil).DiffType)) +} + // Element mocks base method. func (m *MockDiff) Element(arg0 string) (ldiff.Element, error) { m.ctrl.T.Helper() @@ -209,3 +224,100 @@ func (mr *MockRemoteMockRecorder) Ranges(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ranges", reflect.TypeOf((*MockRemote)(nil).Ranges), arg0, arg1, arg2) } + +// MockDiffContainer is a mock of DiffContainer interface. +type MockDiffContainer struct { + ctrl *gomock.Controller + recorder *MockDiffContainerMockRecorder +} + +// MockDiffContainerMockRecorder is the mock recorder for MockDiffContainer. +type MockDiffContainerMockRecorder struct { + mock *MockDiffContainer +} + +// NewMockDiffContainer creates a new mock instance. +func NewMockDiffContainer(ctrl *gomock.Controller) *MockDiffContainer { + mock := &MockDiffContainer{ctrl: ctrl} + mock.recorder = &MockDiffContainerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockDiffContainer) EXPECT() *MockDiffContainerMockRecorder { + return m.recorder +} + +// DiffTypeCheck mocks base method. +func (m *MockDiffContainer) DiffTypeCheck(arg0 context.Context, arg1 ldiff.RemoteTypeChecker) (bool, ldiff.Diff, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DiffTypeCheck", arg0, arg1) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(ldiff.Diff) + ret2, _ := ret[2].(error) + return ret0, ret1, ret2 +} + +// DiffTypeCheck indicates an expected call of DiffTypeCheck. +func (mr *MockDiffContainerMockRecorder) DiffTypeCheck(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DiffTypeCheck", reflect.TypeOf((*MockDiffContainer)(nil).DiffTypeCheck), arg0, arg1) +} + +// InitialDiff mocks base method. +func (m *MockDiffContainer) InitialDiff() ldiff.Diff { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InitialDiff") + ret0, _ := ret[0].(ldiff.Diff) + return ret0 +} + +// InitialDiff indicates an expected call of InitialDiff. +func (mr *MockDiffContainerMockRecorder) InitialDiff() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InitialDiff", reflect.TypeOf((*MockDiffContainer)(nil).InitialDiff)) +} + +// PrecalculatedDiff mocks base method. +func (m *MockDiffContainer) PrecalculatedDiff() ldiff.Diff { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "PrecalculatedDiff") + ret0, _ := ret[0].(ldiff.Diff) + return ret0 +} + +// PrecalculatedDiff indicates an expected call of PrecalculatedDiff. +func (mr *MockDiffContainerMockRecorder) PrecalculatedDiff() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PrecalculatedDiff", reflect.TypeOf((*MockDiffContainer)(nil).PrecalculatedDiff)) +} + +// RemoveId mocks base method. +func (m *MockDiffContainer) RemoveId(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RemoveId", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// RemoveId indicates an expected call of RemoveId. +func (mr *MockDiffContainerMockRecorder) RemoveId(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveId", reflect.TypeOf((*MockDiffContainer)(nil).RemoveId), arg0) +} + +// Set mocks base method. +func (m *MockDiffContainer) Set(arg0 ...ldiff.Element) { + m.ctrl.T.Helper() + varargs := []any{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "Set", varargs...) +} + +// Set indicates an expected call of Set. +func (mr *MockDiffContainerMockRecorder) Set(arg0 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Set", reflect.TypeOf((*MockDiffContainer)(nil).Set), arg0...) +} diff --git a/app/ldiff/olddiff.go b/app/ldiff/olddiff.go new file mode 100644 index 00000000..71f2f0a8 --- /dev/null +++ b/app/ldiff/olddiff.go @@ -0,0 +1,311 @@ +package ldiff + +import ( + "bytes" + "context" + "encoding/hex" + "math" + "sync" + "sync/atomic" + + "github.com/cespare/xxhash" + "github.com/huandu/skiplist" + "github.com/zeebo/blake3" + + "github.com/anyproto/any-sync/commonspace/spacesyncproto" +) + +type olddiff struct { + sl *skiplist.SkipList + divideFactor int + compareThreshold int + hashIsDirty atomic.Bool + hash []byte + mu sync.RWMutex +} + +func newOldDiff(divideFactor, compareThreshold int, sl *skiplist.SkipList) *olddiff { + if divideFactor < 2 { + divideFactor = 2 + } + if compareThreshold < 1 { + compareThreshold = 1 + } + d := &olddiff{ + divideFactor: divideFactor, + compareThreshold: compareThreshold, + sl: sl, + } + return d +} + +// Compare implements skiplist interface +func (d *olddiff) Compare(lhs, rhs interface{}) int { + lhe := lhs.(*element) + rhe := rhs.(*element) + if lhe.Id == rhe.Id { + return 0 + } + if lhe.hash > rhe.hash { + return 1 + } else if lhe.hash < rhe.hash { + return -1 + } + if lhe.Id > rhe.Id { + return 1 + } else { + return -1 + } +} + +// CalcScore implements skiplist interface +func (d *olddiff) CalcScore(key interface{}) float64 { + return 0 +} + +// Set adds or update element in container +func (d *olddiff) Set(elements ...Element) { + d.mu.Lock() + defer d.mu.Unlock() + d.hashIsDirty.Store(true) + for _, e := range elements { + el := &element{Element: e, hash: xxhash.Sum64([]byte(e.Id))} + d.sl.Remove(el) + d.sl.Set(el, nil) + } +} + +func (d *olddiff) Ids() (ids []string) { + d.mu.RLock() + defer d.mu.RUnlock() + + ids = make([]string, 0, d.sl.Len()) + + cur := d.sl.Front() + for cur != nil { + el := cur.Key().(*element).Element + ids = append(ids, el.Id) + cur = cur.Next() + } + return +} + +func (d *olddiff) Len() int { + d.mu.RLock() + defer d.mu.RUnlock() + return d.sl.Len() +} + +func (d *olddiff) Elements() (elements []Element) { + d.mu.RLock() + defer d.mu.RUnlock() + + elements = make([]Element, 0, d.sl.Len()) + + cur := d.sl.Front() + for cur != nil { + el := cur.Key().(*element).Element + elements = append(elements, el) + cur = cur.Next() + } + return +} + +func (d *olddiff) Element(id string) (Element, error) { + d.mu.RLock() + defer d.mu.RUnlock() + el := d.sl.Get(&element{Element: Element{Id: id}, hash: xxhash.Sum64([]byte(id))}) + if el == nil { + return Element{}, ErrElementNotFound + } + if e, ok := el.Key().(*element); ok { + return e.Element, nil + } + return Element{}, ErrElementNotFound +} + +func (d *olddiff) Hash() string { + return hex.EncodeToString(d.caclHash()) +} + +func (d *olddiff) caclHash() []byte { + if d.hashIsDirty.Load() { + // this can be a lengthy operation + d.mu.RLock() + res := d.getRange(Range{To: math.MaxUint64}) + d.mu.RUnlock() + // saving it under write lock + d.mu.Lock() + d.hashIsDirty.Store(false) + d.hash = res.Hash + d.mu.Unlock() + return res.Hash + } + d.mu.RLock() + defer d.mu.RUnlock() + return d.hash +} + +// RemoveId removes element by id +func (d *olddiff) RemoveId(id string) error { + d.mu.Lock() + defer d.mu.Unlock() + d.hashIsDirty.Store(true) + el := &element{Element: Element{ + Id: id, + }, hash: xxhash.Sum64([]byte(id))} + if d.sl.Remove(el) == nil { + return ErrElementNotFound + } + return nil +} + +func (d *olddiff) DiffType() spacesyncproto.DiffType { + return spacesyncproto.DiffType_Initial +} + +func (d *olddiff) getRange(r Range) (rr RangeResult) { + hasher := hashersPool.Get().(*blake3.Hasher) + defer hashersPool.Put(hasher) + hasher.Reset() + + el := d.sl.Find(&element{hash: r.From}) + rr.Elements = make([]Element, 0, r.Limit) + var overfill bool + for el != nil && el.Key().(*element).hash <= r.To { + elem := el.Key().(*element).Element + el = el.Next() + + hasher.WriteString(elem.Id) + hasher.WriteString(elem.Head) + rr.Count++ + if !overfill { + if len(rr.Elements) < r.Limit { + rr.Elements = append(rr.Elements, elem) + } + if len(rr.Elements) == r.Limit && el != nil { + overfill = true + } + } + } + if overfill { + rr.Elements = nil + } + rr.Hash = hasher.Sum(nil) + return +} + +// Ranges calculates given ranges and return results +func (d *olddiff) Ranges(ctx context.Context, ranges []Range, resBuf []RangeResult) (results []RangeResult, err error) { + if len(ranges) == 1 && d.isFullRange(ranges[0]) { + return []RangeResult{{ + Hash: d.caclHash(), + Count: 1, + }}, nil + } + d.mu.RLock() + defer d.mu.RUnlock() + + results = resBuf[:0] + for _, r := range ranges { + results = append(results, d.getRange(r)) + } + return +} + +func (d *olddiff) isFullRange(r Range) bool { + return r.From == 0 && r.To == math.MaxUint64 +} + +// Diff makes diff with remote container +func (d *olddiff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) { + dctx := &diffCtx{} + dctx.toSend = append(dctx.toSend, Range{ + From: 0, + To: math.MaxUint64, + Limit: d.compareThreshold, + }) + for len(dctx.toSend) > 0 { + select { + case <-ctx.Done(): + err = ctx.Err() + return + default: + } + if dctx.otherRes, err = dl.Ranges(ctx, dctx.toSend, dctx.otherRes); err != nil { + return + } + if dctx.myRes, err = d.Ranges(ctx, dctx.toSend, dctx.myRes); err != nil { + return + } + if len(dctx.otherRes) != len(dctx.toSend) || len(dctx.myRes) != len(dctx.toSend) { + err = errMismatched + return + } + for i, r := range dctx.toSend { + d.compareResults(dctx, r, dctx.myRes[i], dctx.otherRes[i]) + } + dctx.toSend, dctx.prepare = dctx.prepare, dctx.toSend + dctx.prepare = dctx.prepare[:0] + } + return dctx.newIds, dctx.changedIds, dctx.removedIds, nil +} + +func (d *olddiff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResult) { + // both hash equals - do nothing + if bytes.Equal(myRes.Hash, otherRes.Hash) { + return + } + + // both has elements + if len(myRes.Elements) == myRes.Count && len(otherRes.Elements) == otherRes.Count { + d.compareElements(dctx, myRes.Elements, otherRes.Elements) + return + } + + // make more queries + divideFactor := uint64(d.divideFactor) + perRange := (r.To - r.From) / divideFactor + align := ((r.To-r.From)%divideFactor + 1) % divideFactor + if align == 0 { + perRange += 1 + } + var j = r.From + for i := 0; i < d.divideFactor; i++ { + if i == d.divideFactor-1 { + perRange += align + } + dctx.prepare = append(dctx.prepare, Range{From: j, To: j + perRange - 1, Limit: r.Limit}) + j += perRange + } + return +} + +func (d *olddiff) compareElements(dctx *diffCtx, my, other []Element) { + find := func(list []Element, targetEl Element) (has, eq bool) { + for _, el := range list { + if el.Id == targetEl.Id { + return true, el.Head == targetEl.Head + } + } + return false, false + } + + for _, el := range my { + has, eq := find(other, el) + if !has { + dctx.removedIds = append(dctx.removedIds, el.Id) + continue + } else { + if !eq { + dctx.changedIds = append(dctx.changedIds, el.Id) + } + } + } + + for _, el := range other { + if has, _ := find(my, el); !has { + dctx.newIds = append(dctx.newIds, el.Id) + } + } +} diff --git a/commonspace/headsync/diffsyncer.go b/commonspace/headsync/diffsyncer.go index bdf070b7..50228bee 100644 --- a/commonspace/headsync/diffsyncer.go +++ b/commonspace/headsync/diffsyncer.go @@ -2,9 +2,12 @@ package headsync import ( "context" - "github.com/anyproto/any-sync/commonspace/object/treesyncer" "time" + "github.com/anyproto/any-sync/commonspace/object/treesyncer" + + "go.uber.org/zap" + "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/logger" "github.com/anyproto/any-sync/commonspace/credentialprovider" @@ -19,7 +22,6 @@ import ( "github.com/anyproto/any-sync/net/peer" "github.com/anyproto/any-sync/net/rpc/rpcerr" "github.com/anyproto/any-sync/util/slice" - "go.uber.org/zap" ) type DiffSyncer interface { @@ -33,7 +35,7 @@ const logPeriodSecs = 200 func newDiffSyncer(hs *headSync) DiffSyncer { return &diffSyncer{ - diff: hs.diff, + diffContainer: hs.diffContainer, spaceId: hs.spaceId, storage: hs.storage, peerManager: hs.peerManager, @@ -49,7 +51,7 @@ func newDiffSyncer(hs *headSync) DiffSyncer { type diffSyncer struct { spaceId string - diff ldiff.Diff + diffContainer ldiff.DiffContainer peerManager peermanager.PeerManager treeManager treemanager.TreeManager treeSyncer treesyncer.TreeSyncer @@ -68,9 +70,9 @@ func (d *diffSyncer) Init() { func (d *diffSyncer) RemoveObjects(ids []string) { for _, id := range ids { - _ = d.diff.RemoveId(id) + _ = d.diffContainer.RemoveId(id) } - if err := d.storage.WriteSpaceHash(d.diff.Hash()); err != nil { + if err := d.storage.WriteSpaceHash(d.diffContainer.PrecalculatedDiff().Hash()); err != nil { d.log.Error("can't write space hash", zap.Error(err)) } } @@ -79,11 +81,11 @@ func (d *diffSyncer) UpdateHeads(id string, heads []string) { if d.deletionState.Exists(id) { return } - d.diff.Set(ldiff.Element{ + d.diffContainer.Set(ldiff.Element{ Id: id, Head: concatStrings(heads), }) - if err := d.storage.WriteSpaceHash(d.diff.Hash()); err != nil { + if err := d.storage.WriteSpaceHash(d.diffContainer.PrecalculatedDiff().Hash()); err != nil { d.log.Error("can't write space hash", zap.Error(err)) } } @@ -119,17 +121,26 @@ func (d *diffSyncer) syncWithPeer(ctx context.Context, p peer.Peer) (err error) defer p.ReleaseDrpcConn(conn) var ( - cl = d.clientFactory.Client(conn) - rdiff = NewRemoteDiff(d.spaceId, cl) - stateCounter = d.syncStatus.StateCounter() - syncAclId = d.syncAcl.Id() + cl = d.clientFactory.Client(conn) + rdiff = NewRemoteDiff(d.spaceId, cl) + stateCounter = d.syncStatus.StateCounter() + syncAclId = d.syncAcl.Id() + newIds, changedIds, removedIds []string ) - - newIds, changedIds, removedIds, err := d.diff.Diff(ctx, rdiff) + // getting correct diff and checking if we need to continue sync + // we do this through diffContainer for the sake of testing + needsSync, diff, err := d.diffContainer.DiffTypeCheck(ctx, rdiff) err = rpcerr.Unwrap(err) if err != nil { return d.onDiffError(ctx, p, cl, err) } + if needsSync { + newIds, changedIds, removedIds, err = diff.Diff(ctx, rdiff) + err = rpcerr.Unwrap(err) + if err != nil { + return d.onDiffError(ctx, p, cl, err) + } + } d.syncStatus.SetNodesStatus(p.Id(), syncstatus.Online) totalLen := len(newIds) + len(changedIds) + len(removedIds) diff --git a/commonspace/headsync/diffsyncer_test.go b/commonspace/headsync/diffsyncer_test.go index 986f5240..cf5af467 100644 --- a/commonspace/headsync/diffsyncer_test.go +++ b/commonspace/headsync/diffsyncer_test.go @@ -7,6 +7,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "storj.io/drpc" + "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/commonspace/object/acl/liststorage/mock_liststorage" "github.com/anyproto/any-sync/commonspace/object/tree/treechangeproto" @@ -14,9 +18,6 @@ import ( "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/consensus/consensusproto" "github.com/anyproto/any-sync/net/peer" - "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" - "storj.io/drpc" ) type pushSpaceRequestMatcher struct { @@ -112,10 +113,13 @@ func TestDiffSyncer(t *testing.T) { fx.initDiffSyncer(t) defer fx.stop() mPeer := mockPeer{} + remDiff := NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock) fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId") fx.peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mPeer}, nil) + fx.diffContainerMock.EXPECT(). + DiffTypeCheck(gomock.Any(), gomock.Eq(remDiff)).Return(true, fx.diffMock, nil) fx.diffMock.EXPECT(). Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))). Return([]string{"new"}, []string{"changed"}, nil, nil) @@ -131,12 +135,15 @@ func TestDiffSyncer(t *testing.T) { fx.initDiffSyncer(t) defer fx.stop() mPeer := mockPeer{} + remDiff := NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock) fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId") fx.peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mPeer}, nil) + fx.diffContainerMock.EXPECT(). + DiffTypeCheck(gomock.Any(), gomock.Eq(remDiff)).Return(true, fx.diffMock, nil) fx.diffMock.EXPECT(). - Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))). + Diff(gomock.Any(), gomock.Eq(remDiff)). Return([]string{"new"}, []string{"changed"}, nil, nil) fx.deletionStateMock.EXPECT().Filter([]string{"new"}).Return([]string{"new"}).Times(1) fx.deletionStateMock.EXPECT().Filter([]string{"changed"}).Return([]string{"changed", "aclId"}).Times(1) @@ -178,10 +185,11 @@ func TestDiffSyncer(t *testing.T) { newHeads := []string{"h1", "h2"} hash := "hash" fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId") - fx.diffMock.EXPECT().Set(ldiff.Element{ + fx.diffContainerMock.EXPECT().Set(ldiff.Element{ Id: newId, Head: concatStrings(newHeads), }) + fx.diffContainerMock.EXPECT().PrecalculatedDiff().Return(fx.diffMock) fx.diffMock.EXPECT().Hash().Return(hash) fx.deletionStateMock.EXPECT().Exists(newId).Return(false) fx.storageMock.EXPECT().WriteSpaceHash(hash) @@ -206,12 +214,15 @@ func TestDiffSyncer(t *testing.T) { spaceHeader := &spacesyncproto.RawSpaceHeaderWithId{} spaceSettingsId := "spaceSettingsId" credential := []byte("credential") + remDiff := NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock) fx.peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mockPeer{}}, nil) + fx.diffContainerMock.EXPECT(). + DiffTypeCheck(gomock.Any(), gomock.Eq(remDiff)).Return(true, fx.diffMock, nil) fx.diffMock.EXPECT(). - Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))). + Diff(gomock.Any(), gomock.Eq(remDiff)). Return(nil, nil, nil, spacesyncproto.ErrSpaceMissing) fx.storageMock.EXPECT().AclStorage().Return(aclStorageMock, nil) @@ -238,12 +249,15 @@ func TestDiffSyncer(t *testing.T) { fx := newHeadSyncFixture(t) fx.initDiffSyncer(t) defer fx.stop() + remDiff := NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock) fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId") fx.peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mockPeer{}}, nil) + fx.diffContainerMock.EXPECT(). + DiffTypeCheck(gomock.Any(), gomock.Eq(remDiff)).Return(true, fx.diffMock, nil) fx.diffMock.EXPECT(). - Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))). + Diff(gomock.Any(), gomock.Eq(remDiff)). Return(nil, nil, nil, spacesyncproto.ErrUnexpected) require.NoError(t, fx.diffSyncer.Sync(ctx)) @@ -254,12 +268,15 @@ func TestDiffSyncer(t *testing.T) { fx.initDiffSyncer(t) defer fx.stop() mPeer := mockPeer{} + remDiff := NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock) fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId") fx.peerManagerMock.EXPECT(). GetResponsiblePeers(gomock.Any()). Return([]peer.Peer{mPeer}, nil) + fx.diffContainerMock.EXPECT(). + DiffTypeCheck(gomock.Any(), gomock.Eq(remDiff)).Return(true, fx.diffMock, nil) fx.diffMock.EXPECT(). - Diff(gomock.Any(), gomock.Eq(NewRemoteDiff(fx.spaceState.SpaceId, fx.clientMock))). + Diff(gomock.Any(), gomock.Eq(remDiff)). Return(nil, nil, nil, spacesyncproto.ErrSpaceIsDeleted) require.NoError(t, fx.diffSyncer.Sync(ctx)) diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 7fde3e6e..45f0a394 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -50,7 +50,7 @@ type headSync struct { periodicSync periodicsync.PeriodicSync storage spacestorage.SpaceStorage - diff ldiff.Diff + diffContainer ldiff.DiffContainer log logger.CtxLogger syncer DiffSyncer configuration nodeconf.NodeConf @@ -77,7 +77,7 @@ func (h *headSync) Init(a *app.App) (err error) { h.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) h.log = log.With(zap.String("spaceId", h.spaceId)) h.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) - h.diff = ldiff.New(16, 16) + h.diffContainer = ldiff.NewDiffContainer(16, 128) h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider) h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusService) @@ -109,7 +109,11 @@ func (h *headSync) Run(ctx context.Context) (err error) { } func (h *headSync) HandleRangeRequest(ctx context.Context, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { - return HandleRangeRequest(ctx, h.diff, req) + if req.DiffType == spacesyncproto.DiffType_Precalculated { + return HandleRangeRequest(ctx, h.diffContainer.PrecalculatedDiff(), req) + } else { + return HandleRangeRequest(ctx, h.diffContainer.InitialDiff(), req) + } } func (h *headSync) UpdateHeads(id string, heads []string) { @@ -117,7 +121,7 @@ func (h *headSync) UpdateHeads(id string, heads []string) { } func (h *headSync) AllIds() []string { - return h.diff.Ids() + return h.diffContainer.PrecalculatedDiff().Ids() } func (h *headSync) ExternalIds() []string { @@ -129,7 +133,7 @@ func (h *headSync) ExternalIds() []string { } func (h *headSync) DebugAllHeads() (res []TreeHeads) { - els := h.diff.Elements() + els := h.diffContainer.PrecalculatedDiff().Elements() for _, el := range els { idHead := TreeHeads{ Id: el.Id, @@ -145,6 +149,7 @@ func (h *headSync) RemoveObjects(ids []string) { } func (h *headSync) Close(ctx context.Context) (err error) { + h.storage.WriteOldSpaceHash(h.diffContainer.InitialDiff().Hash()) h.periodicSync.Close() return } @@ -169,8 +174,11 @@ func (h *headSync) fillDiff(objectIds []string) { Id: h.syncAcl.Id(), Head: h.syncAcl.Head().Id, }) - h.diff.Set(els...) - if err := h.storage.WriteSpaceHash(h.diff.Hash()); err != nil { + h.diffContainer.Set(els...) + if err := h.storage.WriteSpaceHash(h.diffContainer.PrecalculatedDiff().Hash()); err != nil { h.log.Error("can't write space hash", zap.Error(err)) } + if err := h.storage.WriteOldSpaceHash(h.diffContainer.InitialDiff().Hash()); err != nil { + h.log.Error("can't write old space hash", zap.Error(err)) + } } diff --git a/commonspace/headsync/headsync_test.go b/commonspace/headsync/headsync_test.go index 064ea0d0..9ab4f2b3 100644 --- a/commonspace/headsync/headsync_test.go +++ b/commonspace/headsync/headsync_test.go @@ -2,6 +2,11 @@ package headsync import ( "context" + "testing" + + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/app/ldiff/mock_ldiff" @@ -28,9 +33,6 @@ import ( "github.com/anyproto/any-sync/commonspace/syncstatus" "github.com/anyproto/any-sync/nodeconf" "github.com/anyproto/any-sync/nodeconf/mock_nodeconf" - "github.com/stretchr/testify/require" - "go.uber.org/mock/gomock" - "testing" ) type mockConfig struct { @@ -62,6 +64,7 @@ type headSyncFixture struct { deletionStateMock *mock_deletionstate.MockObjectDeletionState diffSyncerMock *mock_headsync.MockDiffSyncer treeSyncerMock *mock_treesyncer.MockTreeSyncer + diffContainerMock *mock_ldiff.MockDiffContainer diffMock *mock_ldiff.MockDiff clientMock *mock_spacesyncproto.MockDRPCSpaceSyncClient aclMock *mock_syncacl.MockSyncAcl @@ -90,6 +93,7 @@ func newHeadSyncFixture(t *testing.T) *headSyncFixture { diffSyncerMock := mock_headsync.NewMockDiffSyncer(ctrl) treeSyncerMock := mock_treesyncer.NewMockTreeSyncer(ctrl) treeSyncerMock.EXPECT().Name().AnyTimes().Return(treesyncer.CName) + diffContainerMock := mock_ldiff.NewMockDiffContainer(ctrl) diffMock := mock_ldiff.NewMockDiff(ctrl) clientMock := mock_spacesyncproto.NewMockDRPCSpaceSyncClient(ctrl) aclMock := mock_syncacl.NewMockSyncAcl(ctrl) @@ -123,6 +127,7 @@ func newHeadSyncFixture(t *testing.T) *headSyncFixture { headSync: hs, diffSyncerMock: diffSyncerMock, treeSyncerMock: treeSyncerMock, + diffContainerMock: diffContainerMock, diffMock: diffMock, clientMock: clientMock, aclMock: aclMock, @@ -136,7 +141,7 @@ func (fx *headSyncFixture) init(t *testing.T) { fx.diffSyncerMock.EXPECT().Init() err := fx.headSync.Init(fx.app) require.NoError(t, err) - fx.headSync.diff = fx.diffMock + fx.headSync.diffContainer = fx.diffContainerMock } func (fx *headSyncFixture) stop() { @@ -158,15 +163,22 @@ func TestHeadSync(t *testing.T) { fx.aclMock.EXPECT().Id().AnyTimes().Return("aclId") fx.aclMock.EXPECT().Head().AnyTimes().Return(&list.AclRecord{Id: "headId"}) treeMock.EXPECT().Heads().Return([]string{"h1", "h2"}, nil) - fx.diffMock.EXPECT().Set(ldiff.Element{ + fx.diffContainerMock.EXPECT().Set(ldiff.Element{ Id: "id1", Head: "h1h2", }) + fx.diffContainerMock.EXPECT().PrecalculatedDiff().Return(fx.diffMock) fx.diffMock.EXPECT().Hash().Return("hash") fx.storageMock.EXPECT().WriteSpaceHash("hash").Return(nil) + fx.diffContainerMock.EXPECT().InitialDiff().Return(fx.diffMock) + fx.diffMock.EXPECT().Hash().Return("hash") + fx.storageMock.EXPECT().WriteOldSpaceHash("hash").Return(nil) fx.diffSyncerMock.EXPECT().Sync(gomock.Any()).Return(nil) err := fx.headSync.Run(ctx) require.NoError(t, err) + fx.diffContainerMock.EXPECT().InitialDiff().Return(fx.diffMock) + fx.diffMock.EXPECT().Hash().Return("hash") + fx.storageMock.EXPECT().WriteOldSpaceHash("hash").Return(nil) err = fx.headSync.Close(ctx) require.NoError(t, err) }) diff --git a/commonspace/headsync/remotediff.go b/commonspace/headsync/remotediff.go index a78d1180..25c3e58b 100644 --- a/commonspace/headsync/remotediff.go +++ b/commonspace/headsync/remotediff.go @@ -1,7 +1,11 @@ package headsync import ( + "bytes" "context" + "encoding/hex" + "math" + "github.com/anyproto/any-sync/app/ldiff" "github.com/anyproto/any-sync/commonspace/spacesyncproto" ) @@ -10,7 +14,12 @@ type Client interface { HeadSync(ctx context.Context, in *spacesyncproto.HeadSyncRequest) (*spacesyncproto.HeadSyncResponse, error) } -func NewRemoteDiff(spaceId string, client Client) ldiff.Remote { +type RemoteDiff interface { + ldiff.RemoteTypeChecker + ldiff.Remote +} + +func NewRemoteDiff(spaceId string, client Client) RemoteDiff { return remote{ spaceId: spaceId, client: client, @@ -22,14 +31,46 @@ type remote struct { client Client } +// DiffTypeCheck checks which type of diff should we use +func (r remote) DiffTypeCheck(ctx context.Context, diffContainer ldiff.DiffContainer) (needsSync bool, diff ldiff.Diff, err error) { + req := &spacesyncproto.HeadSyncRequest{ + SpaceId: r.spaceId, + DiffType: spacesyncproto.DiffType_Precalculated, + Ranges: []*spacesyncproto.HeadSyncRange{{From: 0, To: math.MaxUint64}}, + } + resp, err := r.client.HeadSync(ctx, req) + if err != nil { + return + } + needsSync = true + checkHash := func(diff ldiff.Diff) (bool, error) { + hashB, err := hex.DecodeString(diff.Hash()) + if err != nil { + return false, err + } + if len(resp.Results) != 0 && bytes.Equal(hashB, resp.Results[0].Hash) { + return true, nil + } + return false, nil + } + switch resp.DiffType { + case spacesyncproto.DiffType_Precalculated: + diff = diffContainer.PrecalculatedDiff() + needsSync, err = checkHash(diff) + case spacesyncproto.DiffType_Initial: + diff = diffContainer.InitialDiff() + needsSync, err = checkHash(diff) + } + return +} + func (r remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) { results = resBuf[:0] pbRanges := make([]*spacesyncproto.HeadSyncRange, 0, len(ranges)) for _, rg := range ranges { pbRanges = append(pbRanges, &spacesyncproto.HeadSyncRange{ - From: rg.From, - To: rg.To, - Limit: uint32(rg.Limit), + From: rg.From, + To: rg.To, }) } req := &spacesyncproto.HeadSyncRequest{ @@ -62,11 +103,13 @@ func (r remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff func HandleRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesyncproto.HeadSyncRequest) (resp *spacesyncproto.HeadSyncResponse, err error) { ranges := make([]ldiff.Range, 0, len(req.Ranges)) + // basically we gather data applicable for both diffs for _, reqRange := range req.Ranges { ranges = append(ranges, ldiff.Range{ - From: reqRange.From, - To: reqRange.To, - Limit: int(reqRange.Limit), + From: reqRange.From, + To: reqRange.To, + Limit: int(reqRange.Limit), + Elements: reqRange.Elements, }) } res, err := d.Ranges(ctx, ranges, nil) @@ -94,5 +137,6 @@ func HandleRangeRequest(ctx context.Context, d ldiff.Diff, req *spacesyncproto.H Count: uint32(rangeRes.Count), }) } + resp.DiffType = d.DiffType() return } diff --git a/commonspace/spacestorage/inmemorystorage.go b/commonspace/spacestorage/inmemorystorage.go index 0550ffbc..1c2dec54 100644 --- a/commonspace/spacestorage/inmemorystorage.go +++ b/commonspace/spacestorage/inmemorystorage.go @@ -2,6 +2,7 @@ package spacestorage import ( "context" + "sync" "github.com/anyproto/any-sync/app" "github.com/anyproto/any-sync/commonspace/object/acl/liststorage" @@ -9,8 +10,6 @@ import ( "github.com/anyproto/any-sync/commonspace/object/tree/treestorage" "github.com/anyproto/any-sync/commonspace/spacesyncproto" "github.com/anyproto/any-sync/consensus/consensusproto" - - "sync" ) type InMemorySpaceStorage struct { @@ -22,6 +21,7 @@ type InMemorySpaceStorage struct { aclStorage liststorage.ListStorage spaceHeader *spacesyncproto.RawSpaceHeaderWithId spaceHash string + oldSpaceHash string sync.Mutex } @@ -162,12 +162,25 @@ func (i *InMemorySpaceStorage) WriteSpaceHash(hash string) error { return nil } +func (i *InMemorySpaceStorage) WriteOldSpaceHash(hash string) error { + i.Lock() + defer i.Unlock() + i.oldSpaceHash = hash + return nil +} + func (i *InMemorySpaceStorage) ReadSpaceHash() (hash string, err error) { i.Lock() defer i.Unlock() return i.spaceHash, nil } +func (i *InMemorySpaceStorage) ReadOldSpaceHash() (hash string, err error) { + i.Lock() + defer i.Unlock() + return i.oldSpaceHash, nil +} + func (i *InMemorySpaceStorage) AllTrees() map[string]treestorage.TreeStorage { i.Lock() defer i.Unlock() diff --git a/commonspace/spacestorage/mock_spacestorage/mock_spacestorage.go b/commonspace/spacestorage/mock_spacestorage/mock_spacestorage.go index 23bed04b..14eb5424 100644 --- a/commonspace/spacestorage/mock_spacestorage/mock_spacestorage.go +++ b/commonspace/spacestorage/mock_spacestorage/mock_spacestorage.go @@ -159,6 +159,21 @@ func (mr *MockSpaceStorageMockRecorder) Name() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockSpaceStorage)(nil).Name)) } +// ReadOldSpaceHash mocks base method. +func (m *MockSpaceStorage) ReadOldSpaceHash() (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadOldSpaceHash") + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadOldSpaceHash indicates an expected call of ReadOldSpaceHash. +func (mr *MockSpaceStorageMockRecorder) ReadOldSpaceHash() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadOldSpaceHash", reflect.TypeOf((*MockSpaceStorage)(nil).ReadOldSpaceHash)) +} + // ReadSpaceHash mocks base method. func (m *MockSpaceStorage) ReadSpaceHash() (string, error) { m.ctrl.T.Helper() @@ -305,6 +320,20 @@ func (mr *MockSpaceStorageMockRecorder) TreeStorage(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TreeStorage", reflect.TypeOf((*MockSpaceStorage)(nil).TreeStorage), arg0) } +// WriteOldSpaceHash mocks base method. +func (m *MockSpaceStorage) WriteOldSpaceHash(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WriteOldSpaceHash", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// WriteOldSpaceHash indicates an expected call of WriteOldSpaceHash. +func (mr *MockSpaceStorageMockRecorder) WriteOldSpaceHash(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WriteOldSpaceHash", reflect.TypeOf((*MockSpaceStorage)(nil).WriteOldSpaceHash), arg0) +} + // WriteSpaceHash mocks base method. func (m *MockSpaceStorage) WriteSpaceHash(arg0 string) error { m.ctrl.T.Helper() diff --git a/commonspace/spacestorage/spacestorage.go b/commonspace/spacestorage/spacestorage.go index a0976f3e..9c94944e 100644 --- a/commonspace/spacestorage/spacestorage.go +++ b/commonspace/spacestorage/spacestorage.go @@ -44,7 +44,9 @@ type SpaceStorage interface { HasTree(id string) (bool, error) CreateTreeStorage(payload treestorage.TreeStorageCreatePayload) (treestorage.TreeStorage, error) WriteSpaceHash(hash string) error + WriteOldSpaceHash(hash string) error ReadSpaceHash() (hash string, err error) + ReadOldSpaceHash() (hash string, err error) } type SpaceStorageCreatePayload struct { diff --git a/commonspace/spacesyncproto/protos/spacesync.proto b/commonspace/spacesyncproto/protos/spacesync.proto index 939a680c..643e3d34 100644 --- a/commonspace/spacesyncproto/protos/spacesync.proto +++ b/commonspace/spacesyncproto/protos/spacesync.proto @@ -37,6 +37,7 @@ message HeadSyncRange { uint64 from = 1; uint64 to = 2; uint32 limit = 3; + bool elements = 4; } // HeadSyncResult presenting a response for one range @@ -56,11 +57,13 @@ message HeadSyncResultElement { message HeadSyncRequest { string spaceId = 1; repeated HeadSyncRange ranges = 2; + DiffType diffType = 3; } // HeadSyncResponse is a response for HeadSync message HeadSyncResponse { repeated HeadSyncResult results = 1; + DiffType diffType = 2; } // ObjectSyncMessage is a message sent on object sync @@ -185,4 +188,10 @@ message AclGetRecordsRequest { // AclGetRecordsResponse contains list of marshaled consensusproto.RawRecordWithId message AclGetRecordsResponse { repeated bytes records = 1; +} + +// DiffType is a type of diff +enum DiffType { + Initial = 0; + Precalculated = 1; } \ No newline at end of file diff --git a/commonspace/spacesyncproto/spacesync.pb.go b/commonspace/spacesyncproto/spacesync.pb.go index 1c8e6799..9b091680 100644 --- a/commonspace/spacesyncproto/spacesync.pb.go +++ b/commonspace/spacesyncproto/spacesync.pb.go @@ -94,11 +94,38 @@ func (SpaceSubscriptionAction) EnumDescriptor() ([]byte, []int) { return fileDescriptor_80e49f1f4ac27799, []int{1} } +// DiffType is a type of diff +type DiffType int32 + +const ( + DiffType_Initial DiffType = 0 + DiffType_Precalculated DiffType = 1 +) + +var DiffType_name = map[int32]string{ + 0: "Initial", + 1: "Precalculated", +} + +var DiffType_value = map[string]int32{ + "Initial": 0, + "Precalculated": 1, +} + +func (x DiffType) String() string { + return proto.EnumName(DiffType_name, int32(x)) +} + +func (DiffType) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_80e49f1f4ac27799, []int{2} +} + // HeadSyncRange presenting a request for one range type HeadSyncRange struct { - From uint64 `protobuf:"varint,1,opt,name=from,proto3" json:"from,omitempty"` - To uint64 `protobuf:"varint,2,opt,name=to,proto3" json:"to,omitempty"` - Limit uint32 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` + From uint64 `protobuf:"varint,1,opt,name=from,proto3" json:"from,omitempty"` + To uint64 `protobuf:"varint,2,opt,name=to,proto3" json:"to,omitempty"` + Limit uint32 `protobuf:"varint,3,opt,name=limit,proto3" json:"limit,omitempty"` + Elements bool `protobuf:"varint,4,opt,name=elements,proto3" json:"elements,omitempty"` } func (m *HeadSyncRange) Reset() { *m = HeadSyncRange{} } @@ -155,6 +182,13 @@ func (m *HeadSyncRange) GetLimit() uint32 { return 0 } +func (m *HeadSyncRange) GetElements() bool { + if m != nil { + return m.Elements + } + return false +} + // HeadSyncResult presenting a response for one range type HeadSyncResult struct { Hash []byte `protobuf:"bytes,1,opt,name=hash,proto3" json:"hash,omitempty"` @@ -271,8 +305,9 @@ func (m *HeadSyncResultElement) GetHead() string { // HeadSyncRequest is a request for HeadSync type HeadSyncRequest struct { - SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` - Ranges []*HeadSyncRange `protobuf:"bytes,2,rep,name=ranges,proto3" json:"ranges,omitempty"` + SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` + Ranges []*HeadSyncRange `protobuf:"bytes,2,rep,name=ranges,proto3" json:"ranges,omitempty"` + DiffType DiffType `protobuf:"varint,3,opt,name=diffType,proto3,enum=spacesync.DiffType" json:"diffType,omitempty"` } func (m *HeadSyncRequest) Reset() { *m = HeadSyncRequest{} } @@ -322,9 +357,17 @@ func (m *HeadSyncRequest) GetRanges() []*HeadSyncRange { return nil } +func (m *HeadSyncRequest) GetDiffType() DiffType { + if m != nil { + return m.DiffType + } + return DiffType_Initial +} + // HeadSyncResponse is a response for HeadSync type HeadSyncResponse struct { - Results []*HeadSyncResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + Results []*HeadSyncResult `protobuf:"bytes,1,rep,name=results,proto3" json:"results,omitempty"` + DiffType DiffType `protobuf:"varint,2,opt,name=diffType,proto3,enum=spacesync.DiffType" json:"diffType,omitempty"` } func (m *HeadSyncResponse) Reset() { *m = HeadSyncResponse{} } @@ -367,6 +410,13 @@ func (m *HeadSyncResponse) GetResults() []*HeadSyncResult { return nil } +func (m *HeadSyncResponse) GetDiffType() DiffType { + if m != nil { + return m.DiffType + } + return DiffType_Initial +} + // ObjectSyncMessage is a message sent on object sync type ObjectSyncMessage struct { SpaceId string `protobuf:"bytes,1,opt,name=spaceId,proto3" json:"spaceId,omitempty"` @@ -1435,6 +1485,7 @@ func (m *AclGetRecordsResponse) GetRecords() [][]byte { func init() { proto.RegisterEnum("spacesync.ErrCodes", ErrCodes_name, ErrCodes_value) proto.RegisterEnum("spacesync.SpaceSubscriptionAction", SpaceSubscriptionAction_name, SpaceSubscriptionAction_value) + proto.RegisterEnum("spacesync.DiffType", DiffType_name, DiffType_value) proto.RegisterType((*HeadSyncRange)(nil), "spacesync.HeadSyncRange") proto.RegisterType((*HeadSyncResult)(nil), "spacesync.HeadSyncResult") proto.RegisterType((*HeadSyncResultElement)(nil), "spacesync.HeadSyncResultElement") @@ -1466,82 +1517,86 @@ func init() { } var fileDescriptor_80e49f1f4ac27799 = []byte{ - // 1197 bytes of a gzipped FileDescriptorProto + // 1261 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xa4, 0x57, 0x4d, 0x6f, 0x1b, 0xc5, - 0x1b, 0xf7, 0x3a, 0xce, 0x8b, 0x9f, 0x6c, 0xdc, 0xed, 0xc4, 0x69, 0xfd, 0x77, 0x23, 0xc7, 0x1a, - 0xfd, 0x85, 0xa2, 0x1e, 0xd2, 0x26, 0x41, 0x48, 0x2d, 0x70, 0x48, 0x93, 0xb4, 0x59, 0xa0, 0x4d, - 0x34, 0xa6, 0x42, 0x42, 0xea, 0x61, 0xb2, 0x3b, 0x89, 0x17, 0xd6, 0xbb, 0xcb, 0xce, 0xb8, 0x8d, - 0x8f, 0x9c, 0xb8, 0x72, 0x86, 0x23, 0x1f, 0x83, 0x2f, 0xc0, 0xb1, 0xdc, 0x38, 0xa2, 0xe4, 0x8b, - 0xa0, 0x99, 0x9d, 0x7d, 0xb3, 0xd7, 0x01, 0xc4, 0xc5, 0xd9, 0xe7, 0xed, 0xf7, 0xbc, 0xce, 0x3c, - 0x13, 0xd8, 0x75, 0xc2, 0xd1, 0x28, 0x0c, 0x78, 0x44, 0x1d, 0xf6, 0x48, 0xfd, 0xf2, 0x49, 0xe0, - 0x44, 0x71, 0x28, 0xc2, 0x47, 0xea, 0x97, 0xe7, 0xdc, 0x1d, 0xc5, 0x40, 0xcd, 0x8c, 0x81, 0x6d, - 0x58, 0x3b, 0x61, 0xd4, 0x1d, 0x4c, 0x02, 0x87, 0xd0, 0xe0, 0x92, 0x21, 0x04, 0x8d, 0x8b, 0x38, - 0x1c, 0x75, 0x8c, 0xbe, 0xb1, 0xdd, 0x20, 0xea, 0x1b, 0xb5, 0xa0, 0x2e, 0xc2, 0x4e, 0x5d, 0x71, - 0xea, 0x22, 0x44, 0x6d, 0x58, 0xf4, 0xbd, 0x91, 0x27, 0x3a, 0x0b, 0x7d, 0x63, 0x7b, 0x8d, 0x24, - 0x04, 0xbe, 0x82, 0x56, 0x06, 0xc5, 0xf8, 0xd8, 0x17, 0x12, 0x6b, 0x48, 0xf9, 0x50, 0x61, 0x99, - 0x44, 0x7d, 0xa3, 0x4f, 0x60, 0x85, 0xf9, 0x6c, 0xc4, 0x02, 0xc1, 0x3b, 0xf5, 0xfe, 0xc2, 0xf6, - 0xea, 0x5e, 0x7f, 0x27, 0x8f, 0xaf, 0x0c, 0x70, 0x9c, 0x28, 0x92, 0xcc, 0x42, 0x7a, 0x76, 0xc2, - 0x71, 0x90, 0x79, 0x56, 0x04, 0xfe, 0x18, 0x36, 0x2a, 0x0d, 0x65, 0xe0, 0x9e, 0xab, 0xdc, 0x37, - 0x49, 0xdd, 0x73, 0x55, 0x40, 0x8c, 0xba, 0x2a, 0x95, 0x26, 0x51, 0xdf, 0xf8, 0x0d, 0xdc, 0xc9, - 0x8d, 0xbf, 0x1b, 0x33, 0x2e, 0x50, 0x07, 0x96, 0x55, 0x48, 0x76, 0x6a, 0x9b, 0x92, 0xe8, 0x31, - 0x2c, 0xc5, 0xb2, 0x4c, 0x69, 0xec, 0x9d, 0xaa, 0xd8, 0xa5, 0x02, 0xd1, 0x7a, 0xf8, 0x05, 0x58, - 0x85, 0xd8, 0xa2, 0x30, 0xe0, 0x0c, 0xed, 0xc3, 0x72, 0xac, 0xe2, 0xe4, 0x1d, 0x43, 0xc1, 0xfc, - 0x6f, 0x6e, 0x09, 0x48, 0xaa, 0x89, 0x7f, 0x36, 0xe0, 0xee, 0xe9, 0xf9, 0x37, 0xcc, 0x11, 0x52, - 0xfa, 0x92, 0x71, 0x4e, 0x2f, 0xd9, 0x2d, 0xa1, 0x6e, 0x42, 0x33, 0x4e, 0xf2, 0xb1, 0xd3, 0x84, - 0x73, 0x86, 0xb4, 0x8b, 0x59, 0xe4, 0x4f, 0x6c, 0x57, 0x95, 0xb2, 0x49, 0x52, 0x52, 0x4a, 0x22, - 0x3a, 0xf1, 0x43, 0xea, 0x76, 0x1a, 0xaa, 0x6f, 0x29, 0x89, 0xba, 0xb0, 0x12, 0xaa, 0x00, 0x6c, - 0xb7, 0xb3, 0xa8, 0x8c, 0x32, 0x1a, 0x33, 0xb0, 0x06, 0xd2, 0xf1, 0xd9, 0x98, 0x0f, 0xd3, 0x32, - 0xee, 0xe6, 0x48, 0x32, 0xb6, 0xd5, 0xbd, 0xfb, 0x85, 0x34, 0x13, 0xed, 0x44, 0x9c, 0xbb, 0xe8, - 0x01, 0x1c, 0xc6, 0xcc, 0x65, 0x81, 0xf0, 0xa8, 0xaf, 0xa2, 0x36, 0x49, 0x81, 0x83, 0xd7, 0xe1, - 0x6e, 0xc1, 0x4d, 0x52, 0x4e, 0x8c, 0x33, 0xdf, 0xbe, 0x9f, 0xfa, 0x9e, 0xea, 0x3c, 0x7e, 0x9e, - 0x19, 0x4a, 0x1d, 0xdd, 0x87, 0x7f, 0x1f, 0x20, 0xfe, 0xbe, 0x0e, 0x66, 0x51, 0x82, 0x0e, 0x60, - 0x55, 0xd9, 0xc8, 0xb6, 0xb1, 0x58, 0xe3, 0x6c, 0x15, 0x70, 0x08, 0x7d, 0x37, 0xc8, 0x15, 0xbe, - 0xf2, 0xc4, 0xd0, 0x76, 0x49, 0xd1, 0x46, 0x26, 0x4d, 0x1d, 0x5f, 0x03, 0xa6, 0x49, 0xe7, 0x1c, - 0x84, 0xc1, 0xcc, 0xa9, 0xac, 0x61, 0x25, 0x1e, 0xda, 0x83, 0xb6, 0x82, 0x1c, 0x30, 0x21, 0xbc, - 0xe0, 0x92, 0x9f, 0x95, 0x5a, 0x58, 0x29, 0x43, 0x1f, 0xc1, 0xbd, 0x2a, 0x7e, 0xd6, 0xdd, 0x39, - 0x52, 0xfc, 0xbb, 0x01, 0xab, 0x85, 0x94, 0xe4, 0x5c, 0x78, 0xaa, 0x41, 0x62, 0xa2, 0x8f, 0x7a, - 0x46, 0xcb, 0x29, 0x14, 0xde, 0x88, 0x71, 0x41, 0x47, 0x91, 0x4a, 0x6d, 0x81, 0xe4, 0x0c, 0x29, - 0x55, 0x3e, 0xbe, 0x9c, 0x44, 0x4c, 0xa7, 0x95, 0x33, 0xd0, 0x07, 0xd0, 0x92, 0x43, 0xe9, 0x39, - 0x54, 0x78, 0x61, 0xf0, 0x39, 0x9b, 0xa8, 0x6c, 0x1a, 0x64, 0x8a, 0x2b, 0x4f, 0x35, 0x67, 0x2c, - 0x89, 0xda, 0x24, 0xea, 0x1b, 0xed, 0x00, 0x2a, 0x94, 0x38, 0xad, 0xc6, 0x92, 0xd2, 0xa8, 0x90, - 0xe0, 0x33, 0x68, 0x95, 0x1b, 0x85, 0xfa, 0xb3, 0x8d, 0x35, 0xcb, 0x7d, 0x93, 0xd1, 0x7b, 0x97, - 0x01, 0x15, 0xe3, 0x98, 0xe9, 0xb6, 0xe5, 0x0c, 0x7c, 0x04, 0xed, 0xaa, 0xd6, 0xab, 0x73, 0x49, - 0xdf, 0x95, 0x50, 0x73, 0x86, 0x9e, 0xdb, 0x7a, 0x36, 0xb7, 0x3f, 0x19, 0xd0, 0x1e, 0x14, 0xdb, - 0x70, 0x18, 0x06, 0x42, 0x5e, 0x6d, 0x9f, 0x82, 0x99, 0x1c, 0xbe, 0x23, 0xe6, 0x33, 0xc1, 0x2a, - 0x06, 0xf8, 0xb4, 0x20, 0x3e, 0xa9, 0x91, 0x92, 0x3a, 0x7a, 0xaa, 0xb3, 0xd3, 0xd6, 0x75, 0x65, - 0x7d, 0x6f, 0x7a, 0xfc, 0x33, 0xe3, 0xa2, 0xf2, 0xb3, 0x65, 0x58, 0x7c, 0x4b, 0xfd, 0x31, 0xc3, - 0x3d, 0x30, 0x8b, 0x4e, 0x66, 0x0e, 0xdd, 0xbe, 0x9e, 0x13, 0x2d, 0xfe, 0x3f, 0xac, 0xb9, 0xea, - 0x2b, 0x3e, 0x63, 0x2c, 0xce, 0x6e, 0xac, 0x32, 0x13, 0xbf, 0x81, 0x8d, 0x52, 0xc2, 0x83, 0x80, - 0x46, 0x7c, 0x18, 0x0a, 0x79, 0x4c, 0x12, 0x4d, 0xd7, 0x76, 0x93, 0x8b, 0xb3, 0x49, 0x0a, 0x9c, - 0x59, 0xf8, 0x7a, 0x15, 0xfc, 0x0f, 0x06, 0x98, 0x29, 0xf4, 0x11, 0x15, 0x14, 0x3d, 0x81, 0x65, - 0x27, 0xa9, 0xa9, 0xbe, 0x8c, 0xb7, 0xa6, 0xab, 0x30, 0x55, 0x7a, 0x92, 0xea, 0xcb, 0x5d, 0xc6, - 0x75, 0x74, 0xba, 0x82, 0xfd, 0x79, 0xb6, 0x69, 0x16, 0x24, 0xb3, 0xc0, 0xdf, 0xea, 0x2b, 0x69, - 0x30, 0x3e, 0xe7, 0x4e, 0xec, 0x45, 0x72, 0x9c, 0xe5, 0x59, 0xd2, 0x17, 0x78, 0x9a, 0x62, 0x46, - 0xa3, 0xa7, 0xb0, 0x44, 0x1d, 0xa9, 0xa5, 0x9c, 0xb5, 0xf6, 0xf0, 0x8c, 0xb3, 0x02, 0xd2, 0x81, - 0xd2, 0x24, 0xda, 0x02, 0xdb, 0xb0, 0x7e, 0xe0, 0xf8, 0x07, 0xae, 0x4b, 0x98, 0x13, 0xc6, 0xee, - 0xdf, 0x6f, 0xba, 0xc2, 0x1a, 0xa8, 0x97, 0xd6, 0x00, 0xfe, 0x02, 0xda, 0x65, 0x28, 0x7d, 0x9b, - 0x76, 0x61, 0x25, 0x56, 0x9c, 0x0c, 0x2c, 0xa3, 0x6f, 0x41, 0xfb, 0x4c, 0xa1, 0xbd, 0x60, 0x22, - 0x41, 0xe3, 0xff, 0x28, 0x32, 0xea, 0xf8, 0x27, 0xf9, 0x1e, 0x4f, 0x49, 0xbc, 0x0b, 0x1b, 0x53, - 0x58, 0x3a, 0x34, 0xb5, 0xed, 0x14, 0x4b, 0x15, 0xd5, 0x24, 0x29, 0xf9, 0xf0, 0x57, 0x03, 0x56, - 0x8e, 0xe3, 0xf8, 0x30, 0x74, 0x19, 0x47, 0x2d, 0x80, 0xd7, 0x01, 0xbb, 0x8a, 0x98, 0x23, 0x98, - 0x6b, 0xd5, 0x90, 0xa5, 0xef, 0xfa, 0x97, 0x1e, 0xe7, 0x5e, 0x70, 0x69, 0x19, 0xe8, 0x8e, 0x9e, - 0xe8, 0xe3, 0x2b, 0x8f, 0x0b, 0x6e, 0xd5, 0xd1, 0x3a, 0xdc, 0x51, 0x8c, 0x57, 0xa1, 0xb0, 0x83, - 0x43, 0xea, 0x0c, 0x99, 0xb5, 0x80, 0x10, 0xb4, 0x14, 0xd3, 0xe6, 0xc9, 0xe4, 0xbb, 0x56, 0x03, - 0x75, 0xa0, 0xad, 0x26, 0x90, 0xbf, 0x0a, 0x85, 0x8e, 0xcb, 0x3b, 0xf7, 0x99, 0xb5, 0x88, 0xda, - 0x60, 0x11, 0xe6, 0x30, 0x2f, 0x12, 0x36, 0xb7, 0x83, 0xb7, 0xd4, 0xf7, 0x5c, 0x6b, 0x49, 0x62, - 0x68, 0x42, 0x5f, 0x51, 0xd6, 0xb2, 0xf4, 0x7e, 0x1c, 0xc7, 0x61, 0x7c, 0x7a, 0x71, 0xc1, 0x99, - 0xb0, 0xdc, 0x87, 0x4f, 0xe0, 0xfe, 0x9c, 0xc6, 0xa3, 0x35, 0x68, 0x6a, 0xee, 0x39, 0xb3, 0x6a, - 0xd2, 0xf4, 0x75, 0xc0, 0x33, 0x86, 0xb1, 0xf7, 0x4b, 0x03, 0x9a, 0x89, 0xed, 0x24, 0x70, 0xd0, - 0x21, 0xac, 0xa4, 0xef, 0x0e, 0xd4, 0xad, 0x7c, 0x8c, 0xa8, 0xae, 0x74, 0x1f, 0x54, 0x3f, 0x54, - 0x92, 0x2a, 0x3f, 0xd7, 0x88, 0x72, 0x39, 0xa3, 0x07, 0x33, 0xab, 0x34, 0x7f, 0x19, 0x74, 0x37, - 0xab, 0x85, 0x33, 0x38, 0xbe, 0x5f, 0x85, 0x93, 0x6d, 0xf9, 0x2a, 0x9c, 0xc2, 0x7a, 0x27, 0x60, - 0xe5, 0x0f, 0xa6, 0x81, 0x88, 0x19, 0x1d, 0xa1, 0xcd, 0x99, 0x0b, 0xb2, 0xf0, 0x9a, 0xea, 0xde, - 0x2a, 0xdd, 0x36, 0x1e, 0x1b, 0xe8, 0x04, 0x20, 0x17, 0xfc, 0x17, 0x34, 0x74, 0x0a, 0x66, 0xf1, - 0x18, 0xa1, 0x5e, 0x41, 0xbb, 0xe2, 0xa8, 0x76, 0xb7, 0xe6, 0xca, 0xb3, 0x74, 0xd7, 0x4a, 0xd3, - 0x8f, 0xa6, 0x2c, 0x66, 0xce, 0x58, 0xb7, 0x3f, 0x5f, 0x21, 0xc1, 0x7c, 0xf6, 0xe1, 0x6f, 0xd7, - 0x3d, 0xe3, 0xfd, 0x75, 0xcf, 0xf8, 0xf3, 0xba, 0x67, 0xfc, 0x78, 0xd3, 0xab, 0xbd, 0xbf, 0xe9, - 0xd5, 0xfe, 0xb8, 0xe9, 0xd5, 0xbe, 0xee, 0xce, 0xff, 0xb7, 0xe3, 0x7c, 0x49, 0xfd, 0xd9, 0xff, - 0x2b, 0x00, 0x00, 0xff, 0xff, 0xa7, 0xc2, 0xb9, 0x8a, 0x9b, 0x0c, 0x00, 0x00, + 0x1b, 0xf7, 0x6e, 0x9c, 0xd8, 0x7e, 0xb2, 0x76, 0xb7, 0x13, 0xa7, 0xf5, 0xdf, 0x8d, 0x1c, 0x6b, + 0xf5, 0x17, 0x8a, 0x72, 0x48, 0x9a, 0x04, 0x21, 0xb5, 0xc0, 0x21, 0x4d, 0x52, 0xb2, 0x40, 0x9b, + 0x68, 0x4c, 0x85, 0x84, 0xc4, 0x61, 0xb2, 0x3b, 0x89, 0x17, 0xd6, 0xbb, 0x66, 0x67, 0xdc, 0xc6, + 0x47, 0x4e, 0xdc, 0x10, 0x67, 0x38, 0xf2, 0x31, 0xf8, 0x02, 0x1c, 0xcb, 0x8d, 0x23, 0x4a, 0xbe, + 0x08, 0x9a, 0xd9, 0xd9, 0x37, 0x7b, 0x1d, 0x8a, 0xb8, 0x38, 0xfb, 0xbc, 0xfd, 0x9e, 0xd7, 0x79, + 0x66, 0x02, 0x7b, 0x4e, 0x38, 0x1a, 0x85, 0x01, 0x1b, 0x13, 0x87, 0xee, 0xca, 0x5f, 0x36, 0x0d, + 0x9c, 0x71, 0x14, 0xf2, 0x70, 0x57, 0xfe, 0xb2, 0x8c, 0xbb, 0x23, 0x19, 0xa8, 0x91, 0x32, 0x2c, + 0x0a, 0xcd, 0x53, 0x4a, 0xdc, 0xc1, 0x34, 0x70, 0x30, 0x09, 0xae, 0x28, 0x42, 0x50, 0xbd, 0x8c, + 0xc2, 0x51, 0x47, 0xeb, 0x6b, 0x5b, 0x55, 0x2c, 0xbf, 0x51, 0x0b, 0x74, 0x1e, 0x76, 0x74, 0xc9, + 0xd1, 0x79, 0x88, 0xda, 0xb0, 0xec, 0x7b, 0x23, 0x8f, 0x77, 0x96, 0xfa, 0xda, 0x56, 0x13, 0xc7, + 0x04, 0xea, 0x42, 0x9d, 0xfa, 0x74, 0x44, 0x03, 0xce, 0x3a, 0xd5, 0xbe, 0xb6, 0x55, 0xc7, 0x29, + 0x6d, 0x5d, 0x43, 0x2b, 0x75, 0x43, 0xd9, 0xc4, 0xe7, 0xc2, 0xcf, 0x90, 0xb0, 0xa1, 0xf4, 0x63, + 0x60, 0xf9, 0x8d, 0x3e, 0xca, 0x21, 0xe8, 0xfd, 0xa5, 0xad, 0xd5, 0xfd, 0xfe, 0x4e, 0x16, 0x7b, + 0x11, 0xe0, 0x24, 0x56, 0xcc, 0x7c, 0x88, 0xa8, 0x9c, 0x70, 0x12, 0xa4, 0x51, 0x49, 0xc2, 0xfa, + 0x10, 0xd6, 0x4b, 0x0d, 0x45, 0x52, 0x9e, 0x2b, 0xdd, 0x37, 0xb0, 0xee, 0xb9, 0x32, 0x20, 0x4a, + 0x5c, 0x99, 0x66, 0x03, 0xcb, 0x6f, 0xeb, 0x47, 0x0d, 0xee, 0x65, 0xd6, 0xdf, 0x4d, 0x28, 0xe3, + 0xa8, 0x03, 0x35, 0x19, 0x93, 0x9d, 0x18, 0x27, 0x24, 0x7a, 0x0c, 0x2b, 0x91, 0xa8, 0x61, 0x12, + 0x7c, 0xa7, 0x2c, 0x78, 0xa1, 0x80, 0x95, 0x1e, 0xda, 0x85, 0xba, 0xeb, 0x5d, 0x5e, 0x7e, 0x31, + 0x1d, 0x53, 0x19, 0x75, 0x6b, 0x7f, 0x2d, 0x67, 0x73, 0xac, 0x44, 0x38, 0x55, 0xb2, 0xae, 0xc1, + 0xcc, 0x65, 0x33, 0x0e, 0x03, 0x46, 0xd1, 0x01, 0xd4, 0x22, 0x99, 0x19, 0xeb, 0x68, 0xd2, 0xef, + 0xff, 0x16, 0x16, 0x0d, 0x27, 0x9a, 0x05, 0xcf, 0xfa, 0xbb, 0x78, 0xfe, 0x45, 0x83, 0xfb, 0x67, + 0x17, 0xdf, 0x50, 0x87, 0x0b, 0xb8, 0x17, 0x94, 0x31, 0x72, 0x45, 0xef, 0x28, 0xc6, 0x06, 0x34, + 0xa2, 0xb8, 0x62, 0x76, 0x52, 0xd3, 0x8c, 0x21, 0xec, 0x22, 0x3a, 0xf6, 0xa7, 0xb6, 0x2b, 0xf3, + 0x6e, 0xe0, 0x84, 0x14, 0x92, 0x31, 0x99, 0xfa, 0x21, 0x71, 0xe5, 0x10, 0x19, 0x38, 0x21, 0xc5, + 0x7c, 0x85, 0x32, 0x00, 0xdb, 0xed, 0x2c, 0x4b, 0xa3, 0x94, 0xb6, 0x28, 0x98, 0x03, 0xe1, 0xf8, + 0x7c, 0xc2, 0x86, 0x49, 0xa3, 0xf6, 0x32, 0x24, 0x11, 0xdb, 0xea, 0xfe, 0xc3, 0x5c, 0x86, 0xb1, + 0x76, 0x2c, 0xce, 0x5c, 0xf4, 0x00, 0x8e, 0x22, 0xea, 0xd2, 0x80, 0x7b, 0xc4, 0x97, 0x51, 0x1b, + 0x38, 0xc7, 0xb1, 0xd6, 0xe0, 0x7e, 0xce, 0x4d, 0x5c, 0x7f, 0xcb, 0x4a, 0x7d, 0xfb, 0x7e, 0xe2, + 0x7b, 0x66, 0xb8, 0xac, 0xe7, 0xa9, 0xa1, 0xd0, 0x51, 0x8d, 0xfb, 0xf7, 0x01, 0x5a, 0xdf, 0xeb, + 0x60, 0xe4, 0x25, 0xe8, 0x10, 0x56, 0xa5, 0x8d, 0xe8, 0x33, 0x8d, 0x14, 0xce, 0x66, 0x0e, 0x07, + 0x93, 0x37, 0x83, 0x4c, 0xe1, 0x4b, 0x8f, 0x0f, 0x6d, 0x17, 0xe7, 0x6d, 0x44, 0xd2, 0xc4, 0xf1, + 0x15, 0x60, 0x92, 0x74, 0xc6, 0x41, 0x16, 0x18, 0x19, 0x95, 0x36, 0xac, 0xc0, 0x43, 0xfb, 0xd0, + 0x96, 0x90, 0x03, 0xca, 0xb9, 0x17, 0x5c, 0xb1, 0xf3, 0x42, 0x0b, 0x4b, 0x65, 0xe8, 0x03, 0x78, + 0x50, 0xc6, 0x4f, 0xbb, 0xbb, 0x40, 0x6a, 0xfd, 0xa1, 0xc1, 0x6a, 0x2e, 0x25, 0x31, 0x17, 0x9e, + 0x6c, 0x10, 0x9f, 0xaa, 0x6d, 0x92, 0xd2, 0x62, 0x0a, 0xb9, 0x37, 0xa2, 0x8c, 0x93, 0xd1, 0x58, + 0xa6, 0xb6, 0x84, 0x33, 0x86, 0x90, 0x4a, 0x1f, 0xe9, 0xf9, 0x6b, 0xe0, 0x8c, 0x81, 0xde, 0x83, + 0x96, 0x18, 0x4a, 0xcf, 0x21, 0xdc, 0x0b, 0x83, 0xcf, 0xe8, 0x54, 0x66, 0x53, 0xc5, 0x33, 0x5c, + 0xb1, 0x38, 0x18, 0xa5, 0x71, 0xd4, 0x06, 0x96, 0xdf, 0x68, 0x07, 0x50, 0xae, 0xc4, 0x49, 0x35, + 0x56, 0xa4, 0x46, 0x89, 0xc4, 0x3a, 0x87, 0x56, 0xb1, 0x51, 0xa8, 0x3f, 0xdf, 0x58, 0xa3, 0xd8, + 0x37, 0x11, 0xbd, 0x77, 0x15, 0x10, 0x3e, 0x89, 0xa8, 0x6a, 0x5b, 0xc6, 0xb0, 0x8e, 0xa1, 0x5d, + 0xd6, 0x7a, 0x79, 0x2e, 0xc9, 0x9b, 0x02, 0x6a, 0xc6, 0x50, 0x73, 0xab, 0xa7, 0x73, 0xfb, 0xb3, + 0x06, 0xed, 0x41, 0xbe, 0x0d, 0x47, 0x61, 0xc0, 0xc5, 0xf6, 0xfc, 0x18, 0x8c, 0xf8, 0xf0, 0x1d, + 0x53, 0x9f, 0x72, 0x5a, 0x32, 0xc0, 0x67, 0x39, 0xf1, 0x69, 0x05, 0x17, 0xd4, 0xd1, 0x53, 0x95, + 0x9d, 0xb2, 0xd6, 0xa5, 0xf5, 0x83, 0xd9, 0xf1, 0x4f, 0x8d, 0xf3, 0xca, 0xcf, 0x6a, 0xb0, 0xfc, + 0x9a, 0xf8, 0x13, 0x6a, 0xf5, 0xc0, 0xc8, 0x3b, 0x99, 0x3b, 0x74, 0x07, 0x6a, 0x4e, 0x94, 0xf8, + 0xff, 0xd0, 0x74, 0xe5, 0x57, 0x74, 0x4e, 0x69, 0x94, 0x6e, 0xac, 0x22, 0xd3, 0xfa, 0x1a, 0xd6, + 0x0b, 0x09, 0x0f, 0x02, 0x32, 0x66, 0xc3, 0x90, 0x8b, 0x63, 0x12, 0x6b, 0xba, 0xb6, 0x1b, 0x6f, + 0xda, 0x06, 0xce, 0x71, 0xe6, 0xe1, 0xf5, 0x32, 0xf8, 0x1f, 0x34, 0x30, 0x12, 0xe8, 0x63, 0xc2, + 0x09, 0x7a, 0x02, 0x35, 0x27, 0xae, 0xa9, 0xda, 0xde, 0x9b, 0xb3, 0x55, 0x98, 0x29, 0x3d, 0x4e, + 0xf4, 0xc5, 0x75, 0xc9, 0x54, 0x74, 0xaa, 0x82, 0xfd, 0x45, 0xb6, 0x49, 0x16, 0x38, 0xb5, 0xb0, + 0xbe, 0x55, 0x2b, 0x69, 0x30, 0xb9, 0x60, 0x4e, 0xe4, 0x8d, 0xc5, 0x38, 0x8b, 0xb3, 0xa4, 0x16, + 0x78, 0x92, 0x62, 0x4a, 0xa3, 0xa7, 0xb0, 0x42, 0x1c, 0xa1, 0xa5, 0x2e, 0x0c, 0x6b, 0xce, 0x59, + 0x0e, 0xe9, 0x50, 0x6a, 0x62, 0x65, 0x61, 0xd9, 0xb0, 0x76, 0xe8, 0xf8, 0x87, 0xae, 0x8b, 0xa9, + 0x13, 0x46, 0xee, 0x3f, 0xdf, 0xa5, 0xb9, 0x6b, 0x40, 0x2f, 0x5c, 0x03, 0xd6, 0xe7, 0xd0, 0x2e, + 0x42, 0xa9, 0x6d, 0xda, 0x85, 0x7a, 0x24, 0x39, 0x29, 0x58, 0x4a, 0xdf, 0x81, 0xf6, 0xa9, 0x44, + 0xfb, 0x84, 0xf2, 0x18, 0x8d, 0xbd, 0x53, 0x64, 0xc4, 0xf1, 0x4f, 0xb3, 0xa7, 0x42, 0x42, 0x5a, + 0x7b, 0xb0, 0x3e, 0x83, 0xa5, 0x42, 0x93, 0xb7, 0x9d, 0x64, 0xc9, 0xa2, 0x1a, 0x38, 0x21, 0xb7, + 0x7f, 0xd3, 0xa0, 0x7e, 0x12, 0x45, 0x47, 0xa1, 0x4b, 0x19, 0x6a, 0x01, 0xbc, 0x0a, 0xe8, 0xf5, + 0x98, 0x3a, 0x9c, 0xba, 0x66, 0x05, 0x99, 0x6a, 0xd7, 0xbf, 0xf0, 0x18, 0xf3, 0x82, 0x2b, 0x53, + 0x43, 0xf7, 0xd4, 0x44, 0x9f, 0x5c, 0x7b, 0x8c, 0x33, 0x53, 0x47, 0x6b, 0x70, 0x4f, 0x32, 0x5e, + 0x86, 0xdc, 0x0e, 0x8e, 0x88, 0x33, 0xa4, 0xe6, 0x12, 0x42, 0xd0, 0x92, 0x4c, 0x9b, 0xc5, 0x93, + 0xef, 0x9a, 0x55, 0xd4, 0x81, 0xb6, 0x9c, 0x40, 0xf6, 0x32, 0xe4, 0x2a, 0x2e, 0xef, 0xc2, 0xa7, + 0xe6, 0x32, 0x6a, 0x83, 0x89, 0xa9, 0x43, 0xbd, 0x31, 0xb7, 0x99, 0x1d, 0xbc, 0x26, 0xbe, 0xe7, + 0x9a, 0x2b, 0x02, 0x43, 0x11, 0x6a, 0x45, 0x99, 0x35, 0xe1, 0xfd, 0x24, 0x8a, 0xc2, 0xe8, 0xec, + 0xf2, 0x92, 0x51, 0x6e, 0xba, 0xdb, 0x4f, 0xe0, 0xe1, 0x82, 0xc6, 0xa3, 0x26, 0x34, 0x14, 0xf7, + 0x82, 0x9a, 0x15, 0x61, 0xfa, 0x2a, 0x60, 0x29, 0x43, 0xdb, 0xde, 0x86, 0x7a, 0xf2, 0xc8, 0x40, + 0xab, 0x50, 0xb3, 0x03, 0x4f, 0x5c, 0xb0, 0x66, 0x05, 0xdd, 0x87, 0xe6, 0x79, 0x44, 0x1d, 0xe2, + 0x3b, 0x13, 0x9f, 0x88, 0xd8, 0xb5, 0xfd, 0x5f, 0xab, 0xd0, 0x88, 0xfd, 0x4c, 0x03, 0x07, 0x1d, + 0x41, 0x3d, 0x79, 0xd4, 0xa0, 0x6e, 0xe9, 0x4b, 0x47, 0x76, 0xb0, 0xfb, 0xa8, 0xfc, 0x15, 0x14, + 0x77, 0xe4, 0xb9, 0x42, 0x14, 0x17, 0x39, 0x7a, 0x34, 0x77, 0xed, 0x66, 0xaf, 0x88, 0xee, 0x46, + 0xb9, 0x70, 0x0e, 0xc7, 0xf7, 0xcb, 0x70, 0xd2, 0x17, 0x41, 0x19, 0x4e, 0xee, 0x29, 0x80, 0xc1, + 0xcc, 0x1e, 0x57, 0x03, 0x1e, 0x51, 0x32, 0x42, 0x1b, 0x73, 0xcb, 0x34, 0xf7, 0xf2, 0xea, 0xde, + 0x29, 0xdd, 0xd2, 0x1e, 0x6b, 0xe8, 0x14, 0x20, 0x13, 0xfc, 0x17, 0x34, 0x74, 0x06, 0x46, 0xfe, + 0xc8, 0xa1, 0x5e, 0x4e, 0xbb, 0xe4, 0x58, 0x77, 0x37, 0x17, 0xca, 0xd3, 0x74, 0x9b, 0x85, 0x93, + 0x82, 0x66, 0x2c, 0xe6, 0xce, 0x63, 0xb7, 0xbf, 0x58, 0x21, 0xc6, 0x7c, 0xf6, 0xfe, 0xef, 0x37, + 0x3d, 0xed, 0xed, 0x4d, 0x4f, 0xfb, 0xeb, 0xa6, 0xa7, 0xfd, 0x74, 0xdb, 0xab, 0xbc, 0xbd, 0xed, + 0x55, 0xfe, 0xbc, 0xed, 0x55, 0xbe, 0xea, 0x2e, 0xfe, 0x0f, 0xe9, 0x62, 0x45, 0xfe, 0x39, 0xf8, + 0x3b, 0x00, 0x00, 0xff, 0xff, 0xa7, 0xd0, 0x2d, 0x98, 0x46, 0x0d, 0x00, 0x00, } func (m *HeadSyncRange) Marshal() (dAtA []byte, err error) { @@ -1564,6 +1619,16 @@ func (m *HeadSyncRange) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Elements { + i-- + if m.Elements { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x20 + } if m.Limit != 0 { i = encodeVarintSpacesync(dAtA, i, uint64(m.Limit)) i-- @@ -1688,6 +1753,11 @@ func (m *HeadSyncRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.DiffType != 0 { + i = encodeVarintSpacesync(dAtA, i, uint64(m.DiffType)) + i-- + dAtA[i] = 0x18 + } if len(m.Ranges) > 0 { for iNdEx := len(m.Ranges) - 1; iNdEx >= 0; iNdEx-- { { @@ -1732,6 +1802,11 @@ func (m *HeadSyncResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.DiffType != 0 { + i = encodeVarintSpacesync(dAtA, i, uint64(m.DiffType)) + i-- + dAtA[i] = 0x10 + } if len(m.Results) > 0 { for iNdEx := len(m.Results) - 1; iNdEx >= 0; iNdEx-- { { @@ -2563,6 +2638,9 @@ func (m *HeadSyncRange) Size() (n int) { if m.Limit != 0 { n += 1 + sovSpacesync(uint64(m.Limit)) } + if m.Elements { + n += 2 + } return n } @@ -2621,6 +2699,9 @@ func (m *HeadSyncRequest) Size() (n int) { n += 1 + l + sovSpacesync(uint64(l)) } } + if m.DiffType != 0 { + n += 1 + sovSpacesync(uint64(m.DiffType)) + } return n } @@ -2636,6 +2717,9 @@ func (m *HeadSyncResponse) Size() (n int) { n += 1 + l + sovSpacesync(uint64(l)) } } + if m.DiffType != 0 { + n += 1 + sovSpacesync(uint64(m.DiffType)) + } return n } @@ -3090,6 +3174,26 @@ func (m *HeadSyncRange) Unmarshal(dAtA []byte) error { break } } + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Elements", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Elements = bool(v != 0) default: iNdEx = preIndex skippy, err := skipSpacesync(dAtA[iNdEx:]) @@ -3457,6 +3561,25 @@ func (m *HeadSyncRequest) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 3: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field DiffType", wireType) + } + m.DiffType = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.DiffType |= DiffType(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipSpacesync(dAtA[iNdEx:]) @@ -3541,6 +3664,25 @@ func (m *HeadSyncResponse) Unmarshal(dAtA []byte) error { return err } iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field DiffType", wireType) + } + m.DiffType = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSpacesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.DiffType |= DiffType(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipSpacesync(dAtA[iNdEx:]) From 39e28159bb73ca639dc11afc11a766b440a1cf3d Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 21:42:16 +0100 Subject: [PATCH 3/8] Mark hash dirty --- app/ldiff/diffcontainer.go | 2 ++ app/ldiff/olddiff.go | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/app/ldiff/diffcontainer.go b/app/ldiff/diffcontainer.go index d6b43b34..c9ee867e 100644 --- a/app/ldiff/diffcontainer.go +++ b/app/ldiff/diffcontainer.go @@ -30,12 +30,14 @@ func (d *diffContainer) PrecalculatedDiff() Diff { func (d *diffContainer) Set(elements ...Element) { d.initial.mu.Lock() defer d.initial.mu.Unlock() + defer d.initial.markHashDirty() d.precalculated.Set(elements...) } func (d *diffContainer) RemoveId(id string) error { d.initial.mu.Lock() defer d.initial.mu.Unlock() + defer d.initial.markHashDirty() return d.precalculated.RemoveId(id) } diff --git a/app/ldiff/olddiff.go b/app/ldiff/olddiff.go index 71f2f0a8..9d44b900 100644 --- a/app/ldiff/olddiff.go +++ b/app/ldiff/olddiff.go @@ -251,6 +251,10 @@ func (d *olddiff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, remo return dctx.newIds, dctx.changedIds, dctx.removedIds, nil } +func (d *olddiff) markHashDirty() { + d.hashIsDirty.Store(true) +} + func (d *olddiff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResult) { // both hash equals - do nothing if bytes.Equal(myRes.Hash, otherRes.Hash) { From 847747f8b8e90e5e8357583a7fc06ea804f123d8 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 22:21:57 +0100 Subject: [PATCH 4/8] Send diff type during ranges sync --- commonspace/headsync/remotediff.go | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/commonspace/headsync/remotediff.go b/commonspace/headsync/remotediff.go index 25c3e58b..df0e3794 100644 --- a/commonspace/headsync/remotediff.go +++ b/commonspace/headsync/remotediff.go @@ -20,19 +20,20 @@ type RemoteDiff interface { } func NewRemoteDiff(spaceId string, client Client) RemoteDiff { - return remote{ + return &remote{ spaceId: spaceId, client: client, } } type remote struct { - spaceId string - client Client + spaceId string + client Client + diffType spacesyncproto.DiffType } // DiffTypeCheck checks which type of diff should we use -func (r remote) DiffTypeCheck(ctx context.Context, diffContainer ldiff.DiffContainer) (needsSync bool, diff ldiff.Diff, err error) { +func (r *remote) DiffTypeCheck(ctx context.Context, diffContainer ldiff.DiffContainer) (needsSync bool, diff ldiff.Diff, err error) { req := &spacesyncproto.HeadSyncRequest{ SpaceId: r.spaceId, DiffType: spacesyncproto.DiffType_Precalculated, @@ -53,6 +54,7 @@ func (r remote) DiffTypeCheck(ctx context.Context, diffContainer ldiff.DiffConta } return false, nil } + r.diffType = resp.DiffType switch resp.DiffType { case spacesyncproto.DiffType_Precalculated: diff = diffContainer.PrecalculatedDiff() @@ -64,7 +66,7 @@ func (r remote) DiffTypeCheck(ctx context.Context, diffContainer ldiff.DiffConta return } -func (r remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) { +func (r *remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff.RangeResult) (results []ldiff.RangeResult, err error) { results = resBuf[:0] pbRanges := make([]*spacesyncproto.HeadSyncRange, 0, len(ranges)) for _, rg := range ranges { @@ -74,8 +76,9 @@ func (r remote) Ranges(ctx context.Context, ranges []ldiff.Range, resBuf []ldiff }) } req := &spacesyncproto.HeadSyncRequest{ - SpaceId: r.spaceId, - Ranges: pbRanges, + SpaceId: r.spaceId, + Ranges: pbRanges, + DiffType: r.diffType, } resp, err := r.client.HeadSync(ctx, req) if err != nil { From a24e91aecd60e50c1c88f58f2b5b8a8a7efff2d4 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 22:28:13 +0100 Subject: [PATCH 5/8] Needs sync fix --- commonspace/headsync/remotediff.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/commonspace/headsync/remotediff.go b/commonspace/headsync/remotediff.go index df0e3794..f134fcd5 100644 --- a/commonspace/headsync/remotediff.go +++ b/commonspace/headsync/remotediff.go @@ -50,9 +50,9 @@ func (r *remote) DiffTypeCheck(ctx context.Context, diffContainer ldiff.DiffCont return false, err } if len(resp.Results) != 0 && bytes.Equal(hashB, resp.Results[0].Hash) { - return true, nil + return false, nil } - return false, nil + return true, nil } r.diffType = resp.DiffType switch resp.DiffType { From 51f44b8032bfaf784a0cf9538bf096c9fd351624 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Fri, 1 Dec 2023 22:42:52 +0100 Subject: [PATCH 6/8] Don't write hashes for empty ranges --- app/ldiff/hashrange.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/app/ldiff/hashrange.go b/app/ldiff/hashrange.go index ef47197b..879344e7 100644 --- a/app/ldiff/hashrange.go +++ b/app/ldiff/hashrange.go @@ -216,6 +216,8 @@ func (h *hashRanges) calcElementsHash(from, to uint64) (hash []byte, els int) { hasher.WriteString(elem.Head) els++ } - hash = hasher.Sum(nil) + if els != 0 { + hash = hasher.Sum(nil) + } return } From 87b63ed18bd0e3965c56611da836f0426f75f391 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Sun, 3 Dec 2023 23:58:45 +0100 Subject: [PATCH 7/8] Add params comparison --- app/ldiff/diff_test.go | 78 ++++++++++++++++++++++++++++++++ commonspace/headsync/headsync.go | 2 +- 2 files changed, 79 insertions(+), 1 deletion(-) diff --git a/app/ldiff/diff_test.go b/app/ldiff/diff_test.go index f9501b34..f6b1ee8b 100644 --- a/app/ldiff/diff_test.go +++ b/app/ldiff/diff_test.go @@ -346,3 +346,81 @@ func TestRangesAddRemove(t *testing.T) { } require.Equal(t, addTwice(), addOnce(), addRemove()) } + +func printBestParams() { + numTests := 10 + length := 100000 + calcParams := func(divideFactor, compareThreshold, length int) (total, maxLevel, avgLevel, zeroEls int) { + d := New(divideFactor, compareThreshold) + var els []Element + for i := 0; i < length; i++ { + els = append(els, Element{ + Id: uuid.NewString(), + Head: uuid.NewString(), + }) + } + d.Set(els...) + df := d.(*diff) + for _, rng := range df.ranges.ranges { + if rng.elements == 0 { + zeroEls++ + } + if rng.level > maxLevel { + maxLevel = rng.level + } + avgLevel += rng.level + } + total = len(df.ranges.ranges) + avgLevel = avgLevel / total + return + } + type result struct { + divFactor, compThreshold, numRanges, maxLevel, avgLevel, zeroEls int + } + sf := func(i, j result) int { + if i.numRanges < j.numRanges { + return -1 + } else if i.numRanges == j.numRanges { + return 0 + } else { + return 1 + } + } + var results []result + for divFactor := 0; divFactor < 6; divFactor++ { + df := 1 << divFactor + for compThreshold := 0; compThreshold < 10; compThreshold++ { + ct := 1 << compThreshold + fmt.Println("starting, df:", df, "ct:", ct) + var rngs []result + for i := 0; i < numTests; i++ { + total, maxLevel, avgLevel, zeroEls := calcParams(df, ct, length) + rngs = append(rngs, result{ + divFactor: df, + compThreshold: ct, + numRanges: total, + maxLevel: maxLevel, + avgLevel: avgLevel, + zeroEls: zeroEls, + }) + } + slices.SortFunc(rngs, sf) + ranges := rngs[len(rngs)/2] + results = append(results, ranges) + } + } + slices.SortFunc(results, sf) + fmt.Println(results) + // 100000 - [{16 512 273 2 1 0} {4 512 341 4 3 0} {2 512 511 8 7 0} {1 512 511 8 7 0} + // {8 256 585 3 2 0} {8 512 585 3 2 0} {1 256 1023 9 8 0} {2 256 1023 9 8 0} + // {32 256 1057 2 1 0} {32 512 1057 2 1 0} {32 128 1089 3 1 0} {4 256 1365 5 4 0} + // {4 128 1369 6 4 0} {2 128 2049 11 9 0} {1 128 2049 11 9 0} {1 64 4157 12 10 0} + // {2 64 4159 12 10 0} {16 128 4369 3 2 0} {16 64 4369 3 2 0} {16 256 4369 3 2 0} + // {8 64 4681 4 3 0} {8 128 4681 4 3 0} {4 64 5461 6 5 0} {4 32 6389 7 5 0} + // {8 32 6505 5 4 17} {16 32 8033 4 3 374} {2 32 8619 13 11 0} {1 32 8621 13 11 0} + // {2 16 17837 15 12 0} {1 16 17847 15 12 0} {4 16 21081 8 6 22} {32 64 33825 3 2 1578} + // {32 32 33825 3 2 1559} {32 16 33825 3 2 1518} {8 16 35881 5 4 1313} {16 16 66737 4 3 13022}] + // 1000000 - [{8 256 11753 5 4 0}] + // 1000000 - [{16 128 69905 4 3 0}] + // 1000000 - [{32 256 33825 3 2 0}] +} diff --git a/commonspace/headsync/headsync.go b/commonspace/headsync/headsync.go index 45f0a394..714d6a1d 100644 --- a/commonspace/headsync/headsync.go +++ b/commonspace/headsync/headsync.go @@ -77,7 +77,7 @@ func (h *headSync) Init(a *app.App) (err error) { h.configuration = a.MustComponent(nodeconf.CName).(nodeconf.NodeConf) h.log = log.With(zap.String("spaceId", h.spaceId)) h.storage = a.MustComponent(spacestorage.CName).(spacestorage.SpaceStorage) - h.diffContainer = ldiff.NewDiffContainer(16, 128) + h.diffContainer = ldiff.NewDiffContainer(32, 256) h.peerManager = a.MustComponent(peermanager.CName).(peermanager.PeerManager) h.credentialProvider = a.MustComponent(credentialprovider.CName).(credentialprovider.CredentialProvider) h.syncStatus = a.MustComponent(syncstatus.CName).(syncstatus.StatusService) From 9652dc5291abbd74500d74867c7a351731dd2512 Mon Sep 17 00:00:00 2001 From: mcrakhman Date: Mon, 4 Dec 2023 10:45:19 +0100 Subject: [PATCH 8/8] Add slices --- app/ldiff/diff_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/app/ldiff/diff_test.go b/app/ldiff/diff_test.go index f6b1ee8b..97780de9 100644 --- a/app/ldiff/diff_test.go +++ b/app/ldiff/diff_test.go @@ -10,6 +10,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" ) func TestDiff_fillRange(t *testing.T) {