Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proxy] feat: implement relayer proxy full workflow #101

Merged
merged 25 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
006eb19
feat: add general proxy logic and server builder
red-0ne Oct 24, 2023
63f9467
fix: make var names and comments more consistent
red-0ne Oct 25, 2023
8f899fb
feat: add relay verification and signature
red-0ne Oct 25, 2023
69698c9
fix: interface assignment
red-0ne Oct 25, 2023
c81c3f6
feat: implement relayer proxy full workflow
red-0ne Oct 25, 2023
edbd628
chore: improve code comments
red-0ne Oct 25, 2023
08e1915
chore: change native terms to proxied
red-0ne Oct 26, 2023
4fd55e9
Merge branch 'feat/only-proxy-implementation' into feat/relayer-proxy
red-0ne Oct 26, 2023
1b835aa
chore: address change requests
red-0ne Oct 26, 2023
776e78c
chore: explain -32000 error code
red-0ne Oct 26, 2023
fbba106
chore: add log message on relaying success/failure
red-0ne Oct 26, 2023
2e41656
chore: Update AccountI/any unmarshaling comment
red-0ne Oct 26, 2023
4c6f7d9
Merge branch 'main' into feat/relayer-proxy
red-0ne Oct 28, 2023
b9dd8f3
Merge branch 'main' into feat/relayer-proxy
red-0ne Oct 28, 2023
b7fd6df
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 7, 2023
46066a1
chore: update import paths to use GitHub organization
red-0ne Nov 7, 2023
920978d
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 7, 2023
5041f43
chore: address change requests
red-0ne Nov 7, 2023
6ce1482
chore: add TODO for test implementation
red-0ne Nov 8, 2023
cae8969
chore: Refactor relay request & response methods in
red-0ne Nov 8, 2023
75e88c7
chore: Address request changes
red-0ne Nov 8, 2023
67273dd
chore: comment about current proxy signing approach
red-0ne Nov 8, 2023
3c94180
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 8, 2023
6e30e58
chore: Revert relay request & response signing and
red-0ne Nov 8, 2023
230b74c
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 8, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
cosmossdk.io/math v1.0.1
github.com/cometbft/cometbft v0.37.2
github.com/cometbft/cometbft-db v0.8.0
github.com/cosmos/cosmos-proto v1.0.0-beta.2
github.com/cosmos/cosmos-sdk v0.47.3
github.com/cosmos/gogoproto v1.4.10
github.com/cosmos/ibc-go/v7 v7.1.0
Expand All @@ -26,7 +27,9 @@ require (
github.com/stretchr/testify v1.8.4
go.uber.org/multierr v1.11.0
golang.org/x/crypto v0.12.0
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/sync v0.3.0
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
google.golang.org/grpc v1.56.1
gopkg.in/yaml.v2 v2.4.0
)
Expand Down Expand Up @@ -70,7 +73,6 @@ require (
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cosmos/btcutil v1.0.5 // indirect
github.com/cosmos/cosmos-proto v1.0.0-beta.2 // indirect
github.com/cosmos/go-bip39 v1.0.0 // indirect
github.com/cosmos/gogogateway v1.2.0 // indirect
github.com/cosmos/iavl v0.20.0 // indirect
Expand Down Expand Up @@ -254,7 +256,6 @@ require (
go.uber.org/dig v1.16.1 // indirect
go.uber.org/fx v1.19.2 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
Expand All @@ -266,7 +267,6 @@ require (
gonum.org/v1/gonum v0.11.0 // indirect
google.golang.org/api v0.122.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions pkg/relayer/proxy/interface.go → pkg/relayer/interface.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package proxy
package relayer

import (
"context"
Expand Down Expand Up @@ -27,7 +27,7 @@ type RelayerProxy interface {

// VerifyRelayRequest is a shared method used by RelayServers to check the
// relay request signature and session validity.
VerifyRelayRequest(relayRequest *types.RelayRequest) (isValid bool, err error)
VerifyRelayRequest(ctx context.Context, relayRequest *types.RelayRequest, serviceId *sharedtypes.ServiceId) error
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

// SignRelayResponse is a shared method used by RelayServers to sign the relay response.
SignRelayResponse(relayResponse *types.RelayResponse) ([]byte, error)
Expand Down
41 changes: 41 additions & 0 deletions pkg/relayer/proxy/error_reply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package proxy

import (
"log"
"net/http"

"github.com/pokt-network/poktroll/x/service/types"
)

// replyWithError builds a JSONRPCResponseError from the passed in error and writes it to the writer.
// TODO_TECHDEBT: This method should be aware of the request id and use it in the response by having
// the caller pass it along with the error if available.
// TODO_TECHDEBT: This method should be aware of the nature of the error to use the appropriate JSONRPC
// Code, Message and Data. Possibly by augmenting the passed in error with the adequate information.
func (j *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) {
relayResponse := &types.RelayResponse{
Payload: &types.RelayResponse_JsonRpcPayload{
JsonRpcPayload: &types.JSONRPCResponsePayload{
Id: make([]byte, 0),
Jsonrpc: "2.0",
Error: &types.JSONRPCResponseError{
// Using conventional error code indicating internal server error.
Code: -32000,
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
Message: err.Error(),
Data: nil,
},
},
},
}

relayResponseBz, err := relayResponse.Marshal()
if err != nil {
log.Printf("ERROR: failed marshaling relay response: %s", err)
return
}

if _, err = writer.Write(relayResponseBz); err != nil {
log.Printf("ERROR: failed writing relay response: %s", err)
return
}
}
7 changes: 5 additions & 2 deletions pkg/relayer/proxy/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package proxy
import sdkerrors "cosmossdk.io/errors"

var (
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
ErrInvalidRelayRequestSignature = sdkerrors.Register(codespace, 2, "invalid relay request signature")
ErrInvalidSession = sdkerrors.Register(codespace, 3, "invalid session")
ErrInvalidSupplier = sdkerrors.Register(codespace, 4, "invalid supplier")
)
129 changes: 112 additions & 17 deletions pkg/relayer/proxy/jsonrpc.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package proxy

import (
"bytes"
"context"
"io"
"log"
"net/http"
"net/url"

"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/x/service/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

var _ RelayServer = (*jsonRPCServer)(nil)
var _ relayer.RelayServer = (*jsonRPCServer)(nil)

type jsonRPCServer struct {
// serviceId is the id of the service that the server is responsible for.
Expand All @@ -22,28 +26,28 @@ type jsonRPCServer struct {
// proxiedServiceEndpoint is the address of the proxied service that the server relays requests to.
proxiedServiceEndpoint url.URL

// server is the http server that listens for incoming relay requests.
// server is the HTTP server that listens for incoming relay requests.
server *http.Server

// relayerProxy is the main relayer proxy that the server uses to perform its operations.
relayerProxy RelayerProxy
relayerProxy relayer.RelayerProxy

// servedRelaysProducer is a channel that emits the relays that have been served so that the
// servedRelays observable can fan out the notifications to its subscribers.
// servedRelaysProducer is a channel that emits the relays that have been served, allowing
// the servedRelays observable to fan-out notifications to its subscribers.
servedRelaysProducer chan<- *types.Relay
}

// NewJSONRPCServer creates a new HTTP server that listens for incoming relay requests
// and forwards them to the supported proxied service endpoint.
// It takes the serviceId, endpointUrl, and the main RelayerProxy as arguments and returns
// a RelayServer that listens to incoming RelayRequests
// a RelayServer that listens to incoming RelayRequests.
func NewJSONRPCServer(
serviceId *sharedtypes.ServiceId,
supplierEndpoint *sharedtypes.SupplierEndpoint,
proxiedServiceEndpoint url.URL,
servedRelaysProducer chan<- *types.Relay,
proxy RelayerProxy,
) RelayServer {
proxy relayer.RelayerProxy,
) relayer.RelayServer {
return &jsonRPCServer{
serviceId: serviceId,
serverEndpoint: supplierEndpoint,
Expand All @@ -57,29 +61,120 @@ func NewJSONRPCServer(
// Start starts the service server and returns an error if it fails.
// It also waits for the passed in context to end before shutting down.
// This method is blocking and should be called in a goroutine.
func (j *jsonRPCServer) Start(ctx context.Context) error {
func (jsrv *jsonRPCServer) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
j.server.Shutdown(ctx)
jsrv.server.Shutdown(ctx)
}()

return j.server.ListenAndServe()
return jsrv.server.ListenAndServe()
}

// Stop terminates the service server and returns an error if it fails.
func (j *jsonRPCServer) Stop(ctx context.Context) error {
return j.server.Shutdown(ctx)
func (jsrv *jsonRPCServer) Stop(ctx context.Context) error {
return jsrv.server.Shutdown(ctx)
}

// ServiceId returns the serviceId of the JSON-RPC service.
func (j *jsonRPCServer) ServiceId() *sharedtypes.ServiceId {
return j.serviceId
func (jsrv *jsonRPCServer) ServiceId() *sharedtypes.ServiceId {
return jsrv.serviceId
}

// ServeHTTP listens for incoming relay requests. It implements the respective
// method of the http.Handler interface. It is called by http.ListenAndServe()
// when jsonRPCServer is used as an http.Handler with an http.Server.
// (see https://pkg.go.dev/net/http#Handler)
func (j *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
panic("TODO: implement jsonRPCServer.ServeHTTP")
func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
ctx := request.Context()
// Relay the request to the proxied service and build the response that will be sent back to the client.
relay, err := jsrv.serveHTTP(ctx, request)
if err != nil {
// Reply with an error if relay response could not be built.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
jsrv.replyWithError(writer, err)
log.Printf("WARN: failed serving relay request: %s", err)
return
}

// Send the relay response to the client.
if err := jsrv.sendRelayResponse(relay.Res, writer); err != nil {
jsrv.replyWithError(writer, err)
log.Printf("WARN: failed sending relay response: %s", err)
return
}

red-0ne marked this conversation as resolved.
Show resolved Hide resolved
log.Print("INFO: relay request served successfully")
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

// Emit the relay to the servedRelays observable.
jsrv.servedRelaysProducer <- relay
}

// serveHTTP holds the underlying logic of ServeHTTP.
func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*types.Relay, error) {
// Extract the relay request from the request body.
relayRequest, err := jsrv.newRelayRequest(request)
if err != nil {
return nil, err
}

// Verify the relay request signature and session.
// TODO_TECHDEBT: Currently, the relayer proxy is responsible for verifying
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// the relay request signature. This responsibility should be shifted to the relayer itself.
// Consider using a middleware pattern to handle non-proxy specific logic, such as
// request signature verification, session verification, and response signature.
// This would help in separating concerns and improving code maintainability.
if err := jsrv.relayerProxy.VerifyRelayRequest(ctx, relayRequest, jsrv.serviceId); err != nil {
return nil, err
}

// Get the relayRequest payload's `io.ReadCloser` to add it to the http.Request
// that will be sent to the native service.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// (see https://pkg.go.dev/net/http#Request) Body field type.
var payloadBz []byte
if _, err = relayRequest.Payload.MarshalTo(payloadBz); err != nil {
return nil, err
}
requestBodyReader := io.NopCloser(bytes.NewBuffer(payloadBz))

// Build the request to be sent to the native service by substituting
// the destination URL's host with the native service's listen address.
destinationURL, err := url.Parse(request.URL.String())
if err != nil {
return nil, err
}
destinationURL.Host = jsrv.proxiedServiceEndpoint.Host

relayHTTPRequest := &http.Request{
Method: request.Method,
Header: request.Header,
URL: destinationURL,
Host: destinationURL.Host,
Body: requestBodyReader,
}

// Send the relay request to the native service.
httpResponse, err := http.DefaultClient.Do(relayHTTPRequest)
if err != nil {
return nil, err
}

// Build the relay response from the native service response
// Use relayRequest.Meta.SessionHeader on the relayResponse session header since it was verified to be valid
// and has to be the same as the relayResponse session header.
relayResponse, err := jsrv.newRelayResponse(httpResponse, relayRequest.Meta.SessionHeader)
if err != nil {
return nil, err
}

return &types.Relay{Req: relayRequest, Res: relayResponse}, nil
}

// sendRelayResponse marshals the relay response and sends it to the client.
func (j *jsonRPCServer) sendRelayResponse(relayResponse *types.RelayResponse, writer http.ResponseWriter) error {
relayResponseBz, err := relayResponse.Marshal()
if err != nil {
return err
}

_, err = writer.Write(relayResponseBz)
return err
}
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
42 changes: 20 additions & 22 deletions pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,28 @@ import (
accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"golang.org/x/sync/errgroup"

// TODO_INCOMPLETE(@red-0ne): Import the appropriate block client interface once available.
// blocktypes "github.com/pokt-network/poktroll/pkg/client"
blocktypes "github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/pkg/observable/channel"
"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/x/service/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
suppliertypes "github.com/pokt-network/poktroll/x/supplier/types"
)

var _ RelayerProxy = (*relayerProxy)(nil)
var _ relayer.RelayerProxy = (*relayerProxy)(nil)

type (
serviceId = string
relayServersMap = map[serviceId][]RelayServer
relayServersMap = map[serviceId][]relayer.RelayServer
servicesEndpointsMap = map[serviceId]url.URL
)

// relayerProxy is the main relayer proxy that takes takes relay requests of supported services from the client
// and proxies them to the supported proxied services.
// It is responsible for notifying the miner about the relays that have been served so they can be counted
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// when the miner enters the claim/proof phase.
// TODO_TEST: Have tests for the relayer proxy.
type relayerProxy struct {
// keyName is the supplier's key name in the Cosmos's keybase. It is used along with the keyring to
// get the supplier address and sign the relay responses.
Expand All @@ -34,8 +39,7 @@ type relayerProxy struct {

// blocksClient is the client used to get the block at the latest height from the blockchain
// and be notified of new incoming blocks. It is used to update the current session data.
// TODO_INCOMPLETE(@red-0ne): Uncomment once the BlockClient interface is available.
// blockClient blocktypes.BlockClient
blockClient blocktypes.BlockClient

// accountsQuerier is the querier used to get account data (e.g. app publicKey) from the blockchain,
// which, in the context of the RelayerProxy, is used to verify the relay request signatures.
Expand Down Expand Up @@ -63,25 +67,28 @@ type relayerProxy struct {
// servedRelaysProducer is a channel that emits the relays that have been served so that the
// servedRelays observable can fan out the notifications to its subscribers.
servedRelaysProducer chan<- *types.Relay

// clientCtx is the Cosmos' client context used to build the needed query clients and unmarshal their replies.
clientCtx sdkclient.Context

// supplierAddress is the address of the supplier that the relayer proxy is running for.
supplierAddress string
}

func NewRelayerProxy(
ctx context.Context,
clientCtx sdkclient.Context,
keyName string,
keyring keyring.Keyring,
proxiedServicesEndpoints servicesEndpointsMap,
// TODO_INCOMPLETE(@red-0ne): Uncomment once the BlockClient interface is available.
// blockClient blocktypes.BlockClient,
) RelayerProxy {
blockClient blocktypes.BlockClient,
) relayer.RelayerProxy {
accountQuerier := accounttypes.NewQueryClient(clientCtx)
supplierQuerier := suppliertypes.NewQueryClient(clientCtx)
sessionQuerier := sessiontypes.NewQueryClient(clientCtx)
servedRelays, servedRelaysProducer := channel.NewObservable[*types.Relay]()

return &relayerProxy{
// TODO_INCOMPLETE(@red-0ne): Uncomment once the BlockClient interface is available.
// blockClient: blockClient,
blockClient: blockClient,
keyName: keyName,
keyring: keyring,
accountsQuerier: accountQuerier,
Expand All @@ -90,6 +97,7 @@ func NewRelayerProxy(
proxiedServicesEndpoints: proxiedServicesEndpoints,
servedRelays: servedRelays,
servedRelaysProducer: servedRelaysProducer,
clientCtx: clientCtx,
}
}

Expand Down Expand Up @@ -137,13 +145,3 @@ func (rp *relayerProxy) Stop(ctx context.Context) error {
func (rp *relayerProxy) ServedRelays() observable.Observable[*types.Relay] {
return rp.servedRelays
}

// VerifyRelayRequest is a shared method used by RelayServers to check the relay request signature and session validity.
func (rp *relayerProxy) VerifyRelayRequest(relayRequest *types.RelayRequest) (isValid bool, err error) {
panic("TODO: implement relayerProxy.VerifyRelayRequest")
}

// SignRelayResponse is a shared method used by RelayServers to sign the relay response.
func (rp *relayerProxy) SignRelayResponse(relayResponse *types.RelayResponse) ([]byte, error) {
panic("TODO: implement relayerProxy.SignRelayResponse")
}
Loading