Skip to content

Commit

Permalink
apply feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed May 31, 2024
1 parent 5197bf8 commit a7d3b0a
Showing 1 changed file with 167 additions and 111 deletions.
278 changes: 167 additions & 111 deletions internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/events"
"github.com/sourcenetwork/defradb/internal/core"
Expand All @@ -48,7 +49,8 @@ func (db *db) handleMerges(ctx context.Context, merges events.Subscription[event
ctx,
"Failed to execute merge",
err,
corelog.String("cid", merge.Cid.String()),
corelog.String("CID", merge.Cid.String()),
corelog.String("Error", err.Error()),
)
}
}()
Expand All @@ -68,87 +70,86 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error
return err
}
defer txn.Discard(ctx)
mp, err := db.newMergeProcessor(ctx, dagMerge.Cid, dagMerge.SchemaRoot)
if err != nil {
return err
}
mt, err := mp.getHeads(ctx)

col, err := getCollectionFromRootSchema(ctx, db, dagMerge.SchemaRoot)
if err != nil {
return err
}
err = mp.getComposites(ctx, dagMerge.Cid, mt)

ls := cidlink.DefaultLinkSystem()
ls.SetReadStorage(txn.DAGstore().AsIPLDStorage())

docID, err := getDocIDFromBlock(ctx, ls, dagMerge.Cid)
if err != nil {
return err
}
err = mp.merge(ctx)
dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String())

mt, err := getHeadsAsMergeTarget(ctx, txn, dsKey)
if err != nil {
return err
}
err = mp.syncIndexedDocs(ctx)

mp, err := db.newMergeProcessor(txn, ls, col, dsKey)
if err != nil {
return err
}
return txn.Commit(ctx)
}

type mergeProcessor struct {
ctx context.Context
txn datastore.Txn
ls linking.LinkSystem
docID client.DocID
mCRDTs map[uint32]merklecrdt.MerkleCRDT
col *collection
schemaVersionKey core.CollectionSchemaVersionKey
dsKey core.DataStoreKey
composites *list.List
}

func (db *db) newMergeProcessor(ctx context.Context, cid cid.Cid, rootSchema string) (*mergeProcessor, error) {
txn, ok := TryGetContextTxn(ctx)
if !ok {
return nil, ErrNoTransactionInContext
}

ls := cidlink.DefaultLinkSystem()
ls.SetReadStorage(txn.DAGstore().AsIPLDStorage())
nd, err := ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: cid}, coreblock.SchemaPrototype)
err = mp.loadComposites(ctx, dagMerge.Cid, mt)
if err != nil {
return nil, err
return err
}

block, err := coreblock.GetFromNode(nd)
if err != nil {
return nil, err
for retry := 0; retry < db.MaxTxnRetries(); retry++ {
err := mp.mergeComposites(ctx)
if err != nil {
return err
}
err = syncIndexedDocs(ctx, docID, col)
if err != nil {
return err
}
err = txn.Commit(ctx)
if err != nil {
if errors.Is(err, badger.ErrTxnConflict) {
txn, err = db.NewTxn(ctx, false)
if err != nil {
return err
}
ctx = SetContextTxn(ctx, txn)
mp.txn = txn
mp.ls.SetReadStorage(txn.DAGstore().AsIPLDStorage())
continue
}
return err
}
break
}

cols, err := db.getCollections(
ctx,
client.CollectionFetchOptions{
SchemaRoot: immutable.Some(rootSchema),
},
)
if err != nil {
return nil, err
}
return nil
}

col := cols[0].(*collection)
docID, err := client.NewDocIDFromString(string(block.Delta.GetDocID()))
if err != nil {
return nil, err
}
type mergeProcessor struct {
txn datastore.Txn
ls linking.LinkSystem
mCRDTs map[string]merklecrdt.MerkleCRDT
col *collection
dsKey core.DataStoreKey
composites *list.List
}

