Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add ACP to pubsub KMS #3206

Merged
29 changes: 29 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,39 @@
return err
}

// Is reports whether any error in err's tree matches target.
//
// The tree consists of err itself, followed by the errors obtained by repeatedly
// calling its Unwrap() error or Unwrap() []error method. When err wraps multiple
// errors, Is examines err followed by a depth-first traversal of its children.
//
// An error is considered to match a target if it is equal to that target or if
// it implements a method Is(error) bool such that Is(target) returns true.
//
// An error type might provide an Is method so it can be treated as equivalent
// to an existing error. For example, if MyError defines
//
// func (m MyError) Is(target error) bool { return target == fs.ErrExist }
//
// then Is(MyError{}, fs.ErrExist) returns true. See [syscall.Errno.Is] for
// an example in the standard library. An Is method should only shallowly
// compare err and the target and not call [Unwrap] on either.
func Is(err, target error) bool {
return errors.Is(err, target)
}

// Join returns an error that wraps the given errors.
// Any nil error values are discarded.
// Join returns nil if every value in errs is nil.
// The error formats as the concatenation of the strings obtained
// by calling the Error method of each element of errs, with a newline
// between each string.
//
// A non-nil error returned by Join implements the Unwrap() []error method.
func Join(errs ...error) error {
return errors.Join(errs...)

Check warning on line 96 in errors/errors.go

View check run for this annotation

Codecov / codecov/patch

errors/errors.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}

// This function will not be inlined by the compiler as it will spoil any stacktrace
// generated.
//
Expand Down
9 changes: 9 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ type Merge struct {
SchemaRoot string
}

// MergeComplete is a notification that a merge has been completed.
type MergeComplete struct {
// Merge is the merge that was completed.
Merge Merge

// Decrypted specifies if the merge payload was decrypted.
Decrypted bool
}

// Message contains event info.
type Message struct {
// Name is the name of the event this message was generated from.
Expand Down
80 changes: 80 additions & 0 deletions internal/db/collection_retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package db

import (
"context"

"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/internal/db/description"
)

// collectionRetriever is a helper struct that retrieves a collection from a document ID.
type collectionRetriever struct {
db client.DB
}

// NewCollectionRetriever creates a new CollectionRetriever.
func NewCollectionRetriever(db client.DB) *collectionRetriever {
return &collectionRetriever{
db: db,
}
}

// RetrieveCollectionFromDocID retrieves a collection from a document ID.
func (r *collectionRetriever) RetrieveCollectionFromDocID(
ctx context.Context,
docID string,
) (client.Collection, error) {
ctx, txn, err := ensureContextTxn(ctx, r.db, false)
if err != nil {
return nil, err
}

Check warning on line 42 in internal/db/collection_retriever.go

View check run for this annotation

Codecov / codecov/patch

internal/db/collection_retriever.go#L41-L42

Added lines #L41 - L42 were not covered by tests
defer txn.Discard(ctx)

headIterator, err := NewHeadBlocksIteratorFromTxn(ctx, txn, docID)
if err != nil {
return nil, err
}

Check warning on line 48 in internal/db/collection_retriever.go

View check run for this annotation

Codecov / codecov/patch

internal/db/collection_retriever.go#L47-L48

Added lines #L47 - L48 were not covered by tests

hasValue, err := headIterator.Next()
if err != nil {
return nil, err
}

Check warning on line 53 in internal/db/collection_retriever.go

View check run for this annotation

Codecov / codecov/patch

internal/db/collection_retriever.go#L52-L53

Added lines #L52 - L53 were not covered by tests

if !hasValue {
return nil, NewErrDocIDNotFound(docID)
}

Check warning on line 57 in internal/db/collection_retriever.go

View check run for this annotation

Codecov / codecov/patch

internal/db/collection_retriever.go#L56-L57

Added lines #L56 - L57 were not covered by tests

schema, err := description.GetSchemaVersion(ctx, txn, headIterator.CurrentBlock().Delta.GetSchemaVersionID())
if err != nil {
return nil, err
}

Check warning on line 62 in internal/db/collection_retriever.go

View check run for this annotation

Codecov / codecov/patch

internal/db/collection_retriever.go#L61-L62

Added lines #L61 - L62 were not covered by tests

cols, err := r.db.GetCollections(
ctx,
client.CollectionFetchOptions{
SchemaRoot: immutable.Some(schema.Root),
},
)

if err != nil {
return nil, err
}

Check warning on line 73 in internal/db/collection_retriever.go

View check run for this annotation

Codecov / codecov/patch

internal/db/collection_retriever.go#L72-L73

Added lines #L72 - L73 were not covered by tests

if len(cols) == 0 {
return nil, NewErrCollectionWithSchemaRootNotFound(schema.Root)
}

Check warning on line 77 in internal/db/collection_retriever.go

View check run for this annotation

Codecov / codecov/patch

internal/db/collection_retriever.go#L76-L77

Added lines #L76 - L77 were not covered by tests

return cols[0], nil
}
33 changes: 33 additions & 0 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,32 @@
return client.AddPolicyResult{PolicyID: policyID}, nil
}

