Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Supplier] feat: persistence proofs in submit proof msg handler (2) #328

Merged
merged 52 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from 43 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
8ab21b6
refactor: off-chain session validation
bryanchriswhite Dec 19, 2023
10df226
refactor: proof CLI
bryanchriswhite Dec 19, 2023
4a22787
refactor: proof store indices
bryanchriswhite Dec 12, 2023
3127f1b
fixup! refactor: off-chain session validation
bryanchriswhite Jan 4, 2024
84ffced
fixup! fixup! refactor: off-chain session validation
bryanchriswhite Jan 5, 2024
255a677
fixup! fixup! fixup! refactor: off-chain session validation
bryanchriswhite Jan 8, 2024
de6f735
chore: review feedback improvement
bryanchriswhite Jan 9, 2024
189ad8c
chore: remove unneeded whitespace
bryanchriswhite Jan 17, 2024
08d68ab
chore: update go.mod
bryanchriswhite Jan 17, 2024
db47b1a
feat: proof persistence
bryanchriswhite Dec 13, 2023
a7f4c5b
refactor: session fixture test helpers
bryanchriswhite Jan 8, 2024
0435959
test: submit proof message handling
bryanchriswhite Dec 19, 2023
d3a9daf
chore: update Proof protobuf usage
bryanchriswhite Dec 22, 2023
3c76f0f
test: proof cli querying
bryanchriswhite Jan 2, 2024
f34c4f1
todo: stub claim/proof creation tests
bryanchriswhite Jan 3, 2024
32920cd
chore: validate claim for proof
bryanchriswhite Jan 3, 2024
1116b21
refactor: proof msg handler errors
bryanchriswhite Jan 5, 2024
c6e14bb
refactor: create claim error & tests
bryanchriswhite Jan 8, 2024
a0d4960
wip: post cherry-pick fixes
bryanchriswhite Jan 16, 2024
fffcf86
chore: add TODO to session.feature
bryanchriswhite Jan 16, 2024
8b03fcc
test: E2E proof persistence
bryanchriswhite Jan 17, 2024
064a1b3
refactor: simplify EventsReplayClient
bryanchriswhite Jan 17, 2024
4b6f263
Revert "test: submit proof message handling"
bryanchriswhite Jan 17, 2024
9889fdc
Revert "refactor: session fixture test helpers"
bryanchriswhite Jan 17, 2024
1c17a7a
fix: tests
bryanchriswhite Jan 17, 2024
8e35ec4
fixup! wip: post cherry-pick fixes
bryanchriswhite Jan 17, 2024
cc1bac8
chore: self-review improvements
bryanchriswhite Jan 17, 2024
6fced72
chore: review feedback improvements
bryanchriswhite Jan 18, 2024
53868a5
chore: review feedback improvements
bryanchriswhite Jan 18, 2024
abcdd9c
Merge branch 'issues/141/refactor/proof-store-indices-2' into issues/…
bryanchriswhite Jan 18, 2024
ec741d2
chore: review feedback improvements
bryanchriswhite Jan 18, 2024
292f9f5
chore: review feedback improvements
bryanchriswhite Jan 18, 2024
e23acc6
chore: review feedback improvements
bryanchriswhite Jan 19, 2024
4d3ef9d
Merge branch 'main' into issues/141/refactor/proof-store-indices-2
bryanchriswhite Jan 19, 2024
403060c
chore: review feedback improvements
bryanchriswhite Jan 22, 2024
489ebdb
Merge branch 'issues/141/refactor/proof-store-indices-2' into issues/…
bryanchriswhite Jan 22, 2024
72c2342
chore: review feedback improvement
bryanchriswhite Jan 22, 2024
7e8cb19
Merge remote-tracking branch 'pokt/main' into issues/141/refactor/pro…
bryanchriswhite Jan 22, 2024
8f42d81
Merge branch 'issues/141/refactor/proof-store-indices-2' into issues/…
bryanchriswhite Jan 22, 2024
7fa49b7
fix: linter errors
bryanchriswhite Jan 22, 2024
fe62959
Merge branch 'main' into issues/141/feat/proof-persistence-2
bryanchriswhite Jan 23, 2024
351dc18
Merge branch 'main' into issues/141/feat/proof-persistence-2
bryanchriswhite Jan 23, 2024
36be615
chore: review feedback improvements
bryanchriswhite Jan 23, 2024
dafc425
chore: review feedback improvements
bryanchriswhite Jan 25, 2024
0ed19fe
chore: review feedback improvement
bryanchriswhite Jan 25, 2024
862bf4b
chore: review feedback improvement
bryanchriswhite Jan 25, 2024
49888cd
Merge remote-tracking branch 'pokt/main' into issues/141/feat/proof-p…
bryanchriswhite Jan 25, 2024
52ac55e
fix: session start block height in test
bryanchriswhite Jan 25, 2024
380b49d
Merge branch 'main' into issues/141/feat/proof-persistence-2
bryanchriswhite Jan 26, 2024
780baa6
Merge branch 'main' into issues/141/feat/proof-persistence-2
bryanchriswhite Jan 27, 2024
5e45ac6
Merge branch 'main' into issues/141/feat/proof-persistence-2
bryanchriswhite Jan 29, 2024
9fc6ad4
Merge branch 'main' into issues/141/feat/proof-persistence-2
Olshansk Feb 1, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion e2e/tests/session.feature
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
Feature: Session Namespace

