Skip to content

Commit

Permalink
add tests for merge
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed Jun 7, 2024
1 parent d779a7f commit d2eebb7
Show file tree
Hide file tree
Showing 4 changed files with 327 additions and 15 deletions.
2 changes: 1 addition & 1 deletion internal/db/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func newMemoryDB(ctx context.Context) (*db, error) {

func newDefraMemoryDB(ctx context.Context) (*db, error) {
rootstore := memory.NewDatastore(ctx)
return newDB(ctx, rootstore, acp.NoACP)
return newDB(ctx, rootstore, acp.NoACP, nil)
}

func TestNewDB(t *testing.T) {
Expand Down
14 changes: 7 additions & 7 deletions internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error
}
ctx = SetContextTxn(ctx, txn)
mp.txn = txn
mp.ls.SetReadStorage(txn.DAGstore().AsIPLDStorage())
mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage())
// Reset the CRDTs to avoid reusing the old transaction.
mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT)
continue
Expand All @@ -133,7 +133,7 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error

type mergeProcessor struct {
txn datastore.Txn
ls linking.LinkSystem
lsys linking.LinkSystem
mCRDTs map[string]merklecrdt.MerkleCRDT
col *collection
dsKey core.DataStoreKey
Expand All @@ -142,13 +142,13 @@ type mergeProcessor struct {

func (db *db) newMergeProcessor(
txn datastore.Txn,
ls linking.LinkSystem,
lsys linking.LinkSystem,
col *collection,
dsKey core.DataStoreKey,
) (*mergeProcessor, error) {
return &mergeProcessor{
txn: txn,
ls: ls,
lsys: lsys,
mCRDTs: make(map[string]merklecrdt.MerkleCRDT),
col: col,
dsKey: dsKey,
Expand Down Expand Up @@ -179,7 +179,7 @@ func (mp *mergeProcessor) loadComposites(
return nil
}

nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: blockCid}, coreblock.SchemaPrototype)
nd, err := mp.lsys.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: blockCid}, coreblock.SchemaPrototype)
if err != nil {
return err
}
Expand Down Expand Up @@ -207,7 +207,7 @@ func (mp *mergeProcessor) loadComposites(
for _, b := range mt.heads {
for _, link := range b.Links {
if link.Name == core.HEAD {
nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
nd, err := mp.lsys.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
if err != nil {
return err
}
Expand Down Expand Up @@ -269,7 +269,7 @@ func (mp *mergeProcessor) processBlock(
continue
}

nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
nd, err := mp.lsys.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype)
if err != nil {
return err
}
Expand Down
313 changes: 313 additions & 0 deletions internal/db/merge_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,313 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package db

import (
"context"
"testing"

"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/events"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/merkle/clock"
)

const userSchema = `
type User {
name: String
age: Int
email: String
points: Int
}
`

func TestMerge_NoError(t *testing.T) {
// Test that a merge can be performed up to the provided CID.
ctx := context.Background()

// Setup the "local" database
localDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = localDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
localCol, err := localDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
docMap := map[string]any{
"name": "Alice",
"age": 30,
}
doc, err := client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)

err = localCol.Create(ctx, doc)
require.NoError(t, err)

// Setup the "remote" database
remoteDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = remoteDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
remoteCol, err := remoteDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
doc, err = client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)
err = remoteCol.Create(ctx, doc)
require.NoError(t, err)

// Add a few changes to the remote node
err = doc.Set("points", 100)
require.NoError(t, err)
err = remoteCol.Update(ctx, doc)
require.NoError(t, err)

// Sync the remote blocks to the local node
err = syncAndMerge(ctx, remoteDB, localDB, remoteCol, localCol, doc.ID().String())
require.NoError(t, err)

// verify the local node has the same data as the remote node
localDoc, err := localCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
localDocString, err := localDoc.String()
require.NoError(t, err)
remoteDoc, err := remoteCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
remoteDocString, err := remoteDoc.String()
require.NoError(t, err)
require.Equal(t, remoteDocString, localDocString)
}

func TestMerge_DelayedSync_NoError(t *testing.T) {
// Test that a merge can be performed up to the provided CID.
ctx := context.Background()

// Setup the "local" database
localDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = localDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
localCol, err := localDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
docMap := map[string]any{
"name": "Alice",
"age": 30,
}
doc, err := client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)

err = localCol.Create(ctx, doc)
require.NoError(t, err)

// Setup the "remote" database
remoteDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = remoteDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
remoteCol, err := remoteDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
doc, err = client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)
err = remoteCol.Create(ctx, doc)
require.NoError(t, err)

// Add a few changes to the remote node
err = doc.Set("points", 100)
require.NoError(t, err)
err = remoteCol.Update(ctx, doc)
require.NoError(t, err)

err = doc.Set("age", 31)
require.NoError(t, err)
err = remoteCol.Update(ctx, doc)
require.NoError(t, err)

err = doc.Set("email", "[email protected]")
require.NoError(t, err)
err = remoteCol.Update(ctx, doc)
require.NoError(t, err)

// Sync the remote blocks to the local node
err = syncAndMerge(ctx, remoteDB, localDB, remoteCol, localCol, doc.ID().String())
require.NoError(t, err)