func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: This function is scoped to a large and important type, does very similar things to a couple of existing places, and ignores the context txn. It looks very easy to erroneously call this, and in an area where testing is relatively weak (transactions).

Please document this function to highlight that it ignores the transaction, and does a whole load of work to find the heads (something the two existing locations do not need to do).

Please do not refactor the existing locations as I have changed them in another open PR, with this 3rd addition we can look at refactoring the 3 after this (and maybe the other PR, depending on timing) has merged.

headsIterator, err := NewHeadBlocksIterator(ctx, db.multistore.Headstore(), db.Blockstore(), docID)
if err != nil {
return err
}

Check warning on line 234 in internal/db/db.go

View check run for this annotation

Codecov / codecov/patch

internal/db/db.go#L233-L234

Added lines #L233 - L234 were not covered by tests

for {
hasValue, err := headsIterator.Next()
if err != nil {
return err
}

Check warning on line 240 in internal/db/db.go

View check run for this annotation

Codecov / codecov/patch

internal/db/db.go#L239-L240

Added lines #L239 - L240 were not covered by tests
if !hasValue {
break
}

updateEvent := event.Update{
DocID: docID,
Cid: headsIterator.CurrentCid(),
SchemaRoot: collection.Schema().Root,
Block: headsIterator.CurrentRawBlock(),
}
db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
}
return nil
}

func (db *db) AddDocActorRelationship(
ctx context.Context,
collectionName string,
Expand Down Expand Up @@ -262,6 +288,13 @@
return client.AddDocActorRelationshipResult{}, err
}

if !exists {
err = db.publishDocUpdateEvent(ctx, docID, collection)
if err != nil {
return client.AddDocActorRelationshipResult{}, err
}

Check warning on line 295 in internal/db/db.go

View check run for this annotation

Codecov / codecov/patch

internal/db/db.go#L294-L295

Added lines #L294 - L295 were not covered by tests
}

return client.AddDocActorRelationshipResult{ExistedAlready: exists}, nil
}

Expand Down
12 changes: 12 additions & 0 deletions internal/db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
errColNotMaterialized string = "non-materialized collections are not supported"
errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP"
errInvalidDefaultFieldValue string = "default field value is invalid"
errDocIDNotFound string = "docID not found"
errCollectionWithSchemaRootNotFound string = "collection with schema root not found"
)

var (
Expand Down Expand Up @@ -152,6 +154,8 @@
ErrContextDone = errors.New("context done")
ErrFailedToRetryDoc = errors.New("failed to retry doc")
ErrTimeoutDocRetry = errors.New("timeout while retrying doc")
ErrDocIDNotFound = errors.New(errDocIDNotFound)
ErrorCollectionWithSchemaRootNotFound = errors.New(errCollectionWithSchemaRootNotFound)
)

// NewErrFailedToGetHeads returns a new error indicating that the heads of a document
Expand Down Expand Up @@ -690,3 +694,11 @@
errors.NewKV("Inner", inner),
)
}

func NewErrDocIDNotFound(docID string) error {
return errors.New(errDocIDNotFound, errors.NewKV("DocID", docID))

Check warning on line 699 in internal/db/errors.go

View check run for this annotation

Codecov / codecov/patch

internal/db/errors.go#L698-L699

Added lines #L698 - L699 were not covered by tests
}

func NewErrCollectionWithSchemaRootNotFound(schemaRoot string) error {
return errors.New(errCollectionWithSchemaRootNotFound, errors.NewKV("SchemaRoot", schemaRoot))

Check warning on line 703 in internal/db/errors.go

View check run for this annotation

Codecov / codecov/patch

internal/db/errors.go#L702-L703

Added lines #L702 - L703 were not covered by tests
}
104 changes: 104 additions & 0 deletions internal/db/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package db

import (
"context"

"github.com/ipfs/go-cid"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/keys"
"github.com/sourcenetwork/defradb/internal/merkle/clock"
)

// DocHeadBlocksIterator is an iterator that iterates over the head blocks of a document.
type DocHeadBlocksIterator struct {
ctx context.Context
blockstore datastore.Blockstore
cids []cid.Cid

currentCid cid.Cid
currentBlock *coreblock.Block
currentRawBlock []byte
}

