diff --git a/Makefile b/Makefile index 96407cbf6..43eaf02e5 100644 --- a/Makefile +++ b/Makefile @@ -411,6 +411,10 @@ test_persistence: ## Run all go unit tests in the Persistence module test_persistence_state_hash: ## Run all go unit tests in the Persistence module related to the state hash go test ${VERBOSE_TEST} -count=1 -tags=test -run TestStateHash ./persistence/... +.PHONY: test_servicer_relay +test_servicer_relay: ## Run all go unit tests related to servicer relays + go test ${VERBOSE_TEST} -count=1 -tags=test ./utility/servicer -run TestRelay + .PHONY: test_p2p test_p2p: ## Run all p2p related tests go test ${VERBOSE_TEST} -count=1 -tags=test ./p2p/... diff --git a/persistence/docs/CHANGELOG.md b/persistence/docs/CHANGELOG.md index 9ce5f0102..1bdae0d55 100644 --- a/persistence/docs/CHANGELOG.md +++ b/persistence/docs/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.60] - 2023-06-26 + +- Add place-holder for local context and servicer token usage support methods + ## [0.0.0.59] - 2023-06-14 - Refactors the persistence treeStore to be an IntegratableModule diff --git a/persistence/local/module.go b/persistence/local/module.go new file mode 100644 index 000000000..ced14fbcc --- /dev/null +++ b/persistence/local/module.go @@ -0,0 +1,80 @@ +package local + +import ( + "math/big" + + "github.com/pokt-network/pocket/logger" + coreTypes "github.com/pokt-network/pocket/shared/core/types" + "github.com/pokt-network/pocket/shared/modules" + "github.com/pokt-network/pocket/shared/modules/base_modules" +) + +const ( + LocalModuleName = "local" +) + +var _ modules.PersistenceLocalContext = &persistenceLocalContext{} + +type persistenceLocalContext struct { + base_modules.IntegratableModule + + logger *modules.Logger + databasePath string +} + +func WithLocalDatabasePath(databasePath string) modules.ModuleOption { + return func(m modules.InitializableModule) { + if plc, ok := m.(*persistenceLocalContext); ok { + plc.databasePath = databasePath + } + } +} + +func CreateLocalContext(bus modules.Bus, options ...modules.ModuleOption) (modules.PersistenceLocalContext, error) { + m, err := new(persistenceLocalContext).Create(bus, options...) + if err != nil { + return nil, err + } + return m.(modules.PersistenceLocalContext), nil +} + +func (*persistenceLocalContext) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) { + m := &persistenceLocalContext{} + + for _, option := range options { + option(m) + } + + bus.RegisterModule(m) + + m.logger = logger.Global.CreateLoggerForModule(m.GetModuleName()) + + return m, nil +} + +func (m *persistenceLocalContext) GetModuleName() string { + return LocalModuleName +} + +// INCOMPLETE(#826): implement this +func (m *persistenceLocalContext) Start() error { + return nil +} + +// INCOMPLETE(#826): implement this +func (m *persistenceLocalContext) Stop() error { + return nil +} + +// INCOMPLETE(#826): implement this +// OPTIMIZE: both the relay and the response can be large structures: we may need to truncate the stored values +// StoreServicedRelay implements the PersistenceLocalContext interface +func (local *persistenceLocalContext) StoreServicedRelay(session *coreTypes.Session, relayDigest, relayReqResBytes []byte) error { + return nil +} + +// INCOMPLETE(#826): implement this +// GetSessionTokensUsed implements the PersistenceLocalContext interface +func (local *persistenceLocalContext) GetSessionTokensUsed(*coreTypes.Session) (*big.Int, error) { + return nil, nil +} diff --git a/persistence/module.go b/persistence/module.go index cd943394f..31727796a 100644 --- a/persistence/module.go +++ b/persistence/module.go @@ -9,6 +9,7 @@ import ( "github.com/pokt-network/pocket/logger" "github.com/pokt-network/pocket/persistence/blockstore" "github.com/pokt-network/pocket/persistence/indexer" + "github.com/pokt-network/pocket/persistence/local" "github.com/pokt-network/pocket/persistence/trees" "github.com/pokt-network/pocket/runtime/configs" "github.com/pokt-network/pocket/runtime/genesis" @@ -249,6 +250,12 @@ func (m *persistenceModule) NewWriteContext() modules.PersistenceRWContext { return m.writeContext } +// INCOMPLETE(#826): implement this +// GetLocalContext returns a new local context for storing off-chain, i.e. node-specific, data. +func (m *persistenceModule) GetLocalContext() (modules.PersistenceLocalContext, error) { + return local.CreateLocalContext(m.GetBus()) +} + // HACK(olshansky): Simplify and externalize the logic for whether genesis should be populated and // // move the if logic out of this file. diff --git a/persistence/servicer.go b/persistence/servicer.go index 63d00755d..e05a0ca4b 100644 --- a/persistence/servicer.go +++ b/persistence/servicer.go @@ -2,6 +2,7 @@ package persistence import ( "encoding/hex" + "math/big" "github.com/pokt-network/pocket/persistence/types" coreTypes "github.com/pokt-network/pocket/shared/core/types" @@ -79,3 +80,15 @@ func (p *PostgresContext) SetServicerPauseHeight(address []byte, height int64) e func (p *PostgresContext) GetServicerOutputAddress(operator []byte, height int64) (output []byte, err error) { return p.GetActorOutputAddress(types.ServicerActor, operator, height) } + +// INCOMPLETE: implement this +// DISCUSS: both the relay and the response can be large structures: we may need to truncate the stored values +func (p *PostgresContext) RecordRelayService(applicationAddress string, key []byte, relay *coreTypes.Relay, response *coreTypes.RelayResponse) error { + return nil +} + +// INCOMPLETE: implement this +// GetServicerTokenUsage returns the number of tokens used by the servicer in the current session, i.e. for the application associated with the session +func (p *PostgresContext) GetServicerTokenUsage(session *coreTypes.Session) (*big.Int, error) { + return nil, nil +} diff --git a/rpc/doc/CHANGELOG.md b/rpc/doc/CHANGELOG.md index bb2aaf381..e9883b76f 100644 --- a/rpc/doc/CHANGELOG.md +++ b/rpc/doc/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.24] - 2023-06-21 + +- Update handlers to use the new relay payload types + ## [0.0.0.23] - 2023-06-19 - Remove AAT reference from handlers diff --git a/rpc/v1/openapi.yaml b/rpc/v1/openapi.yaml index 736397321..eb36f0c76 100644 --- a/rpc/v1/openapi.yaml +++ b/rpc/v1/openapi.yaml @@ -1696,11 +1696,12 @@ components: Payload: type: object required: - - method - jsonrpc + - method properties: id: type: string + format: byte jsonrpc: type: string method: diff --git a/runtime/configs/config.go b/runtime/configs/config.go index 3e387a8c8..6e428652d 100644 --- a/runtime/configs/config.go +++ b/runtime/configs/config.go @@ -157,6 +157,7 @@ func NewDefaultConfig(options ...func(*Config)) *Config { VaultMountPath: defaults.DefaultKeybaseVaultMountPath, }, Validator: &ValidatorConfig{}, + // INCOMPLETE(#858): use defaultServicerConfig once the default configuration issue is resolved, i.e. once configuring fisherman disables default servicer Servicer: &ServicerConfig{}, Fisherman: &FishermanConfig{}, IBC: &IBCConfig{ @@ -214,3 +215,23 @@ func CreateTempConfig(cfg *Config) (*Config, error) { return ParseConfig(tmpfile.Name()), nil } + +// INCOMPLETE(#858): enable default servicer config once the default config is adjusted based on user-defined config +// nolint:unused // Use the servicer default config once #858 is resolved: see above description +func defaultServicerConfig() *ServicerConfig { + return &ServicerConfig{ + Enabled: true, + RelayMiningVolumeAccuracy: 0.2, + Services: map[string]*ServiceConfig{ + // TODO(#831): Design how Chain/Service IDs should be described/defined. + "POKT-LocalNet": { + Url: "http://localhost", + TimeoutMsec: 5000, + BasicAuth: &BasicAuth{ + UserName: "user", + Password: "password", + }, + }, + }, + } +} diff --git a/runtime/configs/proto/persistence_config.proto b/runtime/configs/proto/persistence_config.proto index 2b10daf74..afd3d9f6b 100644 --- a/runtime/configs/proto/persistence_config.proto +++ b/runtime/configs/proto/persistence_config.proto @@ -16,4 +16,6 @@ message PersistenceConfig { string max_conn_lifetime = 8; // See pkg.go.dev/time#ParseDuration for reference string max_conn_idle_time = 9; // See pkg.go.dev/time#ParseDuration for reference string health_check_period = 10; // See pkg.go.dev/time#ParseDuration for reference + // TODO: `local_database_path` may need to be expanded to multiple stores depending on how usage evolves + string local_database_path = 11; // The path used to store local, i.e. off-chain and node-specific, data. } diff --git a/runtime/configs/proto/servicer_config.proto b/runtime/configs/proto/servicer_config.proto index 04cf41f22..0a3bc65bf 100644 --- a/runtime/configs/proto/servicer_config.proto +++ b/runtime/configs/proto/servicer_config.proto @@ -5,11 +5,34 @@ package configs; option go_package = "github.com/pokt-network/pocket/runtime/configs"; +// TODO: Reevaluate whether each utility actor should contain address/pubKey configs or if it should be shared // ServicerConfig defines the configuration for the node acting as a servicer. Servicers earn rewards for providing Web3 access over a function of volume and quality message ServicerConfig { // Enabled defines whether or not the node is a servicer. bool enabled = 1; string public_key = 2; string address = 3; - repeated string chains = 4; + map services = 4; + + // relay_mining_volume_accuracy is a parameter used to adjust the calculated number of service tokens for an application. + // It is introduced to minimize the chance of under-utilization of application's tokens, while removing the overhead of + // communication between servicers which would be necessary otherwise. + // See the following for more details: + // https://arxiv.org/abs/2305.10672 + double relay_mining_volume_accuracy = 5; +} + +// ServiceConfig holds configurations related to where/how the application/client can access the backing RPC service. It is analogous to "ChainConfig" in v0 but can support any RPC service. +message ServiceConfig { + string url = 1; // url specifies the URL at which the service is provided/requested + uint64 timeout_msec = 2; // timeout specifes the maximum amount of time, in milliseconds, to allow for the service to return a response + BasicAuth basic_auth = 3; // optional: basic authentication for HTTP services. +} + +// BasicAuth stores authentication data for HTTP services +// When supplied, this data will be used as specified by the HTTP basic authentication scheme. +message BasicAuth { + string user_name = 1; + // IMPROVE: enforce encryption of plaintext password + string password = 2; } diff --git a/runtime/docs/CHANGELOG.md b/runtime/docs/CHANGELOG.md index b36289c67..410e0664c 100644 --- a/runtime/docs/CHANGELOG.md +++ b/runtime/docs/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.44] - 2023-06-26 + +- Add a new ServiceConfig field to servicer config + ## [0.0.0.43] - 2023-06-14 - Rename package import types to coreTypes for consitency and clarity diff --git a/runtime/manager_test.go b/runtime/manager_test.go index 8c3ce360f..1eece78aa 100644 --- a/runtime/manager_test.go +++ b/runtime/manager_test.go @@ -1816,11 +1816,8 @@ func TestNewManagerFromReaders(t *testing.T) { Timeout: 30000, UseCors: false, }, - Keybase: defaultCfg.Keybase, - Servicer: &configs.ServicerConfig{ - Enabled: true, - Chains: []string{"0001"}, - }, + Keybase: defaultCfg.Keybase, + Servicer: &configs.ServicerConfig{Enabled: true}, Validator: &configs.ValidatorConfig{Enabled: true}, Fisherman: defaultCfg.Fisherman, IBC: &configs.IBCConfig{Enabled: true}, diff --git a/shared/CHANGELOG.md b/shared/CHANGELOG.md index 5e76d1d64..48fdf3845 100644 --- a/shared/CHANGELOG.md +++ b/shared/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.60] - 2023-06-21 + +- Add a LocalContext type and place-holders for servicer token usage support to persistence +- Add a TODO comment for supporting different relay payload types in core types + ## [0.0.0.59] - 2023-06-14 - Added validators for JSONRPC and REST payloads to shared types diff --git a/shared/core/types/proto/relay.proto b/shared/core/types/proto/relay.proto index d4671f1ab..6061a3a62 100644 --- a/shared/core/types/proto/relay.proto +++ b/shared/core/types/proto/relay.proto @@ -6,6 +6,7 @@ option go_package = "github.com/pokt-network/pocket/shared/core/types"; message Relay { RelayMeta meta = 1; + // Every different chain/service may have its own custom payload (e.g. HTTP, JSON, GRPC, non-chain services) oneof relay_payload { JSONRPCPayload json_rpc_payload = 2; RESTPayload rest_payload = 3; @@ -16,6 +17,7 @@ message Relay { } } +// INCOMPLETE: add REST relay payload fields message RESTPayload { string contents = 1; string http_path = 2; @@ -50,7 +52,8 @@ message JSONRPCPayload { message RelayMeta { int64 block_height = 1; string servicer_public_key = 2; - // TODO: Make Chain identifier type consistent in Session and Meta: use Identifiable for Chain in Session (or a string here to match the session) + // TODO(M5): Consider renaming `relay_chain` to `rpc_service` or something similar + // TODO: Make Chain/Service identifier type consistent in Session and Meta: use Identifiable for Chain/Service in Session (or a string here to match the session) Identifiable relay_chain = 3; Identifiable geo_zone = 4; string signature = 5; // TECHDEBT: Consolidate with `Signature` proto used elsewhere in the future @@ -73,3 +76,10 @@ message Identifiable { string id = 1; string name = 2; } + + +// RelayReqRes contains a relay request and its response, used for persistence of relay service evidence +message RelayReqRes { + Relay relay = 1; + RelayResponse response = 2; +} diff --git a/shared/core/types/proto/session.proto b/shared/core/types/proto/session.proto index 267fd4b65..e0e584f5e 100644 --- a/shared/core/types/proto/session.proto +++ b/shared/core/types/proto/session.proto @@ -11,7 +11,7 @@ import "actor.proto"; message Session { string id = 1; // a universally unique ID for the session int64 session_number = 2; // a monotonically increasing number representing the # on the chain - int64 session_height = 3; // the number of blocks (out of numBlocksPerSession) in this session + int64 session_height = 3; // the height at which the session starts int64 num_session_blocks = 4; // the number of blocks the session is valid from // CONSIDERATION: Should we add a `RelayChain` enum and use it across the board? // CONSIDERATION: Should a single session support multiple relay chains? diff --git a/shared/modules/persistence_module.go b/shared/modules/persistence_module.go index 3160ef391..0dbccfc6e 100644 --- a/shared/modules/persistence_module.go +++ b/shared/modules/persistence_module.go @@ -1,8 +1,10 @@ package modules -//go:generate mockgen -destination=./mocks/persistence_module_mock.go github.com/pokt-network/pocket/shared/modules PersistenceModule,PersistenceRWContext,PersistenceReadContext,PersistenceWriteContext +//go:generate mockgen -destination=./mocks/persistence_module_mock.go github.com/pokt-network/pocket/shared/modules PersistenceModule,PersistenceRWContext,PersistenceReadContext,PersistenceWriteContext,PersistenceLocalContext import ( + "math/big" + "github.com/pokt-network/pocket/persistence/blockstore" "github.com/pokt-network/pocket/persistence/indexer" "github.com/pokt-network/pocket/runtime/genesis" @@ -18,6 +20,9 @@ type PersistenceModule interface { // Context operations NewRWContext(height int64) (PersistenceRWContext, error) + // TODO(#406): removing height from "NewReadContext" input and passing it to specific methods seems a better choice. + // This could prevent confusion when retrieving the value of a parameter for a height less than the current height, + // e.g. when getting the App Token sessions multiplier for the starting height of a session. NewReadContext(height int64) (PersistenceReadContext, error) ReleaseWriteContext() error // The module can maintain many read contexts, but only one write context can exist at a time @@ -35,6 +40,11 @@ type PersistenceModule interface { // Debugging / development only HandleDebugMessage(*messaging.DebugMessage) error + + // GetLocalContext returns a local persistence context that can be used to store/retrieve node-specific, i.e. off-chain, data + // The module can maintain a single (i.e. a singleton) local context for both read and write operations: subsequent calls to GetLocalContext return + // the same local context. + GetLocalContext() (PersistenceLocalContext, error) } // Interface defining the context within which the node can operate with the persistence layer. @@ -133,6 +143,8 @@ type PersistenceWriteContext interface { // Flag Operations InitFlags() error SetFlag(paramName string, value any, enabled bool) error + + RecordRelayService(applicationAddress string, key []byte, relay *coreTypes.Relay, response *coreTypes.RelayResponse) error } type PersistenceReadContext interface { @@ -224,3 +236,20 @@ type PersistenceReadContext interface { GetStringFlag(paramName string, height int64) (string, bool, error) GetBytesFlag(paramName string, height int64) ([]byte, bool, error) } + +// PersistenceLocalContext defines the set of operations specific to local persistence. +// +// This context should be used for node-specific data, e.g. records of served relays. +// This is in contrast to PersistenceRWContext which should be used to store on-chain data. +type PersistenceLocalContext interface { + // StoreServicedRelay stores record of a serviced relay and its response in the local context. + // The stored service relays will be used to: + // a) check the number of tokens used per session, and + // b) prepare claim/proof messages once the session is over + // The "relayDigest" and "relayReqResBytes" parameters will be used as key and leaf contents in the constructed SMT, respectively. + StoreServicedRelay(session *coreTypes.Session, relayDigest, relayReqResBytes []byte) error + // GetSessionTokensUsed returns the number of tokens that have been used for the provided session. + // It returns the count of tokens used by the servicer instance + // for the application associated with the session + GetSessionTokensUsed(*coreTypes.Session) (*big.Int, error) +} diff --git a/shared/modules/utility_module.go b/shared/modules/utility_module.go index f89513200..c52bbac3c 100644 --- a/shared/modules/utility_module.go +++ b/shared/modules/utility_module.go @@ -83,6 +83,10 @@ type UnstakingActor interface { // CONSIDERATION: Consider removing `Utility` from `UtilityUnitOfWork` altogether // UtilityUnitOfWork is a unit of work (https://martinfowler.com/eaaCatalog/unitOfWork.html) that allows for atomicity and commit/rollback functionality +// +// It should be used to track a single, atomic, rollbackable state transition related to on-chain data. +// Conversely, use Local context from persistence module to track off-chain data that will eventually lead to a state transition, +// e.g. reward claim for a session that spans multiple blocks. type UtilityUnitOfWork interface { IntegratableModule diff --git a/utility/doc/CHANGELOG.md b/utility/doc/CHANGELOG.md index a8ee95591..ddd75c0b6 100644 --- a/utility/doc/CHANGELOG.md +++ b/utility/doc/CHANGELOG.md @@ -7,6 +7,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +## [0.0.0.46] - 2023-06-12 + +- Add trustless relay validation: available service tokens for the application +- Add HTTP relay execution logic to servicer + ## [0.0.0.45] - 2023-06-06 - Adds servicer, fisherman, and validator utility modules diff --git a/utility/servicer/module.go b/utility/servicer/module.go index 431463a0d..eb4ec9c26 100644 --- a/utility/servicer/module.go +++ b/utility/servicer/module.go @@ -1,20 +1,36 @@ package servicer import ( + "bytes" + "encoding/hex" "errors" "fmt" + "io" + "math/big" + "net/http" + "net/url" + "sync" + "time" "github.com/pokt-network/pocket/logger" + "github.com/pokt-network/pocket/persistence" "github.com/pokt-network/pocket/runtime/configs" + "github.com/pokt-network/pocket/shared/codec" coreTypes "github.com/pokt-network/pocket/shared/core/types" + "github.com/pokt-network/pocket/shared/crypto" "github.com/pokt-network/pocket/shared/modules" "github.com/pokt-network/pocket/shared/modules/base_modules" + "github.com/pokt-network/pocket/shared/utils" + typesUtil "github.com/pokt-network/pocket/utility/types" "golang.org/x/exp/slices" ) +// TECHDEBT(#519): Refactor error handling and consolidate with `shared/core/types/error.go` var ( errValidateBlockHeight = errors.New("relay failed block height validation") errValidateRelayMeta = errors.New("relay failed metadata validation") + errValidateServicer = errors.New("relay failed servicer validation") + errShouldMineRelay = errors.New("relay failed validating available tokens") _ modules.ServicerModule = &servicer{} ) @@ -23,12 +39,27 @@ const ( ServicerModuleName = "servicer" ) +// sessionTokens is used to cache the starting number of tokens available +// during a specific session: it is used as the value for a map with keys being applications' public keys +// TODO: What if we have a servicer managing more than one session from the same app at once? We may/may not need to resolve this in the future. +type sessionTokens struct { + sessionNumber int64 + startingTokenCountAvailable *big.Int +} + type servicer struct { base_modules.IntegratableModule base_modules.InterruptableModule logger *modules.Logger config *configs.ServicerConfig + + // This lock is needed to allow multiple GO routines update the totalTokens cache as part of serving relays + // NB: per the description in pkg.go.dev/sync#Map, we have chosen explicitly not to use sync.Map + rwlock sync.RWMutex + // totalTokens is a mapping from application public keys to session metadata to keep track of session tokens + // OPTIMIZE: There is an opportunity to simplify the code through various means such as, but not limited to, avoiding extra math.big operations or excess GetParam calls + totalTokens map[string]*sessionTokens } var ( @@ -44,7 +75,9 @@ func CreateServicer(bus modules.Bus, options ...modules.ModuleOption) (modules.S } func (*servicer) Create(bus modules.Bus, options ...modules.ModuleOption) (modules.Module, error) { - s := &servicer{} + s := &servicer{ + totalTokens: make(map[string]*sessionTokens), + } for _, option := range options { option(s) @@ -86,17 +119,88 @@ func (s *servicer) HandleRelay(relay *coreTypes.Relay) (*coreTypes.RelayResponse return nil, fmt.Errorf("Error admitting relay: %w", err) } - // TODO: implement Persist Relay - // TODO: implement execution - // TODO: implement state maintenance - // TODO: validate the response from the node? - // TODO: (QUESTION) Should we persist SignedRPC? - return nil, nil + response, err := s.executeRelay(relay) + if err != nil { + return nil, fmt.Errorf("Error executing relay: %w", err) + } + + // TODO(M6): Look into data integrity checks and response validation. + + session, err := s.getSession(relay) + if err != nil { + return nil, err + } + + relayDigest, relayReqResBytes, shouldStore, err := s.isRelayVolumeApplicable(session, relay, response) + if err != nil { + return nil, fmt.Errorf("Error calculating relay service digest: %w", err) + } + if !shouldStore { + return response, nil + } + + localCtx, err := s.GetBus().GetPersistenceModule().GetLocalContext() + if err != nil { + return nil, fmt.Errorf("Error getting a local context to update token usage for application %s: %w", relay.Meta.ApplicationAddress, err) + } + + if err := localCtx.StoreServicedRelay(session, relayDigest, relayReqResBytes); err != nil { + return nil, fmt.Errorf("Error recording service proof for application %s: %w", relay.Meta.ApplicationAddress, err) + } + + return response, nil +} + +// isRelayVolumeApplicable returns: +// 1. The signed digest of a relay/response pair +// 2. Whether a legit relay eligible for claiming rewards +// Legit means satisfying at-least the following conditions: not-replay and having a proper signature, +func (s *servicer) isRelayVolumeApplicable(session *coreTypes.Session, relay *coreTypes.Relay, response *coreTypes.RelayResponse) (digest, serializedRelayRes []byte, collides bool, err error) { + relayReqResBytes, err := codec.GetCodec().Marshal(&coreTypes.RelayReqRes{Relay: relay, Response: response}) + if err != nil { + return nil, nil, false, fmt.Errorf("Error marshalling relay and/or response: %w", err) + } + + relayDigest := crypto.SHA3Hash(relayReqResBytes) + signedDigest := s.sign(relayDigest) + response.ServicerSignature = hex.EncodeToString(signedDigest) + collision, err := s.isRelayVolumeApplicableOnChain(session, relayDigest) + if err != nil { + return nil, nil, false, fmt.Errorf("Error checking for relay replay by app %s for chain %s during session number %d: %w", + session.Application.Address, relay.Meta.RelayChain.Id, session.SessionNumber, err) + } + + return signedDigest, relayReqResBytes, collision, nil +} + +// INCOMPLETE(#832): provide a private key to the servicer and use it to sign all relays +func (s *servicer) sign(bz []byte) []byte { + return bz +} + +// INCOMPLETE: implement this according to the comment below +// isRelayVolumeApplicableOnChain returns whether the serialized serviced relay and the response, provided as `digest`, is eligible for reward +// +// on the service/chain corresponding to the provided session. +func (s *servicer) isRelayVolumeApplicableOnChain(session *coreTypes.Session, digest []byte) (bool, error) { + return false, nil +} + +// executeRelay performs the passed relay using the correct method depending on the relay payload type. +func (s *servicer) executeRelay(relay *coreTypes.Relay) (*coreTypes.RelayResponse, error) { + switch payload := relay.RelayPayload.(type) { + case *coreTypes.Relay_JsonRpcPayload: + return s.executeJsonRPCRelay(relay.Meta, payload.JsonRpcPayload) + case *coreTypes.Relay_RestPayload: + return s.executeRESTRelay(relay.Meta, payload.RestPayload) + default: + return nil, fmt.Errorf("Error executing relay on application %s: Unsupported type on payload %s", relay.Meta.ApplicationAddress, payload) + } } // validateRelayMeta ensures the relay metadata is valid for being handled by the servicer // REFACTOR: move the meta-specific validation to a Validator method on RelayMeta struct -func (s servicer) validateRelayMeta(meta *coreTypes.RelayMeta, currentHeight int64) error { +func (s *servicer) validateRelayMeta(meta *coreTypes.RelayMeta, currentHeight int64) error { if meta == nil { return fmt.Errorf("empty relay metadata") } @@ -105,7 +209,6 @@ func (s servicer) validateRelayMeta(meta *coreTypes.RelayMeta, currentHeight int return fmt.Errorf("relay chain unspecified") } - // TODO: supported chains: needs to be crossed-checked with the world state from the persistence layer if err := s.validateRelayChainSupport(meta.RelayChain, currentHeight); err != nil { return fmt.Errorf("validation of support for relay chain %s failed: %w", meta.RelayChain.Id, err) } @@ -113,9 +216,9 @@ func (s servicer) validateRelayMeta(meta *coreTypes.RelayMeta, currentHeight int return nil } -func (s servicer) validateRelayChainSupport(relayChain *coreTypes.Identifiable, currentHeight int64) error { - if !slices.Contains(s.config.Chains, relayChain.Id) { - return fmt.Errorf("chain %s not supported by servicer %s configuration", relayChain.Id, s.config.Address) +func (s *servicer) validateRelayChainSupport(relayChain *coreTypes.Identifiable, currentHeight int64) error { + if _, ok := s.config.Services[relayChain.Id]; !ok { + return fmt.Errorf("service %s not supported by servicer %s configuration", relayChain.Id, s.config.Address) } // DISCUSS: either update NewReadContext to take a uint64, or the GetCurrentHeight to return an int64. @@ -138,19 +241,87 @@ func (s servicer) validateRelayChainSupport(relayChain *coreTypes.Identifiable, return nil } -// TODO: implement -// validateApplication makes sure the application has not received more relays than allocated in the current session. -func (s servicer) validateApplication(meta *coreTypes.RelayMeta, session *coreTypes.Session) error { - /* - // if maxRelaysPerSession, overServiced := calculateAppSessionTokens(); overServiced { - return fmt.Errorf("application %s has exceeded its allocated relays %d for the session %d", meta.ApplicationPublicKey, maxRelaysPerSession) - } - */ - return nil +// ADDTEST: Need to add more unit tests to account for potential edge cases +// shouldMineRelay makes sure the application has not received more relays than allocated in the current session. +// returns nil if the servicer should attempt to mine another relay for the session provided +func (s *servicer) shouldMineRelay(session *coreTypes.Session) error { + servicerAppSessionTokens, err := s.startingTokenCountAvailable(session) + if err != nil { + return fmt.Errorf("Error calculating servicer tokens for application: %w", err) + } + + localCtx, err := s.GetBus().GetPersistenceModule().GetLocalContext() + if err != nil { + return fmt.Errorf("Error getting local persistence context: application %s session number %d: %w", session.Application.PublicKey, session.SessionNumber, err) + } + + usedAppSessionTokens, err := localCtx.GetSessionTokensUsed(session) + if err != nil { + return fmt.Errorf("Error getting servicer token usage: application %s session number %d: %w", session.Application.PublicKey, session.SessionNumber, err) + } + + if usedAppSessionTokens == nil || usedAppSessionTokens.Cmp(servicerAppSessionTokens) < 0 { + return nil // should attempt to mine a relay + } + + return fmt.Errorf("application %s has exceeded its allocated relays %s for session %d", + session.Application.PublicKey, + servicerAppSessionTokens, + session.SessionNumber) +} + +// cachedAppTokens returns the cached number of starting tokens for a session. +// +// This caching is done to remove the need for getting the starting number of tokens for a session every time a relay is being served. +func (s *servicer) cachedAppTokens(session *coreTypes.Session) *sessionTokens { + s.rwlock.RLock() + defer s.rwlock.RUnlock() + + return s.totalTokens[session.Application.PublicKey] +} + +// ADDTEST: Need to add more unit tests for the numerical portion of this functionality +// startingTokenCountAvailable returns the total number of tokens the Application corresponding to the provided session has per servicer at the start of the session. +// +// If nothing is cached, the maximum number of session tokens is computed. +func (s *servicer) startingTokenCountAvailable(session *coreTypes.Session) (*big.Int, error) { + tokens := s.cachedAppTokens(session) + if tokens != nil && tokens.startingTokenCountAvailable != nil && tokens.sessionNumber == session.SessionNumber { + return big.NewInt(1).Set(tokens.startingTokenCountAvailable), nil + } + + // Calculate this servicer's limit for the application in the current session. + // This is distributed rate limiting (DRL): no need to know how many requests have + // been performed for this application by other servicers. Instead, simply enforce + // this servicer's share of the application's tokens for this session. + appSessionTokens, err := s.calculateAppSessionTokens(session) + if err != nil { + return nil, fmt.Errorf("Error calculating application %s total tokens for session %d: %w", session.Application.PublicKey, session.SessionNumber, err) + } + + // type conversion from big.Int to big.Float + appTokens := big.NewFloat(1).SetInt(appSessionTokens) + servicerTokens := appTokens.Quo(appTokens, big.NewFloat(float64(len(session.Servicers)))) + + // This multiplication is performed to minimize the chance of under-utilization of application's tokens, + // while removing the overhead of communication between servicers which would be necessary otherwise. + // see https://arxiv.org/abs/2305.10672 for details on application and servicer distributed rate-limiting + adjustedTokens := servicerTokens.Mul(servicerTokens, big.NewFloat(1+s.config.RelayMiningVolumeAccuracy)) + roundedTokens, _ := adjustedTokens.Int(big.NewInt(1)) + + s.setAppSessionTokens(session, &sessionTokens{session.SessionNumber, roundedTokens}) + return roundedTokens, nil +} + +func (s *servicer) setAppSessionTokens(session *coreTypes.Session, tokens *sessionTokens) { + s.rwlock.Lock() + defer s.rwlock.Unlock() + + s.totalTokens[session.Application.PublicKey] = tokens } // validateServicer makes sure the servicer is A) active in the current session, and B) has not served more than its allocated relays for the session -func (s servicer) validateServicer(meta *coreTypes.RelayMeta, session *coreTypes.Session) error { +func (s *servicer) validateServicer(meta *coreTypes.RelayMeta, session *coreTypes.Session) error { if meta.ServicerPublicKey != s.config.PublicKey { return fmt.Errorf("relay servicer key %s does not match this servicer instance %s", meta.ServicerPublicKey, s.config.PublicKey) } @@ -167,12 +338,22 @@ func (s servicer) validateServicer(meta *coreTypes.RelayMeta, session *coreTypes return fmt.Errorf("relay servicer key %s not found in session %d with %d servicers", meta.ServicerPublicKey, session.SessionNumber, len(session.Servicers)) } - // TODO: implement isServicerMaxedOut return nil } +// getSession returns a session for the current height and the passed relay +func (s *servicer) getSession(relay *coreTypes.Relay) (*coreTypes.Session, error) { + height := s.GetBus().GetConsensusModule().CurrentHeight() + session, err := s.GetBus().GetUtilityModule().GetSession(relay.Meta.ApplicationAddress, int64(height), relay.Meta.RelayChain.Id, relay.Meta.GeoZone.Id) + if err != nil { + return nil, fmt.Errorf("failed to get a session for height %d for relay meta %s: %w", height, relay.Meta, err) + } + + return session, nil +} + // admitRelay decides whether the relay should be served -func (s servicer) admitRelay(relay *coreTypes.Relay) error { +func (s *servicer) admitRelay(relay *coreTypes.Relay) error { // TODO: utility module should initialize the servicer (if this module instance is a servicer) const errPrefix = "Error admitting relay" @@ -182,31 +363,123 @@ func (s servicer) admitRelay(relay *coreTypes.Relay) error { height := s.GetBus().GetConsensusModule().CurrentHeight() if err := s.validateRelayMeta(relay.Meta, int64(height)); err != nil { - return fmt.Errorf("%w: %s", errValidateRelayMeta, err.Error()) + return fmt.Errorf("%s: %w", err.Error(), errValidateRelayMeta) } - // TODO: update the CLI to include ApplicationAddress(or Application Public Key) in the RelayMeta - session, err := s.GetBus().GetUtilityModule().GetSession(relay.Meta.ApplicationAddress, int64(height), relay.Meta.RelayChain.Id, relay.Meta.GeoZone.Id) + session, err := s.getSession(relay) if err != nil { - return fmt.Errorf("%s: failed to get a session for height %d for relay meta %s: %w", errPrefix, height, relay.Meta, err) + return err } - // TODO: (REFACTOR) use a loop to run all validators: would also remove the need for passing the session around if err := validateRelayBlockHeight(relay.Meta, session); err != nil { - return fmt.Errorf("%w: %s", errValidateBlockHeight, err.Error()) + return fmt.Errorf("%s: %w", err.Error(), errValidateBlockHeight) } - if err := s.validateApplication(relay.Meta, session); err != nil { - return fmt.Errorf("%s: relay failed application validation: %w", errPrefix, err) + if err := s.validateServicer(relay.Meta, session); err != nil { + return fmt.Errorf("%s: %s: %w", errPrefix, err.Error(), errValidateServicer) } - if err := s.validateServicer(relay.Meta, session); err != nil { - return fmt.Errorf("%s: relay failed servicer instance validation: %w", errPrefix, err) + if err := s.shouldMineRelay(session); err != nil { + return fmt.Errorf("%s: %s: %w", errPrefix, err.Error(), errShouldMineRelay) } return nil } +// ADDTEST: Need to add more unit tests for the numerical portion of this functionality +// calculateAppSessionTokens determines the number of "session tokens" an application gets at the beginning +// of every session. Each servicer will serve a maximum of ~(Session Tokens / Number of Servicers in the Session) relays for the application +func (s *servicer) calculateAppSessionTokens(session *coreTypes.Session) (*big.Int, error) { + appStake, err := utils.StringToBigInt(session.Application.StakedAmount) + if err != nil { + return nil, fmt.Errorf("Error processing application's staked amount %s: %w", session.Application.StakedAmount, coreTypes.ErrStringToBigInt(err)) + } + + // TODO(M5): find the right document to explain the following: + // We assume that the value of certain parameters only changes/takes effect at the start of a session. + // In this specific case, the `AppSessionTokensMultiplierParamName` parameter is retrieved for the height that + // matches the beginning of the session. + readCtx, err := s.GetBus().GetPersistenceModule().NewReadContext(session.SessionHeight) + if err != nil { + return nil, fmt.Errorf("error getting persistence context at height %d: %w", session.SessionHeight, err) + } + defer readCtx.Release() //nolint:errcheck // We only need to make sure the readCtx is released + + appStakeTokensMultiplier, err := persistence.GetParameter[int](readCtx, typesUtil.AppSessionTokensMultiplierParamName, session.SessionHeight) + if err != nil { + return nil, fmt.Errorf("error reading parameter %s at height %d from persistence: %w", typesUtil.AppSessionTokensMultiplierParamName, session.SessionHeight, err) + } + + return appStake.Mul(appStake, big.NewInt(int64(appStakeTokensMultiplier))), nil +} + +// executeJsonRPCRelay performs the relay for JSON-RPC payloads, sending them to the chain's/service's URL. +func (s *servicer) executeJsonRPCRelay(meta *coreTypes.RelayMeta, payload *coreTypes.JSONRPCPayload) (*coreTypes.RelayResponse, error) { + if meta == nil || meta.RelayChain == nil || meta.RelayChain.Id == "" { + return nil, fmt.Errorf("Relay for application %s does not specify relay chain", meta.ApplicationAddress) + } + + serviceConfig, ok := s.config.Services[meta.RelayChain.Id] + if !ok { + return nil, fmt.Errorf("Chain %s not found in servicer configuration: %w", meta.RelayChain.Id, errValidateRelayMeta) + } + + relayBytes, err := codec.GetCodec().Marshal(payload) + if err != nil { + return nil, fmt.Errorf("Error marshalling payload %s: %w", payload.String(), err) + } + + return s.executeHTTPRelay(serviceConfig, relayBytes, payload.Headers) +} + +// executeRESTRelay performs the relay for REST payloads, sending them to the chain's/service's URL. +// INCOMPLETE(#860): RESTful service relays: basic checks and execution through HTTP calls. +func (s *servicer) executeRESTRelay(meta *coreTypes.RelayMeta, _ *coreTypes.RESTPayload) (*coreTypes.RelayResponse, error) { + if _, ok := s.config.Services[meta.RelayChain.Id]; !ok { + return nil, fmt.Errorf("Chain %s not found in servicer configuration: %w", meta.RelayChain.Id, errValidateRelayMeta) + } + return nil, nil +} + +// executeHTTPRequest performs the HTTP request that sends the relay to the chain's/service's URL. +func (s *servicer) executeHTTPRelay(serviceConfig *configs.ServiceConfig, payload []byte, headers map[string]string) (*coreTypes.RelayResponse, error) { + serviceUrl, err := url.Parse(serviceConfig.Url) + if err != nil { + return nil, fmt.Errorf("Error parsing chain URL %s: %w", serviceConfig.Url, err) + } + + req, err := http.NewRequest(http.MethodPost, serviceUrl.String(), bytes.NewBuffer(payload)) + if err != nil { + return nil, err + } + + if auth := serviceConfig.BasicAuth; auth != nil && auth.UserName != "" { + req.SetBasicAuth(auth.UserName, auth.Password) + } + + // INVESTIGATE: do we need a default user-agent for HTTP requests? + for k, v := range headers { + req.Header.Set(k, v) + } + if req.Header.Get("Content-Type") == "" { + req.Header.Set("Content-Type", "application/json") + } + + // INCOMPLETE(#837): Optimize usage of HTTP client, e.g. connection reuse, depending on the volume of relays a servicer is expected to handle + resp, err := (&http.Client{Timeout: time.Duration(serviceConfig.TimeoutMsec) * time.Millisecond}).Do(req) + if err != nil { + return nil, fmt.Errorf("Error performing the HTTP request for relay: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("Error reading response body: %w", err) + } + + return &coreTypes.RelayResponse{Payload: string(body)}, nil +} + // IMPROVE: Add session height tolerance to account for session rollovers func validateRelayBlockHeight(relayMeta *coreTypes.RelayMeta, session *coreTypes.Session) error { sessionStartingBlock := session.SessionNumber * session.NumSessionBlocks diff --git a/utility/servicer/module_test.go b/utility/servicer/module_test.go index e0833b5b1..6ab5a5646 100644 --- a/utility/servicer/module_test.go +++ b/utility/servicer/module_test.go @@ -1,7 +1,12 @@ package servicer import ( - "errors" + "fmt" + "log" + "math/big" + "net/http" + "net/http/httptest" + "os" "testing" "github.com/golang/mock/gomock" @@ -9,27 +14,86 @@ import ( "github.com/pokt-network/pocket/runtime/configs" coreTypes "github.com/pokt-network/pocket/shared/core/types" + "github.com/pokt-network/pocket/shared/crypto" "github.com/pokt-network/pocket/shared/modules" mockModules "github.com/pokt-network/pocket/shared/modules/mocks" + typesUtil "github.com/pokt-network/pocket/utility/types" ) -var testServicer1 = &coreTypes.Actor{ - ActorType: coreTypes.ActorType_ACTOR_TYPE_SERVICER, - Address: "a3d9ea9d9ad9c58bb96ec41340f83cb2cabb6496", - PublicKey: "a6cd0a304c38d76271f74dd3c90325144425d904ef1b9a6fbab9b201d75a998b", - Chains: []string{"0021"}, +const ( + testAppsTokensMultiplier = int(2) + testCurrentHeight = int64(9) +) + +// INCOMPLETE(#833) add e2e testing on servicer's features + +var ( + // Initialized in TestMain + testServicer1 *coreTypes.Actor + + // Initialized in TestMain + testApp1 *coreTypes.Actor + + // Initialized in TestMain + testServiceConfig1 *configs.ServiceConfig +) + +// testPublicKey is a helper that returns a public key and its corresponding address +func testPublicKey() (publicKey, address string) { + pk, err := crypto.GeneratePublicKey() + if err != nil { + log.Fatalf("Error creating public key: %s", err) + } + + return pk.String(), pk.Address().String() +} + +// TestMain initialized the test fixtures for all the unit tests in the servicer package +func TestMain(m *testing.M) { + servicerPublicKey, servicerAddr := testPublicKey() + testServicer1 = &coreTypes.Actor{ + ActorType: coreTypes.ActorType_ACTOR_TYPE_SERVICER, + Address: servicerAddr, + PublicKey: servicerPublicKey, + Chains: []string{"POKT-UnitTestNet"}, + StakedAmount: "1000", + } + + appPublicKey, appAddr := testPublicKey() + testApp1 = &coreTypes.Actor{ + ActorType: coreTypes.ActorType_ACTOR_TYPE_APP, + Address: appAddr, + PublicKey: appPublicKey, + StakedAmount: "1000", + } + + testServiceConfig1 = &configs.ServiceConfig{ + Url: "http://chain-url.pokt.network", + TimeoutMsec: 1234, + BasicAuth: &configs.BasicAuth{ + UserName: "user1", + Password: "password1", + }, + } + + os.Exit(m.Run()) } -func TestAdmitRelay(t *testing.T) { - currentHeight := uint64(9) +func TestRelay_Admit(t *testing.T) { + const ( + currentSessionNumber = 2 + testSessionStartingHeight = 8 + ) + testCases := []struct { - name string - relay *coreTypes.Relay - expected error + name string + usedSessionTokens int64 + relay *coreTypes.Relay + expected error }{ { name: "valid relay is admitted", - relay: testRelay("0021", int64(currentHeight)), + relay: testRelay(), }, { name: "Relay with empty Meta is rejected", @@ -42,64 +106,166 @@ func TestAdmitRelay(t *testing.T) { expected: errValidateRelayMeta, }, { - name: "Relay for unsupported chain is rejected", - relay: testRelay("foo", 8), + name: "Relay for unsupported service is rejected", + relay: testRelay(testRelayChain("foo")), expected: errValidateRelayMeta, }, { name: "Relay with height set in a past session is rejected", - relay: testRelay("0021", 5), + relay: testRelay(testRelayHeight(5)), expected: errValidateBlockHeight, }, { name: "Relay with height set in a future session is rejected", - relay: testRelay("0021", 9999), + relay: testRelay(testRelayHeight(9999)), expected: errValidateBlockHeight, }, + { + name: "Relay not matching the servicer in this session is rejected", + relay: testRelay(testRelayServicer("bar")), + expected: errValidateServicer, + }, + { + name: "Relay for app out of quota is rejected", + relay: testRelay(), + usedSessionTokens: 999999, + expected: errShouldMineRelay, + }, } for _, testCase := range testCases { t.Run(testCase.name, func(t *testing.T) { config := testServicerConfig() session := testSession( - sessionNumber(2), + sessionNumber(currentSessionNumber), sessionBlocks(4), - sessionHeight(2), + sessionHeight(testSessionStartingHeight), sessionServicers(testServicer1), ) - mockBus := mockBus(t, &config, currentHeight, session) + mockBus := mockBus(t, &config, uint64(testCurrentHeight), session, testCase.usedSessionTokens) servicerMod, err := CreateServicer(mockBus) require.NoError(t, err) - err = servicerMod.(*servicer).admitRelay(testCase.relay) - if !errors.Is(err, testCase.expected) { - t.Fatalf("Expected error %v got: %v", testCase.expected, err) + servicer, ok := servicerMod.(*servicer) + require.True(t, ok) + + err = servicer.admitRelay(testCase.relay) + require.ErrorIs(t, err, testCase.expected) + }) + } +} + +func TestRelay_Execute(t *testing.T) { + testCases := []struct { + name string + relay *coreTypes.Relay + expectedErr error + }{ + { + name: "relay is rejected if service is not specified in the config", + relay: testRelay(testRelayChain("foo")), + expectedErr: errValidateRelayMeta, + }, + { + name: "Relay for accepted service is executed", + relay: testRelay(), + }, + { + name: "JSONRPC Relay is executed", + relay: testRelay(testEthGoerliRelay()), + }, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, `{"0x1234"}`) + })) + defer ts.Close() + + config := testServicerConfig() + for svc := range config.Services { + config.Services[svc].Url = ts.URL } + + servicer := &servicer{config: &config} + _, err := servicer.executeRelay(testCase.relay) + require.ErrorIs(t, err, testCase.expectedErr) + // INCOMPLETE(@adshmh): verify HTTP request properties: payload/headers/user-agent/etc. }) } } -func testRelay(chain string, height int64) *coreTypes.Relay { - return &coreTypes.Relay{ +type relayEditor func(*coreTypes.Relay) + +func testRelayServicer(publicKey string) relayEditor { + return func(relay *coreTypes.Relay) { + relay.Meta.ServicerPublicKey = publicKey + } +} + +func testRelayChain(chain string) relayEditor { + return func(relay *coreTypes.Relay) { + relay.Meta.RelayChain = &coreTypes.Identifiable{Id: chain} + } +} + +func testRelayHeight(height int64) relayEditor { + return func(relay *coreTypes.Relay) { + relay.Meta.BlockHeight = height + } +} + +func testEthGoerliRelay() relayEditor { + return func(relay *coreTypes.Relay) { + relay.Meta.RelayChain.Id = "ETH-Goerli" + relay.RelayPayload = &coreTypes.Relay_JsonRpcPayload{ + JsonRpcPayload: &coreTypes.JSONRPCPayload{ + Id: []byte("1"), + JsonRpc: "2.0", + Method: "eth_blockNumber", + }, + } + } +} + +func testRelay(editors ...relayEditor) *coreTypes.Relay { + relay := &coreTypes.Relay{ Meta: &coreTypes.RelayMeta{ - ServicerPublicKey: testServicer1.PublicKey, - BlockHeight: height, + ServicerPublicKey: testServicer1.PublicKey, + ApplicationAddress: testApp1.Address, + BlockHeight: testCurrentHeight, RelayChain: &coreTypes.Identifiable{ - Id: chain, + Id: "POKT-UnitTestNet", }, GeoZone: &coreTypes.Identifiable{ Id: "geozone", }, }, + RelayPayload: &coreTypes.Relay_RestPayload{ + RestPayload: &coreTypes.RESTPayload{ + HttpPath: "/v1/height", + RequestType: coreTypes.RESTRequestType_RESTRequestTypeGET, + }, + }, } + + for _, editor := range editors { + editor(relay) + } + + return relay } func testServicerConfig() configs.ServicerConfig { return configs.ServicerConfig{ PublicKey: testServicer1.PublicKey, Address: testServicer1.Address, - Chains: testServicer1.Chains, + Services: map[string]*configs.ServiceConfig{ + "POKT-UnitTestNet": testServiceConfig1, + "ETH-Goerli": testServiceConfig1, + }, } } @@ -131,7 +297,8 @@ func sessionServicers(servicers ...*coreTypes.Actor) func(*coreTypes.Session) { func testSession(editors ...sessionModifier) *coreTypes.Session { session := coreTypes.Session{ - Id: "session-1", + Id: "session-1", + Application: testApp1, } for _, editor := range editors { editor(&session) @@ -140,7 +307,7 @@ func testSession(editors ...sessionModifier) *coreTypes.Session { } // Create a mockBus with mock implementations of consensus and utility modules -func mockBus(t *testing.T, cfg *configs.ServicerConfig, height uint64, session *coreTypes.Session) *mockModules.MockBus { +func mockBus(t *testing.T, cfg *configs.ServicerConfig, height uint64, session *coreTypes.Session, usedSessionTokens int64) *mockModules.MockBus { ctrl := gomock.NewController(t) runtimeMgrMock := mockModules.NewMockRuntimeMgr(ctrl) runtimeMgrMock.EXPECT().GetConfig().Return(&configs.Config{Servicer: cfg}).AnyTimes() @@ -148,15 +315,22 @@ func mockBus(t *testing.T, cfg *configs.ServicerConfig, height uint64, session * consensusMock := mockModules.NewMockConsensusModule(ctrl) consensusMock.EXPECT().CurrentHeight().Return(height).AnyTimes() - persistenceMock := mockModules.NewMockPersistenceModule(ctrl) persistenceReadContextMock := mockModules.NewMockPersistenceReadContext(ctrl) - persistenceReadContextMock.EXPECT().GetServicer(gomock.Any(), gomock.Any()).Return(testServicer1, nil).AnyTimes() persistenceReadContextMock.EXPECT().Release().AnyTimes() + persistenceReadContextMock.EXPECT().GetServicer(gomock.Any(), gomock.Any()).Return(testServicer1, nil).AnyTimes() + persistenceReadContextMock.EXPECT().GetIntParam(typesUtil.AppSessionTokensMultiplierParamName, session.SessionHeight). + Return(testAppsTokensMultiplier, nil).AnyTimes() + persistenceLocalContextMock := mockModules.NewMockPersistenceLocalContext(ctrl) + persistenceLocalContextMock.EXPECT().StoreServicedRelay(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + persistenceLocalContextMock.EXPECT().GetSessionTokensUsed(gomock.Any()).Return(big.NewInt(usedSessionTokens), nil).AnyTimes() + + persistenceMock := mockModules.NewMockPersistenceModule(ctrl) persistenceMock.EXPECT().GetModuleName().Return(modules.PersistenceModuleName).AnyTimes() persistenceMock.EXPECT().Start().Return(nil).AnyTimes() persistenceMock.EXPECT().SetBus(gomock.Any()).Return().AnyTimes() persistenceMock.EXPECT().NewReadContext(gomock.Any()).Return(persistenceReadContextMock, nil).AnyTimes() + persistenceMock.EXPECT().GetLocalContext().Return(persistenceLocalContextMock, nil).AnyTimes() busMock := mockModules.NewMockBus(ctrl) busMock.EXPECT().GetRuntimeMgr().Return(runtimeMgrMock).AnyTimes()