Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GO-2477: Change diff, precalculate hashes #127

Merged
merged 8 commits into from
Dec 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 61 additions & 49 deletions app/ldiff/diff.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
// Package ldiff provides a container of elements with fixed id and changeable content.
// Diff can calculate the difference with another diff container (you can make it remote) with minimum hops and traffic.
//
//go:generate mockgen -destination mock_ldiff/mock_ldiff.go github.com/anyproto/any-sync/app/ldiff Diff,Remote
//go:generate mockgen -destination mock_ldiff/mock_ldiff.go github.com/anyproto/any-sync/app/ldiff Diff,Remote,DiffContainer
package ldiff

import (
"bytes"
"context"
"encoding/hex"
"errors"
"math"
"sync"

"github.com/cespare/xxhash"
"github.com/huandu/skiplist"
"github.com/zeebo/blake3"
"math"
"sync"

"github.com/anyproto/any-sync/commonspace/spacesyncproto"
)

// New creates new Diff container
// New creates precalculated Diff container
//
// divideFactor - means how many hashes you want to ask for once
//
Expand All @@ -31,6 +34,10 @@ import (
//
// A lower threshold and divideFactor mean less traffic but more requests
func New(divideFactor, compareThreshold int) Diff {
// newDiff builds the concrete *diff; New only exposes it through the Diff interface.
return newDiff(divideFactor, compareThreshold)
}

func newDiff(divideFactor, compareThreshold int) *diff {
if divideFactor < 2 {
divideFactor = 2
}
Expand All @@ -42,6 +49,9 @@ func New(divideFactor, compareThreshold int) Diff {
compareThreshold: compareThreshold,
}
d.sl = skiplist.New(d)
d.ranges = newHashRanges(divideFactor, compareThreshold, d.sl)
d.ranges.dirty[d.ranges.topRange] = struct{}{}
d.ranges.recalculateHashes()
return d
}

Expand All @@ -62,6 +72,7 @@ type Element struct {
// Range request to get RangeResult
type Range struct {
// From and To bound the requested span of element hashes; getRange starts its
// scan at From and includes elements whose hash is <= To.
From, To uint64
// Elements, when true, makes getRange collect the elements of the span even if
// a precalculated, already divided range is available for it.
Elements bool
// Limit is presumably the maximum number of elements to return.
// NOTE(review): confirm that getRange still honors it.
Limit int
}

Expand Down Expand Up @@ -96,6 +107,8 @@ type Diff interface {
Hash() string
// Len returns count of elements in the diff
Len() int
// DiffType returns the type of the diff
DiffType() spacesyncproto.DiffType
}

// Remote interface for using in the Diff
Expand All @@ -108,6 +121,7 @@ type diff struct {
sl *skiplist.SkipList
divideFactor int
compareThreshold int
ranges *hashRanges
mu sync.RWMutex
}

Expand Down Expand Up @@ -140,10 +154,13 @@ func (d *diff) Set(elements ...Element) {
d.mu.Lock()
defer d.mu.Unlock()
for _, e := range elements {
el := &element{Element: e, hash: xxhash.Sum64([]byte(e.Id))}
hash := xxhash.Sum64([]byte(e.Id))
el := &element{Element: e, hash: hash}
d.sl.Remove(el)
d.sl.Set(el, nil)
d.ranges.addElement(hash)
}
d.ranges.recalculateHashes()
}

func (d *diff) Ids() (ids []string) {
Expand Down Expand Up @@ -198,51 +215,44 @@ func (d *diff) Element(id string) (Element, error) {
func (d *diff) Hash() string {
d.mu.RLock()
defer d.mu.RUnlock()
res := d.getRange(Range{To: math.MaxUint64})
return hex.EncodeToString(res.Hash)
return hex.EncodeToString(d.ranges.hash())
}

// RemoveId removes element by id
func (d *diff) RemoveId(id string) error {
d.mu.Lock()
defer d.mu.Unlock()
hash := xxhash.Sum64([]byte(id))
el := &element{Element: Element{
Id: id,
}, hash: xxhash.Sum64([]byte(id))}
}, hash: hash}
if d.sl.Remove(el) == nil {
return ErrElementNotFound
}
d.ranges.removeElement(hash)
d.ranges.recalculateHashes()
return nil
}

func (d *diff) getRange(r Range) (rr RangeResult) {
hasher := hashersPool.Get().(*blake3.Hasher)
defer hashersPool.Put(hasher)
hasher.Reset()
rng := d.ranges.getRange(r.From, r.To)
// if we have the division for this range
if rng != nil {
rr.Hash = rng.hash
rr.Count = rng.elements
if !r.Elements && rng.isDivided {
return
}
}

el := d.sl.Find(&element{hash: r.From})
rr.Elements = make([]Element, 0, r.Limit)
var overfill bool
rr.Elements = make([]Element, 0, d.divideFactor)
for el != nil && el.Key().(*element).hash <= r.To {
elem := el.Key().(*element).Element
el = el.Next()

hasher.WriteString(elem.Id)
hasher.WriteString(elem.Head)
rr.Count++
if !overfill {
if len(rr.Elements) < r.Limit {
rr.Elements = append(rr.Elements, elem)
}
if len(rr.Elements) == r.Limit && el != nil {
overfill = true
}
}
rr.Elements = append(rr.Elements, elem)
}
if overfill {
rr.Elements = nil
}
rr.Hash = hasher.Sum(nil)
rr.Count = len(rr.Elements)
return
}

Expand All @@ -267,13 +277,16 @@ type diffCtx struct {

var errMismatched = errors.New("query and results mismatched")

// DiffType returns the type of this diff implementation, which is always
// spacesyncproto.DiffType_Precalculated.
func (d *diff) DiffType() spacesyncproto.DiffType {
return spacesyncproto.DiffType_Precalculated
}

// Diff makes diff with remote container
func (d *diff) Diff(ctx context.Context, dl Remote) (newIds, changedIds, removedIds []string, err error) {
dctx := &diffCtx{}
dctx.toSend = append(dctx.toSend, Range{
From: 0,
To: math.MaxUint64,
Limit: d.compareThreshold,
From: 0,
To: math.MaxUint64,
})
for len(dctx.toSend) > 0 {
select {
Expand Down Expand Up @@ -307,26 +320,25 @@ func (d *diff) compareResults(dctx *diffCtx, r Range, myRes, otherRes RangeResul
return
}

// both has elements
if len(myRes.Elements) == myRes.Count && len(otherRes.Elements) == otherRes.Count {
d.compareElements(dctx, myRes.Elements, otherRes.Elements)
// other has elements
if len(otherRes.Elements) == otherRes.Count {
if len(myRes.Elements) == myRes.Count {
d.compareElements(dctx, myRes.Elements, otherRes.Elements)
} else {
r.Elements = true
d.compareElements(dctx, d.getRange(r).Elements, otherRes.Elements)
}
return
}

// make more queries
divideFactor := uint64(d.divideFactor)
perRange := (r.To - r.From) / divideFactor
align := ((r.To-r.From)%divideFactor + 1) % divideFactor
if align == 0 {
perRange += 1
// request all elements from other, because we don't have enough
if len(myRes.Elements) == myRes.Count {
r.Elements = true
dctx.prepare = append(dctx.prepare, r)
return
}
var j = r.From
for i := 0; i < d.divideFactor; i++ {
if i == d.divideFactor-1 {
perRange += align
}
dctx.prepare = append(dctx.prepare, Range{From: j, To: j + perRange - 1, Limit: r.Limit})
j += perRange
rangeTuples := genTupleRanges(r.From, r.To, d.divideFactor)
for _, tuple := range rangeTuples {
dctx.prepare = append(dctx.prepare, Range{From: tuple.from, To: tuple.to})
}
return
}
Expand Down
Loading