# TODO_TECHDEBT(@Olshansk, #180): This test requires you to run `make supplier1_stake && make app1_stake` first
# As a shorter workaround, we can also add steps that stake the application and supplier as part of the scenario.
Scenario: Supplier completes claim/proof lifecycle for a valid session
Given the user has the pocketd binary installed
When the supplier "supplier1" has serviced a session with "5" relays for service "svc1" for application "app1"
And after the supplier creates a claim for the session for service "svc1" for application "app1"
Then the claim created by supplier "supplier1" for service "svc1" for application "app1" should be persisted on-chain
# TODO_IMPROVE: ...
# And an event should be emitted...
h5law marked this conversation as resolved.
Show resolved Hide resolved
# TODO_INCOMPLETE: add step(s) for proof validation.
And after the supplier submits a proof for the session for service "svc1" for application "app1"
Then the proof submitted by supplier "supplier1" for service "svc1" for application "app1" should be persisted on-chain
# TODO_IMPROVE: ...
# And an event should be emitted...
197 changes: 129 additions & 68 deletions e2e/tests/session_steps_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,76 +10,41 @@ import (
"strings"
"time"

"cosmossdk.io/depinject"
abci "github.com/cometbft/cometbft/abci/types"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/events"
"github.com/pokt-network/poktroll/pkg/either"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/testutil/testclient"
suppliertypes "github.com/pokt-network/poktroll/x/supplier/types"
"github.com/stretchr/testify/require"
)

const (
createClaimTimeoutDuration = 10 * time.Second
eitherEventsReplayBufferSize = 100
msgClaimSenderQueryFmt = "tm.event='Tx' AND message.sender='%s' AND message.action='/pocket.supplier.MsgCreateClaim'"
testServiceId = "anvil"
eitherEventsBzReplayObsKey = "eitherEventsBzReplayObsKey"
preExistingClaimsKey = "preExistingClaimsKey"
// txEventTimeout is the duration of time to wait after sending a valid tx
// before the test should time out (fail).
txEventTimeout = 10 * time.Second
// txSenderEventSubscriptionQueryFmt is the format string which yields the
// cosmos-sdk event subscription "query" string for a given sender address.
// This is used by an events replay client to subscribe to tx events from the supplier.
// See: https://docs.cosmos.network/v0.46/core/events.html#subscribing-to-events.
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
txSenderEventSubscriptionQueryFmt = "tm.event='Tx' AND message.sender='%s'"
testEventsReplayClientBufferSize = 100
testServiceId = "anvil"
// eventsReplayClientKey is the suite#scenarioState key for the events replay client
// which is subscribed to tx events where the tx sender is the scenario's supplier.
eventsReplayClientKey = "eventsReplayClientKey"
// preExistingClaimsKey is the suite#scenarioState key for any pre-existing
// claims when querying for all claims prior to running the scenario.
preExistingClaimsKey = "preExistingClaimsKey"
// preExistingProofsKey is the suite#scenarioState key for any pre-existing
// proofs when querying for all proofs prior to running the scenario.
preExistingProofsKey = "preExistingProofsKey"
)

