Skip to content

Commit

Permalink
Merge branch 'issues/516/feat/claim-window-close-offset-blocks' into …
Browse files Browse the repository at this point in the history
…issues/516/refactor/relayminer

* issues/516/feat/claim-window-close-offset-blocks:
  test: ValidateClaimWindowCloseOffsetBlocks
  test: ValidateClaimWindowOpenOffsetBlocks
  [On-chain] chore: add single param updates to `shared` module (#560)
  Empty commit
  [Code Health] chore: resolve & cleanup outstanding `TODO_TECHDEBT(#517)` comments (#559)
  [Session Module] refactor: session module fetches on-chain params (#557)
  [Ring Client] refactor: ring client to use on-chain params (#556)
  [LoadTest] Switch funding account to key name instead of address (#562)
  [Application] Refactor app module to fetch on-chain params (#555)
  [Relayminer] Fix: relayminer on time sessions (#550)
  Empty commit
  • Loading branch information
bryanchriswhite committed May 31, 2024
2 parents 4950bae + f906bdb commit f002ee2
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 89 deletions.
19 changes: 14 additions & 5 deletions load-testing/config/load_test_manifest_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ type ProvisionedActorConfig struct {
// LoadTestManifestYAML is the top-level structure of the load test manifest file.
// It describes the target chain, the service to exercise, the funding account,
// and the pre-provisioned actors (suppliers & gateways) used by the load test.
//
// NOTE: the diff residue had duplicated every field (old and new variants side
// by side), which is invalid Go; only the current field set is kept here.
type LoadTestManifestYAML struct {
	// IsEphemeralChain is a flag that indicates whether the test is expected to be
	// run on LocalNet or long-living remote chain (i.e. TestNet/DevNet).
	IsEphemeralChain bool `yaml:"is_ephemeral_chain"`
	// TestNetNode is the RPC endpoint of the remote node to target when the
	// chain is not ephemeral.
	TestNetNode string `yaml:"testnet_node"`
	// ServiceId is the ID of the service to request relays from.
	ServiceId string `yaml:"service_id"`
	// Suppliers is the list of pre-provisioned suppliers used for the load test.
	Suppliers []ProvisionedActorConfig `yaml:"suppliers"`
	// Gateways is the list of pre-provisioned gateways used for the load test.
	Gateways []ProvisionedActorConfig `yaml:"gateways"`
	// FundingAccountAddress is the address of the account used to fund the
	// other (application/gateway/supplier) accounts so they can stake.
	FundingAccountAddress string `yaml:"funding_account_address"`
}

// ParseLoadTestManifest reads the load test manifest from the given byte slice
Expand Down Expand Up @@ -62,6 +63,10 @@ func validatedEphemeralChainManifest(manifest *LoadTestManifestYAML) (*LoadTestM
return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty service id")
}

if len(manifest.FundingAccountAddress) == 0 {
return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty funding account address")
}

for _, gateway := range manifest.Gateways {
if len(gateway.Address) == 0 {
return nil, ErrEphemeralChainLoadTestInvalidManifest.Wrap("empty gateway address")
Expand Down Expand Up @@ -110,6 +115,10 @@ func validatedNonEphemeralChainManifest(manifest *LoadTestManifestYAML) (*LoadTe
return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("empty service id")
}

if len(manifest.FundingAccountAddress) == 0 {
return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("empty funding account address")
}

for _, gateway := range manifest.Gateways {
if len(gateway.Address) == 0 {
return nil, ErrNonEphemeralChainLoadTestInvalidManifest.Wrap("empty gateway address")
Expand Down
3 changes: 3 additions & 0 deletions load-testing/loadtest_manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ is_ephemeral_chain: false
testnet_node: https://testnet-validated-validator-rpc.poktroll.com
# The service ID to request relays from.
service_id: "0021"
# The address of the account that will be used to fund the application accounts
# so that they can stake on the network.
funding_account_address: pokt14eh973xt99s7edugnyvd5d5u50d6j0hysw2vsm # address for pnf account
# In non-ephemeral chains, the gateways are identified by their address.
gateways:
# address is the bech32 pokt address of the gateway.
Expand Down
3 changes: 3 additions & 0 deletions load-testing/localnet_loadtest_manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
is_ephemeral_chain: true
# The service ID to use for the load test.
service_id: anvil
# The address of the account that will be used to fund the application,
# gateway and supplier accounts so that they can stake on the network.
funding_account_address: pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw # address for pnf account

# List of pre-provisioned suppliers used for load testing.
# These suppliers will be progressively staked during the load test, according
Expand Down
8 changes: 4 additions & 4 deletions load-testing/tests/relays_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ var (
// maxConcurrentRequestLimit is the maximum number of concurrent requests that can be made.
// By default, it is set to the number of logical CPUs available to the process.
maxConcurrentRequestLimit = runtime.GOMAXPROCS(0)
// fundingAccountAddress is the key name of the account used to fund other accounts.
fundingAccountAddress = "pokt1eeeksh2tvkh7wzmfrljnhw4wrhs55lcuvmekkw" // address for pnf account
// fundingAccountAddress is the address of the account used to fund other accounts.
fundingAccountAddress string
// supplierStakeAmount is the amount of tokens to stake by suppliers.
supplierStakeAmount sdk.Coin
// gatewayStakeAmount is the amount of tokens to stake by gateways.
Expand Down Expand Up @@ -195,7 +195,7 @@ type relaysSuite struct {
// It is used to ensure that the suppliers are staked in the order they are provisioned.
availableSupplierAddresses []string

// fundingAccountInfo is the account entry corresponding to the fundingAccountAddress.
// fundingAccountInfo is the account entry corresponding to the fundingAccountKeyName.
// It is used to send transactions to fund other accounts.
fundingAccountInfo *accountInfo
// preparedGateways is the list of gateways that are already staked, delegated
Expand Down Expand Up @@ -353,7 +353,7 @@ func (s *relaysSuite) LocalnetIsRunning() {
s.setupTxEventListeners()

// Initialize the funding account.
s.initFundingAccount(fundingAccountAddress)
s.initFundingAccount(loadTestParams.FundingAccountAddress)

// Initialize the on-chain claims and proofs counter.
s.countClaimAndProofs()
Expand Down
177 changes: 97 additions & 80 deletions pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ func NewRelayerSessions(
return nil, err
}

rs.sessionsToClaimObs = channel.Map(
sessionsToClaimObs, sessionsToClaimPublishCh := channel.NewObservable[[]relayer.SessionTree]()
rs.sessionsToClaimObs = sessionsToClaimObs
channel.ForEach(
ctx,
rs.blockClient.CommittedBlocksSequence(ctx),
rs.mapBlockToSessionsToClaim,
rs.forEachBlockClaimSessionsFn(sessionsToClaimPublishCh),
)

return rs, nil
Expand Down Expand Up @@ -163,94 +165,109 @@ func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes.
return sessionTree, nil
}

// mapBlockToSessionsToClaim is a MapFn which maps committed blocks to a list of
// sessions which can be claimed as of that block.
// forEachBlockClaimSessionsFn returns a new ForEachFn that sends a lists of sessions which
// are eligible to be claimed at each block height on sessionsToClaimsPublishCh, effectively
// mapping committed blocks to a list of sessions which can be claimed as of that block.
//
// forEachBlockClaimSessionsFn returns a new ForEachFn that is called once for each block height.
// Given the current sessions in the rs.sessionsTrees map at the time of each call, it:
// - fetches the current shared module params
// - builds a list of "on-time" & "late" sessions that are eligible to be claimed as of a given block height
// - sends "late" & "on-time" sessions on sessionsToClaimsPublishCh as distinct notifications
//
// If "late" sessions are found, they are emitted as quickly as possible and are expected
// to bypass downstream delay operations. "late" sessions are emitted, as they're discovered
// (by iterating over map keys).
//
// Under nominal conditions, only one set of "on-time" sessions (w/ the same session start/end heights)
// should be present in the rs.sessionsTrees map. "Late" sessions
// are expected to present in the presence of network interruptions, restarts, or other
// disruptions to the relayminer process.
// TODO_IMPROVE: Add the ability for the process to resume where it left off in
// case the process is restarted or the connection is dropped and reconnected.
func (rs *relayerSessionsManager) mapBlockToSessionsToClaim(
ctx context.Context,
block client.Block,
) (sessionTrees []relayer.SessionTree, skip bool) {
rs.sessionsTreesMu.Lock()
defer rs.sessionsTreesMu.Unlock()

// onTimeSessions are the sessions that are still within their grace period.
// They are on time and will wait for their create claim window to open.
// They will be emitted last, after all the late sessions have been emitted.
var onTimeSessions []relayer.SessionTree

// TODO_TECHDEBT(#543): We don't really want to have to query the params for every method call.
// Once `ModuleParamsClient` is implemented, use its replay observable's `#Last()` method
// to get the most recently (asynchronously) observed (and cached) value.
sharedParams, err := rs.sharedQueryClient.GetParams(ctx)
if err != nil {
rs.logger.Error().Err(err).Msg("unable to query shared module params")
return nil, true
}
func (rs *relayerSessionsManager) forEachBlockClaimSessionsFn(
sessionsToClaimsPublishCh chan<- []relayer.SessionTree,
) channel.ForEachFn[client.Block] {
return func(ctx context.Context, block client.Block) {
rs.sessionsTreesMu.Lock()
defer rs.sessionsTreesMu.Unlock()

// onTimeSessions are the sessions that are still within their grace period.
// They are on time and will wait for their create claim window to open.
// They will be emitted last, after all the late sessions have been emitted.
var onTimeSessions []relayer.SessionTree

// TODO_TECHDEBT(#543): We don't really want to have to query the params for every method call.
// Once `ModuleParamsClient` is implemented, use its replay observable's `#Last()` method
// to get the most recently (asynchronously) observed (and cached) value.
sharedParams, err := rs.sharedQueryClient.GetParams(ctx)
if err != nil {
rs.logger.Error().Err(err).Msg("unable to query shared module params")
return
}

numBlocksPerSession := sharedParams.NumBlocksPerSession

// Check if there are sessions that need to enter the claim/proof phase as their
// end block height was the one before the last committed block or earlier.
// Iterate over the sessionsTrees map to get the ones that end at a block height
// lower than the current block height.
for sessionEndHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees {
// Late sessions are the ones that have their session grace period elapsed
// and should already have been claimed.
// Group them by their end block height and emit each group separately
// before emitting the on-time sessions.
var lateSessions []relayer.SessionTree

sessionGracePeriodEndHeight := shared.GetSessionGracePeriodEndHeight(sessionEndHeight)

// Checking for sessions to claim with <= operator,
// which means that it would include sessions that were supposed to be
// claimed in previous block heights too.
// These late sessions might have their create claim window closed and are
// no longer eligible to be claimed, but that's not always the case.
// Once claim window closing is implemented, they will be filtered out
// downstream at the waitForEarliestCreateClaimsHeight step.
// TODO_BLOCKER: Introduce governance claim and proof window durations,
// implement off-chain window closing and on-chain window checks.
if sessionGracePeriodEndHeight <= block.Height() {
// Iterate over the sessionsTrees that have grace period ending at this
// block height and add them to the list of sessionTrees to be published.
for _, sessionTree := range sessionsTreesEndingAtBlockHeight {
// Mark the session as claimed and add it to the list of sessionTrees to be published.
// If the session has already been claimed, it will be skipped.
// Appending the sessionTree to the list of sessionTrees is protected
// against concurrent access by the sessionsTreesMu such that the first
// call that marks the session as claimed will be the only one to add the
// sessionTree to the list.
if err := sessionTree.StartClaiming(); err != nil {
continue
numBlocksPerSession := sharedParams.NumBlocksPerSession

// Check if there are sessions that need to enter the claim/proof phase as their
// end block height was the one before the last committed block or earlier.
// Iterate over the sessionsTrees map to get the ones that end at a block height
// lower than the current block height.
for sessionEndHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees {
// Late sessions are the ones that have their session grace period elapsed
// and should already have been claimed.
// Group them by their end block height and emit each group separately
// before emitting the on-time sessions.
var lateSessions []relayer.SessionTree

sessionGracePeriodEndHeight := shared.GetSessionGracePeriodEndHeight(sessionEndHeight)

// Checking for sessions to claim with <= operator,
// which means that it would include sessions that were supposed to be
// claimed in previous block heights too.
// These late sessions might have their create claim window closed and are
// no longer eligible to be claimed, but that's not always the case.
// Once claim window closing is implemented, they will be filtered out
// downstream at the waitForEarliestCreateClaimsHeight step.
// TODO_BLOCKER: Introduce governance claim and proof window durations,
// implement off-chain window closing and on-chain window checks.
if sessionGracePeriodEndHeight <= block.Height() {
// Iterate over the sessionsTrees that have grace period ending at this
// block height and add them to the list of sessionTrees to be published.
for _, sessionTree := range sessionsTreesEndingAtBlockHeight {
// Mark the session as claimed and add it to the list of sessionTrees to be published.
// If the session has already been claimed, it will be skipped.
// Appending the sessionTree to the list of sessionTrees is protected
// against concurrent access by the sessionsTreesMu such that the first
// call that marks the session as claimed will be the only one to add the
// sessionTree to the list.
if err := sessionTree.StartClaiming(); err != nil {
continue
}

// Separate the sessions that are on-time from the ones that are late.
// If the session is past its grace period, it is considered late,
// otherwise it is on time and will be emitted last.
if sessionGracePeriodEndHeight+int64(numBlocksPerSession) < block.Height() {
lateSessions = append(lateSessions, sessionTree)
} else {
onTimeSessions = append(onTimeSessions, sessionTree)
}
}

// Separate the sessions that are on-time from the ones that are late.
// If the session is past its grace period, it is considered late,
// otherwise it is on time and will be emitted last.
if sessionGracePeriodEndHeight+int64(numBlocksPerSession) < block.Height() {
lateSessions = append(lateSessions, sessionTree)
} else {
onTimeSessions = append(onTimeSessions, sessionTree)
// If there are any late sessions to be claimed, emit them first.
// The wait for claim submission window pipeline step will return immediately
// without blocking them.
if len(lateSessions) > 0 {
sessionsToClaimsPublishCh <- lateSessions
}
}

// If there are any late sessions to be claimed, emit them first.
// The wait for claim submission window pipeline step will return immediately
// without blocking them.
if len(lateSessions) > 0 {
return lateSessions, false
}
}
}

// Emit the on-time sessions last, after all the late sessions have been emitted.
if len(onTimeSessions) > 0 {
return onTimeSessions, false
// Emit the on-time sessions last, after all the late sessions have been emitted.
if len(onTimeSessions) > 0 {
sessionsToClaimsPublishCh <- onTimeSessions
}
}

return nil, true
}

// removeFromRelayerSessions removes the SessionTree from the relayerSessions.
Expand Down
60 changes: 60 additions & 0 deletions x/shared/types/params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,63 @@ func TestParams_ValidateNumBlocksPerSession(t *testing.T) {
})
}
}

// TestParams_ValidateClaimWindowOpenOffsetBlocks exercises the
// ClaimWindowOpenOffsetBlocks param validator: a non-uint64 value is
// rejected with an invalid-type error, while a uint64 value passes.
func TestParams_ValidateClaimWindowOpenOffsetBlocks(t *testing.T) {
	testCases := []struct {
		desc                        string
		claimWindowOpenOffsetBlocks any
		err                         error
	}{
		{
			desc:                        "invalid type",
			claimWindowOpenOffsetBlocks: "invalid",
			err:                         ErrSharedParamInvalid.Wrapf("invalid parameter type: %T", "invalid"),
		},
		{
			desc:                        "valid ClaimWindowOpenOffsetBlocks",
			claimWindowOpenOffsetBlocks: uint64(4),
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.desc, func(t *testing.T) {
			err := ValidateClaimWindowOpenOffsetBlocks(testCase.claimWindowOpenOffsetBlocks)
			// Happy path: no expected error means the validator must accept the value.
			if testCase.err == nil {
				require.NoError(t, err)
				return
			}
			require.Error(t, err)
			require.Contains(t, err.Error(), testCase.err.Error())
		})
	}
}

// TestParams_ValidateClaimWindowCloseOffsetBlocks exercises the
// ClaimWindowCloseOffsetBlocks param validator: a non-uint64 value is
// rejected with an invalid-type error, while a uint64 value passes.
func TestParams_ValidateClaimWindowCloseOffsetBlocks(t *testing.T) {
	testCases := []struct {
		desc                         string
		claimWindowCloseOffsetBlocks any
		err                          error
	}{
		{
			desc:                         "invalid type",
			claimWindowCloseOffsetBlocks: "invalid",
			err:                          ErrSharedParamInvalid.Wrapf("invalid parameter type: %T", "invalid"),
		},
		{
			desc:                         "valid ClaimWindowCloseOffsetBlocks",
			claimWindowCloseOffsetBlocks: uint64(4),
		},
	}

	for _, testCase := range testCases {
		t.Run(testCase.desc, func(t *testing.T) {
			err := ValidateClaimWindowCloseOffsetBlocks(testCase.claimWindowCloseOffsetBlocks)
			// Happy path: no expected error means the validator must accept the value.
			if testCase.err == nil {
				require.NoError(t, err)
				return
			}
			require.Error(t, err)
			require.Contains(t, err.Error(), testCase.err.Error())
		})
	}
}

0 comments on commit f002ee2

Please sign in to comment.