diff --git a/errors/errors.go b/errors/errors.go index 45c1202e77..dd5bf706c4 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -63,10 +63,39 @@ func Wrap(message string, inner error, keyvals ...KV) error { return err } +// Is reports whether any error in err's tree matches target. +// +// The tree consists of err itself, followed by the errors obtained by repeatedly +// calling its Unwrap() error or Unwrap() []error method. When err wraps multiple +// errors, Is examines err followed by a depth-first traversal of its children. +// +// An error is considered to match a target if it is equal to that target or if +// it implements a method Is(error) bool such that Is(target) returns true. +// +// An error type might provide an Is method so it can be treated as equivalent +// to an existing error. For example, if MyError defines +// +// func (m MyError) Is(target error) bool { return target == fs.ErrExist } +// +// then Is(MyError{}, fs.ErrExist) returns true. See [syscall.Errno.Is] for +// an example in the standard library. An Is method should only shallowly +// compare err and the target and not call [Unwrap] on either. func Is(err, target error) bool { return errors.Is(err, target) } +// Join returns an error that wraps the given errors. +// Any nil error values are discarded. +// Join returns nil if every value in errs is nil. +// The error formats as the concatenation of the strings obtained +// by calling the Error method of each element of errs, with a newline +// between each string. +// +// A non-nil error returned by Join implements the Unwrap() []error method. +func Join(errs ...error) error { + return errors.Join(errs...) +} + // This function will not be inlined by the compiler as it will spoil any stacktrace // generated. // diff --git a/event/event.go b/event/event.go index 5ae882c6bb..53d5f0dbb4 100644 --- a/event/event.go +++ b/event/event.go @@ -96,6 +96,15 @@ type Merge struct { SchemaRoot string } +// MergeComplete is a notification that a merge has been completed. +type MergeComplete struct { + // Merge is the merge that was completed. + Merge Merge + + // Decrypted specifies if the merge payload was decrypted. + Decrypted bool +} + // Message contains event info. type Message struct { // Name is the name of the event this message was generated from. diff --git a/internal/db/collection_retriever.go b/internal/db/collection_retriever.go new file mode 100644 index 0000000000..6fc134c722 --- /dev/null +++ b/internal/db/collection_retriever.go @@ -0,0 +1,80 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package db + +import ( + "context" + + "github.com/sourcenetwork/immutable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/db/description" +) + +// collectionRetriever is a helper struct that retrieves a collection from a document ID. +type collectionRetriever struct { + db client.DB +} + +// NewCollectionRetriever creates a new CollectionRetriever. +func NewCollectionRetriever(db client.DB) collectionRetriever { + return collectionRetriever{ + db: db, + } +} + +// RetrieveCollectionFromDocID retrieves a collection from a document ID. +func (r collectionRetriever) RetrieveCollectionFromDocID( + ctx context.Context, + docID string, +) (client.Collection, error) { + ctx, txn, err := ensureContextTxn(ctx, r.db, false) + if err != nil { + return nil, err + } + defer txn.Discard(ctx) + + headIterator, err := NewHeadBlocksIteratorFromTxn(ctx, txn, docID) + if err != nil { + return nil, err + } + + hasValue, err := headIterator.Next() + if err != nil { + return nil, err + } + + if !hasValue { + return nil, NewErrDocIDNotFound(docID) + } + + schema, err := description.GetSchemaVersion(ctx, txn, headIterator.CurrentBlock().Delta.GetSchemaVersionID()) + if err != nil { + return nil, err + } + + cols, err := r.db.GetCollections( + ctx, + client.CollectionFetchOptions{ + SchemaRoot: immutable.Some(schema.Root), + }, + ) + + if err != nil { + return nil, err + } + + if len(cols) == 0 { + return nil, NewErrCollectionWithSchemaRootNotFound(schema.Root) + } + + return cols[0], nil +} diff --git a/internal/db/db.go b/internal/db/db.go index f2782bbe3a..630bd0ae43 100644 --- a/internal/db/db.go +++ b/internal/db/db.go @@ -227,6 +227,35 @@ func (db *db) AddPolicy( return client.AddPolicyResult{PolicyID: policyID}, nil } +// publishDocUpdateEvent publishes an update event for a document. +// It uses heads iterator to read the document's head blocks directly from the storage, i.e. without +// using a transaction. +func (db *db) publishDocUpdateEvent(ctx context.Context, docID string, collection client.Collection) error { + headsIterator, err := NewHeadBlocksIterator(ctx, db.multistore.Headstore(), db.Blockstore(), docID) + if err != nil { + return err + } + + for { + hasValue, err := headsIterator.Next() + if err != nil { + return err + } + if !hasValue { + break + } + + updateEvent := event.Update{ + DocID: docID, + Cid: headsIterator.CurrentCid(), + SchemaRoot: collection.Schema().Root, + Block: headsIterator.CurrentRawBlock(), + } + db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) + } + return nil +} + func (db *db) AddDocActorRelationship( ctx context.Context, collectionName string, @@ -262,6 +291,13 @@ func (db *db) AddDocActorRelationship( return client.AddDocActorRelationshipResult{}, err } + if !exists { + err = db.publishDocUpdateEvent(ctx, docID, collection) + if err != nil { + return client.AddDocActorRelationshipResult{}, err + } + } + return client.AddDocActorRelationshipResult{ExistedAlready: exists}, nil } diff --git a/internal/db/errors.go b/internal/db/errors.go index bd38cf052e..1bc200f2b4 100644 --- a/internal/db/errors.go +++ b/internal/db/errors.go @@ -106,6 +106,8 @@ const ( errColNotMaterialized string = "non-materialized collections are not supported" errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP" errInvalidDefaultFieldValue string = "default field value is invalid" + errDocIDNotFound string = "docID not found" + errCollectionWithSchemaRootNotFound string = "collection with schema root not found" ) var ( @@ -152,6 +154,8 @@ var ( ErrContextDone = errors.New("context done") ErrFailedToRetryDoc = errors.New("failed to retry doc") ErrTimeoutDocRetry = errors.New("timeout while retrying doc") + ErrDocIDNotFound = errors.New(errDocIDNotFound) + ErrorCollectionWithSchemaRootNotFound = errors.New(errCollectionWithSchemaRootNotFound) ) // NewErrFailedToGetHeads returns a new error indicating that the heads of a document @@ -690,3 +694,11 @@ func NewErrDefaultFieldValueInvalid(collection string, inner error) error { errors.NewKV("Inner", inner), ) } + +func NewErrDocIDNotFound(docID string) error { + return errors.New(errDocIDNotFound, errors.NewKV("DocID", docID)) +} + +func NewErrCollectionWithSchemaRootNotFound(schemaRoot string) error { + return errors.New(errCollectionWithSchemaRootNotFound, errors.NewKV("SchemaRoot", schemaRoot)) +} diff --git a/internal/db/iterator.go b/internal/db/iterator.go new file mode 100644 index 0000000000..00519d1915 --- /dev/null +++ b/internal/db/iterator.go @@ -0,0 +1,104 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package db + +import ( + "context" + + "github.com/ipfs/go-cid" + + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/core" + coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/defradb/internal/merkle/clock" +) + +// DocHeadBlocksIterator is an iterator that iterates over the head blocks of a document. +type DocHeadBlocksIterator struct { + ctx context.Context + blockstore datastore.Blockstore + cids []cid.Cid + + currentCid cid.Cid + currentBlock *coreblock.Block + currentRawBlock []byte +} + +// NewHeadBlocksIterator creates a new DocHeadBlocksIterator. +func NewHeadBlocksIterator( + ctx context.Context, + headstore datastore.DSReaderWriter, + blockstore datastore.Blockstore, + docID string, +) (*DocHeadBlocksIterator, error) { + headStoreKey := keys.HeadStoreKey{ + DocID: docID, + FieldID: core.COMPOSITE_NAMESPACE, + } + headset := clock.NewHeadSet(headstore, headStoreKey) + cids, _, err := headset.List(ctx) + if err != nil { + return nil, err + } + return &DocHeadBlocksIterator{ + ctx: ctx, + blockstore: blockstore, + cids: cids, + }, nil +} + +// NewHeadBlocksIteratorFromTxn creates a new DocHeadBlocksIterator from a transaction. +func NewHeadBlocksIteratorFromTxn( + ctx context.Context, + txn datastore.Txn, + docID string, +) (*DocHeadBlocksIterator, error) { + return NewHeadBlocksIterator(ctx, txn.Headstore(), txn.Blockstore(), docID) +} + +// Next advances the iterator to the next block. +func (h *DocHeadBlocksIterator) Next() (bool, error) { + if len(h.cids) == 0 { + return false, nil + } + nextCid := h.cids[0] + h.cids = h.cids[1:] + + rawBlock, err := h.blockstore.Get(h.ctx, nextCid) + if err != nil { + return false, err + } + blk, err := coreblock.GetFromBytes(rawBlock.RawData()) + if err != nil { + return false, err + } + + h.currentCid = nextCid + h.currentBlock = blk + h.currentRawBlock = rawBlock.RawData() + return true, nil +} + +// CurrentCid returns the CID of the current block. +func (h *DocHeadBlocksIterator) CurrentCid() cid.Cid { + return h.currentCid +} + +// CurrentBlock returns the current block. +func (h *DocHeadBlocksIterator) CurrentBlock() *coreblock.Block { + return h.currentBlock +} + +// CurrentRawBlock returns the raw data of the current block. +func (h *DocHeadBlocksIterator) CurrentRawBlock() []byte { + return h.currentRawBlock +} diff --git a/internal/db/merge.go b/internal/db/merge.go index 74db1ad302..898700a9ed 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -84,7 +84,10 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error { } // send a complete event so we can track merges in the integration tests - db.events.Publish(event.NewMessage(event.MergeCompleteName, dagMerge)) + db.events.Publish(event.NewMessage(event.MergeCompleteName, event.MergeComplete{ + Merge: dagMerge, + Decrypted: len(mp.missingEncryptionBlocks) == 0, + })) return nil } @@ -264,7 +267,9 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err return res.Error } - clear(mp.missingEncryptionBlocks) + if len(res.Items) == 0 { + return nil + } for i := range res.Items { _, link, err := cid.CidFromBytes(res.Items[i].Link) @@ -280,6 +285,8 @@ func (mp *mergeProcessor) tryFetchMissingBlocksAndMerge(ctx context.Context) err mp.availableEncryptionBlocks[cidlink.Link{Cid: link}] = &encBlock } + clear(mp.missingEncryptionBlocks) + err := mp.mergeComposites(ctx) if err != nil { return err diff --git a/internal/db/p2p_replicator.go b/internal/db/p2p_replicator.go index 61c082d210..a6d28f261a 100644 --- a/internal/db/p2p_replicator.go +++ b/internal/db/p2p_replicator.go @@ -23,10 +23,9 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/datastore" - "github.com/sourcenetwork/defradb/errors" + dbErrors "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" "github.com/sourcenetwork/defradb/internal/core" - coreblock "github.com/sourcenetwork/defradb/internal/core/block" "github.com/sourcenetwork/defradb/internal/keys" "github.com/sourcenetwork/defradb/internal/merkle/clock" ) @@ -163,7 +162,7 @@ func (db *db) getDocsHeads( log.ErrorContextE( ctx, "Failed to get all docIDs", - NewErrReplicatorDocID(err, errors.NewKV("Collection", col.Name().Value())), + NewErrReplicatorDocID(err, dbErrors.NewKV("Collection", col.Name().Value())), ) continue } @@ -646,31 +645,28 @@ func (db *db) retryDoc(ctx context.Context, docID string) error { return err } defer txn.Discard(ctx) - headStoreKey := keys.HeadStoreKey{ - DocID: docID, - FieldID: core.COMPOSITE_NAMESPACE, - } - headset := clock.NewHeadSet(txn.Headstore(), headStoreKey) - cids, _, err := headset.List(ctx) + + headsIterator, err := NewHeadBlocksIteratorFromTxn(ctx, txn, docID) if err != nil { return err } - for _, c := range cids { + for { select { case <-ctx.Done(): return ErrContextDone default: } - rawblk, err := txn.Blockstore().Get(ctx, c) + + hasValue, err := headsIterator.Next() if err != nil { return err } - blk, err := coreblock.GetFromBytes(rawblk.RawData()) - if err != nil { - return err + if !hasValue { + break } - schema, err := db.getSchemaByVersionID(ctx, blk.Delta.GetSchemaVersionID()) + + schema, err := db.getSchemaByVersionID(ctx, headsIterator.CurrentBlock().Delta.GetSchemaVersionID()) if err != nil { return err } @@ -678,9 +674,9 @@ func (db *db) retryDoc(ctx context.Context, docID string) error { defer close(successChan) updateEvent := event.Update{ DocID: docID, - Cid: c, + Cid: headsIterator.CurrentCid(), SchemaRoot: schema.Root, - Block: rawblk.RawData(), + Block: headsIterator.CurrentRawBlock(), IsRetry: true, // Because the retry is done in a separate goroutine but the retry handling process should be synchronous, // we use a channel to block while waiting for the success status of the retry. diff --git a/internal/db/permission/check.go b/internal/db/permission/check.go index b19500f41b..ce111bccaf 100644 --- a/internal/db/permission/check.go +++ b/internal/db/permission/check.go @@ -50,7 +50,7 @@ func CheckAccessOfDocOnCollectionWithACP( // Now that we know acp is available and the collection is permissioned, before checking access with // acp directly we need to make sure that the document is not public, as public documents will not - // be regestered with acp. We give unrestricted access to public documents, so it does not matter + // be registered with acp. We give unrestricted access to public documents, so it does not matter // whether the request has a signature identity or not at this stage of the check. isRegistered, err := acpSystem.IsDocRegistered( ctx, @@ -69,7 +69,7 @@ func CheckAccessOfDocOnCollectionWithACP( // At this point if the request is not signatured, then it has no access, because: // the collection has a policy on it, and the acp is enabled/available, - // and the document is not public (is regestered with acp). + // and the document is not public (is registered with acp). if !identity.HasValue() { return false, nil } diff --git a/internal/kms/pubsub.go b/internal/kms/pubsub.go index cbcd6ee141..13cef16269 100644 --- a/internal/kms/pubsub.go +++ b/internal/kms/pubsub.go @@ -20,12 +20,18 @@ import ( cidlink "github.com/ipld/go-ipld-prime/linking/cid" libpeer "github.com/libp2p/go-libp2p/core/peer" rpc "github.com/sourcenetwork/go-libp2p-pubsub-rpc" + "github.com/sourcenetwork/immutable" grpcpeer "google.golang.org/grpc/peer" + "github.com/sourcenetwork/defradb/acp" + "github.com/sourcenetwork/defradb/acp/identity" + "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/crypto" "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/event" + coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/db/permission" "github.com/sourcenetwork/defradb/internal/encryption" ) @@ -36,6 +42,10 @@ type PubSubServer interface { SendPubSubMessage(context.Context, string, []byte) (<-chan rpc.Response, error) } +type CollectionRetriever interface { + RetrieveCollectionFromDocID(context.Context, string) (client.Collection, error) +} + type pubSubService struct { ctx context.Context peerID libpeer.ID @@ -43,6 +53,9 @@ type pubSubService struct { keyRequestedSub *event.Subscription eventBus *event.Bus encStore *ipldEncStorage + acp immutable.Option[acp.ACP] + colRetriever CollectionRetriever + nodeDID string } var _ Service = (*pubSubService)(nil) @@ -69,13 +82,19 @@ func NewPubSubService( pubsub PubSubServer, eventBus *event.Bus, encstore datastore.Blockstore, + acp immutable.Option[acp.ACP], + colRetriever CollectionRetriever, + nodeDID string, ) (*pubSubService, error) { s := &pubSubService{ - ctx: ctx, - peerID: peerID, - pubsub: pubsub, - eventBus: eventBus, - encStore: newIPLDEncryptionStorage(encstore), + ctx: ctx, + peerID: peerID, + pubsub: pubsub, + eventBus: eventBus, + encStore: newIPLDEncryptionStorage(encstore), + acp: acp, + colRetriever: colRetriever, + nodeDID: nodeDID, } err := pubsub.AddPubSubTopic(pubsubTopic, s.handleRequestFromPeer) if err != nil { @@ -127,6 +146,7 @@ func (s *pubSubService) handleKeyRequestedEvent() { } type fetchEncryptionKeyRequest struct { + Identity []byte Links [][]byte EphemeralPublicKey []byte } @@ -153,6 +173,7 @@ func (s *pubSubService) prepareFetchEncryptionKeyRequest( ephemeralPublicKey []byte, ) (*fetchEncryptionKeyRequest, error) { req := &fetchEncryptionKeyRequest{ + Identity: []byte(s.nodeDID), EphemeralPublicKey: ephemeralPublicKey, } @@ -260,9 +281,12 @@ func (s *pubSubService) tryGenEncryptionKeyLocally( req *fetchEncryptionKeyRequest, ) (*fetchEncryptionKeyReply, error) { blocks, err := s.getEncryptionKeysLocally(ctx, req) - if err != nil || len(blocks) == 0 { + if err != nil { return nil, err } + if len(blocks) == 0 { + return &fetchEncryptionKeyReply{}, nil + } reqEphPubKey, err := crypto.X25519PublicKeyFromBytes(req.EphemeralPublicKey) if err != nil { @@ -317,6 +341,14 @@ func (s *pubSubService) getEncryptionKeysLocally( continue } + hasPerm, err := s.doesIdentityHaveDocPermission(ctx, string(req.Identity), encBlock) + if err != nil { + return nil, err + } + if !hasPerm { + continue + } + encBlockBytes, err := encBlock.Marshal() if err != nil { return nil, err @@ -327,6 +359,31 @@ func (s *pubSubService) getEncryptionKeysLocally( return blocks, nil } +func (s *pubSubService) doesIdentityHaveDocPermission( + ctx context.Context, + actorIdentity string, + entBlock *coreblock.Encryption, +) (bool, error) { + if !s.acp.HasValue() { + return true, nil + } + + docID := string(entBlock.DocID) + collection, err := s.colRetriever.RetrieveCollectionFromDocID(ctx, docID) + if err != nil { + return false, err + } + + return permission.CheckAccessOfDocOnCollectionWithACP( + ctx, + immutable.Some(identity.Identity{DID: actorIdentity}), + s.acp.Value(), + collection, + acp.ReadPermission, + docID, + ) +} + func encodeToBase64(data []byte) []byte { encoded := make([]byte, base64.StdEncoding.EncodedLen(len(data))) base64.StdEncoding.Encode(encoded, data) diff --git a/node/node.go b/node/node.go index 0a1b813862..aa47bfbc5c 100644 --- a/node/node.go +++ b/node/node.go @@ -158,6 +158,12 @@ func (n *Node) Start(ctx context.Context) error { if err != nil { return err } + + ident, err := n.DB.GetNodeIdentity(ctx) + if err != nil { + return err + } + if n.options.kmsType.HasValue() { switch n.options.kmsType.Value() { case kms.PubSubServiceType: @@ -167,6 +173,9 @@ func (n *Node) Start(ctx context.Context) error { n.Peer.Server(), n.DB.Events(), n.DB.Encstore(), + acp, + db.NewCollectionRetriever(n.DB), + ident.Value().DID, ) } if err != nil { diff --git a/tests/integration/acp.go b/tests/integration/acp.go index 8269245757..b98be7a059 100644 --- a/tests/integration/acp.go +++ b/tests/integration/acp.go @@ -181,11 +181,14 @@ func addDocActorRelationshipACP( s *state, action AddDocActorRelationship, ) { + var docID string + actionNodeID := action.NodeID nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) for index, node := range nodes { nodeID := nodeIDs[index] - collectionName, docID := getCollectionAndDocInfo(s, action.CollectionID, action.DocID, nodeID) + var collectionName string + collectionName, docID = getCollectionAndDocInfo(s, action.CollectionID, action.DocID, nodeID) exists, err := node.AddDocActorRelationship( getContextWithIdentity(s.ctx, s, action.RequestorIdentity, nodeID), @@ -206,9 +209,14 @@ func addDocActorRelationshipACP( // The relationship should only be added to a SourceHub chain once - there is no need to loop through // the nodes. if acpType == SourceHubACPType { + actionNodeID = immutable.Some(0) break } } + + if action.ExpectedError == "" && !action.ExpectedExistence { + waitForUpdateEvents(s, actionNodeID, map[string]struct{}{docID: {}}) + } } // DeleteDocActorRelationship will attempt to delete a relationship between a document and an actor. @@ -356,7 +364,9 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { return nil, err } - out, err := exec.Command("sourcehubd", "init", moniker, "--chain-id", chainID, "--home", directory).CombinedOutput() + args := []string{"init", moniker, "--chain-id", chainID, "--home", directory} + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err := exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err @@ -389,22 +399,27 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { return nil, err } - out, err = exec.Command( - "sourcehubd", "keys", "import-hex", validatorName, acpKeyHex, + args = []string{ + "keys", "import-hex", validatorName, acpKeyHex, "--keyring-backend", keyringBackend, "--home", directory, - ).CombinedOutput() + } + + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err } - out, err = exec.Command( - "sourcehubd", "keys", "show", validatorName, + args = []string{ + "keys", "show", validatorName, "--address", "--keyring-backend", keyringBackend, "--home", directory, - ).CombinedOutput() + } + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err @@ -414,28 +429,31 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { validatorAddress := strings.TrimSpace(string(out)) s.sourcehubAddress = validatorAddress - out, err = exec.Command( - "sourcehubd", "genesis", "add-genesis-account", validatorAddress, "900000000stake", + args = []string{"genesis", "add-genesis-account", validatorAddress, "900000000stake", "--keyring-backend", keyringBackend, "--home", directory, - ).CombinedOutput() + } + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err } - out, err = exec.Command( - "sourcehubd", "genesis", "gentx", validatorName, "10000000stake", + args = []string{"genesis", "gentx", validatorName, "10000000stake", "--chain-id", chainID, "--keyring-backend", keyringBackend, - "--home", directory, - ).CombinedOutput() + "--home", directory} + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err } - out, err = exec.Command("sourcehubd", "genesis", "collect-gentxs", "--home", directory).CombinedOutput() + args = []string{"genesis", "collect-gentxs", "--home", directory} + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + out, err = exec.Command("sourcehubd", args...).CombinedOutput() s.t.Log(string(out)) if err != nil { return nil, err @@ -485,8 +503,7 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { releaseP2pPort() releasePprofPort() - sourceHubCmd := exec.Command( - "sourcehubd", + args = []string{ "start", "--minimum-gas-prices", "0stake", "--home", directory, @@ -494,7 +511,9 @@ func setupSourceHub(s *state) ([]node.ACPOpt, error) { "--rpc.laddr", rpcAddress, "--p2p.laddr", p2pAddress, "--rpc.pprof_laddr", pprofAddress, - ) + } + s.t.Log("$ sourcehubd " + strings.Join(args, " ")) + sourceHubCmd := exec.Command("sourcehubd", args...) var bf testBuffer bf.Lines = make(chan string, 100) sourceHubCmd.Stdout = &bf @@ -566,23 +585,32 @@ func getFreePort() (int, func(), error) { // crossLock forms a cross process lock by attempting to listen to the given port. // -// This function will only return once the port is free. A function to unbind from the -// port is returned - this unlock function may be called multiple times without issue. +// This function will only return once the port is free or the timeout is reached. +// A function to unbind from the port is returned - this unlock function may be called +// multiple times without issue. func crossLock(port uint16) (func(), error) { - l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", port)) - if err != nil { - if strings.Contains(err.Error(), "address already in use") { - time.Sleep(5 * time.Millisecond) - return crossLock(port) + timeout := time.After(20 * time.Second) + for { + select { + case <-timeout: + return nil, fmt.Errorf("timeout reached while trying to acquire cross process lock on port %v", port) + default: + l, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%v", port)) + if err != nil { + if strings.Contains(err.Error(), "address already in use") { + time.Sleep(5 * time.Millisecond) + continue + } + return nil, err + } + + return func() { + // there are no errors that this returns that we actually care about + _ = l.Close() + }, + nil } - return nil, err } - - return func() { - // there are no errors that this returns that we actually care about - _ = l.Close() - }, - nil } func getNodeAudience(s *state, nodeIndex int) immutable.Option[string] { diff --git a/tests/integration/encryption/peer_acp_test.go b/tests/integration/encryption/peer_acp_test.go new file mode 100644 index 0000000000..bb6705c626 --- /dev/null +++ b/tests/integration/encryption/peer_acp_test.go @@ -0,0 +1,468 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package encryption + +import ( + "fmt" + "testing" + "time" + + "github.com/sourcenetwork/immutable" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +const policy = ` +name: Test Policy + +description: A Policy + +actor: + name: actor + +resources: + users: + permissions: + read: + expr: owner + reader + writer + + write: + expr: owner + writer + + nothing: + expr: dummy + + relations: + owner: + types: + - actor + + reader: + types: + - actor + + writer: + types: + - actor + + admin: + manages: + - reader + types: + - actor + + dummy: + types: + - actor +` + +func TestDocEncryptionACP_IfUserAndNodeHaveAccess_ShouldFetch(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + KMS: testUtils.KMS{Activated: true}, + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.AddPolicy{ + Identity: testUtils.ClientIdentity(0), + Policy: policy, + ExpectedPolicyID: expectedPolicyID, + }, + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.ClientIdentity(0), + Doc: ` + { + "name": "Fred", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.ClientIdentity(0), + TargetIdentity: testUtils.ClientIdentity(1), + DocID: 0, + Relation: "reader", + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.ClientIdentity(0), + TargetIdentity: testUtils.NodeIdentity(1), + DocID: 0, + Relation: "reader", + }, + testUtils.WaitForSync{ + Decrypted: []int{0}, + }, + testUtils.Request{ + NodeID: immutable.Some(1), + Identity: testUtils.ClientIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{ + {"name": "Fred"}, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestDocEncryptionACP_IfUserHasAccessButNotNode_ShouldNotFetch(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + KMS: testUtils.KMS{Activated: true}, + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.AddPolicy{ + Identity: testUtils.ClientIdentity(0), + Policy: policy, + ExpectedPolicyID: expectedPolicyID, + }, + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.ClientIdentity(0), + Doc: ` + { + "name": "Fred", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.ClientIdentity(0), + TargetIdentity: testUtils.ClientIdentity(1), + DocID: 0, + Relation: "reader", + }, + testUtils.Wait{Duration: 100 * time.Millisecond}, + testUtils.Request{ + NodeID: immutable.Some(1), + Identity: testUtils.ClientIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{}, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestDocEncryptionACP_IfNodeHasAccessToSomeDocs_ShouldFetchOnlyThem(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + KMS: testUtils.KMS{Activated: true}, + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.AddPolicy{ + Identity: testUtils.NodeIdentity(0), + Policy: policy, + ExpectedPolicyID: expectedPolicyID, + }, + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + // encrypted, private, shared + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "Fred", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.NodeIdentity(0), + TargetIdentity: testUtils.NodeIdentity(1), + DocID: 0, + Relation: "reader", + }, + // encrypted, private, not shared + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "Andy", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + // encrypted, public + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: ` + { + "name": "Islam", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + // not encrypted, private, shared + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "John", + "age": 33 + } + `, + }, + testUtils.AddDocActorRelationship{ + RequestorIdentity: testUtils.NodeIdentity(0), + TargetIdentity: testUtils.NodeIdentity(1), + DocID: 3, + Relation: "reader", + }, + // not encrypted, private, not shared + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "Keenan", + "age": 33 + } + `, + }, + // not encrypted, public + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: ` + { + "name": "Shahzad", + "age": 33 + } + `, + }, + testUtils.WaitForSync{ + Decrypted: []int{0, 2}, + }, + testUtils.Request{ + NodeID: immutable.Some(1), + Identity: testUtils.NodeIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{ + {"name": "John"}, + {"name": "Islam"}, + {"name": "Shahzad"}, + {"name": "Fred"}, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestDocEncryptionACP_IfClientNodeHasDocPermissionButServerNodeIsNotAvailable_ShouldNotFetch(t *testing.T) { + expectedPolicyID := "fc56b7509c20ac8ce682b3b9b4fdaad868a9c70dda6ec16720298be64f16e9a4" + + test := testUtils.TestCase{ + KMS: testUtils.KMS{Activated: true}, + SupportedACPTypes: immutable.Some( + []testUtils.ACPType{ + testUtils.SourceHubACPType, + }, + ), + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.AddPolicy{ + Identity: testUtils.NodeIdentity(0), + Policy: policy, + ExpectedPolicyID: expectedPolicyID, + }, + testUtils.SchemaUpdate{ + Schema: fmt.Sprintf(` + type Users @policy( + id: "%s", + resource: "users" + ) { + name: String + age: Int + } + `, + expectedPolicyID, + ), + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.ConnectPeers{ + SourceNodeID: 2, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 2, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Identity: testUtils.NodeIdentity(0), + Doc: ` + { + "name": "Fred", + "age": 33 + } + `, + IsDocEncrypted: true, + }, + testUtils.WaitForSync{}, + testUtils.Close{ + NodeID: immutable.Some(0), + }, + testUtils.AddDocActorRelationship{ + NodeID: immutable.Some(1), + RequestorIdentity: testUtils.NodeIdentity(0), + TargetIdentity: testUtils.NodeIdentity(1), + DocID: 0, + Relation: "reader", + }, + testUtils.Wait{ + Duration: 100 * time.Millisecond, + }, + testUtils.Request{ + NodeID: immutable.Some(1), + Identity: testUtils.NodeIdentity(1), + Request: ` + query { + Users { + name + } + } + `, + Results: map[string]any{ + "Users": []map[string]any{}, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/events.go b/tests/integration/events.go index 6129d600ee..1fbc64416e 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -74,7 +74,7 @@ func waitForReplicatorConfigureEvent(s *state, cfg ConfigureReplicator) { // all previous documents should be merged on the subscriber node for key, val := range s.nodeP2P[cfg.SourceNodeID].actualDocHeads { - s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val + s.nodeP2P[cfg.TargetNodeID].expectedDocHeads[key] = val.cid } // update node connections and replicators @@ -160,6 +160,10 @@ func waitForUpdateEvents( continue // node is not selected } + if _, ok := s.closedNodes[i]; ok { + continue // node is closed + } + expect := make(map[string]struct{}, len(docIDs)) for k := range docIDs { expect[k] = struct{}{} @@ -170,17 +174,17 @@ func waitForUpdateEvents( select { case msg, ok := <-s.nodeEvents[i].update.Message(): if !ok { - require.Fail(s.t, "subscription closed waiting for update event") + require.Fail(s.t, "subscription closed waiting for update event", "Node %d", i) } evt = msg.Data.(event.Update) case <-time.After(eventTimeout): - require.Fail(s.t, "timeout waiting for update event") + require.Fail(s.t, "timeout waiting for update event", "Node %d", i) } // make sure the event is expected _, ok := expect[evt.DocID] - require.True(s.t, ok, "unexpected document update") + require.True(s.t, ok, "unexpected document update", "Node %d", i) delete(expect, evt.DocID) // we only need to update the network state if the nodes @@ -196,41 +200,63 @@ func waitForUpdateEvents( // // Will fail the test if an event is not received within the expected time interval to prevent tests // from running forever. -func waitForMergeEvents(s *state) { +func waitForMergeEvents(s *state, action WaitForSync) { for nodeID := 0; nodeID < len(s.nodes); nodeID++ { + if _, ok := s.closedNodes[nodeID]; ok { + continue // node is closed + } + expect := s.nodeP2P[nodeID].expectedDocHeads // remove any docs that are already merged // up to the expected document head for key, val := range s.nodeP2P[nodeID].actualDocHeads { - if head, ok := expect[key]; ok && head.String() == val.String() { + if head, ok := expect[key]; ok && head.String() == val.cid.String() { delete(expect, key) } } + expectDecrypted := make(map[string]struct{}, len(action.Decrypted)) + for _, docIndex := range action.Decrypted { + if len(s.docIDs[0]) <= docIndex { + require.Fail(s.t, "doc index %d out of range", docIndex) + } + docID := s.docIDs[0][docIndex].String() + actual, hasActual := s.nodeP2P[nodeID].actualDocHeads[docID] + if !hasActual || !actual.decrypted { + expectDecrypted[docID] = struct{}{} + } + } + // wait for all expected doc heads to be merged // // the order of merges does not matter as we only // expect the latest head to eventually be merged // // unexpected merge events are ignored - for len(expect) > 0 { - var evt event.Merge + for len(expect) > 0 || len(expectDecrypted) > 0 { + var evt event.MergeComplete select { case msg, ok := <-s.nodeEvents[nodeID].merge.Message(): if !ok { require.Fail(s.t, "subscription closed waiting for merge complete event") } - evt = msg.Data.(event.Merge) + evt = msg.Data.(event.MergeComplete) case <-time.After(30 * eventTimeout): require.Fail(s.t, "timeout waiting for merge complete event") } - head, ok := expect[evt.DocID] - if ok && head.String() == evt.Cid.String() { - delete(expect, evt.DocID) + _, ok := expectDecrypted[evt.Merge.DocID] + if ok && evt.Decrypted { + delete(expectDecrypted, evt.Merge.DocID) + } + + head, ok := expect[evt.Merge.DocID] + if ok && head.String() == evt.Merge.Cid.String() { + delete(expect, evt.Merge.DocID) } + s.nodeP2P[nodeID].actualDocHeads[evt.Merge.DocID] = docHeadState{cid: evt.Merge.Cid, decrypted: evt.Decrypted} } } } @@ -247,7 +273,8 @@ func updateNetworkState(s *state, nodeID int, evt event.Update) { } // update the actual document head on the node that updated it - s.nodeP2P[nodeID].actualDocHeads[evt.DocID] = evt.Cid + // as the node created the document, it is already decrypted + s.nodeP2P[nodeID].actualDocHeads[evt.DocID] = docHeadState{cid: evt.Cid, decrypted: true} // update the expected document heads of replicator targets for id := range s.nodeP2P[nodeID].replicators { @@ -309,8 +336,8 @@ func getEventsForCreateDoc(s *state, action CreateDoc) map[string]struct{} { return expect } -func waitForSync(s *state) { - waitForMergeEvents(s) +func waitForSync(s *state, action WaitForSync) { + waitForMergeEvents(s, action) } // getEventsForUpdateWithFilter returns a map of docIDs that should be diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 7c5b20e69a..87e224dce4 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -133,7 +133,10 @@ type GetAllP2PCollections struct { // // For example you will likely wish to `WaitForSync` after creating a document in node 0 before querying // node 1 to see if it has been replicated. -type WaitForSync struct{} +type WaitForSync struct { + // Decrypted is a list of document indexes that are expected to be merged and synced decrypted. + Decrypted []int +} // connectPeers connects two existing, started, nodes as peers. It returns a channel // that will receive an empty struct upon sync completion of all expected peer-sync events. diff --git a/tests/integration/state.go b/tests/integration/state.go index b4a3777d03..e7130f2ebd 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -45,7 +45,7 @@ type p2pState struct { // actualDocHeads contains all document heads that exist on a node. // // The map key is the doc id. The map value is the doc head. - actualDocHeads map[string]cid.Cid + actualDocHeads map[string]docHeadState // expectedDocHeads contains all document heads that are expected to exist on a node. // @@ -53,13 +53,22 @@ type p2pState struct { expectedDocHeads map[string]cid.Cid } +// docHeadState contains the state of a document head. +// It is used to track if a document at a certain head has been decrypted. +type docHeadState struct { + // The actual document head. + cid cid.Cid + // Indicates if the document at the given head has been decrypted. + decrypted bool +} + // newP2PState returns a new empty p2p state. func newP2PState() *p2pState { return &p2pState{ connections: make(map[int]struct{}), replicators: make(map[int]struct{}), peerCollections: make(map[int]struct{}), - actualDocHeads: make(map[string]cid.Cid), + actualDocHeads: make(map[string]docHeadState), expectedDocHeads: make(map[string]cid.Cid), } } @@ -156,6 +165,9 @@ type state struct { // The nodes active in this test. nodes []clients.Client + // closedNodes contains the indexes of nodes that have been closed. + closedNodes map[int]struct{} + // nodeP2P contains p2p states for all nodes nodeP2P []*p2pState @@ -223,6 +235,7 @@ func newState( nodeConfigs: [][]net.NodeOpt{}, nodeP2P: []*p2pState{}, nodes: []clients.Client{}, + closedNodes: map[int]struct{}{}, dbPaths: []string{}, collections: [][]client.Collection{}, collectionNames: collectionNames, diff --git a/tests/integration/test_case.go b/tests/integration/test_case.go index e1c9b0b6f1..a1ab291257 100644 --- a/tests/integration/test_case.go +++ b/tests/integration/test_case.go @@ -12,6 +12,7 @@ package tests import ( "testing" + "time" "github.com/lens-vm/lens/host-go/config/model" "github.com/sourcenetwork/immutable" @@ -806,3 +807,9 @@ type GetNodeIdentity struct { // Default value is `NoIdentity()`. ExpectedIdentity immutable.Option[identityRef] } + +// Wait is an action that will wait for the given duration. +type Wait struct { + // Duration is the duration to wait. + Duration time.Duration +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index aff1ebecb7..f827ac0130 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -400,7 +400,10 @@ func performAction( assertClientIntrospectionResults(s, action) case WaitForSync: - waitForSync(s) + waitForSync(s, action) + + case Wait: + <-time.After(action.Duration) case Benchmark: benchmarkAction(s, actionIndex, action) @@ -574,9 +577,10 @@ func closeNodes( s *state, action Close, ) { - _, nodes := getNodesWithIDs(action.NodeID, s.nodes) - for _, node := range nodes { + nodeIDs, nodes := getNodesWithIDs(action.NodeID, s.nodes) + for i, node := range nodes { node.Close() + s.closedNodes[nodeIDs[i]] = struct{}{} } } @@ -781,6 +785,8 @@ func startNodes(s *state, action Start) { require.NoError(s.t, err) s.nodeEvents[nodeIndex] = eventState + delete(s.closedNodes, nodeIndex) + waitForNetworkSetupEvents(s, i) }