Skip to content

Commit

Permalink
add P2P ACP
Browse files Browse the repository at this point in the history
  • Loading branch information
fredcarle committed Dec 11, 2024
1 parent e1502c5 commit 25d14f1
Show file tree
Hide file tree
Showing 21 changed files with 746 additions and 107 deletions.
44 changes: 39 additions & 5 deletions acp/identity/identity.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,19 @@ func FromPrivateKey(privateKey *secp256k1.PrivateKey) (Identity, error) {
}, nil
}

// FromPublicRawIdentity returns a new raw identity using the given public raw identity.
func FromPublicRawIdentity(rawIdentity PublicRawIdentity) (Identity, error) {
publicKey, err := secp256k1.ParsePubKey([]byte(rawIdentity.PublicKey))
if err != nil {
return Identity{}, err
}

Check warning on line 73 in acp/identity/identity.go

View check run for this annotation

Codecov / codecov/patch

acp/identity/identity.go#L69-L73

Added lines #L69 - L73 were not covered by tests

return Identity{
DID: rawIdentity.DID,
PublicKey: publicKey,
}, nil

Check warning on line 78 in acp/identity/identity.go

View check run for this annotation

Codecov / codecov/patch

acp/identity/identity.go#L75-L78

Added lines #L75 - L78 were not covered by tests
}

