From 16c6ebca3d705724f4f360697bf868acf64d1042 Mon Sep 17 00:00:00 2001 From: Dima Kniazev Date: Tue, 14 Nov 2023 16:45:27 -0800 Subject: [PATCH 01/16] [CI] Integrate E2E tests with GitHub CI (#152) * add more debug output * Empty commit * symlink to pocketd for now * rename pocketd * --wip-- [skip ci] * removing pocketd completely * also remove comment * rename repo * test the permissions * test permissions * set project_id: ${{ secrets.GKE_PROTOCOL_PROJECT }} * maybe that will help * test perms * try that * test job * try ls * add checkout * separate the jobs * ignite is needed for tests * avoid collision on name * backoffLimit 0 * wait for pod * change envs * add wait for sequencer before running a test * fix jq * fail fast * move to a script * try this * image sha to image tag * dont use make targets then * troubleshoot * pre-populate the variable * add debug output * install more stuff into the container * try this * cleanup * devnet-test-e2e label everywhere * add env * Update .github/workflows-helpers/run-e2e-test.sh Co-authored-by: Daniel Olshansky * Update .github/workflows/run-tests.yml Co-authored-by: Daniel Olshansky * requested changes --------- Co-authored-by: Daniel Olshansky --- .github/label-actions.yml | 14 ++--- .../run-e2e-test-job-template.yaml | 45 ++++++++++++++ .github/workflows-helpers/run-e2e-test.sh | 55 +++++++++++++++++ .github/workflows/{go.yml => main-build.yml} | 59 +++++++++++++------ .github/workflows/reviewdog.yml | 4 ++ .github/workflows/run-tests.yml | 48 +++++++++++++++ .gitignore | 1 - Dockerfile.dev | 4 +- e2e/tests/node.go | 6 ++ 9 files changed, 208 insertions(+), 28 deletions(-) create mode 100644 .github/workflows-helpers/run-e2e-test-job-template.yaml create mode 100644 .github/workflows-helpers/run-e2e-test.sh rename .github/workflows/{go.yml => main-build.yml} (63%) create mode 100644 .github/workflows/run-tests.yml diff --git a/.github/label-actions.yml b/.github/label-actions.yml index 0ab29008d..8618e0d5a 100644 --- a/.github/label-actions.yml +++ b/.github/label-actions.yml @@ -1,13 +1,13 @@ -# When `devnet-e2e-test` is added, also assign `devnet` to the PR. -devnet-e2e-test: +# When `devnet-test-e2e` is added, also assign `devnet` to the PR. +devnet-test-e2e: prs: comment: The CI will now also run the e2e tests on devnet, which increases the time it takes to complete all CI checks. label: - devnet - push-image -# When `devnet-e2e-test` is removed, also delete `devnet` from the PR. --devnet-e2e-test: +# When `devnet-test-e2e` is removed, also delete `devnet` from the PR. +-devnet-test-e2e: prs: unlabel: - devnet @@ -18,11 +18,11 @@ devnet: label: - push-image -# When `devnet` is removed, also delete `devnet-e2e-test` from the PR. +# When `devnet` is removed, also delete `devnet-test-e2e` from the PR. -devnet: prs: unlabel: - - devnet-e2e-test + - devnet-test-e2e # Let the developer know that they need to push another commit after attaching the label to PR. push-image: @@ -34,4 +34,4 @@ push-image: prs: unlabel: - devnet - - devnet-e2e-test + - devnet-test-e2e diff --git a/.github/workflows-helpers/run-e2e-test-job-template.yaml b/.github/workflows-helpers/run-e2e-test-job-template.yaml new file mode 100644 index 000000000..6ad0def13 --- /dev/null +++ b/.github/workflows-helpers/run-e2e-test-job-template.yaml @@ -0,0 +1,45 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: ${JOB_NAME} + namespace: ${NAMESPACE} +spec: + ttlSecondsAfterFinished: 120 + template: + spec: + containers: + - name: e2e-tests + image: ghcr.io/pokt-network/poktrolld:${IMAGE_TAG} + command: ["/bin/sh"] + args: ["-c", "poktrolld q gateway list-gateway --node=$POCKET_NODE && poktrolld q application list-application --node=$POCKET_NODE && poktrolld q supplier list-supplier --node=$POCKET_NODE && go test -v ./e2e/tests/... -tags=e2e"] + env: + - name: AUTH_TOKEN + valueFrom: + secretKeyRef: + key: auth_token + name: celestia-secret + - name: POCKET_NODE + value: tcp://${NAMESPACE}-sequencer:36657 + - name: E2E_DEBUG_OUTPUT + value: "false" # Flip to true to see the command and result of the execution + - name: POKTROLLD_HOME + value: /root/.pocket + - name: CELESTIA_HOSTNAME + value: celestia-rollkit + volumeMounts: + - mountPath: /root/.pocket/keyring-test/ + name: keys-volume + - mountPath: /root/.pocket/config/ + name: configs-volume + restartPolicy: Never + volumes: + - configMap: + defaultMode: 420 + name: poktrolld-keys + name: keys-volume + - configMap: + defaultMode: 420 + name: poktrolld-configs + name: configs-volume + serviceAccountName: default + backoffLimit: 0 diff --git a/.github/workflows-helpers/run-e2e-test.sh b/.github/workflows-helpers/run-e2e-test.sh new file mode 100644 index 000000000..723ab1eca --- /dev/null +++ b/.github/workflows-helpers/run-e2e-test.sh @@ -0,0 +1,55 @@ +# Check if the pod with the matching image SHA and purpose is ready +echo "Checking for ready sequencer pod with image SHA ${IMAGE_TAG}..." +while : ; do +# Get all pods with the matching purpose +PODS_JSON=$(kubectl get pods -n ${NAMESPACE} -l pokt.network/purpose=sequencer -o json) + +# Check if any pods are running and have the correct image SHA +READY_POD=$(echo $PODS_JSON | jq -r ".items[] | select(.status.phase == \"Running\") | select(.spec.containers[].image | contains(\"${IMAGE_TAG}\")) | .metadata.name") + +if [[ -n "${READY_POD}" ]]; then + echo "Ready pod found: ${READY_POD}" + break +else + echo "Sequencer with with an image ${IMAGE_TAG} is not ready yet. Will retry in 10 seconds..." + sleep 10 +fi +done + +# Create a job to run the e2e tests +envsubst < .github/workflows-helpers/run-e2e-test-job-template.yaml > job.yaml +kubectl apply -f job.yaml + +# Wait for the pod to be created and be in a running state +echo "Waiting for the pod to be in the running state..." +while : ; do +POD_NAME=$(kubectl get pods -n ${NAMESPACE} --selector=job-name=${JOB_NAME} -o jsonpath='{.items[*].metadata.name}') +[[ -z "${POD_NAME}" ]] && echo "Waiting for pod to be scheduled..." && sleep 5 && continue +POD_STATUS=$(kubectl get pod ${POD_NAME} -n ${NAMESPACE} -o jsonpath='{.status.phase}') +[[ "${POD_STATUS}" == "Running" ]] && break +echo "Current pod status: ${POD_STATUS}" +sleep 5 +done + +echo "Pod is running. Monitoring logs and status..." +# Stream the pod logs in the background +kubectl logs -f ${POD_NAME} -n ${NAMESPACE} & + +# Monitor pod status in a loop +while : ; do +CURRENT_STATUS=$(kubectl get pod ${POD_NAME} -n ${NAMESPACE} -o jsonpath="{.status.containerStatuses[0].state}") +if echo $CURRENT_STATUS | grep -q 'terminated'; then + EXIT_CODE=$(echo $CURRENT_STATUS | jq '.terminated.exitCode') + if [[ "$EXIT_CODE" != "0" ]]; then + echo "Container terminated with exit code ${EXIT_CODE}" + kubectl delete job ${JOB_NAME} -n ${NAMESPACE} + exit 1 + fi + break +fi +sleep 5 +done + +# If the loop exits without failure, the job succeeded +echo "Job completed successfully" +kubectl delete job ${JOB_NAME} -n ${NAMESPACE} diff --git a/.github/workflows/go.yml b/.github/workflows/main-build.yml similarity index 63% rename from .github/workflows/go.yml rename to .github/workflows/main-build.yml index 6f76334c1..83893df60 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/main-build.yml @@ -1,7 +1,4 @@ -# This workflow will build a golang project -# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-go - -name: Ignite build & test +name: Main build on: push: @@ -9,11 +6,11 @@ on: pull_request: concurrency: - group: ${{ github.head_ref || github.ref_name }} + group: ${{ github.workflow }}-${{ github.head_ref || github.ref_name }} cancel-in-progress: true jobs: - build: + build-push-container: runs-on: ubuntu-latest steps: - name: install ignite @@ -29,7 +26,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: "1.20" + go-version: "1.20.10" - name: Install CI dependencies run: make install_ci_deps @@ -37,18 +34,9 @@ jobs: - name: Generate protobufs run: make proto_regen - - name: Generate mocks - run: make go_mockgen - - - name: Run golangci-lint - run: make go_lint - - name: Build run: ignite chain build -v --debug --skip-proto - - name: Test - run: make go_test - - name: Set up Docker Buildx if: (github.ref == 'refs/heads/main') || (contains(github.event.pull_request.labels.*.name, 'push-image')) uses: docker/setup-buildx-action@v3 @@ -61,7 +49,7 @@ jobs: DOCKER_METADATA_PR_HEAD_SHA: "true" with: images: | - ghcr.io/pokt-network/pocketd + ghcr.io/pokt-network/poktrolld tags: | type=ref,event=branch type=ref,event=pr @@ -76,11 +64,13 @@ jobs: username: ${{ github.actor }} password: ${{ secrets.GITHUB_TOKEN }} - - name: Copy binary to inside of the Docker context + - name: Copy binaries to inside of the Docker context if: (github.ref == 'refs/heads/main') || (contains(github.event.pull_request.labels.*.name, 'push-image')) run: | mkdir -p ./bin # Make sure the bin directory exists + cp $(which ignite) ./bin # Copy ignite binary to the repo's bin directory cp $(go env GOPATH)/bin/poktrolld ./bin # Copy the binary to the repo's bin directory + ls -la ./bin - name: Build and push Docker image if: (github.ref == 'refs/heads/main') || (contains(github.event.pull_request.labels.*.name, 'push-image')) @@ -95,3 +85,36 @@ jobs: cache-from: type=gha cache-to: type=gha,mode=max context: . + + run-e2e-tests: + needs: build-push-container + if: contains(github.event.pull_request.labels.*.name, 'devnet-test-e2e') + runs-on: ubuntu-latest + env: + GKE_CLUSTER: protocol-us-central1 + GKE_ZONE: us-central1 + steps: + - uses: actions/checkout@v4 + with: + sparse-checkout: | + .github + + - id: 'auth' + name: 'Authenticate to Google Cloud' + uses: 'google-github-actions/auth@v1' + with: + credentials_json: '${{ secrets.GKE_PROTOCOL_US_CENTRAL }}' + + - uses: google-github-actions/get-gke-credentials@v1 + with: + cluster_name: ${{ env.GKE_CLUSTER }} + location: ${{ env.GKE_ZONE }} + project_id: ${{ secrets.GKE_PROTOCOL_PROJECT }} + + - name: Run E2E test job + env: + IMAGE_TAG: sha-${{ github.event.pull_request.head.sha || github.sha }} + NAMESPACE: devnet-issue-${{ github.event.number }} + JOB_NAME: e2e-test-${{ github.event.pull_request.head.sha || github.sha }} + POCKET_NODE: tcp://devnet-issue-${{ github.event.number }}-sequencer:36657 + run: bash .github/workflows-helpers/run-e2e-test.sh \ No newline at end of file diff --git a/.github/workflows/reviewdog.yml b/.github/workflows/reviewdog.yml index 62afe323e..2fb9c9dd7 100644 --- a/.github/workflows/reviewdog.yml +++ b/.github/workflows/reviewdog.yml @@ -4,6 +4,10 @@ on: pull_request: branches: ["main"] +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.ref_name }} + cancel-in-progress: true + jobs: # Makes sure that comments like TODO_IN_THIS_PR or TODO_IN_THIS_COMMIT block merging to main # More info: https://github.com/pokt-network/action-fail-on-found diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml new file mode 100644 index 000000000..6bd5990c7 --- /dev/null +++ b/.github/workflows/run-tests.yml @@ -0,0 +1,48 @@ +name: Run tests + +on: + push: + branches: ["main"] + pull_request: + +concurrency: + group: ${{ github.workflow }}-${{ github.head_ref || github.ref_name }} + cancel-in-progress: true + +env: + GKE_CLUSTER: protocol-us-central1 + GKE_ZONE: us-central1 + +jobs: + go-test: + runs-on: ubuntu-latest + steps: + - name: install ignite + # If this step fails due to ignite.com failing, see #116 for a temporary workaround + run: | + curl https://get.ignite.com/cli! | bash + ignite version + + - uses: actions/checkout@v3 + with: + fetch-depth: "0" # Per https://github.com/ignite/cli/issues/1674#issuecomment-1144619147 + + - name: Set up Go + uses: actions/setup-go@v4 + with: + go-version: "1.20.10" + + - name: Install CI dependencies + run: make install_ci_deps + + - name: Generate protobufs + run: make proto_regen + + - name: Generate mocks + run: make go_mockgen + + - name: Run golangci-lint + run: make go_lint + + - name: Test + run: make go_test diff --git a/.gitignore b/.gitignore index 6dbdd3c40..a10753b59 100644 --- a/.gitignore +++ b/.gitignore @@ -32,7 +32,6 @@ localnet/*/config/*.json !localnet/poktrolld/config/client.toml !localnet/poktrolld/config/config.toml - # Macos .DS_Store **/.DS_Store diff --git a/Dockerfile.dev b/Dockerfile.dev index 2d10955e0..627a09fef 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -6,7 +6,7 @@ FROM golang:1.20 as base RUN apt update && \ apt-get install -y \ ca-certificates \ - curl jq make + curl jq make vim less # enable faster module downloading. ENV GOPROXY https://proxy.golang.org @@ -15,7 +15,7 @@ COPY . /poktroll WORKDIR /poktroll -RUN mv /poktroll/bin/poktrolld /usr/bin/poktrolld +RUN mv /poktroll/bin/ignite /usr/bin/ && mv /poktroll/bin/poktrolld /usr/bin/ EXPOSE 8545 EXPOSE 8546 diff --git a/e2e/tests/node.go b/e2e/tests/node.go index e8ed04dc1..ad0540942 100644 --- a/e2e/tests/node.go +++ b/e2e/tests/node.go @@ -21,6 +21,8 @@ var ( defaultRPCHost = "127.0.0.1" // defaultHome is the default home directory for pocketd defaultHome = os.Getenv("POKTROLLD_HOME") + // defaultDebugOutput provides verbose output on manipulations with binaries (cli command, stdout, stderr) + defaultDebugOutput = os.Getenv("E2E_DEBUG_OUTPUT") ) func init() { @@ -93,5 +95,9 @@ func (p *pocketdBin) runCmd(args ...string) (*commandResult, error) { err = fmt.Errorf("error running command [%s]: %v, stderr: %s", commandStr, err, stderrBuf.String()) } + if defaultDebugOutput == "true" { + fmt.Printf("%#v\n", r) + } + return r, err } From 8f61392c083eea3ef7292c84617c684e4aa5d76e Mon Sep 17 00:00:00 2001 From: Bryan White Date: Wed, 15 Nov 2023 09:36:07 +0100 Subject: [PATCH 02/16] [Supplier] chore: improve supplier not found error message (#183) * chore: improve error message seen by RPC consumers when the given supplier isn't on-chain * fix: query supplier test * chore: add todo comment --- x/supplier/keeper/query_supplier.go | 5 ++++- x/supplier/keeper/query_supplier_test.go | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/x/supplier/keeper/query_supplier.go b/x/supplier/keeper/query_supplier.go index 14d5afab5..7f381104c 100644 --- a/x/supplier/keeper/query_supplier.go +++ b/x/supplier/keeper/query_supplier.go @@ -2,6 +2,7 @@ package keeper import ( "context" + "fmt" "github.com/cosmos/cosmos-sdk/store/prefix" sdk "github.com/cosmos/cosmos-sdk/types" @@ -52,7 +53,9 @@ func (k Keeper) Supplier(goCtx context.Context, req *types.QueryGetSupplierReque req.Address, ) if !found { - return nil, status.Error(codes.NotFound, "not found") + // TODO_TECHDEBT(#181): conform to logging conventions once established + msg := fmt.Sprintf("supplier with address %q", req.GetAddress()) + return nil, status.Error(codes.NotFound, msg) } return &types.QueryGetSupplierResponse{Supplier: val}, nil diff --git a/x/supplier/keeper/query_supplier_test.go b/x/supplier/keeper/query_supplier_test.go index 6690e3f75..ad28a377b 100644 --- a/x/supplier/keeper/query_supplier_test.go +++ b/x/supplier/keeper/query_supplier_test.go @@ -47,7 +47,7 @@ func TestSupplierQuerySingle(t *testing.T) { request: &types.QueryGetSupplierRequest{ Address: strconv.Itoa(100000), }, - err: status.Error(codes.NotFound, "not found"), + err: status.Error(codes.NotFound, "supplier with address \"100000\""), }, { desc: "InvalidRequest", From 4ce831b7de798867f7964c1d353ae0b59ec7b421 Mon Sep 17 00:00:00 2001 From: harry <53987565+h5law@users.noreply.github.com> Date: Wed, 15 Nov 2023 15:40:09 +0000 Subject: [PATCH 03/16] chore: cleanup flags and dependencies for appgateserver cmd --- pkg/appgateserver/cmd/cmd.go | 137 ++++++++++++++++++++++++++--------- 1 file changed, 104 insertions(+), 33 deletions(-) diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index a9ff2908f..3ab648a93 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -7,26 +7,33 @@ import ( "log" "net/http" "net/url" - "os" - "os/signal" "cosmossdk.io/depinject" cosmosclient "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/client/flags" + cosmosflags "github.com/cosmos/cosmos-sdk/client/flags" "github.com/spf13/cobra" + "github.com/pokt-network/poktroll/cmd/signals" "github.com/pokt-network/poktroll/pkg/appgateserver" - blockclient "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/client/block" eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" ) +const omittedDefaultFlagValue = "explicitly omitting default" + var ( flagSigningKey string flagSelfSigning bool flagListeningEndpoint string - flagCometWebsocketUrl string + flagQueryNodeUrl string ) +type supplierFn func( + context.Context, + depinject.Config, + *cobra.Command, +) (depinject.Config, error) + func AppGateServerCmd() *cobra.Command { cmd := &cobra.Command{ Use: "appgate-server", @@ -58,11 +65,11 @@ relays to the AppGate server and function as an Application, provided that: cmd.Flags().StringVar(&flagSigningKey, "signing-key", "", "The name of the key that will be used to sign relays") cmd.Flags().StringVar(&flagListeningEndpoint, "listening-endpoint", "http://localhost:42069", "The host and port that the appgate server will listen on") - cmd.Flags().StringVar(&flagCometWebsocketUrl, "comet-websocket-url", "ws://localhost:36657/websocket", "The URL of the comet websocket endpoint to communicate with the pocket blockchain") cmd.Flags().BoolVar(&flagSelfSigning, "self-signing", false, "Whether the server should sign all incoming requests with its own ring (for applications)") + cmd.Flags().StringVar(&flagQueryNodeUrl, "query-node", omittedDefaultFlagValue, "The URL of the pocket node to query for on-chain data") - cmd.Flags().String(flags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") - cmd.Flags().String(flags.FlagNode, "tcp://localhost:36657", "The URL of the comet tcp endpoint to communicate with the pocket blockchain") + cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") + cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "The URL of the comet tcp endpoint to communicate with the pocket blockchain") return cmd } @@ -72,18 +79,8 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { ctx, cancelCtx := context.WithCancel(cmd.Context()) defer cancelCtx() - // Handle interrupts in a goroutine. - go func() { - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, os.Interrupt) - - // Block until we receive an interrupt or kill signal (OS-agnostic) - <-sigCh - log.Println("INFO: Interrupt signal received, shutting down...") - - // Signal goroutines to stop - cancelCtx() - }() + // Handle interrupt and kill signals asynchronously. + signals.GoOnExitSignal(cancelCtx) // Parse the listening endpoint. listeningUrl, err := url.Parse(flagListeningEndpoint) @@ -92,7 +89,7 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { } // Setup the AppGate server dependencies. - appGateServerDeps, err := setupAppGateServerDependencies(cmd, ctx, flagCometWebsocketUrl) + appGateServerDeps, err := setupAppGateServerDependencies(ctx, cmd) if err != nil { return fmt.Errorf("failed to setup AppGate server dependencies: %w", err) } @@ -127,21 +124,95 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { return nil } -func setupAppGateServerDependencies(cmd *cobra.Command, ctx context.Context, cometWebsocketUrl string) (depinject.Config, error) { - // Retrieve the client context for the chain interactions. - clientCtx := cosmosclient.GetClientContextFromCmd(cmd) +func setupAppGateServerDependencies( + ctx context.Context, + cmd *cobra.Command, +) (depinject.Config, error) { + pocketNodeWebsocketUrl, err := getPocketNodeWebsocketUrl() + if err != nil { + return nil, err + } + + supplierFuncs := []supplierFn{ + newSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), + newSupplyBlockClientFn(pocketNodeWebsocketUrl), + newSupplyClientContextFn(cmd), + } + + // Initialize deps to with empty depinject config. + deps := depinject.Configs() + for _, supplyFn := range supplierFuncs { + deps, err = supplyFn(ctx, deps, cmd) + if err != nil { + return nil, err + } + } + + return deps, nil +} - // Create the events client. - eventsQueryClient := eventsquery.NewEventsQueryClient(cometWebsocketUrl) +// getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to +// connect to for subscribing to on-chain events. +func getPocketNodeWebsocketUrl() (string, error) { + if flagQueryNodeUrl == omittedDefaultFlagValue { + return "", errors.New("missing required flag: --query-node") + } - // Create the block client. - log.Printf("INFO: Creating block client, using comet websocket URL: %s...", cometWebsocketUrl) - deps := depinject.Supply(eventsQueryClient) - blockClient, err := blockclient.NewBlockClient(ctx, deps, cometWebsocketUrl) + pocketNodeURL, err := url.Parse(flagQueryNodeUrl) if err != nil { - return nil, fmt.Errorf("failed to create block client: %w", err) + return "", err } - // Return the dependencie config. - return depinject.Supply(clientCtx, blockClient), nil + return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil +} + +// newSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns +// a new depinject.Config which is supplied with the given deps and the new +// EventsQueryClient. +func newSupplyEventsQueryClientFn( + pocketNodeWebsocketUrl string, +) supplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl) + + return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil + } +} + +// newSupplyBlockClientFn returns a function with constructs a BlockClient instance +// with the given nodeURL and returns a new +// depinject.Config which is supplied with the given deps and the new +// BlockClient. +func newSupplyBlockClientFn(pocketNodeWebsocketUrl string) supplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(blockClient)), nil + } +} + +// newSupplyClientContextFn returns a function with constructs a ClientContext instance +// with the given cmd and returns a new depinject.Config which is supplied with +// the given deps and the new ClientContext. +func newSupplyClientContextFn(cmd *cobra.Command) supplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + clientCtx := cosmosclient.GetClientContextFromCmd(cmd) + + return depinject.Configs(deps, depinject.Supply(clientCtx)), nil + } } From d83c1e74b09535bb8125d21b24bb8ac7ee9f5efa Mon Sep 17 00:00:00 2001 From: harry <53987565+h5law@users.noreply.github.com> Date: Wed, 15 Nov 2023 16:05:27 +0000 Subject: [PATCH 04/16] chore: move shared dependency setup logic to shared pkg --- pkg/appgateserver/cmd/cmd.go | 89 +++++++++++------------------------- pkg/deps/config/config.go | 73 +++++++++++++++++++++++++++++ pkg/relayer/cmd/cmd.go | 31 ++++--------- 3 files changed, 110 insertions(+), 83 deletions(-) create mode 100644 pkg/deps/config/config.go diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index 3ab648a93..e539cf7bd 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -15,8 +15,7 @@ import ( "github.com/pokt-network/poktroll/cmd/signals" "github.com/pokt-network/poktroll/pkg/appgateserver" - "github.com/pokt-network/poktroll/pkg/client/block" - eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" + "github.com/pokt-network/poktroll/pkg/deps/config" ) const omittedDefaultFlagValue = "explicitly omitting default" @@ -28,12 +27,6 @@ var ( flagQueryNodeUrl string ) -type supplierFn func( - context.Context, - depinject.Config, - *cobra.Command, -) (depinject.Config, error) - func AppGateServerCmd() *cobra.Command { cmd := &cobra.Command{ Use: "appgate-server", @@ -133,22 +126,13 @@ func setupAppGateServerDependencies( return nil, err } - supplierFuncs := []supplierFn{ - newSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), - newSupplyBlockClientFn(pocketNodeWebsocketUrl), - newSupplyClientContextFn(cmd), - } - - // Initialize deps to with empty depinject config. - deps := depinject.Configs() - for _, supplyFn := range supplierFuncs { - deps, err = supplyFn(ctx, deps, cmd) - if err != nil { - return nil, err - } + supplierFuncs := []config.SupplierFn{ + config.NewSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), + config.NewSupplyBlockClientFn(pocketNodeWebsocketUrl), + newSupplyQueryClientContextFn(flagQueryNodeUrl), } - return deps, nil + return config.SupplyConfig(ctx, cmd, supplierFuncs) } // getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to @@ -166,53 +150,34 @@ func getPocketNodeWebsocketUrl() (string, error) { return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil } -// newSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns -// a new depinject.Config which is supplied with the given deps and the new -// EventsQueryClient. -func newSupplyEventsQueryClientFn( - pocketNodeWebsocketUrl string, -) supplierFn { +// newSupplyQueryClientContextFn returns a new depinject.Config which is supplied with +// the given deps and a new cosmos ClientCtx +func newSupplyQueryClientContextFn(pocketQueryClientUrl string) config.SupplierFn { return func( _ context.Context, deps depinject.Config, - _ *cobra.Command, - ) (depinject.Config, error) { - eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl) - - return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil - } -} - -// newSupplyBlockClientFn returns a function with constructs a BlockClient instance -// with the given nodeURL and returns a new -// depinject.Config which is supplied with the given deps and the new -// BlockClient. -func newSupplyBlockClientFn(pocketNodeWebsocketUrl string) supplierFn { - return func( - ctx context.Context, - deps depinject.Config, - _ *cobra.Command, + cmd *cobra.Command, ) (depinject.Config, error) { - blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl) + // Set --node flag to the pocketQueryClientUrl for the client context + // This flag is read by cosmosclient.GetClientQueryContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, pocketQueryClientUrl) if err != nil { return nil, err } - return depinject.Configs(deps, depinject.Supply(blockClient)), nil - } -} - -// newSupplyClientContextFn returns a function with constructs a ClientContext instance -// with the given cmd and returns a new depinject.Config which is supplied with -// the given deps and the new ClientContext. -func newSupplyClientContextFn(cmd *cobra.Command) supplierFn { - return func( - _ context.Context, - deps depinject.Config, - _ *cobra.Command, - ) (depinject.Config, error) { - clientCtx := cosmosclient.GetClientContextFromCmd(cmd) - - return depinject.Configs(deps, depinject.Supply(clientCtx)), nil + // NB: Currently, the implementations of GetClientTxContext() and + // GetClientQueryContext() are identical, allowing for their interchangeable + // use in both querying and transaction operations. However, in order to support + // independent configuration of client contexts for distinct querying and + // transacting purposes. E.g.: transactions are dispatched to the sequencer + // while queries are handled by a trusted full-node. + queryClientCtx, err := cosmosclient.GetClientQueryContext(cmd) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply( + queryClientCtx, + )) + return deps, nil } } diff --git a/pkg/deps/config/config.go b/pkg/deps/config/config.go new file mode 100644 index 000000000..9621fa1de --- /dev/null +++ b/pkg/deps/config/config.go @@ -0,0 +1,73 @@ +package config + +import ( + "context" + + "cosmossdk.io/depinject" + "github.com/spf13/cobra" + + "github.com/pokt-network/poktroll/pkg/client/block" + eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" +) + +// SupplierFn is a function that is used to supply a depinject config. +type SupplierFn func( + context.Context, + depinject.Config, + *cobra.Command, +) (depinject.Config, error) + +// SupplyConfig supplies a depinject config by calling each of the supplied +// supplier functions in order and passing the result of each supplier to the +// next supplier, chaining them together. +func SupplyConfig( + ctx context.Context, + cmd *cobra.Command, + suppliers []SupplierFn, +) (deps depinject.Config, err error) { + // Initialize deps to with empty depinject config. + deps = depinject.Configs() + for _, supplyFn := range suppliers { + deps, err = supplyFn(ctx, deps, cmd) + if err != nil { + return nil, err + } + } + return deps, nil +} + +// NewSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns +// a new depinject.Config which is supplied with the given deps and the new +// EventsQueryClient. +func NewSupplyEventsQueryClientFn( + pocketNodeWebsocketUrl string, +) SupplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl) + + return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil + } +} + +// NewSupplyBlockClientFn returns a function with constructs a BlockClient instance +// with the given nodeURL and returns a new +// depinject.Config which is supplied with the given deps and the new +// BlockClient. +func NewSupplyBlockClientFn(pocketNodeWebsocketUrl string) SupplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(blockClient)), nil + } +} diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index a4db8c1ba..1f69b5397 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -17,6 +17,7 @@ import ( eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" "github.com/pokt-network/poktroll/pkg/client/supplier" "github.com/pokt-network/poktroll/pkg/client/tx" + "github.com/pokt-network/poktroll/pkg/deps/config" "github.com/pokt-network/poktroll/pkg/relayer" "github.com/pokt-network/poktroll/pkg/relayer/miner" "github.com/pokt-network/poktroll/pkg/relayer/proxy" @@ -32,12 +33,6 @@ var ( flagPocketNodeUrl string ) -type supplierFn func( - context.Context, - depinject.Config, - *cobra.Command, -) (depinject.Config, error) - func RelayerCmd() *cobra.Command { cmd := &cobra.Command{ Use: "relayminer", @@ -120,9 +115,9 @@ func setupRelayerDependencies( return nil, err } - supplierFuncs := []supplierFn{ - newSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), // leaf - newSupplyBlockClientFn(pocketNodeWebsocketUrl), + supplierFuncs := []config.SupplierFn{ + config.NewSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), // leaf + config.NewSupplyBlockClientFn(pocketNodeWebsocketUrl), supplyMiner, // leaf supplyQueryClientContext, // leaf supplyTxClientContext, // leaf @@ -134,16 +129,7 @@ func setupRelayerDependencies( supplyRelayerSessionsManager, } - // Initialize deps to with empty depinject config. - deps = depinject.Configs() - for _, supplyFn := range supplierFuncs { - deps, err = supplyFn(ctx, deps, cmd) - if err != nil { - return nil, err - } - } - - return deps, nil + return config.SupplyConfig(ctx, cmd, supplierFuncs) } // getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to @@ -166,7 +152,7 @@ func getPocketNodeWebsocketUrl() (string, error) { // EventsQueryClient. func newSupplyEventsQueryClientFn( pocketNodeWebsocketUrl string, -) supplierFn { +) config.SupplierFn { return func( _ context.Context, deps depinject.Config, @@ -182,7 +168,7 @@ func newSupplyEventsQueryClientFn( // with the given nodeURL and returns a new // depinject.Config which is supplied with the given deps and the new // BlockClient. -func newSupplyBlockClientFn(pocketNodeWebsocketUrl string) supplierFn { +func newSupplyBlockClientFn(pocketNodeWebsocketUrl string) config.SupplierFn { return func( ctx context.Context, deps depinject.Config, @@ -212,6 +198,9 @@ func supplyMiner( return depinject.Configs(deps, depinject.Supply(mnr)), nil } +// supplyQueryClientContext returns a function with constructs a ClientContext +// instance with the given cmd and returns a new depinject.Config which is +// supplied with the given deps and the new ClientContext. func supplyQueryClientContext( _ context.Context, deps depinject.Config, From e58c8480c599295e30186f19457fff5dbfc32971 Mon Sep 17 00:00:00 2001 From: harry <53987565+h5law@users.noreply.github.com> Date: Wed, 15 Nov 2023 18:19:29 +0000 Subject: [PATCH 05/16] chore: update comment --- pkg/appgateserver/cmd/cmd.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index e539cf7bd..079f0a1a0 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -165,12 +165,7 @@ func newSupplyQueryClientContextFn(pocketQueryClientUrl string) config.SupplierF return nil, err } - // NB: Currently, the implementations of GetClientTxContext() and - // GetClientQueryContext() are identical, allowing for their interchangeable - // use in both querying and transaction operations. However, in order to support - // independent configuration of client contexts for distinct querying and - // transacting purposes. E.g.: transactions are dispatched to the sequencer - // while queries are handled by a trusted full-node. + // Get the client context from the command. queryClientCtx, err := cosmosclient.GetClientQueryContext(cmd) if err != nil { return nil, err From 75343bb3104bdf0ac7f68e0ffc87e58eba637540 Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 15 Nov 2023 12:05:10 -0800 Subject: [PATCH 06/16] Update .gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index dc18d9d57..336be35aa 100644 --- a/.gitignore +++ b/.gitignore @@ -62,6 +62,7 @@ localnet_config.yaml release # SMT KVStore files +# TODO_TECHDEBT(#126, @red-0ne): Rename `smt` to `smt_stores` and make it configurable so it can be stored anywhere on this smt # Do not allow a multi-moduled projected From 1cc7085385d31058a3c18c2888c6822e96e946d1 Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 15 Nov 2023 12:12:03 -0800 Subject: [PATCH 07/16] Update OpenAPI spec --- .gitignore | 2 -- docs/static/openapi.yml | 30 ++++++++++++++++-------------- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/.gitignore b/.gitignore index 336be35aa..723084503 100644 --- a/.gitignore +++ b/.gitignore @@ -70,5 +70,3 @@ go.work.sum # TODO_IN_THIS_COMMIT: Why did we start generating .dot files? # **/*.dot - - diff --git a/docs/static/openapi.yml b/docs/static/openapi.yml index d82f777d7..f55a851f2 100644 --- a/docs/static/openapi.yml +++ b/docs/static/openapi.yml @@ -46480,7 +46480,7 @@ paths: service: title: >- The Service for which the application is - configured + configured for type: object properties: id: @@ -46660,7 +46660,9 @@ paths: type: object properties: service: - title: The Service for which the application is configured + title: >- + The Service for which the application is configured + for type: object properties: id: @@ -47176,7 +47178,7 @@ paths: service: title: >- The Service for which the application is - configured + configured for type: object properties: id: @@ -47243,7 +47245,7 @@ paths: service: title: >- The Service for which the supplier is - configured + configured for type: object properties: id: @@ -76787,7 +76789,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -76871,7 +76873,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -76965,7 +76967,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -77016,7 +77018,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -77284,7 +77286,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -77348,7 +77350,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured + title: The Service for which the supplier is configured for type: object properties: id: @@ -77538,7 +77540,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -77600,7 +77602,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured + title: The Service for which the supplier is configured for type: object properties: id: @@ -77814,7 +77816,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured + title: The Service for which the supplier is configured for type: object properties: id: @@ -77943,7 +77945,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured + title: The Service for which the supplier is configured for type: object properties: id: From a04ebfc33d623eeb8f0cd7ee50a1b983d4e8e8e5 Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 15 Nov 2023 12:45:13 -0800 Subject: [PATCH 08/16] Updated comments for post 177+179 work for okdas --- .gitignore | 2 +- Makefile | 9 +++++---- config.yml | 2 +- pkg/appgateserver/cmd/cmd.go | 4 +++- pkg/relayer/cmd/cmd.go | 4 +++- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/.gitignore b/.gitignore index 723084503..8fe75f2aa 100644 --- a/.gitignore +++ b/.gitignore @@ -68,5 +68,5 @@ smt # Do not allow a multi-moduled projected go.work.sum -# TODO_IN_THIS_COMMIT: Why did we start generating .dot files? +# TODO_TECHDEBT: It seems that .dot files come and go so we need to figure out the root cause: https://github.com/pokt-network/poktroll/pull/177/files#r1392521547 # **/*.dot diff --git a/Makefile b/Makefile index 7a770d038..9200fb190 100644 --- a/Makefile +++ b/Makefile @@ -280,6 +280,7 @@ app_list: ## List all the staked applications app_stake: ## Stake tokens for the application specified (must specify the APP and SERVICES env vars) poktrolld --home=$(POKTROLLD_HOME) tx application stake-application 1000upokt $(SERVICES) --keyring-backend test --from $(APP) --node $(POCKET_NODE) +# TODO_IMPROVE(#180): Make sure genesis-staked actors are available via AccountKeeper .PHONY: app1_stake app1_stake: ## Stake app1 (also staked in genesis) APP=app1 SERVICES=anvil,svc1,svc2 make app_stake @@ -360,22 +361,22 @@ supplier_list: ## List all the staked supplier supplier_stake: ## Stake tokens for the supplier specified (must specify the APP env var) poktrolld --home=$(POKTROLLD_HOME) tx supplier stake-supplier 1000upokt "$(SERVICES)" --keyring-backend test --from $(SUPPLIER) --node $(POCKET_NODE) +# TODO_IMPROVE(#180): Make sure genesis-staked actors are available via AccountKeeper .PHONY: supplier1_stake supplier1_stake: ## Stake supplier1 (also staked in genesis) - # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). - # TODO_IMPROVE(#180): Make sure genesis-staked actors are available via AccountKeeper SUPPLIER=supplier1 SERVICES="anvil;http://localhost:8545,svc1;http://localhost:8081" make supplier_stake .PHONY: supplier2_stake supplier2_stake: ## Stake supplier2 - # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). SUPPLIER=supplier2 SERVICES="anvil;http://localhost:8545,svc2;http://localhost:8082" make supplier_stake .PHONY: supplier3_stake supplier3_stake: ## Stake supplier3 - # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to that service. + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to that service. # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). SUPPLIER=supplier3 SERVICES="anvil;http://localhost:8545,svc3;http://localhost:8083" make supplier_stake diff --git a/config.yml b/config.yml index 789f712ad..90229f193 100644 --- a/config.yml +++ b/config.yml @@ -100,7 +100,7 @@ genesis: - endpoints: - configs: [] rpc_type: JSON_RPC - # TODO_TECHDEBT(#179): once `relayminer` service is added to tilt, this hostname should point to it instead of `localhost`. + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to it instead of `localhost`. url: http://localhost:8548 service: id: anvil diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index 079f0a1a0..d691403e8 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -24,7 +24,9 @@ var ( flagSigningKey string flagSelfSigning bool flagListeningEndpoint string - flagQueryNodeUrl string + // TODO_DISCUSS: Should we use `--node` for both querying and sending transactions, or have the respective + // `--network-node` for txs and `--query-node` for querying in the future? + flagQueryNodeUrl string ) func AppGateServerCmd() *cobra.Command { diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 1f69b5397..a41820a40 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -344,7 +344,7 @@ func supplyRelayerProxy( _ *cobra.Command, ) (depinject.Config, error) { // TODO_BLOCKER:(#137): This MUST be populated via the `relayer.json` config file - // TODO_TECHDEBT(#179): this hostname should be updated to match that of the + // TODO_UPNEXT(@okdas): this hostname should be updated to match that of the // in-tilt anvil service. proxyServiceURL, err := url.Parse("http://localhost:8547/") if err != nil { @@ -372,6 +372,8 @@ func supplyRelayerProxy( // supplyRelayerSessionsManager constructs a RelayerSessionsManager instance // and returns a new depinject.Config which is supplied with the given deps and // the new RelayerSessionsManager. +// See the comment next to `flagQueryNodeUrl` (if it still exists) on how/why +// we have multiple flags pointing to different node types. func supplyRelayerSessionsManager( ctx context.Context, deps depinject.Config, From c908a7def364dca9a909e2e9473d7aa0afba060a Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 15 Nov 2023 12:55:51 -0800 Subject: [PATCH 09/16] [Test] Updating `relay.feature` to run curl command to enable E2E Relay Test (#178) - Adding `APPGATE_SERVER` to the Makefile - Adding the ability to run a `curl` command in the `e2e-test framework` - Particl update to `relay.feature` to start sending a curl command as part of the E2E test Testing the new test with: ```bash export APPGATE_SERVER=http://localhost:42069 POCKET_NODE=tcp://127.0.0.1:36657 && \ POKTROLLD_HOME=./localnet/poktrolld && \ go test -run TestFeatures/Relay_Namespace/App_can_send_relay_to_Supplier -v ./e2e/tests/... -tags=e2e ``` --- Makefile | 2 ++ config.yml | 2 +- docs/static/openapi.yml | 30 +++++++++++---------- e2e/tests/init_test.go | 9 +++++-- e2e/tests/node.go | 51 +++++++++++++++++++++++++++++++----- e2e/tests/relay.feature | 2 +- pkg/appgateserver/cmd/cmd.go | 2 +- pkg/appgateserver/jsonrpc.go | 1 - 8 files changed, 73 insertions(+), 26 deletions(-) diff --git a/Makefile b/Makefile index 1f06b4f68..104950507 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,7 @@ POKTROLLD_HOME := ./localnet/poktrolld POCKET_NODE = tcp://127.0.0.1:36657 # The pocket rollup node (full node and sequencer in the localnet context) +APPGATE_SERVER = http://localhost:42069 POCKET_ADDR_PREFIX = pokt #################### @@ -136,6 +137,7 @@ go_imports: check_go_version ## Run goimports on all go files .PHONY: test_e2e test_e2e: ## Run all E2E tests export POCKET_NODE=$(POCKET_NODE) && \ + export APPGATE_SERVER=$(APPGATE_SERVER) && \ POKTROLLD_HOME=../../$(POKTROLLD_HOME) && \ go test -v ./e2e/tests/... -tags=e2e diff --git a/config.yml b/config.yml index f21fe2be6..f6e3c1e89 100644 --- a/config.yml +++ b/config.yml @@ -100,7 +100,7 @@ genesis: - endpoints: - configs: [] rpc_type: JSON_RPC - url: http://anvil:8547 + url: http://localhost:8545 service: id: anvil name: "" diff --git a/docs/static/openapi.yml b/docs/static/openapi.yml index d82f777d7..f55a851f2 100644 --- a/docs/static/openapi.yml +++ b/docs/static/openapi.yml @@ -46480,7 +46480,7 @@ paths: service: title: >- The Service for which the application is - configured + configured for type: object properties: id: @@ -46660,7 +46660,9 @@ paths: type: object properties: service: - title: The Service for which the application is configured + title: >- + The Service for which the application is configured + for type: object properties: id: @@ -47176,7 +47178,7 @@ paths: service: title: >- The Service for which the application is - configured + configured for type: object properties: id: @@ -47243,7 +47245,7 @@ paths: service: title: >- The Service for which the supplier is - configured + configured for type: object properties: id: @@ -76787,7 +76789,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -76871,7 +76873,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -76965,7 +76967,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -77016,7 +77018,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -77284,7 +77286,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -77348,7 +77350,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured + title: The Service for which the supplier is configured for type: object properties: id: @@ -77538,7 +77540,7 @@ definitions: type: object properties: service: - title: The Service for which the application is configured + title: The Service for which the application is configured for type: object properties: id: @@ -77600,7 +77602,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured + title: The Service for which the supplier is configured for type: object properties: id: @@ -77814,7 +77816,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured + title: The Service for which the supplier is configured for type: object properties: id: @@ -77943,7 +77945,7 @@ definitions: type: object properties: service: - title: The Service for which the supplier is configured + title: The Service for which the supplier is configured for type: object properties: id: diff --git a/e2e/tests/init_test.go b/e2e/tests/init_test.go index 6bd5aeb27..454238264 100644 --- a/e2e/tests/init_test.go +++ b/e2e/tests/init_test.go @@ -242,8 +242,13 @@ func (s *suite) TheSessionForApplicationAndServiceContainsTheSupplier(appName st s.Fatalf("session for app %s and service %s does not contain supplier %s", appName, serviceId, supplierName) } -func (s *suite) TheApplicationSendsTheSupplierARelayRequestForService(appName string, supplierName string, requestName string, serviceId string) { - // TODO(#126, @Olshansk): Implement this step +func (s *suite) TheApplicationSendsTheSupplierARequestForServiceWithData(appName, supplierName, serviceId, requestData string) { + res, err := s.pocketd.RunCurl("", serviceId, requestData) + if err != nil { + s.Fatalf("error sending relay request from app %s to supplier %s for service %s: %v", appName, supplierName, serviceId, err) + } + // TODO(#184): store & use the result of res + fmt.Println(res) } func (s *suite) TheApplicationReceivesASuccessfulRelayResponseSignedBy(appName string, supplierName string) { diff --git a/e2e/tests/node.go b/e2e/tests/node.go index ad0540942..533f347d7 100644 --- a/e2e/tests/node.go +++ b/e2e/tests/node.go @@ -21,6 +21,8 @@ var ( defaultRPCHost = "127.0.0.1" // defaultHome is the default home directory for pocketd defaultHome = os.Getenv("POKTROLLD_HOME") + // defaultAppGateServerURL used by curl commands to send relay requests + defaultAppGateServerURL = os.Getenv("APPGATE_SERVER") // defaultDebugOutput provides verbose output on manipulations with binaries (cli command, stdout, stderr) defaultDebugOutput = os.Getenv("E2E_DEBUG_OUTPUT") ) @@ -44,8 +46,9 @@ type commandResult struct { // PocketClient is a single function interface for interacting with a node type PocketClient interface { - RunCommand(...string) (*commandResult, error) - RunCommandOnHost(string, ...string) (*commandResult, error) + RunCommand(args ...string) (*commandResult, error) + RunCommandOnHost(rpcUrl string, args ...string) (*commandResult, error) + RunCurl(rpcUrl, service, data string, args ...string) (*commandResult, error) } // Ensure that pocketdBin struct fulfills PocketClient @@ -58,7 +61,7 @@ type pocketdBin struct { // RunCommand runs a command on the local machine using the pocketd binary func (p *pocketdBin) RunCommand(args ...string) (*commandResult, error) { - return p.runCmd(args...) + return p.runPocketCmd(args...) } // RunCommandOnHost runs a command on specified host with the given args @@ -67,11 +70,19 @@ func (p *pocketdBin) RunCommandOnHost(rpcUrl string, args ...string) (*commandRe rpcUrl = defaultRPCURL } args = append(args, "--node", rpcUrl) - return p.runCmd(args...) + return p.runPocketCmd(args...) } -// runCmd is a helper to run a command using the local pocketd binary with the flags provided -func (p *pocketdBin) runCmd(args ...string) (*commandResult, error) { +// RunCurl runs a curl command on the local machine +func (p *pocketdBin) RunCurl(rpcUrl, service, data string, args ...string) (*commandResult, error) { + if rpcUrl == "" { + rpcUrl = defaultAppGateServerURL + } + return p.runCurlPostCmd(rpcUrl, service, data, args...) +} + +// runPocketCmd is a helper to run a command using the local pocketd binary with the flags provided +func (p *pocketdBin) runPocketCmd(args ...string) (*commandResult, error) { base := []string{"--home", defaultHome} args = append(base, args...) commandStr := "poktrolld " + strings.Join(args, " ") // Create a string representation of the command @@ -101,3 +112,31 @@ func (p *pocketdBin) runCmd(args ...string) (*commandResult, error) { return r, err } + +// runCurlPostCmd is a helper to run a command using the local pocketd binary with the flags provided +func (p *pocketdBin) runCurlPostCmd(rpcUrl string, service string, data string, args ...string) (*commandResult, error) { + base := []string{"-v", "-X", "POST", "-H", "'Content-Type: application/json'", "--data", fmt.Sprintf("'%s'", data), fmt.Sprintf("%s/%s", rpcUrl, service)} + args = append(base, args...) + commandStr := "curl " + strings.Join(args, " ") // Create a string representation of the command + cmd := exec.Command("curl", args...) + + var stdoutBuf, stderrBuf bytes.Buffer + cmd.Stdout = &stdoutBuf + cmd.Stderr = &stderrBuf + + err := cmd.Run() + r := &commandResult{ + Command: commandStr, // Set the command string + Stdout: stdoutBuf.String(), + Stderr: stderrBuf.String(), + Err: err, + } + p.result = r + + if err != nil { + // Include the command executed in the error message for context + err = fmt.Errorf("error running command [%s]: %v, stderr: %s", commandStr, err, stderrBuf.String()) + } + + return r, err +} diff --git a/e2e/tests/relay.feature b/e2e/tests/relay.feature index 4f071bfa6..484172092 100644 --- a/e2e/tests/relay.feature +++ b/e2e/tests/relay.feature @@ -5,7 +5,7 @@ Feature: Relay Namespace And the application "app1" is staked for service "anvil" And the supplier "supplier1" is staked for service "anvil" And the session for application "app1" and service "anvil" contains the supplier "supplier1" - When the application "app1" sends the supplier "supplier1" a "getBlock" relay request for service "anvil" + When the application "app1" sends the supplier "supplier1" a request for service "anvil" with data '{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}' Then the application "app1" receives a successful relay response signed by "supplier1" # TODO_TEST(@Olshansk): diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index 21052e270..663562e98 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -142,6 +142,6 @@ func setupAppGateServerDependencies(cmd *cobra.Command, ctx context.Context, com return nil, fmt.Errorf("failed to create block client: %w", err) } - // Return the dependencie config. + // Return the dependencies config. return depinject.Supply(clientCtx, blockClient), nil } diff --git a/pkg/appgateserver/jsonrpc.go b/pkg/appgateserver/jsonrpc.go index 3be01cca7..80784fb44 100644 --- a/pkg/appgateserver/jsonrpc.go +++ b/pkg/appgateserver/jsonrpc.go @@ -87,7 +87,6 @@ func (app *appGateServer) handleJSONRPCRelay( Body: relayRequestReader, } - // Perform the HTTP request to the relayer. log.Printf("DEBUG: Sending signed relay request to %s", supplierUrl) relayHTTPResponse, err := http.DefaultClient.Do(relayHTTPRequest) if err != nil { From 59f863c0c81573bd099121b380784be88dd368c0 Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 15 Nov 2023 12:58:32 -0800 Subject: [PATCH 10/16] Update pkg/relayer/cmd/cmd.go --- pkg/relayer/cmd/cmd.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index a41820a40..55da288c6 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -26,6 +26,7 @@ import ( const omittedDefaultFlagValue = "explicitly omitting default" +// TODO_CONSIDERATION: Consider moving all flags defined in `/pkg` to a `flags.go` file. var ( flagSigningKeyName string flagSmtStorePath string From 753c27ae97b8abf067e97f5698c18df3bc6d4e33 Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 15 Nov 2023 13:17:42 -0800 Subject: [PATCH 11/16] Update the names and references to queryNode/sequencerNode/fullNode etc --- pkg/appgateserver/cmd/cmd.go | 9 +++++---- pkg/relayer/cmd/cmd.go | 29 ++++++++++++++--------------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index d691403e8..a2e71bed6 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -18,15 +18,14 @@ import ( "github.com/pokt-network/poktroll/pkg/deps/config" ) +// We're `explicitly omitting default` so that the appgateserver crashes if these aren't specified. const omittedDefaultFlagValue = "explicitly omitting default" var ( flagSigningKey string flagSelfSigning bool flagListeningEndpoint string - // TODO_DISCUSS: Should we use `--node` for both querying and sending transactions, or have the respective - // `--network-node` for txs and `--query-node` for querying in the future? - flagQueryNodeUrl string + flagQueryNodeUrl string ) func AppGateServerCmd() *cobra.Command { @@ -58,11 +57,13 @@ relays to the AppGate server and function as an Application, provided that: RunE: runAppGateServer, } + // Custom flags cmd.Flags().StringVar(&flagSigningKey, "signing-key", "", "The name of the key that will be used to sign relays") cmd.Flags().StringVar(&flagListeningEndpoint, "listening-endpoint", "http://localhost:42069", "The host and port that the appgate server will listen on") cmd.Flags().BoolVar(&flagSelfSigning, "self-signing", false, "Whether the server should sign all incoming requests with its own ring (for applications)") - cmd.Flags().StringVar(&flagQueryNodeUrl, "query-node", omittedDefaultFlagValue, "The URL of the pocket node to query for on-chain data") + cmd.Flags().StringVar(&flagQueryNodeUrl, "query-node", omittedDefaultFlagValue, "tcp://: to a full pocket node for reading data and listening for on-chain events") + // Cosmos flags cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "The URL of the comet tcp endpoint to communicate with the pocket blockchain") diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 55da288c6..e5a5d1729 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -24,14 +24,15 @@ import ( "github.com/pokt-network/poktroll/pkg/relayer/session" ) +// We're `explicitly omitting default` so the relayer crashes if these aren't specified. const omittedDefaultFlagValue = "explicitly omitting default" // TODO_CONSIDERATION: Consider moving all flags defined in `/pkg` to a `flags.go` file. var ( - flagSigningKeyName string - flagSmtStorePath string - flagSequencerNodeUrl string - flagPocketNodeUrl string + flagSigningKeyName string + flagSmtStorePath string + flagNetworkNodeUrl string + flagQueryNodeUrl string ) func RelayerCmd() *cobra.Command { @@ -62,10 +63,8 @@ for such operations.`, // TODO_TECHDEBT(#137): This, alongside other flags, should be part of a config file suppliers provide. cmd.Flags().StringVar(&flagSmtStorePath, "smt-store", "smt", "Path to where the data backing SMT KV store exists on disk") // Communication flags - // TODO_TECHDEBT: We're using `explicitly omitting default` so the relayer crashes if these aren't specified. - // Figure out what good defaults should be post alpha. - cmd.Flags().StringVar(&flagSequencerNodeUrl, "sequencer-node", "explicitly omitting default", "tcp://: to sequencer node to submit txs") - cmd.Flags().StringVar(&flagPocketNodeUrl, "pocket-node", omittedDefaultFlagValue, "tcp://: to full pocket node for reading data and listening for on-chain events") + cmd.Flags().StringVar(&flagNetworkNodeUrl, "network-node", omittedDefaultFlagValue, "tcp://: to a pocket node that gossips transactions throughout the network (may or may not be the sequencer") + cmd.Flags().StringVar(&flagQueryNodeUrl, "query-node", omittedDefaultFlagValue, "tcp://: to a full pocket node for reading data and listening for on-chain events") cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly") return cmd @@ -136,11 +135,11 @@ func setupRelayerDependencies( // getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to // connect to for subscribing to on-chain events. func getPocketNodeWebsocketUrl() (string, error) { - if flagPocketNodeUrl == omittedDefaultFlagValue { - return "", fmt.Errorf("--pocket-node flag is required") + if flagQueryNodeUrl == omittedDefaultFlagValue { + return "", fmt.Errorf("--query-node flag is required") } - pocketNodeURL, err := url.Parse(flagPocketNodeUrl) + pocketNodeURL, err := url.Parse(flagQueryNodeUrl) if err != nil { return "", err } @@ -207,9 +206,9 @@ func supplyQueryClientContext( deps depinject.Config, cmd *cobra.Command, ) (depinject.Config, error) { - // Set --node flag to the --pocket-node for the client context + // Set --node flag to the --query-node for the client context // This flag is read by cosmosclient.GetClientQueryContext. - err := cmd.Flags().Set(cosmosflags.FlagNode, flagPocketNodeUrl) + err := cmd.Flags().Set(cosmosflags.FlagNode, flagQueryNodeUrl) if err != nil { return nil, err } @@ -238,9 +237,9 @@ func supplyTxClientContext( deps depinject.Config, cmd *cobra.Command, ) (depinject.Config, error) { - // Set --node flag to the --sequencer-node for this client context. + // Set --node flag to the --network-node for this client context. // This flag is read by cosmosclient.GetClientTxContext. - err := cmd.Flags().Set(cosmosflags.FlagNode, flagSequencerNodeUrl) + err := cmd.Flags().Set(cosmosflags.FlagNode, flagNetworkNodeUrl) if err != nil { return nil, err } From 966b874a7df4b4bf38f4a8e30857a20c236b181d Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 15 Nov 2023 15:23:26 -0800 Subject: [PATCH 12/16] Update some comments and TODOs --- pkg/appgateserver/cmd/cmd.go | 2 +- pkg/relayer/cmd/cmd.go | 16 +++++++++------- pkg/relayer/proxy/proxy.go | 2 +- pkg/relayer/session/session.go | 5 +++-- proto/pocket/service/relay.proto | 2 ++ 5 files changed, 16 insertions(+), 11 deletions(-) diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index a2e71bed6..7a541a511 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -65,7 +65,7 @@ relays to the AppGate server and function as an Application, provided that: // Cosmos flags cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") - cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "The URL of the comet tcp endpoint to communicate with the pocket blockchain") + cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly and uses flagQueryNodeUrl underneath") return cmd } diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index e5a5d1729..ea3df89b6 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -39,18 +39,19 @@ func RelayerCmd() *cobra.Command { cmd := &cobra.Command{ Use: "relayminer", Short: "Run a relay miner", - // TODO_TECHDEBT: add a longer long description. Long: `Run a relay miner. The relay miner process configures and starts relay servers for each service the supplier actor identified by --signing-key is -staked for (configured on-chain). Relay requests received by the relay servers -are validated and proxied to their respective service endpoints. The responses +staked for (configured on-chain). + +Relay requests received by the relay servers are validated and proxied to their +respective service endpoints, maintained by the relayer off-chain. The responses are then signed and sent back to the requesting application. For each successfully served relay, the miner will hash and compare its difficulty against an on-chain threshold. If the difficulty is sufficient, it is applicable to relay volume and therefore rewards. Such relays are inserted into and persisted via an SMT KV store. The miner will monitor the current block height and periodically -submit claim and proof messages according to the protocol as sessions become eligable +submit claim and proof messages according to the protocol as sessions become eligible for such operations.`, RunE: runRelayer, } @@ -65,7 +66,7 @@ for such operations.`, // Communication flags cmd.Flags().StringVar(&flagNetworkNodeUrl, "network-node", omittedDefaultFlagValue, "tcp://: to a pocket node that gossips transactions throughout the network (may or may not be the sequencer") cmd.Flags().StringVar(&flagQueryNodeUrl, "query-node", omittedDefaultFlagValue, "tcp://: to a full pocket node for reading data and listening for on-chain events") - cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly") + cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly and uses flagQueryNodeUrl underneath") return cmd } @@ -351,8 +352,9 @@ func supplyRelayerProxy( return nil, err } - // TODO_TECHDEBT(#137, #130): Once the `relayer.json` config file is implemented an a local LLM node - // is supported, this needs to be expanded such that a single relayer can proxy to multiple services at once. + // TODO_TECHDEBT(#137, #130): Once the `relayer.json` config file is implemented AND a local LLM RPC service + // is supported on LocalNet, this needs to be expanded to include more than one service. The ability to support + // multiple services is already in place but currently (as seen below) is hardcoded. proxiedServiceEndpoints := map[string]url.URL{ "anvil": *proxyServiceURL, } diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 28f92576c..b6c2a5f68 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -123,7 +123,7 @@ func NewRelayerProxy( rp.sessionQuerier = sessiontypes.NewQueryClient(clientCtx) rp.applicationQuerier = apptypes.NewQueryClient(clientCtx) rp.keyring = rp.clientCtx.Keyring - rp.ringCache = make(map[string][]ringtypes.Point) + rp.ringCache = make(map[string][]ringtypes.Point) // the key is the appAddress rp.ringCacheMutex = &sync.RWMutex{} for _, opt := range opts { diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index ef89e874c..08ea0f962 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -161,10 +161,11 @@ func (rs *relayerSessionsManager) mapBlockToSessionsToClaim( // Iterate over the sessionsTrees map to get the ones that end at a block height // lower than the current block height. for endBlockHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees { - // TODO: We need this to be == instead of <= because we don't want to keep sending + // TODO_BLOCKER(@red-0ne): We need this to be == instead of <= because we don't want to keep sending // the same session while waiting the next step. This does not address the case // where the block client misses the target block which should be handled by the - // retry mechanism. + // retry mechanism. See the discussion in the following GitHub thread for next + // steps: https://github.com/pokt-network/poktroll/pull/177/files?show-viewed-files=true&file-filters%5B%5D=#r1391957041 if endBlockHeight == block.Height() { // Iterate over the sessionsTrees that end at this block height (or // less) and add them to the list of sessionTrees to be published. diff --git a/proto/pocket/service/relay.proto b/proto/pocket/service/relay.proto index dc269fad0..c9f0d5d77 100644 --- a/proto/pocket/service/relay.proto +++ b/proto/pocket/service/relay.proto @@ -34,6 +34,8 @@ message RelayRequest { } } +// TODO_TECHDEBT(#189, @h5law): See discussion related to #189 on how/why JSONRPC should be refactored altogether. + // JSONRPCRequestPayload contains the payload for a JSON-RPC request. // See https://www.jsonrpc.org/specification#request_object for more details. message JSONRPCRequestPayload { From c726dfc0287aa0e7c2378209a79a3c885b7f242c Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 15 Nov 2023 15:26:57 -0800 Subject: [PATCH 13/16] Added a couple more comments --- pkg/appgateserver/server.go | 2 ++ pkg/relayer/proxy/error_reply.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/pkg/appgateserver/server.go b/pkg/appgateserver/server.go index a85d5fae3..fe4f15000 100644 --- a/pkg/appgateserver/server.go +++ b/pkg/appgateserver/server.go @@ -225,6 +225,8 @@ func (app *appGateServer) replyWithError(writer http.ResponseWriter, err error) relayResponse := &types.RelayResponse{ Payload: &types.RelayResponse_JsonRpcPayload{ JsonRpcPayload: &types.JSONRPCResponsePayload{ + // TODO_BLOCKER(@red-0ne): This MUST match the Id provided by the request. + // If JSON-RPC request is not unmarshaled yet (i.e. can't extract ID), it SHOULD be a random ID. Id: 0, Jsonrpc: "2.0", Error: &types.JSONRPCResponseError{ diff --git a/pkg/relayer/proxy/error_reply.go b/pkg/relayer/proxy/error_reply.go index c6c5606e6..617f3b917 100644 --- a/pkg/relayer/proxy/error_reply.go +++ b/pkg/relayer/proxy/error_reply.go @@ -16,6 +16,8 @@ func (jsrv *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) relayResponse := &types.RelayResponse{ Payload: &types.RelayResponse_JsonRpcPayload{ JsonRpcPayload: &types.JSONRPCResponsePayload{ + // TODO_BLOCKER(@red-0ne): This MUST match the Id provided by the request. + // If JSON-RPC request is not unmarshaled yet (i.e. can't extract ID), it SHOULD be a random ID. Id: 0, Jsonrpc: "2.0", Error: &types.JSONRPCResponseError{ From c36db928217a21b1255d0918d8f7590e94efa85a Mon Sep 17 00:00:00 2001 From: Daniel Olshansky Date: Wed, 15 Nov 2023 15:41:27 -0800 Subject: [PATCH 14/16] More tiny comment updates --- pkg/relayer/cmd/cmd.go | 5 ++--- pkg/relayer/proxy/server_builder.go | 1 - 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index ea3df89b6..4f43c4ff2 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -344,9 +344,8 @@ func supplyRelayerProxy( deps depinject.Config, _ *cobra.Command, ) (depinject.Config, error) { - // TODO_BLOCKER:(#137): This MUST be populated via the `relayer.json` config file - // TODO_UPNEXT(@okdas): this hostname should be updated to match that of the - // in-tilt anvil service. + // TODO_BLOCKER:(#137, @red-0ne): This MUST be populated via the `relayer.json` config file + // TODO_UPNEXT(@okdas): this hostname should be updated to match that of the in-tilt anvil service. proxyServiceURL, err := url.Parse("http://localhost:8547/") if err != nil { return nil, err diff --git a/pkg/relayer/proxy/server_builder.go b/pkg/relayer/proxy/server_builder.go index 4238c1ce3..3846ac3af 100644 --- a/pkg/relayer/proxy/server_builder.go +++ b/pkg/relayer/proxy/server_builder.go @@ -20,7 +20,6 @@ func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error { return err } - // TODO_DISCUSS: is there a reason not to assign rp.supplierAddress here? supplierAddress, err := supplierKey.GetAddress() if err != nil { return err From a07fd64e65f052e736059198d3afbedb9dca24cb Mon Sep 17 00:00:00 2001 From: Bryan White Date: Thu, 16 Nov 2023 02:21:25 +0100 Subject: [PATCH 15/16] [Relay] E2E Relay Gaps (#177) Spike to close the E2E relay MVP. --------- Co-authored-by: Redouane Lakrache Co-authored-by: Daniel Olshansky Co-authored-by: harry <53987565+h5law@users.noreply.github.com> --- .gitignore | 8 + Makefile | 18 +- cmd/pocketd/cmd/root.go | 6 + cmd/signals/on_exit.go | 23 ++ config.yml | 1 + pkg/appgateserver/cmd/cmd.go | 102 ++++--- pkg/appgateserver/endpoint_selector.go | 2 +- pkg/appgateserver/errors.go | 2 + pkg/appgateserver/jsonrpc.go | 49 ++-- pkg/appgateserver/relay_verifier.go | 14 +- pkg/appgateserver/server.go | 4 +- pkg/client/block/client.go | 3 + pkg/client/events_query/client.go | 5 + pkg/client/tx/client.go | 11 +- pkg/client/tx/context.go | 13 +- pkg/client/tx/errors.go | 2 +- pkg/deps/config/config.go | 73 +++++ pkg/relayer/cmd/cmd.go | 392 +++++++++++++++++++++++++ pkg/relayer/interface.go | 13 + pkg/relayer/protocol/block_heights.go | 4 +- pkg/relayer/proxy/error_reply.go | 6 +- pkg/relayer/proxy/jsonrpc.go | 42 +-- pkg/relayer/proxy/proxy.go | 33 ++- pkg/relayer/proxy/relay_builders.go | 24 +- pkg/relayer/proxy/relay_verifier.go | 3 + pkg/relayer/proxy/rings.go | 17 +- pkg/relayer/proxy/server_builder.go | 26 +- pkg/relayer/session/claim.go | 3 + pkg/relayer/session/proof.go | 7 +- pkg/relayer/session/session.go | 37 ++- pkg/relayer/session/sessiontree.go | 5 +- proto/pocket/service/relay.proto | 13 +- testutil/testclient/testtx/context.go | 4 +- 33 files changed, 826 insertions(+), 139 deletions(-) create mode 100644 cmd/signals/on_exit.go create mode 100644 pkg/deps/config/config.go create mode 100644 pkg/relayer/cmd/cmd.go diff --git a/.gitignore b/.gitignore index a10753b59..8fe75f2aa 100644 --- a/.gitignore +++ b/.gitignore @@ -61,4 +61,12 @@ localnet_config.yaml # Relase artifacts produced by `ignite chain build --release` release +# SMT KVStore files +# TODO_TECHDEBT(#126, @red-0ne): Rename `smt` to `smt_stores` and make it configurable so it can be stored anywhere on this +smt + +# Do not allow a multi-moduled projected go.work.sum + +# TODO_TECHDEBT: It seems that .dot files come and go so we need to figure out the root cause: https://github.com/pokt-network/poktroll/pull/177/files#r1392521547 +# **/*.dot diff --git a/Makefile b/Makefile index 104950507..19120c578 100644 --- a/Makefile +++ b/Makefile @@ -282,8 +282,9 @@ app_list: ## List all the staked applications app_stake: ## Stake tokens for the application specified (must specify the APP and SERVICES env vars) poktrolld --home=$(POKTROLLD_HOME) tx application stake-application 1000upokt $(SERVICES) --keyring-backend test --from $(APP) --node $(POCKET_NODE) +# TODO_IMPROVE(#180): Make sure genesis-staked actors are available via AccountKeeper .PHONY: app1_stake -app1_stake: ## Stake app1 +app1_stake: ## Stake app1 (also staked in genesis) APP=app1 SERVICES=anvil,svc1,svc2 make app_stake .PHONY: app2_stake @@ -362,17 +363,24 @@ supplier_list: ## List all the staked supplier supplier_stake: ## Stake tokens for the supplier specified (must specify the APP env var) poktrolld --home=$(POKTROLLD_HOME) tx supplier stake-supplier 1000upokt "$(SERVICES)" --keyring-backend test --from $(SUPPLIER) --node $(POCKET_NODE) +# TODO_IMPROVE(#180): Make sure genesis-staked actors are available via AccountKeeper .PHONY: supplier1_stake -supplier1_stake: ## Stake supplier1 - SUPPLIER=supplier1 SERVICES="anvil;http://anvil:8547,svc1;http://localhost:8081" make supplier_stake +supplier1_stake: ## Stake supplier1 (also staked in genesis) + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to that service. + # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). + SUPPLIER=supplier1 SERVICES="anvil;http://localhost:8545,svc1;http://localhost:8081" make supplier_stake .PHONY: supplier2_stake supplier2_stake: ## Stake supplier2 - SUPPLIER=supplier2 SERVICES="anvil;http://anvil:8547,svc2;http://localhost:8082" make supplier_stake + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to that service. + # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). + SUPPLIER=supplier2 SERVICES="anvil;http://localhost:8545,svc2;http://localhost:8082" make supplier_stake .PHONY: supplier3_stake supplier3_stake: ## Stake supplier3 - SUPPLIER=supplier3 SERVICES="anvil;http://anvil:8547,svc3;http://localhost:8083" make supplier_stake + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to that service. + # I.e.: replace `localhost` with `relayminer` (or whatever the service's hostname is). + SUPPLIER=supplier3 SERVICES="anvil;http://localhost:8545,svc3;http://localhost:8083" make supplier_stake .PHONY: supplier_unstake supplier_unstake: ## Unstake an supplier (must specify the SUPPLIER env var) diff --git a/cmd/pocketd/cmd/root.go b/cmd/pocketd/cmd/root.go index cf58f2447..55b7071fc 100644 --- a/cmd/pocketd/cmd/root.go +++ b/cmd/pocketd/cmd/root.go @@ -44,6 +44,7 @@ import ( "github.com/pokt-network/poktroll/app" appparams "github.com/pokt-network/poktroll/app/params" appgateservercmd "github.com/pokt-network/poktroll/pkg/appgateserver/cmd" + relayercmd "github.com/pokt-network/poktroll/pkg/relayer/cmd" ) // NewRootCmd creates a new root command for a Cosmos SDK application @@ -142,6 +143,11 @@ func initRootCmd( addModuleInitFlags, ) + // add relayer command + rootCmd.AddCommand( + relayercmd.RelayerCmd(), + ) + // add keybase, auxiliary RPC, query, and tx child commands rootCmd.AddCommand( rpc.StatusCommand(), diff --git a/cmd/signals/on_exit.go b/cmd/signals/on_exit.go new file mode 100644 index 000000000..1106cce96 --- /dev/null +++ b/cmd/signals/on_exit.go @@ -0,0 +1,23 @@ +package signals + +import ( + "os" + "os/signal" +) + +// GoOnExitSignal calls the given callback when the process receives an interrupt +// or kill signal. +func GoOnExitSignal(onInterrupt func()) { + go func() { + // Set up sigCh to receive when this process receives an interrupt or + // kill signal. + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, os.Interrupt, os.Kill) + + // Block until we receive an interrupt or kill signal (OS-agnostic) + <-sigCh + + // Call the onInterrupt callback. + onInterrupt() + }() +} diff --git a/config.yml b/config.yml index f6e3c1e89..4dac5f556 100644 --- a/config.yml +++ b/config.yml @@ -100,6 +100,7 @@ genesis: - endpoints: - configs: [] rpc_type: JSON_RPC + # TODO_UPNEXT(@okdas): once `relayminer` service is added to tilt, this hostname should point to it instead of `localhost`. url: http://localhost:8545 service: id: anvil diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index 663562e98..7a541a511 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -7,24 +7,25 @@ import ( "log" "net/http" "net/url" - "os" - "os/signal" "cosmossdk.io/depinject" cosmosclient "github.com/cosmos/cosmos-sdk/client" - "github.com/cosmos/cosmos-sdk/client/flags" + cosmosflags "github.com/cosmos/cosmos-sdk/client/flags" "github.com/spf13/cobra" + "github.com/pokt-network/poktroll/cmd/signals" "github.com/pokt-network/poktroll/pkg/appgateserver" - blockclient "github.com/pokt-network/poktroll/pkg/client/block" - eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" + "github.com/pokt-network/poktroll/pkg/deps/config" ) +// We're `explicitly omitting default` so that the appgateserver crashes if these aren't specified. +const omittedDefaultFlagValue = "explicitly omitting default" + var ( flagSigningKey string flagSelfSigning bool flagListeningEndpoint string - flagCometWebsocketUrl string + flagQueryNodeUrl string ) func AppGateServerCmd() *cobra.Command { @@ -56,13 +57,15 @@ relays to the AppGate server and function as an Application, provided that: RunE: runAppGateServer, } + // Custom flags cmd.Flags().StringVar(&flagSigningKey, "signing-key", "", "The name of the key that will be used to sign relays") cmd.Flags().StringVar(&flagListeningEndpoint, "listening-endpoint", "http://localhost:42069", "The host and port that the appgate server will listen on") - cmd.Flags().StringVar(&flagCometWebsocketUrl, "comet-websocket-url", "ws://localhost:36657/websocket", "The URL of the comet websocket endpoint to communicate with the pocket blockchain") cmd.Flags().BoolVar(&flagSelfSigning, "self-signing", false, "Whether the server should sign all incoming requests with its own ring (for applications)") + cmd.Flags().StringVar(&flagQueryNodeUrl, "query-node", omittedDefaultFlagValue, "tcp://: to a full pocket node for reading data and listening for on-chain events") - cmd.Flags().String(flags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") - cmd.Flags().String(flags.FlagNode, "tcp://localhost:36657", "The URL of the comet tcp endpoint to communicate with the pocket blockchain") + // Cosmos flags + cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") + cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly and uses flagQueryNodeUrl underneath") return cmd } @@ -72,18 +75,8 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { ctx, cancelCtx := context.WithCancel(cmd.Context()) defer cancelCtx() - // Handle interrupts in a goroutine. - go func() { - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, os.Interrupt) - - // Block until we receive an interrupt or kill signal (OS-agnostic) - <-sigCh - log.Println("INFO: Interrupt signal received, shutting down...") - - // Signal goroutines to stop - cancelCtx() - }() + // Handle interrupt and kill signals asynchronously. + signals.GoOnExitSignal(cancelCtx) // Parse the listening endpoint. listeningUrl, err := url.Parse(flagListeningEndpoint) @@ -92,7 +85,7 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { } // Setup the AppGate server dependencies. - appGateServerDeps, err := setupAppGateServerDependencies(cmd, ctx, flagCometWebsocketUrl) + appGateServerDeps, err := setupAppGateServerDependencies(ctx, cmd) if err != nil { return fmt.Errorf("failed to setup AppGate server dependencies: %w", err) } @@ -127,21 +120,62 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { return nil } -func setupAppGateServerDependencies(cmd *cobra.Command, ctx context.Context, cometWebsocketUrl string) (depinject.Config, error) { - // Retrieve the client context for the chain interactions. - clientCtx := cosmosclient.GetClientContextFromCmd(cmd) +func setupAppGateServerDependencies( + ctx context.Context, + cmd *cobra.Command, +) (depinject.Config, error) { + pocketNodeWebsocketUrl, err := getPocketNodeWebsocketUrl() + if err != nil { + return nil, err + } - // Create the events client. - eventsQueryClient := eventsquery.NewEventsQueryClient(flagCometWebsocketUrl) + supplierFuncs := []config.SupplierFn{ + config.NewSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), + config.NewSupplyBlockClientFn(pocketNodeWebsocketUrl), + newSupplyQueryClientContextFn(flagQueryNodeUrl), + } - // Create the block client. - log.Printf("INFO: Creating block client, using comet websocket URL: %s...", flagCometWebsocketUrl) - deps := depinject.Supply(eventsQueryClient) - blockClient, err := blockclient.NewBlockClient(ctx, deps, flagCometWebsocketUrl) + return config.SupplyConfig(ctx, cmd, supplierFuncs) +} + +// getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to +// connect to for subscribing to on-chain events. +func getPocketNodeWebsocketUrl() (string, error) { + if flagQueryNodeUrl == omittedDefaultFlagValue { + return "", errors.New("missing required flag: --query-node") + } + + pocketNodeURL, err := url.Parse(flagQueryNodeUrl) if err != nil { - return nil, fmt.Errorf("failed to create block client: %w", err) + return "", err } - // Return the dependencies config. - return depinject.Supply(clientCtx, blockClient), nil + return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil +} + +// newSupplyQueryClientContextFn returns a new depinject.Config which is supplied with +// the given deps and a new cosmos ClientCtx +func newSupplyQueryClientContextFn(pocketQueryClientUrl string) config.SupplierFn { + return func( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, + ) (depinject.Config, error) { + // Set --node flag to the pocketQueryClientUrl for the client context + // This flag is read by cosmosclient.GetClientQueryContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, pocketQueryClientUrl) + if err != nil { + return nil, err + } + + // Get the client context from the command. + queryClientCtx, err := cosmosclient.GetClientQueryContext(cmd) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply( + queryClientCtx, + )) + return deps, nil + } } diff --git a/pkg/appgateserver/endpoint_selector.go b/pkg/appgateserver/endpoint_selector.go index 380c5dad5..c0cb22aee 100644 --- a/pkg/appgateserver/endpoint_selector.go +++ b/pkg/appgateserver/endpoint_selector.go @@ -31,7 +31,7 @@ func (app *appGateServer) getRelayerUrl( if endpoint.RpcType == rpcType { supplierUrl, err := url.Parse(endpoint.Url) if err != nil { - log.Printf("error parsing url: %s", err) + log.Printf("ERROR: error parsing url: %s", err) continue } return supplierUrl, supplier.Address, nil diff --git a/pkg/appgateserver/errors.go b/pkg/appgateserver/errors.go index 2c8f281bd..eecf99f44 100644 --- a/pkg/appgateserver/errors.go +++ b/pkg/appgateserver/errors.go @@ -10,4 +10,6 @@ var ( ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 4, "missing application address") ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 5, "missing app client signing information") ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 6, "missing app client listening endpoint") + ErrAppGateEmptyRelayResponse = sdkerrors.Register(codespace, 7, "empty relay response") + ErrAppGateHandleRelay = sdkerrors.Register(codespace, 8, "internal error handling relay request") ) diff --git a/pkg/appgateserver/jsonrpc.go b/pkg/appgateserver/jsonrpc.go index 80784fb44..915167dc8 100644 --- a/pkg/appgateserver/jsonrpc.go +++ b/pkg/appgateserver/jsonrpc.go @@ -25,23 +25,29 @@ func (app *appGateServer) handleJSONRPCRelay( // Read the request body bytes. payloadBz, err := io.ReadAll(request.Body) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("reading relay request body: %s", err) } + log.Printf("DEBUG: relay request body: %s", string(payloadBz)) // Create the relay request payload. relayRequestPayload := &types.RelayRequest_JsonRpcPayload{} - relayRequestPayload.JsonRpcPayload.Unmarshal(payloadBz) + jsonPayload := &types.JSONRPCRequestPayload{} + cdc := types.ModuleCdc + if err := cdc.UnmarshalJSON(payloadBz, jsonPayload); err != nil { + return err + } + relayRequestPayload.JsonRpcPayload = jsonPayload session, err := app.getCurrentSession(ctx, appAddress, serviceId) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("getting current session: %s", err) } log.Printf("DEBUG: Current session ID: %s", session.SessionId) // Get a supplier URL and address for the given service and session. supplierUrl, supplierAddress, err := app.getRelayerUrl(ctx, serviceId, sharedtypes.RPCType_JSON_RPC, session) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("getting supplier URL: %s", err) } // Create the relay request. @@ -56,28 +62,32 @@ func (app *appGateServer) handleJSONRPCRelay( // Get the application's signer. signer, err := app.getRingSingerForAppAddress(ctx, appAddress) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("getting signer: %s", err) } // Hash and sign the request's signable bytes. signableBz, err := relayRequest.GetSignableBytes() if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("getting signable bytes: %s", err) } hash := crypto.Sha256(signableBz) signature, err := signer.Sign(hash) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("signing relay: %s", err) } relayRequest.Meta.Signature = signature // Marshal the relay request to bytes and create a reader to be used as an HTTP request body. - relayRequestBz, err := relayRequest.Marshal() + relayRequestBz, err := cdc.Marshal(relayRequest) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("marshaling relay request: %s", err) } relayRequestReader := io.NopCloser(bytes.NewReader(relayRequestBz)) + var relayReq types.RelayRequest + if err := relayReq.Unmarshal(relayRequestBz); err != nil { + return ErrAppGateHandleRelay.Wrapf("unmarshaling relay response: %s", err) + } // Create the HTTP request to send the request to the relayer. relayHTTPRequest := &http.Request{ @@ -90,19 +100,19 @@ func (app *appGateServer) handleJSONRPCRelay( log.Printf("DEBUG: Sending signed relay request to %s", supplierUrl) relayHTTPResponse, err := http.DefaultClient.Do(relayHTTPRequest) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("sending relay request: %s", err) } // Read the response body bytes. relayResponseBz, err := io.ReadAll(relayHTTPResponse.Body) if err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("reading relay response body: %s", err) } // Unmarshal the response bytes into a RelayResponse. relayResponse := &types.RelayResponse{} if err := relayResponse.Unmarshal(relayResponseBz); err != nil { - return err + return ErrAppGateHandleRelay.Wrapf("unmarshaling relay response: %s", err) } // Verify the response signature. We use the supplier address that we got from @@ -111,20 +121,21 @@ func (app *appGateServer) handleJSONRPCRelay( // as in some relayer early failures, it may not be signed by the supplier. // TODO_IMPROVE: Add more logging & telemetry so we can get visibility and signal into // failed responses. - log.Println("DEBUG: Verifying signed relay response from...") if err := app.verifyResponse(ctx, supplierAddress, relayResponse); err != nil { - return err + // TODO_DISCUSS: should this be its own error type and asserted against in tests? + return ErrAppGateHandleRelay.Wrapf("verifying relay response signature: %s", err) } // Marshal the response payload to bytes to be sent back to the application. - var responsePayloadBz []byte - if _, err = relayResponse.Payload.MarshalTo(responsePayloadBz); err != nil { - return err + relayResponsePayloadBz, err := cdc.MarshalJSON(relayResponse.GetJsonRpcPayload()) + if err != nil { + return ErrAppGateHandleRelay.Wrapf("unmarshallig relay response: %s", err) } // Reply with the RelayResponse payload. - if _, err := writer.Write(relayRequestBz); err != nil { - return err + log.Printf("DEBUG: Writing relay response payload: %s", string(relayResponsePayloadBz)) + if _, err := writer.Write(relayResponsePayloadBz); err != nil { + return ErrAppGateHandleRelay.Wrapf("writing relay response payload: %s", err) } return nil diff --git a/pkg/appgateserver/relay_verifier.go b/pkg/appgateserver/relay_verifier.go index 712eda7f9..4f86e1cdd 100644 --- a/pkg/appgateserver/relay_verifier.go +++ b/pkg/appgateserver/relay_verifier.go @@ -4,6 +4,8 @@ import ( "context" "github.com/cometbft/cometbft/crypto" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" @@ -23,6 +25,9 @@ func (app *appGateServer) verifyResponse( } // Extract the supplier's signature + if relayResponse.Meta == nil { + return ErrAppGateEmptyRelayResponse + } supplierSignature := relayResponse.Meta.SupplierSignature // Get the relay response signable bytes and hash them. @@ -60,12 +65,15 @@ func (app *appGateServer) getSupplierPubKeyFromAddress( } // Unmarshal the query response into a BaseAccount. - account := new(accounttypes.BaseAccount) - if err := account.Unmarshal(accQueryRes.Account.Value); err != nil { + var acc accounttypes.AccountI + reg := codectypes.NewInterfaceRegistry() + accounttypes.RegisterInterfaces(reg) + cdc := codec.NewProtoCodec(reg) + if err := cdc.UnpackAny(accQueryRes.Account, &acc); err != nil { return nil, err } - fetchedPubKey := account.GetPubKey() + fetchedPubKey := acc.GetPubKey() // Cache the retrieved public key. app.supplierAccountCache[supplierAddress] = fetchedPubKey diff --git a/pkg/appgateserver/server.go b/pkg/appgateserver/server.go index d6410021c..fe4f15000 100644 --- a/pkg/appgateserver/server.go +++ b/pkg/appgateserver/server.go @@ -225,7 +225,9 @@ func (app *appGateServer) replyWithError(writer http.ResponseWriter, err error) relayResponse := &types.RelayResponse{ Payload: &types.RelayResponse_JsonRpcPayload{ JsonRpcPayload: &types.JSONRPCResponsePayload{ - Id: make([]byte, 0), + // TODO_BLOCKER(@red-0ne): This MUST match the Id provided by the request. + // If JSON-RPC request is not unmarshaled yet (i.e. can't extract ID), it SHOULD be a random ID. + Id: 0, Jsonrpc: "2.0", Error: &types.JSONRPCResponseError{ // Using conventional error code indicating internal server error. diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index 375171d28..7ded373f2 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -75,6 +75,9 @@ type eventBytesToBlockMapFn = func( ) (client.Block, bool) // NewBlockClient creates a new block client from the given dependencies and cometWebsocketURL. +// +// Required dependencies: +// - client.EventsQueryClient func NewBlockClient( ctx context.Context, deps depinject.Config, diff --git a/pkg/client/events_query/client.go b/pkg/client/events_query/client.go index 88ace493f..34a7d35d2 100644 --- a/pkg/client/events_query/client.go +++ b/pkg/client/events_query/client.go @@ -62,6 +62,11 @@ func (ebc *eventsBytesAndConn) Close() { _ = ebc.conn.Close() } +// NewEventsQueryClient returns a new events query client which is used to +// subscribe to on-chain events matching the given query. +// +// Available options: +// - WithDialer func NewEventsQueryClient(cometWebsocketURL string, opts ...client.EventsQueryClientOption) client.EventsQueryClient { evtClient := &eventsQueryClient{ cometWebsocketURL: cometWebsocketURL, diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go index a02b32542..39f5208e0 100644 --- a/pkg/client/tx/client.go +++ b/pkg/client/tx/client.go @@ -105,6 +105,15 @@ type TxEvent struct { // validateConfigAndSetDefaults method. // 5. Subscribes the client to its own transactions. This step might be // reconsidered for relocation to a potential Start() method in the future. +// +// Required dependencies: +// - client.TxContext +// - client.EventsQueryClient +// - client.BlockClient +// +// Available options: +// - WithSigningKeyName +// - WithCommitTimeoutHeightOffset func NewTxClient( ctx context.Context, deps depinject.Config, @@ -517,7 +526,7 @@ func (tClient *txClient) txEventFromEventBz( return either.Error[*TxEvent](ErrUnmarshalTx.Wrapf("%s", err)), true } - // For successful unmarshalling, return the TxEvent. + // For successful unmarshaling, return the TxEvent. return either.Success(txEvt), false } diff --git a/pkg/client/tx/context.go b/pkg/client/tx/context.go index eca32f943..2bfa0da5f 100644 --- a/pkg/client/tx/context.go +++ b/pkg/client/tx/context.go @@ -12,6 +12,7 @@ import ( authclient "github.com/cosmos/cosmos-sdk/x/auth/client" "github.com/pokt-network/poktroll/pkg/client" + "github.com/pokt-network/poktroll/pkg/relayer" ) var _ client.TxContext = (*cosmosTxContext)(nil) @@ -21,7 +22,7 @@ var _ client.TxContext = (*cosmosTxContext)(nil) type cosmosTxContext struct { // Holds cosmos-sdk client context. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client#Context) - clientCtx cosmosclient.Context + clientCtx relayer.TxClientContext // Holds the cosmos-sdk transaction factory. // (see: https://pkg.go.dev/github.com/cosmos/cosmos-sdk@v0.47.5/client/tx#Factory) txFactory cosmostx.Factory @@ -30,6 +31,10 @@ type cosmosTxContext struct { // NewTxContext initializes a new cosmosTxContext with the given dependencies. // It uses depinject to populate its members and returns a client.TxContext // interface type. +// +// Required dependencies: +// - cosmosclient.Context +// - cosmostx.Factory func NewTxContext(deps depinject.Config) (client.TxContext, error) { txCtx := cosmosTxContext{} @@ -60,7 +65,7 @@ func (txCtx cosmosTxContext) SignTx( ) error { return authclient.SignTx( txCtx.txFactory, - txCtx.clientCtx, + cosmosclient.Context(txCtx.clientCtx), signingKeyName, txBuilder, offline, overwriteSig, @@ -80,8 +85,8 @@ func (txCtx cosmosTxContext) EncodeTx(txBuilder cosmosclient.TxBuilder) ([]byte, // BroadcastTx broadcasts the given transaction to the network, blocking until the check-tx // ABCI operation completes and returns a TxResponse of the transaction status at that point in time. func (txCtx cosmosTxContext) BroadcastTx(txBytes []byte) (*cosmostypes.TxResponse, error) { - return txCtx.clientCtx.BroadcastTxAsync(txBytes) - //return txCtx.clientCtx.BroadcastTxSync(txBytes) + clientCtx := cosmosclient.Context(txCtx.clientCtx) + return clientCtx.BroadcastTxAsync(txBytes) } // QueryTx queries the transaction based on its hash and optionally provides proof diff --git a/pkg/client/tx/errors.go b/pkg/client/tx/errors.go index 1e43f1d05..328d7a51d 100644 --- a/pkg/client/tx/errors.go +++ b/pkg/client/tx/errors.go @@ -32,7 +32,7 @@ var ( // byte data isn't recognized as a valid transaction event representation. ErrNonTxEventBytes = errorsmod.Register(codespace, 9, "attempted to deserialize non-tx event bytes") - // ErrUnmarshalTx signals a failure in the unmarshalling process of a transaction. + // ErrUnmarshalTx signals a failure in the unmarshaling process of a transaction. // This error is triggered when the system encounters issues translating a set of // bytes into the corresponding Tx structure or object. ErrUnmarshalTx = errorsmod.Register(codespace, 10, "failed to unmarshal tx") diff --git a/pkg/deps/config/config.go b/pkg/deps/config/config.go new file mode 100644 index 000000000..9621fa1de --- /dev/null +++ b/pkg/deps/config/config.go @@ -0,0 +1,73 @@ +package config + +import ( + "context" + + "cosmossdk.io/depinject" + "github.com/spf13/cobra" + + "github.com/pokt-network/poktroll/pkg/client/block" + eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" +) + +// SupplierFn is a function that is used to supply a depinject config. +type SupplierFn func( + context.Context, + depinject.Config, + *cobra.Command, +) (depinject.Config, error) + +// SupplyConfig supplies a depinject config by calling each of the supplied +// supplier functions in order and passing the result of each supplier to the +// next supplier, chaining them together. +func SupplyConfig( + ctx context.Context, + cmd *cobra.Command, + suppliers []SupplierFn, +) (deps depinject.Config, err error) { + // Initialize deps to with empty depinject config. + deps = depinject.Configs() + for _, supplyFn := range suppliers { + deps, err = supplyFn(ctx, deps, cmd) + if err != nil { + return nil, err + } + } + return deps, nil +} + +// NewSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns +// a new depinject.Config which is supplied with the given deps and the new +// EventsQueryClient. +func NewSupplyEventsQueryClientFn( + pocketNodeWebsocketUrl string, +) SupplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl) + + return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil + } +} + +// NewSupplyBlockClientFn returns a function with constructs a BlockClient instance +// with the given nodeURL and returns a new +// depinject.Config which is supplied with the given deps and the new +// BlockClient. +func NewSupplyBlockClientFn(pocketNodeWebsocketUrl string) SupplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(blockClient)), nil + } +} diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go new file mode 100644 index 000000000..4f43c4ff2 --- /dev/null +++ b/pkg/relayer/cmd/cmd.go @@ -0,0 +1,392 @@ +package cmd + +import ( + "context" + "fmt" + "log" + "net/url" + + "cosmossdk.io/depinject" + cosmosclient "github.com/cosmos/cosmos-sdk/client" + cosmosflags "github.com/cosmos/cosmos-sdk/client/flags" + cosmostx "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/spf13/cobra" + + "github.com/pokt-network/poktroll/cmd/signals" + "github.com/pokt-network/poktroll/pkg/client/block" + eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" + "github.com/pokt-network/poktroll/pkg/client/supplier" + "github.com/pokt-network/poktroll/pkg/client/tx" + "github.com/pokt-network/poktroll/pkg/deps/config" + "github.com/pokt-network/poktroll/pkg/relayer" + "github.com/pokt-network/poktroll/pkg/relayer/miner" + "github.com/pokt-network/poktroll/pkg/relayer/proxy" + "github.com/pokt-network/poktroll/pkg/relayer/session" +) + +// We're `explicitly omitting default` so the relayer crashes if these aren't specified. +const omittedDefaultFlagValue = "explicitly omitting default" + +// TODO_CONSIDERATION: Consider moving all flags defined in `/pkg` to a `flags.go` file. +var ( + flagSigningKeyName string + flagSmtStorePath string + flagNetworkNodeUrl string + flagQueryNodeUrl string +) + +func RelayerCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "relayminer", + Short: "Run a relay miner", + Long: `Run a relay miner. The relay miner process configures and starts +relay servers for each service the supplier actor identified by --signing-key is +staked for (configured on-chain). + +Relay requests received by the relay servers are validated and proxied to their +respective service endpoints, maintained by the relayer off-chain. The responses +are then signed and sent back to the requesting application. + +For each successfully served relay, the miner will hash and compare its difficulty +against an on-chain threshold. If the difficulty is sufficient, it is applicable +to relay volume and therefore rewards. Such relays are inserted into and persisted +via an SMT KV store. The miner will monitor the current block height and periodically +submit claim and proof messages according to the protocol as sessions become eligible +for such operations.`, + RunE: runRelayer, + } + + cmd.Flags().String(cosmosflags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") + + // TODO_TECHDEBT: integrate these flags with the client context (i.e. cosmosflags, config, viper, etc.) + // This is simpler to do with server-side configs (see rootCmd#PersistentPreRunE) and requires more effort than currently justifiable. + cmd.Flags().StringVar(&flagSigningKeyName, "signing-key", "", "Name of the key to sign transactions") + // TODO_TECHDEBT(#137): This, alongside other flags, should be part of a config file suppliers provide. + cmd.Flags().StringVar(&flagSmtStorePath, "smt-store", "smt", "Path to where the data backing SMT KV store exists on disk") + // Communication flags + cmd.Flags().StringVar(&flagNetworkNodeUrl, "network-node", omittedDefaultFlagValue, "tcp://: to a pocket node that gossips transactions throughout the network (may or may not be the sequencer") + cmd.Flags().StringVar(&flagQueryNodeUrl, "query-node", omittedDefaultFlagValue, "tcp://: to a full pocket node for reading data and listening for on-chain events") + cmd.Flags().String(cosmosflags.FlagNode, omittedDefaultFlagValue, "registering the default cosmos node flag; needed to initialize the cosmostx and query contexts correctly and uses flagQueryNodeUrl underneath") + + return cmd +} + +func runRelayer(cmd *cobra.Command, _ []string) error { + ctx, cancelCtx := context.WithCancel(cmd.Context()) + // Ensure context cancellation. + defer cancelCtx() + + // Handle interrupt and kill signals asynchronously. + signals.GoOnExitSignal(cancelCtx) + + // Sets up the following dependencies: + // Miner, EventsQueryClient, BlockClient, cosmosclient.Context, TxFactory, + // TxContext, TxClient, SupplierClient, RelayerProxy, RelayerSessionsManager. + deps, err := setupRelayerDependencies(ctx, cmd) + if err != nil { + return err + } + + relayMiner, err := relayer.NewRelayMiner(ctx, deps) + if err != nil { + return err + } + + // Start the relay miner + log.Println("INFO: Starting relay miner...") + if err := relayMiner.Start(ctx); err != nil { + return err + } + + log.Println("INFO: Relay miner stopped; exiting") + return nil +} + +// setupRelayerDependencies sets up all the dependencies the relay miner needs +// to run by building the dependency tree from the leaves up, incrementally +// supplying each component to an accumulating depinject.Config: +// Miner, EventsQueryClient, BlockClient, cosmosclient.Context, TxFactory, TxContext, +// TxClient, SupplierClient, RelayerProxy, RelayerSessionsManager. +func setupRelayerDependencies( + ctx context.Context, + cmd *cobra.Command, +) (deps depinject.Config, err error) { + pocketNodeWebsocketUrl, err := getPocketNodeWebsocketUrl() + if err != nil { + return nil, err + } + + supplierFuncs := []config.SupplierFn{ + config.NewSupplyEventsQueryClientFn(pocketNodeWebsocketUrl), // leaf + config.NewSupplyBlockClientFn(pocketNodeWebsocketUrl), + supplyMiner, // leaf + supplyQueryClientContext, // leaf + supplyTxClientContext, // leaf + supplyTxFactory, + supplyTxContext, + supplyTxClient, + supplySupplierClient, + supplyRelayerProxy, + supplyRelayerSessionsManager, + } + + return config.SupplyConfig(ctx, cmd, supplierFuncs) +} + +// getPocketNodeWebsocketUrl returns the websocket URL of the Pocket Node to +// connect to for subscribing to on-chain events. +func getPocketNodeWebsocketUrl() (string, error) { + if flagQueryNodeUrl == omittedDefaultFlagValue { + return "", fmt.Errorf("--query-node flag is required") + } + + pocketNodeURL, err := url.Parse(flagQueryNodeUrl) + if err != nil { + return "", err + } + + return fmt.Sprintf("ws://%s/websocket", pocketNodeURL.Host), nil +} + +// newSupplyEventsQueryClientFn constructs an EventsQueryClient instance and returns +// a new depinject.Config which is supplied with the given deps and the new +// EventsQueryClient. +func newSupplyEventsQueryClientFn( + pocketNodeWebsocketUrl string, +) config.SupplierFn { + return func( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketUrl) + + return depinject.Configs(deps, depinject.Supply(eventsQueryClient)), nil + } +} + +// newSupplyBlockClientFn returns a function with constructs a BlockClient instance +// with the given nodeURL and returns a new +// depinject.Config which is supplied with the given deps and the new +// BlockClient. +func newSupplyBlockClientFn(pocketNodeWebsocketUrl string) config.SupplierFn { + return func( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, + ) (depinject.Config, error) { + blockClient, err := block.NewBlockClient(ctx, deps, pocketNodeWebsocketUrl) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(blockClient)), nil + } +} + +// supplyMiner constructs a Miner instance and returns a new depinject.Config +// which is supplied with the given deps and the new Miner. +func supplyMiner( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + mnr, err := miner.NewMiner() + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(mnr)), nil +} + +// supplyQueryClientContext returns a function with constructs a ClientContext +// instance with the given cmd and returns a new depinject.Config which is +// supplied with the given deps and the new ClientContext. +func supplyQueryClientContext( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, +) (depinject.Config, error) { + // Set --node flag to the --query-node for the client context + // This flag is read by cosmosclient.GetClientQueryContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, flagQueryNodeUrl) + if err != nil { + return nil, err + } + + // NB: Currently, the implementations of GetClientTxContext() and + // GetClientQueryContext() are identical, allowing for their interchangeable + // use in both querying and transaction operations. However, in order to support + // independent configuration of client contexts for distinct querying and + // transacting purposes. E.g.: transactions are dispatched to the sequencer + // while queries are handled by a trusted full-node. + queryClientCtx, err := cosmosclient.GetClientQueryContext(cmd) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply( + relayer.QueryClientContext(queryClientCtx), + )) + return deps, nil +} + +// supplyTxClientContext constructs a cosmosclient.Context instance and returns a +// new depinject.Config which is supplied with the given deps and the new +// cosmosclient.Context. +func supplyTxClientContext( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, +) (depinject.Config, error) { + // Set --node flag to the --network-node for this client context. + // This flag is read by cosmosclient.GetClientTxContext. + err := cmd.Flags().Set(cosmosflags.FlagNode, flagNetworkNodeUrl) + if err != nil { + return nil, err + } + + // NB: Currently, the implementations of GetClientTxContext() and + // GetClientQueryContext() are identical, allowing for their interchangeable + // use in both querying and transaction operations. However, in order to support + // independent configuration of client contexts for distinct querying and + // transacting purposes. E.g.: transactions are dispatched to the sequencer + // while queries are handled by a trusted full-node. + txClientCtx, err := cosmosclient.GetClientTxContext(cmd) + if err != nil { + return nil, err + } + deps = depinject.Configs(deps, depinject.Supply( + relayer.TxClientContext(txClientCtx), + )) + return deps, nil +} + +// supplyTxFactory constructs a cosmostx.Factory instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// cosmostx.Factory. +func supplyTxFactory( + _ context.Context, + deps depinject.Config, + cmd *cobra.Command, +) (depinject.Config, error) { + var txClientCtx relayer.TxClientContext + if err := depinject.Inject(deps, &txClientCtx); err != nil { + return nil, err + } + + clientCtx := cosmosclient.Context(txClientCtx) + clientFactory, err := cosmostx.NewFactoryCLI(clientCtx, cmd.Flags()) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(clientFactory)), nil +} + +func supplyTxContext( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + txContext, err := tx.NewTxContext(deps) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(txContext)), nil +} + +// supplyTxClient constructs a TxClient instance and returns a new +// depinject.Config which is supplied with the given deps and the new TxClient. +func supplyTxClient( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + txClient, err := tx.NewTxClient( + ctx, + deps, + tx.WithSigningKeyName(flagSigningKeyName), + // TODO_TECHDEBT: populate this from some config. + tx.WithCommitTimeoutBlocks(tx.DefaultCommitTimeoutHeightOffset), + ) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(txClient)), nil +} + +// supplySupplierClient constructs a SupplierClient instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// SupplierClient. +func supplySupplierClient( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + supplierClient, err := supplier.NewSupplierClient( + deps, + supplier.WithSigningKeyName(flagSigningKeyName), + ) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(supplierClient)), nil +} + +// supplyRelayerProxy constructs a RelayerProxy instance and returns a new +// depinject.Config which is supplied with the given deps and the new +// RelayerProxy. +func supplyRelayerProxy( + _ context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + // TODO_BLOCKER:(#137, @red-0ne): This MUST be populated via the `relayer.json` config file + // TODO_UPNEXT(@okdas): this hostname should be updated to match that of the in-tilt anvil service. + proxyServiceURL, err := url.Parse("http://localhost:8547/") + if err != nil { + return nil, err + } + + // TODO_TECHDEBT(#137, #130): Once the `relayer.json` config file is implemented AND a local LLM RPC service + // is supported on LocalNet, this needs to be expanded to include more than one service. The ability to support + // multiple services is already in place but currently (as seen below) is hardcoded. + proxiedServiceEndpoints := map[string]url.URL{ + "anvil": *proxyServiceURL, + } + + relayerProxy, err := proxy.NewRelayerProxy( + deps, + proxy.WithSigningKeyName(flagSigningKeyName), + proxy.WithProxiedServicesEndpoints(proxiedServiceEndpoints), + ) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(relayerProxy)), nil +} + +// supplyRelayerSessionsManager constructs a RelayerSessionsManager instance +// and returns a new depinject.Config which is supplied with the given deps and +// the new RelayerSessionsManager. +// See the comment next to `flagQueryNodeUrl` (if it still exists) on how/why +// we have multiple flags pointing to different node types. +func supplyRelayerSessionsManager( + ctx context.Context, + deps depinject.Config, + _ *cobra.Command, +) (depinject.Config, error) { + relayerSessionsManager, err := session.NewRelayerSessions( + ctx, deps, + session.WithStoresDirectory(flagSmtStorePath), + ) + if err != nil { + return nil, err + } + + return depinject.Configs(deps, depinject.Supply(relayerSessionsManager)), nil +} diff --git a/pkg/relayer/interface.go b/pkg/relayer/interface.go index 539361f2d..d58d8149a 100644 --- a/pkg/relayer/interface.go +++ b/pkg/relayer/interface.go @@ -3,6 +3,7 @@ package relayer import ( "context" + "github.com/cosmos/cosmos-sdk/client" "github.com/pokt-network/smt" "github.com/pokt-network/poktroll/pkg/observable" @@ -11,6 +12,18 @@ import ( sharedtypes "github.com/pokt-network/poktroll/x/shared/types" ) +// TxClientContext is used to distinguish a cosmosclient.Context intended for use +// in transactions from others. +// This type is intentionally not an alias in order to make this distinction clear +// to the dependency injector +type TxClientContext client.Context + +// QueryClientContext is used to distinguish a cosmosclient.Context intended for use +// in queries from others. +// This type is intentionally not an alias in order to make this distinction clear +// to the dependency injector +type QueryClientContext client.Context + // Miner is responsible for observing servedRelayObs, hashing and checking the // difficulty of each, finally publishing those with sufficient difficulty to // minedRelayObs as they are applicable for relay volume. diff --git a/pkg/relayer/protocol/block_heights.go b/pkg/relayer/protocol/block_heights.go index b372376f7..fcf27ec10 100644 --- a/pkg/relayer/protocol/block_heights.go +++ b/pkg/relayer/protocol/block_heights.go @@ -14,7 +14,7 @@ import ( // TODO_TEST(@bryanchriswhite): Add test coverage and more logs func GetEarliestCreateClaimHeight(createClaimWindowStartBlock client.Block) int64 { createClaimWindowStartBlockHash := createClaimWindowStartBlock.Hash() - log.Printf("using createClaimWindowStartBlock %d's hash %x as randomness", createClaimWindowStartBlock.Height(), createClaimWindowStartBlockHash) + log.Printf("DEBUG: using createClaimWindowStartBlock %d's hash %x as randomness", createClaimWindowStartBlock.Height(), createClaimWindowStartBlockHash) rngSeed, _ := binary.Varint(createClaimWindowStartBlockHash) randomNumber := rand.NewSource(rngSeed).Int63() @@ -32,7 +32,7 @@ func GetEarliestCreateClaimHeight(createClaimWindowStartBlock client.Block) int6 // TODO_TEST(@bryanchriswhite): Add test coverage and more logs func GetEarliestSubmitProofHeight(submitProofWindowStartBlock client.Block) int64 { earliestSubmitProofBlockHash := submitProofWindowStartBlock.Hash() - log.Printf("using submitProofWindowStartBlock %d's hash %x as randomness", submitProofWindowStartBlock.Height(), earliestSubmitProofBlockHash) + log.Printf("DEBUG: using submitProofWindowStartBlock %d's hash %x as randomness", submitProofWindowStartBlock.Height(), earliestSubmitProofBlockHash) rngSeed, _ := binary.Varint(earliestSubmitProofBlockHash) randomNumber := rand.NewSource(rngSeed).Int63() diff --git a/pkg/relayer/proxy/error_reply.go b/pkg/relayer/proxy/error_reply.go index d881057a0..617f3b917 100644 --- a/pkg/relayer/proxy/error_reply.go +++ b/pkg/relayer/proxy/error_reply.go @@ -12,11 +12,13 @@ import ( // the caller pass it along with the error if available. // TODO_TECHDEBT: This method should be aware of the nature of the error to use the appropriate JSONRPC // Code, Message and Data. Possibly by augmenting the passed in error with the adequate information. -func (j *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) { +func (jsrv *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) { relayResponse := &types.RelayResponse{ Payload: &types.RelayResponse_JsonRpcPayload{ JsonRpcPayload: &types.JSONRPCResponsePayload{ - Id: make([]byte, 0), + // TODO_BLOCKER(@red-0ne): This MUST match the Id provided by the request. + // If JSON-RPC request is not unmarshaled yet (i.e. can't extract ID), it SHOULD be a random ID. + Id: 0, Jsonrpc: "2.0", Error: &types.JSONRPCResponseError{ // Using conventional error code indicating internal server error. diff --git a/pkg/relayer/proxy/jsonrpc.go b/pkg/relayer/proxy/jsonrpc.go index 8ea953d6f..113d6285e 100644 --- a/pkg/relayer/proxy/jsonrpc.go +++ b/pkg/relayer/proxy/jsonrpc.go @@ -19,10 +19,6 @@ type jsonRPCServer struct { // service is the service that the server is responsible for. service *sharedtypes.Service - // serverEndpoint is the advertised endpoint configuration that the server uses to - // listen for incoming relay requests. - serverEndpoint *sharedtypes.SupplierEndpoint - // proxiedServiceEndpoint is the address of the proxied service that the server relays requests to. proxiedServiceEndpoint url.URL @@ -43,15 +39,14 @@ type jsonRPCServer struct { // a RelayServer that listens to incoming RelayRequests. func NewJSONRPCServer( service *sharedtypes.Service, - supplierEndpoint *sharedtypes.SupplierEndpoint, + supplierEndpointHost string, proxiedServiceEndpoint url.URL, servedRelaysProducer chan<- *types.Relay, proxy relayer.RelayerProxy, ) relayer.RelayServer { return &jsonRPCServer{ service: service, - serverEndpoint: supplierEndpoint, - server: &http.Server{Addr: supplierEndpoint.Url}, + server: &http.Server{Addr: supplierEndpointHost}, relayerProxy: proxy, proxiedServiceEndpoint: proxiedServiceEndpoint, servedRelaysProducer: servedRelaysProducer, @@ -67,6 +62,9 @@ func (jsrv *jsonRPCServer) Start(ctx context.Context) error { jsrv.server.Shutdown(ctx) }() + // Set the HTTP handler. + jsrv.server.Handler = jsrv + return jsrv.server.ListenAndServe() } @@ -76,8 +74,8 @@ func (jsrv *jsonRPCServer) Stop(ctx context.Context) error { } // Service returns the JSON-RPC service. -func (j *jsonRPCServer) Service() *sharedtypes.Service { - return j.service +func (jsrv *jsonRPCServer) Service() *sharedtypes.Service { + return jsrv.service } // ServeHTTP listens for incoming relay requests. It implements the respective @@ -86,6 +84,9 @@ func (j *jsonRPCServer) Service() *sharedtypes.Service { // (see https://pkg.go.dev/net/http#Handler) func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) { ctx := request.Context() + + log.Printf("DEBUG: Serving JSON-RPC relay request...") + // Relay the request to the proxied service and build the response that will be sent back to the client. relay, err := jsrv.serveHTTP(ctx, request) if err != nil { @@ -107,7 +108,7 @@ func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.R relay.Res.Meta.SessionHeader.ApplicationAddress, relay.Res.Meta.SessionHeader.Service.Id, relay.Res.Meta.SessionHeader.SessionStartBlockHeight, - jsrv.serverEndpoint.Url, + jsrv.server.Addr, ) // Emit the relay to the servedRelays observable. @@ -117,6 +118,7 @@ func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.R // serveHTTP holds the underlying logic of ServeHTTP. func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*types.Relay, error) { // Extract the relay request from the request body. + log.Printf("DEBUG: Extracting relay request from request body...") relayRequest, err := jsrv.newRelayRequest(request) if err != nil { return nil, err @@ -136,25 +138,27 @@ func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) // Get the relayRequest payload's `io.ReadCloser` to add it to the http.Request // that will be sent to the proxied (i.e. staked for) service. // (see https://pkg.go.dev/net/http#Request) Body field type. - var payloadBz []byte - if _, err = relayRequest.Payload.MarshalTo(payloadBz); err != nil { + log.Printf("DEBUG: Getting relay request payload...") + cdc := types.ModuleCdc + payloadBz, err := cdc.MarshalJSON(relayRequest.GetJsonRpcPayload()) + if err != nil { return nil, err } requestBodyReader := io.NopCloser(bytes.NewBuffer(payloadBz)) + log.Printf("DEBUG: Relay request payload: %s", string(payloadBz)) // Build the request to be sent to the native service by substituting // the destination URL's host with the native service's listen address. - destinationURL, err := url.Parse(request.URL.String()) + log.Printf("DEBUG: Building relay request to native service %s...", jsrv.proxiedServiceEndpoint.String()) if err != nil { return nil, err } - destinationURL.Host = jsrv.proxiedServiceEndpoint.Host relayHTTPRequest := &http.Request{ Method: request.Method, Header: request.Header, - URL: destinationURL, - Host: destinationURL.Host, + URL: &jsrv.proxiedServiceEndpoint, + Host: jsrv.proxiedServiceEndpoint.Host, Body: requestBodyReader, } @@ -167,6 +171,7 @@ func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) // Build the relay response from the native service response // Use relayRequest.Meta.SessionHeader on the relayResponse session header since it was verified to be valid // and has to be the same as the relayResponse session header. + log.Printf("DEBUG: Building relay response from native service response...") relayResponse, err := jsrv.newRelayResponse(httpResponse, relayRequest.Meta.SessionHeader) if err != nil { return nil, err @@ -176,8 +181,9 @@ func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) } // sendRelayResponse marshals the relay response and sends it to the client. -func (j *jsonRPCServer) sendRelayResponse(relayResponse *types.RelayResponse, writer http.ResponseWriter) error { - relayResponseBz, err := relayResponse.Marshal() +func (jsrv *jsonRPCServer) sendRelayResponse(relayResponse *types.RelayResponse, writer http.ResponseWriter) error { + cdc := types.ModuleCdc + relayResponseBz, err := cdc.Marshal(relayResponse) if err != nil { return err } diff --git a/pkg/relayer/proxy/proxy.go b/pkg/relayer/proxy/proxy.go index 9ed38e963..b6c2a5f68 100644 --- a/pkg/relayer/proxy/proxy.go +++ b/pkg/relayer/proxy/proxy.go @@ -7,7 +7,7 @@ import ( "cosmossdk.io/depinject" ringtypes "github.com/athanorlabs/go-dleq/types" - sdkclient "github.com/cosmos/cosmos-sdk/client" + cosmosclient "github.com/cosmos/cosmos-sdk/client" "github.com/cosmos/cosmos-sdk/crypto/keyring" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" "golang.org/x/sync/errgroup" @@ -72,9 +72,9 @@ type relayerProxy struct { // servedRelays is an observable that notifies the miner about the relays that have been served. servedRelays observable.Observable[*types.Relay] - // servedRelaysProducer is a channel that emits the relays that have been served so that the + // servedRelaysPublishCh is a channel that emits the relays that have been served so that the // servedRelays observable can fan out the notifications to its subscribers. - servedRelaysProducer chan<- *types.Relay + servedRelaysPublishCh chan<- *types.Relay // ringCache is a cache of the public keys used to create the ring for a given application // they are stored in a map of application address to a slice of points on the secp256k1 curve @@ -83,7 +83,7 @@ type relayerProxy struct { ringCacheMutex *sync.RWMutex // clientCtx is the Cosmos' client context used to build the needed query clients and unmarshal their replies. - clientCtx sdkclient.Context + clientCtx relayer.QueryClientContext // supplierAddress is the address of the supplier that the relayer proxy is running for. supplierAddress string @@ -91,6 +91,14 @@ type relayerProxy struct { // NewRelayerProxy creates a new relayer proxy with the given dependencies or returns // an error if the dependencies fail to resolve or the options are invalid. +// +// Required dependencies: +// - cosmosclient.Context +// - client.BlockClient +// +// Available options: +// - WithSigningKeyName +// - WithProxiedServicesEndpoints func NewRelayerProxy( deps depinject.Config, opts ...relayer.RelayerProxyOption, @@ -105,14 +113,18 @@ func NewRelayerProxy( return nil, err } + clientCtx := cosmosclient.Context(rp.clientCtx) servedRelays, servedRelaysProducer := channel.NewObservable[*types.Relay]() rp.servedRelays = servedRelays - rp.servedRelaysProducer = servedRelaysProducer - rp.accountsQuerier = accounttypes.NewQueryClient(rp.clientCtx) - rp.supplierQuerier = suppliertypes.NewQueryClient(rp.clientCtx) - rp.sessionQuerier = sessiontypes.NewQueryClient(rp.clientCtx) + rp.servedRelaysPublishCh = servedRelaysProducer + rp.accountsQuerier = accounttypes.NewQueryClient(clientCtx) + rp.supplierQuerier = suppliertypes.NewQueryClient(clientCtx) + rp.sessionQuerier = sessiontypes.NewQueryClient(clientCtx) + rp.applicationQuerier = apptypes.NewQueryClient(clientCtx) rp.keyring = rp.clientCtx.Keyring + rp.ringCache = make(map[string][]ringtypes.Point) // the key is the appAddress + rp.ringCacheMutex = &sync.RWMutex{} for _, opt := range opts { opt(rp) @@ -125,8 +137,9 @@ func NewRelayerProxy( return rp, nil } -// Start concurrently starts all advertised relay servers and returns an error if any of them fails to start. -// This method is blocking as long as all RelayServers are running. +// Start concurrently starts all advertised relay services and returns an error +// if any of them errors. +// This method IS BLOCKING until all RelayServers are stopped. func (rp *relayerProxy) Start(ctx context.Context) error { // The provided services map is built from the supplier's on-chain advertised information, // which is a runtime parameter that can be changed by the supplier. diff --git a/pkg/relayer/proxy/relay_builders.go b/pkg/relayer/proxy/relay_builders.go index beb400cad..c2e9775c4 100644 --- a/pkg/relayer/proxy/relay_builders.go +++ b/pkg/relayer/proxy/relay_builders.go @@ -2,6 +2,7 @@ package proxy import ( "io" + "log" "net/http" "github.com/pokt-network/poktroll/x/service/types" @@ -9,24 +10,25 @@ import ( ) // newRelayRequest builds a RelayRequest from an http.Request. -func (j *jsonRPCServer) newRelayRequest(request *http.Request) (*types.RelayRequest, error) { +func (jsrv *jsonRPCServer) newRelayRequest(request *http.Request) (*types.RelayRequest, error) { requestBz, err := io.ReadAll(request.Body) if err != nil { return nil, err } - var relayRequest types.RelayRequest - if err := relayRequest.Unmarshal(requestBz); err != nil { + log.Printf("DEBUG: Unmarshaling relay request...") + var relayReq types.RelayRequest + if err := relayReq.Unmarshal(requestBz); err != nil { return nil, err } - return &relayRequest, nil + return &relayReq, nil } // newRelayResponse builds a RelayResponse from an http.Response and a SessionHeader. // It also signs the RelayResponse and assigns it to RelayResponse.Meta.SupplierSignature. // If the response has a non-nil body, it will be parsed as a JSONRPCResponsePayload. -func (j *jsonRPCServer) newRelayResponse( +func (jsrv *jsonRPCServer) newRelayResponse( response *http.Response, sessionHeader *sessiontypes.SessionHeader, ) (*types.RelayResponse, error) { @@ -39,15 +41,19 @@ func (j *jsonRPCServer) newRelayResponse( return nil, err } - jsonRPCResponse := &types.JSONRPCResponsePayload{} - if err := jsonRPCResponse.Unmarshal(responseBz); err != nil { + log.Printf("DEBUG: Unmarshaling relay response...") + relayResponsePayload := &types.RelayResponse_JsonRpcPayload{} + jsonPayload := &types.JSONRPCResponsePayload{} + cdc := types.ModuleCdc + if err := cdc.UnmarshalJSON(responseBz, jsonPayload); err != nil { return nil, err } + relayResponsePayload.JsonRpcPayload = jsonPayload - relayResponse.Payload = &types.RelayResponse_JsonRpcPayload{JsonRpcPayload: jsonRPCResponse} + relayResponse.Payload = &types.RelayResponse_JsonRpcPayload{JsonRpcPayload: jsonPayload} // Sign the relay response and add the signature to the relay response metadata - if err = j.relayerProxy.SignRelayResponse(relayResponse); err != nil { + if err = jsrv.relayerProxy.SignRelayResponse(relayResponse); err != nil { return nil, err } diff --git a/pkg/relayer/proxy/relay_verifier.go b/pkg/relayer/proxy/relay_verifier.go index e64955d55..ba8bf7e32 100644 --- a/pkg/relayer/proxy/relay_verifier.go +++ b/pkg/relayer/proxy/relay_verifier.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "log" sdkerrors "cosmossdk.io/errors" ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" @@ -20,6 +21,7 @@ func (rp *relayerProxy) VerifyRelayRequest( service *sharedtypes.Service, ) error { // extract the relay request's ring signature + log.Printf("DEBUG: Verifying relay request signature...") signature := relayRequest.Meta.Signature if signature == nil { return sdkerrors.Wrapf( @@ -73,6 +75,7 @@ func (rp *relayerProxy) VerifyRelayRequest( } // Query for the current session to check if relayRequest sessionId matches the current session. + log.Printf("DEBUG: Verifying relay request session...") currentBlock := rp.blockClient.LatestBlock(ctx) sessionQuery := &sessiontypes.QueryGetSessionRequest{ ApplicationAddress: appAddress, diff --git a/pkg/relayer/proxy/rings.go b/pkg/relayer/proxy/rings.go index 59a19ae70..86651247d 100644 --- a/pkg/relayer/proxy/rings.go +++ b/pkg/relayer/proxy/rings.go @@ -6,9 +6,12 @@ package proxy import ( "context" "fmt" + "log" ring_secp256k1 "github.com/athanorlabs/go-dleq/secp256k1" ringtypes "github.com/athanorlabs/go-dleq/types" + "github.com/cosmos/cosmos-sdk/codec" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/crypto/keys/secp256k1" accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types" ring "github.com/noot/ring-go" @@ -28,7 +31,10 @@ func (rp *relayerProxy) getRingForAppAddress(ctx context.Context, appAddress str var err error if !ok { // if the ring is not in the cache, get it from the application module + log.Printf("DEBUG: Ring not found in cache for %s, fetching from application module...", appAddress) points, err = rp.getDelegatedPubKeysForAddress(ctx, appAddress) + } else { + log.Printf("DEBUG: Ring found in cache for %s", appAddress) } if err != nil { return nil, err @@ -50,8 +56,8 @@ func (rp *relayerProxy) getDelegatedPubKeysForAddress( ctx context.Context, appAddress string, ) ([]ringtypes.Point, error) { - rp.ringCacheMutex.RLock() - defer rp.ringCacheMutex.RUnlock() + rp.ringCacheMutex.Lock() + defer rp.ringCacheMutex.Unlock() // get the application's on chain state req := &apptypes.QueryGetApplicationRequest{Address: appAddress} @@ -101,8 +107,11 @@ func (rp *relayerProxy) addressesToPoints(ctx context.Context, addresses []strin if err != nil { return nil, fmt.Errorf("unable to get account for address: %s [%w]", addr, err) } - acc := new(accounttypes.BaseAccount) - if err := acc.Unmarshal(pubKeyRes.Account.Value); err != nil { + var acc accounttypes.AccountI + reg := codectypes.NewInterfaceRegistry() + accounttypes.RegisterInterfaces(reg) + cdc := codec.NewProtoCodec(reg) + if err := cdc.UnpackAny(pubKeyRes.Account, &acc); err != nil { return nil, fmt.Errorf("unable to deserialise account for address: %s [%w]", addr, err) } key := acc.GetPubKey() diff --git a/pkg/relayer/proxy/server_builder.go b/pkg/relayer/proxy/server_builder.go index a5abc47bd..3846ac3af 100644 --- a/pkg/relayer/proxy/server_builder.go +++ b/pkg/relayer/proxy/server_builder.go @@ -2,6 +2,8 @@ package proxy import ( "context" + "log" + "net/url" "github.com/pokt-network/poktroll/pkg/relayer" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" @@ -13,7 +15,12 @@ import ( // is responsible for listening for incoming relay requests and relaying them to the supported proxied service. func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error { // Get the supplier address from the keyring - supplierAddress, err := rp.keyring.Key(rp.signingKeyName) + supplierKey, err := rp.keyring.Key(rp.signingKeyName) + if err != nil { + return err + } + + supplierAddress, err := supplierKey.GetAddress() if err != nil { return err } @@ -32,19 +39,30 @@ func (rp *relayerProxy) BuildProvidedServices(ctx context.Context) error { for _, serviceConfig := range services { service := serviceConfig.Service proxiedServicesEndpoints := rp.proxiedServicesEndpoints[service.Id] - serviceEndpoints := make([]relayer.RelayServer, len(serviceConfig.Endpoints)) + var serviceEndpoints []relayer.RelayServer for _, endpoint := range serviceConfig.Endpoints { + url, err := url.Parse(endpoint.Url) + if err != nil { + return err + } + supplierEndpointHost := url.Host + var server relayer.RelayServer + log.Printf( + "INFO: starting relay server for service %s at endpoint %s", + service.Id, endpoint.Url, + ) + // Switch to the RPC type to create the appropriate RelayServer switch endpoint.RpcType { case sharedtypes.RPCType_JSON_RPC: server = NewJSONRPCServer( service, - endpoint, + supplierEndpointHost, proxiedServicesEndpoints, - rp.servedRelaysProducer, + rp.servedRelaysPublishCh, rp, ) default: diff --git a/pkg/relayer/session/claim.go b/pkg/relayer/session/claim.go index 712e7a9e5..9401b4af7 100644 --- a/pkg/relayer/session/claim.go +++ b/pkg/relayer/session/claim.go @@ -106,6 +106,9 @@ func (rs *relayerSessionsManager) newMapClaimSessionFn( return either.Error[relayer.SessionTree](err), false } + latestBlock := rs.blockClient.LatestBlock(ctx) + log.Printf("INFO: currentBlock: %d, submitting claim", latestBlock.Height()+1) + sessionHeader := session.GetSessionHeader() if err := rs.supplierClient.CreateClaim(ctx, *sessionHeader, claimRoot); err != nil { failedCreateClaimSessionsPublishCh <- session diff --git a/pkg/relayer/session/proof.go b/pkg/relayer/session/proof.go index 4a7c415aa..00f9b3df5 100644 --- a/pkg/relayer/session/proof.go +++ b/pkg/relayer/session/proof.go @@ -30,8 +30,7 @@ func (rs *relayerSessionsManager) submitProofs( rs.mapWaitForEarliestSubmitProofHeight, ) - failedSubmitProofSessionsObs, failedSubmitProofSessionsPublishCh := - channel.NewObservable[relayer.SessionTree]() + failedSubmitProofSessionsObs, failedSubmitProofSessionsPublishCh := channel.NewObservable[relayer.SessionTree]() // Map sessionsWithOpenProofWindow to a new observable of an either type, // populated with the session or an error, which is notified after the session @@ -74,7 +73,7 @@ func (rs *relayerSessionsManager) waitForEarliestSubmitProofHeight( // + claimproofparams.GovSubmitProofWindowStartHeightOffset // we wait for submitProofWindowStartHeight to be received before proceeding since we need its hash - log.Printf("waiting and blocking for global earliest proof submission submitProofWindowStartBlock height: %d", submitProofWindowStartHeight) + log.Printf("INFO: waiting and blocking for global earliest proof submission submitProofWindowStartBlock height: %d", submitProofWindowStartHeight) submitProofWindowStartBlock := rs.waitForBlock(ctx, submitProofWindowStartHeight) earliestSubmitProofHeight := protocol.GetEarliestSubmitProofHeight(submitProofWindowStartBlock) @@ -100,7 +99,7 @@ func (rs *relayerSessionsManager) newMapProveSessionFn( return either.Error[relayer.SessionTree](err), false } - log.Printf("currentBlock: %d, submitting proof", latestBlock.Height()+1) + log.Printf("INFO: currentBlock: %d, submitting proof", latestBlock.Height()+1) // SubmitProof ensures on-chain proof inclusion so we can safely prune the tree. if err := rs.supplierClient.SubmitProof( ctx, diff --git a/pkg/relayer/session/session.go b/pkg/relayer/session/session.go index 4a444dfd0..08ea0f962 100644 --- a/pkg/relayer/session/session.go +++ b/pkg/relayer/session/session.go @@ -45,13 +45,21 @@ type relayerSessionsManager struct { } // NewRelayerSessions creates a new relayerSessions. +// +// Required dependencies: +// - client.BlockClient +// - client.SupplierClient +// +// Available options: +// - WithStoresDirectory func NewRelayerSessions( ctx context.Context, deps depinject.Config, opts ...relayer.RelayerSessionsManagerOption, ) (relayer.RelayerSessionsManager, error) { rs := &relayerSessionsManager{ - sessionsTrees: make(sessionsTreesMap), + sessionsTrees: make(sessionsTreesMap), + sessionsTreesMu: &sync.Mutex{}, } if err := depinject.Inject( @@ -79,10 +87,11 @@ func NewRelayerSessions( return rs, nil } -// Start iterates over the session trees at the end of each, respective, session. +// Start maps over the session trees at the end of each, respective, session. // The session trees are piped through a series of map operations which progress // them through the claim/proof lifecycle, broadcasting transactions to the // network as necessary. +// It IS NOT BLOCKING as map operations run in their own goroutines. func (rs *relayerSessionsManager) Start(ctx context.Context) { // Map eitherMinedRelays to a new observable of an error type which is // notified if an error was encountered while attempting to add the relay to @@ -113,9 +122,6 @@ func (rs *relayerSessionsManager) InsertRelays(relays observable.Observable[*rel // ensureSessionTree returns the SessionTree for a given session. // If no tree for the session exists, a new SessionTree is created before returning. func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes.SessionHeader) (relayer.SessionTree, error) { - rs.sessionsTreesMu.Lock() - defer rs.sessionsTreesMu.Unlock() - sessionsTrees, ok := rs.sessionsTrees[sessionHeader.SessionEndBlockHeight] // If there is no map for sessions at the sessionEndHeight, create one. @@ -128,8 +134,9 @@ func (rs *relayerSessionsManager) ensureSessionTree(sessionHeader *sessiontypes. sessionTree, ok := sessionsTrees[sessionHeader.SessionId] // If the sessionTree does not exist, create it. + var err error if !ok { - sessionTree, err := NewSessionTree(sessionHeader, rs.storesDirectory, rs.removeFromRelayerSessions) + sessionTree, err = NewSessionTree(sessionHeader, rs.storesDirectory, rs.removeFromRelayerSessions) if err != nil { return nil, err } @@ -146,12 +153,20 @@ func (rs *relayerSessionsManager) mapBlockToSessionsToClaim( _ context.Context, block client.Block, ) (sessionTrees []relayer.SessionTree, skip bool) { + rs.sessionsTreesMu.Lock() + defer rs.sessionsTreesMu.Unlock() + // Check if there are sessions that need to enter the claim/proof phase // as their end block height was the one before the last committed block. // Iterate over the sessionsTrees map to get the ones that end at a block height // lower than the current block height. for endBlockHeight, sessionsTreesEndingAtBlockHeight := range rs.sessionsTrees { - if endBlockHeight < block.Height() { + // TODO_BLOCKER(@red-0ne): We need this to be == instead of <= because we don't want to keep sending + // the same session while waiting the next step. This does not address the case + // where the block client misses the target block which should be handled by the + // retry mechanism. See the discussion in the following GitHub thread for next + // steps: https://github.com/pokt-network/poktroll/pull/177/files?show-viewed-files=true&file-filters%5B%5D=#r1391957041 + if endBlockHeight == block.Height() { // Iterate over the sessionsTrees that end at this block height (or // less) and add them to the list of sessionTrees to be published. for _, sessionTree := range sessionsTreesEndingAtBlockHeight { @@ -169,7 +184,7 @@ func (rs *relayerSessionsManager) removeFromRelayerSessions(sessionHeader *sessi sessionsTreesEndingAtBlockHeight, ok := rs.sessionsTrees[sessionHeader.SessionEndBlockHeight] if !ok { - log.Printf("no session tree found for sessions ending at height %d", sessionHeader.SessionEndBlockHeight) + log.Printf("DEBUG: no session tree found for sessions ending at height %d", sessionHeader.SessionEndBlockHeight) return } @@ -214,16 +229,18 @@ func (rs *relayerSessionsManager) mapAddRelayToSessionTree( _ context.Context, relay *relayer.MinedRelay, ) (_ error, skip bool) { + rs.sessionsTreesMu.Lock() + defer rs.sessionsTreesMu.Unlock() // ensure the session tree exists for this relay sessionHeader := relay.GetReq().GetMeta().GetSessionHeader() smst, err := rs.ensureSessionTree(sessionHeader) if err != nil { - log.Printf("failed to ensure session tree: %s\n", err) + log.Printf("ERROR: failed to ensure session tree: %s\n", err) return err, false } if err := smst.Update(relay.Hash, relay.Bytes, 1); err != nil { - log.Printf("failed to update smt: %s\n", err) + log.Printf("ERROR: failed to update smt: %s\n", err) return err, false } diff --git a/pkg/relayer/session/sessiontree.go b/pkg/relayer/session/sessiontree.go index fb27ff307..f9f03eaa8 100644 --- a/pkg/relayer/session/sessiontree.go +++ b/pkg/relayer/session/sessiontree.go @@ -68,8 +68,8 @@ func NewSessionTree( storePath := filepath.Join(storesDirectory, sessionHeader.SessionId) // Make sure storePath does not exist when creating a new SessionTree - if _, err := os.Stat(storePath); !os.IsNotExist(err) { - return nil, ErrSessionTreeUndefinedStoresDirectory + if _, err := os.Stat(storePath); err != nil && !os.IsNotExist(err) { + return nil, ErrSessionTreeStorePathExists.Wrapf("storePath: %q", storePath) } treeStore, err := smt.NewKVStore(storePath) @@ -86,6 +86,7 @@ func NewSessionTree( storePath: storePath, treeStore: treeStore, tree: tree, + sessionMu: &sync.Mutex{}, removeFromRelayerSessions: removeFromRelayerSessions, } diff --git a/proto/pocket/service/relay.proto b/proto/pocket/service/relay.proto index 7c9e888e4..c9f0d5d77 100644 --- a/proto/pocket/service/relay.proto +++ b/proto/pocket/service/relay.proto @@ -34,13 +34,18 @@ message RelayRequest { } } +// TODO_TECHDEBT(#189, @h5law): See discussion related to #189 on how/why JSONRPC should be refactored altogether. + // JSONRPCRequestPayload contains the payload for a JSON-RPC request. // See https://www.jsonrpc.org/specification#request_object for more details. message JSONRPCRequestPayload { - bytes id = 1; // Identifier established by the Client to create context for the request. + uint32 id = 1; // Identifier established by the Client to create context for the request. string jsonrpc = 2; // Version of JSON-RPC. Must be exactly "2.0". string method = 3; // Method being invoked on the server. - map parameters = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures + // TODO_TECHDEBT(#126): Make params a `oneof` of a list or map per the JSON-RPC specifications + // should they be a list of maps? + //map params = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures + repeated string params = 4; // Parameters for the method. https://www.jsonrpc.org/specification#parameter_structures } // RESTRequestType represents the type of REST request. @@ -82,9 +87,9 @@ message RelayResponseMetadata { // JSONRPCResponsePayload contains the response details for a JSON-RPC relay. // See www.jsonrpc.org/specification for more details. message JSONRPCResponsePayload { - bytes id = 1; // Identifier established by the Client to link the response back to the request. + uint32 id = 1; // Identifier established by the Client to link the response back to the request. string jsonrpc = 2; // Version of JSON-RPC. Must be exactly "2.0". - bytes result = 3; // Response result payload. + string result = 3; // Response result payload. JSONRPCResponseError error = 4; // Error message, if any. Can be nil. } diff --git a/testutil/testclient/testtx/context.go b/testutil/testclient/testtx/context.go index 35b3dfb71..60412baa7 100644 --- a/testutil/testclient/testtx/context.go +++ b/testutil/testclient/testtx/context.go @@ -17,6 +17,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "github.com/pokt-network/poktroll/pkg/relayer" "github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/pkg/client" @@ -264,7 +265,8 @@ func NewAnyTimesTxTxContext( require.NoError(t, err) require.NotEmpty(t, txFactory) - txCtxDeps := depinject.Supply(txFactory, clientCtx) + txClientCtx := relayer.TxClientContext(clientCtx) + txCtxDeps := depinject.Supply(txFactory, txClientCtx) txCtx, err := tx.NewTxContext(txCtxDeps) require.NoError(t, err) txCtxMock := mockclient.NewMockTxContext(ctrl) From c08a103c8386a5696833de98b0c2dc756f754f2d Mon Sep 17 00:00:00 2001 From: Dima Kniazev Date: Wed, 15 Nov 2023 17:32:45 -0800 Subject: [PATCH 16/16] [LocalNet] Run Relayer and AppGateServer (#179) * relayer+appgateserver * switch to the other chart * cleanup * Update Dockerfile.dev Co-authored-by: Daniel Olshansky --------- Co-authored-by: Daniel Olshansky --- Dockerfile.dev | 3 ++ Tiltfile | 45 ++++++++++++------- localnet/kubernetes/values-appgateserver.yaml | 2 + localnet/kubernetes/values-relayminer.yaml | 2 + 4 files changed, 37 insertions(+), 15 deletions(-) create mode 100644 localnet/kubernetes/values-appgateserver.yaml create mode 100644 localnet/kubernetes/values-relayminer.yaml diff --git a/Dockerfile.dev b/Dockerfile.dev index 627a09fef..0c4fe64df 100644 --- a/Dockerfile.dev +++ b/Dockerfile.dev @@ -17,7 +17,10 @@ WORKDIR /poktroll RUN mv /poktroll/bin/ignite /usr/bin/ && mv /poktroll/bin/poktrolld /usr/bin/ +# TODO_TECHDEBT(@okdas): Ports are not documented as they will soon be changed with a document to follow EXPOSE 8545 EXPOSE 8546 +EXPOSE 8547 +EXPOSE 8548 ENTRYPOINT ["ignite"] diff --git a/Tiltfile b/Tiltfile index 1333a406e..95e8c9a6b 100644 --- a/Tiltfile +++ b/Tiltfile @@ -2,13 +2,14 @@ load("ext://restart_process", "docker_build_with_restart") load("ext://helm_resource", "helm_resource", "helm_repo") # A list of directories where changes trigger a hot-reload of the sequencer -hot_reload_dirs = ["app", "cmd", "tools", "x"] +hot_reload_dirs = ["app", "cmd", "tools", "x", "pkg"] # Create a localnet config file from defaults, and if a default configuration doesn't exist, populate it with default values localnet_config_path = "localnet_config.yaml" localnet_config_defaults = { - "relayers": {"count": 1}, + "relayminers": {"count": 1}, "gateways": {"count": 1}, + "appgateservers": {"count": 1}, # By default, we use the `helm_repo` function below to point to the remote repository # but can update it to the locally cloned repo for testing & development "helm_chart_local_repo": {"enabled": False, "path": "../helm-charts"}, @@ -25,16 +26,12 @@ if (localnet_config_file != localnet_config) or ( # Configure helm chart reference. If using a local repo, set the path to the local repo; otherwise, use our own helm repo. helm_repo("pokt-network", "https://pokt-network.github.io/helm-charts/") -sequencer_chart = "pokt-network/poktroll-sequencer" -poktroll_chart = "pokt-network/poktroll" +chart_prefix = "pokt-network/" if localnet_config["helm_chart_local_repo"]["enabled"]: helm_chart_local_repo = localnet_config["helm_chart_local_repo"]["path"] hot_reload_dirs.append(helm_chart_local_repo) print("Using local helm chart repo " + helm_chart_local_repo) - - sequencer_chart = helm_chart_local_repo + "/charts/poktroll-sequencer" - poktroll_chart = helm_chart_local_repo + "/charts/poktroll" - + chart_prefix = helm_chart_local_repo + "/charts/" # Import files into Kubernetes ConfigMap def read_files_from_directory(directory): @@ -114,20 +111,32 @@ k8s_yaml( ["localnet/kubernetes/celestia-rollkit.yaml", "localnet/kubernetes/anvil.yaml"] ) -# Run pocket-specific nodes (sequencer, relayers, etc...) +# Run pocket-specific nodes (sequencer, relayminers, etc...) helm_resource( "sequencer", - sequencer_chart, + chart_prefix + "poktroll-sequencer", flags=["--values=./localnet/kubernetes/values-common.yaml"], image_deps=["poktrolld"], image_keys=[("image.repository", "image.tag")], ) helm_resource( - "relayers", - poktroll_chart, + "relayminers", + chart_prefix + "relayminer", flags=[ "--values=./localnet/kubernetes/values-common.yaml", - "--set=replicaCount=" + str(localnet_config["relayers"]["count"]), + "--values=./localnet/kubernetes/values-relayminer.yaml", + "--set=replicaCount=" + str(localnet_config["relayminers"]["count"]), + ], + image_deps=["poktrolld"], + image_keys=[("image.repository", "image.tag")], +) +helm_resource( + "appgateservers", + chart_prefix + "appgate-server", + flags=[ + "--values=./localnet/kubernetes/values-common.yaml", + "--values=./localnet/kubernetes/values-appgateserver.yaml", + "--set=replicaCount=" + str(localnet_config["appgateservers"]["count"]), ], image_deps=["poktrolld"], image_keys=[("image.repository", "image.tag")], @@ -146,9 +155,15 @@ k8s_resource( port_forwards=["36657", "40004"], ) k8s_resource( - "relayers", + "relayminers", + labels=["blockchains"], + resource_deps=["sequencer"], + port_forwards=["8548", "40005"], +) +k8s_resource( + "appgateservers", labels=["blockchains"], resource_deps=["sequencer"], - port_forwards=["8545", "8546", "40005"], + port_forwards=["42069", "40006"], ) k8s_resource("anvil", labels=["blockchains"], port_forwards=["8547"]) diff --git a/localnet/kubernetes/values-appgateserver.yaml b/localnet/kubernetes/values-appgateserver.yaml new file mode 100644 index 000000000..cefc1887d --- /dev/null +++ b/localnet/kubernetes/values-appgateserver.yaml @@ -0,0 +1,2 @@ +pocket: + node: sequencer-poktroll-sequencer \ No newline at end of file diff --git a/localnet/kubernetes/values-relayminer.yaml b/localnet/kubernetes/values-relayminer.yaml new file mode 100644 index 000000000..cefc1887d --- /dev/null +++ b/localnet/kubernetes/values-relayminer.yaml @@ -0,0 +1,2 @@ +pocket: + node: sequencer-poktroll-sequencer \ No newline at end of file