Skip to content

Commit

Permalink
fix: Merge retry logic (sourcenetwork#2719)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves sourcenetwork#2718 
Resolves sourcenetwork#2721 

## Description

This PR fixes issues with merge retry and DAG sync processes. It also
moves the `docQueue` from the `net` package into the `db` package.

## Tasks

- [x] I made sure the code is well commented, particularly
hard-to-understand areas.
- [x] I made sure the repository-held documentation is changed
accordingly.
- [x] I made sure the pull request title adheres to the conventional
commit style (the subset used in the project can be found in
[tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)).
- [x] I made sure to discuss its limitations such as threats to
validity, vulnerability to mistake and misuse, robustness to
invalidation of assumptions, resource requirements, ...

## How has this been tested?

`make test`

Specify the platform(s) on which this was tested:
- MacOS
  • Loading branch information
nasdf authored Jun 17, 2024
1 parent 6626441 commit 07e431d
Show file tree
Hide file tree
Showing 9 changed files with 209 additions and 267 deletions.
3 changes: 2 additions & 1 deletion events/dag_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"sync"

"github.com/ipfs/go-cid"

"github.com/sourcenetwork/immutable"
)

Expand All @@ -23,6 +22,8 @@ type DAGMergeChannel = immutable.Option[Channel[DAGMerge]]

// DAGMerge is a notification that a merge can be performed up to the provided CID.
type DAGMerge struct {
// DocID is the unique identifier for the document being merged.
DocID string
// Cid is the id of the composite commit that formed this update in the DAG.
Cid cid.Cid
// SchemaRoot is the root identifier of the schema that defined the shape of the document that was updated.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipld/go-ipld-prime v0.21.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8
github.com/jbenet/goprocess v0.1.4
github.com/lens-vm/lens/host-go v0.0.0-20231127204031-8d858ed2926c
github.com/lestrrat-go/jwx/v2 v2.0.21
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,8 @@ github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH
github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8 h1:WQVfplCGOHtFNyZH7eOaEqGsbbje3NP8EFeGggUvEQs=
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:PVDd/V/Zz9IW+Diz9LEhD+ZYS9pKzawmtVQhVd0hcgQ=
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8 h1:adq3fTx2YXmpTPNvBRIM0Zi5lX4JjQTRjdLYKhXMkQg=
github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:ej/GTRX+HjlHMs/M3zg9fM8mUlQXgHqRvPJjtp+atHw=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA=
Expand Down Expand Up @@ -1166,6 +1168,8 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/vito/go-sse v1.0.0 h1:e6/iTrrvy8BRrOwJwmQmlndlil+TLdxXvHi55ZDzH6M=
github.com/vito/go-sse v1.0.0/go.mod h1:2wkcaQ+jtlZ94Uve8gYZjFpL68luAjssTINA2hpgcZs=
github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s=
github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs=
Expand Down
124 changes: 74 additions & 50 deletions internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package db
import (
"container/list"
"context"
"sync"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime/linking"
Expand All @@ -34,6 +35,7 @@ import (
)

func (db *db) handleMerges(ctx context.Context, merges events.Subscription[events.DAGMerge]) {
queue := newMergeQueue()
for {
select {
case <-ctx.Done():
Expand All @@ -43,28 +45,40 @@ func (db *db) handleMerges(ctx context.Context, merges events.Subscription[event
return
}
go func() {
err := db.executeMerge(ctx, merge)
// ensure only one merge per docID
queue.add(merge.DocID)
defer queue.done(merge.DocID)

// retry the merge process if a conflict occurs
//
// conficts occur when a user updates a document
// while a merge is in progress.
var err error
for i := 0; i < db.MaxTxnRetries(); i++ {
err = db.executeMerge(ctx, merge)
if errors.Is(err, badger.ErrTxnConflict) {
continue // retry merge
}
break // merge success or error
}

if err != nil {
log.ErrorContextE(
ctx,
"Failed to execute merge",
err,
corelog.String("CID", merge.Cid.String()),
corelog.String("Error", err.Error()),
)
corelog.Any("Error", err),
corelog.Any("Event", merge))
}
if merge.Wg != nil {
merge.Wg.Done()
}
}()
}
}
}

func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error {
defer func() {
// Notify the caller that the merge is complete.
if dagMerge.Wg != nil {
dagMerge.Wg.Done()
}
}()
ctx, txn, err := ensureContextTxn(ctx, db, false)
if err != nil {
return err
Expand All @@ -79,7 +93,7 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error
ls := cidlink.DefaultLinkSystem()
ls.SetReadStorage(txn.DAGstore().AsIPLDStorage())

docID, err := getDocIDFromBlock(ctx, ls, dagMerge.Cid)
docID, err := client.NewDocIDFromString(dagMerge.DocID)
if err != nil {
return err
}
Expand All @@ -100,35 +114,57 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error
return err
}

for retry := 0; retry < db.MaxTxnRetries(); retry++ {
err := mp.mergeComposites(ctx)
if err != nil {
return err
}
err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err
}
err = txn.Commit(ctx)
if err != nil {
if errors.Is(err, badger.ErrTxnConflict) {
txn, err = db.NewTxn(ctx, false)
if err != nil {
return err
}
ctx = SetContextTxn(ctx, txn)
mp.txn = txn
mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage())
// Reset the CRDTs to avoid reusing the old transaction.
mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT)
continue
}
return err
}
break
err = mp.mergeComposites(ctx)
if err != nil {
return err
}

return nil
err = syncIndexedDoc(ctx, docID, col)
if err != nil {
return err
}

return txn.Commit(ctx)
}

