Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Claim/proof improvements #36

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,021 changes: 989 additions & 32 deletions docs/static/openapi.yml

Large diffs are not rendered by default.

13 changes: 13 additions & 0 deletions proto/poktroll/servicer/claim.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
syntax = "proto3";
package poktroll.servicer;

option go_package = "poktroll/x/servicer/types";

message Claim {
// TODO_REFACTOR: Use SessionHeader everywhere
string SessionId = 1;
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
uint64 SessionNumber = 2;
uint64 CommittedHeight = 3;
bytes SmstRootHash = 4;
string ServicerAddress = 5;
}
34 changes: 26 additions & 8 deletions proto/poktroll/servicer/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,40 +7,48 @@ import "google/api/annotations.proto";
import "cosmos/base/query/v1beta1/pagination.proto";
import "poktroll/servicer/params.proto";
import "poktroll/servicer/servicers.proto";
import "poktroll/servicer/claim.proto";
import "poktroll/servicer/tx.proto";

option go_package = "poktroll/x/servicer/types";

// Query defines the gRPC querier service.
service Query {

// Parameters queries the parameters of the module.
rpc Params (QueryParamsRequest) returns (QueryParamsResponse) {
option (google.api.http).get = "/poktroll/servicer/params";

}

// Queries a list of Servicers items.
rpc Servicers (QueryGetServicersRequest) returns (QueryGetServicersResponse) {
option (google.api.http).get = "/poktroll/servicer/servicers/{address}";

}
rpc ServicersAll (QueryAllServicersRequest) returns (QueryAllServicersResponse) {
option (google.api.http).get = "/poktroll/servicer/servicers";

}

// Queries a list of Claims items.
rpc Claims (QueryClaimsRequest) returns (QueryClaimsResponse) {
option (google.api.http).get = "/poktroll/servicer/claims/{servicer_address}";


}

// Queries a list of Proofs items.
rpc Proofs (QueryProofsRequest) returns (QueryProofsResponse) {
option (google.api.http).get = "/poktroll/servicer/proofs/{servicer_address}";

}
}
// QueryParamsRequest is request type for the Query/Params RPC method.
message QueryParamsRequest {}

// QueryParamsResponse is response type for the Query/Params RPC method.
message QueryParamsResponse {

// params holds all the parameters of this module.
Params params = 1 [(gogoproto.nullable) = false];
}
Expand Down Expand Up @@ -68,7 +76,17 @@ message QueryClaimsRequest {
}

message QueryClaimsResponse {
repeated MsgClaim claims = 1 [(gogoproto.nullable) = false];
repeated Claim claims = 1 [(gogoproto.nullable) = false];
cosmos.base.query.v1beta1.PageResponse pagination = 2;
}

message QueryProofsRequest {
string servicer_address = 1;
cosmos.base.query.v1beta1.PageRequest pagination = 2;
}

message QueryProofsResponse {
repeated MsgProof proofs = 1 [(gogoproto.nullable) = false];
cosmos.base.query.v1beta1.PageResponse pagination = 2;
}

6 changes: 4 additions & 2 deletions proto/poktroll/servicer/tx.proto
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ message MsgClaim {
bytes smst_root_hash = 2;
// IMPROVE: move session_id into a new session_header field
string session_id = 3;
// TECHDEBT: expiration_height is not used right now and could be computed from on-chain data
int64 expiration_height = 4;
uint64 session_number = 4;
// TECHDEBT: invalidation_height is not used right now and could be computed from on-chain data
// (NB: this is carryover from V0; renamed from "expiration_height" for clarity)
int64 invalidation_height = 5;
}

