Skip to content

Commit

Permalink
Merge branch 'main' into issues/584/claims/expiration-use-index
Browse files Browse the repository at this point in the history
  • Loading branch information
bryanchriswhite authored Jul 15, 2024
2 parents 60bd91b + cd183ed commit 84226fe
Show file tree
Hide file tree
Showing 25 changed files with 1,515 additions and 216 deletions.
572 changes: 572 additions & 0 deletions api/poktroll/gateway/event.pulsar.go

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions load-testing/tests/relays_stress_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func (s *relaysSuite) mapSessionInfoForLoadTestDurationFn(
if waitingForFirstSession && blockHeight != sessionInfo.sessionStartBlockHeight {
countDownToTestStart := sessionInfo.sessionEndBlockHeight - blockHeight + 1
infoLogger.Msgf(
"waiting for next testsession to start: in %d blocks",
"waiting for next test session to start: in %d blocks",
countDownToTestStart,
)

Expand All @@ -206,20 +206,21 @@ func (s *relaysSuite) mapSessionInfoForLoadTestDurationFn(
s.testStartHeight = blockHeight
// Mark the test as started.
waitingForFirstSession = false
// Calculate the end block height of the test.
s.testEndHeight = s.testStartHeight + s.plans.totalDurationBlocks(s.sharedParams, blockHeight)

logger.Info().Msgf("Test starting at block height: %d", s.testStartHeight)
}

// If the test duration is reached, stop sending requests
sendRelaysEndHeight := s.testStartHeight + s.relayLoadDurationBlocks
if blockHeight >= sendRelaysEndHeight {
testEndHeight := s.testStartHeight + s.plans.totalDurationBlocks(s.sharedParams, blockHeight)

remainingRelayLoadBlocks := blockHeight - sendRelaysEndHeight
waitForSettlementBlocks := testEndHeight - sendRelaysEndHeight
waitForSettlementBlocks := s.testEndHeight - sendRelaysEndHeight
logger.Info().Msgf("Stop sending relays, waiting for last claims and proofs to be submitted; block until validation: %d/%d", remainingRelayLoadBlocks, waitForSettlementBlocks)
// Wait for one more session to let the last claims and proofs be submitted.
if blockHeight > testEndHeight {
if blockHeight > s.testEndHeight {
s.cancelCtx()
}
return nil, true
Expand Down
1 change: 1 addition & 0 deletions load-testing/tests/relays_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type relaysSuite struct {
// testStartHeight is the block height at which the test started.
// It is used to calculate the progress of the test.
testStartHeight int64
testEndHeight int64

// relayLoadDurationBlocks is the duration in blocks it takes to send all relay requests.
// After this duration, the test suite will stop sending relay requests, but will continue
Expand Down
26 changes: 21 additions & 5 deletions pkg/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"

cometrpctypes "github.com/cometbft/cometbft/rpc/core/types"
coretypes "github.com/cometbft/cometbft/rpc/core/types"
comettypes "github.com/cometbft/cometbft/types"
cosmosclient "github.com/cosmos/cosmos-sdk/client"
cosmoskeyring "github.com/cosmos/cosmos-sdk/crypto/keyring"
Expand All @@ -29,12 +28,29 @@ import (

"github.com/pokt-network/poktroll/pkg/either"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/relayer"
apptypes "github.com/pokt-network/poktroll/x/application/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

// MsgCreateClaim is an interface satisfying proof.MsgCreateClaim concrete type
// used by the SupplierClient interface to avoid cyclic dependencies.
type MsgCreateClaim interface {
cosmostypes.Msg
GetRootHash() []byte
GetSessionHeader() *sessiontypes.SessionHeader
GetSupplierAddress() string
}

// MsgSubmitProof is an interface satisfying proof.MsgSubmitProof concrete type
// used by the SupplierClient interface to avoid cyclic dependencies.
type MsgSubmitProof interface {
cosmostypes.Msg
GetProof() []byte
GetSessionHeader() *sessiontypes.SessionHeader
GetSupplierAddress() string
}

// SupplierClient is an interface for sufficient for a supplier operator to be
// able to construct blockchain transactions from pocket protocol-specific messages
// related to its role.
Expand All @@ -44,7 +60,7 @@ type SupplierClient interface {
// session's mined relays.
CreateClaims(
ctx context.Context,
sessionClaims []*relayer.SessionClaim,
claimMsgs ...MsgCreateClaim,
) error
// SubmitProof sends proof messages which contain the smt.SparseMerkleClosestProof,
// corresponding to some previously created claim for the same session.
Expand All @@ -53,7 +69,7 @@ type SupplierClient interface {
// the amount of data stored on-chain.
SubmitProofs(
ctx context.Context,
sessionProofs []*relayer.SessionProof,
sessionProofs ...MsgSubmitProof,
) error
// Address returns the address of the SupplierClient that will be submitting proofs & claims.
Address() *cosmostypes.AccAddress
Expand Down Expand Up @@ -312,7 +328,7 @@ type SharedQueryClient interface {
// on-chain block information for a given height. If height is nil, the
// latest block is returned.
type BlockQueryClient interface {
Block(ctx context.Context, height *int64) (*coretypes.ResultBlock, error)
Block(ctx context.Context, height *int64) (*cometrpctypes.ResultBlock, error)
}

// ProofParams is a go interface type which corresponds to the poktroll.proof.Params
Expand Down
62 changes: 35 additions & 27 deletions pkg/client/supplier/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package supplier

import (
"context"
"sync"

"cosmossdk.io/depinject"
cosmostypes "github.com/cosmos/cosmos-sdk/types"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/keyring"
"github.com/pokt-network/poktroll/pkg/polylog"
"github.com/pokt-network/poktroll/pkg/relayer"
prooftypes "github.com/pokt-network/poktroll/x/proof/types"
)

Expand All @@ -20,6 +20,9 @@ type supplierClient struct {
signingKeyName string
signingKeyAddr cosmostypes.AccAddress

// pendingTxMu is used to prevent concurrent txs with the same sequence number.
pendingTxMu sync.Mutex

txClient client.TxClient
txCtx client.TxContext
}
Expand Down Expand Up @@ -63,34 +66,35 @@ func NewSupplierClient(
// the transaction is included in a block or times out.
func (sClient *supplierClient) SubmitProofs(
ctx context.Context,
sessionProofs []*relayer.SessionProof,
proofMsgs ...client.MsgSubmitProof,
) error {
sClient.pendingTxMu.Lock()
defer sClient.pendingTxMu.Unlock()
logger := polylog.Ctx(ctx)

msgs := make([]cosmostypes.Msg, len(sessionProofs))

for i, sessionProof := range sessionProofs {
// TODO(@bryanchriswhite): reconcile splitting of supplier & proof modules
// with off-chain pkgs/nomenclature.
msgs[i] = &prooftypes.MsgSubmitProof{
SupplierAddress: sessionProof.SupplierAddress.String(),
SessionHeader: sessionProof.SessionHeader,
Proof: sessionProof.ProofBz,
}
msgs := make([]cosmostypes.Msg, 0, len(proofMsgs))
for _, p := range proofMsgs {
msgs = append(msgs, p)
}

// TODO(@bryanchriswhite): reconcile splitting of supplier & proof modules
// with off-chain pkgs/nomenclature.
eitherErr := sClient.txClient.SignAndBroadcast(ctx, msgs...)
err, errCh := eitherErr.SyncOrAsyncError()
if err != nil {
return err
}

for _, sessionProof := range sessionProofs {
sessionHeader := sessionProof.SessionHeader
for _, p := range proofMsgs {
// Type casting does not need to be checked here since the concrete type is
// guaranteed to implement the interface which is just an identity for the
// concrete type.
proofMsg, _ := p.(*prooftypes.MsgSubmitProof)
sessionHeader := proofMsg.SessionHeader
// TODO_IMPROVE: log details related to what & how much is being proven
logger.Info().
Fields(map[string]any{
"supplier_addr": sessionProof.SupplierAddress.String(),
"supplier_addr": proofMsg.SupplierAddress,
"app_addr": sessionHeader.ApplicationAddress,
"session_id": sessionHeader.SessionId,
"service": sessionHeader.Service.Id,
Expand All @@ -106,33 +110,37 @@ func (sClient *supplierClient) SubmitProofs(
// the transaction is included in a block or times out.
func (sClient *supplierClient) CreateClaims(
ctx context.Context,
sessionClaims []*relayer.SessionClaim,
claimMsgs ...client.MsgCreateClaim,
) error {
// Prevent concurrent txs with the same sequence number.
sClient.pendingTxMu.Lock()
defer sClient.pendingTxMu.Unlock()

logger := polylog.Ctx(ctx)

msgs := make([]cosmostypes.Msg, len(sessionClaims))
msgs := make([]cosmostypes.Msg, 0, len(claimMsgs))
for _, c := range claimMsgs {
msgs = append(msgs, c)
}

// TODO(@bryanchriswhite): reconcile splitting of supplier & proof modules
// with off-chain pkgs/nomenclature.
for i, sessionClaim := range sessionClaims {
msgs[i] = &prooftypes.MsgCreateClaim{
SupplierAddress: sessionClaim.SupplierAddress.String(),
SessionHeader: sessionClaim.SessionHeader,
RootHash: sessionClaim.RootHash,
}
}
eitherErr := sClient.txClient.SignAndBroadcast(ctx, msgs...)
err, errCh := eitherErr.SyncOrAsyncError()
if err != nil {
return err
}

for _, claim := range sessionClaims {
sessionHeader := claim.SessionHeader
for _, c := range claimMsgs {
// Type casting does not need to be checked here since the concrete type is
// guaranteed to implement the interface which is just an identity for the
// concrete type.
claimMsg, _ := c.(*prooftypes.MsgCreateClaim)
sessionHeader := claimMsg.SessionHeader
// TODO_IMPROVE: log details related to how much is claimed
logger.Info().
Fields(map[string]any{
"supplier_addr": claim.SupplierAddress.String(),
"supplier_addr": claimMsg.SupplierAddress,
"app_addr": sessionHeader.ApplicationAddress,
"session_id": sessionHeader.SessionId,
"service": sessionHeader.Service.Id,
Expand Down
22 changes: 9 additions & 13 deletions pkg/client/supplier/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ import (

"github.com/pokt-network/poktroll/pkg/client/keyring"
"github.com/pokt-network/poktroll/pkg/client/supplier"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/testutil/mockclient"
"github.com/pokt-network/poktroll/testutil/testclient/testkeyring"
"github.com/pokt-network/poktroll/testutil/testclient/testtx"
prooftypes "github.com/pokt-network/poktroll/x/proof/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)
Expand Down Expand Up @@ -112,15 +112,13 @@ func TestSupplierClient_CreateClaim(t *testing.T) {
},
}

sessionClaims := []*relayer.SessionClaim{
{
RootHash: rootHash,
SessionHeader: &sessionHeader,
},
msgClaim := &prooftypes.MsgCreateClaim{
RootHash: rootHash,
SessionHeader: &sessionHeader,
}

go func() {
err = supplierClient.CreateClaims(ctx, sessionClaims)
err = supplierClient.CreateClaims(ctx, msgClaim)
require.NoError(t, err)
close(doneCh)
}()
Expand Down Expand Up @@ -191,15 +189,13 @@ func TestSupplierClient_SubmitProof(t *testing.T) {
proofBz, err := proof.Marshal()
require.NoError(t, err)

sessionProofs := []*relayer.SessionProof{
{
ProofBz: proofBz,
SessionHeader: &sessionHeader,
},
msgProof := &prooftypes.MsgSubmitProof{
Proof: proofBz,
SessionHeader: &sessionHeader,
}

go func() {
err = supplierClient.SubmitProofs(ctx, sessionProofs)
err = supplierClient.SubmitProofs(ctx, msgProof)
require.NoError(t, err)
close(doneCh)
}()
Expand Down
18 changes: 7 additions & 11 deletions pkg/relayer/proxy/server_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,28 +95,25 @@ func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error {
}

rp.AddressToSigningKeyNameMap[supplier.Address] = signingKeyName
}

if rp.servers, err = rp.initializeProxyServers(supplier.Services); err != nil {
return err
}
var err error
if rp.servers, err = rp.initializeProxyServers(); err != nil {
return err
}

return nil
}

// initializeProxyServers initializes the proxy servers for each server config.
func (rp *relayerProxy) initializeProxyServers(
supplierServices []*sharedtypes.SupplierServiceConfig,
) (proxyServerMap map[string]relayer.RelayServer, err error) {
func (rp *relayerProxy) initializeProxyServers() (proxyServerMap map[string]relayer.RelayServer, err error) {
// Build a map of serviceId -> service for the supplier's advertised services
supplierServiceMap := make(map[string]*sharedtypes.Service)
for _, service := range supplierServices {
supplierServiceMap[service.Service.Id] = service.Service
}

// Build a map of listenAddress -> RelayServer for each server defined in the config file
servers := make(map[string]relayer.RelayServer)

// serverConfigs is a map with ListenAddress as the key which guarantees that
// there are no duplicate servers with the same ListenAddress.
for _, serverConfig := range rp.serverConfigs {
rp.logger.Info().Str("server host", serverConfig.ListenAddress).Msg("starting relay proxy server")

Expand All @@ -129,7 +126,6 @@ func (rp *relayerProxy) initializeProxyServers(
servers[serverConfig.ListenAddress] = NewSynchronousServer(
rp.logger,
serverConfig,
supplierServiceMap,
rp.servedRelaysPublishCh,
rp,
)
Expand Down
6 changes: 0 additions & 6 deletions pkg/relayer/proxy/synchronous.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ func init() {
type synchronousRPCServer struct {
logger polylog.Logger

// supplierServiceMap is a map of serviceId -> SupplierServiceConfig
// representing the supplier's advertised services.
supplierServiceMap map[string]*sharedtypes.Service

// serverConfig is the configuration of the proxy server. It contains the
// host address of the server, the service endpoint, and the advertised service.
// endpoints it gets relay requests from.
Expand All @@ -64,13 +60,11 @@ type synchronousRPCServer struct {
func NewSynchronousServer(
logger polylog.Logger,
serverConfig *config.RelayMinerServerConfig,
supplierServiceMap map[string]*sharedtypes.Service,
servedRelaysProducer chan<- *types.Relay,
proxy relayer.RelayerProxy,
) relayer.RelayServer {
return &synchronousRPCServer{
logger: logger,
supplierServiceMap: supplierServiceMap,
server: &http.Server{Addr: serverConfig.ListenAddress},
relayerProxy: proxy,
servedRelaysProducer: servedRelaysProducer,
Expand Down
Loading

0 comments on commit 84226fe

Please sign in to comment.