Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proxy] feat: implement relayer proxy full workflow #101

Merged
merged 25 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
006eb19
feat: add general proxy logic and server builder
red-0ne Oct 24, 2023
63f9467
fix: make var names and comments more consistent
red-0ne Oct 25, 2023
8f899fb
feat: add relay verification and signature
red-0ne Oct 25, 2023
69698c9
fix: interface assignment
red-0ne Oct 25, 2023
c81c3f6
feat: implement relayer proxy full workflow
red-0ne Oct 25, 2023
edbd628
chore: improve code comments
red-0ne Oct 25, 2023
08e1915
chore: change native terms to proxied
red-0ne Oct 26, 2023
4fd55e9
Merge branch 'feat/only-proxy-implementation' into feat/relayer-proxy
red-0ne Oct 26, 2023
1b835aa
chore: address change requests
red-0ne Oct 26, 2023
776e78c
chore: explain -32000 error code
red-0ne Oct 26, 2023
fbba106
chore: add log message on relaying success/failure
red-0ne Oct 26, 2023
2e41656
chore: Update AccountI/any unmarshaling comment
red-0ne Oct 26, 2023
4c6f7d9
Merge branch 'main' into feat/relayer-proxy
red-0ne Oct 28, 2023
b9dd8f3
Merge branch 'main' into feat/relayer-proxy
red-0ne Oct 28, 2023
b7fd6df
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 7, 2023
46066a1
chore: update import paths to use GitHub organization
red-0ne Nov 7, 2023
920978d
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 7, 2023
5041f43
chore: address change requests
red-0ne Nov 7, 2023
6ce1482
chore: add TODO for test implementation
red-0ne Nov 8, 2023
cae8969
chore: Refactor relay request & response methods in
red-0ne Nov 8, 2023
75e88c7
chore: Address request changes
red-0ne Nov 8, 2023
67273dd
chore: comment about current proxy signing approach
red-0ne Nov 8, 2023
3c94180
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 8, 2023
6e30e58
chore: Revert relay request & response signing and
red-0ne Nov 8, 2023
230b74c
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions pkg/relayer/interface.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,62 @@
package relayer

