Skip to content

Commit

Permalink
feat: Add support for branchable collection time-traveling (#3260)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #3257

## Description

Add support for branchable collection time-traveling.

Also fixes the docID param which was misbehaving (see commit `Create
document with actual docID not user provided value`).
  • Loading branch information
AndrewSisley authored Nov 20, 2024
1 parent c45b07e commit fa0d92b
Show file tree
Hide file tree
Showing 6 changed files with 287 additions and 69 deletions.
103 changes: 48 additions & 55 deletions internal/db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"fmt"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"

"github.com/sourcenetwork/immutable"

Expand Down Expand Up @@ -89,15 +89,11 @@ type VersionedFetcher struct {
root datastore.Rootstore
store datastore.Txn

dsKey keys.DataStoreKey

queuedCids *list.List

acp immutable.Option[acp.ACP]

col client.Collection
// @todo index *client.IndexDescription
mCRDTs map[client.FieldID]merklecrdt.MerkleCRDT
}

// Init initializes the VersionedFetcher.
Expand All @@ -116,7 +112,6 @@ func (vf *VersionedFetcher) Init(
vf.acp = acp
vf.col = col
vf.queuedCids = list.New()
vf.mCRDTs = make(map[client.FieldID]merklecrdt.MerkleCRDT)
vf.txn = txn

// create store
Expand Down Expand Up @@ -158,10 +153,6 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans ...core.Span) error
prefix := spans[0].Start.(keys.HeadstoreDocKey)

vf.ctx = ctx
vf.dsKey = keys.DataStoreKey{
CollectionRootID: vf.col.Description().RootID,
DocID: prefix.DocID,
}

if err := vf.seekTo(prefix.Cid); err != nil {
return NewErrFailedToSeek(prefix.Cid, err)
Expand All @@ -170,11 +161,6 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans ...core.Span) error
return vf.DocumentFetcher.Start(ctx)
}

// Rootstore returns the rootstore of the VersionedFetcher.
func (vf *VersionedFetcher) Rootstore() ds.Datastore {
return vf.root
}

// Start a fetcher with the needed info (cid embedded in a span)

