Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Off-Chain, Scalability] Query client caching & history #985

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ go 1.23.0
// github.com/pokt-network/smt/kvstore/pebble => ../smt/kvstore/pebble
// )

replace nhooyr.io/websocket => github.com/coder/websocket v1.8.6
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

revert


// replace broken goleveldb
replace github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7

Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZ
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo=
github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ=
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE/e/2PUdi/liOCUjSTXgM1o87ZssimdTWN964YiIeI=
github.com/coder/websocket v1.8.6 h1:OmNKdwUvLj7VvHnl5o8skaVghSPLjWdHGCnFbkWqs9w=
github.com/coder/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
github.com/cometbft/cometbft v0.38.10 h1:2ePuglchT+j0Iao+cfmt/nw5U7K2lnGDzXSUPGVdXaU=
github.com/cometbft/cometbft v0.38.10/go.mod h1:jHPx9vQpWzPHEAiYI/7EDKaB1NXhK6o3SArrrY8ExKc=
github.com/cometbft/cometbft-db v0.9.1 h1:MIhVX5ja5bXNHF8EYrThkG9F7r9kSfv8BX4LWaxWJ4M=
Expand Down Expand Up @@ -1861,9 +1863,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
nhooyr.io/websocket v1.8.6/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
nhooyr.io/websocket v1.8.7 h1:usjR2uOr/zjjkVMy0lW+PPohFok7PCow5sDjLgX4P4g=
nhooyr.io/websocket v1.8.7/go.mod h1:B70DZP8IakI65RVQ51MsWP/8jndNma26DVA/nFSCgW0=
pgregory.net/rapid v1.1.0 h1:CMa0sjHSru3puNx+J0MIAuiiEV4N0qj8/cMWGBBCsjw=
pgregory.net/rapid v1.1.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
7 changes: 7 additions & 0 deletions pkg/client/block/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,13 @@ func (b *blockReplayClient) queryLatestBlock(
errCh := make(chan error)

go func() {
// TODO_IN_THIS_COMMIT: extract labels (and values?) to constants.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[linter-name (fail-on-found)] reported by reviewdog 🐶
// TODO_IN_THIS_COMMIT: extract labels (and values?) to constants.

defer client.AllQueriesTotalCounter.With(
"method", "block",
"client_type", "block",
"msg_type", "",
).Add(1)

queryBlockResult, err := b.onStartQueryClient.Block(ctx, nil)
if err != nil {
errCh <- err
Expand Down
54 changes: 47 additions & 7 deletions pkg/client/interface.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//go:generate mockgen -destination=../../testutil/mockclient/grpc_conn_mock.go -package=mockclient github.com/cosmos/gogoproto/grpc ClientConn
//go:generate mockgen -destination=../../testutil/mockclient/events_query_client_mock.go -package=mockclient . Dialer,Connection,EventsQueryClient
//go:generate mockgen -destination=../../testutil/mockclient/block_client_mock.go -package=mockclient . Block,BlockClient
//go:generate mockgen -destination=../../testutil/mockclient/delegation_client_mock.go -package=mockclient . DelegationClient
Expand Down Expand Up @@ -34,6 +35,7 @@ import (
servicetypes "github.com/pokt-network/poktroll/x/service/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
suppliertypes "github.com/pokt-network/poktroll/x/supplier/types"
)

// MsgCreateClaim is an interface satisfying proof.MsgCreateClaim concrete type
Expand Down Expand Up @@ -267,6 +269,8 @@ type AccountQueryClient interface {
// ApplicationQueryClient defines an interface that enables the querying of the
// on-chain application information
type ApplicationQueryClient interface {
ParamsQuerier[*apptypes.Params]

// GetApplication queries the chain for the details of the application provided
GetApplication(ctx context.Context, appAddress string) (apptypes.Application, error)

Expand All @@ -277,30 +281,31 @@ type ApplicationQueryClient interface {
// SupplierQueryClient defines an interface that enables the querying of the
// on-chain supplier information
type SupplierQueryClient interface {
ParamsQuerier[*suppliertypes.Params]

// GetSupplier queries the chain for the details of the supplier provided
GetSupplier(ctx context.Context, supplierOperatorAddress string) (sharedtypes.Supplier, error)
}

// SessionQueryClient defines an interface that enables the querying of the
// on-chain session information
type SessionQueryClient interface {
ParamsQuerier[*sessiontypes.Params]

// GetSession queries the chain for the details of the session provided
GetSession(
ctx context.Context,
appAddress string,
serviceId string,
blockHeight int64,
) (*sessiontypes.Session, error)

// GetParams queries the chain for the session module parameters.
GetParams(ctx context.Context) (*sessiontypes.Params, error)
}

// SharedQueryClient defines an interface that enables the querying of the
// on-chain shared module params.
type SharedQueryClient interface {
// GetParams queries the chain for the current shared module parameters.
GetParams(ctx context.Context) (*sharedtypes.Params, error)
ParamsQuerier[*sharedtypes.Params]

// GetSessionGracePeriodEndHeight returns the block height at which the grace period
// for the session that includes queryHeight elapses.
// The grace period is the number of blocks after the session ends during which relays
Expand Down Expand Up @@ -333,6 +338,8 @@ type BlockQueryClient interface {
// protobuf message. Since the generated go types don't include interface types, this
// is necessary to prevent dependency cycles.
type ProofParams interface {
cosmostypes.Msg

GetProofRequestProbability() float64
GetProofRequirementThreshold() *cosmostypes.Coin
GetProofMissingPenalty() *cosmostypes.Coin
Expand All @@ -342,13 +349,14 @@ type ProofParams interface {
// ProofQueryClient defines an interface that enables the querying of the
// on-chain proof module params.
type ProofQueryClient interface {
// GetParams queries the chain for the current shared module parameters.
GetParams(ctx context.Context) (ProofParams, error)
ParamsQuerier[ProofParams]
}

// ServiceQueryClient defines an interface that enables the querying of the
// on-chain service information
type ServiceQueryClient interface {
ParamsQuerier[*servicetypes.Params]

// GetService queries the chain for the details of the service provided
GetService(ctx context.Context, serviceId string) (sharedtypes.Service, error)
GetServiceRelayDifficulty(ctx context.Context, serviceId string) (servicetypes.RelayMiningDifficulty, error)
Expand All @@ -360,3 +368,35 @@ type BankQueryClient interface {
// GetBalance queries the chain for the uPOKT balance of the account provided
GetBalance(ctx context.Context, address string) (*cosmostypes.Coin, error)
}

// QueryCache handles a single type of cached data
type QueryCache[T any] interface {
Get(key string) (T, error)
Set(key string, value T) error
Delete(key string)
Clear()
}

// HistoricalQueryCache extends QueryCache to support historical values at different heights
type HistoricalQueryCache[T any] interface {
QueryCache[T]
// GetAtHeight retrieves the nearest value <= the specified height
GetAtHeight(key string, height int64) (T, error)
// SetAtHeight adds or updates a value at a specific height
SetAtHeight(key string, value T, height int64) error
}

// ParamsQuerier represents a generic querier for module parameters.
// This interface should be implemented by any module-specific querier
// that needs to access and cache on-chain parameters.
//
// DEV_NOTE: Can't use cosmostypes.Msg instead of any because M
// would be a pointer but Keeper#GetParams() returns a value. 🙄
type ParamsQuerier[P any] interface {
// GetParams queries the chain for the current module parameters, where
// P is the params type of a given module (e.g. sharedtypes.Params).
GetParams(ctx context.Context) (P, error)
// GetParamsAtHeight returns the parameters as they were at the specified
// height, where M is the params type of a given module (e.g. sharedtypes.Params).
GetParamsAtHeight(ctx context.Context, height int64) (P, error)
}
38 changes: 38 additions & 0 deletions pkg/client/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package client

import (
"github.com/go-kit/kit/metrics/prometheus"
stdprometheus "github.com/prometheus/client_golang/prometheus"
)

const (
clientSubsystem = "client"

allQueriesTotal = "all_queries_total"
paramsQueriesTotal = "params_queries_total"
)

var (
// TODO_IN_THIS_COMMIT: godoc...
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[linter-name (fail-on-found)] reported by reviewdog 🐶
// TODO_IN_THIS_COMMIT: godoc...

AllQueriesTotalCounter = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Subsystem: clientSubsystem,
Name: allQueriesTotal,
Help: "Total number of all query messages, of any type, sent by the client.",
// TODO_IN_THIS_COMMIT: extract labels to constants.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[linter-name (fail-on-found)] reported by reviewdog 🐶
// TODO_IN_THIS_COMMIT: extract labels to constants.

//}, []string{"client_type", "method", "msg_type", "claim_proof_lifecycle_stage"})
}, []string{"client_type", "method", "msg_type"})

// TODO_IN_THIS_COMMIT: godoc...
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[linter-name (fail-on-found)] reported by reviewdog 🐶
// TODO_IN_THIS_COMMIT: godoc...

ParamsQueriesTotalCounter = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Subsystem: clientSubsystem,
Name: paramsQueriesTotal,
Help: "Total number of QueryParamsRequest messages sent by the client.",
}, []string{"client_type", "method", "claim_proof_lifecycle_stage"})

// TODO_IN_THIS_COMMIT: godoc & use...
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[linter-name (fail-on-found)] reported by reviewdog 🐶
// TODO_IN_THIS_COMMIT: godoc & use...

AllWebsocketEventsTotalCounter = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Subsystem: clientSubsystem,
Name: "all_websocket_events_total",
Help: "Total number of websocket events received by the client.",
}, []string{"client_type", "event_type"})
)
21 changes: 14 additions & 7 deletions pkg/client/query/accquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (

"cosmossdk.io/depinject"
cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types"
"github.com/cosmos/cosmos-sdk/types"
cosmostypes "github.com/cosmos/cosmos-sdk/types"
accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types"
grpc "github.com/cosmos/gogoproto/grpc"
"github.com/cosmos/gogoproto/grpc"

"github.com/pokt-network/poktroll/pkg/client"
)
Expand All @@ -24,7 +24,7 @@ type accQuerier struct {

// accountCache is a cache of accounts that have already been queried.
// TODO_TECHDEBT: Add a size limit to the cache and consider an LRU cache.
accountCache map[string]types.AccountI
accountCache map[string]cosmostypes.AccountI
accountCacheMu sync.Mutex
}

Expand All @@ -34,7 +34,7 @@ type accQuerier struct {
// Required dependencies:
// - clientCtx
func NewAccountQuerier(deps depinject.Config) (client.AccountQueryClient, error) {
aq := &accQuerier{accountCache: make(map[string]types.AccountI)}
aq := &accQuerier{accountCache: make(map[string]cosmostypes.AccountI)}

if err := depinject.Inject(
deps,
Expand All @@ -52,9 +52,16 @@ func NewAccountQuerier(deps depinject.Config) (client.AccountQueryClient, error)
func (aq *accQuerier) GetAccount(
ctx context.Context,
address string,
) (types.AccountI, error) {
) (cosmostypes.AccountI, error) {
aq.accountCacheMu.Lock()
defer aq.accountCacheMu.Unlock()
defer func() {
aq.accountCacheMu.Unlock()
client.AllQueriesTotalCounter.With(
"method", "account",
"client_type", "account",
"msg_type", cosmostypes.MsgTypeURL(new(accounttypes.QueryAccountRequest)),
).Add(1)
}()

if foundAccount, isAccountFound := aq.accountCache[address]; isAccountFound {
return foundAccount, nil
Expand All @@ -68,7 +75,7 @@ func (aq *accQuerier) GetAccount(
}

// Unpack and cache the account object
var fetchedAccount types.AccountI
var fetchedAccount cosmostypes.AccountI
if err = queryCodec.UnpackAny(res.Account, &fetchedAccount); err != nil {
return nil, ErrQueryUnableToDeserializeAccount.Wrapf("address: %s [%v]", address, err)
}
Expand Down
42 changes: 36 additions & 6 deletions pkg/client/query/appquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import (
"context"

"cosmossdk.io/depinject"
grpc "github.com/cosmos/gogoproto/grpc"
cosmostypes "github.com/cosmos/cosmos-sdk/types"
accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types"
gogogrpc "github.com/cosmos/gogoproto/grpc"

"github.com/pokt-network/poktroll/pkg/client"
apptypes "github.com/pokt-network/poktroll/x/application/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

var _ client.ApplicationQueryClient = (*appQuerier)(nil)
Expand All @@ -16,19 +19,40 @@ var _ client.ApplicationQueryClient = (*appQuerier)(nil)
// querying of on-chain application information through a single exposed method
// which returns an apptypes.Application interface
type appQuerier struct {
clientConn grpc.ClientConn
client.ParamsQuerier[*apptypes.Params]

clientConn gogogrpc.ClientConn
applicationQuerier apptypes.QueryClient
}

// NewApplicationQuerier returns a new instance of a client.ApplicationQueryClient
// by injecting the dependecies provided by the depinject.Config
//
// Required dependencies:
// - clientCtx
func NewApplicationQuerier(deps depinject.Config) (client.ApplicationQueryClient, error) {
aq := &appQuerier{}
// - clientCtx (gogogrpc.ClientConn)
func NewApplicationQuerier(
deps depinject.Config,
opts ...ParamsQuerierOptionFn,
) (client.ApplicationQueryClient, error) {
cfg := DefaultParamsQuerierConfig()
for _, opt := range opts {
opt(cfg)
}

if err := depinject.Inject(
paramsQuerier, err := NewCachedParamsQuerier[*apptypes.Params, apptypes.ApplicationQueryClient](
deps, apptypes.NewAppQueryClient,
WithModuleInfo[*sharedtypes.Params](sharedtypes.ModuleName, sharedtypes.ErrSharedParamInvalid),
WithParamsCacheOptions(cfg.CacheOpts...),
)
if err != nil {
return nil, err
}

aq := &appQuerier{
ParamsQuerier: paramsQuerier,
}

if err = depinject.Inject(
deps,
&aq.clientConn,
); err != nil {
Expand All @@ -45,6 +69,12 @@ func (aq *appQuerier) GetApplication(
ctx context.Context,
appAddress string,
) (apptypes.Application, error) {
defer client.AllQueriesTotalCounter.With(
"method", "account",
"client_type", "account",
"msg_type", cosmostypes.MsgTypeURL(new(accounttypes.QueryAccountRequest)),
).Add(1)

req := apptypes.QueryGetApplicationRequest{Address: appAddress}
res, err := aq.applicationQuerier.Application(ctx, &req)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions pkg/client/query/cache/badger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package cache

// TODO_UP_NEXT(@bryanchriswhite): Implement a persistent cache using badger.
//
// var _ query.QueryCache[any] = (*BadgerCache[any])(nil)

// BadgerCache is a persistent cache backed by a badger database.
type BadgerCache[T any] struct {
// db *badger.DB
// config CacheConfig
}
Loading
Loading