import (
"context"

"github.com/pokt-network/smt"

"github.com/pokt-network/poktroll/pkg/observable"
"github.com/pokt-network/poktroll/x/service/types"
sessiontypes "github.com/pokt-network/poktroll/x/session/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

// RelayerProxy is the interface for the proxy that serves relays to the application.
// It is responsible for starting and stopping all supported RelayServers.
// While handling requests and responding in a closed loop, it also notifies
// the miner about the relays that have been served.
type RelayerProxy interface {

// Start starts all advertised relay servers and returns an error if any of them fail to start.
Start(ctx context.Context) error

// Stop stops all advertised relay servers and returns an error if any of them fail.
Stop(ctx context.Context) error

// ServedRelays returns an observable that notifies the miner about the relays that have been served.
// A served relay is one whose RelayRequest's signature and session have been verified,
// and its RelayResponse has been signed and successfully sent to the client.
ServedRelays() observable.Observable[*types.Relay]

// VerifyRelayRequest is a shared method used by RelayServers to check the
// relay request signature and session validity.
// TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface
// that should not be responsible for verifying relay requests.
VerifyRelayRequest(
ctx context.Context,
relayRequest *types.RelayRequest,
service *sharedtypes.Service,
) error

// SignRelayResponse is a shared method used by RelayServers to sign
// and append the signature to the RelayResponse.
// TODO_TECHDEBT(@red-0ne): This method should be moved out of the RelayerProxy interface
// that should not be responsible for signing relay responses.
SignRelayResponse(relayResponse *types.RelayResponse) error
}

// RelayServer is the interface of the advertised relay servers provided by the RelayerProxy.
type RelayServer interface {
// Start starts the service server and returns an error if it fails.
Start(ctx context.Context) error

// Stop terminates the service server and returns an error if it fails.
Stop(ctx context.Context) error

// Service returns the service to which the RelayServer relays.
Service() *sharedtypes.Service
}

// RelayerSessionsManager is an interface for managing the relayer's sessions and Sparse
// Merkle Sum Trees (SMSTs). It provides notifications about closing sessions that are
// ready to be claimed, and handles the creation and retrieval of SMSTs for a given session.
Expand Down
41 changes: 41 additions & 0 deletions pkg/relayer/proxy/error_reply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package proxy

import (
"log"
"net/http"

"github.com/pokt-network/poktroll/x/service/types"
)

// replyWithError builds a JSONRPCResponseError from the passed in error and writes it to the writer.
// TODO_TECHDEBT: This method should be aware of the request id and use it in the response by having
// the caller pass it along with the error if available.
// TODO_TECHDEBT: This method should be aware of the nature of the error to use the appropriate JSONRPC
// Code, Message and Data. Possibly by augmenting the passed in error with the adequate information.
func (j *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) {
relayResponse := &types.RelayResponse{
Payload: &types.RelayResponse_JsonRpcPayload{
JsonRpcPayload: &types.JSONRPCResponsePayload{
Id: make([]byte, 0),
Jsonrpc: "2.0",
Error: &types.JSONRPCResponseError{
// Using conventional error code indicating internal server error.
Code: -32000,
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
Message: err.Error(),
Data: nil,
},
},
},
}

relayResponseBz, err := relayResponse.Marshal()
if err != nil {
log.Printf("ERROR: failed marshaling relay response: %s", err)
return
}

if _, err = writer.Write(relayResponseBz); err != nil {
log.Printf("ERROR: failed writing relay response: %s", err)
return
}
}
7 changes: 5 additions & 2 deletions pkg/relayer/proxy/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package proxy
import sdkerrors "cosmossdk.io/errors"

var (
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
ErrInvalidRelayRequestSignature = sdkerrors.Register(codespace, 2, "invalid relay request signature")
ErrInvalidSession = sdkerrors.Register(codespace, 3, "invalid session")
ErrInvalidSupplier = sdkerrors.Register(codespace, 4, "invalid supplier")
)
46 changes: 0 additions & 46 deletions pkg/relayer/proxy/interface.go

This file was deleted.

132 changes: 117 additions & 15 deletions pkg/relayer/proxy/jsonrpc.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
package proxy

import (
"bytes"
"context"
"io"
"log"
"net/http"
"net/url"

"github.com/pokt-network/poktroll/pkg/relayer"
"github.com/pokt-network/poktroll/x/service/types"
sharedtypes "github.com/pokt-network/poktroll/x/shared/types"
)

var _ RelayServer = (*jsonRPCServer)(nil)
var _ relayer.RelayServer = (*jsonRPCServer)(nil)

type jsonRPCServer struct {
// service is the service that the server is responsible for.
Expand All @@ -22,28 +26,28 @@ type jsonRPCServer struct {
// proxiedServiceEndpoint is the address of the proxied service that the server relays requests to.
proxiedServiceEndpoint url.URL

// server is the http server that listens for incoming relay requests.
// server is the HTTP server that listens for incoming relay requests.
server *http.Server

// relayerProxy is the main relayer proxy that the server uses to perform its operations.
relayerProxy RelayerProxy
relayerProxy relayer.RelayerProxy

// servedRelaysProducer is a channel that emits the relays that have been served so that the
// servedRelays observable can fan out the notifications to its subscribers.
// servedRelaysProducer is a channel that emits the relays that have been served, allowing
// the servedRelays observable to fan-out notifications to its subscribers.
servedRelaysProducer chan<- *types.Relay
}

// NewJSONRPCServer creates a new HTTP server that listens for incoming relay requests
// and forwards them to the supported proxied service endpoint.
// It takes the serviceId, endpointUrl, and the main RelayerProxy as arguments and returns
// a RelayServer that listens to incoming RelayRequests
// a RelayServer that listens to incoming RelayRequests.
func NewJSONRPCServer(
service *sharedtypes.Service,
supplierEndpoint *sharedtypes.SupplierEndpoint,
proxiedServiceEndpoint url.URL,
servedRelaysProducer chan<- *types.Relay,
proxy RelayerProxy,
) RelayServer {
proxy relayer.RelayerProxy,
) relayer.RelayServer {
return &jsonRPCServer{
service: service,
serverEndpoint: supplierEndpoint,
Expand All @@ -57,18 +61,18 @@ func NewJSONRPCServer(
// Start starts the service server and returns an error if it fails.
// It also waits for the passed in context to end before shutting down.
// This method is blocking and should be called in a goroutine.
func (j *jsonRPCServer) Start(ctx context.Context) error {
func (jsrv *jsonRPCServer) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
j.server.Shutdown(ctx)
jsrv.server.Shutdown(ctx)
}()

return j.server.ListenAndServe()
return jsrv.server.ListenAndServe()
}

// Stop terminates the service server and returns an error if it fails.
func (j *jsonRPCServer) Stop(ctx context.Context) error {
return j.server.Shutdown(ctx)
func (jsrv *jsonRPCServer) Stop(ctx context.Context) error {
return jsrv.server.Shutdown(ctx)
}

// Service returns the JSON-RPC service.
Expand All @@ -80,6 +84,104 @@ func (j *jsonRPCServer) Service() *sharedtypes.Service {
// method of the http.Handler interface. It is called by http.ListenAndServe()
// when jsonRPCServer is used as an http.Handler with an http.Server.
// (see https://pkg.go.dev/net/http#Handler)
func (j *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
panic("TODO: implement jsonRPCServer.ServeHTTP")
func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
ctx := request.Context()
// Relay the request to the proxied service and build the response that will be sent back to the client.
relay, err := jsrv.serveHTTP(ctx, request)
if err != nil {
// Reply with an error if the relay could not be served.
jsrv.replyWithError(writer, err)
log.Printf("WARN: failed serving relay request: %s", err)
return
}

// Send the relay response to the client.
if err := jsrv.sendRelayResponse(relay.Res, writer); err != nil {
jsrv.replyWithError(writer, err)
log.Printf("WARN: failed sending relay response: %s", err)
return
}

red-0ne marked this conversation as resolved.
Show resolved Hide resolved
log.Printf(
"INFO: relay request served successfully for application %s, service %s, session start block height %d, proxied service %s",
relay.Res.Meta.SessionHeader.ApplicationAddress,
relay.Res.Meta.SessionHeader.Service.Id,
relay.Res.Meta.SessionHeader.SessionStartBlockHeight,
jsrv.serverEndpoint.Url,
)

// Emit the relay to the servedRelays observable.
jsrv.servedRelaysProducer <- relay
}

// serveHTTP holds the underlying logic of ServeHTTP.
func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*types.Relay, error) {
// Extract the relay request from the request body.
relayRequest, err := jsrv.newRelayRequest(request)
if err != nil {
return nil, err
}

// Verify the relay request signature and session.
// TODO_TECHDEBT(red-0ne): Currently, the relayer proxy is responsible for verifying
// the relay request signature. This responsibility should be shifted to the relayer itself.
// Consider using a middleware pattern to handle non-proxy specific logic, such as
// request signature verification, session verification, and response signature.
// This would help in separating concerns and improving code maintainability.
// See https://github.com/pokt-network/poktroll/issues/160
if err = jsrv.relayerProxy.VerifyRelayRequest(ctx, relayRequest, jsrv.service); err != nil {
return nil, err
}

// Get the relayRequest payload's `io.ReadCloser` to add it to the http.Request
// that will be sent to the proxied (i.e. staked for) service.
// (see https://pkg.go.dev/net/http#Request) Body field type.
var payloadBz []byte
if _, err = relayRequest.Payload.MarshalTo(payloadBz); err != nil {
return nil, err
}
requestBodyReader := io.NopCloser(bytes.NewBuffer(payloadBz))

// Build the request to be sent to the native service by substituting
// the destination URL's host with the native service's listen address.
destinationURL, err := url.Parse(request.URL.String())
if err != nil {
return nil, err
}
destinationURL.Host = jsrv.proxiedServiceEndpoint.Host

relayHTTPRequest := &http.Request{
Method: request.Method,
Header: request.Header,
URL: destinationURL,
Host: destinationURL.Host,
Body: requestBodyReader,
}

// Send the relay request to the native service.
httpResponse, err := http.DefaultClient.Do(relayHTTPRequest)
if err != nil {
return nil, err
}

// Build the relay response from the native service response
// Use relayRequest.Meta.SessionHeader on the relayResponse session header since it was verified to be valid
// and has to be the same as the relayResponse session header.
relayResponse, err := jsrv.newRelayResponse(httpResponse, relayRequest.Meta.SessionHeader)
if err != nil {
return nil, err
}

return &types.Relay{Req: relayRequest, Res: relayResponse}, nil
}

// sendRelayResponse marshals the relay response and sends it to the client.
func (j *jsonRPCServer) sendRelayResponse(relayResponse *types.RelayResponse, writer http.ResponseWriter) error {
relayResponseBz, err := relayResponse.Marshal()
if err != nil {
return err
}

_, err = writer.Write(relayResponseBz)
return err
}
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
Loading