diff --git a/internal/db/fetcher/versioned.go b/internal/db/fetcher/versioned.go index 199ca38d21..01afe7e2e2 100644 --- a/internal/db/fetcher/versioned.go +++ b/internal/db/fetcher/versioned.go @@ -16,7 +16,7 @@ import ( "fmt" "github.com/ipfs/go-cid" - ds "github.com/ipfs/go-datastore" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/sourcenetwork/immutable" @@ -89,15 +89,11 @@ type VersionedFetcher struct { root datastore.Rootstore store datastore.Txn - dsKey keys.DataStoreKey - queuedCids *list.List acp immutable.Option[acp.ACP] col client.Collection - // @todo index *client.IndexDescription - mCRDTs map[client.FieldID]merklecrdt.MerkleCRDT } // Init initializes the VersionedFetcher. @@ -116,7 +112,6 @@ func (vf *VersionedFetcher) Init( vf.acp = acp vf.col = col vf.queuedCids = list.New() - vf.mCRDTs = make(map[client.FieldID]merklecrdt.MerkleCRDT) vf.txn = txn // create store @@ -158,10 +153,6 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans ...core.Span) error prefix := spans[0].Start.(keys.HeadstoreDocKey) vf.ctx = ctx - vf.dsKey = keys.DataStoreKey{ - CollectionRootID: vf.col.Description().RootID, - DocID: prefix.DocID, - } if err := vf.seekTo(prefix.Cid); err != nil { return NewErrFailedToSeek(prefix.Cid, err) @@ -170,11 +161,6 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans ...core.Span) error return vf.DocumentFetcher.Start(ctx) } -// Rootstore returns the rootstore of the VersionedFetcher. -func (vf *VersionedFetcher) Rootstore() ds.Datastore { - return vf.root -} - // Start a fetcher with the needed info (cid embedded in a span) /* @@ -324,56 +310,63 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { return err } - link, err := block.GenerateLink() - if err != nil { - return err - } + var mcrdt merklecrdt.MerkleCRDT + switch { + case block.Delta.IsCollection(): + mcrdt = merklecrdt.NewMerkleCollection( + vf.store, + keys.NewCollectionSchemaVersionKey(vf.col.Description().SchemaVersionID, vf.col.Description().ID), + keys.NewHeadstoreColKey(vf.col.Description().RootID), + ) - // first arg 0 is the index for the composite DAG in the mCRDTs cache - mcrdt, exists := vf.mCRDTs[0] - if !exists { + case block.Delta.IsComposite(): mcrdt = merklecrdt.NewMerkleCompositeDAG( vf.store, - keys.CollectionSchemaVersionKey{}, - vf.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE), + keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), + keys.DataStoreKey{ + CollectionRootID: vf.col.Description().RootID, + DocID: string(block.Delta.GetDocID()), + FieldID: fmt.Sprint(core.COMPOSITE_NAMESPACE), + }, ) - vf.mCRDTs[0] = mcrdt - } - err = mcrdt.Clock().ProcessBlock(vf.ctx, block, link) - if err != nil { - return err - } - - // handle subgraphs - for _, l := range block.Links { - // get node - subBlock, err := vf.getDAGBlock(l.Link.Cid) - if err != nil { - return err - } - field, ok := vf.col.Definition().GetFieldByName(l.Name) + default: + field, ok := vf.col.Definition().GetFieldByName(block.Delta.GetFieldName()) if !ok { - return client.NewErrFieldNotExist(l.Name) + return client.NewErrFieldNotExist(block.Delta.GetFieldName()) } - mcrdt, exists := vf.mCRDTs[field.ID] - if !exists { - mcrdt, err = merklecrdt.FieldLevelCRDTWithStore( - vf.store, - keys.CollectionSchemaVersionKey{}, - field.Typ, - field.Kind, - vf.dsKey.WithFieldID(fmt.Sprint(field.ID)), - field.Name, - ) - if err != nil { - return err - } - vf.mCRDTs[field.ID] = mcrdt + mcrdt, err = merklecrdt.FieldLevelCRDTWithStore( + vf.store, + keys.NewCollectionSchemaVersionKey(block.Delta.GetSchemaVersionID(), vf.col.Description().RootID), + field.Typ, + field.Kind, + keys.DataStoreKey{ + CollectionRootID: vf.col.Description().RootID, + DocID: string(block.Delta.GetDocID()), + FieldID: fmt.Sprint(field.ID), + }, + field.Name, + ) + if err != nil { + return err } + } - err = mcrdt.Clock().ProcessBlock(vf.ctx, subBlock, l.Link) + err = mcrdt.Clock().ProcessBlock( + vf.ctx, + block, + cidlink.Link{ + Cid: c, + }, + ) + if err != nil { + return err + } + + // handle subgraphs + for _, l := range block.AllLinks() { + err = vf.merge(l.Cid) if err != nil { return err } diff --git a/internal/planner/select.go b/internal/planner/select.go index 56245666cf..6078d67650 100644 --- a/internal/planner/select.go +++ b/internal/planner/select.go @@ -263,17 +263,15 @@ func (n *selectNode) initSource() ([]aggregateNode, error) { return nil, err } - var docID string - if len(n.selectReq.DocIDs.Value()) > 0 { - docID = n.selectReq.DocIDs.Value()[0] - } - + // This exists because the fetcher interface demands a []Span, yet the versioned + // fetcher type (that will be the only one consuming this []Span) does not use it + // as either a span, or even a prefix. And with this design limitation this is + // currently the least bad way of passing the cid in to the fetcher. origScan.Spans( []core.Span{ core.NewSpan( keys.HeadstoreDocKey{ - DocID: docID, - Cid: c, + Cid: c, }, keys.HeadstoreDocKey{}, ), diff --git a/internal/request/graphql/schema/descriptions.go b/internal/request/graphql/schema/descriptions.go index 07a6873d61..b7c0373abc 100644 --- a/internal/request/graphql/schema/descriptions.go +++ b/internal/request/graphql/schema/descriptions.go @@ -73,11 +73,10 @@ An optional set of docIDs for this field. Only documents with a docID be ignored. ` cidArgDescription string = ` -An optional value that specifies the commit ID of a document to return. - This CID does not need to be the most recent for a document, if it - corresponds to an older version of a document the document will be returned - at the state it was in at the time of that commit. If a matching commit is - not found then an empty set will be returned. +An optional value that specifies the commit ID of a document or a branchable collection. + This CID does not need to be the most recent. If it corresponds to an older version + the document(s) will be returned at the state they were in at the time of that commit. + If a matching commit is not found then an empty set will be returned. ` singleFieldFilterArgDescription string = ` An optional filter for this join, if the related record does diff --git a/tests/integration/query/simple/with_cid_branchable_test.go b/tests/integration/query/simple/with_cid_branchable_test.go new file mode 100644 index 0000000000..58590a5d5a --- /dev/null +++ b/tests/integration/query/simple/with_cid_branchable_test.go @@ -0,0 +1,161 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package simple + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestQuerySimpleWithCidOfBranchableCollection_FirstCid(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.UpdateDoc{ + Doc: `{ + "name": "Freddddd" + }`, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John" + }`, + }, + testUtils.Request{ + Request: `query { + Users ( + cid: "bafyreiewwsnu2ld5qlntamdm77ayb7xtmxz3p5difvaaakaome7zbtpo4u" + ) { + name + } + }`, + Results: map[string]any{ + "Users": []map[string]any{ + { + "name": "Fred", + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestQuerySimpleWithCidOfBranchableCollection_MiddleCid(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.UpdateDoc{ + Doc: `{ + "name": "Freddddd" + }`, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John" + }`, + }, + testUtils.Request{ + Request: `query { + Users ( + cid: "bafyreifpamlyhcbriztgbhds5ctgi5rm6w5wcar2py7246lo6j5v7iusxm" + ) { + name + } + }`, + Results: map[string]any{ + "Users": []map[string]any{ + { + "name": "Freddddd", + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestQuerySimpleWithCidOfBranchableCollection_LastCid(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.UpdateDoc{ + Doc: `{ + "name": "Freddddd" + }`, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John" + }`, + }, + testUtils.Request{ + Request: `query { + Users ( + cid: "bafyreigmt6ytph32jjxts2bij7fkne5ntionldsnklp35vcamvvl2x3a5i" + ) { + name + } + }`, + Results: map[string]any{ + "Users": []map[string]any{ + { + "name": "Freddddd", + }, + { + "name": "John", + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/query/simple/with_cid_doc_id_branchable_test.go b/tests/integration/query/simple/with_cid_doc_id_branchable_test.go new file mode 100644 index 0000000000..18b8a55ad6 --- /dev/null +++ b/tests/integration/query/simple/with_cid_doc_id_branchable_test.go @@ -0,0 +1,67 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package simple + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestQuerySimpleWithCidOfBranchableCollectionAndDocID(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John" + }`, + }, + testUtils.UpdateDoc{ + Doc: `{ + "name": "Freddddd" + }`, + }, + testUtils.Request{ + // This is the cid of the collection-commit when the second doc (John) is created. + // Without the docID param both John and Fred should be returned. + Request: `query { + Users ( + cid: "bafyreiboen2mw2unu4fty2pyyd5nicqi57vcdahrrag6bjm54md5myj54u", + docID: "bae-3a7df128-bfa9-559a-a9c5-96f2bf6d1038" + ) { + name + } + }`, + Results: map[string]any{ + "Users": []map[string]any{ + { + "name": "Fred", + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/query/simple/with_cid_doc_id_test.go b/tests/integration/query/simple/with_cid_doc_id_test.go index 8c6476b1e5..29a630ac19 100644 --- a/tests/integration/query/simple/with_cid_doc_id_test.go +++ b/tests/integration/query/simple/with_cid_doc_id_test.go @@ -335,7 +335,7 @@ func TestCidAndDocIDQuery_ContainsPNCounterWithIntKind_NoError(t *testing.T) { Request: `query { Users ( cid: "bafyreihsqayh6zvmjrvmma3sjmrb4bkeiyy6l56nt6y2t2tm4xajkif3gu", - docID: "bae-d8cb53d4-ac5a-5c55-8306-64df633d400d" + docID: "bae-bc5464e4-26a6-5307-b516-aada0abeb089" ) { name points @@ -389,7 +389,7 @@ func TestCidAndDocIDQuery_ContainsPNCounterWithFloatKind_NoError(t *testing.T) { Request: `query { Users ( cid: "bafyreigkdjnvkpqfjoqoke3aqc3b6ibb45xjuxx5djpk7c6tart2lw3dcm", - docID: "bae-d420ebcd-023a-5800-ae2e-8ea89442318e" + docID: "bae-2c7c40a7-92c1-5ed4-8a00-9e8595514945" ) { name points