func (s *suite) AfterTheSupplierCreatesAClaimForTheSessionForServiceForApplication(serviceId, appName string) {
ctx, done := context.WithCancel(context.Background())

// TODO_CONSIDERATION: if this test suite gets more complex, it might make
// sense to refactor this key into a function that takes serviceId and appName
// as arguments and returns the key.
eitherEventsBzReplayObs := s.scenarioState[eitherEventsBzReplayObsKey].(observable.ReplayObservable[either.Bytes])

// TODO(#220): refactor to use EventsReplayClient once available.
channel.ForEach[either.Bytes](
ctx, eitherEventsBzReplayObs,
func(_ context.Context, eitherEventBz either.Bytes) {
eventBz, err := eitherEventBz.ValueOrError()
require.NoError(s, err)

if strings.Contains(string(eventBz), "jsonrpc") {
return
}

// Unmarshal event data into a TxEventResponse object.
txEvent := &abci.TxResult{}
err = json.Unmarshal(eventBz, txEvent)
require.NoError(s, err)

var found bool
for _, event := range txEvent.Result.Events {
for _, attribute := range event.Attributes {
if attribute.Key == "action" {
require.Equal(
s, "/pocket.supplier.MsgCreateClaim",
attribute.Value,
)
found = true
break
}
}
if found {
break
}
}
require.Truef(s, found, "unable to find event action attribute")

done()
},
)

select {
case <-ctx.Done():
case <-time.After(createClaimTimeoutDuration):
s.Fatal("timed out waiting for claim to be created")
}
s.waitForMessageAction("/pocket.supplier.MsgCreateClaim")
}

