Skip to content

Commit

Permalink
feat: Add ACP to pubsub KMS (#3206)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #2893

This change adds ACP to pubsub KMS.

A new event `MergeComplete` was introduced in order to make it
correspond to `MergeCompleteName`. This is necessary to notify listeners
that the merge was executed on decrypted blocks.

Upon granting access to a document via `AddDocActorRelationship` we
publish not `Update` event so that the actors that being given access to
can now request a document anew.

In testing framework `WaitForSync` is extended to allow specifying
documents that should be received in decrypted state.
  • Loading branch information
islamaliev authored Nov 7, 2024
1 parent 6d6c9f2 commit 2f53878
Show file tree
Hide file tree
Showing 18 changed files with 972 additions and 81 deletions.
29 changes: 29 additions & 0 deletions errors/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,39 @@ func Wrap(message string, inner error, keyvals ...KV) error {
return err
}

// Is reports whether any error in err's tree matches target.
//
// The tree consists of err itself, followed by the errors obtained by repeatedly
// calling its Unwrap() error or Unwrap() []error method. When err wraps multiple
// errors, Is examines err followed by a depth-first traversal of its children.
//
// An error is considered to match a target if it is equal to that target or if
// it implements a method Is(error) bool such that Is(target) returns true.
//
// An error type might provide an Is method so it can be treated as equivalent
// to an existing error. For example, if MyError defines
//
// func (m MyError) Is(target error) bool { return target == fs.ErrExist }
//
// then Is(MyError{}, fs.ErrExist) returns true. See [syscall.Errno.Is] for
// an example in the standard library. An Is method should only shallowly
// compare err and the target and not call [Unwrap] on either.
func Is(err, target error) bool {
return errors.Is(err, target)
}

// Join returns an error that wraps the given errors.
// Any nil error values are discarded.
// Join returns nil if every value in errs is nil.
// The error formats as the concatenation of the strings obtained
// by calling the Error method of each element of errs, with a newline
// between each string.
//
// A non-nil error returned by Join implements the Unwrap() []error method.
func Join(errs ...error) error {
return errors.Join(errs...)
}

// This function will not be inlined by the compiler as it will spoil any stacktrace
// generated.
//
Expand Down
9 changes: 9 additions & 0 deletions event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ type Merge struct {
SchemaRoot string
}

// MergeComplete is a notification that a merge has been completed.
type MergeComplete struct {
// Merge is the merge that was completed.
Merge Merge

// Decrypted specifies if the merge payload was decrypted.
Decrypted bool
}

// Message contains event info.
type Message struct {
// Name is the name of the event this message was generated from.
Expand Down
80 changes: 80 additions & 0 deletions internal/db/collection_retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package db

import (
"context"

"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/internal/db/description"
)

// collectionRetriever is a helper struct that retrieves a collection from a document ID.
type collectionRetriever struct {
db client.DB
}

// NewCollectionRetriever creates a new CollectionRetriever.
func NewCollectionRetriever(db client.DB) collectionRetriever {
return collectionRetriever{
db: db,
}
}

// RetrieveCollectionFromDocID retrieves a collection from a document ID.
func (r collectionRetriever) RetrieveCollectionFromDocID(
ctx context.Context,
docID string,
) (client.Collection, error) {
ctx, txn, err := ensureContextTxn(ctx, r.db, false)
if err != nil {
return nil, err
}
defer txn.Discard(ctx)

headIterator, err := NewHeadBlocksIteratorFromTxn(ctx, txn, docID)
if err != nil {
return nil, err
}

hasValue, err := headIterator.Next()
if err != nil {
return nil, err
}

if !hasValue {
return nil, NewErrDocIDNotFound(docID)
}

schema, err := description.GetSchemaVersion(ctx, txn, headIterator.CurrentBlock().Delta.GetSchemaVersionID())
if err != nil {
return nil, err
}

cols, err := r.db.GetCollections(
ctx,
client.CollectionFetchOptions{
SchemaRoot: immutable.Some(schema.Root),
},
)

if err != nil {
return nil, err
}

if len(cols) == 0 {
return nil, NewErrCollectionWithSchemaRootNotFound(schema.Root)
}

return cols[0], nil
}
36 changes: 36 additions & 0 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,35 @@ func (db *db) AddPolicy(
return client.AddPolicyResult{PolicyID: policyID}, nil
}

// publishDocUpdateEvent publishes an update event for a document.
// It uses heads iterator to read the document's head blocks directly from the storage, i.e. without
// using a transaction.
func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error {
headsIterator, err := NewHeadBlocksIterator(ctx, db.multistore.Headstore(), db.Blockstore(), docID)
if err != nil {
return err
}

for {
hasValue, err := headsIterator.Next()
if err != nil {
return err
}
if !hasValue {
break
}

updateEvent := event.Update{
DocID: docID,
Cid: headsIterator.CurrentCid(),
SchemaRoot: collection.Schema().Root,
Block: headsIterator.CurrentRawBlock(),
}
db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
}
return nil
}

func (db *db) AddDocActorRelationship(
ctx context.Context,
collectionName string,
Expand Down Expand Up @@ -262,6 +291,13 @@ func (db *db) AddDocActorRelationship(
return client.AddDocActorRelationshipResult{}, err
}

if !exists {
err = db.publishDocUpdateEvent(ctx, docID, collection)
if err != nil {
return client.AddDocActorRelationshipResult{}, err
}
}

return client.AddDocActorRelationshipResult{ExistedAlready: exists}, nil
}

Expand Down
12 changes: 12 additions & 0 deletions internal/db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ const (
errColNotMaterialized string = "non-materialized collections are not supported"
errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP"
errInvalidDefaultFieldValue string = "default field value is invalid"
errDocIDNotFound string = "docID not found"
errCollectionWithSchemaRootNotFound string = "collection with schema root not found"
)

var (
Expand Down Expand Up @@ -152,6 +154,8 @@ var (
ErrContextDone = errors.New("context done")
ErrFailedToRetryDoc = errors.New("failed to retry doc")
ErrTimeoutDocRetry = errors.New("timeout while retrying doc")
ErrDocIDNotFound = errors.New(errDocIDNotFound)
ErrorCollectionWithSchemaRootNotFound = errors.New(errCollectionWithSchemaRootNotFound)
)

// NewErrFailedToGetHeads returns a new error indicating that the heads of a document
Expand Down Expand Up @@ -690,3 +694,11 @@ func NewErrDefaultFieldValueInvalid(collection string, inner error) error {
errors.NewKV("Inner", inner),
)
}

func NewErrDocIDNotFound(docID string) error {
return errors.New(errDocIDNotFound, errors.NewKV("DocID", docID))
}

func NewErrCollectionWithSchemaRootNotFound(schemaRoot string) error {
return errors.New(errCollectionWithSchemaRootNotFound, errors.NewKV("SchemaRoot", schemaRoot))
}
104 changes: 104 additions & 0 deletions internal/db/iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package db

import (
"context"

"github.com/ipfs/go-cid"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/keys"
"github.com/sourcenetwork/defradb/internal/merkle/clock"
)

// DocHeadBlocksIterator is an iterator that iterates over the head blocks of a document.
type DocHeadBlocksIterator struct {
ctx context.Context
blockstore datastore.Blockstore
cids []cid.Cid

currentCid cid.Cid
currentBlock *coreblock.Block
currentRawBlock []byte
}

// NewHeadBlocksIterator creates a new DocHeadBlocksIterator.
func NewHeadBlocksIterator(
ctx context.Context,
headstore datastore.DSReaderWriter,
blockstore datastore.Blockstore,
docID string,
) (*DocHeadBlocksIterator, error) {
headStoreKey := keys.HeadStoreKey{
DocID: docID,
FieldID: core.COMPOSITE_NAMESPACE,
}
headset := clock.NewHeadSet(headstore, headStoreKey)
cids, _, err := headset.List(ctx)
if err != nil {
return nil, err
}
return &DocHeadBlocksIterator{
ctx: ctx,
blockstore: blockstore,
cids: cids,
}, nil
}

// NewHeadBlocksIteratorFromTxn creates a new DocHeadBlocksIterator from a transaction.
func NewHeadBlocksIteratorFromTxn(
ctx context.Context,
txn datastore.Txn,
docID string,
) (*DocHeadBlocksIterator, error) {
return NewHeadBlocksIterator(ctx, txn.Headstore(), txn.Blockstore(), docID)
}

// Next advances the iterator to the next block.
func (h *DocHeadBlocksIterator) Next() (bool, error) {
if len(h.cids) == 0 {
return false, nil
}
nextCid := h.cids[0]
h.cids = h.cids[1:]

rawBlock, err := h.blockstore.Get(h.ctx, nextCid)
if err != nil {
return false, err
}
blk, err := coreblock.GetFromBytes(rawBlock.RawData())
if err != nil {
return false, err
}

h.currentCid = nextCid
h.currentBlock = blk
h.currentRawBlock = rawBlock.RawData()
return true, nil
}

// CurrentCid returns the CID of the current block.
func (h *DocHeadBlocksIterator) CurrentCid() cid.Cid {
return h.currentCid
}

// CurrentBlock returns the current block.
func (h *DocHeadBlocksIterator) CurrentBlock() *coreblock.Block {
return h.currentBlock
}

// CurrentRawBlock returns the raw data of the current block.
func (h *DocHeadBlocksIterator) CurrentRawBlock() []byte {
return h.currentRawBlock
}
11 changes: 9 additions & 2 deletions internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
}

// send a complete event so we can track merges in the integration tests
db.events.Publish(event.NewMessage(event.MergeCompleteName, dagMerge))
db.events.Publish(event.NewMessage(event.MergeCompleteName, event.MergeComplete{
Merge: dagMerge,
Decrypted: len(mp.missingEncryptionBlocks) == 0,
}))
return nil
}

Expand Down Expand Up @@ -264,7 +267,9 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err
return res.Error
}

clear(mp.missingEncryptionBlocks)
if len(res.Items) == 0 {
return nil
}

for i := range res.Items {
_, link, err := cid.CidFromBytes(res.Items[i].Link)
Expand All @@ -280,6 +285,8 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err
mp.availableEncryptionBlocks[cidlink.Link{Cid: link}] = &encBlock
}

clear(mp.missingEncryptionBlocks)

err := mp.mergeComposites(ctx)
if err != nil {
return err
Expand Down
Loading

0 comments on commit 2f53878

Please sign in to comment.