Skip to content

Commit

Permalink
[RelayMiner, Testing, Off-chain] test: relayer pkg (#193)
Browse files Browse the repository at this point in the history
* refactor: `MapFn`s receive context arg

* chore: add `ForEach` map shorthand operator

* chore: add `/pkg/observable/filter`

* chore: add `/pkg/observable/logging`

* chore: add `/pkg/relayer/protocol`

* chore: add `Miner` interface

* feat: add `Miner` implementation

* test: `Miner` implementation

* chore: fix comment

* chore: add godoc comments

* feat: Add Relayer struct

* chore: Rename to RelayMiner

* chore: Rename relay miner file

* chore: Remove unused
RelayerOption parameter

* [Test] First step for automated E2E Relay test (#167)

- Fixed helpers for localnet regenesis
- Added an application & supplier to the genesis file
- Initializing appMap & supplierMap in E2E tests
- Add support for the app's codec (for unmarshaling responses) in E2E tests
- Adding a placeholder for `e2e/tests/relay.feature`

---

Co-authored-by: harry <[email protected]>

* [Relayer] refactor: simplify `RelayerSessionsManager`  (#169)

* refactor: `MapFn`s receive context arg

* feat: add `MapExpand` observable operator

* refactor: `RelayerSessionsManager` to be more reactive

* chore: add godoc comment

* chore: review feedback improvements

* trigger CI

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

* chore: update start mining comment

* fix: Update Miner interface

* fix: import cycle & goimports

* chore: review feedback improvements

* chore: cleanup TODO_THIS_COMMIT comments

* chore: improve var & func names for clarity and consistency

* refactor: move claim/proof lifecycle concerns to `relayerSessionsManager`.

* chore: review feedback improvements

* chore: review feedback improvements

* refactor: `miner#hash()` method

* chore: tidy up

* chore: simplify

* wip: relayer CLI

* chore: finish first pass

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

* chore: review feedback improvements

* chore: tidy up cmd creation

* fix: incomplete refactor

* chore: simplify

* chore: add log lines

* wip: react to miner, refactor, construct miner, refactor

* chore: cleanup

* chore: Reflect responsibility changes of session manager

* feat: Use relay miner to start

* [WIP] Updating relay.feature to run curl command

* chore: Improve comment about startig relayer proxy

* wip: debugging

* Continued implementation but still failing

* Getting an invalid request right now but figuring it out...

* wip: debugging

* Added service and switched to AppGate

* wip: debugging

* chore: Rename falg variables

* wip: debugging

* revertme: disable tilt relayer service

* chore: use arg not flag

* chore: rename command

* Debugging checkpoint

* wip: debugging - improvments

* wip: debugging

* wip: debugging

* wip: debugging

* revert-or-fixme: add error log lines

* revert-or-fixme: add debug log lines

* fix: set relay server handle function

* revert-or-fixme: add debug log lines

* chore: rename some chan vars

* feat: fix all bugs, e2e relay works

* chore: add some todo comments

* wip: debugging

* fix: use remote helm charts again

* fix: put adequate proxied services endpoitns, prevent session republishing

* chore: Refactor JSONRPCServer and server builder

* Upate a couple small comments in the maketfile

* revert: comment relayers out of tiltfile

* chore: fix subcmd name `relayerminer` -> `relayminer`

* chore: improve logging

* chore: cleanup error messaging & logging in appgate server

* refactor: rename misnamed `jsonRPCServer` receiver var

* chore: remove appgate server debug log

* chore: unexport `relayMiner` struct

* refactor: interrupt signal handling

* chore: improve comments

* chore: improve comments

* revert: tiltfile hot-reload dirs

* refactor: re-consolidate client contexts

* fix: typo

* chore: remove todo

* chore: add todo comment

* revert: comment change

* fix: error format strings

* chore: remove comment

* fix: error format strings

* chore: add `-features-path` flag to cucumber tests

* fix: set the relayminer URL in the curl cmd

* chore: remove redundant `-X` curl arg (says curl)

* squash: fix relayminer url: reword: s/relayminer/appgateserver/

* chore: improve error messaging

* fix: curl invocation

* test: implement step definition to assert agains relay response

* chore: improve error name & messaging

* Self review

* fixup: merge upstream

* chore: review feedback improvements

* chore: update anvil service port in make targets

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* refactor: relayminer depinject helpers & godoc comments on all constructors

* refactor: separate tx and query client contexts 🙄

* fix: sessiontree store path check

* fix: sessiontree store path check

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: review feedback improvements

* chore: add long command description

* fix: supplier client test

* chore: cleanup flags and dependencies for appgateserver cmd

* chore: move shared dependency setup logic to shared pkg

* chore: update comment

* Update .gitignore

* Update OpenAPI spec

* Updated comments for post 177+179 work for okdas

* Update pkg/relayer/cmd/cmd.go

* Update the names and references to queryNode/sequencerNode/fullNode etc

* Update some comments and TODOs

* Added a couple more comments

* More tiny comment updates

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: add silent w/ error flag to curl

* refactor: observable types to work around gomock

* fix: `relayMiner#Stop()`

* test: relayminer

* chore: add godoc comments

* chore: add comments

* chore: review feedback improvements

Co-authored-by: Daniel Olshansky <[email protected]>

---------

Co-authored-by: Redouane Lakrache <[email protected]>
Co-authored-by: Daniel Olshansky <[email protected]>
Co-authored-by: harry <[email protected]>
  • Loading branch information
4 people authored Nov 17, 2023
1 parent dd34341 commit dd8f35a
Show file tree
Hide file tree
Showing 11 changed files with 235 additions and 19 deletions.
8 changes: 7 additions & 1 deletion e2e/tests/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,13 @@ func (p *pocketdBin) runPocketCmd(args ...string) (*commandResult, error) {
func (p *pocketdBin) runCurlPostCmd(rpcUrl string, service string, data string, args ...string) (*commandResult, error) {
dataStr := fmt.Sprintf("%s", data)
urlStr := fmt.Sprintf("%s/%s", rpcUrl, service)
base := []string{"-v", "POST", "-H", "Content-Type: application/json", "--data", dataStr, urlStr}
base := []string{
"-v", // verbose output
"-sS", // silent with error
"POST", // HTTP method
"-H", "Content-Type: application/json", // HTTP headers
"--data", dataStr, urlStr, // POST data
}
args = append(base, args...)
commandStr := "curl " + strings.Join(args, " ") // Create a string representation of the command
cmd := exec.Command("curl", args...)
Expand Down
32 changes: 25 additions & 7 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
//go:generate mockgen -destination=../../testutil/mockrelayer/relayer_proxy_mock.go -package=mockrelayer . RelayerProxy
//go:generate mockgen -destination=../../testutil/mockrelayer/miner_mock.go -package=mockrelayer . Miner
//go:generate mockgen -destination=../../testutil/mockrelayer/relayer_sessions_manager_mock.go -package=mockrelayer . RelayerSessionsManager

package relayer

import (
Expand All @@ -7,7 +11,7 @@ import (
"github.com/pokt-network/smt"

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/x/service/types"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)
Expand All @@ -24,14 +28,28 @@ type TxClientContext client.Context
// to the dependency injector
type QueryClientContext client.Context

// RelaysObservable is an observable which is notified with Relay values.
//
// TODO_HACK: The purpose of this type is to work around gomock's lack of
// support for generic types. For the same reason, this type cannot be an
// alias (i.e. RelaysObservable = observable.Observable[*servicetypes.Relay]).
type RelaysObservable observable.Observable[*servicetypes.Relay]

// MinedRelaysObservable is an observable which is notified with MinedRelay values.
//
// TODO_HACK: The purpose of this type is to work around gomock's lack of
// support for generic types. For the same reason, this type cannot be an
// alias (i.e. MinedRelaysObservable = observable.Observable[*MinedRelay]).
type MinedRelaysObservable observable.Observable[*MinedRelay]

// Miner is responsible for observing servedRelayObs, hashing and checking the
// difficulty of each, finally publishing those with sufficient difficulty to
// minedRelayObs as they are applicable for relay volume.
type Miner interface {
MinedRelays(
ctx context.Context,
servedRelayObs observable.Observable[*types.Relay],
) (minedRelaysObs observable.Observable[*MinedRelay])
servedRelayObs RelaysObservable,
) (minedRelaysObs MinedRelaysObservable)
}

type MinerOption func(Miner)
Expand All @@ -51,23 +69,23 @@ type RelayerProxy interface {
// ServedRelays returns an observable that notifies the miner about the relays that have been served.
// A served relay is one whose RelayRequest's signature and session have been verified,
// and its RelayResponse has been signed and successfully sent to the client.
ServedRelays() observable.Observable[*types.Relay]
ServedRelays() RelaysObservable

// VerifyRelayRequest is a shared method used by RelayServers to check the
// relay request signature and session validity.
// TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface
// that should not be responsible for verifying relay requests.
VerifyRelayRequest(
ctx context.Context,
relayRequest *types.RelayRequest,
relayRequest *servicetypes.RelayRequest,
service *sharedtypes.Service,
) error

// SignRelayResponse is a shared method used by RelayServers to sign
// and append the signature to the RelayResponse.
// TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface
// that should not be responsible for signing relay responses.
SignRelayResponse(relayResponse *types.RelayResponse) error
SignRelayResponse(relayResponse *servicetypes.RelayResponse) error
}

type RelayerProxyOption func(RelayerProxy)
Expand Down Expand Up @@ -95,7 +113,7 @@ type RelayServer interface {
type RelayerSessionsManager interface {
// InsertRelays receives an observable of relays that should be included
// in their respective session's SMST (tree).
InsertRelays(minedRelaysObs observable.Observable[*MinedRelay])
InsertRelays(minedRelaysObs MinedRelaysObservable)

// Start iterates over the session trees at the end of each, respective, session.
// The session trees are piped through a series of map operations which progress
Expand Down
11 changes: 8 additions & 3 deletions pkg/relayer/miner/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,17 @@ func NewMiner(
// It DOES NOT BLOCK as map operations run in their own goroutines.
func (mnr *miner) MinedRelays(
ctx context.Context,
servedRelaysObs observable.Observable[*servicetypes.Relay],
) observable.Observable[*relayer.MinedRelay] {
servedRelaysObs relayer.RelaysObservable,
) relayer.MinedRelaysObservable {
// NB: must cast back to generic observable type to use with Map.
// relayer.RelaysObervable cannot be an alias due to gomock's lack of
// support for generic types.
relaysObs := observable.Observable[*servicetypes.Relay](servedRelaysObs)

// Map servedRelaysObs to a new observable of an either type, populated with
// the minedRelay or an error. It is notified after the relay has been mined
// or an error has been encountered, respectively.
eitherMinedRelaysObs := channel.Map(ctx, servedRelaysObs, mnr.mapMineRelay)
eitherMinedRelaysObs := channel.Map(ctx, relaysObs, mnr.mapMineRelay)
logging.LogErrors(ctx, filter.EitherError(ctx, eitherMinedRelaysObs))

return filter.EitherSuccess(ctx, eitherMinedRelaysObs)
Expand Down
5 changes: 2 additions & 3 deletions pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"golang.org/x/sync/errgroup"

blocktypes "github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/relayer"
apptypes "github.com/pokt-network/poktroll/x/application/types"
Expand Down Expand Up @@ -70,7 +69,7 @@ type relayerProxy struct {
proxiedServicesEndpoints servicesEndpointsMap

// servedRelays is an observable that notifies the miner about the relays that have been served.
servedRelays observable.Observable[*types.Relay]
servedRelays relayer.RelaysObservable

// servedRelaysPublishCh is a channel that emits the relays that have been served so that the
// servedRelays observable can fan out the notifications to its subscribers.
Expand Down Expand Up @@ -179,7 +178,7 @@ func (rp *relayerProxy) Stop(ctx context.Context) error {
// ServedRelays returns an observable that notifies the miner about the relays that have been served.
// A served relay is one whose RelayRequest's signature and session have been verified,
// and its RelayResponse has been signed and successfully sent to the client.
func (rp *relayerProxy) ServedRelays() observable.Observable[*types.Relay] {
func (rp *relayerProxy) ServedRelays() relayer.RelaysObservable {
return rp.servedRelays
}

Expand Down
6 changes: 6 additions & 0 deletions pkg/relayer/relayminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ type relayMiner struct {

// NewRelayMiner creates a new Relayer instance with the given dependencies.
// It injects the dependencies into the Relayer instance and returns it.
//
// Required dependencies:
// - RelayerProxy
// - Miner
// - RelayerSessionsManager
func NewRelayMiner(ctx context.Context, deps depinject.Config) (*relayMiner, error) {
rel := &relayMiner{}

Expand Down Expand Up @@ -61,5 +66,6 @@ func (rel *relayMiner) Start(ctx context.Context) error {
// Stop stops the relayer proxy which in turn stops all advertised relay servers
// and unsubscribes the miner from the served relays observable.
func (rel *relayMiner) Stop(ctx context.Context) error {
rel.relayerSessionsManager.Stop()
return rel.relayerProxy.Stop(ctx)
}
58 changes: 58 additions & 0 deletions pkg/relayer/relayminer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package relayer_test

import (
"context"
"testing"
"time"

"cosmossdk.io/depinject"
"github.com/stretchr/testify/require"

"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/testutil/testrelayer"
servicetypes "github.com/pokt-network/poktroll/x/service/types"
)

func TestRelayMiner_StartAndStop(t *testing.T) {
srObs, _ := channel.NewObservable[*servicetypes.Relay]()
servedRelaysObs := relayer.RelaysObservable(srObs)

mrObs, _ := channel.NewObservable[*relayer.MinedRelay]()
minedRelaysObs := relayer.MinedRelaysObservable(mrObs)

ctx := context.Background()
relayerProxyMock := testrelayer.NewMockOneTimeRelayerProxy(
ctx, t,
servedRelaysObs,
)

minerMock := testrelayer.NewMockOneTimeMiner(
ctx, t,
servedRelaysObs,
minedRelaysObs,
)

relayerSessionsManagerMock := testrelayer.NewMockOneTimeRelayerSessionsManager(
ctx, t,
minedRelaysObs,
)

deps := depinject.Supply(
relayerProxyMock,
minerMock,
relayerSessionsManagerMock,
)

relayminer, err := relayer.NewRelayMiner(ctx, deps)
require.NoError(t, err)
require.NotNil(t, relayminer)

err = relayminer.Start(ctx)
require.NoError(t, err)

time.Sleep(time.Millisecond)

err = relayminer.Stop(ctx)
require.NoError(t, err)
}
15 changes: 10 additions & 5 deletions pkg/relayer/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type sessionsTreesMap = map[int64]map[string]relayer.SessionTree
// relayerSessionsManager is an implementation of the RelayerSessions interface.
// TODO_TEST: Add tests to the relayerSessionsManager.
type relayerSessionsManager struct {
relayObs observable.Observable[*relayer.MinedRelay]
relayObs relayer.MinedRelaysObservable

// sessionsToClaimObs notifies about sessions that are ready to be claimed.
sessionsToClaimObs observable.Observable[relayer.SessionTree]
Expand Down Expand Up @@ -93,10 +93,15 @@ func NewRelayerSessions(
// network as necessary.
// It IS NOT BLOCKING as map operations run in their own goroutines.
func (rs *relayerSessionsManager) Start(ctx context.Context) {
// NB: must cast back to generic observable type to use with Map.
// relayer.MinedRelaysObservable cannot be an alias due to gomock's lack of
// support for generic types.
relayObs := observable.Observable[*relayer.MinedRelay](rs.relayObs)

// Map eitherMinedRelays to a new observable of an error type which is
// notified if an error was encountered while attempting to add the relay to
// the session tree.
miningErrorsObs := channel.Map(ctx, rs.relayObs, rs.mapAddRelayToSessionTree)
miningErrorsObs := channel.Map(ctx, relayObs, rs.mapAddMinedRelayToSessionTree)
logging.LogErrors(ctx, miningErrorsObs)

// Start claim/proof pipeline.
Expand All @@ -115,7 +120,7 @@ func (rs *relayerSessionsManager) Stop() {
}

// SessionsToClaim returns an observable that notifies when sessions are ready to be claimed.
func (rs *relayerSessionsManager) InsertRelays(relays observable.Observable[*relayer.MinedRelay]) {
func (rs *relayerSessionsManager) InsertRelays(relays relayer.MinedRelaysObservable) {
rs.relayObs = relays
}

Expand Down Expand Up @@ -222,10 +227,10 @@ func (rs *relayerSessionsManager) waitForBlock(ctx context.Context, height int64
return nil
}

// mapAddRelayToSessionTree is intended to be used as a MapFn. It adds the relay
// mapAddMinedRelayToSessionTree is intended to be used as a MapFn. It adds the relay
// to the session tree. If it encounters an error, it returns the error. Otherwise,
// it skips output (only outputs errors).
func (rs *relayerSessionsManager) mapAddRelayToSessionTree(
func (rs *relayerSessionsManager) mapAddMinedRelayToSessionTree(
_ context.Context,
relay *relayer.MinedRelay,
) (_ error, skip bool) {
Expand Down
11 changes: 11 additions & 0 deletions testutil/mockrelayer/mocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package mockrelayer

// This file is in place to declare the package for dynamically generated structs.
//
// Note that this does not follow the Cosmos SDK pattern of committing Mocks to main.
// For example, they commit auto-generate code to main: https://github.com/cosmos/cosmos-sdk/blob/main/x/gov/testutil/expected_keepers_mocks.go
// Documentation on how Cosmos uses mockgen can be found here: https://docs.cosmos.network/main/build/building-modules/testing#unit-tests
//
// IMPORTANT: We have attempted to use `.gitkeep` files instead, but it causes a circular dependency issue with protobuf and mock generation
// since we are leveraging `ignite` to compile `.proto` files which runs `go mod tidy` before generating, requiring the entire dependency tree
// to be valid before mock implementations have been generated.
34 changes: 34 additions & 0 deletions testutil/testrelayer/miner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package testrelayer

import (
"context"
"testing"

"github.com/golang/mock/gomock"

"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/testutil/mockrelayer"
)

// NewMockOneTimeMiner creates a new mock Miner. This mock Miner will expect a
// call to MinedRelays with the given context and expectedRelayObs args. When
// that call is made, returnedMinedRelaysObs is returned.
func NewMockOneTimeMiner(
ctx context.Context,
t *testing.T,
expectedRelaysObs relayer.RelaysObservable,
returnedMinedRelaysObs relayer.MinedRelaysObservable,
) *mockrelayer.MockMiner {
t.Helper()

ctrl := gomock.NewController(t)
minerMock := mockrelayer.NewMockMiner(ctrl)
minerMock.EXPECT().
MinedRelays(
gomock.Eq(ctx),
gomock.Eq(expectedRelaysObs),
).
Return(returnedMinedRelaysObs).
Times(1)
return minerMock
}
37 changes: 37 additions & 0 deletions testutil/testrelayer/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package testrelayer

import (
"context"
"testing"

"github.com/golang/mock/gomock"

"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/testutil/mockrelayer"
)

// NewMockOneTimeRelayerProxy creates a new mock RelayerProxy. This mock
// RelayerProxy will expect a call to ServedRelays with the given context, and
// when that call is made, returnedRelaysObs is returned. It also expects a call
// to Start and Stop with the given context.
func NewMockOneTimeRelayerProxy(
ctx context.Context,
t *testing.T,
returnedRelaysObs relayer.RelaysObservable,
) *mockrelayer.MockRelayerProxy {
t.Helper()

ctrl := gomock.NewController(t)
relayerProxyMock := mockrelayer.NewMockRelayerProxy(ctrl)
relayerProxyMock.EXPECT().
Start(gomock.Eq(ctx)).
Times(1)
relayerProxyMock.EXPECT().
Stop(gomock.Eq(ctx)).
Times(1)
relayerProxyMock.EXPECT().
ServedRelays().
Return(returnedRelaysObs).
Times(1)
return relayerProxyMock
}
37 changes: 37 additions & 0 deletions testutil/testrelayer/sessions_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package testrelayer

import (
"context"
"testing"

"github.com/golang/mock/gomock"

"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/testutil/mockrelayer"
)

// NewMockOneTimeRelayerSessionsManager creates a new mock RelayerSessionsManager.
// This mock RelayerSessionsManager will expect a call to InsertRelays with the
// given context and expectedMinedRelaysObs args. When that call is made,
// returnedMinedRelaysObs is returned. It also expects a call to Start with the
// given context, and stop.
func NewMockOneTimeRelayerSessionsManager(
ctx context.Context,
t *testing.T,
expectedMinedRelaysObs relayer.MinedRelaysObservable,
) *mockrelayer.MockRelayerSessionsManager {
t.Helper()

ctrl := gomock.NewController(t)
relayerSessionsManagerMock := mockrelayer.NewMockRelayerSessionsManager(ctrl)
relayerSessionsManagerMock.EXPECT().
InsertRelays(gomock.Eq(expectedMinedRelaysObs)).
Times(1)
relayerSessionsManagerMock.EXPECT().
Start(gomock.Eq(ctx)).
Times(1)
relayerSessionsManagerMock.EXPECT().
Stop().
Times(1)
return relayerSessionsManagerMock
}

0 comments on commit dd8f35a

Please sign in to comment.