Skip to content

Commit

Permalink
[Relayminer] Query for on-chain session param num_blocks_per_session (
Browse files Browse the repository at this point in the history
#538)

Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
bryanchriswhite and Olshansk authored May 29, 2024
1 parent 6f2fe5b commit 7507dc5
Show file tree
Hide file tree
Showing 34 changed files with 313 additions and 162 deletions.
27 changes: 13 additions & 14 deletions load-testing/tests/relays_stress_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/pokt-network/poktroll/testutil/testclient/testeventsquery"
apptypes "github.com/pokt-network/poktroll/x/application/types"
gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types"
"github.com/pokt-network/poktroll/x/session/keeper"
"github.com/pokt-network/poktroll/x/shared"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
suppliertypes "github.com/pokt-network/poktroll/x/supplier/types"
)
Expand Down Expand Up @@ -173,9 +173,9 @@ func (s *relaysSuite) mapSessionInfoForLoadTestDurationFn(

sessionInfo := &sessionInfoNotif{
blockHeight: blockHeight,
sessionNumber: keeper.GetSessionNumber(blockHeight),
sessionStartBlockHeight: keeper.GetSessionStartBlockHeight(blockHeight),
sessionEndBlockHeight: keeper.GetSessionEndBlockHeight(blockHeight),
sessionNumber: shared.GetSessionNumber(blockHeight),
sessionStartBlockHeight: shared.GetSessionStartBlockHeight(blockHeight),
sessionEndBlockHeight: shared.GetSessionEndBlockHeight(blockHeight),
}

infoLogger := logger.Info().
Expand Down Expand Up @@ -330,15 +330,15 @@ func (plans *actorLoadTestIncrementPlans) validateAppSupplierPermutations(t gocu
// Otherwise, the expected baseline for several metrics will be periodically skewed.
func (plans *actorLoadTestIncrementPlans) validateIncrementRates(t gocuke.TestingT) {
require.Truef(t,
plans.gateways.blocksPerIncrement%keeper.NumBlocksPerSession == 0,
plans.gateways.blocksPerIncrement%shared.NumBlocksPerSession == 0,
"gateway increment rate must be a multiple of the session length",
)
require.Truef(t,
plans.suppliers.blocksPerIncrement%keeper.NumBlocksPerSession == 0,
plans.suppliers.blocksPerIncrement%shared.NumBlocksPerSession == 0,
"supplier increment rate must be a multiple of the session length",
)
require.Truef(t,
plans.apps.blocksPerIncrement%keeper.NumBlocksPerSession == 0,
plans.apps.blocksPerIncrement%shared.NumBlocksPerSession == 0,
"app increment rate must be a multiple of the session length",
)
}
Expand Down Expand Up @@ -369,12 +369,11 @@ func (plans *actorLoadTestIncrementPlans) totalDurationBlocks() int64 {
// last increment duration (i.e. **after** maxActorCount actors are activated).
blocksToLastSessionEnd := plans.maxActorBlocksToFinalIncrementEnd()

sessionGracePeriodBlocks := keeper.GetSessionGracePeriodBlockCount()
blocksToLastProofWindowEnd := blocksToLastSessionEnd + sessionGracePeriodBlocks
blocksToLastProofWindowEnd := blocksToLastSessionEnd + shared.SessionGracePeriodBlocks

// Add one session length so that the duration is inclusive of the block which
// commits the last session's proof.
return blocksToLastProofWindowEnd + keeper.NumBlocksPerSession
return blocksToLastProofWindowEnd + shared.NumBlocksPerSession
}

// blocksToFinalIncrementStart returns the number of blocks that will have
Expand Down Expand Up @@ -721,9 +720,9 @@ func (plan *actorLoadTestIncrementPlan) shouldIncrementActorCount(
return false
}

initialSessionNumber := keeper.GetSessionNumber(startBlockHeight)
initialSessionNumber := shared.GetSessionNumber(startBlockHeight)
// TODO_TECHDEBT(#21): replace with gov param query when available.
actorSessionIncRate := plan.blocksPerIncrement / keeper.NumBlocksPerSession
actorSessionIncRate := plan.blocksPerIncrement / shared.NumBlocksPerSession
nextSessionNumber := sessionInfo.sessionNumber + 1 - initialSessionNumber
isSessionStartHeight := sessionInfo.blockHeight == sessionInfo.sessionStartBlockHeight
isActorIncrementHeight := nextSessionNumber%actorSessionIncRate == 0
Expand All @@ -747,9 +746,9 @@ func (plan *actorLoadTestIncrementPlan) shouldIncrementSupplierCount(
return false
}

initialSessionNumber := keeper.GetSessionNumber(startBlockHeight)
initialSessionNumber := shared.GetSessionNumber(startBlockHeight)
// TODO_TECHDEBT(#21): replace with gov param query when available.
supplierSessionIncRate := plan.blocksPerIncrement / keeper.NumBlocksPerSession
supplierSessionIncRate := plan.blocksPerIncrement / shared.NumBlocksPerSession
nextSessionNumber := sessionInfo.sessionNumber + 1 - initialSessionNumber
isSessionEndHeight := sessionInfo.blockHeight == sessionInfo.sessionEndBlockHeight
isActorIncrementHeight := nextSessionNumber%supplierSessionIncRate == 0
Expand Down
8 changes: 8 additions & 0 deletions pkg/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
//go:generate mockgen -destination=../../testutil/mockclient/application_query_client_mock.go -package=mockclient . ApplicationQueryClient
//go:generate mockgen -destination=../../testutil/mockclient/supplier_query_client_mock.go -package=mockclient . SupplierQueryClient
//go:generate mockgen -destination=../../testutil/mockclient/session_query_client_mock.go -package=mockclient . SessionQueryClient
//go:generate mockgen -destination=../../testutil/mockclient/shared_query_client_mock.go -package=mockclient . SharedQueryClient
//go:generate mockgen -destination=../../testutil/mockclient/cosmos_tx_builder_mock.go -package=mockclient github.com/cosmos/cosmos-sdk/client TxBuilder
//go:generate mockgen -destination=../../testutil/mockclient/cosmos_keyring_mock.go -package=mockclient github.com/cosmos/cosmos-sdk/crypto/keyring Keyring
//go:generate mockgen -destination=../../testutil/mockclient/cosmos_client_mock.go -package=mockclient github.com/cosmos/cosmos-sdk/client AccountRetriever
Expand Down Expand Up @@ -275,3 +276,10 @@ type SessionQueryClient interface {
blockHeight int64,
) (*sessiontypes.Session, error)
}

// SharedQueryClient defines an interface that enables the querying of the
// on-chain shared module information.
type SharedQueryClient interface {
// GetParams queries the chain for the current shared module parameters.
GetParams(ctx context.Context) (*sharedtypes.Params, error)
}
1 change: 1 addition & 0 deletions pkg/client/query/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ var (
ErrQueryUnableToDeserializeAccount = sdkerrors.Register(codespace, 2, "unable to deserialize account")
ErrQueryRetrieveSession = sdkerrors.Register(codespace, 3, "error while trying to retrieve a session")
ErrQueryPubKeyNotFound = sdkerrors.Register(codespace, 4, "account pub key not found")
ErrQuerySessionParams = sdkerrors.Register(codespace, 5, "unable to query session params")
)
2 changes: 1 addition & 1 deletion pkg/client/query/sessionquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"

"cosmossdk.io/depinject"
grpc "github.com/cosmos/gogoproto/grpc"
"github.com/cosmos/gogoproto/grpc"

"github.com/pokt-network/poktroll/pkg/client"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
Expand Down
55 changes: 55 additions & 0 deletions pkg/client/query/sharedquerier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package query

import (
"context"

"cosmossdk.io/depinject"
"github.com/cosmos/gogoproto/grpc"

"github.com/pokt-network/poktroll/pkg/client"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

var _ client.SharedQueryClient = (*sharedQuerier)(nil)

// sharedQuerier is a wrapper around the sharedtypes.QueryClient that enables the
// querying of on-chain shared information through a single exposed method
// which returns an sharedtypes.Session struct
type sharedQuerier struct {
clientConn grpc.ClientConn
sharedQuerier sharedtypes.QueryClient
}

// NewSharedQuerier returns a new instance of a client.SharedQueryClient by
// injecting the dependecies provided by the depinject.Config.
//
// Required dependencies:
// - clientCtx
func NewSharedQuerier(deps depinject.Config) (client.SharedQueryClient, error) {
querier := &sharedQuerier{}

if err := depinject.Inject(
deps,
&querier.clientConn,
); err != nil {
return nil, err
}

querier.sharedQuerier = sharedtypes.NewQueryClient(querier.clientConn)

return querier, nil
}

// GetParams queries & returns the shared module on-chain parameters.
//
// TODO_TECHDEBT(#543): We don't really want to have to query the params for every method call.
// Once `ModuleParamsClient` is implemented, use its replay observable's `#Last()` method
// to get the most recently (asynchronously) observed (and cached) value.
func (sq *sharedQuerier) GetParams(ctx context.Context) (*sharedtypes.Params, error) {
req := &sharedtypes.QueryParamsRequest{}
res, err := sq.sharedQuerier.Params(ctx, req)
if err != nil {
return nil, ErrQuerySessionParams.Wrapf("[%v]", err)
}
return &res.Params, nil
}
6 changes: 3 additions & 3 deletions pkg/crypto/rings/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1"
ringtypes "github.com/athanorlabs/go-dleq/types"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
ring "github.com/noot/ring-go"
"github.com/noot/ring-go"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/crypto"
"github.com/pokt-network/poktroll/pkg/polylog"
apptypes "github.com/pokt-network/poktroll/x/application/types"
"github.com/pokt-network/poktroll/x/service/types"
sessionkeeper "github.com/pokt-network/poktroll/x/session/keeper"
"github.com/pokt-network/poktroll/x/shared"
)

var _ crypto.RingClient = (*ringClient)(nil)
Expand Down Expand Up @@ -265,7 +265,7 @@ func (rc *ringClient) getRingPointsForAddressAtHeight(
// gateways that have been undelegated after the target session end height.
func GetRingAddressesAtBlock(app *apptypes.Application, blockHeight int64) []string {
// Get the target session end height at which we want to get the active delegations.
targetSessionEndHeight := uint64(sessionkeeper.GetSessionEndBlockHeight(blockHeight))
targetSessionEndHeight := uint64(shared.GetSessionEndBlockHeight(blockHeight))
// Get the current active delegations for the application and use them as a base.
activeDelegationsAtHeight := app.DelegateeGatewayAddresses

Expand Down
20 changes: 19 additions & 1 deletion pkg/deps/config/suppliers.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func NewSupplyPOKTRollSDKFn(signingKeyName string) SupplierFn {
}
}

// newSupplyBlockQueryClientFn returns a function which constructs a
// NewSupplyBlockQueryClientFn returns a function which constructs a
// BlockQueryClient instance and returns a new depinject.Config which
// is supplied with the given deps and the new BlockQueryClient.
func NewSupplyBlockQueryClientFn(queryNodeRPCUrl *url.URL) SupplierFn {
Expand All @@ -395,3 +395,21 @@ func NewSupplyBlockQueryClientFn(queryNodeRPCUrl *url.URL) SupplierFn {
return depinject.Configs(deps, depinject.Supply(blockQueryClient)), nil
}
}

// NewSupplySharedQueryClientFn returns a function which constructs a
// SharedQueryClient instance and returns a new depinject.Config which
// is supplied with the given deps and the new SharedQueryClient.
func NewSupplySharedQueryClientFn() SupplierFn {
return func(
_ context.Context,
deps depinject.Config,
_ *cobra.Command,
) (depinject.Config, error) {
sharedQuerier, err := query.NewSharedQuerier(deps)
if err != nil {
return nil, err
}

return depinject.Configs(deps, depinject.Supply(sharedQuerier)), nil
}
}
1 change: 1 addition & 0 deletions pkg/relayer/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ func setupRelayerDependencies(
supplyMiner, // leaf
config.NewSupplyTxClientContextFn(queryNodeGRPCUrl, txNodeRPCUrl), // leaf
config.NewSupplyDelegationClientFn(), // leaf
config.NewSupplySharedQueryClientFn(), // leaf
config.NewSupplyAccountQuerierFn(),
config.NewSupplyApplicationQuerierFn(),
config.NewSupplySupplierQuerierFn(),
Expand Down
5 changes: 3 additions & 2 deletions pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ type relayerProxy struct {
// which contains the supported services, RPC types, and endpoints, etc...
supplierQuerier client.SupplierQueryClient

// sessionQuerier is the querier used to get the current session from the blockchain,
// which is needed to check if the relay proxy should be serving an incoming relay request.
// sessionQuerier is the query client used to get the current session & session params
// from the blockchain, which are needed to check if the relay proxy should be serving an
// incoming relay request.
sessionQuerier client.SessionQueryClient

// servers is a map of listenAddress -> RelayServer provided by the relayer proxy,
Expand Down
12 changes: 6 additions & 6 deletions pkg/relayer/proxy/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/pokt-network/poktroll/pkg/relayer/config"
"github.com/pokt-network/poktroll/pkg/relayer/proxy"
"github.com/pokt-network/poktroll/testutil/testproxy"
sessionkeeper "github.com/pokt-network/poktroll/x/session/keeper"
"github.com/pokt-network/poktroll/x/shared"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

Expand Down Expand Up @@ -346,14 +346,14 @@ func TestRelayerProxy_Relays(t *testing.T) {
// blockOutsideSessionGracePeriod is the block height that is after the first
// session's grace period and within the second session's grace period,
// meaning a relay should not be handled at this block height.
blockOutsideSessionGracePeriod := blockHeight +
sessionkeeper.NumBlocksPerSession +
sessionkeeper.GetSessionGracePeriodBlockCount()
blockOutsideSessionGracePeriod := int64(blockHeight +
shared.NumBlocksPerSession +
shared.SessionGracePeriodBlocks)

// blockWithinSessionGracePeriod is the block height that is after the first
// session but within its session's grace period, meaning a relay should be
// handled at this block height.
blockWithinSessionGracePeriod := blockHeight + sessionkeeper.GetSessionGracePeriodBlockCount()
blockWithinSessionGracePeriod := int64(blockHeight + shared.SessionGracePeriodBlocks)

tests := []struct {
desc string
Expand Down Expand Up @@ -656,7 +656,7 @@ func sendRequestWithDifferentSession(
test *testproxy.TestBehavior,
) (errCode int32, errorMessage string) {
// Use a block height that generates a different session ID
blockHeightAfterSessionGracePeriod := blockHeight + sessionkeeper.GetSessionGracePeriodBlockCount()
blockHeightAfterSessionGracePeriod := int64(blockHeight + shared.SessionGracePeriodBlocks)
req := testproxy.GenerateRelayRequest(
test,
appPrivateKey,
Expand Down
4 changes: 2 additions & 2 deletions pkg/relayer/proxy/relay_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package proxy
import (
"context"

sessiontypes "github.com/pokt-network/poktroll/pkg/relayer/session"
"github.com/pokt-network/poktroll/x/service/types"
"github.com/pokt-network/poktroll/x/shared"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

Expand Down Expand Up @@ -99,7 +99,7 @@ func (rp *relayerProxy) getTargetSessionBlockHeight(
if sessionEndblockHeight < currentBlockHeight {
// Do not process the `RelayRequest` if the session has expired and the current
// block height is outside the session's grace period.
if sessiontypes.IsWithinGracePeriod(sessionEndblockHeight, currentBlockHeight) {
if !shared.IsGracePeriodElapsed(sessionEndblockHeight, currentBlockHeight) {
// The RelayRequest's session has expired but is still within the
// grace period so process it as if the session is still active.
return sessionEndblockHeight, nil
Expand Down
11 changes: 8 additions & 3 deletions pkg/relayer/session/claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/pokt-network/poktroll/pkg/observable/logging"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/pkg/relayer/protocol"
sessionkeeper "github.com/pokt-network/poktroll/x/session/keeper"
"github.com/pokt-network/poktroll/x/shared"
)

// createClaims maps over the sessionsToClaimObs observable. For each claim batch, it:
Expand Down Expand Up @@ -42,6 +42,10 @@ func (rs *relayerSessionsManager) createClaims(
)

// TODO_TECHDEBT: pass failed create claim sessions to some retry mechanism.
// TODO_IMPROVE: It may be useful for the retry mechanism which consumes the
// observable which corresponds to failSubmitProofsSessionsCh to have a
// reference to the error which caused the proof submission to fail.
// In this case, the error may not be persistent.
_ = failedCreateClaimSessionsObs
logging.LogErrors(ctx, filter.EitherError(ctx, eitherClaimedSessionsObs))

Expand Down Expand Up @@ -82,11 +86,12 @@ func (rs *relayerSessionsManager) waitForEarliestCreateClaimsHeight(
// first one from the group to calculate the earliest height for claim creation.
sessionEndHeight := sessionTrees[0].GetSessionHeader().GetSessionEndBlockHeight()

// TODO_TECHDEBT(@red-0ne): Centralize the business logic that involves taking
// TODO_TECHDEBT(#516): Centralize the business logic that involves taking
// into account the heights, windows and grace periods into helper functions.
// An additional block is added to permit to relays arriving at the last block
// of the session to be included in the claim before the smt is closed.
createClaimsWindowStartHeight := sessionEndHeight + sessionkeeper.GetSessionGracePeriodBlockCount() + 1
sessionGracePeriodEndHeight := shared.GetSessionGracePeriodEndHeight(sessionEndHeight)
createClaimsWindowStartHeight := sessionGracePeriodEndHeight + 1

// TODO_BLOCKER: query the on-chain governance parameter once available.
// + claimproofparams.GovCreateClaimWindowStartHeightOffset
Expand Down
16 changes: 10 additions & 6 deletions pkg/relayer/session/proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/pkg/relayer/protocol"
proofkeeper "github.com/pokt-network/poktroll/x/proof/keeper"
sessionkeeper "github.com/pokt-network/poktroll/x/session/keeper"
"github.com/pokt-network/poktroll/x/shared"
)

// submitProofs maps over the given claimedSessions observable.
Expand Down Expand Up @@ -79,12 +79,16 @@ func (rs *relayerSessionsManager) waitForEarliestSubmitProofsHeightAndGeneratePr
// first one from the group to calculate the earliest height for proof submission.
sessionEndHeight := sessionTrees[0].GetSessionHeader().GetSessionEndBlockHeight()

sessionGracePeriodEndHeight := sessionkeeper.GetSessionGracePeriodBlockCount() + sessionEndHeight

// TODO_TECHDEBT(#516): Centralize the business logic that involves taking
// into account the heights, windows and grace periods into helper functions.
// TODO_BLOCKER(#516): The proof submission window SHOULD NOT overlap with the claim window.
submitProofsWindowStartHeight := sessionGracePeriodEndHeight + 1
// TODO_BLOCKER(#516): The proof submission window SHOULD NOT overlap with the
// claim window. The proof submission window start SHOULD be relative to the
// claim window end.
sessionGracePeriodEndHeight := shared.GetSessionGracePeriodEndHeight(sessionEndHeight)
// An additional block is added to permit to relays arriving at the last block
// of the session to be included in the claim before the smt is closed.
createClaimsWindowStartHeight := sessionGracePeriodEndHeight + 1
submitProofsWindowStartHeight := createClaimsWindowStartHeight
// TODO_BLOCKER(#516): query the on-chain governance parameter once available.
// + claimproofparams.GovSubmitProofWindowStartHeightOffset

Expand All @@ -95,7 +99,7 @@ func (rs *relayerSessionsManager) waitForEarliestSubmitProofsHeightAndGeneratePr

// TODO_BLOCKER(@bryanchriswhite): The block that'll be used as a source of entropy for
// which branch(es) to prove should be deterministic and use on-chain governance params.
// submitProofWindowStartBlock is the block that will have its hash used as the
// sessionPathBlock is the block that will have its hash used as the
// source of entropy for all the session trees in that batch, waiting for it to
// be received before proceeding.
sessionPathBlock := rs.waitForBlock(ctx, sessionGracePeriodEndHeight)
Expand Down
Loading

0 comments on commit 7507dc5

Please sign in to comment.