Skip to content

Commit

Permalink
[Proxy] feat: implement relayer proxy full workflow (#101)
Browse files Browse the repository at this point in the history
* feat: add general proxy logic and server builder

* fix: make var names and comments more consistent

* feat: add relay verification and signature

* fix: interface assignment

* feat: implement relayer proxy full workflow

* chore: improve code comments

* chore: change native terms to proxied

* chore: address change requests

* chore: explain -32000 error code

* chore: add log message on relaying success/failure

* chore: Update AccountI/any unmarshaling comment

Co-authored-by: Daniel Olshansky <[email protected]>

* chore: update import paths to use GitHub organization
name

* chore: address change requests

* chore: add TODO for test implementation

* chore: Refactor relay request & response methods in
RelayerProxy

* chore: Address request changes

* chore: comment about current proxy signing approach

* Merge remote-tracking branch 'origin/main' into feat/relayer-proxy

* chore: Revert relay request & response signing and
verification interface

---------

Co-authored-by: Daniel Olshansky <[email protected]>
  • Loading branch information
red-0ne and Olshansk authored Nov 8, 2023
1 parent 2bc3017 commit 260b823
Show file tree
Hide file tree
Showing 12 changed files with 399 additions and 93 deletions.
50 changes: 50 additions & 0 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,62 @@
package relayer

import (
"context"

"github.com/pokt-network/smt"

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/x/service/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

// RelayerProxy is the interface for the proxy that serves relays to the application.
// It is responsible for starting and stopping all supported RelayServers.
// While handling requests and responding in a closed loop, it also notifies
// the miner about the relays that have been served.
type RelayerProxy interface {

// Start starts all advertised relay servers and returns an error if any of them fail to start.
Start(ctx context.Context) error

// Stop stops all advertised relay servers and returns an error if any of them fail.
Stop(ctx context.Context) error

// ServedRelays returns an observable that notifies the miner about the relays that have been served.
// A served relay is one whose RelayRequest's signature and session have been verified,
// and its RelayResponse has been signed and successfully sent to the client.
ServedRelays() observable.Observable[*types.Relay]

// VerifyRelayRequest is a shared method used by RelayServers to check the
// relay request signature and session validity.
// TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface
// that should not be responsible for verifying relay requests.
VerifyRelayRequest(
ctx context.Context,
relayRequest *types.RelayRequest,
service *sharedtypes.Service,
) error

// SignRelayResponse is a shared method used by RelayServers to sign
// and append the signature to the RelayResponse.
// TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface
// that should not be responsible for signing relay responses.
SignRelayResponse(relayResponse *types.RelayResponse) error
}

// RelayServer is the interface of the advertised relay servers provided by the RelayerProxy.
type RelayServer interface {
// Start starts the service server and returns an error if it fails.
Start(ctx context.Context) error

// Stop terminates the service server and returns an error if it fails.
Stop(ctx context.Context) error

// Service returns the service to which the RelayServer relays.
Service() *sharedtypes.Service
}

// RelayerSessionsManager is an interface for managing the relayer's sessions and Sparse
// Merkle Sum Trees (SMSTs). It provides notifications about closing sessions that are
// ready to be claimed, and handles the creation and retrieval of SMSTs for a given session.
Expand Down
41 changes: 41 additions & 0 deletions pkg/relayer/proxy/error_reply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package proxy

import (
"log"
"net/http"

"github.com/pokt-network/poktroll/x/service/types"
)

// replyWithError builds a JSONRPCResponseError from the passed in error and writes it to the writer.
// TODO_TECHDEBT: This method should be aware of the request id and use it in the response by having
// the caller pass it along with the error if available.
// TODO_TECHDEBT: This method should be aware of the nature of the error to use the appropriate JSONRPC
// Code, Message and Data. Possibly by augmenting the passed in error with the adequate information.
func (j *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) {
relayResponse := &types.RelayResponse{
Payload: &types.RelayResponse_JsonRpcPayload{
JsonRpcPayload: &types.JSONRPCResponsePayload{
Id: make([]byte, 0),
Jsonrpc: "2.0",
Error: &types.JSONRPCResponseError{
// Using conventional error code indicating internal server error.
Code: -32000,
Message: err.Error(),
Data: nil,
},
},
},
}

relayResponseBz, err := relayResponse.Marshal()
if err != nil {
log.Printf("ERROR: failed marshaling relay response: %s", err)
return
}

if _, err = writer.Write(relayResponseBz); err != nil {
log.Printf("ERROR: failed writing relay response: %s", err)
return
}
}
7 changes: 5 additions & 2 deletions pkg/relayer/proxy/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package proxy
import sdkerrors "cosmossdk.io/errors"

var (
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
ErrInvalidRelayRequestSignature = sdkerrors.Register(codespace, 2, "invalid relay request signature")
ErrInvalidSession = sdkerrors.Register(codespace, 3, "invalid session")
ErrInvalidSupplier = sdkerrors.Register(codespace, 4, "invalid supplier")
)
46 changes: 0 additions & 46 deletions pkg/relayer/proxy/interface.go

This file was deleted.

132 changes: 117 additions & 15 deletions pkg/relayer/proxy/jsonrpc.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package proxy

import (
"bytes"
"context"
"io"
"log"
"net/http"
"net/url"

"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/x/service/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

var _ RelayServer = (*jsonRPCServer)(nil)
var _ relayer.RelayServer = (*jsonRPCServer)(nil)

type jsonRPCServer struct {
// service is the service that the server is responsible for.
Expand All @@ -22,28 +26,28 @@ type jsonRPCServer struct {
// proxiedServiceEndpoint is the address of the proxied service that the server relays requests to.
proxiedServiceEndpoint url.URL

// server is the http server that listens for incoming relay requests.
// server is the HTTP server that listens for incoming relay requests.
server *http.Server

// relayerProxy is the main relayer proxy that the server uses to perform its operations.
relayerProxy RelayerProxy
relayerProxy relayer.RelayerProxy

// servedRelaysProducer is a channel that emits the relays that have been served so that the
// servedRelays observable can fan out the notifications to its subscribers.
// servedRelaysProducer is a channel that emits the relays that have been served, allowing
// the servedRelays observable to fan-out notifications to its subscribers.
servedRelaysProducer chan<- *types.Relay
}

// NewJSONRPCServer creates a new HTTP server that listens for incoming relay requests
// and forwards them to the supported proxied service endpoint.
// It takes the serviceId, endpointUrl, and the main RelayerProxy as arguments and returns
// a RelayServer that listens to incoming RelayRequests
// a RelayServer that listens to incoming RelayRequests.
func NewJSONRPCServer(
service *sharedtypes.Service,
supplierEndpoint *sharedtypes.SupplierEndpoint,
proxiedServiceEndpoint url.URL,
servedRelaysProducer chan<- *types.Relay,
proxy RelayerProxy,
) RelayServer {
proxy relayer.RelayerProxy,
) relayer.RelayServer {
return &jsonRPCServer{
service: service,
serverEndpoint: supplierEndpoint,
Expand All @@ -57,18 +61,18 @@ func NewJSONRPCServer(
// Start starts the service server and returns an error if it fails.
// It also waits for the passed in context to end before shutting down.
// This method is blocking and should be called in a goroutine.
func (j *jsonRPCServer) Start(ctx context.Context) error {
func (jsrv *jsonRPCServer) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
j.server.Shutdown(ctx)
jsrv.server.Shutdown(ctx)
}()

return j.server.ListenAndServe()
return jsrv.server.ListenAndServe()
}

// Stop terminates the service server and returns an error if it fails.
func (j *jsonRPCServer) Stop(ctx context.Context) error {
return j.server.Shutdown(ctx)
func (jsrv *jsonRPCServer) Stop(ctx context.Context) error {
return jsrv.server.Shutdown(ctx)
}

// Service returns the JSON-RPC service.
Expand All @@ -80,6 +84,104 @@ func (j *jsonRPCServer) Service() *sharedtypes.Service {
// method of the http.Handler interface. It is called by http.ListenAndServe()
// when jsonRPCServer is used as an http.Handler with an http.Server.
// (see https://pkg.go.dev/net/http#Handler)
func (j *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
panic("TODO: implement jsonRPCServer.ServeHTTP")
func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
ctx := request.Context()
// Relay the request to the proxied service and build the response that will be sent back to the client.
relay, err := jsrv.serveHTTP(ctx, request)
if err != nil {
// Reply with an error if the relay could not be served.
jsrv.replyWithError(writer, err)
log.Printf("WARN: failed serving relay request: %s", err)
return
}

// Send the relay response to the client.
if err := jsrv.sendRelayResponse(relay.Res, writer); err != nil {
jsrv.replyWithError(writer, err)
log.Printf("WARN: failed sending relay response: %s", err)
return
}

log.Printf(
"INFO: relay request served successfully for application %s, service %s, session start block height %d, proxied service %s",
relay.Res.Meta.SessionHeader.ApplicationAddress,
relay.Res.Meta.SessionHeader.Service.Id,
relay.Res.Meta.SessionHeader.SessionStartBlockHeight,
jsrv.serverEndpoint.Url,
)

// Emit the relay to the servedRelays observable.
jsrv.servedRelaysProducer <- relay
}

// serveHTTP holds the underlying logic of ServeHTTP.
func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*types.Relay, error) {
// Extract the relay request from the request body.
relayRequest, err := jsrv.newRelayRequest(request)
if err != nil {
return nil, err
}

// Verify the relay request signature and session.
// TODO_TECHDEBT(red-0ne): Currently, the relayer proxy is responsible for verifying
// the relay request signature. This responsibility should be shifted to the relayer itself.
// Consider using a middleware pattern to handle non-proxy specific logic, such as
// request signature verification, session verification, and response signature.
// This would help in separating concerns and improving code maintainability.
// See https://github.com/pokt-network/poktroll/issues/160
if err = jsrv.relayerProxy.VerifyRelayRequest(ctx, relayRequest, jsrv.service); err != nil {
return nil, err
}

// Get the relayRequest payload's `io.ReadCloser` to add it to the http.Request
// that will be sent to the proxied (i.e. staked for) service.
// (see https://pkg.go.dev/net/http#Request) Body field type.
var payloadBz []byte
if _, err = relayRequest.Payload.MarshalTo(payloadBz); err != nil {
return nil, err
}
requestBodyReader := io.NopCloser(bytes.NewBuffer(payloadBz))

// Build the request to be sent to the native service by substituting
// the destination URL's host with the native service's listen address.
destinationURL, err := url.Parse(request.URL.String())
if err != nil {
return nil, err
}
destinationURL.Host = jsrv.proxiedServiceEndpoint.Host

relayHTTPRequest := &http.Request{
Method: request.Method,
Header: request.Header,
URL: destinationURL,
Host: destinationURL.Host,
Body: requestBodyReader,
}

// Send the relay request to the native service.
httpResponse, err := http.DefaultClient.Do(relayHTTPRequest)
if err != nil {
return nil, err
}

// Build the relay response from the native service response
// Use relayRequest.Meta.SessionHeader on the relayResponse session header since it was verified to be valid
// and has to be the same as the relayResponse session header.
relayResponse, err := jsrv.newRelayResponse(httpResponse, relayRequest.Meta.SessionHeader)
if err != nil {
return nil, err
}

return &types.Relay{Req: relayRequest, Res: relayResponse}, nil
}

// sendRelayResponse marshals the relay response and sends it to the client.
func (j *jsonRPCServer) sendRelayResponse(relayResponse *types.RelayResponse, writer http.ResponseWriter) error {
relayResponseBz, err := relayResponse.Marshal()
if err != nil {
return err
}

_, err = writer.Write(relayResponseBz)
return err
}
Loading

0 comments on commit 260b823

Please sign in to comment.