func (s *suite) TheClaimCreatedBySupplierForServiceForApplicationShouldBePersistedOnchain(supplierName, serviceId, appName string) {
Expand All @@ -94,7 +59,8 @@ func (s *suite) TheClaimCreatedBySupplierForServiceForApplicationShouldBePersist
require.NotNil(s, allClaimsRes)

// Assert that the number of claims has increased by one.
preExistingClaims := s.scenarioState[preExistingClaimsKey].([]suppliertypes.Claim)
preExistingClaims, ok := s.scenarioState[preExistingClaimsKey].([]suppliertypes.Claim)
require.True(s, ok, "preExistingClaimsKey not found in scenarioState")
// NB: We are avoiding the use of require.Len here because it provides unreadable output
// TODO_TECHDEBT: Due to the speed of the blocks of the LocalNet sequencer, along with the small number
// of blocks per session, multiple claims may be created throughout the duration of the test. Until
Expand All @@ -119,23 +85,43 @@ func (s *suite) TheSupplierHasServicedASessionWithRelaysForServiceForApplication
relayCount, err := strconv.Atoi(relayCountStr)
require.NoError(s, err)

// Query for any existing claims so that we can compensate for them in the
// Query for any existing claims so that we can compare against them in
// future assertions about changes in on-chain claims.
allClaimsRes, err := s.supplierQueryClient.AllClaims(ctx, &suppliertypes.QueryAllClaimsRequest{})
require.NoError(s, err)
s.scenarioState[preExistingClaimsKey] = allClaimsRes.Claim

// Query for any existing proofs so that we can compare against them in
// future assertions about changes in on-chain proofs.
allProofsRes, err := s.supplierQueryClient.AllProofs(ctx, &suppliertypes.QueryAllProofsRequest{})
require.NoError(s, err)
s.scenarioState[preExistingProofsKey] = allProofsRes.Proof
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved

// Construct an events query client to listen for tx events from the supplier.
msgSenderQuery := fmt.Sprintf(msgClaimSenderQueryFmt, accNameToAddrMap[supplierName])
msgSenderQuery := fmt.Sprintf(txSenderEventSubscriptionQueryFmt, accNameToAddrMap[supplierName])

deps := depinject.Supply(events.NewEventsQueryClient(testclient.CometLocalWebsocketURL))
eventsReplayClient, err := events.NewEventsReplayClient[*abci.TxResult](
ctx,
deps,
msgSenderQuery,
func(eventBz []byte) (*abci.TxResult, error) {
if strings.Contains(string(eventBz), "jsonrpc") {
return nil, nil
}

// TODO_TECHDEBT(#220): refactor to use EventsReplayClient once available.
eventsQueryClient := events.NewEventsQueryClient(testclient.CometLocalWebsocketURL)
eitherEventsBzObs, err := eventsQueryClient.EventsBytes(ctx, msgSenderQuery)
// Unmarshal event data into an ABCI TxResult object.
txResult := &abci.TxResult{}
err = json.Unmarshal(eventBz, txResult)
require.NoError(s, err)

return txResult, nil
},
testEventsReplayClientBufferSize,
)
require.NoError(s, err)

eitherEventsBytesObs := observable.Observable[either.Bytes](eitherEventsBzObs)
eitherEventsBzRelayObs := channel.ToReplayObservable(ctx, eitherEventsReplayBufferSize, eitherEventsBytesObs)
s.scenarioState[eitherEventsBzReplayObsKey] = eitherEventsBzRelayObs
s.scenarioState[eventsReplayClientKey] = eventsReplayClient

s.sendRelaysForSession(
appName,
Expand All @@ -145,6 +131,42 @@ func (s *suite) TheSupplierHasServicedASessionWithRelaysForServiceForApplication
)
}

func (s *suite) AfterTheSupplierSubmitsAProofForTheSessionForServiceForApplication(a string, b string) {
s.waitForMessageAction("/pocket.supplier.MsgSubmitProof")
}

func (s *suite) TheProofSubmittedBySupplierForServiceForApplicationShouldBePersistedOnchain(supplierName, serviceId, appName string) {
ctx := context.Background()

// Retrieve all on-chain proofs for supplierName
allProofsRes, err := s.supplierQueryClient.AllProofs(ctx, &suppliertypes.QueryAllProofsRequest{
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
Filter: &suppliertypes.QueryAllProofsRequest_SupplierAddress{
SupplierAddress: accNameToAddrMap[supplierName],
},
})
require.NoError(s, err)
require.NotNil(s, allProofsRes)

// Assert that the number of proofs has increased by one.
preExistingProofs, ok := s.scenarioState[preExistingProofsKey].([]suppliertypes.Proof)
require.True(s, ok, "preExistingProofsKey not found in scenarioState")
// NB: We are avoiding the use of require.Len here because it provides unreadable output
// TODO_TECHDEBT: Due to the speed of the blocks of the LocalNet sequencer, along with the small number
// of blocks per session, multiple proofs may be created throughout the duration of the test. Until
// these values are appropriately adjusted, we assert on an increase in proofs rather than +1.
require.Greater(s, len(allProofsRes.Proof), len(preExistingProofs), "number of proofs must have increased")

// TODO_IMPROVE: assert that the root hash of the proof contains the correct
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
// SMST sum. The sum can be retrieved via the `GetSum` function exposed
// by the SMT.

// TODO_IMPROVE: add assertions about serviceId and appName and/or incorporate
// them into the scenarioState key(s).

proof := allProofsRes.Proof[0]
require.Equal(s, accNameToAddrMap[supplierName], proof.SupplierAddress)
}

func (s *suite) sendRelaysForSession(
appName string,
supplierName string,
Expand All @@ -163,3 +185,42 @@ func (s *suite) sendRelaysForSession(
s.TheApplicationReceivesASuccessfulRelayResponseSignedBy(appName, supplierName)
}
}

