Skip to content

Commit

Permalink
Merge remote-tracking branch 'pokt/main' into issues/13/feat/miner
Browse files Browse the repository at this point in the history
* pokt/main:
  [Proxy] chore: Use depinject for relayerProxy (#173)
  [Sessions] chore: Use depinject for sessions mgr construction (#175)
  • Loading branch information
bryanchriswhite committed Nov 10, 2023
2 parents 9c75b2c + e49434e commit 286e1b2
Show file tree
Hide file tree
Showing 11 changed files with 137 additions and 51 deletions.
4 changes: 4 additions & 0 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type RelayerProxy interface {
SignRelayResponse(relayResponse *types.RelayResponse) error
}

type RelayerProxyOption func(RelayerProxy)

// RelayServer is the interface of the advertised relay servers provided by the RelayerProxy.
type RelayServer interface {
// Start starts the service server and returns an error if it fails.
Expand Down Expand Up @@ -92,6 +94,8 @@ type RelayerSessionsManager interface {
EnsureSessionTree(sessionHeader *sessiontypes.SessionHeader) (SessionTree, error)
}

type RelayerSessionsManagerOption func(RelayerSessionsManager)

// SessionTree is an interface that wraps an SMST (Sparse Merkle State Tree) and its corresponding session.
type SessionTree interface {
// GetSessionHeader returns the header of the session corresponding to the SMST.
Expand Down
12 changes: 7 additions & 5 deletions pkg/relayer/proxy/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package proxy
import sdkerrors "cosmossdk.io/errors"

var (
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
ErrInvalidRelayRequestSignature = sdkerrors.Register(codespace, 2, "invalid relay request signature")
ErrInvalidSession = sdkerrors.Register(codespace, 3, "invalid session")
ErrInvalidSupplier = sdkerrors.Register(codespace, 4, "invalid supplier")
codespace = "relayer_proxy"
ErrRelayerProxyUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported relayer proxy rpc type")
ErrRelayerProxyInvalidRelayRequestSignature = sdkerrors.Register(codespace, 2, "invalid relay request signature")
ErrRelayerProxyInvalidSession = sdkerrors.Register(codespace, 3, "invalid session in relayer request")
ErrRelayerProxyInvalidSupplier = sdkerrors.Register(codespace, 4, "invalid relayer proxy supplier")
ErrRelayerProxyUndefinedSigningKeyName = sdkerrors.Register(codespace, 5, "undefined relayer proxy signing key name")
ErrRelayerProxyUndefinedProxiedServicesEndpoints = sdkerrors.Register(codespace, 6, "undefined proxied services endpoints for relayer proxy")
)
20 changes: 20 additions & 0 deletions pkg/relayer/proxy/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package proxy

import (
"github.com/pokt-network/poktroll/pkg/relayer"
)

// WithSigningKeyName sets the signing key name used by the relayer proxy to sign relay responses.
// It is used along with the keyring to get the supplier address and sign the relay responses.
func WithSigningKeyName(keyName string) relayer.RelayerProxyOption {
return func(relProxy relayer.RelayerProxy) {
relProxy.(*relayerProxy).signingKeyName = keyName
}
}

// WithProxiedServicesEndpoints sets the endpoints of the proxied services.
func WithProxiedServicesEndpoints(proxiedServicesEndpoints servicesEndpointsMap) relayer.RelayerProxyOption {
return func(relProxy relayer.RelayerProxy) {
relProxy.(*relayerProxy).proxiedServicesEndpoints = proxiedServicesEndpoints
}
}
71 changes: 48 additions & 23 deletions pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net/url"

"cosmossdk.io/depinject"
sdkclient "github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types"
Expand Down Expand Up @@ -32,10 +33,10 @@ type (
// when the miner enters the claim/proof phase.
// TODO_TEST: Have tests for the relayer proxy.
type relayerProxy struct {
// keyName is the supplier's key name in the Cosmos's keybase. It is used along with the keyring to
// signingKeyName is the supplier's key name in the Cosmos's keybase. It is used along with the keyring to
// get the supplier address and sign the relay responses.
keyName string
keyring keyring.Keyring
signingKeyName string
keyring keyring.Keyring

// blocksClient is the client used to get the block at the latest height from the blockchain
// and be notified of new incoming blocks. It is used to update the current session data.
Expand Down Expand Up @@ -75,30 +76,40 @@ type relayerProxy struct {
supplierAddress string
}

// NewRelayerProxy creates a new relayer proxy with the given dependencies or returns
// an error if the dependencies fail to resolve or the options are invalid.
func NewRelayerProxy(
clientCtx sdkclient.Context,
keyName string,
keyring keyring.Keyring,
proxiedServicesEndpoints servicesEndpointsMap,
blockClient blocktypes.BlockClient,
) relayer.RelayerProxy {
accountQuerier := accounttypes.NewQueryClient(clientCtx)
supplierQuerier := suppliertypes.NewQueryClient(clientCtx)
sessionQuerier := sessiontypes.NewQueryClient(clientCtx)
deps depinject.Config,
opts ...relayer.RelayerProxyOption,
) (relayer.RelayerProxy, error) {
rp := &relayerProxy{}

if err := depinject.Inject(
deps,
&rp.clientCtx,
&rp.blockClient,
); err != nil {
return nil, err
}

servedRelays, servedRelaysProducer := channel.NewObservable[*types.Relay]()

return &relayerProxy{
blockClient: blockClient,
keyName: keyName,
keyring: keyring,
accountsQuerier: accountQuerier,
supplierQuerier: supplierQuerier,
sessionQuerier: sessionQuerier,
proxiedServicesEndpoints: proxiedServicesEndpoints,
servedRelays: servedRelays,
servedRelaysProducer: servedRelaysProducer,
clientCtx: clientCtx,
rp.servedRelays = servedRelays
rp.servedRelaysProducer = servedRelaysProducer
rp.accountsQuerier = accounttypes.NewQueryClient(rp.clientCtx)
rp.supplierQuerier = suppliertypes.NewQueryClient(rp.clientCtx)
rp.sessionQuerier = sessiontypes.NewQueryClient(rp.clientCtx)
rp.keyring = rp.clientCtx.Keyring

for _, opt := range opts {
opt(rp)
}

if err := rp.validateConfig(); err != nil {
return nil, err
}

return rp, nil
}

// Start concurrently starts all advertised relay servers and returns an error if any of them fails to start.
Expand Down Expand Up @@ -145,3 +156,17 @@ func (rp *relayerProxy) Stop(ctx context.Context) error {
func (rp *relayerProxy) ServedRelays() observable.Observable[*types.Relay] {
return rp.servedRelays
}

// validateConfig validates the relayer proxy's configuration options and returns an error if it is invalid.
// TODO_TEST: Add tests for validating these configurations.
func (rp *relayerProxy) validateConfig() error {
if rp.signingKeyName == "" {
return ErrRelayerProxyUndefinedSigningKeyName
}

if rp.proxiedServicesEndpoints == nil {
return ErrRelayerProxyUndefinedProxiedServicesEndpoints
}

return nil
}
2 changes: 1 addition & 1 deletion pkg/relayer/proxy/relay_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func (rp *relayerProxy) SignRelayResponse(relayResponse *types.RelayResponse) er
}

hash := crypto.Sha256(responseBz)
relayResponse.Meta.SupplierSignature, _, err = rp.keyring.Sign(rp.keyName, hash)
relayResponse.Meta.SupplierSignature, _, err = rp.keyring.Sign(rp.signingKeyName, hash)

return err
}
6 changes: 3 additions & 3 deletions pkg/relayer/proxy/relay_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (rp *relayerProxy) VerifyRelayRequest(
}

if !account.GetPubKey().VerifySignature(hash, relayRequest.Meta.Signature) {
return ErrInvalidRelayRequestSignature
return ErrRelayerProxyInvalidRelayRequestSignature
}

// Query for the current session to check if relayRequest sessionId matches the current session.
Expand All @@ -62,7 +62,7 @@ func (rp *relayerProxy) VerifyRelayRequest(
// matches the relayRequest sessionId.
// TODO_INVESTIGATE: Revisit the assumptions above at some point in the future, but good enough for now.
if session.SessionId != relayRequest.Meta.SessionHeader.SessionId {
return ErrInvalidSession
return ErrRelayerProxyInvalidSession
}

// Check if the relayRequest is allowed to be served by the relayer proxy.
Expand All @@ -72,5 +72,5 @@ func (rp *relayerProxy) VerifyRelayRequest(
}
}

return ErrInvalidSupplier
return ErrRelayerProxyInvalidSupplier
}
4 changes: 2 additions & 2 deletions pkg/relayer/proxy/server_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// is responsible for listening for incoming relay requests and relaying them to the supported proxied service.
func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error {
// Get the supplier address from the keyring
supplierAddress, err := rp.keyring.Key(rp.keyName)
supplierAddress, err := rp.keyring.Key(rp.signingKeyName)
if err != nil {
return err
}
Expand Down Expand Up @@ -48,7 +48,7 @@ func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error {
rp,
)
default:
return ErrUnsupportedRPCType
return ErrRelayerProxyUnsupportedRPCType
}

serviceEndpoints = append(serviceEndpoints, server)
Expand Down
11 changes: 6 additions & 5 deletions pkg/relayer/session/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ package session
import sdkerrors "cosmossdk.io/errors"

var (
codespace = "relayer/session"
ErrSessionTreeClosed = sdkerrors.Register(codespace, 1, "session tree already closed")
ErrSessionTreeNotClosed = sdkerrors.Register(codespace, 2, "session tree not closed")
ErrSessionStorePathExists = sdkerrors.Register(codespace, 3, "session store path already exists")
ErrSessionTreeProofPathMismatch = sdkerrors.Register(codespace, 4, "session tree proof path mismatch")
codespace = "relayer_session"
ErrSessionTreeClosed = sdkerrors.Register(codespace, 1, "session tree already closed")
ErrSessionTreeNotClosed = sdkerrors.Register(codespace, 2, "session tree not closed")
ErrSessionTreeStorePathExists = sdkerrors.Register(codespace, 3, "session tree store path already exists")
ErrSessionTreeProofPathMismatch = sdkerrors.Register(codespace, 4, "session tree proof path mismatch")
ErrSessionTreeUndefinedStoresDirectory = sdkerrors.Register(codespace, 5, "session tree key-value store directory undefined for where they will be saved on disk")
)
13 changes: 13 additions & 0 deletions pkg/relayer/session/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package session

import (
"github.com/pokt-network/poktroll/pkg/relayer"
)

// WithStoresDirectory sets the path on disk where KVStore data files used to store
// SMST of work sessions are created.
func WithStoresDirectory(storesDirectory string) relayer.RelayerSessionsManagerOption {
return func(relSessionMgr relayer.RelayerSessionsManager) {
relSessionMgr.(*relayerSessionsManager).storesDirectory = storesDirectory
}
}
43 changes: 32 additions & 11 deletions pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"sync"

"cosmossdk.io/depinject"
"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
Expand All @@ -22,9 +23,6 @@ type relayerSessionsManager struct {
// sessionsToClaim notifies about sessions that are ready to be claimed.
sessionsToClaim observable.Observable[relayer.SessionTree]

// sessionsToClaimPublisher is the channel used to publish sessions to claim.
sessionsToClaimPublisher chan<- relayer.SessionTree

// sessionTrees is a map of block heights pointing to a map of SessionTrees
// indexed by their sessionId.
// The block height index is used to know when the sessions contained in the entry should be closed,
Expand All @@ -42,22 +40,35 @@ type relayerSessionsManager struct {
// NewRelayerSessions creates a new relayerSessions.
func NewRelayerSessions(
ctx context.Context,
storesDirectory string,
blockClient client.BlockClient,
) relayer.RelayerSessionsManager {
deps depinject.Config,
opts ...relayer.RelayerSessionsManagerOption,
) (relayer.RelayerSessionsManager, error) {
rs := &relayerSessionsManager{
sessionsTrees: make(sessionsTreesMap),
storesDirectory: storesDirectory,
blockClient: blockClient,
sessionsTrees: make(sessionsTreesMap),
}

if err := depinject.Inject(
deps,
&rs.blockClient,
); err != nil {
return nil, err
}

for _, opt := range opts {
opt(rs)
}

if err := rs.validateConfig(); err != nil {
return nil, err
}

rs.sessionsToClaim = channel.MapExpand[client.Block, relayer.SessionTree](
ctx,
blockClient.CommittedBlocksSequence(ctx),
rs.blockClient.CommittedBlocksSequence(ctx),
rs.mapBlockToSessionsToClaim,
)

return rs
return rs, nil
}

// SessionsToClaim returns an observable that notifies when sessions are ready to be claimed.
Expand Down Expand Up @@ -136,3 +147,13 @@ func (rs *relayerSessionsManager) removeFromRelayerSessions(sessionHeader *sessi
delete(rs.sessionsTrees, sessionHeader.SessionEndBlockHeight)
}
}

// validateConfig validates the relayerSessionsManager's configuration.
// TODO_TEST: Add unit tests to validate these configurations.
func (rp *relayerSessionsManager) validateConfig() error {
if rp.storesDirectory == "" {
return ErrSessionTreeUndefinedStoresDirectory
}

return nil
}
2 changes: 1 addition & 1 deletion pkg/relayer/session/sessiontree.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func NewSessionTree(

// Make sure storePath does not exist when creating a new SessionTree
if _, err := os.Stat(storePath); !os.IsNotExist(err) {
return nil, ErrSessionStorePathExists
return nil, ErrSessionTreeUndefinedStoresDirectory
}

treeStore, err := smt.NewKVStore(storePath)
Expand Down

0 comments on commit 286e1b2

Please sign in to comment.