Skip to content

Commit

Permalink
Merge pull request #127 from anyproto/GO-2477-change-diff-logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mcrakhman authored Dec 4, 2023
2 parents 7a12b25 + 9652dc5 commit 2b86cca
Show file tree
Hide file tree
Showing 16 changed files with 1,433 additions and 197 deletions.
110 changes: 61 additions & 49 deletions app/ldiff/diff.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
// Package ldiff provides a container of elements with fixed id and changeable content.
// Diff can calculate the difference with another diff container (you can make it remote) with minimum hops and traffic.
//
//go:generate mockgen -destination mock_ldiff/mock_ldiff.go github.com/anyproto/any-sync/app/ldiff Diff,Remote
//go:generate mockgen -destination mock_ldiff/mock_ldiff.go github.com/anyproto/any-sync/app/ldiff Diff,Remote,DiffContainer
package ldiff

import (
"bytes"
"context"
"encoding/hex"
"errors"
"math"
"sync"

"github.com/cespare/xxhash"
"github.com/huandu/skiplist"
"github.com/zeebo/blake3"
"math"
"sync"

"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)

// New creates new Diff container
// New creates precalculated Diff container
//
// divideFactor - how many hashes you want to ask for at once
//
Expand All @@ -31,6 +34,10 @@ import (
//
// Lower threshold and divideFactor - less traffic but more requests
func New(divideFactor, compareThreshold int) Diff {
	// newDiff builds the concrete container; callers only ever see it through the Diff interface.
	var container Diff = newDiff(divideFactor, compareThreshold)
	return container
}

func newDiff(divideFactor, compareThreshold int) *diff {
if divideFactor < 2 {
divideFactor = 2
}
Expand All @@ -42,6 +49,9 @@ func New(divideFactor, compareThreshold int) Diff {
compareThreshold: compareThreshold,
}
d.sl = skiplist.New(d)
d.ranges = newHashRanges(divideFactor, compareThreshold, d.sl)
d.ranges.dirty[d.ranges.topRange] = struct{}{}
d.ranges.recalculateHashes()
return d
}

Expand All @@ -62,6 +72,7 @@ type Element struct {
// Range is a request for the RangeResult covering the element hashes between From and To.
type Range struct {
	// From is the start of the requested hash interval.
	From uint64
	// To is the end of the requested hash interval; getRange includes elements whose hash equals To.
	To uint64
	// Elements asks for the elements themselves: getRange answers an already divided
	// range with just its hash and count unless this flag is set.
	Elements bool
	// Limit is a cap on the number of returned elements.
	// NOTE(review): nothing visible in this revision reads Limit any more — confirm whether it is still needed.
	Limit int
}

Expand Down Expand Up @@ -96,6 +107,8 @@ type Diff interface {
Hash() string
// Len returns count of elements in the diff
Len() int
// DiffType returns type of diff
DiffType() spacesyncproto.DiffType
}

// Remote interface for using in the Diff
Expand All @@ -108,6 +121,7 @@ type diff struct {
sl *skiplist.SkipList
divideFactor int
compareThreshold int
ranges *hashRanges
mu sync.RWMutex
}

Expand Down Expand Up @@ -140,10 +154,13 @@ func (d *diff) Set(elements ...Element) {
d.mu.Lock()
defer d.mu.Unlock()
for _, e := range elements {
el := &element{Element: e, hash: xxhash.Sum64([]byte(e.Id))}
hash := xxhash.Sum64([]byte(e.Id))
el := &element{Element: e, hash: hash}
d.sl.Remove(el)
d.sl.Set(el, nil)
d.ranges.addElement(hash)
}
d.ranges.recalculateHashes()
}

func (d *diff) Ids() (ids []string) {
Expand Down Expand Up @@ -198,51 +215,44 @@ func (d *diff) Element(id string) (Element, error) {
func (d *diff) Hash() string {
d.mu.RLock()
defer d.mu.RUnlock()
res := d.getRange(Range{To: math.MaxUint64})
return hex.EncodeToString(res.Hash)
return hex.EncodeToString(d.ranges.hash())
}

// RemoveId removes element by id
func (d *diff) RemoveId(id string) error {
d.mu.Lock()
defer d.mu.Unlock()
hash := xxhash.Sum64([]byte(id))
el := &element{Element: Element{
Id: id,
}, hash: xxhash.Sum64([]byte(id))}
}, hash: hash}
if d.sl.Remove(el) == nil {
return ErrElementNotFound
}
d.ranges.removeElement(hash)
d.ranges.recalculateHashes()
return nil
}

func (d *diff) getRange(r Range) (rr RangeResult) {
hasher := hashersPool.Get().(*blake3.Hasher)
defer hashersPool.Put(hasher)
hasher.Reset()
rng := d.ranges.getRange(r.From, r.To)
// if we have the division for this range
if rng != nil {
rr.Hash = rng.hash
rr.Count = rng.elements
if !r.Elements && rng.isDivided {
return
}
}

el := d.sl.Find(&element{hash: r.From})
rr.Elements = make([]Element, 0, r.Limit)
var overfill bool
rr.Elements = make([]Element, 0, d.divideFactor)
for el != nil && el.Key().(*element).hash <= r.To {
elem := el.Key().(*element).Element
el = el.Next()

hasher.WriteString(elem.Id)
hasher.WriteString(elem.Head)
rr.Count++
if !overfill {
if len(rr.Elements) < r.Limit {
rr.Elements = append(rr.Elements, elem)
}
if len(rr.Elements) == r.Limit && el != nil {
overfill = true
}
}
rr.Elements = append(rr.Elements, elem)
}
if overfill {
rr.Elements = nil
}
rr.Hash = hasher.Sum(nil)
rr.Count = len(rr.Elements)
return
}

Expand All @@ -267,13 +277,16 @@ type diffCtx struct {

var errMismatched = errors.New("query and results mismatched")

// DiffType reports the kind of this diff container; for *diff it is always
// spacesyncproto.DiffType_Precalculated.
func (*diff) DiffType() spacesyncproto.DiffType {
	return spacesyncproto.DiffType_Precalculated
}

// Diff makes diff with remote container
func (d *diff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) {
dctx := &diffCtx{}
dctx.toSend = append(dctx.toSend, Range{
From: 0,
To: math.MaxUint64,
Limit: d.compareThreshold,
From: 0,
To: math.MaxUint64,
})
for len(dctx.toSend) > 0 {
select {
Expand Down Expand Up @@ -307,26 +320,25 @@ func (d *diff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResul
return
}

// both has elements
if len(myRes.Elements) == myRes.Count && len(otherRes.Elements) == otherRes.Count {
d.compareElements(dctx, myRes.Elements, otherRes.Elements)
// other has elements
if len(otherRes.Elements) == otherRes.Count {
if len(myRes.Elements) == myRes.Count {
d.compareElements(dctx, myRes.Elements, otherRes.Elements)
} else {
r.Elements = true
d.compareElements(dctx, d.getRange(r).Elements, otherRes.Elements)
}
return
}

// make more queries
divideFactor := uint64(d.divideFactor)
perRange := (r.To - r.From) / divideFactor
align := ((r.To-r.From)%divideFactor + 1) % divideFactor
if align == 0 {
perRange += 1
// request all elements from other, because we don't have enough
if len(myRes.Elements) == myRes.Count {
r.Elements = true
dctx.prepare = append(dctx.prepare, r)
return
}
var j = r.From
for i := 0; i < d.divideFactor; i++ {
if i == d.divideFactor-1 {
perRange += align
}
dctx.prepare = append(dctx.prepare, Range{From: j, To: j + perRange - 1, Limit: r.Limit})
j += perRange
rangeTuples := genTupleRanges(r.From, r.To, d.divideFactor)
for _, tuple := range rangeTuples {
dctx.prepare = append(dctx.prepare, Range{From: tuple.from, To: tuple.to})
}
return
}
Expand Down
Loading

0 comments on commit 2b86cca

Please sign in to comment.