diff --git a/Makefile b/Makefile index aca87acab..969eab96a 100644 --- a/Makefile +++ b/Makefile @@ -330,6 +330,10 @@ test_e2e: test_e2e_env ## Run all E2E tests test_e2e_app: go test -v ./e2e/tests/... -tags=e2e,test --features-path=stake_app.feature +.PHONY: test_e2e_supplier +test_e2e_supplier: + go test -v ./e2e/tests/... -tags=e2e,test --features-path=stake_supplier.feature + .PHONY: test_e2e_gateway test_e2e_gateway: go test -v ./e2e/tests/... -tags=e2e,test --features-path=stake_gateway.feature @@ -772,6 +776,10 @@ params_query_all: check_jq ## Query the params from all available modules ignite_acc_list: ## List all the accounts in LocalNet ignite account list --keyring-dir=$(POKTROLLD_HOME) --keyring-backend test --address-prefix $(POCKET_ADDR_PREFIX) +.PHONY: ignite_poktrolld_build +ignite_poktrolld_build: check_go_version check_ignite_version ## Build the poktrolld binary using Ignite + ignite chain build --skip-proto --debug -v -o $(shell go env GOPATH)/bin + ################## ### CI Helpers ### ################## diff --git a/Tiltfile b/Tiltfile index 1a4a186a4..d4b5a18fe 100644 --- a/Tiltfile +++ b/Tiltfile @@ -179,9 +179,6 @@ WORKDIR / """, only=["./bin/poktrolld"], entrypoint=[ - "/tilt-restart-wrapper", - "--watch_file=/tmp/.restart-proc", - "--entr_flags=-r", "poktrolld", ], live_update=[sync("bin/poktrolld", "/usr/local/bin/poktrolld")], @@ -254,6 +251,9 @@ for x in range(localnet_config["relayminers"]["count"]): # Run `curl localhost:PORT` to see the current snapshot of relayminer metrics. str(9069 + actor_number) + ":9090", # Relayminer metrics port. relayminer1 - exposes 9070, relayminer2 exposes 9071, etc. + # Use with pprof like this: `go tool pprof -http=:3333 http://localhost:6070/debug/pprof/goroutine` + str(6069 + actor_number) + + ":6060", # Relayminer pprof port. relayminer1 - exposes 6070, relayminer2 exposes 6071, etc. 
], ) @@ -295,6 +295,9 @@ for x in range(localnet_config["appgateservers"]["count"]): # Run `curl localhost:PORT` to see the current snapshot of appgateserver metrics. str(9079 + actor_number) + ":9090", # appgateserver metrics port. appgateserver1 - exposes 9080, appgateserver2 exposes 9081, etc. + # Use with pprof like this: `go tool pprof -http=:3333 http://localhost:6080/debug/pprof/goroutine` + str(6079 + actor_number) + + ":6060", # appgateserver pprof port. appgateserver1 - exposes 6080, appgateserver2 exposes 6081, etc. ], ) @@ -336,13 +339,22 @@ for x in range(localnet_config["gateways"]["count"]): # Run `curl localhost:PORT` to see the current snapshot of gateway metrics. str(9089 + actor_number) + ":9090", # gateway metrics port. gateway1 - exposes 9090, gateway2 exposes 9091, etc. + # Use with pprof like this: `go tool pprof -http=:3333 http://localhost:6090/debug/pprof/goroutine` + str(6089 + actor_number) + + ":6060", # gateway pprof port. gateway1 - exposes 6090, gateway2 exposes 6091, etc. 
], ) k8s_resource( "validator", labels=["pocket_network"], - port_forwards=["36657", "36658", "40004"], + port_forwards=[ + "36657", + "36658", + "40004", + # Use with pprof like this: `go tool pprof -http=:3333 http://localhost:6061/debug/pprof/goroutine` + "6061:6060", + ], links=[ link( "http://localhost:3003/d/cosmoscometbft/protocol-cometbft-dashboard?orgId=1&from=now-1h&to=now", diff --git a/docusaurus/docs/develop/developer_guide/performance_troubleshooting.md b/docusaurus/docs/develop/developer_guide/performance_troubleshooting.md new file mode 100644 index 000000000..f27da1145 --- /dev/null +++ b/docusaurus/docs/develop/developer_guide/performance_troubleshooting.md @@ -0,0 +1,155 @@ +--- +sidebar_position: 4 +title: Performance troubleshooting +--- + +# Performance troubleshooting + +- [What is pprof](#what-is-pprof) +- [`pprof` and Dependencies - Installation](#pprof-and-dependencies---installation) +- [How to Use `pprof`](#how-to-use-pprof) + - [Available `pprof` Endpoints](#available-pprof-endpoints) + - [Configure Software to Expose `pprof` Endpoints](#configure-software-to-expose-pprof-endpoints) + - [Full Nodes and Validator Configuration](#full-nodes-and-validator-configuration) + - [AppGate Server and RelayMiner](#appgate-server-and-relayminer) + - [Save the Profiling Data](#save-the-profiling-data) + - [Explore the Profiling Data](#explore-the-profiling-data) + - [Explore without saving data](#explore-without-saving-data) + - [Report Issues](#report-issues) + +If you believe you've encountered an issue related to memory, goroutine leaks, +or some sort of synchronization blocking scenario, `pprof` is a good tool to +help identify & investigate the problem. + +It is open-source and maintained by Google: [google/pprof](https://github.com/google/pprof) + +## What is pprof + +`pprof` is a tool for profiling and visualizing profiling data. 
In modern Go versions, +it is included with the compiler (`go tool pprof`), but it can also be installed as a +standalone binary from [github.com/google/pprof](https://github.com/google/pprof). + +```bash +go install github.com/google/pprof@latest +``` + +More information can be found in the [pprof README](https://github.com/google/pprof/blob/main/doc/README.md). + +## `pprof` and Dependencies - Installation + +1. [Required] `pprof` - Go compiler or standalone pprof binary: + + 1. pprof that comes with Golang is available via `go tool pprof` + 2. A standalone binary can be installed with: + + ```bash + go install github.com/google/pprof@latest + ``` + +2. [Optional] `graphviz` - Recommended for visualization. It can be skipped if you're not planning to use visualizations. + + - [Installation guide](https://graphviz.readthedocs.io/en/stable/#installation) + - On MacOS, it can be installed with: + + ```bash + brew install graphviz + ``` + +## How to Use `pprof` + +`pprof` operates by connecting to an exposed endpoint in the software you want to profile. + +It can create snapshots for later examination, or can show information in a browser +for an already running process. + +We're going to use `go tool pprof` in the examples below, but if you installed a +standalone binary, just replace `go tool pprof` with `pprof`. + +### Available `pprof` Endpoints + +Before running `pprof`, you need to decide what kind of profiling you need to do. + +The `pprof` package provides several endpoints that are useful for profiling and +debugging. Here are the most commonly used ones: + +- `/debug/pprof/heap`: Snapshot of the memory allocation of the heap. +- `/debug/pprof/allocs`: Similar to `/debug/pprof/heap`, but includes all past memory allocations, not just the ones currently in the heap. +- `/debug/pprof/goroutine`: All current go-routines. +- `/debug/pprof/threadcreate`: Records stack traces that led to the creation of new OS threads. 
+- `/debug/pprof/block`: Displays stack traces that led to blocking on synchronization primitives. +- `/debug/pprof/profile`: Collects 30 seconds of CPU profiling data - configurable via the `seconds` parameter. +- `/debug/pprof/symbol`: Looks up the program counters provided in the request, returning function names. +- `/debug/pprof/trace`: Provides a trace of the program execution. + +### Configure Software to Expose `pprof` Endpoints + +:::warning Exposing pprof + +It is recommended to never expose `pprof` to the internet, as this feature allows +operational control of the software. A malicious actor could potentially disrupt +or DoS your services if these endpoints are exposed to the internet. + +::: + +#### Full Nodes and Validator Configuration + +In `config.toml`, you can configure `pprof_laddr` to expose a `pprof` endpoint +on a particular network interface and port. By default, `pprof` listens on `localhost:6060`. + +If the value has been modified, you must restart the process. + +#### AppGate Server and RelayMiner + +Both `AppGate Server` and `RelayMiner` can be configured to expose a `pprof` +endpoint using a configuration file like this: + +```yaml +pprof: + enabled: true + addr: localhost:6060 +``` + +If any of these values have been modified, you must restart the process. + +### Save the Profiling Data + +You can save profiling data to a file by running: + +```bash +curl -o <output_file> http://<host>:<port>/debug/pprof/<endpoint> +``` + +For example, a command to save a heap profile looks like this: + +```bash +curl -o heap_profile.pprof http://localhost:6061/debug/pprof/heap +``` + +That file can be shared with other people. + +### Explore the Profiling Data + +Now, you can use the file to get insights into the profiling data, including visualizations. 
+A command like this will start an HTTP server and open a browser: + +```bash +go tool pprof -http=:PORT +``` + +For example, to open a `heap_profile.pprof` from the example above, you can run: + +```bash +go tool pprof -http=:3333 heap_profile.pprof +``` + +### Explore without saving data + +It is also possible to visualize `pprof` data without saving to the file. For example: + +```bash +go tool pprof -http=:3333 http://localhost:6061/debug/pprof/goroutine +``` + +### Report Issues + +If you believe you've found a performance problem, please [open a GitHub Issue](https://github.com/pokt-network/poktroll/issues). Make sure to attach the profiling data. diff --git a/docusaurus/docs/operate/configs/appgate_server_config.md b/docusaurus/docs/operate/configs/appgate_server_config.md index a4e1f7f32..f98ff8233 100644 --- a/docusaurus/docs/operate/configs/appgate_server_config.md +++ b/docusaurus/docs/operate/configs/appgate_server_config.md @@ -22,6 +22,7 @@ It is responsible for multiple things: - [`signing_key`](#signing_key) - [`listening_endpoint`](#listening_endpoint) - [`metrics`](#metrics) + - [`pprof`](#pprof) ## Usage @@ -135,3 +136,20 @@ metrics: When `enabled` is set to `true`, the exporter is active. The addr `value` of `:9090` implies the exporter is bound to port 9090 on all available network interfaces. + +### `pprof` + +_`Optional`_ + +Configures a [pprof](https://github.com/google/pprof/blob/main/doc/README.md) +endpoint for troubleshooting and debugging performance issues. + +Example configuration: + +```yaml +pprof: + enabled: true + addr: localhost:6060 +``` + +You can learn how to use that endpoint on the [Performance Troubleshooting](../../develop/developer_guide/performance_troubleshooting.md) page. 
diff --git a/docusaurus/docs/operate/configs/relayminer_config.md b/docusaurus/docs/operate/configs/relayminer_config.md index 5128edb82..0adc1ec92 100644 --- a/docusaurus/docs/operate/configs/relayminer_config.md +++ b/docusaurus/docs/operate/configs/relayminer_config.md @@ -18,6 +18,7 @@ and which domains to accept queries from._ - [`signing_key_name`](#signing_key_name) - [`smt_store_path`](#smt_store_path) - [`metrics`](#metrics) + - [`pprof`](#pprof) - [Pocket node connectivity](#pocket-node-connectivity) - [`query_node_rpc_url`](#query_node_rpc_url) - [`query_node_grpc_url`](#query_node_grpc_url) @@ -144,6 +145,23 @@ When `enabled` is set to `true`, the exporter is active. The addr `value` of `:9090` implies the exporter is bound to port 9090 on all available network interfaces. +### `pprof` + +_`Optional`_ + +Configures a [pprof](https://github.com/google/pprof/blob/main/doc/README.md) +endpoint for troubleshooting and debugging performance issues. + +Example configuration: + +```yaml +pprof: + enabled: true + addr: localhost:6060 +``` + +You can learn how to use that endpoint on the [Performance Troubleshooting](../../develop/developer_guide/performance_troubleshooting.md) page. 
+ ## Pocket node connectivity ```yaml diff --git a/docusaurus/docs/operate/user_guide/_category_.json b/docusaurus/docs/operate/user_guide/_category_.json new file mode 100644 index 000000000..77b3e0c8d --- /dev/null +++ b/docusaurus/docs/operate/user_guide/_category_.json @@ -0,0 +1,8 @@ +{ + "label": "User Guide", + "position": 1, + "link": { + "type": "generated-index", + "description": "poktrolld CLI documentation" + } +} diff --git a/docusaurus/docs/operate/user_guide/check-balance.md b/docusaurus/docs/operate/user_guide/check-balance.md new file mode 100644 index 000000000..eec0bead4 --- /dev/null +++ b/docusaurus/docs/operate/user_guide/check-balance.md @@ -0,0 +1,76 @@ +--- +title: Check balance +sidebar_position: 3 +--- + +# Checking Your Wallet Balance + +:::note Usage requirements + +You will need access to your wallet address and the denomination of the token you +wish to query (i.e. `upokt`). + +The default node is set to interact with a local instance. For network-specific +queries (i.e. accessing TestNet or MainNet), you will need an RPC endpoint. + +::: + +Knowing your account's balance is crucial for effective transaction management +on Pocket Network. This guide provides the necessary steps to check your wallet's +balance using the `poktrolld` command-line interface (CLI). + +- [Pre-requisites](#pre-requisites) +- [Step 1: Preparing the Query](#step-1-preparing-the-query) +- [Step 2: Viewing the Balance](#step-2-viewing-the-balance) +- [Accessing non-local environments](#accessing-non-local-environments) + +## Pre-requisites + +1. `poktrolld` is installed on your system; see the [installation guide](./install-poktrolld) for more details +2. You have the address of the wallet you wish to check +3. You know the token denomination you wish to check; `upokt` for POKT tokens + +:::info What is a upokt? 
+ +1 POKT = 1,000,000 upokt + +1 upokt = 1 micro POKT = 10^-6 POKT = 0.000001 POKT + +::: + +## Step 1: Preparing the Query + +You can check your wallet's balance by specifying the `address` in the following command: + +```sh +poktrolld query bank balance [address] upokt +``` + +Example: + +```sh +poktrolld query bank balance pokt1hdfggsqdy66awgvr4lclyupddz4n2dfrl9rjwv upokt +``` + +## Step 2: Viewing the Balance + +Upon executing the command, you'll receive output similar to the following, showing your balance: + +```plaintext +balance: + amount: "8999" + denom: upokt +``` + +This output indicates that the wallet address holds 8999 `upokt` tokens. + +## Accessing non-local environments + +You must provide the `--node` flag to access non LocalNet environments. + +For example, to check a balance on TestNet, you would use the following command: + +```sh +poktrolld query bank balance [address] upokt \ + --node=https://testnet-validated-validator-rpc.poktroll.com +``` diff --git a/docusaurus/docs/operate/user_guide/create-new-wallet.md b/docusaurus/docs/operate/user_guide/create-new-wallet.md new file mode 100644 index 000000000..c4af97c58 --- /dev/null +++ b/docusaurus/docs/operate/user_guide/create-new-wallet.md @@ -0,0 +1,76 @@ +--- +title: Create new wallet +sidebar_position: 1 +--- + +# Creating a New Wallet + +:::warning Security Notice + +**ALWAYS back up your key and/or mnemonic**. Store it in a secure +location accessible only to you, such as a password manager, or written down +in a safe place. Under your 🛏️ does not count! + +::: + +This guide will walk you through creating a new wallet on the Pocket Network. + +- [What is a keyring backend?](#what-is-a-keyring-backend) +- [Step 1: Install poktrolld](#step-1-install-poktrolld) +- [Step 2: Creating the Wallet](#step-2-creating-the-wallet) +- [Step 3: Backing Up Your Wallet](#step-3-backing-up-your-wallet) + +## What is a keyring backend? 
+ +Before proceeding, it's critical to understand the implications of keyring backends +for securing your wallet. + +By default, `--keyring-backend=test` is used for demonstration +purposes in this documentation, suitable for initial testing. + +In production, operators should consider using a more secure keyring backend +such as `os`, `file`, or `kwallet`. For more information on keyring backends, +refer to the [Cosmos SDK Keyring documentation](https://docs.cosmos.network/main/user/run-node/keyring). + +## Step 1: Install poktrolld + +Ensure you have `poktrolld` installed on your system. + +Follow the [installation guide](./install-poktrolld) specific to your operating system. + +## Step 2: Creating the Wallet + +To create a new wallet, use the `poktrolld keys add` command followed by your +desired wallet name. This will generate a new address and mnemonic phrase for your wallet. + +```bash +poktrolld keys add +``` + +After running the command, you'll receive an output similar to the following: + +```plaintext +- address: pokt1beef420 + name: myNewWallet + pubkey: '{"@type":"/cosmos.crypto.secp256k1.PubKey","key":"A31T7iUyr6SwT5Wyy3BNgRqlObq3FqYpW4cTAkfE+6c2"}' + type: local + + +**Important** write this mnemonic phrase in a safe place. +It is the only way to recover your account if you ever forget your password. + +your seed mnemonic phase here +``` + +## Step 3: Backing Up Your Wallet + +After creating your wallet, **YOU MUST** back up your mnemonic phrase. This phrase +is the key to your wallet, and losing it means losing access to your funds. + +Here are some tips for securely backing up your mnemonic phrase: + +- Write it down on paper and store it in multiple secure locations. +- Consider using a password manager to store it digitally, ensuring the service is reputable and secure. +- Avoid storing it in plaintext on your computer or online services prone to hacking. + +**Congratulations!** You have successfully created a new wallet on Pocket Network. 
diff --git a/docusaurus/docs/operate/user_guide/install-poktrolld.md b/docusaurus/docs/operate/user_guide/install-poktrolld.md new file mode 100644 index 000000000..0b9bfa94b --- /dev/null +++ b/docusaurus/docs/operate/user_guide/install-poktrolld.md @@ -0,0 +1,33 @@ +--- +title: Installing poktrolld +sidebar_position: 0 +--- + +- [Full environment setup](#full-environment-setup) +- [`poktrolld` Installation from src](#poktrolld-installation-from-src) +- [\[TODO\] Package manager](#todo-package-manager) + +## Full environment setup + +We recommend following the instructions provided in our [Developer Quickstart Guide](../../develop/developer_guide/quickstart.md) to make sure your environment and tools are fully ready for development. +It will build `poktrolld` from source as a by-product. + +## `poktrolld` Installation from src + +```bash +git clone https://github.com/pokt-network/poktroll.git +cd poktroll +make go_develop +make ignite_poktrolld_build +``` + +Verify it worked by running: + +```bash +poktrolld --help +``` + +## [TODO] Package manager + +_TODO(@okdas): Add ready-to-use binaries (available via homebrew, tea or other package +managers)._ diff --git a/docusaurus/docs/operate/user_guide/recover-with-mnemonic.md b/docusaurus/docs/operate/user_guide/recover-with-mnemonic.md new file mode 100644 index 000000000..aaf2011d0 --- /dev/null +++ b/docusaurus/docs/operate/user_guide/recover-with-mnemonic.md @@ -0,0 +1,64 @@ +--- +title: Recover using Mnemonic Seed Phrase +sidebar_position: 2 +--- + +# Recovering an Account from a Mnemonic Seed Phrase + +:::warning Security Notice + +Recovering your wallet with a mnemonic seed phrase requires +you to enter sensitive information. Ensure you are in a secure and private environment +before proceeding. + +::: + +Losing access to your wallet can be stressful, but if you've backed up your mnemonic +seed phrase, recovering your account is straightforward! 
+ +- [Pre-requisites](#pre-requisites) +- [Step 1: Prepare to Recover Your Wallet](#step-1-prepare-to-recover-your-wallet) +- [Step 2: Recovering the Wallet](#step-2-recovering-the-wallet) +- [Step 3: Verify Wallet Recovery](#step-3-verify-wallet-recovery) + +## Pre-requisites + +- You have the mnemonic seed phrase of the wallet you wish to recover +- `poktrolld` is installed on your system; see the [installation guide](./install-poktrolld) for more details + +## Step 1: Prepare to Recover Your Wallet + +Before you start, ensure you're in a secure and private environment. +The mnemonic seed phrase is the key to your wallet, and exposing it can lead to loss of funds. + +## Step 2: Recovering the Wallet + +To recover your wallet, use the `poktrolld keys add` command with the `--recover` flag. +You will be prompted to enter the mnemonic seed phrase and optionally, a BIP39 passphrase if you've set one. + +```bash +poktrolld keys add --recover +``` + +Example: + +```bash +poktrolld keys add myRecoveredWallet --recover +``` + +After entering the mnemonic seed phrase, the command will recover your wallet, +displaying the wallet's address and public key. + +No mnemonic will be shown since the wallet is being recovered, not created anew. + +## Step 3: Verify Wallet Recovery + +After recovery, you can use the `poktrolld keys list` command to list all wallets in your keyring. + +Verify that the recovered wallet appears in the list with the correct address. + +```sh +poktrolld keys list +``` + +**Congratulations!** You have successfully recovered your Pocket Network wallet! 
diff --git a/docusaurus/docs/operate/user_guide/send-tokens.md b/docusaurus/docs/operate/user_guide/send-tokens.md new file mode 100644 index 000000000..f585708c4 --- /dev/null +++ b/docusaurus/docs/operate/user_guide/send-tokens.md @@ -0,0 +1,91 @@ +--- +title: Send tokens +sidebar_position: 4 +--- + +# Sending Tokens + +This guide covers the process of sending tokens from one account to another on +Pocket Network using the `poktrolld` command-line interface (CLI). + +- [Pre-requisites](#pre-requisites) +- [Step 1: Preparing a Node Endpoint](#step-1-preparing-a-node-endpoint) +- [Step 2: Sending Tokens](#step-2-sending-tokens) +- [Step 3: Confirming the Transaction](#step-3-confirming-the-transaction) +- [Step 4: Transaction Completion](#step-4-transaction-completion) +- [Additional Flags](#additional-flags) + +## Pre-requisites + +1. `poktrolld` is installed on your system; see the [installation guide](./install-poktrolld) for more details +2. You have access to your wallet with sufficient tokens for the transaction and fees +3. You have the recipient's address + +## Step 1: Preparing a Node Endpoint + +Before initiating the transaction, you must specify the node endpoint you'll be interacting with. + +For testing purposes, you can use the provided TestNet node: + +```bash +--node=https://testnet-validated-validator-rpc.poktroll.com/ +``` + +On MainNet, ensure you're connected to a trusted full node, validator, or other client on the network. + +## Step 2: Sending Tokens + +To send tokens, you'll use the `poktrolld tx bank send` command followed by the +sender's address or key name, the recipient's address, and the amount to send. 
+ +```sh +poktrolld tx bank send [from_key_or_address] [to_address] [amount] \ + --node=<node_endpoint> [additional_flags] +``` + +- Replace `[from_key_or_address]` with your wallet name or address +- Replace `[to_address]` with the recipient's address +- Replace `[amount]` with the amount you wish to send, including the denomination (e.g., 1000upokt) +- Replace `<node_endpoint>` with the node endpoint URL + +For example, the following command sends `1000upokt` from `myWallet` to `pokt1recipientaddress420`: + +```bash +poktrolld tx bank send myWallet pokt1recipientaddress420 1000upokt \ + --node=https://testnet-validated-validator-rpc.poktroll.com/ +``` + +## Step 3: Confirming the Transaction + +After executing the send command, you'll receive a prompt to confirm the transaction details. +Review the information carefully. If everything looks correct, proceed by confirming the transaction. + +:::caution Check Recipient + +Double-check the recipient's address and the amount being sent. +Transactions on the blockchain are irreversible. + +::: + +## Step 4: Transaction Completion + +Once confirmed, the transaction will be broadcast to the network. +You'll receive a transaction hash which can be used to track the status of the transaction on a blockchain explorer. + +**Congratulations!** You've successfully sent tokens on the poktrolld blockchain. + +:::tip + +For automated scripts or applications, you can use the `--yes` flag to skip the confirmation prompt. + +::: + +## Additional Flags + +Refer to the command's help output for additional flags and options that can customize +your transaction. For example, you can set custom gas prices, use a specific account number, +or operate in offline mode for signing transactions. 
+ +```sh +poktrolld tx bank send --help +``` diff --git a/e2e/tests/init_test.go b/e2e/tests/init_test.go index 12895bddf..cbfafd912 100644 --- a/e2e/tests/init_test.go +++ b/e2e/tests/init_test.go @@ -24,6 +24,7 @@ import ( "github.com/pokt-network/poktroll/app" "github.com/pokt-network/poktroll/testutil/testclient" + "github.com/pokt-network/poktroll/testutil/yaml" apptypes "github.com/pokt-network/poktroll/x/application/types" prooftypes "github.com/pokt-network/poktroll/x/proof/types" sessiontypes "github.com/pokt-network/poktroll/x/session/types" @@ -224,14 +225,16 @@ func (s *suite) TheUserStakesAWithUpoktFromTheAccount(actorType string, amount i func (s *suite) TheUserStakesAWithUpoktForServiceFromTheAccount(actorType string, amount int64, serviceId, accName string) { // Create a temporary config file - configPathPattern := fmt.Sprintf("%s_stake_config_*.yaml", accName) + configPathPattern := fmt.Sprintf("%s_stake_config.yaml", accName) configFile, err := os.CreateTemp("", configPathPattern) require.NoError(s, err, "error creating config file in %q", path.Join(os.TempDir(), configPathPattern)) - configContent := fmt.Sprintf("stake_amount: %d upokt\nservice_ids:\n - %s", amount, serviceId) + // Write the config content to the file + configContent := s.getConfigFileContent(amount, actorType, serviceId) _, err = configFile.Write([]byte(configContent)) require.NoError(s, err, "error writing config file %q", configFile.Name()) + // Prepare the command arguments args := []string{ "tx", actorType, @@ -253,6 +256,31 @@ func (s *suite) TheUserStakesAWithUpoktForServiceFromTheAccount(actorType string s.pocketd.result = res } +func (s *suite) getConfigFileContent(amount int64, actorType, serviceId string) string { + var configContent string + switch actorType { + case "application": + configContent = fmt.Sprintf(` + stake_amount: %dupokt + service_ids: + - %s`, + amount, serviceId) + case "supplier": + configContent = fmt.Sprintf(` + stake_amount: %dupokt + services: + 
- service_id: %s + endpoints: + - publicly_exposed_url: http://relayminer:8545 + rpc_type: json_rpc`, + amount, serviceId) + default: + s.Fatalf("unknown actor type %s", actorType) + } + fmt.Println(yaml.NormalizeYAMLIndentation(configContent)) + return yaml.NormalizeYAMLIndentation(configContent) +} + func (s *suite) TheUserUnstakesAFromTheAccount(actorType string, accName string) { args := []string{ "tx", diff --git a/e2e/tests/stake_supplier.feature b/e2e/tests/stake_supplier.feature new file mode 100644 index 000000000..fc7776c64 --- /dev/null +++ b/e2e/tests/stake_supplier.feature @@ -0,0 +1,28 @@ +Feature: Stake Supplier Namespace + + Scenario: User can stake a Supplier + Given the user has the pocketd binary installed + And the "supplier" for account "supplier2" is not staked + # Stake with 1 uPOKT more than the current stake used in genesis to make + # the transaction succeed. + And the account "supplier2" has a balance greater than "1000070" uPOKT + When the user stakes a "supplier" with "1000070" uPOKT for "anvil" service from the account "supplier2" + Then the user should be able to see standard output containing "txhash:" + And the user should be able to see standard output containing "code: 0" + And the pocketd binary should exit without error + # TODO_TECHDEBT(@Olshansk, @red-0ne): Replace these time-based waits with event listening waits + And the user should wait for "5" seconds + And the "supplier" for account "supplier2" is staked with "1000070" uPOKT + And the account balance of "supplier2" should be "1000070" uPOKT "less" than before + + Scenario: User can unstake a Supplier + Given the user has the pocketd binary installed + And the "supplier" for account "supplier2" is staked with "1000070" uPOKT + And an account exists for "supplier2" + When the user unstakes a "supplier" from the account "supplier2" + Then the user should be able to see standard output containing "txhash:" + And the user should be able to see standard output containing 
"code: 0" + And the pocketd binary should exit without error + And the user should wait for "5" seconds + And the "supplier" for account "supplier2" is not staked + And the account balance of "supplier2" should be "1000070" uPOKT "more" than before \ No newline at end of file diff --git a/localnet/grafana-dashboards/cometbft.json b/localnet/grafana-dashboards/cometbft.json index f07128aca..32ef31852 100644 --- a/localnet/grafana-dashboards/cometbft.json +++ b/localnet/grafana-dashboards/cometbft.json @@ -35,6 +35,20 @@ "type": "dashboards", "url": "" }, + { + "asDropdown": false, + "icon": "external link", + "includeVars": false, + "keepTime": false, + "tags": [ + "supplier_logs" + ], + "targetBlank": false, + "title": "Supplier Logs", + "tooltip": "", + "type": "dashboards", + "url": "" + }, { "asDropdown": false, "icon": "external link", diff --git a/localnet/grafana-dashboards/supplier_logs.json b/localnet/grafana-dashboards/supplier_logs.json new file mode 100644 index 000000000..609ea1b1e --- /dev/null +++ b/localnet/grafana-dashboards/supplier_logs.json @@ -0,0 +1,116 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": 35, + "links": [], + "panels": [ + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "gridPos": { + "h": 8, + "w": 18, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "dedupStrategy": "none", + "enableLogDetails": true, + "prettifyLogMessage": false, + "showCommonLabels": false, + "showLabels": false, + "showTime": false, + "sortOrder": "Descending", + "wrapLogMessage": false + }, + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "editorMode": "code", + "expr": 
"{container=\"poktrolld-validator\"} | json | method = `StakeSupplier`", + "queryType": "range", + "refId": "Supplier Staking" + } + ], + "title": "Supplier Staking", + "type": "logs" + }, + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "gridPos": { + "h": 8, + "w": 18, + "x": 0, + "y": 8 + }, + "id": 3, + "options": { + "dedupStrategy": "none", + "enableLogDetails": true, + "prettifyLogMessage": false, + "showCommonLabels": false, + "showLabels": false, + "showTime": false, + "sortOrder": "Descending", + "wrapLogMessage": false + }, + "targets": [ + { + "datasource": { + "type": "loki", + "uid": "P8E80F9AEF21F6940" + }, + "editorMode": "code", + "expr": "{container=\"poktrolld-validator\"} | json | method = `UnstakeSupplier`", + "queryType": "range", + "refId": "Supplier Unstaking" + } + ], + "title": "Supplier Unstaking", + "type": "logs" + } + ], + "schemaVersion": 39, + "tags": [ + "supplier_logs" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timepicker": {}, + "timezone": "browser", + "title": "Protocol / Supplier Logs", + "uid": "supplier_logs", + "version": 7, + "weekStart": "" +} diff --git a/localnet/kubernetes/values-appgateserver.yaml b/localnet/kubernetes/values-appgateserver.yaml index d6e949c0d..dd6400ae1 100644 --- a/localnet/kubernetes/values-appgateserver.yaml +++ b/localnet/kubernetes/values-appgateserver.yaml @@ -4,3 +4,6 @@ config: metrics: enabled: true addr: :9090 + pprof: + enabled: true + addr: localhost:6060 diff --git a/localnet/kubernetes/values-gateway.yaml b/localnet/kubernetes/values-gateway.yaml index 1cae6cb5e..3560905dc 100644 --- a/localnet/kubernetes/values-gateway.yaml +++ b/localnet/kubernetes/values-gateway.yaml @@ -5,3 +5,6 @@ config: metrics: enabled: true addr: :9090 + pprof: + enabled: true + addr: localhost:6060 diff --git a/localnet/kubernetes/values-relayminer-common.yaml b/localnet/kubernetes/values-relayminer-common.yaml index 
f222f7a39..a8070be01 100644 --- a/localnet/kubernetes/values-relayminer-common.yaml +++ b/localnet/kubernetes/values-relayminer-common.yaml @@ -8,3 +8,6 @@ config: query_node_grpc_url: tcp://validator-poktroll-validator:36658 tx_node_rpc_url: tcp://validator-poktroll-validator:36657 suppliers: [] + pprof: + enabled: true + addr: localhost:6060 diff --git a/localnet/poktrolld/config/appgate_server_config.yaml b/localnet/poktrolld/config/appgate_server_config.yaml index bba48c7e5..146528551 100644 --- a/localnet/poktrolld/config/appgate_server_config.yaml +++ b/localnet/poktrolld/config/appgate_server_config.yaml @@ -6,3 +6,6 @@ listening_endpoint: http://localhost:42069 metrics: enabled: true addr: :9090 +pprof: + enabled: true + addr: localhost:6060 diff --git a/localnet/poktrolld/config/appgate_server_config_example.yaml b/localnet/poktrolld/config/appgate_server_config_example.yaml index 9ae9b5b72..c14a15b83 100644 --- a/localnet/poktrolld/config/appgate_server_config_example.yaml +++ b/localnet/poktrolld/config/appgate_server_config_example.yaml @@ -15,3 +15,6 @@ metrics: enabled: true # The address that the metrics exporter will listen on. 
Can be just a port, or host:port addr: :9090 +pprof: + enabled: true + addr: localhost:6060 diff --git a/localnet/poktrolld/config/appgate_server_config_localnet_vscode.yaml b/localnet/poktrolld/config/appgate_server_config_localnet_vscode.yaml index c79a07377..cc3aa6bb0 100644 --- a/localnet/poktrolld/config/appgate_server_config_localnet_vscode.yaml +++ b/localnet/poktrolld/config/appgate_server_config_localnet_vscode.yaml @@ -6,3 +6,6 @@ listening_endpoint: http://0.0.0.0:42069 metrics: enabled: true addr: :9090 +pprof: + enabled: true + addr: localhost:6060 diff --git a/localnet/poktrolld/config/relayminer_config.yaml b/localnet/poktrolld/config/relayminer_config.yaml index a44d59e6a..031999030 100644 --- a/localnet/poktrolld/config/relayminer_config.yaml +++ b/localnet/poktrolld/config/relayminer_config.yaml @@ -14,3 +14,6 @@ suppliers: backend_url: http://anvil:8547/ publicly_exposed_endpoints: - relayminers +pprof: + enabled: false + addr: localhost:6060 diff --git a/localnet/poktrolld/config/relayminer_config_full_example.yaml b/localnet/poktrolld/config/relayminer_config_full_example.yaml index 4dab186d1..0769acfc2 100644 --- a/localnet/poktrolld/config/relayminer_config_full_example.yaml +++ b/localnet/poktrolld/config/relayminer_config_full_example.yaml @@ -11,6 +11,12 @@ metrics: # The address (host:port or just port) for the metrics exporter to listen on. addr: :9090 +# Pprof endpoint configuration. More information: +# https://pkg.go.dev/github.com/google/pprof#section-readme +pprof: + enabled: false + addr: localhost:6060 + pocket_node: # Pocket node URL exposing the CometBFT JSON-RPC API. # Used by the Cosmos client SDK, event subscriptions, etc. 
diff --git a/pkg/appgateserver/cmd/cmd.go b/pkg/appgateserver/cmd/cmd.go index f8a439716..7a3ea08c1 100644 --- a/pkg/appgateserver/cmd/cmd.go +++ b/pkg/appgateserver/cmd/cmd.go @@ -144,6 +144,13 @@ func runAppGateServer(cmd *cobra.Command, _ []string) error { } } + if appGateConfigs.Pprof.Enabled { + err = appGateServer.ServePprof(ctx, appGateConfigs.Pprof.Addr) + if err != nil { + return fmt.Errorf("failed to start pprof endpoint: %w", err) + } + } + // Start the AppGate server. if err := appGateServer.Start(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { return fmt.Errorf("failed to start app gate server: %w", err) @@ -185,7 +192,7 @@ func setupAppGateServerDependencies( supplierFuncs := []config.SupplierFn{ config.NewSupplyLoggerFromCtx(ctx), config.NewSupplyEventsQueryClientFn(queryNodeRPCURL), // leaf - config.NewSupplyBlockClientFn(), // leaf + config.NewSupplyBlockClientFn(queryNodeRPCURL), // leaf config.NewSupplyQueryClientContextFn(queryNodeGRPCURL), // leaf config.NewSupplyDelegationClientFn(), // leaf config.NewSupplyAccountQuerierFn(), // leaf diff --git a/pkg/appgateserver/config/appgate_configs_reader.go b/pkg/appgateserver/config/appgate_configs_reader.go index 5d59c9e52..d0b42cd67 100644 --- a/pkg/appgateserver/config/appgate_configs_reader.go +++ b/pkg/appgateserver/config/appgate_configs_reader.go @@ -15,6 +15,7 @@ type YAMLAppGateServerConfig struct { QueryNodeRPCUrl string `yaml:"query_node_rpc_url"` SelfSigning bool `yaml:"self_signing"` SigningKey string `yaml:"signing_key"` + Pprof YAMLAppGateServerPprofConfig `yaml:"pprof"` } // YAMLAppGateServerMetricsConfig is the structure used to unmarshal the metrics @@ -24,6 +25,13 @@ type YAMLAppGateServerMetricsConfig struct { Addr string `yaml:"addr"` } +// YAMLAppGateServerPprofConfig is the structure used to unmarshal the config +// for `pprof`. 
+type YAMLAppGateServerPprofConfig struct { + Enabled bool `yaml:"enabled,omitempty"` + Addr string `yaml:"addr,omitempty"` +} + // AppGateServerConfig is the structure describing the AppGateServer config type AppGateServerConfig struct { ListeningEndpoint *url.URL @@ -32,15 +40,23 @@ type AppGateServerConfig struct { QueryNodeRPCUrl *url.URL SelfSigning bool SigningKey string + Pprof *AppGateServerPprofConfig } // AppGateServerMetricsConfig is the structure resulting from parsing the metrics -// section of the AppGateServer config file +// section of the AppGateServer config file. type AppGateServerMetricsConfig struct { Enabled bool Addr string } +// AppGateServerPprofConfig is the structure resulting from parsing the pprof +// section of the AppGateServer config file. +type AppGateServerPprofConfig struct { + Enabled bool + Addr string +} + // ParseAppGateServerConfigs parses the stake config file into a AppGateConfig // NOTE: If SelfSigning is not defined in the config file, it will default to false func ParseAppGateServerConfigs(configContent []byte) (*AppGateServerConfig, error) { @@ -102,5 +118,10 @@ func ParseAppGateServerConfigs(configContent []byte) (*AppGateServerConfig, erro Addr: yamlAppGateServerConfig.Metrics.Addr, } + appGateServerConfig.Pprof = &AppGateServerPprofConfig{ + Enabled: yamlAppGateServerConfig.Pprof.Enabled, + Addr: yamlAppGateServerConfig.Pprof.Addr, + } + return appGateServerConfig, nil } diff --git a/pkg/appgateserver/server.go b/pkg/appgateserver/server.go index 761ff0b18..08d31405c 100644 --- a/pkg/appgateserver/server.go +++ b/pkg/appgateserver/server.go @@ -7,6 +7,7 @@ import ( "io" "net" "net/http" + "net/http/pprof" "net/url" "strings" "sync" @@ -272,4 +273,33 @@ func (app *appGateServer) ServeMetrics(addr string) error { return nil } +// Starts a pprof server on the given address. 
+func (app *appGateServer) ServePprof(ctx context.Context, addr string) error { + pprofMux := http.NewServeMux() + pprofMux.HandleFunc("/debug/pprof/", pprof.Index) + pprofMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + pprofMux.HandleFunc("/debug/pprof/profile", pprof.Profile) + pprofMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + pprofMux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + server := &http.Server{ + Addr: addr, + Handler: pprofMux, + } + + // If no error, start the server in a new goroutine + go func() { + app.logger.Info().Str("endpoint", addr).Msg("starting a pprof endpoint") + server.ListenAndServe() + }() + + go func() { + <-ctx.Done() + app.logger.Info().Str("endpoint", addr).Msg("stopping a pprof endpoint") + server.Shutdown(ctx) + }() + + return nil +} + type appGateServerOption func(*appGateServer) diff --git a/pkg/client/block/block_result.go b/pkg/client/block/block_result.go new file mode 100644 index 000000000..3d6f68eb2 --- /dev/null +++ b/pkg/client/block/block_result.go @@ -0,0 +1,18 @@ +package block + +import ( + coretypes "github.com/cometbft/cometbft/rpc/core/types" +) + +// cometBlockResult is a non-alias of the comet ResultBlock type that implements +// the client.Block interface. It is used across the codebase to standardize the access +// to a block's height and hash across different block clients. 
+type cometBlockResult coretypes.ResultBlock + +func (cbr *cometBlockResult) Height() int64 { + return cbr.Block.Height +} + +func (cbr *cometBlockResult) Hash() []byte { + return cbr.BlockID.Hash +} diff --git a/pkg/client/block/client.go b/pkg/client/block/client.go index ba809194d..65971ee28 100644 --- a/pkg/client/block/client.go +++ b/pkg/client/block/client.go @@ -4,9 +4,12 @@ import ( "context" "cosmossdk.io/depinject" + cometclient "github.com/cosmos/cosmos-sdk/client" "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/observable" + "github.com/pokt-network/poktroll/pkg/observable/channel" ) const ( @@ -18,7 +21,7 @@ const ( // defaultBlocksReplayLimit is the number of blocks that the replay // observable returned by LastNBlocks() will be able to replay. - // TODO_TECHDEBT/TODO_FUTURE: add a `blocksReplayLimit` field to the blockClient + // TODO_TECHDEBT/TODO_FUTURE: add a `blocksReplayLimit` field to the blockReplayClient // struct that defaults to this but can be overridden via an option. defaultBlocksReplayLimit = 100 ) @@ -36,7 +39,9 @@ func NewBlockClient( ctx context.Context, deps depinject.Config, ) (client.BlockClient, error) { - client, err := events.NewEventsReplayClient[client.Block]( + ctx, close := context.WithCancel(ctx) + + eventsReplayClient, err := events.NewEventsReplayClient[client.Block]( ctx, deps, committedBlocksQuery, @@ -44,34 +49,143 @@ func NewBlockClient( defaultBlocksReplayLimit, ) if err != nil { + close() + return nil, err + } + + // latestBlockPublishCh is the channel that notifies the latestBlockReplayObs of a + // new block, whether it comes from a direct query or an event subscription query. 
+ latestBlockReplayObs, latestBlockPublishCh := channel.NewReplayObservable[client.Block](ctx, 10) + blockReplayClient := &blockReplayClient{ + eventsReplayClient: eventsReplayClient, + latestBlockReplayObs: latestBlockReplayObs, + close: close, + } + + if err := depinject.Inject(deps, &blockReplayClient.onStartQueryClient); err != nil { + return nil, err + } + + blockReplayClient.asyncForwardBlockEvent(ctx, latestBlockPublishCh) + + if err := blockReplayClient.getInitialBlock(ctx, latestBlockPublishCh); err != nil { return nil, err } - return &blockClient{eventsReplayClient: client}, nil + + return blockReplayClient, nil } -// blockClient is a wrapper around an EventsReplayClient that implements the -// BlockClient interface for use with cosmos-sdk networks. -type blockClient struct { +// blockReplayClient is a BlockClient implementation that combines a CometRPC client +// to get its first block at startup and an EventsReplayClient that subscribes +// to new committed block events. +// It uses a ReplayObservable to retain and replay past observed blocks. +type blockReplayClient struct { + // onStartQueryClient is the RPC client that is used to query for the initial block + // upon blockReplayClient construction. The result of this query is only used if it + // returns before the eventsReplayClient receives its first event. + onStartQueryClient cometclient.CometRPC + + // eventsReplayClient is the underlying EventsReplayClient that is used to // subscribe to new committed block events. It uses both the Block type // and the BlockReplayObservable type as its generic types. // These enable the EventsReplayClient to correctly map the raw event bytes // to Block objects and to correctly return a BlockReplayObservable eventsReplayClient client.EventsReplayClient[client.Block] + + // latestBlockReplayObs is a replay observable that combines blocks observed by + // the block query client & the events replay client.
It is the "canonical" + // source of block notifications for the blockReplayClient. + latestBlockReplayObs observable.ReplayObservable[client.Block] + + // close is a function that cancels the context of the blockReplayClient. + close context.CancelFunc } // CommittedBlocksSequence returns a replay observable of new block events. -func (b *blockClient) CommittedBlocksSequence(ctx context.Context) client.BlockReplayObservable { - return b.eventsReplayClient.EventsSequence(ctx) +func (b *blockReplayClient) CommittedBlocksSequence(ctx context.Context) client.BlockReplayObservable { + return b.latestBlockReplayObs } -// LatestsNBlocks returns the last n blocks observed by the BockClient. -func (b *blockClient) LastNBlocks(ctx context.Context, n int) []client.Block { - return b.eventsReplayClient.LastNEvents(ctx, n) +// LastBlock returns the last blocks observed by the blockReplayClient. +func (b *blockReplayClient) LastBlock(ctx context.Context) (block client.Block) { + // ReplayObservable#Last() is guaranteed to return at least one element since + // it fetches the latest block using the onStartQueryClient if no blocks have + // been received yet from the eventsReplayClient. + return b.latestBlockReplayObs.Last(ctx, 1)[0] } // Close closes the underlying websocket connection for the EventsQueryClient // and closes all downstream connections. -func (b *blockClient) Close() { +func (b *blockReplayClient) Close() { b.eventsReplayClient.Close() + b.close() +} + +// asyncForwardBlockEvent asynchronously observes block event notifications from the +// EventsReplayClient's EventsSequence observable & publishes each to latestBlockPublishCh. 
+func (b *blockReplayClient) asyncForwardBlockEvent( + ctx context.Context, + latestBlockPublishCh chan<- client.Block, +) { + channel.ForEach(ctx, b.eventsReplayClient.EventsSequence(ctx), + func(ctx context.Context, block client.Block) { + latestBlockPublishCh <- block + }, + ) +} + +// getInitialBlock fetches the latest committed on-chain block at the time the +// client starts up, while concurrently waiting for the next block event, +// publishing whichever occurs first to latestBlockPublishCh. +// This is necessary to ensure that the most recent block is available to the +// blockReplayClient when it is first created. +func (b *blockReplayClient) getInitialBlock( + ctx context.Context, + latestBlockPublishCh chan<- client.Block, +) error { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // Query the latest block asynchronously. + blockQueryResultCh := make(chan client.Block) + queryErrCh := b.queryLatestBlock(ctx, blockQueryResultCh) + + // Wait for either the latest block query response, error, or the first block + // event to arrive & use whichever occurs first or return an error. + var initialBlock client.Block + select { + case initialBlock = <-blockQueryResultCh: + case <-b.latestBlockReplayObs.Subscribe(ctx).Ch(): + return nil + case err := <-queryErrCh: + return err + } + + // At this point blockQueryResultCh was the first to receive the first block. + // Publish the initialBlock to the latestBlockPublishCh. + latestBlockPublishCh <- initialBlock + return nil +} + +// queryLatestBlock uses comet RPC block client to asynchronously query for +// the latest block. It returns an error channel which may be sent a block query error. +// It is *NOT* intended to be called in a goroutine. 
+func (b *blockReplayClient) queryLatestBlock( + ctx context.Context, + blockQueryResultCh chan<- client.Block, +) <-chan error { + errCh := make(chan error) + + go func() { + queryBlockResult, err := b.onStartQueryClient.Block(ctx, nil) + if err != nil { + errCh <- err + return + } + + blockResult := cometBlockResult(*queryBlockResult) + blockQueryResultCh <- &blockResult + }() + + return errCh } diff --git a/pkg/client/block/client_integration_test.go b/pkg/client/block/client_integration_test.go index 4cf9258cc..77fa321c8 100644 --- a/pkg/client/block/client_integration_test.go +++ b/pkg/client/block/client_integration_test.go @@ -18,17 +18,17 @@ import ( const blockIntegrationSubTimeout = 5 * time.Second -func TestBlockClient_LastNBlocks(t *testing.T) { +func TestBlockClient_LastBlock(t *testing.T) { t.Skip("TODO(@h5law): Figure out how to subscribe to events on the simulated localnet") ctx := context.Background() blockClient := testblock.NewLocalnetClient(ctx, t) require.NotNil(t, blockClient) - block := blockClient.LastNBlocks(ctx, 1) + block := blockClient.LastBlock(ctx) require.NotEmpty(t, block) - require.NotZero(t, block[0].Height()) - require.NotZero(t, block[0].Hash()) + require.NotZero(t, block.Height()) + require.NotZero(t, block.Hash()) } func TestBlockClient_BlocksObservable(t *testing.T) { diff --git a/pkg/client/block/client_test.go b/pkg/client/block/client_test.go index 478859085..cd860d6c6 100644 --- a/pkg/client/block/client_test.go +++ b/pkg/client/block/client_test.go @@ -7,13 +7,15 @@ import ( "cosmossdk.io/depinject" "github.com/cometbft/cometbft/libs/json" + coretypes "github.com/cometbft/cometbft/rpc/core/types" rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" "github.com/cometbft/cometbft/types" - comettypes "github.com/cometbft/cometbft/types" + "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" "github.com/pokt-network/poktroll/pkg/client" "github.com/pokt-network/poktroll/pkg/client/block" + 
"github.com/pokt-network/poktroll/testutil/mockclient" "github.com/pokt-network/poktroll/testutil/testclient/testeventsquery" ) @@ -33,7 +35,7 @@ func TestBlockClient(t *testing.T) { Data: testBlockEventDataStruct{ Value: testBlockEventValueStruct{ Block: &types.Block{ - Header: comettypes.Header{ + Header: types.Header{ Height: 1, Time: time.Now(), }, @@ -63,7 +65,25 @@ func TestBlockClient(t *testing.T) { expectedRPCResponseBz, ) - deps := depinject.Supply(eventsQueryClient) + ctrl := gomock.NewController(t) + cometClientMock := mockclient.NewMockCometRPC(ctrl) + + cometClientMock.EXPECT(). + Block(gomock.Any(), gomock.Any()). + DoAndReturn(func(_ context.Context, height *int64) (*coretypes.ResultBlock, error) { + return &coretypes.ResultBlock{ + Block: &types.Block{ + Header: types.Header{ + Height: expectedHeight, + }, + }, + BlockID: types.BlockID{ + Hash: expectedHash, + }, + }, nil + }) + + deps := depinject.Supply(eventsQueryClient, cometClientMock) // Set up block client. blockClient, err := block.NewBlockClient(ctx, deps) @@ -75,9 +95,16 @@ func TestBlockClient(t *testing.T) { fn func() client.Block }{ { - name: "LastNBlocks(1) successfully returns latest block", + name: "LastBlock successfully returns latest block", + fn: func() client.Block { + lastBlock := blockClient.LastBlock(ctx) + return lastBlock + }, + }, + { + name: "LastBlock successfully returns latest block", fn: func() client.Block { - lastBlock := blockClient.LastNBlocks(ctx, 1)[0] + lastBlock := blockClient.LastBlock(ctx) return lastBlock }, }, diff --git a/pkg/client/events/replay_client.go b/pkg/client/events/replay_client.go index 112fa44c3..abf8ec9fc 100644 --- a/pkg/client/events/replay_client.go +++ b/pkg/client/events/replay_client.go @@ -76,6 +76,11 @@ type replayClient[T any] struct { // observable; // For example when the connection is re-established after erroring. 
replayObsCachePublishCh chan<- observable.ReplayObservable[T] + // eventTypeObs is the replay observable for the generic type T. + eventTypeObs observable.ReplayObservable[T] + // replayClientCancelCtx is the function to cancel the context of the replay client. + // It is called when the replay client is closed. + replayClientCancelCtx func() } // NewEventsReplayClient creates a new EventsReplayClient from the given @@ -94,11 +99,14 @@ func NewEventsReplayClient[T any]( newEventFn NewEventsFn[T], replayObsBufferSize int, ) (client.EventsReplayClient[T], error) { + ctx, cancel := context.WithCancel(ctx) + // Initialize the replay client rClient := &replayClient[T]{ - queryString: queryString, - eventDecoder: newEventFn, - replayObsBufferSize: replayObsBufferSize, + queryString: queryString, + eventDecoder: newEventFn, + replayObsBufferSize: replayObsBufferSize, + replayClientCancelCtx: cancel, } // TODO_REFACTOR(@h5law): Look into making this a regular observable as // we may no longer depend on it being replayable. @@ -119,13 +127,6 @@ func NewEventsReplayClient[T any]( // Concurrently publish events to the observable emitted by replayObsCache. go rClient.goPublishEvents(ctx) - return rClient, nil -} - -// EventsSequence returns a new ReplayObservable, with the buffer size provided -// during the EventsReplayClient construction, which is notified when new -// events are received by the encapsulated EventsQueryClient. -func (rClient *replayClient[T]) EventsSequence(ctx context.Context) observable.ReplayObservable[T] { // Create a new replay observable and publish channel for event type T with // a buffer size matching that provided during the EventsReplayClient // construction. @@ -138,8 +139,17 @@ func (rClient *replayClient[T]) EventsSequence(ctx context.Context) observable.R // notifications from the latest open replay observable. go rClient.goRemapEventsSequence(ctx, replayEventTypeObsPublishCh) - // Return the event type observable. 
- return eventTypeObs + // Store the event type observable. + rClient.eventTypeObs = eventTypeObs + + return rClient, nil +} + +// EventsSequence returns a new ReplayObservable, with the buffer size provided +// during the EventsReplayClient construction, which is notified when new +// events are received by the encapsulated EventsQueryClient. +func (rClient *replayClient[T]) EventsSequence(ctx context.Context) observable.ReplayObservable[T] { + return rClient.eventTypeObs } // goRemapEventsSequence publishes events observed by the most recent cached @@ -172,7 +182,8 @@ func (rClient *replayClient[T]) LastNEvents(ctx context.Context, n int) []T { func (rClient *replayClient[T]) Close() { // Closing eventsClient will cascade unsubscribe and close downstream observers. rClient.eventsClient.Close() - close(rClient.replayObsCachePublishCh) + // Close all the downstream observers of the replay client. + rClient.replayClientCancelCtx() } // goPublishEvents runs the work function returned by retryPublishEventsFactory, diff --git a/pkg/client/interface.go b/pkg/client/interface.go index 14c8b6a6c..b9515ecd8 100644 --- a/pkg/client/interface.go +++ b/pkg/client/interface.go @@ -10,6 +10,7 @@ //go:generate mockgen -destination=../../testutil/mockclient/cosmos_tx_builder_mock.go -package=mockclient github.com/cosmos/cosmos-sdk/client TxBuilder //go:generate mockgen -destination=../../testutil/mockclient/cosmos_keyring_mock.go -package=mockclient github.com/cosmos/cosmos-sdk/crypto/keyring Keyring //go:generate mockgen -destination=../../testutil/mockclient/cosmos_client_mock.go -package=mockclient github.com/cosmos/cosmos-sdk/client AccountRetriever +//go:generate mockgen -destination=../../testutil/mockclient/comet_rpc_client_mock.go -package=mockclient github.com/cosmos/cosmos-sdk/client CometRPC package client @@ -145,9 +146,8 @@ type BlockClient interface { // CommittedBlocksSequence returns a BlockObservable that emits the // latest blocks that have been committed to the 
chain. CommittedBlocksSequence(context.Context) BlockReplayObservable - // LastNBlocks returns the latest N blocks that have been committed to - // the chain. - LastNBlocks(context.Context, int) []Block + // LastBlock returns the latest block that has been committed on-chain. + LastBlock(context.Context) Block // Close unsubscribes all observers of the committed block sequence // observable and closes the events query client. Close() diff --git a/pkg/client/tx/client.go b/pkg/client/tx/client.go index 95e1a2599..12af116ef 100644 --- a/pkg/client/tx/client.go +++ b/pkg/client/tx/client.go @@ -229,7 +229,7 @@ func (txnClient *txClient) SignAndBroadcast( } // Calculate timeout height - timeoutHeight := txnClient.blockClient.LastNBlocks(ctx, 1)[0]. + timeoutHeight := txnClient.blockClient.LastBlock(ctx). Height() + txnClient.commitTimeoutHeightOffset // TODO_TECHDEBT: this should be configurable diff --git a/pkg/deps/config/suppliers.go b/pkg/deps/config/suppliers.go index 5fb6fca77..98a873c94 100644 --- a/pkg/deps/config/suppliers.go +++ b/pkg/deps/config/suppliers.go @@ -5,10 +5,10 @@ import ( "net/url" "cosmossdk.io/depinject" - cosmosclient "github.com/cosmos/cosmos-sdk/client" + sdkclient "github.com/cosmos/cosmos-sdk/client" cosmosflags "github.com/cosmos/cosmos-sdk/client/flags" cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" - grpc "github.com/cosmos/gogoproto/grpc" + "github.com/cosmos/gogoproto/grpc" "github.com/spf13/cobra" "github.com/pokt-network/poktroll/pkg/client/block" @@ -77,12 +77,22 @@ func NewSupplyEventsQueryClientFn(queryNodeRPCURL *url.URL) SupplierFn { } // NewSupplyBlockClientFn supplies a depinject config with a blockClient. 
-func NewSupplyBlockClientFn() SupplierFn { +func NewSupplyBlockClientFn(queryNodeRPCURL *url.URL) SupplierFn { return func( ctx context.Context, deps depinject.Config, _ *cobra.Command, ) (depinject.Config, error) { + + // Create a cosmos client from the queryNodeRPCURL used by the block client + // to initialize the block client by querying the latest block. + cometClient, err := sdkclient.NewClientFromNode(queryNodeRPCURL.String()) + if err != nil { + return nil, err + } + + deps = depinject.Configs(deps, depinject.Supply(cometClient)) + // Requires a query client to be supplied to the deps blockClient, err := block.NewBlockClient(ctx, deps) if err != nil { @@ -129,7 +139,7 @@ func NewSupplyQueryClientContextFn(queryNodeGRPCURL *url.URL) SupplierFn { } // Set --grpc-addr flag to the pocketQueryNodeURL for the client context - // This flag is read by cosmosclient.GetClientQueryContext. + // This flag is read by sdkclient.GetClientQueryContext. // Cosmos-SDK is expecting a GRPC address formatted as [:], // so we only need to set the Host parameter of the URL to cosmosflags.FlagGRPC value. if err := cmd.Flags().Set(cosmosflags.FlagGRPC, queryNodeGRPCURL.Host); err != nil { @@ -143,7 +153,7 @@ func NewSupplyQueryClientContextFn(queryNodeGRPCURL *url.URL) SupplierFn { // transacting purposes. // For example, txs could be dispatched to a validator while queries // could be handled by a full-node. - queryClientCtx, err := cosmosclient.GetClientQueryContext(cmd) + queryClientCtx, err := sdkclient.GetClientQueryContext(cmd) if err != nil { return nil, err } @@ -192,13 +202,13 @@ func NewSupplyTxClientContextFn( } // Set --node flag to the txNodeRPCURL for the client context - // This flag is read by cosmosclient.GetClientTxContext. + // This flag is read by sdkclient.GetClientTxContext. 
if err := cmd.Flags().Set(cosmosflags.FlagNode, txNodeRPCURL.String()); err != nil { return nil, err } // Set --grpc-addr flag to the queryNodeGRPCURL for the client context - // This flag is read by cosmosclient.GetClientTxContext to query accounts + // This flag is read by sdkclient.GetClientTxContext to query accounts // for transaction signing. // Cosmos-SDK is expecting a GRPC address formatted as [:], // so we only need to set the Host parameter of the URL to cosmosflags.FlagGRPC value. @@ -222,7 +232,7 @@ func NewSupplyTxClientContextFn( // transacting purposes. // For example, txs could be dispatched to a validator while queries // could be handled by a full-node - txClientCtx, err := cosmosclient.GetClientTxContext(cmd) + txClientCtx, err := sdkclient.GetClientTxContext(cmd) if err != nil { return nil, err } @@ -348,7 +358,7 @@ func NewSupplyPOKTRollSDKFn(signingKeyName string) SupplierFn { deps depinject.Config, _ *cobra.Command, ) (depinject.Config, error) { - var clientCtx cosmosclient.Context + var clientCtx sdkclient.Context // On a Cosmos environment we get the private key from the keyring // Inject the client context, get the keyring from it then get the private key diff --git a/pkg/observable/channel/replay.go b/pkg/observable/channel/replay.go index dd9906140..5b147ee16 100644 --- a/pkg/observable/channel/replay.go +++ b/pkg/observable/channel/replay.go @@ -3,28 +3,17 @@ package channel import ( "context" "sync" - "time" "github.com/pokt-network/poktroll/pkg/observable" "github.com/pokt-network/poktroll/pkg/polylog" ) -// replayPartialBufferTimeout is the duration to wait for the replay buffer to -// accumulate at least 1 value before returning the accumulated values. -// TODO_CONSIDERATION: perhaps this should be parameterized. 
-const replayPartialBufferTimeout = 100 * time.Millisecond - var ( _ observable.ReplayObservable[any] = (*replayObservable[any])(nil) _ observable.Observable[any] = (*replayObservable[any])(nil) ) type replayObservable[V any] struct { - // embed observerManager to encapsulate concurrent-safe read/write access to - // observers. This also allows higher-level objects to wrap this observable - // without knowing its specific type by asserting that it implements the - // observerManager interface. - observerManager[V] // replayBufferSize is the number of notifications to buffer so that they // can be replayed to new observers. replayBufferSize int @@ -34,6 +23,9 @@ type replayObservable[V any] struct { // by this observable. This buffer is replayed to new observers, on subscribing, // prior to any new notifications being propagated. replayBuffer []V + // bufferingObsvbl is an observable that emits all buffered values in one + // notification. + bufferingObsvbl observable.Observable[[]V] } // NewReplayObservable returns a new ReplayObservable with the given replay buffer @@ -44,13 +36,13 @@ func NewReplayObservable[V any]( opts ...option[V], ) (observable.ReplayObservable[V], chan<- V) { obsvbl, publishCh := NewObservable[V](opts...) - return ToReplayObservable[V](ctx, replayBufferSize, obsvbl), publishCh + + return ToReplayObservable(ctx, replayBufferSize, obsvbl), publishCh } // ToReplayObservable returns an observable which replays the last replayBufferSize // number of values published to the source observable to new observers, before // publishing new values. -// It panics if srcObservable does not implement the observerManager interface. // It should only be used with a srcObservable which contains channelObservers // (i.e. channelObservable or similar). 
func ToReplayObservable[V any]( @@ -58,194 +50,137 @@ func ToReplayObservable[V any]( replayBufferSize int, srcObsvbl observable.Observable[V], ) observable.ReplayObservable[V] { - // Assert that the source observable implements the observerMngr required - // to embed and wrap it. - observerMngr := srcObsvbl.(observerManager[V]) - replayObsvbl := &replayObservable[V]{ - observerManager: observerMngr, replayBufferSize: replayBufferSize, - replayBuffer: make([]V, 0, replayBufferSize), + replayBuffer: []V{}, } - srcObserver := srcObsvbl.Subscribe(ctx) - go replayObsvbl.goBufferReplayNotifications(srcObserver) - + replayObsvbl.bufferingObsvbl = replayObsvbl.initBufferingObservable(ctx, srcObsvbl) return replayObsvbl } -// Last synchronously returns the last n values from the replay buffer. It blocks -// until at least 1 notification has been accumulated, then waits replayPartialBufferTimeout -// duration before returning all notifications accumulated notifications by that time. -// If the replay buffer contains at least n notifications, this function will only -// block as long as it takes to accumulate and return them. -// If n is greater than the replay buffer size, the entire replay buffer is returned. +// Last synchronously returns the last n values from the replay buffer. +// It blocks until n values have been accumulated or its context is canceled, +// If it is canceled before n values are accumulated, it returns all the available +// items at the time of cancellation. +// The values returned are ordered from newest to oldest (i.e. LIFO) func (ro *replayObservable[V]) Last(ctx context.Context, n int) []V { logger := polylog.Ctx(ctx) ctx, cancel := context.WithCancel(ctx) defer cancel() - // Use a temporary observer to accumulate replay values. - // Subscribe will always start with the replay buffer, so we can safely - // leverage it here for synchronization (i.e. blocking until at least 1 - // notification has been accumulated). 
This also eliminates the need for - // locking and/or copying the replay buffer. - tempObserver := ro.Subscribe(ctx) - // If n is greater than the replay buffer size, return the entire replay buffer. if n > ro.replayBufferSize { n = ro.replayBufferSize logger.Warn(). Int("requested_replay_buffer_size", n). - Int("replay_buffer_capacity", cap(ro.replayBuffer)). + Int("replay_buffer_capacity", ro.replayBufferSize). Msg("requested replay buffer size is greater than replay buffer capacity; returning entire replay buffer") } - // accumulateReplayValues works concurrently and returns a context and cancellation - // function for signaling completion. - return accumulateReplayValues(tempObserver, n) -} - -// Subscribe returns an observer which is notified when the publishCh channel -// receives a value. -func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] { + // Lock any concurrent updates to the replay buffer. ro.replayBufferMu.RLock() - defer ro.replayBufferMu.RUnlock() - if ctx == nil { - ctx = context.Background() + // If the replay buffer has enough values, return the most recent n values. + if len(ro.replayBuffer) >= n { + values := ro.replayBuffer[:n] + ro.replayBufferMu.RUnlock() + return values } - // caller can cancel context or close the publish channel to unsubscribe active observers - ctx, cancel := context.WithCancel(ctx) - removeAndCancel := func(toRemove observable.Observer[V]) { - ro.observerManager.remove(toRemove) - cancel() + // If the replay buffer does not have enough values, wait for the source observable + // to emit enough values to satisfy the request. + bufferedValuesCh := ro.bufferingObsvbl.Subscribe(ctx).Ch() + // Initialize latestValues with the values in the replay buffer in case the + // source observable is closed or the context is canceled before it has a chance + // to emit an updated buffer of values. + latestValues := ro.replayBuffer[:] + // Unlock the replay buffer to allow new values to be added. 
+ // These new values will be collected in the loop below instead of the replay buffer. + ro.replayBufferMu.RUnlock() + // bufferValuesCh emits all buffered values in one notification. + for values := range bufferedValuesCh { + // If n is greater than the number of values emitted, update latestValues with + // the most recent values emitted so far so it could be returned if the context + // is canceled or the source observable is closed. + if len(values) >= n { + latestValues = values[:n] + break + } + + // Update latestValues with the most recent values emitted so far. + latestValues = values[:] } - observer := NewObserver[V](ctx, removeAndCancel) + // Return the most recent n values or all available values if the context is canceled + // before n values are accumulated. + return latestValues +} - // Replay all buffered replayBuffer to the observer channel buffer before - // any new values have an opportunity to send on observerCh (i.e. appending - // observer to ro.observers). - // - // TODO_IMPROVE: this assumes that the observer channel buffer is large enough - // to hold all replay (buffered) notifications. - for _, notification := range ro.replayBuffer { - observer.notify(notification) - } +// Subscribe returns an observer which is notified when the publishCh channel +// receives a value. +// It replays the values stored in the replay buffer in the order of their arrival +// before emitting new values. +func (ro *replayObservable[V]) Subscribe(ctx context.Context) observable.Observer[V] { + obs, ch := NewObservable[V]() + ctx, cancel := context.WithCancel(ctx) - ro.observerManager.add(observer) + go func() { + // Replay the values stored in the buffer form the oldest to the newest. + ro.replayBufferMu.RLock() + for i := len(ro.replayBuffer) - 1; i >= 0; i-- { + ch <- ro.replayBuffer[i] + } - // asynchronously wait for the context to be done and then unsubscribe - // this observer. 
- go ro.observerManager.goUnsubscribeOnDone(ctx, observer) + bufferedValuesCh := ro.bufferingObsvbl.Subscribe(ctx).Ch() + ro.replayBufferMu.RUnlock() + // Since bufferingObsvbl emits all buffered values in one notification + // and the replay buffer has already been replayed, only the most recent + // value needs to be published + for values := range bufferedValuesCh { + ch <- values[0] + } + // Cancel the context to stop the observer when the source observable is closed. + cancel() + }() - return observer + return obs.Subscribe(ctx) } -// UnsubscribeAll unsubscribes and removes all observers from the observable. +// UnsubscribeAll unsubscribes all observers from the replay observable. func (ro *replayObservable[V]) UnsubscribeAll() { - ro.observerManager.removeAll() + ro.bufferingObsvbl.UnsubscribeAll() } -// goBufferReplayNotifications buffers the last n notifications from a source -// observer. It is intended to be run in a goroutine. -func (ro *replayObservable[V]) goBufferReplayNotifications(srcObserver observable.Observer[V]) { - for notification := range srcObserver.Ch() { - ro.replayBufferMu.Lock() - // Add the notification to the buffer. - if len(ro.replayBuffer) < ro.replayBufferSize { - ro.replayBuffer = append(ro.replayBuffer, notification) - } else { - // buffer full, make room for the new notification by removing the - // oldest notification. - ro.replayBuffer = append(ro.replayBuffer[1:], notification) - } - ro.replayBufferMu.Unlock() - } -} +// initBufferingObservable receives and buffers the last n notifications from +// the a source observable and emits all buffered values at once. 
+func (ro *replayObservable[V]) initBufferingObservable( + ctx context.Context, + srcObsvbl observable.Observable[V], +) observable.Observable[[]V] { + bufferedObsvbl, bufferedObsvblCh := NewObservable[[]V]() + ch := srcObsvbl.Subscribe(ctx).Ch() + subscriptionReady := make(chan struct{}) -// accumulateReplayValues synchronously (but concurrently) accumulates n values -// from the observer channel into the slice pointed to by accValues and then returns -// said slice. It cancels the context either when n values have been accumulated -// or when at least 1 value has been accumulated and replayPartialBufferTimeout -// has elapsed. -func accumulateReplayValues[V any](observer observable.Observer[V], n int) []V { - var ( - // accValuesMu protects accValues from concurrent access. - accValuesMu sync.Mutex - // Accumulate replay values in a new slice to avoid (read) locking replayBufferMu. - accValues = new([]V) - // canceling the context will cause the loop in the goroutine to exit. - ctx, cancel = context.WithCancel(context.Background()) - ) - - // Concurrently accumulate n values from the observer channel. go func() { - // Defer canceling the context and unlocking accValuesMu. The function - // assumes that the mutex is locked when it gets execution control back - // from the loop. - defer func() { - cancel() - accValuesMu.Unlock() - }() - for { - // Lock the mutex to read accValues here and potentially write in - // the first case branch in the select below. - accValuesMu.Lock() - - // The context was canceled since the last iteration. - if ctx.Err() != nil { - return - } - - // We've accumulated n values. - if len(*accValues) >= n { - return - } - - // Receive from the observer's channel if we can, otherwise let - // the loop run. - select { - // Receiving from the observer channel blocks if replayBuffer is empty. - case value, ok := <-observer.Ch(): - // tempObserver was closed concurrently. 
- if !ok { - return - } - - // Update the accumulated values pointed to by accValues. - *accValues = append(*accValues, value) - default: - // If we can't receive from the observer channel immediately, - // let the loop run. + subscriptionReady <- struct{}{} + for value := range ch { + ro.replayBufferMu.Lock() + // The newest value is always at the beginning of the replay buffer. + if len(ro.replayBuffer) < ro.replayBufferSize { + ro.replayBuffer = append([]V{value}, ro.replayBuffer...) + } else { + ro.replayBuffer = append([]V{value}, ro.replayBuffer[:ro.replayBufferSize-1]...) } - - // Unlock accValuesMu so that the select below gets a chance to check - // the length of *accValues to decide whether to cancel, and it can - // be relocked at the top of the loop as it must be locked when the - // loop exits. - accValuesMu.Unlock() - // Wait a tick before continuing the loop. - time.Sleep(time.Millisecond) + // Emit all buffered values at once. + bufferedObsvblCh <- ro.replayBuffer + ro.replayBufferMu.Unlock() } }() - // Wait for N values to be accumulated or timeout. When timing out, if we - // have at least 1 value, we can return it. Otherwise, we need to wait for - // the next value to be published (i.e. continue the loop). - for { - select { - case <-ctx.Done(): - return *accValues - case <-time.After(replayPartialBufferTimeout): - accValuesMu.Lock() - if len(*accValues) > 1 { - cancel() - return *accValues - } - accValuesMu.Unlock() - } - } + // Ensure that the source observable has been subscribed to before allowing + // the replay observable to be subscribed to. 
+ // It is needed to ensure that no values are missed by the replay observable + <-subscriptionReady + return bufferedObsvbl } diff --git a/pkg/observable/channel/replay_test.go b/pkg/observable/channel/replay_test.go index 8a06b9161..5839ced97 100644 --- a/pkg/observable/channel/replay_test.go +++ b/pkg/observable/channel/replay_test.go @@ -2,6 +2,7 @@ package channel_test import ( "context" + "slices" "testing" "time" @@ -12,6 +13,53 @@ import ( "github.com/pokt-network/poktroll/testutil/testerrors" ) +func TestReplayObservable_Overflow(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + replayObs, replayPublishCh := channel.NewReplayObservable[int](context.Background(), 6) + + // Ensure that the replay observable can handle synchronous publishing + replayPublishCh <- 0 + replayPublishCh <- 1 + replayPublishCh <- 2 + + // Publish values asynchronously at different intervals + go func() { + time.Sleep(time.Millisecond) + replayPublishCh <- 3 + }() + + go func() { + time.Sleep(20 * time.Millisecond) + replayPublishCh <- 4 + }() + + go func() { + time.Sleep(40 * time.Millisecond) + replayPublishCh <- 5 + }() + + // Assert that calling last synchronously returns the synchronously published values. + actualValues := replayObs.Last(ctx, 3) + require.ElementsMatch(t, []int{2, 1, 0}, actualValues) + + // Assert that the items returned by Last are the expected ones according to + // when they were published. 
+ + time.Sleep(10 * time.Millisecond) + actualValues = replayObs.Last(ctx, 3) + require.ElementsMatch(t, []int{3, 2, 1}, actualValues) + + time.Sleep(20 * time.Millisecond) + actualValues = replayObs.Last(ctx, 3) + require.ElementsMatch(t, []int{4, 3, 2}, actualValues) + + time.Sleep(20 * time.Millisecond) + actualValues = replayObs.Last(ctx, 3) + require.ElementsMatch(t, []int{5, 4, 3}, actualValues) +} + func TestReplayObservable(t *testing.T) { var ( replayBufferSize = 3 @@ -96,6 +144,10 @@ func TestReplayObservable(t *testing.T) { func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) { values := []int{1, 2, 3, 4, 5} + expectedValues := values + // Reverse the expected values to have the most recent values first. + slices.Reverse(expectedValues) + tests := []struct { name string replayBufferSize int @@ -107,9 +159,9 @@ func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) { name: "n < replayBufferSize", replayBufferSize: 5, lastN: 3, - // the replay buffer is not full so Last should return values - // starting from the first published value. - expectedValues: values[:3], // []int{1, 2, 3}, + // the replay buffer has enough values to return to Last, it should return + // the last n values in the replay buffer. + expectedValues: values[2:], // []int{5, 4, 3}, }, { name: "n = replayBufferSize", @@ -123,7 +175,7 @@ func TestReplayObservable_Last_Full_ReplayBuffer(t *testing.T) { lastN: 5, // the replay buffer is full so Last should return values starting // from lastN - replayBufferSize. - expectedValues: values[2:], // []int{3, 4, 5}, + expectedValues: values[2:], // []int{5, 4, 3}, }, } @@ -164,23 +216,31 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) { getLastValues := func() chan []int { lastValuesCh := make(chan []int, 1) go func() { - // Last should block until lastN values have been published. + // The replay observable's Last method does not timeout if there is less + // than lastN values in the replay buffer. 
+ // Add a timeout to ensure that Last doesn't block indefinitely and return + // whatever values are available in the replay buffer. + ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond) + // Last should block until lastN values have been published or the timeout + // specified above is reached. // NOTE: this will produce a warning log which can safely be ignored: // > WARN: requested replay buffer size 3 is greater than replay buffer // > capacity 3; returning entire replay buffer lastValuesCh <- replayObsvbl.Last(ctx, lastN) + cancel() }() return lastValuesCh } - // Ensure that last blocks when the replay buffer is empty + // Ensure that Last blocks when the replay buffer is empty select { case actualValues := <-getLastValues(): t.Fatalf( - "Last should block until at lest 1 value has been published; actualValues: %v", + "Last should block until it gets %d values; actualValues: %v", + lastN, actualValues, ) - case <-time.After(10 * time.Millisecond): + case <-time.After(50 * time.Millisecond): } // Publish some values (up to splitIdx). @@ -190,16 +250,17 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) { } // Ensure Last works as expected when n <= len(published_values). - require.ElementsMatch(t, []int{1}, replayObsvbl.Last(ctx, 1)) - require.ElementsMatch(t, []int{1, 2}, replayObsvbl.Last(ctx, 2)) - require.ElementsMatch(t, []int{1, 2, 3}, replayObsvbl.Last(ctx, 3)) + require.ElementsMatch(t, []int{3}, replayObsvbl.Last(ctx, 1)) + require.ElementsMatch(t, []int{3, 2}, replayObsvbl.Last(ctx, 2)) + require.ElementsMatch(t, []int{3, 2, 1}, replayObsvbl.Last(ctx, 3)) // Ensure that Last blocks when n > len(published_values) and the replay // buffer is not full. 
select { case actualValues := <-getLastValues(): t.Fatalf( - "Last should block until replayPartialBufferTimeout has elapsed; received values: %v", + "Last should block until %d items are published; received values: %v", + lastN, actualValues, ) default: @@ -232,9 +293,10 @@ func TestReplayObservable_Last_Blocks_And_Times_Out(t *testing.T) { } // Ensure that Last still works as expected when n <= len(published_values). - require.ElementsMatch(t, []int{1}, replayObsvbl.Last(ctx, 1)) - require.ElementsMatch(t, []int{1, 2}, replayObsvbl.Last(ctx, 2)) - require.ElementsMatch(t, []int{1, 2, 3}, replayObsvbl.Last(ctx, 3)) - require.ElementsMatch(t, []int{1, 2, 3, 4}, replayObsvbl.Last(ctx, 4)) - require.ElementsMatch(t, []int{1, 2, 3, 4, 5}, replayObsvbl.Last(ctx, 5)) + // The values are ordered from most recent to least recent. + require.ElementsMatch(t, []int{5}, replayObsvbl.Last(ctx, 1)) + require.ElementsMatch(t, []int{5, 4}, replayObsvbl.Last(ctx, 2)) + require.ElementsMatch(t, []int{5, 4, 3}, replayObsvbl.Last(ctx, 3)) + require.ElementsMatch(t, []int{5, 4, 3, 2}, replayObsvbl.Last(ctx, 4)) + require.ElementsMatch(t, []int{5, 4, 3, 2, 1}, replayObsvbl.Last(ctx, 5)) } diff --git a/pkg/observable/interface.go b/pkg/observable/interface.go index d86da414f..d5992afc6 100644 --- a/pkg/observable/interface.go +++ b/pkg/observable/interface.go @@ -11,7 +11,8 @@ import "context" // to new observers, before publishing new values to observers. type ReplayObservable[V any] interface { Observable[V] - // Last synchronously returns the last n values from the replay buffer. + // Last synchronously returns the last n values from the replay buffer with + // LIFO ordering Last(ctx context.Context, n int) []V } @@ -21,6 +22,7 @@ type ReplayObservable[V any] interface { type Observable[V any] interface { // Subscribe returns an observer which is notified when the publishCh channel // receives a value. + // The order the values published by the subscription is FIFO. 
Subscribe(context.Context) Observer[V] // UnsubscribeAll unsubscribes and removes all observers from the observable. UnsubscribeAll() diff --git a/pkg/relayer/cmd/cmd.go b/pkg/relayer/cmd/cmd.go index 1dba8122b..87b5493a5 100644 --- a/pkg/relayer/cmd/cmd.go +++ b/pkg/relayer/cmd/cmd.go @@ -129,6 +129,13 @@ func runRelayer(cmd *cobra.Command, _ []string) error { } } + if relayMinerConfig.Pprof.Enabled { + err = relayMiner.ServePprof(ctx, relayMinerConfig.Pprof.Addr) + if err != nil { + return fmt.Errorf("failed to start pprof endpoint: %w", err) + } + } + // Start the relay miner logger.Info().Msg("Starting relay miner...") if err := relayMiner.Start(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { @@ -183,7 +190,7 @@ func setupRelayerDependencies( supplierFuncs := []config.SupplierFn{ config.NewSupplyLoggerFromCtx(ctx), config.NewSupplyEventsQueryClientFn(queryNodeRPCUrl), // leaf - config.NewSupplyBlockClientFn(), // leaf + config.NewSupplyBlockClientFn(queryNodeRPCUrl), // leaf config.NewSupplyQueryClientContextFn(queryNodeGRPCUrl), // leaf supplyMiner, // leaf config.NewSupplyTxClientContextFn(queryNodeGRPCUrl, txNodeRPCUrl), // leaf diff --git a/pkg/relayer/config/proxy_http_config_parser.go b/pkg/relayer/config/proxy_http_config_parser.go index ad704d306..1ce7695c5 100644 --- a/pkg/relayer/config/proxy_http_config_parser.go +++ b/pkg/relayer/config/proxy_http_config_parser.go @@ -26,11 +26,11 @@ func (serverConfig *RelayMinerServerConfig) parseHTTPServerConfig( return nil } -// parseHTTPSupplierConfig populates the supplier fields of the target structure -// that are relevant to "http" specific service configurations. +// parseSupplierBackendUrl populates the supplier fields of the target structure +// that are relevant to "http" and "https" backend url service configurations. // This function alters the target RelayMinerSupplierServiceConfig structure // as a side effect. 
-func (supplierServiceConfig *RelayMinerSupplierServiceConfig) parseHTTPSupplierConfig( +func (supplierServiceConfig *RelayMinerSupplierServiceConfig) parseSupplierBackendUrl( yamlSupplierServiceConfig YAMLRelayMinerSupplierServiceConfig, ) error { // Check if the supplier backend url is empty diff --git a/pkg/relayer/config/relayminer_configs_reader.go b/pkg/relayer/config/relayminer_configs_reader.go index e3ae3a52b..353b52d9c 100644 --- a/pkg/relayer/config/relayminer_configs_reader.go +++ b/pkg/relayer/config/relayminer_configs_reader.go @@ -1,6 +1,8 @@ package config -import yaml "gopkg.in/yaml.v2" +import ( + yaml "gopkg.in/yaml.v2" +) // ParseRelayMinerConfigs parses the relay miner config file into a RelayMinerConfig func ParseRelayMinerConfigs(configContent []byte) (*RelayMinerConfig, error) { @@ -39,6 +41,11 @@ func ParseRelayMinerConfigs(configContent []byte) (*RelayMinerConfig, error) { Addr: yamlRelayMinerConfig.Metrics.Addr, } + relayMinerConfig.Pprof = &RelayMinerPprofConfig{ + Enabled: yamlRelayMinerConfig.Pprof.Enabled, + Addr: yamlRelayMinerConfig.Pprof.Addr, + } + // Hydrate the pocket node urls if err := relayMinerConfig.HydratePocketNodeUrls(&yamlRelayMinerConfig.PocketNode); err != nil { return nil, err diff --git a/pkg/relayer/config/supplier_hydrator.go b/pkg/relayer/config/supplier_hydrator.go index 1fbb8e979..2284b24ce 100644 --- a/pkg/relayer/config/supplier_hydrator.go +++ b/pkg/relayer/config/supplier_hydrator.go @@ -68,10 +68,10 @@ func (supplierConfig *RelayMinerSupplierConfig) HydrateSupplier( // by their own functions. supplierConfig.ServiceConfig = &RelayMinerSupplierServiceConfig{} switch backendUrl.Scheme { - case "http": + case "http", "https": supplierConfig.ServerType = RelayMinerServerTypeHTTP if err := supplierConfig.ServiceConfig. 
- parseHTTPSupplierConfig(yamlSupplierConfig.ServiceConfig); err != nil { + parseSupplierBackendUrl(yamlSupplierConfig.ServiceConfig); err != nil { return err } default: diff --git a/pkg/relayer/config/types.go b/pkg/relayer/config/types.go index 7b273292e..6aa2e4167 100644 --- a/pkg/relayer/config/types.go +++ b/pkg/relayer/config/types.go @@ -23,6 +23,7 @@ type YAMLRelayMinerConfig struct { SmtStorePath string `yaml:"smt_store_path"` Metrics YAMLRelayMinerMetricsConfig `yaml:"metrics"` Suppliers []YAMLRelayMinerSupplierConfig `yaml:"suppliers"` + Pprof YAMLRelayMinerPprofConfig `yaml:"pprof"` } // YAMLRelayMinerPocketNodeConfig is the structure used to unmarshal the pocket @@ -66,6 +67,13 @@ type YAMLRelayMinerSupplierServiceAuthentication struct { Password string `yaml:"password,omitempty"` } +// YAMLRelayMinerPprofConfig is the structure used to unmarshal the config +// for `pprof`. +type YAMLRelayMinerPprofConfig struct { + Enabled bool `yaml:"enabled,omitempty"` + Addr string `yaml:"addr,omitempty"` +} + // RelayMinerConfig is the structure describing the RelayMiner config type RelayMinerConfig struct { PocketNode *RelayMinerPocketNodeConfig @@ -73,6 +81,7 @@ type RelayMinerConfig struct { Metrics *RelayMinerMetricsConfig SigningKeyName string SmtStorePath string + Pprof *RelayMinerPprofConfig } // RelayMinerPocketNodeConfig is the structure resulting from parsing the pocket @@ -148,8 +157,15 @@ type RelayMinerSupplierServiceConfig struct { // RelayMinerSupplierServiceAuthentication is the structure resulting from parsing // the supplier service basic auth of the RelayMiner config file when the -// supplier is of type "http" +// supplier is of type "http". type RelayMinerSupplierServiceAuthentication struct { Username string Password string } + +// RelayMinerPprofConfig is the structure resulting from parsing the pprof config +// section of a RelayMiner config. 
+type RelayMinerPprofConfig struct { + Enabled bool + Addr string +} diff --git a/pkg/relayer/proxy/relay_verifier.go b/pkg/relayer/proxy/relay_verifier.go index 98cdd65cd..5aeff8589 100644 --- a/pkg/relayer/proxy/relay_verifier.go +++ b/pkg/relayer/proxy/relay_verifier.go @@ -90,7 +90,7 @@ func (rp *relayerProxy) getTargetSessionBlockHeight( ctx context.Context, relayRequest *types.RelayRequest, ) (sessionBlockHeight int64, err error) { - currentBlockHeight := rp.blockClient.LastNBlocks(ctx, 1)[0].Height() + currentBlockHeight := rp.blockClient.LastBlock(ctx).Height() sessionEndblockHeight := relayRequest.Meta.SessionHeader.GetSessionEndBlockHeight() // Check if the RelayRequest's session has expired. diff --git a/pkg/relayer/proxy/synchronous.go b/pkg/relayer/proxy/synchronous.go index 75d76679b..0156f50ca 100644 --- a/pkg/relayer/proxy/synchronous.go +++ b/pkg/relayer/proxy/synchronous.go @@ -4,6 +4,7 @@ import ( "bytes" "compress/gzip" "context" + "crypto/tls" "fmt" "io" "net/http" @@ -277,8 +278,21 @@ func (sync *synchronousRPCServer) serveHTTP( relayHTTPRequest.Header.Add(key, value) } + // Configure the HTTP client to use the appropriate transport based on the + // backend URL scheme. + var client *http.Client + switch serviceConfig.BackendUrl.Scheme { + case "https": + transport := &http.Transport{ + TLSClientConfig: &tls.Config{}, + } + client = &http.Client{Transport: transport} + default: + client = http.DefaultClient + } + // Send the relay request to the native service. 
- httpResponse, err := http.DefaultClient.Do(relayHTTPRequest) + httpResponse, err := client.Do(relayHTTPRequest) if err != nil { return nil, err } diff --git a/pkg/relayer/relayminer.go b/pkg/relayer/relayminer.go index f4ef30ecf..018d1c14c 100644 --- a/pkg/relayer/relayminer.go +++ b/pkg/relayer/relayminer.go @@ -4,6 +4,7 @@ import ( "context" "net" "net/http" + "net/http/pprof" "cosmossdk.io/depinject" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -103,3 +104,31 @@ func (rel *relayMiner) ServeMetrics(addr string) error { return nil } + +// Starts a pprof server on the given address. +func (rel *relayMiner) ServePprof(ctx context.Context, addr string) error { + pprofMux := http.NewServeMux() + pprofMux.HandleFunc("/debug/pprof/", pprof.Index) + pprofMux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + pprofMux.HandleFunc("/debug/pprof/profile", pprof.Profile) + pprofMux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + pprofMux.HandleFunc("/debug/pprof/trace", pprof.Trace) + + server := &http.Server{ + Addr: addr, + Handler: pprofMux, + } + // If no error, start the server in a new goroutine + go func() { + rel.logger.Info().Str("endpoint", addr).Msg("starting a pprof endpoint") + server.ListenAndServe() + }() + + go func() { + <-ctx.Done() + rel.logger.Info().Str("endpoint", addr).Msg("stopping a pprof endpoint") + server.Shutdown(ctx) + }() + + return nil +} diff --git a/pkg/relayer/session/claim.go b/pkg/relayer/session/claim.go index 9ac33b485..7a8bd7407 100644 --- a/pkg/relayer/session/claim.go +++ b/pkg/relayer/session/claim.go @@ -120,7 +120,7 @@ func (rs *relayerSessionsManager) newMapClaimSessionFn( return either.Error[relayer.SessionTree](err), false } - latestBlock := rs.blockClient.LastNBlocks(ctx, 1)[0] + latestBlock := rs.blockClient.LastBlock(ctx) logger.Info(). Int64("current_block", latestBlock.Height()+1). 
Msg("submitting claim") diff --git a/pkg/relayer/session/proof.go b/pkg/relayer/session/proof.go index 228b50ebc..45d75517f 100644 --- a/pkg/relayer/session/proof.go +++ b/pkg/relayer/session/proof.go @@ -99,7 +99,7 @@ func (rs *relayerSessionsManager) newMapProveSessionFn( // TODO_BLOCKER: The block that'll be used as a source of entropy for which // branch(es) to prove should be deterministic and use on-chain governance params // rather than latest. - latestBlock := rs.blockClient.LastNBlocks(ctx, 1)[0] + latestBlock := rs.blockClient.LastBlock(ctx) // TODO_BLOCKER(@red-0ne, @Olshansk): Update the path given to `ProveClosest` // from `BlockHash` to `Foo(BlockHash, SessionId)` diff --git a/pkg/sdk/deps_builder.go b/pkg/sdk/deps_builder.go index 57c9ab745..351ea962b 100644 --- a/pkg/sdk/deps_builder.go +++ b/pkg/sdk/deps_builder.go @@ -6,12 +6,13 @@ import ( "net/url" "cosmossdk.io/depinject" + sdkclient "github.com/cosmos/cosmos-sdk/client" grpctypes "github.com/cosmos/gogoproto/grpc" grpc "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - block "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/client/block" "github.com/pokt-network/poktroll/pkg/client/delegation" eventsquery "github.com/pokt-network/poktroll/pkg/client/events" "github.com/pokt-network/poktroll/pkg/client/query" @@ -28,11 +29,18 @@ func (sdk *poktrollSDK) buildDeps( ) (depinject.Config, error) { pocketNodeWebsocketURL := RPCToWebsocketURL(config.QueryNodeUrl) - // Have a new depinject config - deps := depinject.Configs() + cometClient, err := sdkclient.NewClientFromNode(config.QueryNodeUrl.String()) + if err != nil { + return nil, err + } - // Supply the logger - deps = depinject.Configs(deps, depinject.Supply(polylog.Ctx(ctx))) + // Supply the logger & cosmos RPC client. 
+ deps := depinject.Configs( + depinject.Supply( + polylog.Ctx(ctx), + cometClient, + ), + ) // Create and supply the events query client eventsQueryClient := eventsquery.NewEventsQueryClient(pocketNodeWebsocketURL) diff --git a/pkg/sdk/session.go b/pkg/sdk/session.go index 634f7a7d1..701c9c59c 100644 --- a/pkg/sdk/session.go +++ b/pkg/sdk/session.go @@ -42,7 +42,7 @@ func (sdk *poktrollSDK) GetSessionSupplierEndpoints( sdk.serviceSessionSuppliersMu.RLock() defer sdk.serviceSessionSuppliersMu.RUnlock() - latestBlockHeight := sdk.blockClient.LastNBlocks(ctx, 1)[0].Height() + latestBlockHeight := sdk.blockClient.LastBlock(ctx).Height() // Create the latestSessions map entry for the serviceId if it doesn't exist. if _, ok := sdk.serviceSessionSuppliers[serviceId]; !ok { diff --git a/testutil/testclient/testblock/client.go b/testutil/testclient/testblock/client.go index 0724de904..bc5ddd582 100644 --- a/testutil/testclient/testblock/client.go +++ b/testutil/testclient/testblock/client.go @@ -41,8 +41,8 @@ func NewAnyTimesCommittedBlocksSequenceBlockClient( ) *mockclient.MockBlockClient { t.Helper() - // Create a mock for the block client which expects the LastNBlocks method to be called any number of times. - blockClientMock := NewAnyTimeLastNBlocksBlockClient(t, blockHash, 0) + // Create a mock for the block client which expects the LastBlock method to be called any number of times. + blockClientMock := NewAnyTimeLastBlockBlockClient(t, blockHash, 0) // Set up the mock expectation for the CommittedBlocksSequence method. When // the method is called, it returns a new replay observable that publishes @@ -66,8 +66,8 @@ func NewOneTimeCommittedBlocksSequenceBlockClient( ) *mockclient.MockBlockClient { t.Helper() - // Create a mock for the block client which expects the LastNBlocks method to be called any number of times. 
- blockClientMock := NewAnyTimeLastNBlocksBlockClient(t, nil, 0) + // Create a mock for the block client which expects the LastBlock method to be called any number of times. + blockClientMock := NewAnyTimeLastBlockBlockClient(t, nil, 0) // Set up the mock expectation for the CommittedBlocksSequence method. When // the method is called, it returns a new replay observable that publishes @@ -87,10 +87,10 @@ func NewOneTimeCommittedBlocksSequenceBlockClient( return blockClientMock } -// NewAnyTimeLastNBlocksBlockClient creates a mock BlockClient that expects -// calls to the LastNBlocks method any number of times. When the LastNBlocks +// NewAnyTimeLastBlockBlockClient creates a mock BlockClient that expects +// calls to the LastBlock method any number of times. When the LastBlock // method is called, it returns a mock Block with the provided hash and height. -func NewAnyTimeLastNBlocksBlockClient( +func NewAnyTimeLastBlockBlockClient( t *testing.T, blockHash []byte, blockHeight int64, @@ -100,10 +100,10 @@ func NewAnyTimeLastNBlocksBlockClient( // Create a mock block that returns the provided hash and height. blockMock := NewAnyTimesBlock(t, blockHash, blockHeight) - // Create a mock block client that expects calls to LastNBlocks method and + // Create a mock block client that expects calls to LastBlock method and // returns the mock block. 
blockClientMock := mockclient.NewMockBlockClient(ctrl) - blockClientMock.EXPECT().LastNBlocks(gomock.Any(), gomock.Any()).Return([]client.Block{blockMock}).AnyTimes() + blockClientMock.EXPECT().LastBlock(gomock.Any()).Return(blockMock).AnyTimes() return blockClientMock } diff --git a/testutil/testclient/testeventsquery/client.go b/testutil/testclient/testeventsquery/client.go index aec74609f..052f07446 100644 --- a/testutil/testclient/testeventsquery/client.go +++ b/testutil/testclient/testeventsquery/client.go @@ -43,9 +43,10 @@ func NewOneTimeEventsQuery( ) *mockclient.MockEventsQueryClient { t.Helper() ctrl := gomock.NewController(t) + cancellableCtx, _ := context.WithCancel(ctx) eventsQueryClient := mockclient.NewMockEventsQueryClient(ctrl) - eventsQueryClient.EXPECT().EventsBytes(gomock.Eq(ctx), gomock.Eq(query)). + eventsQueryClient.EXPECT().EventsBytes(gomock.AssignableToTypeOf(cancellableCtx), gomock.Eq(query)). DoAndReturn(func( ctx context.Context, query string, @@ -103,7 +104,7 @@ func NewAnyTimesEventsBytesEventsQueryClient( eventsQueryClient := mockclient.NewMockEventsQueryClient(ctrl) eventsQueryClient.EXPECT().Close().Times(1) eventsQueryClient.EXPECT(). - EventsBytes(gomock.AssignableToTypeOf(ctx), gomock.Eq(expectedQuery)). + EventsBytes(gomock.Any(), gomock.Eq(expectedQuery)). 
DoAndReturn( func(ctx context.Context, query string) (client.EventsBytesObservable, error) { bytesObsvbl, bytesPublishCh := channel.NewReplayObservable[either.Bytes](ctx, 1) diff --git a/testutil/testproxy/relayerproxy.go b/testutil/testproxy/relayerproxy.go index 0eadee2d7..152d1df56 100644 --- a/testutil/testproxy/relayerproxy.go +++ b/testutil/testproxy/relayerproxy.go @@ -104,7 +104,7 @@ func WithRelayerProxyDependenciesForBlockHeight( sessionQueryClient := testqueryclients.NewTestSessionQueryClient(test.t) supplierQueryClient := testqueryclients.NewTestSupplierQueryClient(test.t) - blockClient := testblock.NewAnyTimeLastNBlocksBlockClient(test.t, []byte{}, blockHeight) + blockClient := testblock.NewAnyTimeLastBlockBlockClient(test.t, []byte{}, blockHeight) keyring, _ := testkeyring.NewTestKeyringWithKey(test.t, keyName) redelegationObs, _ := channel.NewReplayObservable[client.Redelegation](test.ctx, 1) diff --git a/x/tokenomics/keeper/settle_pending_claims.go b/x/tokenomics/keeper/settle_pending_claims.go index ec39727bf..a8600264c 100644 --- a/x/tokenomics/keeper/settle_pending_claims.go +++ b/x/tokenomics/keeper/settle_pending_claims.go @@ -57,11 +57,11 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) (numClaimsSettled, numClaim sessionId := claim.SessionHeader.SessionId + _, isProofFound := k.proofKeeper.GetProof(ctx, sessionId, claim.SupplierAddress) // Using the probabilistic proofs approach, determine if this expiring // claim required an on-chain proof isProofRequiredForClaim := k.isProofRequiredForClaim(ctx, &claim) if isProofRequiredForClaim { - _, isProofFound := k.proofKeeper.GetProof(ctx, sessionId, claim.SupplierAddress) // If a proof is not found, the claim will expire and never be settled. if !isProofFound { // Emit an event that a claim has expired and being removed without being settled. 
@@ -101,9 +101,13 @@ func (k Keeper) SettlePendingClaims(ctx sdk.Context) (numClaimsSettled, numClaim // The claim & proof are no longer necessary, so there's no need for them // to take up on-chain space. k.proofKeeper.RemoveClaim(ctx, sessionId, claim.SupplierAddress) - // NB: We are calling `RemoveProof` of whether or not the proof was required - // to delete it from the state. It is okay for it to fail here if it doesn't exist. - k.proofKeeper.RemoveProof(ctx, sessionId, claim.SupplierAddress) + // Whether or not the proof was required, the supplier may have submitted one, + // so it must be deleted either way. Without this `isProofFound` guard, a + // benign error would be logged when no proof exists, which can confuse the + // operator or developer. + if isProofFound { + k.proofKeeper.RemoveProof(ctx, sessionId, claim.SupplierAddress) + } numClaimsSettled++ logger.Info(fmt.Sprintf("Successfully settled claim for session ID %q at block height %d", claim.SessionHeader.SessionId, blockHeight)) diff --git a/x/tokenomics/keeper/settle_session_accounting.go b/x/tokenomics/keeper/settle_session_accounting.go index c3e76a157..909517c49 100644 --- a/x/tokenomics/keeper/settle_session_accounting.go +++ b/x/tokenomics/keeper/settle_session_accounting.go @@ -147,8 +147,8 @@ func (k Keeper) SettleSessionAccounting( // Verify that the application has enough uPOKT to pay for the services it consumed if application.Stake.IsLT(settlementAmt) { - logger.Error(fmt.Sprintf( - "THIS SHOULD NOT HAPPEN. Application with address %s needs to be charged more than it has staked: %v > %v", + logger.Warn(fmt.Sprintf( + "THIS SHOULD NEVER HAPPEN. Application with address %s needs to be charged more than it has staked: %v > %v", applicationAddress, settlementAmtuPOKT, application.Stake, @@ -157,7 +157,9 @@ func (k Keeper) SettleSessionAccounting( // goes "into debt". Need to design a way to handle this when we implement // probabilistic proofs and add all the parameter logic. 
Do we touch the application balance? // Do we just let it go into debt? Do we penalize the application? Do we unstake it? Etc... - settlementAmtuPOKT = sdk.NewCoins(*application.Stake) + settlementAmt = sdk.NewCoin("upokt", math.Int(application.Stake.Amount)) + settlementAmtuPOKT = sdk.NewCoins(settlementAmt) + // TODO_BLOCKER: The application should be immediately unstaked at this point in time } // Burn uPOKT from the application module account which was held in escrow @@ -172,7 +174,7 @@ func (k Keeper) SettleSessionAccounting( // Update the application's on-chain stake newAppStake, err := (*application.Stake).SafeSub(settlementAmt) if err != nil { - return types.ErrTokenomicsApplicationNewStakeInvalid.Wrapf("application %q stake cannot be reduce to a negative amount %v", applicationAddress, newAppStake) + return types.ErrTokenomicsApplicationNewStakeInvalid.Wrapf("application %q stake cannot be reduced to a negative amount %v", applicationAddress, newAppStake) } application.Stake = &newAppStake k.applicationKeeper.SetApplication(ctx, application) diff --git a/x/tokenomics/keeper/settle_session_accounting_test.go b/x/tokenomics/keeper/settle_session_accounting_test.go index 567af0d7d..51f135b17 100644 --- a/x/tokenomics/keeper/settle_session_accounting_test.go +++ b/x/tokenomics/keeper/settle_session_accounting_test.go @@ -5,21 +5,63 @@ import ( "fmt" "testing" + "cosmossdk.io/math" + "github.com/cosmos/cosmos-sdk/types" "github.com/pokt-network/smt" "github.com/stretchr/testify/require" testkeeper "github.com/pokt-network/poktroll/testutil/keeper" "github.com/pokt-network/poktroll/testutil/sample" + apptypes "github.com/pokt-network/poktroll/x/application/types" prooftypes "github.com/pokt-network/poktroll/x/proof/types" sessionkeeper "github.com/pokt-network/poktroll/x/session/keeper" sessiontypes "github.com/pokt-network/poktroll/x/session/types" sharedtypes "github.com/pokt-network/poktroll/x/shared/types" - 
"github.com/pokt-network/poktroll/x/tokenomics/types" + tokenomicstypes "github.com/pokt-network/poktroll/x/tokenomics/types" ) // TODO_TEST(@bryanchriswhite, @Olshansk): Improve tokenomics tests (i.e. checking balances) // once in-memory network integration tests are supported. +func TestSettleSessionAccounting_HandleAppGoingIntoDebt(t *testing.T) { + keepers, ctx := testkeeper.NewTokenomicsModuleKeepers(t) + + // Add a new application + appStake := types.NewCoin("upokt", math.NewInt(1000000)) + app := apptypes.Application{ + Address: sample.AccAddress(), + Stake: &appStake, + } + keepers.SetApplication(ctx, app) + + // Add a new supplier + supplierStake := types.NewCoin("upokt", math.NewInt(1000000)) + supplier := sharedtypes.Supplier{ + Address: sample.AccAddress(), + Stake: &supplierStake, + } + + // The base claim whose root will be customized for testing purposes + claim := prooftypes.Claim{ + SupplierAddress: supplier.Address, + SessionHeader: &sessiontypes.SessionHeader{ + ApplicationAddress: app.Address, + Service: &sharedtypes.Service{ + Id: "svc1", + Name: "svcName1", + }, + SessionId: "session_id", + SessionStartBlockHeight: 1, + SessionEndBlockHeight: sessionkeeper.GetSessionEndBlockHeight(1), + }, + RootHash: smstRootWithSum(appStake.Amount.Uint64() + 1), // More than the app stake + } + + err := keepers.SettleSessionAccounting(ctx, &claim) + require.NoError(t, err) + // TODO_BLOCKER: Need to make sure the application is unstaked at this point in time. 
+} + func TestSettleSessionAccounting_ValidAccounting(t *testing.T) { t.Skip("TODO_BLOCKER(@Olshansk): Add E2E and integration tests so we validate the actual state changes of the bank & account keepers.") // Assert that `suppliertypes.ModuleName` account module balance is *unchanged* @@ -58,7 +100,7 @@ func TestSettleSessionAccounting_AppNotFound(t *testing.T) { err := keeper.SettleSessionAccounting(ctx, &claim) require.Error(t, err) - require.ErrorIs(t, err, types.ErrTokenomicsApplicationNotFound) + require.ErrorIs(t, err, tokenomicstypes.ErrTokenomicsApplicationNotFound) } func TestSettleSessionAccounting_InvalidRoot(t *testing.T) { @@ -169,7 +211,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { desc: "Nil Claim", claim: nil, errExpected: true, - expectErr: types.ErrTokenomicsClaimNil, + expectErr: tokenomicstypes.ErrTokenomicsClaimNil, }, { desc: "Claim with nil session header", @@ -179,7 +221,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { return &claim }(), errExpected: true, - expectErr: types.ErrTokenomicsSessionHeaderNil, + expectErr: tokenomicstypes.ErrTokenomicsSessionHeaderNil, }, { desc: "Claim with invalid session id", @@ -189,7 +231,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { return &claim }(), errExpected: true, - expectErr: types.ErrTokenomicsSessionHeaderInvalid, + expectErr: tokenomicstypes.ErrTokenomicsSessionHeaderInvalid, }, { desc: "Claim with invalid application address", @@ -199,7 +241,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { return &claim }(), errExpected: true, - expectErr: types.ErrTokenomicsSessionHeaderInvalid, + expectErr: tokenomicstypes.ErrTokenomicsSessionHeaderInvalid, }, { desc: "Claim with invalid supplier address", @@ -209,7 +251,7 @@ func TestSettleSessionAccounting_InvalidClaim(t *testing.T) { return &claim }(), errExpected: true, - expectErr: types.ErrTokenomicsSupplierAddressInvalid, + expectErr: 
tokenomicstypes.ErrTokenomicsSupplierAddressInvalid, }, } diff --git a/x/tokenomics/types/errors.go b/x/tokenomics/types/errors.go index cda0ff164..ac1b623f5 100644 --- a/x/tokenomics/types/errors.go +++ b/x/tokenomics/types/errors.go @@ -19,6 +19,6 @@ var ( ErrTokenomicsApplicationAddressInvalid = sdkerrors.Register(ModuleName, 1112, "the application address in the claim is not a valid bech32 address") ErrTokenomicsParamsInvalid = sdkerrors.Register(ModuleName, 1113, "provided params are invalid") ErrTokenomicsRootHashInvalid = sdkerrors.Register(ModuleName, 1114, "the root hash in the claim is invalid") - ErrTokenomicsApplicationNewStakeInvalid = sdkerrors.Register(ModuleName, 1115, "application stake cannot be reduce to a -ve amount") + ErrTokenomicsApplicationNewStakeInvalid = sdkerrors.Register(ModuleName, 1115, "application stake cannot be reduced to a -ve amount") ErrTokenomicsParamNameInvalid = sdkerrors.Register(ModuleName, 1116, "the provided param name is invalid") )