// FromToken constructs a new `Identity` from a bearer token.
func FromToken(data []byte) (Identity, error) {
token, err := jwt.Parse(data, jwt.WithVerify(false))
Expand Down Expand Up @@ -127,6 +140,28 @@ func (identity *Identity) UpdateToken(
audience immutable.Option[string],
authorizedAccount immutable.Option[string],
) error {
signedToken, err := identity.NewToken(duration, audience, authorizedAccount)
if err != nil {
return err
}

Check warning on line 146 in acp/identity/identity.go

View check run for this annotation

Codecov / codecov/patch

acp/identity/identity.go#L145-L146

Added lines #L145 - L146 were not covered by tests

identity.BearerToken = string(signedToken)
return nil
}

// NewToken creates and returns a new `BearerToken`.
//
// - duration: The [time.Duration] that this identity is valid for.
// - audience: The audience that this identity is valid for. This is required
// by the Defra http client. For example `github.com/sourcenetwork/defradb`
// - authorizedAccount: An account that this identity is authorizing to make
// SourceHub calls on behalf of this actor. This is currently required when
// using SourceHub ACP.
func (identity Identity) NewToken(
duration time.Duration,
audience immutable.Option[string],
authorizedAccount immutable.Option[string],
) ([]byte, error) {
var signedToken []byte
subject := hex.EncodeToString(identity.PublicKey.SerializeCompressed())
now := time.Now()
Expand All @@ -144,21 +179,20 @@ func (identity *Identity) UpdateToken(

token, err := jwtBuilder.Build()
if err != nil {
return err
return nil, err

Check warning on line 182 in acp/identity/identity.go

View check run for this annotation

Codecov / codecov/patch

acp/identity/identity.go#L182

Added line #L182 was not covered by tests
}

if authorizedAccount.HasValue() {
err = token.Set(acptypes.AuthorizedAccountClaim, authorizedAccount.Value())
if err != nil {
return err
return nil, err

Check warning on line 188 in acp/identity/identity.go

View check run for this annotation

Codecov / codecov/patch

acp/identity/identity.go#L188

Added line #L188 was not covered by tests
}
}

signedToken, err = jwt.Sign(token, jwt.WithKey(BearerTokenSignatureScheme, identity.PrivateKey.ToECDSA()))
if err != nil {
return err
return nil, err

Check warning on line 194 in acp/identity/identity.go

View check run for this annotation

Codecov / codecov/patch

acp/identity/identity.go#L194

Added line #L194 was not covered by tests
}

identity.BearerToken = string(signedToken)
return nil
return signedToken, nil
}
11 changes: 9 additions & 2 deletions internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func NewDB(
acp immutable.Option[acp.ACP],
lens client.LensRegistry,
options ...Option,
) (client.DB, error) {
) (*db, error) {
return newDB(ctx, rootstore, acp, lens, options...)
}

Expand Down Expand Up @@ -339,13 +339,20 @@ func (db *db) DeleteDocActorRelationship(
return client.DeleteDocActorRelationshipResult{RecordFound: recordFound}, nil
}

func (db *db) GetNodeIdentity(context.Context) (immutable.Option[identity.PublicRawIdentity], error) {
func (db *db) GetNodeIdentity(_ context.Context) (immutable.Option[identity.PublicRawIdentity], error) {
if db.nodeIdentity.HasValue() {
return immutable.Some(db.nodeIdentity.Value().IntoRawIdentity().Public()), nil
}
return immutable.None[identity.PublicRawIdentity](), nil
}

func (db *db) GetIdentityToken(_ context.Context, audience immutable.Option[string]) ([]byte, error) {
if db.nodeIdentity.HasValue() {
return db.nodeIdentity.Value().NewToken(time.Hour*24, audience, immutable.None[string]())
}
return nil, nil

Check warning on line 353 in internal/db/db.go

View check run for this annotation

Codecov / codecov/patch

internal/db/db.go#L353

Added line #L353 was not covered by tests
}

// Initialize is called when a database is first run and creates all the db global meta data
// like Collection ID counters.
func (db *db) initialize(ctx context.Context) error {
Expand Down
76 changes: 76 additions & 0 deletions internal/db/permission/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,81 @@ func CheckAccessOfDocOnCollectionWithACP(
collection client.Collection,
permission acp.DPIPermission,
docID string,
) (bool, error) {
identityFunc := func() immutable.Option[acpIdentity.Identity] {
return identity
}
return CheckDocAccessWithIdentityFunc(
ctx,
identityFunc,
acpSystem,
collection,
permission,
docID,
)
}

// CheckAccessDocAccessWithDID handles the check, which tells us if access to the target
// document is valid, with respect to the permission type, and the specified collection.
//
// The identity is determined by a DID.
//
// This function should only be called if acp is available. As we have unrestricted
// access when acp is not available (acp turned off).
//
// Since we know acp is enabled we have these components to check in this function:
// (1) the request is permissioned (has an identity),
// (2) the collection is permissioned (has a policy),
//
// Unrestricted Access to document if:
// - (2) is false.
// - Document is public (unregistered), whether signatured request or not doesn't matter.
func CheckAccessDocAccessWithDID(
ctx context.Context,
did string,
acpSystem acp.ACP,
collection client.Collection,
permission acp.DPIPermission,
docID string,
) (bool, error) {
identityFunc := func() immutable.Option[acpIdentity.Identity] {
if did == "" {
return immutable.None[acpIdentity.Identity]()
}

Check warning on line 83 in internal/db/permission/check.go

View check run for this annotation

Codecov / codecov/patch

internal/db/permission/check.go#L82-L83

Added lines #L82 - L83 were not covered by tests
return immutable.Some[acpIdentity.Identity](acpIdentity.Identity{DID: did})
}
return CheckDocAccessWithIdentityFunc(
ctx,
identityFunc,
acpSystem,
collection,
permission,
docID,
)
}

// CheckDocAccessWithIdentityFunc handles the check, which tells us if access to the target
// document is valid, with respect to the permission type, and the specified collection.
//
// The identity is determined by an identity function.
//
// This function should only be called if acp is available. As we have unrestricted
// access when acp is not available (acp turned off).
//
// Since we know acp is enabled we have these components to check in this function:
// (1) the request is permissioned (has an identity),
// (2) the collection is permissioned (has a policy),
//
// Unrestricted Access to document if:
// - (2) is false.
// - Document is public (unregistered), whether signatured request or not doesn't matter.
func CheckDocAccessWithIdentityFunc(
ctx context.Context,
identityFunc func() immutable.Option[acpIdentity.Identity],
acpSystem acp.ACP,
collection client.Collection,
permission acp.DPIPermission,
docID string,
) (bool, error) {
// Even if acp exists, but there is no policy on the collection (unpermissioned collection)
// then we still have unrestricted access.
Expand Down Expand Up @@ -67,6 +142,7 @@ func CheckAccessOfDocOnCollectionWithACP(
return true, nil
}

identity := identityFunc()
var identityValue string
if !identity.HasValue() {
// We can't assume that there is no-access just because there is no identity even if the document
Expand Down
31 changes: 31 additions & 0 deletions net/acp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package net

import (
"context"

"github.com/sourcenetwork/immutable"

"github.com/sourcenetwork/defradb/acp"
"github.com/sourcenetwork/defradb/acp/identity"
"github.com/sourcenetwork/defradb/client"
)

type ACP interface {
acp.ACP
// GetCollections returns the list of collections according to the given options.
GetCollections(ctx context.Context, opts client.CollectionFetchOptions) ([]client.Collection, error)
// GetIndentityToken returns an identity token for the given audience.
GetIdentityToken(ctx context.Context, audience immutable.Option[string]) ([]byte, error)
// GetNodeIdentity returns the node's public raw identity.
GetNodeIdentity(ctx context.Context) (immutable.Option[identity.PublicRawIdentity], error)
}
23 changes: 23 additions & 0 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,26 @@ func (s *server) pushLog(evt event.Update, pid peer.ID) (err error) {
}
return nil
}

// getIdentity creates a getIdentity request and sends it to another node
func (s *server) getIdentity(ctx context.Context, pid peer.ID) (getIdentityReply, error) {
client, err := s.dial(pid) // grpc dial over P2P stream
if err != nil {
return getIdentityReply{}, NewErrPushLog(err)
}

Check warning on line 80 in net/client.go

View check run for this annotation

Codecov / codecov/patch

net/client.go#L79-L80

Added lines #L79 - L80 were not covered by tests

ctx, cancel := context.WithTimeout(ctx, PushTimeout)
defer cancel()

req := getIdentityRequest{
PeerID: s.peer.host.ID().String(),
}
resp := getIdentityReply{}
if err := client.Invoke(ctx, serviceGetIdentityName, req, &resp); err != nil {
return getIdentityReply{}, NewErrFailedToGetIdentity(
err,
errors.NewKV("PeerID", pid),
)
}

Check warning on line 94 in net/client.go

View check run for this annotation

Codecov / codecov/patch

net/client.go#L90-L94

Added lines #L90 - L94 were not covered by tests
return resp, nil
}
7 changes: 7 additions & 0 deletions net/dialer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"testing"

"github.com/sourcenetwork/immutable"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -29,6 +30,7 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) {
db1.Blockstore(),
db1.Encstore(),
db1.Events(),
immutable.None[ACP](),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
Expand All @@ -38,6 +40,7 @@ func TestDial_WithConnectedPeer_NoError(t *testing.T) {
db2.Blockstore(),
db1.Encstore(),
db2.Events(),
immutable.None[ACP](),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
Expand All @@ -61,6 +64,7 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) {
db1.Blockstore(),
db1.Encstore(),
db1.Events(),
immutable.None[ACP](),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
Expand All @@ -70,6 +74,7 @@ func TestDial_WithConnectedPeerAndSecondConnection_NoError(t *testing.T) {
db2.Blockstore(),
db1.Encstore(),
db2.Events(),
immutable.None[ACP](),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
Expand All @@ -96,6 +101,7 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing
db1.Blockstore(),
db1.Encstore(),
db1.Events(),
immutable.None[ACP](),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
Expand All @@ -105,6 +111,7 @@ func TestDial_WithConnectedPeerAndSecondConnectionWithConnectionShutdown_Closing
db2.Blockstore(),
db1.Encstore(),
db2.Events(),
immutable.None[ACP](),
WithListenAddresses("/ip4/127.0.0.1/tcp/0"),
)
assert.NoError(t, err)
Expand Down
5 changes: 5 additions & 0 deletions net/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ const (
errRequestingEncryptionKeys = "failed to request encryption keys with %v"
errTopicAlreadyExist = "topic with name \"%s\" already exists"
errTopicDoesNotExist = "topic with name \"%s\" does not exists"
errFailedToGetIdentity = "failed to get identity"
)

var (
Expand Down Expand Up @@ -59,3 +60,7 @@ func NewErrTopicAlreadyExist(topic string) error {
func NewErrTopicDoesNotExist(topic string) error {
return errors.New(fmt.Sprintf(errTopicDoesNotExist, topic))
}

func NewErrFailedToGetIdentity(inner error, kv ...errors.KV) error {
return errors.Wrap(errFailedToGetIdentity, inner, kv...)

Check warning on line 65 in net/errors.go

View check run for this annotation

Codecov / codecov/patch

net/errors.go#L64-L65

Added lines #L64 - L65 were not covered by tests
}
40 changes: 40 additions & 0 deletions net/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
serviceGetLogName = "/" + grpcServiceName + "/GetLog"
servicePushLogName = "/" + grpcServiceName + "/PushLog"
serviceGetHeadLogName = "/" + grpcServiceName + "/GetHeadLog"
serviceGetIdentityName = "/" + grpcServiceName + "/GetIdentity"
)

type getDocGraphRequest struct{}
Expand Down Expand Up @@ -52,6 +53,17 @@ type pushLogRequest struct {

type pushLogReply struct{}

type getIdentityRequest struct {
// PeerID is the ID of the requesting peer.
// It will be used as the audience for the identity token.
PeerID string
}

type getIdentityReply struct {
// IdentityToken is the token that can be used to authenticate the peer.
IdentityToken []byte
}

type serviceServer interface {
// GetDocGraph from this peer.
GetDocGraph(context.Context, *getDocGraphRequest) (*getDocGraphReply, error)
Expand All @@ -63,6 +75,30 @@ type serviceServer interface {
PushLog(context.Context, *pushLogRequest) (*pushLogReply, error)
// GetHeadLog from this peer
GetHeadLog(context.Context, *getHeadLogRequest) (*getHeadLogReply, error)
GetIdentity(context.Context, *getIdentityRequest) (*getIdentityReply, error)
}

func getIdentityHandler(
srv any,
ctx context.Context,
dec func(any) error,
interceptor grpc.UnaryServerInterceptor,
) (any, error) {
in := new(getIdentityRequest)
if err := dec(in); err != nil {
return nil, err
}

Check warning on line 90 in net/grpc.go

View check run for this annotation

Codecov / codecov/patch

net/grpc.go#L89-L90

Added lines #L89 - L90 were not covered by tests
if interceptor == nil {
return srv.(serviceServer).GetIdentity(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: serviceGetIdentityName,
}
handler := func(ctx context.Context, req any) (any, error) {
return srv.(serviceServer).GetIdentity(ctx, req.(*getIdentityRequest))
}
return interceptor(ctx, in, info, handler)

Check warning on line 101 in net/grpc.go

View check run for this annotation

Codecov / codecov/patch

net/grpc.go#L94-L101

Added lines #L94 - L101 were not covered by tests
}

func pushLogHandler(
Expand Down Expand Up @@ -97,6 +133,10 @@ func registerServiceServer(s grpc.ServiceRegistrar, srv serviceServer) {
MethodName: "PushLog",
Handler: pushLogHandler,
},
{
MethodName: "GetIdentity",
Handler: getIdentityHandler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "defradb.cbor",
Expand Down
Loading

0 comments on commit 25d14f1

Please sign in to comment.