// NewHeadBlocksIterator creates a new DocHeadBlocksIterator.
func NewHeadBlocksIterator(
ctx context.Context,
headstore datastore.DSReaderWriter,
blockstore datastore.Blockstore,
docID string,
) (*DocHeadBlocksIterator, error) {
headStoreKey := keys.HeadStoreKey{
DocID: docID,
FieldID: core.COMPOSITE_NAMESPACE,
}
headset := clock.NewHeadSet(headstore, headStoreKey)
cids, _, err := headset.List(ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

todo: There appears to be no reason to fully scan all the head cids in the constructor, only to return an iterator from it that then iterates through the results. It seems to defeat the point a little bit and makes the code look odd, and more scary than it needs to be.

This is worse given that in the normal/happy use-case (RetrieveCollectionFromDocID, not retry), *DocHeadBlocksIterator.Next is only called once, and so only one of the potentially many fetched cids are actually used.

Please rework this (maybe make HeadSet an iterator) to avoid the extra fetching.

Note: Please see my note RE a bug in the model before spending time on this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Next is only called once

It uses all produced value in db::db.publishDocUpdateEvent and db::db.retryDoc.
Only in db::CollectionRetriever.RetrieveCollectionFromDocID it uses just the first produces value.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed publishDocUpdateEvent thanks - that is still half of the happy-path paths though. And doesn't change the unusualness of doing extra/discarded work in the constructor of a type that is already an iterator.

I'm not sure we'll convince each other either way here though, and it's only immediate impact on users is (probably minor) performance, so if you don't want to do the suggestion please leave this thread unresolved in case anyone else feels like joining the conversation :)

if err != nil {
return nil, err
}

Check warning on line 51 in internal/db/iterator.go

View check run for this annotation

Codecov / codecov/patch

internal/db/iterator.go#L50-L51

Added lines #L50 - L51 were not covered by tests
return &DocHeadBlocksIterator{
ctx: ctx,
blockstore: blockstore,
cids: cids,
}, nil
}

// NewHeadBlocksIteratorFromTxn creates a new DocHeadBlocksIterator from a transaction.
func NewHeadBlocksIteratorFromTxn(
ctx context.Context,
txn datastore.Txn,
docID string,
) (*DocHeadBlocksIterator, error) {
return NewHeadBlocksIterator(ctx, txn.Headstore(), txn.Blockstore(), docID)
}

// Next advances the iterator to the next block.
func (h *DocHeadBlocksIterator) Next() (bool, error) {
if len(h.cids) == 0 {
return false, nil
}
nextCid := h.cids[0]
h.cids = h.cids[1:]

rawBlock, err := h.blockstore.Get(h.ctx, nextCid)
if err != nil {
return false, err
}

Check warning on line 79 in internal/db/iterator.go

View check run for this annotation

Codecov / codecov/patch

internal/db/iterator.go#L78-L79

Added lines #L78 - L79 were not covered by tests
blk, err := coreblock.GetFromBytes(rawBlock.RawData())
if err != nil {
return false, err
}

Check warning on line 83 in internal/db/iterator.go

View check run for this annotation

Codecov / codecov/patch

internal/db/iterator.go#L82-L83

Added lines #L82 - L83 were not covered by tests

h.currentCid = nextCid
h.currentBlock = blk
h.currentRawBlock = rawBlock.RawData()
return true, nil
}

// CurrentCid returns the CID of the current block.
func (h *DocHeadBlocksIterator) CurrentCid() cid.Cid {
return h.currentCid
}

// CurrentBlock returns the current block.
func (h *DocHeadBlocksIterator) CurrentBlock() *coreblock.Block {
return h.currentBlock
}

// CurrentRawBlock returns the raw data of the current block.
func (h *DocHeadBlocksIterator) CurrentRawBlock() []byte {
return h.currentRawBlock
}
11 changes: 9 additions & 2 deletions internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
}

// send a complete event so we can track merges in the integration tests
db.events.Publish(event.NewMessage(event.MergeCompleteName, dagMerge))
db.events.Publish(event.NewMessage(event.MergeCompleteName, event.MergeComplete{
Merge: dagMerge,
Decrypted: len(mp.missingEncryptionBlocks) == 0,
}))
return nil
}

Expand Down Expand Up @@ -264,7 +267,9 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err
return res.Error
}

clear(mp.missingEncryptionBlocks)
if len(res.Items) == 0 {
return nil
}

for i := range res.Items {
_, link, err := cid.CidFromBytes(res.Items[i].Link)
Expand All @@ -280,6 +285,8 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err
mp.availableEncryptionBlocks[cidlink.Link{Cid: link}] = &encBlock
}

clear(mp.missingEncryptionBlocks)

err := mp.mergeComposites(ctx)
if err != nil {
return err
Expand Down
Loading
Loading