Skip to content

Commit

Permalink
Merge branch 'issues/141/prep/in-memory-network' into issues/141/feat…
Browse files Browse the repository at this point in the history
…/in-memory-network

* issues/141/prep/in-memory-network:
  [Testing, Tooling] chore: in-memory network interface & config types (#289)
  trigger CI
  [Supplier] refactor: supplier module errors (#265)
  [Supplier] refactor: supplier module keys (#264)
  [Supplier] refactor: claim & proof protobufs + (#263)
  chore: review feedback improvements
  [Configs] feat: Add staking config parser of gateway staking (#302)
  chore: review feedback improvements
  fix: usage raw string literal
  chore: review feedback improvements
  [RingCache] Invalidate Cache On Redelegation Events (#239)
  • Loading branch information
bryanchriswhite committed Jan 10, 2024
2 parents fe5e743 + 6c35227 commit b0b0436
Show file tree
Hide file tree
Showing 38 changed files with 733 additions and 157 deletions.
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -274,19 +274,19 @@ gateway_list: ## List all the staked gateways

.PHONY: gateway_stake
gateway_stake: ## Stake tokens for the gateway specified (must specify the gateway env var)
poktrolld --home=$(POKTROLLD_HOME) tx gateway stake-gateway 1000upokt --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE)
poktrolld --home=$(POKTROLLD_HOME) tx gateway stake-gateway --config $(POKTROLLD_HOME)/config/$(STAKE) --keyring-backend test --from $(GATEWAY) --node $(POCKET_NODE)

.PHONY: gateway1_stake
gateway1_stake: ## Stake gateway1
GATEWAY=gateway1 make gateway_stake
GATEWAY=gateway1 STAKE=gateway1_stake_config.yaml make gateway_stake

.PHONY: gateway2_stake
gateway2_stake: ## Stake gateway2
GATEWAY=gateway2 make gateway_stake
GATEWAY=gateway2 STAKE=gateway2_stake_config.yaml make gateway_stake

.PHONY: gateway3_stake
gateway3_stake: ## Stake gateway3
GATEWAY=gateway3 make gateway_stake
GATEWAY=gateway3 STAKE=gateway3_stake_config.yaml make gateway_stake

.PHONY: gateway_unstake
gateway_unstake: ## Unstake an gateway (must specify the GATEWAY env var)
Expand Down
22 changes: 21 additions & 1 deletion e2e/tests/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package e2e
import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"regexp"
Expand Down Expand Up @@ -166,17 +167,36 @@ func (s *suite) TheUserShouldWaitForSeconds(dur int64) {
}

func (s *suite) TheUserStakesAWithUpoktFromTheAccount(actorType string, amount int64, accName string) {
// Create a temporary config file
configPathPattern := fmt.Sprintf("%s_stake_config_*.yaml", accName)
configContent := fmt.Sprintf(`stake_amount: %d upokt`, amount)
configFile, err := ioutil.TempFile("", configPathPattern)
if err != nil {
s.Fatalf("error creating config file: %q", err)
}
if _, err = configFile.Write([]byte(configContent)); err != nil {
s.Fatalf("error writing config file: %q", err)
}

args := []string{
"tx",
actorType,
fmt.Sprintf("stake-%s", actorType),
fmt.Sprintf("%dupokt", amount),
"--config",
configFile.Name(),
"--from",
accName,
keyRingFlag,
"-y",
}
res, err := s.pocketd.RunCommandOnHost("", args...)

// Remove the temporary config file
err = os.Remove(configFile.Name())
if err != nil {
s.Fatalf("error removing config file: %q", err)
}

if err != nil {
s.Fatalf("error staking %s: %s", actorType, err)
}
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ require (
github.com/athanorlabs/go-dleq v0.1.0
github.com/cometbft/cometbft v0.37.2
github.com/cometbft/cometbft-db v0.8.0
github.com/cosmos/cosmos-proto v1.0.0-beta.2
github.com/cosmos/cosmos-sdk v0.47.3
github.com/cosmos/gogoproto v1.4.11
github.com/cosmos/ibc-go/v7 v7.1.0
Expand All @@ -48,7 +47,6 @@ require (
golang.org/x/crypto v0.15.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
golang.org/x/sync v0.5.0
google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb
google.golang.org/grpc v1.59.0
gopkg.in/yaml.v2 v2.4.0
)
Expand Down Expand Up @@ -92,6 +90,7 @@ require (
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cosmos/btcutil v1.0.5 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.2 // indirect
github.com/cosmos/go-bip39 v1.0.0 // indirect
github.com/cosmos/gogogateway v1.2.0 // indirect
github.com/cosmos/iavl v0.20.0 // indirect
Expand Down Expand Up @@ -286,6 +285,7 @@ require (
google.golang.org/api v0.143.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230913181813-007df8e322eb // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions localnet/poktrolld/config/gateway1_stake_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
stake_amount: 1000upokt
1 change: 1 addition & 0 deletions localnet/poktrolld/config/gateway2_stake_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
stake_amount: 1000upokt
1 change: 1 addition & 0 deletions localnet/poktrolld/config/gateway3_stake_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
stake_amount: 1000upokt
2 changes: 2 additions & 0 deletions pkg/appgateserver/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,12 @@ func setupAppGateServerDependencies(
config.NewSupplyEventsQueryClientFn(queryNodeRPCURL), // leaf
config.NewSupplyBlockClientFn(), // leaf
config.NewSupplyQueryClientContextFn(queryNodeGRPCURL), // leaf
config.NewSupplyDelegationClientFn(), // leaf
config.NewSupplyAccountQuerierFn(), // leaf
config.NewSupplyApplicationQuerierFn(), // leaf
config.NewSupplySessionQuerierFn(), // leaf
config.NewSupplyRingCacheFn(),

config.NewSupplyPOKTRollSDKFn(appGateConfig.SigningKey),
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/client/block/client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/pokt-network/poktroll/testutil/testclient/testblock"
)

// TODO(#255): Refactor this integration test to use an in-memory simulated network

const blockIntegrationSubTimeout = 5 * time.Second

func TestBlockClient_LastNBlocks(t *testing.T) {
Expand Down
18 changes: 14 additions & 4 deletions pkg/client/delegation/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,25 @@ import (

const (
// delegationEventQuery is the query used by the EventsQueryClient to subscribe
// to new delegation events from the the application module on chain.
// to all application module events in order to filter for redelegation events.
// See: https://docs.cosmos.network/v0.47/learn/advanced/events#subscribing-to-events
// And: https://docs.cosmos.network/v0.47/learn/advanced/events#default-events
delegationEventQuery = "message.action='pocket.application.EventRedelegation'"
// TODO_HACK(#280): Instead of listening to all events and doing a verbose
// filter, we should subscribe to both MsgDelegateToGateway and MsgUndelegateFromGateway
// messages directly, and filter those for the EventRedelegation event types.
// This would save the delegation client from listening to a lot of unnecessary
// events, that it filters out.
// NB: This is not currently possible because the observer pattern does not
// support multiplexing multiple observables into a single observable, that
// can supply the EventsReplayClient with both the MsgDelegateToGateway and
// MsgUndelegateFromGateway events.
delegationEventQuery = "tm.event='Tx' AND message.module='application'"

// defaultRedelegationsReplayLimit is the number of redelegations that the
// replay observable returned by LastNRedelegations() will be able to replay.
// TODO_TECHDEBT/TODO_FUTURE: add a `redelegationsReplayLimit` field to the `delegationClient`
// struct that defaults to this but can be overridden via an option.
// TODO_TECHDEBT/TODO_FUTURE: add a `redelegationsReplayLimit` field to the
// delegation client struct that defaults to this but can be overridden via
// an option in future work.
defaultRedelegationsReplayLimit = 100
)

Expand Down
25 changes: 12 additions & 13 deletions pkg/client/delegation/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package delegation_test

import (
"context"
"encoding/json"
"testing"
"time"

Expand All @@ -12,29 +11,25 @@ import (
"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/delegation"
"github.com/pokt-network/poktroll/testutil/sample"
"github.com/pokt-network/poktroll/testutil/testclient/testdelegation"
"github.com/pokt-network/poktroll/testutil/testclient/testeventsquery"
apptypes "github.com/pokt-network/poktroll/x/application/types"
)

const (
testTimeoutDuration = 100 * time.Millisecond

// duplicates pkg/client/delegation/client.go's delegationEventQuery for testing purposes.
delegationEventQuery = "message.action='pocket.application.EventRedelegation'"
delegationEventQuery = "tm.event='Tx' AND message.module='application'"
)

func TestDelegationClient(t *testing.T) {
var (
expectedAddress = sample.AccAddress()
expectedDelegationEvent = apptypes.EventRedelegation{
AppAddress: expectedAddress,
GatewayAddress: sample.AccAddress(),
}
ctx = context.Background()
expectedAppAddress = sample.AccAddress()
expectedGatewayAddress = sample.AccAddress()
ctx = context.Background()
)

expectedEventBz, err := json.Marshal(expectedDelegationEvent)
require.NoError(t, err)
expectedEventBz := testdelegation.NewRedelegationEventBytes(t, expectedAppAddress, expectedGatewayAddress)

eventsQueryClient := testeventsquery.NewAnyTimesEventsBytesEventsQueryClient(
ctx, t,
Expand All @@ -58,6 +53,8 @@ func TestDelegationClient(t *testing.T) {
name: "LastNRedelegations successfully returns latest redelegation",
fn: func() client.Redelegation {
lastRedelegation := delegationClient.LastNRedelegations(ctx, 1)[0]
require.Equal(t, expectedAppAddress, lastRedelegation.GetAppAddress())
require.Equal(t, expectedGatewayAddress, lastRedelegation.GetGatewayAddress())
return lastRedelegation
},
},
Expand All @@ -69,7 +66,8 @@ func TestDelegationClient(t *testing.T) {

// Ensure that the observable is replayable via Last.
lastRedelegation := redelegationObs.Last(ctx, 1)[0]
require.Equal(t, expectedAddress, lastRedelegation.GetAppAddress())
require.Equal(t, expectedAppAddress, lastRedelegation.GetAppAddress())
require.Equal(t, expectedGatewayAddress, lastRedelegation.GetGatewayAddress())

return lastRedelegation
},
Expand All @@ -90,7 +88,8 @@ func TestDelegationClient(t *testing.T) {

select {
case actualRedelegation := <-actualRedelegationCh:
require.Equal(t, expectedAddress, actualRedelegation.GetAppAddress())
require.Equal(t, expectedAppAddress, actualRedelegation.GetAppAddress())
require.Equal(t, expectedGatewayAddress, actualRedelegation.GetGatewayAddress())
case <-time.After(testTimeoutDuration):
t.Fatal("timed out waiting for redelegation event")
}
Expand Down
73 changes: 62 additions & 11 deletions pkg/client/delegation/redelegation.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,32 @@
package delegation

// TODO_TECHDEBT(#280): Refactor to use merged observables and subscribe to
// MsgDelegateToGateway and MsgUndelegateFromGateway messages directly, instead
// of listening to all events and doing a verbose filter.

import (
"encoding/json"
"strconv"

"cosmossdk.io/api/tendermint/abci"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/events"
)

// redelegationEventType is the type of the EventRedelegation event emitted by
// both the MsgDelegateToGateway and MsgUndelegateFromGateway messages.
const redelegationEventType = "pocket.application.EventRedelegation"

var _ client.Redelegation = (*redelegation)(nil)

// TxEvent is an alias for the CometBFT TxResult type used to decode the
// response bytes from the EventsQueryClient's subscription
type TxEvent = abci.TxResult

// redelegation wraps the EventRedelegation event emitted by the application
// module, for use in the observable
// module, for use in the observable, it is one of the log entries embedded
// within the log field of the response struct from the app module's query.
type redelegation struct {
AppAddress string `json:"app_address"`
GatewayAddress string `json:"gateway_address"`
Expand All @@ -27,22 +43,57 @@ func (d redelegation) GetGatewayAddress() string {
}

// newRedelegationEventFactoryFn is a factory function that returns a
// function that attempts to deserialise the given bytes into a redelegation
// function that attempts to deserialize the given bytes into a redelegation
// struct. If the delegate struct has an empty app address then an
// ErrUnmarshalRedelegation error is returned. Otherwise if deserialisation
// fails then the error is returned.
func newRedelegationEventFactoryFn() events.NewEventsFn[client.Redelegation] {
return func(redelegationEventBz []byte) (client.Redelegation, error) {
redelegationEvent := new(redelegation)
if err := json.Unmarshal(redelegationEventBz, redelegationEvent); err != nil {
return func(eventBz []byte) (client.Redelegation, error) {
txEvent := new(TxEvent)
// Try to deserialize the provided bytes into a TxEvent.
if err := json.Unmarshal(eventBz, txEvent); err != nil {
return nil, err
}

if redelegationEvent.AppAddress == "" || redelegationEvent.GatewayAddress == "" {
return nil, events.ErrEventsUnmarshalEvent.
Wrapf("with redelegation: %s", string(redelegationEventBz))
// Check if the TxEvent has empty transaction bytes, which indicates
// the message is probably not a valid transaction event.
if len(txEvent.Tx) == 0 {
return nil, events.ErrEventsUnmarshalEvent.Wrap("empty transaction bytes")
}

return redelegationEvent, nil
// Iterate through the log entries to find EventRedelegation
for _, event := range txEvent.Result.Events {
if event.GetType_() != redelegationEventType {
continue
}
var redelegationEvent redelegation
for _, attr := range event.Attributes {
switch attr.Key {
case "app_address":
appAddr, err := unescape(attr.Value)
if err != nil {
return nil, events.ErrEventsUnmarshalEvent.Wrapf("cannot retrieve app address: %v", err)
}
redelegationEvent.AppAddress = appAddr
case "gateway_address":
gatewayAddr, err := unescape(attr.Value)
if err != nil {
return nil, events.ErrEventsUnmarshalEvent.Wrapf("cannot retrieve gateway address: %v", err)
}
redelegationEvent.GatewayAddress = gatewayAddr
default:
return nil, events.ErrEventsUnmarshalEvent.Wrapf("unknown attribute key: %s", attr.Key)
}
}
// Handle the redelegation event
if redelegationEvent.AppAddress == "" || redelegationEvent.GatewayAddress == "" {
return nil, events.ErrEventsUnmarshalEvent.
Wrapf("empty redelegation event: %s", string(eventBz))
}
return redelegationEvent, nil
}
return nil, events.ErrEventsUnmarshalEvent.Wrap("no redelegation event found")
}
}

func unescape(s string) (string, error) {
return strconv.Unquote(s)
}
8 changes: 4 additions & 4 deletions pkg/client/events/replay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,10 @@ func (rClient *replayClient[T, R]) goRemapEventsSequence(ctx context.Context, pu
func(ctx context.Context, eventTypeObs R) {
if prevEventTypeObs != nil {
// Just in case the assumption that all transport errors are
// persistent does not hold, unsubscribe from the previous
// event type observable in order to prevent unexpected and/or
// duplicate notifications on the obsersvable returned by this
// function.
// persistent (i.e. they occur once and do not repeat) does not
// hold, unsubscribe from the previous event type observable in
// order to prevent unexpected and/or duplicate notifications
// on the observable returned by this function.
prevEventTypeObs.UnsubscribeAll()
} else {
prevEventTypeObs = eventTypeObs
Expand Down
3 changes: 3 additions & 0 deletions pkg/client/events/replay_client_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ func TestReplayClient_Remapping(t *testing.T) {

eventNum := readEventCounter.Add(1) - 1
event := newMessageEventBz(eventNum)
// After an arbitrary number of events (2 in this case), simulate
// the connection closing so that the replay client can remap the
// events it receives without the caller having to resubscribe.
if eventNum == 2 {
// Simulate the connection closing
connMock.Close()
Expand Down
2 changes: 1 addition & 1 deletion pkg/client/query/accquerier.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (aq *accQuerier) GetAccount(
}
var acc accounttypes.AccountI
if err = queryCodec.UnpackAny(res.Account, &acc); err != nil {
return nil, ErrQueryUnableToDeserialiseAccount.Wrapf("address: %s [%v]", address, err)
return nil, ErrQueryUnableToDeserializeAccount.Wrapf("address: %s [%v]", address, err)
}
return acc, nil
}
2 changes: 1 addition & 1 deletion pkg/client/query/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import (
var (
codespace = "query"
ErrQueryAccountNotFound = sdkerrors.Register(codespace, 1, "account not found")
ErrQueryUnableToDeserialiseAccount = sdkerrors.Register(codespace, 2, "unable to deserialise account")
ErrQueryUnableToDeserializeAccount = sdkerrors.Register(codespace, 2, "unable to deserialize account")
ErrQueryRetrieveSession = sdkerrors.Register(codespace, 3, "error while trying to retrieve a session")
)
14 changes: 14 additions & 0 deletions pkg/crypto/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,19 @@ import (
// the addresses of the gateways the application is delegated to, and converting
// them into their corresponding public key points on the secp256k1 curve.
type RingCache interface {
// Start starts the ring cache, it takes a cancellable context and, in a
// separate goroutine, listens for on-chain delegation events and invalidates
// the cache if the redelegation event's AppAddress is stored in the cache.
Start(ctx context.Context)
// GetCachedAddresses returns the addresses of the applications that are
// currently cached in the ring cache.
GetCachedAddresses() []string
// GetRingForAddress returns the ring for the given application address if
// it exists. If it does not exist in the cache, it follows a lazy approach
// of querying the on-chain state and creating it just-in-time, caching for
// future retrievals.
GetRingForAddress(ctx context.Context, appAddress string) (*ring.Ring, error)
// Stop stops the ring cache by unsubscribing from on-chain delegation events.
// And clears the cache, so that it no longer contains any rings,
Stop()
}
Loading

0 comments on commit b0b0436

Please sign in to comment.