Skip to content

Commit

Permalink
Add Blockchain Transaction Event (#1787)
Browse files Browse the repository at this point in the history
What's going on?
A user can upload any transaction receipt to their user stream, the
transaction must
- exist on chain 
- have at least 1 confirmation
- blocknumber, log count, log bytes, topic count, topic bytes, log data
bytes, and the to and from addresses must match
- the receipt must be from a wallet that is linked to the user's root
key
- not already uploaded

If a user uploads a Tip, the node will
- verify metadata about the tip (still todo)
- forward a Tip_Received event to the recipient's user stream
- forward the original tip transaction to the channel stream

This will allow us to render tips in real time in the app, and receive
push notifications just like we do for emojis.

New types
- blockchain transaction
- blockchain transaction receipt
- blockchain transaction kind
- two user payloads
- one member payload (generic across all streams)

New tech
- i updated the can add event return values to include a "verifyReceipt"
param
- i added a new chain auth type that just checks to see if a wallet is
linked to the principal.


Still todo in a follow up
- tip receipt validation. I need to parse out the values from the
receipt logs and make sure they match what's in the event metadata
- user and channel stream message tests. I have stubs written, but not
there yet.
- snapshotting relevant data
  • Loading branch information
texuf authored Dec 18, 2024
1 parent a5ee5a7 commit ef37f1a
Show file tree
Hide file tree
Showing 21 changed files with 3,401 additions and 1,620 deletions.
156 changes: 152 additions & 4 deletions core/node/auth/auth_impl.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package auth

import (
"bytes"
"context"
"fmt"
"strings"
Expand All @@ -11,7 +12,7 @@ import (

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/contracts/base"
"github.com/river-build/river/core/contracts/types"
types "github.com/river-build/river/core/contracts/types"
. "github.com/river-build/river/core/node/base"
"github.com/river-build/river/core/node/crypto"
"github.com/river-build/river/core/node/dlog"
Expand All @@ -21,6 +22,7 @@ import (
"github.com/river-build/river/core/xchain/entitlement"

"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
)

type ChainAuth interface {
Expand All @@ -45,6 +47,7 @@ type ChainAuth interface {
linked wallets are entitled to the channel, the permission check passes. Otherwise, it fails.
*/
IsEntitled(ctx context.Context, cfg *config.Config, args *ChainAuthArgs) (bool, error)
VerifyReceipt(ctx context.Context, cfg *config.Config, receipt *BlockchainTransactionReceipt) (bool, error)
}

var everyone = common.HexToAddress("0x1") // This represents an Ethereum address of "0x1"
Expand Down Expand Up @@ -81,6 +84,17 @@ func NewChainAuthArgsForIsSpaceMember(spaceId shared.StreamId, userId string) *C
}
}

func NewChainAuthArgsForIsWalletLinked(
userId []byte,
walletAddress []byte,
) *ChainAuthArgs {
return &ChainAuthArgs{
kind: chainAuthKindIsWalletLinked,
principal: common.BytesToAddress(userId),
walletAddress: common.BytesToAddress(walletAddress),
}
}

type chainAuthKind int

const (
Expand All @@ -89,6 +103,7 @@ const (
chainAuthKindSpaceEnabled
chainAuthKindChannelEnabled
chainAuthKindIsSpaceMember
chainAuthKindIsWalletLinked
)

type ChainAuthArgs struct {
Expand All @@ -98,6 +113,7 @@ type ChainAuthArgs struct {
principal common.Address
permission Permission
linkedWallets string // a serialized list of linked wallets to comply with the cache key constraints
walletAddress common.Address
}

func (args *ChainAuthArgs) Principal() common.Address {
Expand All @@ -106,13 +122,14 @@ func (args *ChainAuthArgs) Principal() common.Address {

func (args *ChainAuthArgs) String() string {
return fmt.Sprintf(
"ChainAuthArgs{kind: %d, spaceId: %s, channelId: %s, principal: %s, permission: %s, linkedWallets: %s}",
"ChainAuthArgs{kind: %d, spaceId: %s, channelId: %s, principal: %s, permission: %s, linkedWallets: %s, walletAddress: %s}",
args.kind,
args.spaceId,
args.channelId,
args.principal.Hex(),
args.permission,
args.linkedWallets,
args.walletAddress.Hex(),
)
}

Expand Down Expand Up @@ -268,6 +285,124 @@ func NewChainAuth(
}, nil
}

func (ca *chainAuth) VerifyReceipt(
ctx context.Context,
cfg *config.Config,
userReceipt *BlockchainTransactionReceipt,
) (bool, error) {
client, err := ca.evaluator.GetClient(userReceipt.GetChainId())
if err != nil {
return false, err
}
txHash := common.BytesToHash(userReceipt.GetTransactionHash())
chainReceipt, err := client.TransactionReceipt(ctx, txHash)
if err != nil {
return false, err
}
// Check if the block number matches:
if chainReceipt.BlockNumber.Uint64() != userReceipt.BlockNumber {
return false, RiverError(Err_PERMISSION_DENIED, "Block number mismatch", "got",
chainReceipt.BlockNumber.Uint64(), "user uploaded", userReceipt.BlockNumber)
}

// Check logs count and match the event log data
if len(chainReceipt.Logs) != len(userReceipt.Logs) {
return false, RiverError(Err_PERMISSION_DENIED, "Log count mismatch: chain:",
len(chainReceipt.Logs), "uploaded:", len(userReceipt.Logs))
}

// For each log, check address, topics, data
for i, chainLog := range chainReceipt.Logs {
uploadedLog := userReceipt.Logs[i]
if !bytes.Equal(chainLog.Address[:], uploadedLog.Address) {
return false, RiverError(
Err_PERMISSION_DENIED,
"Log address mismatch:",
i,
"address:",
chainLog.Address.Hex(),
"uploaded:",
uploadedLog.Address,
)
}

if len(chainLog.Topics) != len(uploadedLog.Topics) {
return false, RiverError(Err_PERMISSION_DENIED, "Log topics count mismatch", i)
}

for j, topic := range chainLog.Topics {
if !bytes.Equal(topic[:], uploadedLog.Topics[j]) {
return false, RiverError(Err_PERMISSION_DENIED, "Log topic mismatch",
i, "topic index: ", j, "chain: ", topic.Hex(), "uploaded: ", uploadedLog.Topics[j])
}
}

if !bytes.Equal(chainLog.Data, uploadedLog.Data) {
return false, RiverError(Err_PERMISSION_DENIED, "Log data mismatch", i)
}
}

// get the transaction
tx, isPending, err := client.TransactionByHash(ctx, txHash)
if err != nil {
return false, err
}
if isPending {
return false, RiverError(Err_PERMISSION_DENIED, "Transaction is pending", "txHash", txHash.Hex())
}

// check the to address
if !bytes.Equal(tx.To()[:], userReceipt.GetTo()) {
return false, RiverError(
Err_PERMISSION_DENIED,
"To address mismatch",
"chain",
tx.To().Hex(),
"uploaded",
userReceipt.To,
)
}

// check the from addresses
signer := ethTypes.LatestSignerForChainID(tx.ChainId())
sender, err := signer.Sender(tx)
if err != nil {
return false, err
}
if !bytes.Equal(sender.Bytes(), userReceipt.GetFrom()) {
return false, RiverError(
Err_PERMISSION_DENIED,
"From address mismatch",
"chain",
sender.Hex(),
"uploaded",
userReceipt.From,
)
}

// If we reach here, the logs match exactly.

// 3) Check the number of confirmations
latestBlockNumber, err := ca.blockchain.Client.BlockNumber(ctx)
if err != nil {
return false, RiverError(Err_PERMISSION_DENIED, "Failed to get latest block number: %v", err)
}

confirmations := latestBlockNumber - chainReceipt.BlockNumber.Uint64()
if confirmations < 1 {
return false, RiverError(
Err_PERMISSION_DENIED,
"Transaction has 0 confirmations.",
"latestBlockNumber",
latestBlockNumber,
"uploaded:",
chainReceipt.BlockNumber.Uint64(),
)
}

return true, nil
}

func (ca *chainAuth) IsEntitled(ctx context.Context, cfg *config.Config, args *ChainAuthArgs) (bool, error) {
// TODO: counter for cache hits here?
result, _, err := ca.entitlementCache.executeUsingCache(
Expand Down Expand Up @@ -723,10 +858,11 @@ func (ca *chainAuth) getLinkedWallets(

userCacheKey := newArgsForLinkedWallets(args.principal)
// We want fresh linked wallets when evaluating space and channel joins, key solicitations,
// and user scrubs, all of which request the Read permission.
// user scrubs, and checking if a wallet is linked, all of which request the Read permission.
// Note: space joins seem to request Read on the space, but they should probably actually
// be sending chain auth args with kind set to chainAuthKindIsSpaceMember.
if args.permission == PermissionRead || args.kind == chainAuthKindIsSpaceMember {
if args.permission == PermissionRead || args.kind == chainAuthKindIsSpaceMember ||
args.kind == chainAuthKindIsWalletLinked {
ca.linkedWalletCache.bust(userCacheKey)
ca.linkedWalletCacheBust.Inc()
}
Expand Down Expand Up @@ -834,6 +970,8 @@ func (ca *chainAuth) checkStreamIsEnabled(
return false, err
}
return isEnabled, nil
} else if args.kind == chainAuthKindIsWalletLinked {
return true, nil
} else {
return false, RiverError(Err_INTERNAL, "Unknown chain auth kind").Func("checkStreamIsEnabled")
}
Expand Down Expand Up @@ -868,6 +1006,16 @@ func (ca *chainAuth) checkEntitlement(
return nil, err
}

// handle checking if the user is linked to a specific wallet
if args.kind == chainAuthKindIsWalletLinked {
for _, wallet := range wallets {
if wallet == args.walletAddress {
return boolCacheResult(true), nil
}
}
return boolCacheResult(false), nil
}

args = args.withLinkedWallets(wallets)

isMemberCtx, isMemberCancel := context.WithCancel(ctx)
Expand Down
9 changes: 9 additions & 0 deletions core/node/auth/fake_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/river-build/river/core/config"
"github.com/river-build/river/core/node/protocol"
)

// This checkers always returns true, used for some testing scenarios.
Expand All @@ -18,3 +19,11 @@ var _ ChainAuth = (*fakeChainAuth)(nil)
func (a *fakeChainAuth) IsEntitled(ctx context.Context, cfg *config.Config, args *ChainAuthArgs) (bool, error) {
return true, nil
}

func (a *fakeChainAuth) VerifyReceipt(
ctx context.Context,
cfg *config.Config,
receipt *protocol.BlockchainTransactionReceipt,
) (bool, error) {
return true, nil
}
34 changes: 34 additions & 0 deletions core/node/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,22 @@ func Make_MemberPayload_DisplayName(displayName *EncryptedData) *StreamEvent_Mem
}
}

func Make_MemberPayload_BlockchainTransaction(
fromUserAddress []byte,
transaction *BlockchainTransaction,
) *StreamEvent_MemberPayload {
return &StreamEvent_MemberPayload{
MemberPayload: &MemberPayload{
Content: &MemberPayload_MemberBlockchainTransaction_{
MemberBlockchainTransaction: &MemberPayload_MemberBlockchainTransaction{
Transaction: transaction,
FromUserAddress: fromUserAddress,
},
},
},
}
}

func Make_DmChannelPayload_Inception(
streamId StreamId,
firstPartyAddress common.Address,
Expand Down Expand Up @@ -474,6 +490,24 @@ func Make_UserPayload_Membership(
}
}

func Make_UserPayload_ReceivedBlockchainTransaction(
kind ReceivedBlockchainTransactionKind,
fromUserAddress []byte,
transaction *BlockchainTransaction,
) *StreamEvent_UserPayload {
return &StreamEvent_UserPayload{
UserPayload: &UserPayload{
Content: &UserPayload_ReceivedBlockchainTransaction_{
ReceivedBlockchainTransaction: &UserPayload_ReceivedBlockchainTransaction{
Kind: kind,
Transaction: transaction,
FromUserAddress: fromUserAddress,
},
},
},
}
}

func Make_UserSettingsPayload_Inception(streamId StreamId, settings *StreamSettings) *StreamEvent_UserSettingsPayload {
return &StreamEvent_UserSettingsPayload{
UserSettingsPayload: &UserSettingsPayload{
Expand Down
8 changes: 8 additions & 0 deletions core/node/events/stream_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,14 @@ func (r *streamViewImpl) blockWithNum(mininblockNum int64) (*MiniblockInfo, erro
return r.blocks[index], nil
}

// iterate over events starting at startBlock including events in the minipool
func (r *streamViewImpl) ForEachEvent(
startBlock int,
op func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error),
) error {
return r.forEachEvent(startBlock, op)
}

// iterate over events starting at startBlock including events in the minipool
func (r *streamViewImpl) forEachEvent(
startBlock int,
Expand Down
1 change: 1 addition & 0 deletions core/node/events/stream_viewstate_joinable.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type JoinableStreamView interface {
GetMembership(userAddress []byte) (protocol.MembershipOp, error)
GetKeySolicitations(userAddress []byte) ([]*protocol.MemberPayload_KeySolicitation, error)
GetPinnedMessages() ([]*protocol.MemberPayload_SnappedPin, error)
HasTransaction(receipt *protocol.BlockchainTransactionReceipt) (bool, error) // defined in userStreamView
}

var _ JoinableStreamView = (*streamViewImpl)(nil)
Expand Down
39 changes: 39 additions & 0 deletions core/node/events/stream_viewstate_user.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package events

import (
"bytes"

. "github.com/river-build/river/core/node/base"
. "github.com/river-build/river/core/node/protocol"
"github.com/river-build/river/core/node/shared"
Expand All @@ -10,6 +12,7 @@ type UserStreamView interface {
GetUserInception() (*UserPayload_Inception, error)
GetUserMembership(streamId shared.StreamId) (MembershipOp, error)
IsMemberOf(streamId shared.StreamId) bool
HasTransaction(receipt *BlockchainTransactionReceipt) (bool, error)
}

var _ UserStreamView = (*streamViewImpl)(nil)
Expand Down Expand Up @@ -82,3 +85,39 @@ func (r *streamViewImpl) GetUserMembership(streamId shared.StreamId) (Membership
err = r.forEachEvent(r.snapshotIndex+1, updateFn)
return retValue, err
}

// handles transactions for user streams and member payload of any stream
func (r *streamViewImpl) HasTransaction(receipt *BlockchainTransactionReceipt) (bool, error) {
retValue := false
updateFn := func(e *ParsedEvent, minibockNum int64, eventNum int64) (bool, error) {
switch payload := e.Event.Payload.(type) {
case *StreamEvent_UserPayload:
switch payload := payload.UserPayload.Content.(type) {
case *UserPayload_BlockchainTransaction:
if bytes.Equal(payload.BlockchainTransaction.Receipt.TransactionHash, receipt.TransactionHash) {
retValue = true
return false, nil
}
case *UserPayload_ReceivedBlockchainTransaction_:
if bytes.Equal(payload.ReceivedBlockchainTransaction.Transaction.Receipt.TransactionHash, receipt.TransactionHash) {
retValue = true
return false, nil
}
}

case *StreamEvent_MemberPayload:
switch payload := payload.MemberPayload.Content.(type) {
case *MemberPayload_MemberBlockchainTransaction_:
if bytes.Equal(payload.MemberBlockchainTransaction.Transaction.Receipt.TransactionHash, receipt.TransactionHash) {
retValue = true
return false, nil
}
}
}

return true, nil
}

err := r.forEachEvent(0, updateFn)
return retValue, err
}
Loading

0 comments on commit ef37f1a

Please sign in to comment.