From f99e5daf34a601d617e92107826fcd261436a72e Mon Sep 17 00:00:00 2001 From: Redouane Lakrache Date: Thu, 21 Mar 2024 23:45:44 +0100 Subject: [PATCH] [Redelegation] fix: EventRedelegation unmarshaling failure (#435) Co-authored-by: Dmitry K --- .github/workflows-helpers/run-e2e-test.sh | 47 +++++++------------- pkg/client/delegation/redelegation.go | 24 +++------- pkg/client/tx/client.go | 14 +++--- pkg/client/tx/client_test.go | 27 +---------- testutil/testclient/testdelegation/client.go | 22 ++++++++- 5 files changed, 51 insertions(+), 83 deletions(-) diff --git a/.github/workflows-helpers/run-e2e-test.sh b/.github/workflows-helpers/run-e2e-test.sh index 044fcb32e..8a5143496 100644 --- a/.github/workflows-helpers/run-e2e-test.sh +++ b/.github/workflows-helpers/run-e2e-test.sh @@ -1,5 +1,3 @@ -# Enhanced Script for Debugging and Error Handling - # Log environment variables for debugging echo "Environment variables:" echo "NAMESPACE: ${NAMESPACE}" @@ -13,34 +11,23 @@ while :; do # Log the command echo "Running kubectl command to get pods with matching purpose=validator:" - # Get all pods with the matching purpose - PODS_JSON=$(kubectl get pods -n "${NAMESPACE}" -l pokt.network/purpose=validator -o json) - - # Log the raw output for debugging - echo "${PODS_JSON}" - - # Validate JSON output - if ! echo "${PODS_JSON}" | jq empty; then - echo "Error: kubectl command did not produce valid JSON." - exit 1 - fi - # Check if any pods are running and have the correct image SHA - READY_POD=$(echo "${PODS_JSON}" | jq -r ".items[] | select(.status.phase == \"Running\") | select(.spec.containers[].image | contains(\"${IMAGE_TAG}\")) | .metadata.name") + READY_POD=$(kubectl get pods -n "${NAMESPACE}" -l pokt.network/purpose=validator -o json | jq -r ".items[] | select(.status.phase == \"Running\") | select(any(.spec.containers[]; .image | contains(\"${IMAGE_TAG}\"))) | .metadata.name") # Check for non-running pods with incorrect image SHA to delete - NON_RUNNING_PODS=$(echo "${PODS_JSON}" | jq -r ".items[] | select(.status.phase != \"Running\") | .metadata.name") - INCORRECT_POD=$(echo "${NON_RUNNING_PODS}" | jq -r "select(.spec.containers[].image | contains(\"${IMAGE_TAG}\") | not) | .metadata.name") + kubectl get pods -n "${NAMESPACE}" -l pokt.network/purpose=validator -o json | jq -r ".items[] | select(.status.phase != \"Running\") | select(any(.spec.containers[]; .image | contains(\"${IMAGE_TAG}\") | not)) | .metadata.name" | while read INCORRECT_POD; do + if [[ -n "${INCORRECT_POD}" ]]; then + echo "Non-ready pod with incorrect image found: ${INCORRECT_POD}. Deleting..." + kubectl delete pod -n "${NAMESPACE}" "${INCORRECT_POD}" + echo "Pod deleted. StatefulSet will recreate the pod." + # Wait for a short duration to allow the StatefulSet to recreate the pod before checking again + sleep 10 + fi + done if [[ -n "${READY_POD}" ]]; then echo "Ready pod found: ${READY_POD}" break - elif [[ -n "${INCORRECT_POD}" ]]; then - echo "Non-ready pod with incorrect image found: ${INCORRECT_POD}. Deleting..." - kubectl delete pod -n ${NAMESPACE} ${INCORRECT_POD} - echo "Pod deleted. StatefulSet will recreate the pod." - # Wait for a short duration to allow the StatefulSet to recreate the pod before checking again - sleep 10 else echo "Validator with image ${IMAGE_TAG} is not ready yet and no incorrect pods found. Will retry checking for ready or incorrect pods in 10 seconds..." sleep 10 @@ -65,9 +52,9 @@ kubectl apply -f job.yaml # Wait for the pod to be created and be in a running state echo "Waiting for the e2e test pod to be in the running state..." while :; do - POD_NAME=$(kubectl get pods -n ${NAMESPACE} --selector=job-name=${JOB_NAME} -o jsonpath='{.items[*].metadata.name}') + POD_NAME=$(kubectl get pods -n "${NAMESPACE}" --selector=job-name=${JOB_NAME} -o jsonpath='{.items[*].metadata.name}') [[ -z "${POD_NAME}" ]] && echo "Waiting for pod to be scheduled..." && sleep 5 && continue - POD_STATUS=$(kubectl get pod ${POD_NAME} -n ${NAMESPACE} -o jsonpath='{.status.phase}') + POD_STATUS=$(kubectl get pod "${POD_NAME}" -n "${NAMESPACE}" -o jsonpath='{.status.phase}') [[ "${POD_STATUS}" == "Running" ]] && break echo "Current pod status: ${POD_STATUS}. Waiting for 'Running' status..." sleep 5 @@ -76,23 +63,19 @@ done echo "Pod is running. Monitoring logs and status..." # Stream the pod logs in the background -kubectl logs -f ${POD_NAME} -n ${NAMESPACE} & +kubectl logs -f "${POD_NAME}" -n "${NAMESPACE}" & # Monitor pod status in a loop while :; do - CURRENT_STATUS=$(kubectl get pod ${POD_NAME} -n ${NAMESPACE} -o jsonpath="{.status.containerStatuses[0].state}") + CURRENT_STATUS=$(kubectl get pod "${POD_NAME}" -n "${NAMESPACE}" -o jsonpath="{.status.containerStatuses[0].state}") if echo $CURRENT_STATUS | grep -q 'terminated'; then EXIT_CODE=$(echo $CURRENT_STATUS | jq '.terminated.exitCode') if [[ "$EXIT_CODE" != "0" ]]; then echo "Container terminated with exit code ${EXIT_CODE}" - kubectl delete job ${JOB_NAME} -n ${NAMESPACE} + kubectl delete job "${JOB_NAME}" -n "${NAMESPACE}" exit 1 fi break fi sleep 5 done - -# If the loop exits without failure, the job succeeded -echo "Job completed successfully" -kubectl delete job ${JOB_NAME} -n ${NAMESPACE} diff --git a/pkg/client/delegation/redelegation.go b/pkg/client/delegation/redelegation.go index 86af6906f..18772a86a 100644 --- a/pkg/client/delegation/redelegation.go +++ b/pkg/client/delegation/redelegation.go @@ -5,13 +5,11 @@ package delegation // of listening to all events and doing a verbose filter. import ( - "encoding/json" "strconv" - "cosmossdk.io/api/tendermint/abci" - "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/client/tx" ) // redelegationEventType is the type of the EventRedelegation event emitted by @@ -20,10 +18,6 @@ const redelegationEventType = "pocket.application.EventRedelegation" var _ client.Redelegation = (*redelegation)(nil) -// TxEvent is an alias for the CometBFT TxResult type used to decode the -// response bytes from the EventsQueryClient's subscription -type TxEvent = abci.TxResult - // redelegation wraps the EventRedelegation event emitted by the application // module, for use in the observable, it is one of the log entries embedded // within the log field of the response struct from the app module's query. @@ -49,19 +43,15 @@ func (d redelegation) GetGatewayAddress() string { // fails then the error is returned. func newRedelegationEventFactoryFn() events.NewEventsFn[client.Redelegation] { return func(eventBz []byte) (client.Redelegation, error) { - txEvent := new(TxEvent) - // Try to deserialize the provided bytes into a TxEvent. - if err := json.Unmarshal(eventBz, txEvent); err != nil { + // Try to deserialize the provided bytes into an abci.TxResult. + txResult, err := tx.UnmarshalTxResult(eventBz) + if err != nil { return nil, err } - // Check if the TxEvent has empty transaction bytes, which indicates - // the message is probably not a valid transaction event. - if len(txEvent.Tx) == 0 { - return nil, events.ErrEventsUnmarshalEvent.Wrap("empty transaction bytes") - } + // Iterate through the log entries to find EventRedelegation - for _, event := range txEvent.Result.Events { - if event.GetType_() != redelegationEventType { + for _, event := range txResult.Result.Events { + if event.GetType() != redelegationEventType { continue } var redelegationEvent redelegation diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go index 27e1a6f4d..849de62aa 100644 --- a/pkg/client/tx/client.go +++ b/pkg/client/tx/client.go @@ -43,13 +43,13 @@ const ( // In order to simplify the logic of the TxClient var _ client.TxClient = (*txClient)(nil) -// cometTxEvent is used to deserialize incoming transaction event messages +// CometTxEvent is used to deserialize incoming transaction event messages // from the respective events query subscription. This structure is adapted // to handle CometBFT's unique serialization format, which diverges from // conventional approaches seen in implementations like rollkit's. The design // ensures accurate parsing and compatibility with CometBFT's serialization // of transaction results. -type cometTxEvent struct { +type CometTxEvent struct { Data struct { // TxResult is nested to accommodate CometBFT's serialization format, // ensuring correct deserialization of transaction results. @@ -512,24 +512,24 @@ func (txnClient *txClient) getTxTimeoutError(ctx context.Context, txHashHex stri // If the resulting TxResult has empty transaction bytes, it assumes that // the message was not a transaction results and returns an error. func UnmarshalTxResult(txResultBz []byte) (*abci.TxResult, error) { - var rpcReponse rpctypes.RPCResponse + var rpcResponse rpctypes.RPCResponse // Try to deserialize the provided bytes into an RPCResponse. - if err := json.Unmarshal(txResultBz, &rpcReponse); err != nil { + if err := json.Unmarshal(txResultBz, &rpcResponse); err != nil { return nil, events.ErrEventsUnmarshalEvent.Wrap(err.Error()) } - var txResult cometTxEvent + var txResult CometTxEvent // Try to deserialize the provided bytes into a TxResult. - if err := json.Unmarshal(rpcReponse.Result, &txResult); err != nil { + if err := json.Unmarshal(rpcResponse.Result, &txResult); err != nil { return nil, events.ErrEventsUnmarshalEvent.Wrap(err.Error()) } // Check if the TxResult has empty transaction bytes, which indicates // the message might not be a valid transaction event. if bytes.Equal(txResult.Data.Value.TxResult.Tx, []byte{}) { - return nil, events.ErrEventsUnmarshalEvent.Wrap("event bytes do not correspond to an comettypes.EventDataTx event") + return nil, events.ErrEventsUnmarshalEvent.Wrap("event bytes do not correspond to an abci.TxResult") } return &txResult.Data.Value.TxResult, nil diff --git a/pkg/client/tx/client_test.go b/pkg/client/tx/client_test.go index 770848a09..50849c6b6 100644 --- a/pkg/client/tx/client_test.go +++ b/pkg/client/tx/client_test.go @@ -8,7 +8,6 @@ import ( "cosmossdk.io/depinject" "cosmossdk.io/math" - abci "github.com/cometbft/cometbft/abci/types" cometbytes "github.com/cometbft/cometbft/libs/bytes" "github.com/cometbft/cometbft/libs/json" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" @@ -108,13 +107,8 @@ func TestTxClient_SignAndBroadcast_Succeeds(t *testing.T) { require.NoError(t, err) // Construct the expected transaction event bytes from the expected transaction bytes. - txResultEvent := &testTxEvent{ - Data: testTxEventDataStruct{ - Value: testTxEventValueStruct{ - TxResult: abci.TxResult{Tx: expectedTx}, - }, - }, - } + txResultEvent := &tx.CometTxEvent{} + txResultEvent.Data.Value.TxResult.Tx = expectedTx txResultBz, err := json.Marshal(txResultEvent) require.NoError(t, err) @@ -438,20 +432,3 @@ func TestTxClient_SignAndBroadcast_Timeout(t *testing.T) { func TestTxClient_SignAndBroadcast_MultipleMsgs(t *testing.T) { t.SkipNow() } - -// TODO_BLOCKER: Fix duplicate definitions of this type across tests & source code. -// This duplicates the unexported `cometTxEvent` from `pkg/client/tx/client.go`. -// We need to answer the following questions to avoid this: -// - Should tests be their own packages? (i.e. `package block` vs `package block_test`) -// - Should we prefer export types which are not required for API consumption? -// - Should we use `//go:build“ test constraint on new files using it for testing purposes? -// - Should we enforce all tests to use `-tags=test`? -type testTxEvent struct { - Data testTxEventDataStruct `json:"data"` -} -type testTxEventDataStruct struct { - Value testTxEventValueStruct `json:"value"` -} -type testTxEventValueStruct struct { - TxResult abci.TxResult -} diff --git a/testutil/testclient/testdelegation/client.go b/testutil/testclient/testdelegation/client.go index c99d5b2cc..c019ae0d3 100644 --- a/testutil/testclient/testdelegation/client.go +++ b/testutil/testclient/testdelegation/client.go @@ -6,11 +6,14 @@ import ( "testing" "cosmossdk.io/depinject" + "github.com/cometbft/cometbft/libs/json" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/delegation" + "github.com/pokt-network/poktroll/pkg/client/tx" "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/observable/channel" "github.com/pokt-network/poktroll/testutil/mockclient" @@ -149,6 +152,21 @@ func NewRedelegationEventBytes( ) []byte { t.Helper() jsonTemplate := `{"tx":"SGVsbG8sIHdvcmxkIQ==","result":{"events":[{"type":"message","attributes":[{"key":"action","value":"/pocket.application.MsgDelegateToGateway"},{"key":"sender","value":"pokt1exampleaddress"},{"key":"module","value":"application"}]},{"type":"pocket.application.EventRedelegation","attributes":[{"key":"app_address","value":"\"%s\""},{"key":"gateway_address","value":"\"%s\""}]}]}}` - json := fmt.Sprintf(jsonTemplate, appAddress, gatewayAddress) - return []byte(json) + + txResultEvent := &tx.CometTxEvent{} + + err := json.Unmarshal( + []byte(fmt.Sprintf(jsonTemplate, appAddress, gatewayAddress)), + &txResultEvent.Data.Value.TxResult, + ) + require.NoError(t, err) + + txResultBz, err := json.Marshal(txResultEvent) + require.NoError(t, err) + + rpcResult := &rpctypes.RPCResponse{Result: txResultBz} + rpcResultBz, err := json.Marshal(rpcResult) + require.NoError(t, err) + + return rpcResultBz }