// mergeQueue is synchronization source to ensure that concurrent
// document merges do not cause transaction conflicts.
type mergeQueue struct {
docs map[string]chan struct{}
mutex sync.Mutex
}

func newMergeQueue() *mergeQueue {
return &mergeQueue{
docs: make(map[string]chan struct{}),
}
}

// add adds a docID to the queue. If the docID is already in the queue, it will
// wait for the docID to be removed from the queue. For every add call, done must
// be called to remove the docID from the queue. Otherwise, subsequent add calls will
// block forever.
func (m *mergeQueue) add(docID string) {
m.mutex.Lock()
done, ok := m.docs[docID]
if !ok {
m.docs[docID] = make(chan struct{})
}
m.mutex.Unlock()
if ok {
<-done
m.add(docID)
}
}

func (m *mergeQueue) done(docID string) {
m.mutex.Lock()
defer m.mutex.Unlock()
done, ok := m.docs[docID]
if ok {
delete(m.docs, docID)
close(done)
}
}

type mergeProcessor struct {
Expand Down Expand Up @@ -333,18 +369,6 @@ func (mp *mergeProcessor) initCRDTForType(
return mcrdt, nil
}

func getDocIDFromBlock(ctx context.Context, ls linking.LinkSystem, cid cid.Cid) (client.DocID, error) {
nd, err := ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: cid}, coreblock.SchemaPrototype)
if err != nil {
return client.DocID{}, err
}
block, err := coreblock.GetFromNode(nd)
if err != nil {
return client.DocID{}, err
}
return client.NewDocIDFromString(string(block.Delta.GetDocID()))
}

func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) {
cols, err := db.getCollections(
ctx,
Expand Down
28 changes: 28 additions & 0 deletions internal/db/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package db
import (
"context"
"testing"
"time"

"github.com/fxamacker/cbor/v2"
"github.com/ipld/go-ipld-prime"
Expand Down Expand Up @@ -59,6 +60,7 @@ func TestMerge_SingleBranch_NoError(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo2.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand Down Expand Up @@ -103,6 +105,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo2.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand All @@ -112,6 +115,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo3.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand Down Expand Up @@ -159,6 +163,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo2.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand All @@ -177,6 +182,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo3.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand Down Expand Up @@ -292,3 +298,25 @@ func encodeValue(val any) []byte {
}
return b
}

func TestMergeQueue(t *testing.T) {
q := newMergeQueue()

testDocID := "test"

q.add(testDocID)
go q.add(testDocID)
// give time for the goroutine to block
time.Sleep(10 * time.Millisecond)
require.Len(t, q.docs, 1)
q.done(testDocID)
// give time for the goroutine to add the docID
time.Sleep(10 * time.Millisecond)
q.mutex.Lock()
require.Len(t, q.docs, 1)
q.mutex.Unlock()
q.done(testDocID)
q.mutex.Lock()
require.Len(t, q.docs, 0)
q.mutex.Unlock()
}
Loading

0 comments on commit 07e431d

Please sign in to comment.