// waitForMessageAction waits for an event to be observed which has the given message action.
func (s *suite) waitForMessageAction(action string) {
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
ctx, done := context.WithCancel(context.Background())

eventsReplayClient, ok := s.scenarioState[eventsReplayClientKey].(client.EventsReplayClient[*abci.TxResult])
require.True(s, ok, "eventsReplayClientKey not found in scenarioState")
require.NotNil(s, eventsReplayClient)

// For each observed event, **asynchronously** check if it contains the given action.
channel.ForEach[*abci.TxResult](
bryanchriswhite marked this conversation as resolved.
Show resolved Hide resolved
Olshansk marked this conversation as resolved.
Show resolved Hide resolved
ctx, eventsReplayClient.EventsSequence(ctx),
func(_ context.Context, txEvent *abci.TxResult) {
if txEvent == nil {
return
}

// Range over each event's attributes to find the "action" attribute
// and compare its value to that of the action provided.
for _, event := range txEvent.Result.Events {
for _, attribute := range event.Attributes {
if attribute.Key == "action" {
if attribute.Value == action {
done()
return
}
}
}
}
},
)

select {
case <-time.After(txEventTimeout):
s.Fatalf("timed out waiting for message with action %q", action)
case <-ctx.Done():
s.Log("Success; message detected before timeout.")
}
}
2 changes: 2 additions & 0 deletions proto/pocket/supplier/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ message QueryAllClaimsRequest {
}

message QueryAllClaimsResponse {
// TODO_IMPROVE: Rename to `Claims` (plural).
repeated Claim claim = 1 [(gogoproto.nullable) = false];
cosmos.base.query.v1beta1.PageResponse pagination = 2;
}
Expand All @@ -116,6 +117,7 @@ message QueryAllProofsRequest {
}

message QueryAllProofsResponse {
// TODO_IMPROVE: Rename to `Proofs` (plural).
repeated Proof proof = 1 [(gogoproto.nullable) = false];
cosmos.base.query.v1beta1.PageResponse pagination = 2;
}
Expand Down
2 changes: 1 addition & 1 deletion x/session/keeper/query_get_session.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func (k Keeper) GetSession(goCtx context.Context, req *types.QueryGetSessionRequ
}

if err := req.ValidateBasic(); err != nil {
return nil, err
return nil, status.Error(codes.InvalidArgument, err.Error())
}

ctx := sdk.UnwrapSDKContext(goCtx)
Expand Down
3 changes: 3 additions & 0 deletions x/supplier/client/cli/tx_create_claim_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package cli

// TODO_NEXT(@bryanchriswhite #140): add comprehensive CLI test coverage for creating claims.
3 changes: 3 additions & 0 deletions x/supplier/client/cli/tx_submit_proof_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package cli

// TODO_NEXT(@bryanchriswhite #141): add comprehensive CLI test coverage for submitting proofs.
5 changes: 4 additions & 1 deletion x/supplier/keeper/msg_server_create_claim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"

sdk "github.com/cosmos/cosmos-sdk/types"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

suppliertypes "github.com/pokt-network/poktroll/x/supplier/types"
)
Expand All @@ -14,6 +16,7 @@ func (k msgServer) CreateClaim(goCtx context.Context, msg *suppliertypes.MsgCrea

ctx := sdk.UnwrapSDKContext(goCtx)
logger := k.Logger(ctx).With("method", "CreateClaim")
logger.Debug("creating claim")

if err := msg.ValidateBasic(); err != nil {
return nil, err
Expand All @@ -25,7 +28,7 @@ func (k msgServer) CreateClaim(goCtx context.Context, msg *suppliertypes.MsgCrea
msg.GetSupplierAddress(),
)
if err != nil {
return nil, err
return nil, status.Error(codes.InvalidArgument, err.Error())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this something we should be doing in message handlers, over returning an error from types/errors.go? Or is it specific for this use case that we are not doing so?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

logger.
Expand Down
Loading
Loading