func (db *db) newMergeProcessor(
txn datastore.Txn,
ls linking.LinkSystem,
col *collection,
dsKey core.DataStoreKey,
) (*mergeProcessor, error) {
return &mergeProcessor{
ctx: ctx,
txn: txn,
ls: ls,
docID: docID,
mCRDTs: make(map[uint32]merklecrdt.MerkleCRDT),
col: col,
schemaVersionKey: core.CollectionSchemaVersionKey{
SchemaVersionID: col.Schema().VersionID,
CollectionID: col.ID(),
},
dsKey: base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String()),
txn: txn,
ls: ls,
mCRDTs: make(map[string]merklecrdt.MerkleCRDT),
col: col,
dsKey: dsKey,
composites: list.New(),
}, nil
}
Expand All @@ -164,9 +165,13 @@ func newMergeTarget() mergeTarget {
}
}

// getComposites retrieves the composite blocks for the given document until it reaches a
// block that has already been merged or until we reach the genesis block.
func (mp *mergeProcessor) getComposites(ctx context.Context, blockCid cid.Cid, mt mergeTarget) error {
// loadComposites retrieves and stores into the merge processor the composite blocks for the given
// document until it reaches a block that has already been merged or until we reach the genesis block.
func (mp *mergeProcessor) loadComposites(
ctx context.Context,
blockCid cid.Cid,
mt mergeTarget,
) error {
nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: blockCid}, coreblock.SchemaPrototype)
if err != nil {
return err
Expand All @@ -186,7 +191,7 @@ func (mp *mergeProcessor) getComposites(ctx context.Context, blockCid cid.Cid, m
mp.composites.PushFront(block)
for _, link := range block.Links {
if link.Name == core.HEAD {
err := mp.getComposites(ctx, link.Cid, mt)
err := mp.loadComposites(ctx, link.Cid, mt)
if err != nil {
return err
}
Expand All @@ -210,43 +215,12 @@ func (mp *mergeProcessor) getComposites(ctx context.Context, blockCid cid.Cid, m
newMT.headHeigth = childBlock.Delta.GetPriority()
}
}
return mp.getComposites(ctx, blockCid, newMT)
return mp.loadComposites(ctx, blockCid, newMT)
}
return nil
}

// getHeads retrieves the heads of the composite DAG for the given document.
func (mp *mergeProcessor) getHeads(ctx context.Context) (mergeTarget, error) {
headset := clock.NewHeadSet(
mp.txn.Headstore(),
mp.dsKey.WithFieldId(core.COMPOSITE_NAMESPACE).ToHeadStoreKey(),
)

cids, _, err := headset.List(ctx)
if err != nil {
return mergeTarget{}, err
}

mt := newMergeTarget()
for _, cid := range cids {
b, err := mp.txn.DAGstore().Get(ctx, cid)
if err != nil {
return mergeTarget{}, err
}

block, err := coreblock.GetFromBytes(b.RawData())
if err != nil {
return mergeTarget{}, err
}

mt.heads[cid] = block
// All heads have the same height so overwriting is ok.
mt.headHeigth = block.Delta.GetPriority()
}
return mt, nil
}

func (mp *mergeProcessor) merge(ctx context.Context) error {
func (mp *mergeProcessor) mergeComposites(ctx context.Context) error {
for e := mp.composites.Front(); e != nil; e = e.Next() {
block := e.Value.(*coreblock.Block)
link, err := block.GenerateLink()
Expand Down Expand Up @@ -288,12 +262,12 @@ func (mp *mergeProcessor) processBlock(
continue
}

b, err := mp.txn.DAGstore().Get(ctx, link.Cid)
nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
if err != nil {
return err
}

childBlock, err := coreblock.GetFromBytes(b.RawData())
childBlock, err := coreblock.GetFromNode(nd)
if err != nil {
return err
}
Expand All @@ -309,13 +283,25 @@ func (mp *mergeProcessor) processBlock(
func (mp *mergeProcessor) initCRDTForType(
field string,
) (merklecrdt.MerkleCRDT, error) {
mcrdt, exists := mp.mCRDTs[field]
if exists {
return mcrdt, nil
}

schemaVersionKey := core.CollectionSchemaVersionKey{
SchemaVersionID: mp.col.Schema().VersionID,
CollectionID: mp.col.ID(),
}

if field == "" {
return merklecrdt.NewMerkleCompositeDAG(
mcrdt = merklecrdt.NewMerkleCompositeDAG(
mp.txn,
mp.schemaVersionKey,
schemaVersionKey,
mp.dsKey.WithFieldId(core.COMPOSITE_NAMESPACE),
"",
), nil
)
mp.mCRDTs[field] = mcrdt
return mcrdt, nil
}

fd, ok := mp.col.Definition().GetFieldByName(field)
Expand All @@ -324,39 +310,109 @@ func (mp *mergeProcessor) initCRDTForType(
return nil, nil
}

return merklecrdt.InstanceWithStore(
mcrdt, err := merklecrdt.InstanceWithStore(
mp.txn,
mp.schemaVersionKey,
schemaVersionKey,
fd.Typ,
fd.Kind,
mp.dsKey.WithFieldId(fd.ID.String()),
field,
)
if err != nil {
return nil, err
}

mp.mCRDTs[field] = mcrdt
return mcrdt, nil
}

// getDocIDFromBlock loads the block behind the given CID and extracts the
// document ID carried by its delta payload.
func getDocIDFromBlock(ctx context.Context, ls linking.LinkSystem, cid cid.Cid) (client.DocID, error) {
	node, err := ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: cid}, coreblock.SchemaPrototype)
	if err != nil {
		return client.DocID{}, err
	}
	blk, err := coreblock.GetFromNode(node)
	if err != nil {
		return client.DocID{}, err
	}
	return client.NewDocIDFromString(string(blk.Delta.GetDocID()))
}

// getCollectionFromRootSchema returns the single active collection whose
// schema root matches the given rootSchema, or an error if none exists.
func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) {
	fetchOpts := client.CollectionFetchOptions{
		SchemaRoot: immutable.Some(rootSchema),
	}
	cols, err := db.getCollections(ctx, fetchOpts)
	if err != nil {
		return nil, err
	}
	if len(cols) == 0 {
		return nil, client.NewErrCollectionNotFoundForSchema(rootSchema)
	}
	// Only one active collection may exist per root schema, so taking the
	// first entry is safe.
	return cols[0].(*collection), nil
}

// getHeadsAsMergeTarget collects the current composite-DAG heads of the
// document identified by dsKey and packages them as a mergeTarget.
func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey core.DataStoreKey) (mergeTarget, error) {
	headset := clock.NewHeadSet(
		txn.Headstore(),
		dsKey.WithFieldId(core.COMPOSITE_NAMESPACE).ToHeadStoreKey(),
	)

	heads, _, err := headset.List(ctx)
	if err != nil {
		return mergeTarget{}, err
	}

	mt := newMergeTarget()
	for _, headCid := range heads {
		rawBlock, err := txn.DAGstore().Get(ctx, headCid)
		if err != nil {
			return mergeTarget{}, err
		}

		blk, err := coreblock.GetFromBytes(rawBlock.RawData())
		if err != nil {
			return mergeTarget{}, err
		}

		mt.heads[headCid] = blk
		// Every head shares the same height, so the last write wins harmlessly.
		mt.headHeigth = blk.Delta.GetPriority()
	}
	return mt, nil
}

func (mp *mergeProcessor) syncIndexedDocs(
func syncIndexedDocs(
ctx context.Context,
docID client.DocID,
col *collection,
) error {
// remove transaction from old context
oldCtx := SetContextTxn(ctx, nil)

oldDoc, err := mp.col.Get(oldCtx, mp.docID, false)
oldDoc, err := col.Get(oldCtx, docID, false)
isNewDoc := errors.Is(err, client.ErrDocumentNotFoundOrNotAuthorized)
if !isNewDoc && err != nil {
return err
}

doc, err := mp.col.Get(ctx, mp.docID, false)
doc, err := col.Get(ctx, docID, false)
isDeletedDoc := errors.Is(err, client.ErrDocumentNotFoundOrNotAuthorized)
if !isDeletedDoc && err != nil {
return err
}

if isDeletedDoc {
return mp.col.deleteIndexedDoc(ctx, oldDoc)
return col.deleteIndexedDoc(ctx, oldDoc)
} else if isNewDoc {
return mp.col.indexNewDoc(ctx, doc)
return col.indexNewDoc(ctx, doc)
} else {
return mp.col.updateDocIndex(ctx, oldDoc, doc)
return col.updateDocIndex(ctx, oldDoc, doc)
}
}

0 comments on commit a7d3b0a

Please sign in to comment.