Skip to content

Commit

Permalink
[Supplier] feat: persistence proofs in submit proof msg handler (2) (#…
Browse files Browse the repository at this point in the history
…328)

- Validate and persist proofs received in `MsgSubmitProof` message by the supplier module.
- ~~Simplify `ReplayEventsClient` to reduce number of generic parameters~~
   *(merged in #330)*

---

Co-authored-by: Daniel Olshansky <[email protected]>
Co-authored-by: Redouane Lakrache <[email protected]>
Co-authored-by: h5law <[email protected]>
Co-authored-by: h5law <[email protected]>
  • Loading branch information
5 people authored Feb 1, 2024
1 parent 8dcf5c3 commit 6fe2102
Show file tree
Hide file tree
Showing 15 changed files with 401 additions and 145 deletions.
9 changes: 7 additions & 2 deletions e2e/tests/session.feature
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
Feature: Session Namespace

# TODO_TECHDEBT(@Olshansk, #180): This test requires you to run `make supplier1_stake && make app1_stake` first
# As a shorter workaround, we can also add steps that stake the application and supplier as part of the scenario.
Scenario: Supplier completes claim/proof lifecycle for a valid session
Given the user has the pocketd binary installed
When the supplier "supplier1" has serviced a session with "5" relays for service "svc1" for application "app1"
And after the supplier creates a claim for the session for service "svc1" for application "app1"
Then the claim created by supplier "supplier1" for service "svc1" for application "app1" should be persisted on-chain
# TODO_IMPROVE: ...
# And an event should be emitted...
# TODO_INCOMPLETE: add step(s) for proof validation.
And after the supplier submits a proof for the session for service "svc1" for application "app1"
Then the proof submitted by supplier "supplier1" for service "svc1" for application "app1" should be persisted on-chain
# TODO_IMPROVE: ...
# And an event should be emitted...

# TODO_BLOCKER(@red-0ne): Make sure to implement and validate this test
# One way to exercise this behavior is to close the `RelayMiner` port to prevent
Expand All @@ -34,4 +39,4 @@ Feature: Session Namespace
# And the supllier "supplier1" calls GetSession and gets session number "3"
# Then the supplier "supplier1" replys to application "app1" with a "session mismatch" error relay response
# And the application "app1" receives a failed relay response with a "session mismatch" error
# And the supplier "supplier1" do not update a claim for session number "1" for service "svc1" for application "app1"
# And the supplier "supplier1" do not update a claim for session number "1" for service "svc1" for application "app1"
197 changes: 129 additions & 68 deletions e2e/tests/session_steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,76 +10,41 @@ import (
"strings"
"time"

"cosmossdk.io/depinject"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/events"
"github.com/pokt-network/poktroll/pkg/either"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/testutil/testclient"
suppliertypes "github.com/pokt-network/poktroll/x/supplier/types"
"github.com/stretchr/testify/require"
)

const (
createClaimTimeoutDuration = 10 * time.Second
eitherEventsReplayBufferSize = 100
msgClaimSenderQueryFmt = "tm.event='Tx' AND message.sender='%s' AND message.action='/pocket.supplier.MsgCreateClaim'"
testServiceId = "anvil"
eitherEventsBzReplayObsKey = "eitherEventsBzReplayObsKey"
preExistingClaimsKey = "preExistingClaimsKey"
// txEventTimeout is the duration of time to wait after sending a valid tx
// before the test should time out (fail).
txEventTimeout = 10 * time.Second
// txSenderEventSubscriptionQueryFmt is the format string which yields the
// cosmos-sdk event subscription "query" string for a given sender address.
// This is used by an events replay client to subscribe to tx events from the supplier.
// See: https://docs.cosmos.network/v0.47/learn/advanced/events#subscribing-to-events
txSenderEventSubscriptionQueryFmt = "tm.event='Tx' AND message.sender='%s'"
testEventsReplayClientBufferSize = 100
testServiceId = "anvil"
// eventsReplayClientKey is the suite#scenarioState key for the events replay client
// which is subscribed to tx events where the tx sender is the scenario's supplier.
eventsReplayClientKey = "eventsReplayClientKey"
// preExistingClaimsKey is the suite#scenarioState key for any pre-existing
// claims when querying for all claims prior to running the scenario.
preExistingClaimsKey = "preExistingClaimsKey"
// preExistingProofsKey is the suite#scenarioState key for any pre-existing
// proofs when querying for all proofs prior to running the scenario.
preExistingProofsKey = "preExistingProofsKey"
)

func (s *suite) AfterTheSupplierCreatesAClaimForTheSessionForServiceForApplication(serviceId, appName string) {
ctx, done := context.WithCancel(context.Background())

// TODO_CONSIDERATION: if this test suite gets more complex, it might make
// sense to refactor this key into a function that takes serviceId and appName
// as arguments and returns the key.
eitherEventsBzReplayObs := s.scenarioState[eitherEventsBzReplayObsKey].(observable.ReplayObservable[either.Bytes])

// TODO(#220): refactor to use EventsReplayClient once available.
channel.ForEach[either.Bytes](
ctx, eitherEventsBzReplayObs,
func(_ context.Context, eitherEventBz either.Bytes) {
eventBz, err := eitherEventBz.ValueOrError()
require.NoError(s, err)

if strings.Contains(string(eventBz), "jsonrpc") {
return
}

// Unmarshal event data into a TxEventResponse object.
txEvent := &abci.TxResult{}
err = json.Unmarshal(eventBz, txEvent)
require.NoError(s, err)

var found bool
for _, event := range txEvent.Result.Events {
for _, attribute := range event.Attributes {
if attribute.Key == "action" {
require.Equal(
s, "/pocket.supplier.MsgCreateClaim",
attribute.Value,
)
found = true
break
}
}
if found {
break
}
}
require.Truef(s, found, "unable to find event action attribute")

done()
},
)

select {
case <-ctx.Done():
case <-time.After(createClaimTimeoutDuration):
s.Fatal("timed out waiting for claim to be created")
}
s.waitForMessageAction("/pocket.supplier.MsgCreateClaim")
}

func (s *suite) TheClaimCreatedBySupplierForServiceForApplicationShouldBePersistedOnchain(supplierName, serviceId, appName string) {
Expand All @@ -94,7 +59,8 @@ func (s *suite) TheClaimCreatedBySupplierForServiceForApplicationShouldBePersist
require.NotNil(s, allClaimsRes)

// Assert that the number of claims has increased by one.
preExistingClaims := s.scenarioState[preExistingClaimsKey].([]suppliertypes.Claim)
preExistingClaims, ok := s.scenarioState[preExistingClaimsKey].([]suppliertypes.Claim)
require.True(s, ok, "preExistingClaimsKey not found in scenarioState")
// NB: We are avoiding the use of require.Len here because it provides unreadable output
// TODO_TECHDEBT: Due to the speed of the blocks of the LocalNet sequencer, along with the small number
// of blocks per session, multiple claims may be created throughout the duration of the test. Until
Expand All @@ -119,23 +85,43 @@ func (s *suite) TheSupplierHasServicedASessionWithRelaysForServiceForApplication
relayCount, err := strconv.Atoi(relayCountStr)
require.NoError(s, err)

// Query for any existing claims so that we can compensate for them in the
// Query for any existing claims so that we can compare against them in
// future assertions about changes in on-chain claims.
allClaimsRes, err := s.supplierQueryClient.AllClaims(ctx, &suppliertypes.QueryAllClaimsRequest{})
require.NoError(s, err)
s.scenarioState[preExistingClaimsKey] = allClaimsRes.Claim

// Query for any existing proofs so that we can compare against them in
// future assertions about changes in on-chain proofs.
allProofsRes, err := s.supplierQueryClient.AllProofs(ctx, &suppliertypes.QueryAllProofsRequest{})
require.NoError(s, err)
s.scenarioState[preExistingProofsKey] = allProofsRes.Proof

// Construct an events query client to listen for tx events from the supplier.
msgSenderQuery := fmt.Sprintf(msgClaimSenderQueryFmt, accNameToAddrMap[supplierName])
msgSenderQuery := fmt.Sprintf(txSenderEventSubscriptionQueryFmt, accNameToAddrMap[supplierName])

deps := depinject.Supply(events.NewEventsQueryClient(testclient.CometLocalWebsocketURL))
eventsReplayClient, err := events.NewEventsReplayClient[*abci.TxResult](
ctx,
deps,
msgSenderQuery,
func(eventBz []byte) (*abci.TxResult, error) {
if strings.Contains(string(eventBz), "jsonrpc") {
return nil, nil
}

// TODO_TECHDEBT(#220): refactor to use EventsReplayClient once available.
eventsQueryClient := events.NewEventsQueryClient(testclient.CometLocalWebsocketURL)
eitherEventsBzObs, err := eventsQueryClient.EventsBytes(ctx, msgSenderQuery)
// Unmarshal event data into an ABCI TxResult object.
txResult := &abci.TxResult{}
err = json.Unmarshal(eventBz, txResult)
require.NoError(s, err)

return txResult, nil
},
testEventsReplayClientBufferSize,
)
require.NoError(s, err)

eitherEventsBytesObs := observable.Observable[either.Bytes](eitherEventsBzObs)
eitherEventsBzRelayObs := channel.ToReplayObservable(ctx, eitherEventsReplayBufferSize, eitherEventsBytesObs)
s.scenarioState[eitherEventsBzReplayObsKey] = eitherEventsBzRelayObs
s.scenarioState[eventsReplayClientKey] = eventsReplayClient

s.sendRelaysForSession(
appName,
Expand All @@ -145,6 +131,42 @@ func (s *suite) TheSupplierHasServicedASessionWithRelaysForServiceForApplication
)
}

func (s *suite) AfterTheSupplierSubmitsAProofForTheSessionForServiceForApplication(a string, b string) {
s.waitForMessageAction("/pocket.supplier.MsgSubmitProof")
}

func (s *suite) TheProofSubmittedBySupplierForServiceForApplicationShouldBePersistedOnchain(supplierName, serviceId, appName string) {
ctx := context.Background()

// Retrieve all on-chain proofs for supplierName
allProofsRes, err := s.supplierQueryClient.AllProofs(ctx, &suppliertypes.QueryAllProofsRequest{
Filter: &suppliertypes.QueryAllProofsRequest_SupplierAddress{
SupplierAddress: accNameToAddrMap[supplierName],
},
})
require.NoError(s, err)
require.NotNil(s, allProofsRes)

// Assert that the number of proofs has increased by one.
preExistingProofs, ok := s.scenarioState[preExistingProofsKey].([]suppliertypes.Proof)
require.True(s, ok, "preExistingProofsKey not found in scenarioState")
// NB: We are avoiding the use of require.Len here because it provides unreadable output
// TODO_TECHDEBT: Due to the speed of the blocks of the LocalNet sequencer, along with the small number
// of blocks per session, multiple proofs may be created throughout the duration of the test. Until
// these values are appropriately adjusted, we assert on an increase in proofs rather than +1.
require.Greater(s, len(allProofsRes.Proof), len(preExistingProofs), "number of proofs must have increased")

// TODO_UPNEXT(@bryanchriswhite): assert that the root hash of the proof contains the correct
// SMST sum. The sum can be retrieved via the `GetSum` function exposed
// by the SMT.

// TODO_IMPROVE: add assertions about serviceId and appName and/or incorporate
// them into the scenarioState key(s).

proof := allProofsRes.Proof[0]
require.Equal(s, accNameToAddrMap[supplierName], proof.SupplierAddress)
}

func (s *suite) sendRelaysForSession(
appName string,
supplierName string,
Expand All @@ -163,3 +185,42 @@ func (s *suite) sendRelaysForSession(
s.TheApplicationReceivesASuccessfulRelayResponseSignedBy(appName, supplierName)
}
}

// waitForMessageAction waits for an event to be observed which has the given message action.
func (s *suite) waitForMessageAction(action string) {
ctx, done := context.WithCancel(context.Background())

eventsReplayClient, ok := s.scenarioState[eventsReplayClientKey].(client.EventsReplayClient[*abci.TxResult])
require.True(s, ok, "eventsReplayClientKey not found in scenarioState")
require.NotNil(s, eventsReplayClient)

// For each observed event, **asynchronously** check if it contains the given action.
channel.ForEach[*abci.TxResult](
ctx, eventsReplayClient.EventsSequence(ctx),
func(_ context.Context, txEvent *abci.TxResult) {
if txEvent == nil {
return
}

// Range over each event's attributes to find the "action" attribute
// and compare its value to that of the action provided.
for _, event := range txEvent.Result.Events {
for _, attribute := range event.Attributes {
if attribute.Key == "action" {
if attribute.Value == action {
done()
return
}
}
}
}
},
)

select {
case <-time.After(txEventTimeout):
s.Fatalf("timed out waiting for message with action %q", action)
case <-ctx.Done():
s.Log("Success; message detected before timeout.")
}
}
2 changes: 2 additions & 0 deletions proto/pocket/supplier/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ message QueryAllClaimsRequest {
}

message QueryAllClaimsResponse {
// TODO_IMPROVE: Rename to `Claims` (plural).
repeated Claim claim = 1 [(gogoproto.nullable) = false];
cosmos.base.query.v1beta1.PageResponse pagination = 2;
}
Expand All @@ -116,6 +117,7 @@ message QueryAllProofsRequest {
}

message QueryAllProofsResponse {
// TODO_IMPROVE: Rename to `Proofs` (plural).
repeated Proof proof = 1 [(gogoproto.nullable) = false];
cosmos.base.query.v1beta1.PageResponse pagination = 2;
}
Expand Down
2 changes: 1 addition & 1 deletion x/session/keeper/query_get_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (k Keeper) GetSession(goCtx context.Context, req *types.QueryGetSessionRequ
}

if err := req.ValidateBasic(); err != nil {
return nil, err
return nil, status.Error(codes.InvalidArgument, err.Error())
}

ctx := sdk.UnwrapSDKContext(goCtx)
Expand Down
3 changes: 3 additions & 0 deletions x/supplier/client/cli/tx_create_claim_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package cli

// TODO_NEXT(@bryanchriswhite #140): add comprehensive CLI test coverage for creating claims.
3 changes: 3 additions & 0 deletions x/supplier/client/cli/tx_submit_proof_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package cli

// TODO_NEXT(@bryanchriswhite #141): add comprehensive CLI test coverage for submitting proofs.
5 changes: 4 additions & 1 deletion x/supplier/keeper/msg_server_create_claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

sdk "github.com/cosmos/cosmos-sdk/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

suppliertypes "github.com/pokt-network/poktroll/x/supplier/types"
)
Expand All @@ -14,6 +16,7 @@ func (k msgServer) CreateClaim(goCtx context.Context, msg *suppliertypes.MsgCrea

ctx := sdk.UnwrapSDKContext(goCtx)
logger := k.Logger(ctx).With("method", "CreateClaim")
logger.Debug("creating claim")

if err := msg.ValidateBasic(); err != nil {
return nil, err
Expand All @@ -25,7 +28,7 @@ func (k msgServer) CreateClaim(goCtx context.Context, msg *suppliertypes.MsgCrea
msg.GetSupplierAddress(),
)
if err != nil {
return nil, err
return nil, status.Error(codes.InvalidArgument, err.Error())
}

logger.
Expand Down
Loading

0 comments on commit 6fe2102

Please sign in to comment.