/*
Expand Down Expand Up @@ -324,56 +310,63 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
return err
}

link, err := block.GenerateLink()
if err != nil {
return err
}
var mcrdt merklecrdt.MerkleCRDT
switch {
case block.Delta.IsCollection():
mcrdt = merklecrdt.NewMerkleCollection(
vf.store,
keys.NewCollectionSchemaVersionKey(vf.col.Description().SchemaVersionID, vf.col.Description().ID),
keys.NewHeadstoreColKey(vf.col.Description().RootID),
)

// first arg 0 is the index for the composite DAG in the mCRDTs cache
mcrdt, exists := vf.mCRDTs[0]
if !exists {
case block.Delta.IsComposite():
mcrdt = merklecrdt.NewMerkleCompositeDAG(
vf.store,
keys.CollectionSchemaVersionKey{},
vf.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE),
keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID),
keys.DataStoreKey{
CollectionRootID: vf.col.Description().RootID,
DocID: string(block.Delta.GetDocID()),
FieldID: fmt.Sprint(core.COMPOSITE_NAMESPACE),
},
)
vf.mCRDTs[0] = mcrdt
}
err = mcrdt.Clock().ProcessBlock(vf.ctx, block, link)
if err != nil {
return err
}

// handle subgraphs
for _, l := range block.Links {
// get node
subBlock, err := vf.getDAGBlock(l.Link.Cid)
if err != nil {
return err
}

field, ok := vf.col.Definition().GetFieldByName(l.Name)
default:
field, ok := vf.col.Definition().GetFieldByName(block.Delta.GetFieldName())
if !ok {
return client.NewErrFieldNotExist(l.Name)
return client.NewErrFieldNotExist(block.Delta.GetFieldName())
}

mcrdt, exists := vf.mCRDTs[field.ID]
if !exists {
mcrdt, err = merklecrdt.FieldLevelCRDTWithStore(
vf.store,
keys.CollectionSchemaVersionKey{},
field.Typ,
field.Kind,
vf.dsKey.WithFieldID(fmt.Sprint(field.ID)),
field.Name,
)
if err != nil {
return err
}
vf.mCRDTs[field.ID] = mcrdt
mcrdt, err = merklecrdt.FieldLevelCRDTWithStore(
vf.store,
keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID),
field.Typ,
field.Kind,
keys.DataStoreKey{
CollectionRootID: vf.col.Description().RootID,
DocID: string(block.Delta.GetDocID()),
FieldID: fmt.Sprint(field.ID),
},
field.Name,
)
if err != nil {
return err
}
}

err = mcrdt.Clock().ProcessBlock(vf.ctx, subBlock, l.Link)
err = mcrdt.Clock().ProcessBlock(
vf.ctx,
block,
cidlink.Link{
Cid: c,
},
)
if err != nil {
return err
}

// handle subgraphs
for _, l := range block.AllLinks() {
err = vf.merge(l.Cid)
if err != nil {
return err
}
Expand Down
12 changes: 5 additions & 7 deletions internal/planner/select.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,17 +263,15 @@ func (n *selectNode) initSource() ([]aggregateNode, error) {
return nil, err
}

var docID string
if len(n.selectReq.DocIDs.Value()) > 0 {
docID = n.selectReq.DocIDs.Value()[0]
}

// This exists because the fetcher interface demands a []Span, yet the versioned
// fetcher type (that will be the only one consuming this []Span) does not use it
// as either a span, or even a prefix. And with this design limitation this is
// currently the least bad way of passing the cid in to the fetcher.
origScan.Spans(
[]core.Span{
core.NewSpan(
keys.HeadstoreDocKey{
DocID: docID,
Cid: c,
Cid: c,
},
keys.HeadstoreDocKey{},
),
Expand Down
9 changes: 4 additions & 5 deletions internal/request/graphql/schema/descriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,10 @@ An optional set of docIDs for this field. Only documents with a docID
be ignored.
`
cidArgDescription string = `
An optional value that specifies the commit ID of a document to return.
This CID does not need to be the most recent for a document, if it
corresponds to an older version of a document the document will be returned
at the state it was in at the time of that commit. If a matching commit is
not found then an empty set will be returned.
An optional value that specifies the commit ID of a document or a branchable collection.
This CID does not need to be the most recent. If it corresponds to an older version
the document(s) will be returned at the state they were in at the time of that commit.
If a matching commit is not found then an empty set will be returned.
`
singleFieldFilterArgDescription string = `
An optional filter for this join, if the related record does
Expand Down
161 changes: 161 additions & 0 deletions tests/integration/query/simple/with_cid_branchable_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package simple

import (
"testing"

testUtils "github.com/sourcenetwork/defradb/tests/integration"
)

func TestQuerySimpleWithCidOfBranchableCollection_FirstCid(t *testing.T) {
test := testUtils.TestCase{
Actions: []any{
testUtils.SchemaUpdate{
Schema: `
type Users @branchable {
name: String
}
`,
},
testUtils.CreateDoc{
Doc: `{
"name": "Fred"
}`,
},
testUtils.UpdateDoc{
Doc: `{
"name": "Freddddd"
}`,
},
testUtils.CreateDoc{
Doc: `{
"name": "John"
}`,
},
testUtils.Request{
Request: `query {
Users (
cid: "bafyreiewwsnu2ld5qlntamdm77ayb7xtmxz3p5difvaaakaome7zbtpo4u"
) {
name
}
}`,
Results: map[string]any{
"Users": []map[string]any{
{
"name": "Fred",
},
},
},
},
},
}

testUtils.ExecuteTestCase(t, test)
}

func TestQuerySimpleWithCidOfBranchableCollection_MiddleCid(t *testing.T) {
test := testUtils.TestCase{
Actions: []any{
testUtils.SchemaUpdate{
Schema: `
type Users @branchable {
name: String
}
`,
},
testUtils.CreateDoc{
Doc: `{
"name": "Fred"
}`,
},
testUtils.UpdateDoc{
Doc: `{
"name": "Freddddd"
}`,
},
testUtils.CreateDoc{
Doc: `{
"name": "John"
}`,
},
testUtils.Request{
Request: `query {
Users (
cid: "bafyreifpamlyhcbriztgbhds5ctgi5rm6w5wcar2py7246lo6j5v7iusxm"
) {
name
}
}`,
Results: map[string]any{
"Users": []map[string]any{
{
"name": "Freddddd",
},
},
},
},
},
}

testUtils.ExecuteTestCase(t, test)
}

func TestQuerySimpleWithCidOfBranchableCollection_LastCid(t *testing.T) {
test := testUtils.TestCase{
Actions: []any{
testUtils.SchemaUpdate{
Schema: `
type Users @branchable {
name: String
}
`,
},
testUtils.CreateDoc{
Doc: `{
"name": "Fred"
}`,
},
testUtils.UpdateDoc{
Doc: `{
"name": "Freddddd"
}`,
},
testUtils.CreateDoc{
Doc: `{
"name": "John"
}`,
},
testUtils.Request{
Request: `query {
Users (
cid: "bafyreigmt6ytph32jjxts2bij7fkne5ntionldsnklp35vcamvvl2x3a5i"
) {
name
}
}`,
Results: map[string]any{
"Users": []map[string]any{
{
"name": "Freddddd",
},
{
"name": "John",
},
},
},
},
},
}

testUtils.ExecuteTestCase(t, test)
}
Loading

0 comments on commit fa0d92b

Please sign in to comment.