// verify the local node has the same data as the remote node
localDoc, err := localCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
localDocString, err := localDoc.String()
require.NoError(t, err)
remoteDoc, err := remoteCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
remoteDocString, err := remoteDoc.String()
require.NoError(t, err)
require.Equal(t, remoteDocString, localDocString)
}

func TestMerge_DelayedSyncTwoBranches_NoError(t *testing.T) {
// Test that a merge can be performed up to the provided CID.
ctx := context.Background()

// Setup the "local" database
localDB, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = localDB.AddSchema(ctx, userSchema)
require.NoError(t, err)
localCol, err := localDB.GetCollectionByName(ctx, "User")
require.NoError(t, err)
docMap := map[string]interface{}{
"name": "Alice",
"age": 30,
}
doc, err := client.NewDocFromMap(docMap, localCol.Definition())
require.NoError(t, err)

err = localCol.Create(ctx, doc)
require.NoError(t, err)

// Setup the "remote" database
remoteDB1, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = remoteDB1.AddSchema(ctx, userSchema)
require.NoError(t, err)
remoteCol1, err := remoteDB1.GetCollectionByName(ctx, "User")
require.NoError(t, err)
doc, err = client.NewDocFromMap(docMap, remoteCol1.Definition())
require.NoError(t, err)
err = remoteCol1.Create(ctx, doc)
require.NoError(t, err)

// Setup the second "remote" database
remoteDB2, err := newDefraMemoryDB(ctx)
require.NoError(t, err)
_, err = remoteDB2.AddSchema(ctx, userSchema)
require.NoError(t, err)
remoteCol2, err := remoteDB2.GetCollectionByName(ctx, "User")
require.NoError(t, err)
doc2, err := client.NewDocFromMap(docMap, remoteCol2.Definition())
require.NoError(t, err)
err = remoteCol2.Create(ctx, doc2)
require.NoError(t, err)

// Add a few changes to the remote nodes creating two branches
err = doc.Set("points", 200)
require.NoError(t, err)
err = remoteCol1.Update(ctx, doc)
require.NoError(t, err)

err = doc2.Set("points", 100)
require.NoError(t, err)
err = remoteCol2.Update(ctx, doc2)
require.NoError(t, err)

err = doc.Set("age", 31)
require.NoError(t, err)
err = remoteCol1.Update(ctx, doc)
require.NoError(t, err)

err = doc2.Set("age", 32)
require.NoError(t, err)
err = remoteCol2.Update(ctx, doc2)
require.NoError(t, err)

err = doc.Set("email", "[email protected]")
require.NoError(t, err)
err = remoteCol1.Update(ctx, doc)
require.NoError(t, err)

err = doc2.Set("email", "[email protected]")
require.NoError(t, err)
err = remoteCol2.Update(ctx, doc2)
require.NoError(t, err)

// Sync the remote blocks to the local node
err = syncAndMerge(ctx, remoteDB2, remoteDB1, remoteCol2, remoteCol1, doc.ID().String())
require.NoError(t, err)
err = syncAndMerge(ctx, remoteDB1, localDB, remoteCol1, localCol, doc.ID().String())
require.NoError(t, err)

// verify the local node has the same data as the remote node
localDoc, err := localCol.Get(ctx, doc.ID(), false)
require.NoError(t, err)
localDocString, err := localDoc.String()
require.NoError(t, err)
remoteDoc1, err := remoteCol1.Get(ctx, doc.ID(), false)
require.NoError(t, err)
remoteDocString1, err := remoteDoc1.String()
require.NoError(t, err)
require.Equal(t, remoteDocString1, localDocString)
}

func syncAndMerge(ctx context.Context, from, to *db, fromCol, toCol client.Collection, docID string) error {
dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(fromCol.Description(), docID)
headset := clock.NewHeadSet(
from.multistore.Headstore(),
dsKey.WithFieldId(core.COMPOSITE_NAMESPACE).ToHeadStoreKey(),
)

cids, _, err := headset.List(ctx)
if err != nil {
return err
}

for _, cid := range cids {
blockBytes, err := from.multistore.DAGstore().AsIPLDStorage().Get(ctx, cid.KeyString())
if err != nil {
return err
}
block, err := coreblock.GetFromBytes(blockBytes)
if err != nil {
return err
}
err = syncDAG(ctx, from, to, block)
if err != nil {
return err
}
err = to.executeMerge(ctx, events.DAGMerge{
Cid: cid,
SchemaRoot: toCol.SchemaRoot(),
})
if err != nil {
return err
}
}
return nil
}

func syncDAG(ctx context.Context, from, to *db, block *coreblock.Block) error {
lsys := cidlink.DefaultLinkSystem()
lsys.SetWriteStorage(to.multistore.DAGstore().AsIPLDStorage())
_, err := lsys.Store(linking.LinkContext{Ctx: ctx}, coreblock.GetLinkPrototype(), block.GenerateNode())
if err != nil {
return err
}

for _, link := range block.Links {
lsys := cidlink.DefaultLinkSystem()
lsys.SetReadStorage(from.multistore.DAGstore().AsIPLDStorage())
nd, err := lsys.Load(linking.LinkContext{Ctx: ctx}, link, coreblock.SchemaPrototype)
if err != nil {
return err
}
block, err := coreblock.GetFromNode(nd)
if err != nil {
return err
}
err = syncDAG(ctx, from, to, block)
if err != nil {
return err
}
}
return nil
}
Loading

0 comments on commit d2eebb7

Please sign in to comment.