message MsgClaimResponse {}
Expand Down
4 changes: 4 additions & 0 deletions proto/poktroll/session/session.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ message Session {
string session_id = 1;
uint64 session_number = 2; // The session number
uint64 session_block_start_height = 3; // The height at which the session started
// TODO_CONSIDERATION: do we really need this field? This should be a
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, we can probably remove it.

The ONLY thing I haven't thought through is how to handle changes in the number of blocks per session while a session is active.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had some thoughts about this. We could have a map of previous session numbers with their heights each time gov. changes this parameter (à la protocol upgrade).

Then if we want to check the height of a session we should calculate it using this map, eg.

[1, 10]: 20 // 2 blocks per session
[11, 15]: 40 // 11 to 15 had 4 blocks per session

If after session height 15, we have 5 blocks per session and we are at block 20 we do 40 + ((20 - 15) * 5) = 85 which is the block height for session 20. (don't mind off-by-one errors in the example if there are)

It also let us calculate the block height of any past session.

We may have a slightly more complex structure if we want different blocks_per_session per service.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's leave this for a post-alpha discussion and just assume a constant (governance-driven) parameter for now. What you're describing will work but I want to see if we can make it simpler.

// governance parameter & can't be trusted/used by the servicer
// msgServer/keeper. Need to think through what we want to when
// then number of blocks per session changes during an active session.
uint64 num_blocks_per_session = 4; // The number of blocks the session will last (starting at block_height)


Expand Down
1 change: 1 addition & 0 deletions relayer/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type ServicerClient interface {
// on-chain in exchange for a reward.
SubmitProof(
ctx context.Context,
sessionId string,
smtRootHash []byte,
closestKey []byte,
closestValueHash []byte,
Expand Down
2 changes: 2 additions & 0 deletions relayer/client/proofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

func (client *servicerClient) SubmitProof(
ctx context.Context,
sessionId string,
smtRootHash []byte,
closestKey []byte,
closestValueHash []byte,
Expand All @@ -25,6 +26,7 @@ func (client *servicerClient) SubmitProof(
}

msg := &types.MsgProof{
SessionId: sessionId,
ServicerAddress: client.address,
SmstRootHash: smtRootHash,
Path: closestKey,
Expand Down
37 changes: 28 additions & 9 deletions relayer/client/txs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,24 @@ package client
import (
"bytes"
"context"
"encoding/hex"
"encoding/json"
"fmt"
"log"
"poktroll/utils"
"poktroll/x/servicer/types"
"strings"

errorsmod "cosmossdk.io/errors"
abciTypes "github.com/cometbft/cometbft/abci/types"
cometTypes "github.com/cometbft/cometbft/types"
)

var (
errNotTxMsg = "expected tx websocket msg; got: %s"
errNotTxMsg = errorsmod.Register("relayer/client", 1, "Expected tx websocket msg")
errInvalidTimedOutTxHash = errorsmod.Register("relayer/client", 2, "Invalid time out tx hash")
errFailedToFetchTimedOutTx = errorsmod.Register("relayer/client", 3, "Failed to fetch error for timed out tx")
errTxTimeOut = errorsmod.Register("relayer/client", 4, "Tx timed out")
)

// cometTxResponseWebsocketMsg is used to deserialize incoming websocket messages from
Expand Down Expand Up @@ -48,8 +53,6 @@ func (client *servicerClient) subscribeToOwnTxs(

//return txsNotifee
go client.timeoutTxs(ctx, blocksNotifee)

return
}

// Closes the error channels for expect transactions from the latest block when it times out.
Expand Down Expand Up @@ -101,6 +104,8 @@ func (client *servicerClient) timeoutTxs(
txErrCh <- fmt.Errorf("tx timed out: %s", txHash)
close(txErrCh)
delete(txsByHash, txHash)

go client.getTxTimeoutError(ctx, txHash)
}

delete(client.txsByHashByTimeout, block.Height())
Expand All @@ -114,10 +119,9 @@ func (client *servicerClient) timeoutTxs(
func (client *servicerClient) txsFactoryHandler() messageHandler {
return func(ctx context.Context, msg []byte) error {
txMsg, err := client.newCometTxResponseMsg(msg)
expectedErr := fmt.Errorf(errNotTxMsg, string(msg))
switch {
case err == nil:
case err.Error() == expectedErr.Error():
case errNotTxMsg.Is(err):
return nil
case err != nil:
return fmt.Errorf("failed to parse new tx message: %w", err)
Expand Down Expand Up @@ -145,8 +149,8 @@ func (client *servicerClient) txsFactoryHandler() messageHandler {

// TODO_CONSIDERATION: do we really need both of these maps?
for timeoutHeight, txsByHash := range client.txsByHashByTimeout {
for txHash, _ := range txsByHash {
if txHash == txHash {
for txHashAsKey, _ := range txsByHash {
if txHash == txHashAsKey {
delete(txsByHash, txHash)
}
}
Expand All @@ -158,7 +162,7 @@ func (client *servicerClient) txsFactoryHandler() messageHandler {
}
}

// newCometTxResponseMsg attempts to deserialize the given bytes into a comet tx event byte slic.
// newCometTxResponseMsg attempts to deserialize the given bytes into a comet tx event byte slice.
// if the resulting block has a height of zero, assume the message was not a
// block message and return an errNotBlockMsg error.
func (client *servicerClient) newCometTxResponseMsg(txMsgBz []byte) (*cometTxResponseWebsocketMsg, error) {
Expand All @@ -169,8 +173,23 @@ func (client *servicerClient) newCometTxResponseMsg(txMsgBz []byte) (*cometTxRes

// If msg does not match the expected format then block will be its zero value.
if bytes.Equal(txResponseMsg.Tx, []byte{}) {
return nil, fmt.Errorf(errNotTxMsg, string(txMsgBz))
return nil, errNotTxMsg.Wrapf("got: %s", string(txMsgBz))
}

return txResponseMsg, nil
}

// This function is intended to be called as a goroutine
// TODO_DISCUSS: Should it be prefixed with `go`?
func (client *servicerClient) getTxTimeoutError(ctx context.Context, txHashHex string) error {
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
txHash, err := hex.DecodeString(txHashHex)
if err != nil {
return errInvalidTimedOutTxHash.Wrapf("got: %s", txHashHex)
}

txResponse, err := client.clientCtx.Client.Tx(ctx, txHash, false)
if err != nil {
return errFailedToFetchTimedOutTx.Wrapf("got tx: %s: %s", txHashHex, err.Error())
}
return errTxTimeOut.Wrapf("got: %x: %s", txHashHex, txResponse.TxResult.Log)
}
2 changes: 1 addition & 1 deletion relayer/client/websockets.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (client *servicerClient) subscribeWithQuery(ctx context.Context, query stri
panic(fmt.Errorf("failed to connect to websocket: %w", err))
}

// TODO_DISCUSS: Should we replace `requestId` with just
// TODO_DISCUSS: Should we replace `requestId` with just a random string for entropy
requestId := client.getNextRequestId()
conn.WriteJSON(map[string]interface{}{
"jsonrpc": "2.0",
Expand Down
10 changes: 9 additions & 1 deletion relayer/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,13 @@ func (m *Miner) waitAndProve(ctx context.Context, session sessionmanager.Session
}

// SubmitProof ensures on-chain proof inclusion so we can safely prune the tree.
return m.client.SubmitProof(ctx, claimRoot, path, valueHash, sum, proof)
return m.client.SubmitProof(
ctx,
session.GetSessionId(),
claimRoot,
path,
valueHash,
sum,
proof,
)
}
2 changes: 2 additions & 0 deletions x/servicer/client/cli/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ func GetQueryCmd(queryRoute string) *cobra.Command {
cmd.AddCommand(CmdShowServicers())
cmd.AddCommand(CmdClaims())

cmd.AddCommand(CmdProofs())

// this line is used by starport scaffolding # 1

return cmd
Expand Down
46 changes: 46 additions & 0 deletions x/servicer/client/cli/query_proofs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package cli

import (
"strconv"

"github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/spf13/cobra"
"poktroll/x/servicer/types"
)

var _ = strconv.Itoa(0)

func CmdProofs() *cobra.Command {
cmd := &cobra.Command{
Use: "proofs [servicer-address]",
Short: "Query proofs",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) (err error) {
reqServicerAddress := args[0]

clientCtx, err := client.GetClientQueryContext(cmd)
if err != nil {
return err
}

queryClient := types.NewQueryClient(clientCtx)

params := &types.QueryProofsRequest{

ServicerAddress: reqServicerAddress,
}

res, err := queryClient.Proofs(cmd.Context(), params)
if err != nil {
return err
}

return clientCtx.PrintProto(res)
},
}

flags.AddQueryFlagsToCmd(cmd)

return cmd
}
21 changes: 19 additions & 2 deletions x/servicer/keeper/claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package keeper

import (
"fmt"

"github.com/cosmos/cosmos-sdk/store/prefix"
sdk "github.com/cosmos/cosmos-sdk/types"

"poktroll/x/servicer/types"
)

// InsertClaim inserts the given claim into the state tree.
func (k Keeper) InsertClaim(ctx sdk.Context, claim *types.MsgClaim) error {
func (k Keeper) InsertClaim(ctx sdk.Context, claim *types.Claim) error {
// TODO_CONSIDERATION: do we want to re-use the servicer store for claims or
// create a new "claims store"?
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.ClaimsKeyPrefix))
Expand All @@ -17,7 +19,22 @@ func (k Keeper) InsertClaim(ctx sdk.Context, claim *types.MsgClaim) error {
return err
}

claimKey := fmt.Sprintf("%s", claim.SessionId)
claimKey := fmt.Sprintf("%s", claim.GetSessionId())
store.Set([]byte(claimKey), claimBz)
return nil
}

func (k Keeper) GetClaim(ctx sdk.Context, sessionId string) (*types.Claim, error) {
store := prefix.NewStore(ctx.KVStore(k.storeKey), types.KeyPrefix(types.ClaimsKeyPrefix))
claimBz := store.Get(types.ClaimsKey(sessionId))

if claimBz == nil {
return nil, fmt.Errorf("claim not found for sessionId: %s", sessionId)
}

var claim types.Claim
if err := claim.Unmarshal(claimBz); err != nil {
return nil, err
}
return &claim, nil
}
Loading