Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proxy] feat: implement relayer proxy full workflow #101

Merged
merged 25 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
006eb19
feat: add general proxy logic and server builder
red-0ne Oct 24, 2023
63f9467
fix: make var names and comments more consistent
red-0ne Oct 25, 2023
8f899fb
feat: add relay verification and signature
red-0ne Oct 25, 2023
69698c9
fix: interface assignment
red-0ne Oct 25, 2023
c81c3f6
feat: implement relayer proxy full workflow
red-0ne Oct 25, 2023
edbd628
chore: improve code comments
red-0ne Oct 25, 2023
08e1915
chore: change native terms to proxied
red-0ne Oct 26, 2023
4fd55e9
Merge branch 'feat/only-proxy-implementation' into feat/relayer-proxy
red-0ne Oct 26, 2023
1b835aa
chore: address change requests
red-0ne Oct 26, 2023
776e78c
chore: explain -32000 error code
red-0ne Oct 26, 2023
fbba106
chore: add log message on relaying success/failure
red-0ne Oct 26, 2023
2e41656
chore: Update AccountI/any unmarshaling comment
red-0ne Oct 26, 2023
4c6f7d9
Merge branch 'main' into feat/relayer-proxy
red-0ne Oct 28, 2023
b9dd8f3
Merge branch 'main' into feat/relayer-proxy
red-0ne Oct 28, 2023
b7fd6df
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 7, 2023
46066a1
chore: update import paths to use GitHub organization
red-0ne Nov 7, 2023
920978d
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 7, 2023
5041f43
chore: address change requests
red-0ne Nov 7, 2023
6ce1482
chore: add TODO for test implementation
red-0ne Nov 8, 2023
cae8969
chore: Refactor relay request & response methods in
red-0ne Nov 8, 2023
75e88c7
chore: Address request changes
red-0ne Nov 8, 2023
67273dd
chore: comment about current proxy signing approach
red-0ne Nov 8, 2023
3c94180
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 8, 2023
6e30e58
chore: Revert relay request & response signing and
red-0ne Nov 8, 2023
230b74c
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions pkg/relayer/proxy/error_reply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package proxy

import (
"log"
"net/http"

"pocket/x/service/types"
)

// replyWithError builds a JSONRPCResponseError from the passed in error and writes it to the writer.
// TODO_TECHDEBT: This method should be aware of the request id and use it in the response by having
// the caller pass it along with the error if available.
// TODO_TECHDEBT: This method should be aware of the nature of the error to use the appropriate JSONRPC
// Code, Message and Data. Possibly by augmenting the passed in error with the adequate information.
func (j *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) {
relayResponse := &types.RelayResponse{
Payload: &types.RelayResponse_JsonRpcPayload{
JsonRpcPayload: &types.JSONRPCResponsePayload{
Id: make([]byte, 0),
Jsonrpc: "2.0",
Error: &types.JSONRPCResponseError{
// Using conventional error code indicating internal server error.
Code: -32000,
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
Message: err.Error(),
Data: nil,
},
},
},
}

relayResponseBz, err := relayResponse.Marshal()
if err != nil {
log.Printf("ERROR: failed marshaling relay response: %s", err)
return
}

if _, err = writer.Write(relayResponseBz); err != nil {
log.Printf("ERROR: failed writing relay response: %s", err)
return
}
}
7 changes: 5 additions & 2 deletions pkg/relayer/proxy/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package proxy
import sdkerrors "cosmossdk.io/errors"

var (
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
ErrInvalidRelayRequestSignature = sdkerrors.Register(codespace, 2, "invalid relay request signature")
ErrInvalidSession = sdkerrors.Register(codespace, 3, "invalid session")
ErrInvalidSupplier = sdkerrors.Register(codespace, 4, "invalid supplier")
)
2 changes: 1 addition & 1 deletion pkg/relayer/proxy/interface.go
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type RelayerProxy interface {

// VerifyRelayRequest is a shared method used by RelayServers to check the
// relay request signature and session validity.
VerifyRelayRequest(relayRequest *types.RelayRequest) (isValid bool, err error)
VerifyRelayRequest(ctx context.Context, relayRequest *types.RelayRequest, serviceId *sharedtypes.ServiceId) error

// SignRelayResponse is a shared method used by RelayServers to sign the relay response.
SignRelayResponse(relayResponse *types.RelayResponse) ([]byte, error)
Expand Down
115 changes: 102 additions & 13 deletions pkg/relayer/proxy/jsonrpc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package proxy

import (
"bytes"
"context"
"io"
"log"
"net/http"
"net/url"

Expand All @@ -22,21 +25,21 @@
// proxiedServiceEndpoint is the address of the proxied service that the server relays requests to.
proxiedServiceEndpoint url.URL

// server is the http server that listens for incoming relay requests.
// server is the HTTP server that listens for incoming relay requests.
server *http.Server

// relayerProxy is the main relayer proxy that the server uses to perform its operations.
relayerProxy RelayerProxy

// servedRelaysProducer is a channel that emits the relays that have been served so that the
// servedRelays observable can fan out the notifications to its subscribers.
// servedRelaysProducer is a channel that emits the relays that have been served, allowing
// the servedRelays observable to fan-out notifications to its subscribers.
servedRelaysProducer chan<- *types.Relay
}

// NewJSONRPCServer creates a new HTTP server that listens for incoming relay requests
// and forwards them to the supported proxied service endpoint.
// It takes the serviceId, endpointUrl, and the main RelayerProxy as arguments and returns
// a RelayServer that listens to incoming RelayRequests
// a RelayServer that listens to incoming RelayRequests.
func NewJSONRPCServer(
serviceId *sharedtypes.ServiceId,
supplierEndpoint *sharedtypes.SupplierEndpoint,
Expand All @@ -57,29 +60,115 @@
// Start starts the service server and returns an error if it fails.
// It also waits for the passed in context to end before shutting down.
// This method is blocking and should be called in a goroutine.
func (j *jsonRPCServer) Start(ctx context.Context) error {
func (jsrv *jsonRPCServer) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
j.server.Shutdown(ctx)
jsrv.server.Shutdown(ctx)
}()

return j.server.ListenAndServe()
return jsrv.server.ListenAndServe()
}

// Stop terminates the service server and returns an error if it fails.
func (j *jsonRPCServer) Stop(ctx context.Context) error {
return j.server.Shutdown(ctx)
func (jsrv *jsonRPCServer) Stop(ctx context.Context) error {
return jsrv.server.Shutdown(ctx)
}

// ServiceId returns the serviceId of the JSON-RPC service.
func (j *jsonRPCServer) ServiceId() *sharedtypes.ServiceId {
return j.serviceId
func (jsrv *jsonRPCServer) ServiceId() *sharedtypes.ServiceId {
return jsrv.serviceId
}

// ServeHTTP listens for incoming relay requests. It implements the respective
// method of the http.Handler interface. It is called by http.ListenAndServe()
// when jsonRPCServer is used as an http.Handler with an http.Server.
// (see https://pkg.go.dev/net/http#Handler)
func (j *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
panic("TODO: implement jsonRPCServer.ServeHTTP")
func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
ctx := request.Context()
// Relay the request to the proxied service and build the response that will be sent back to the client.
relay, err := jsrv.serveHTTP(ctx, request)
if err != nil {
// Reply with an error if relay response could not be built.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
jsrv.replyWithError(writer, err)
log.Printf("WARN: failed serving relay request: %s", err)
return
}

// Send the relay response to the client.
if err := jsrv.sendRelayResponse(relay.Res, writer); err != nil {
jsrv.replyWithError(writer, err)
log.Print("WARN: failed sending relay response: %s", err)
return
}

red-0ne marked this conversation as resolved.
Show resolved Hide resolved
log.Print("INFO: relay request served successfully")
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

// Emit the relay to the servedRelays observable.
jsrv.servedRelaysProducer <- relay
}

// serveHTTP holds the underlying logic of ServeHTTP.
func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*types.Relay, error) {
// Extract the relay request from the request body.
relayRequest, err := jsrv.newRelayRequest(request)
if err != nil {
return nil, err
}

// Verify the relay request signature and session.
if err := jsrv.relayerProxy.VerifyRelayRequest(ctx, relayRequest, jsrv.serviceId); err != nil {
return nil, err
}

// Get the relayRequest payload's `io.ReadCloser` to add it to the http.Request
// that will be sent to the native service.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// (see https://pkg.go.dev/net/http#Request) Body field type.
var payloadBz []byte
if _, err = relayRequest.Payload.MarshalTo(payloadBz); err != nil {
return nil, err
}
requestBodyReader := io.NopCloser(bytes.NewBuffer(payloadBz))

// Build the request to be sent to the native service by substituting
// the destination URL's host with the native service's listen address.
destinationURL, err := url.Parse(request.URL.String())
if err != nil {
return nil, err
}
destinationURL.Host = jsrv.proxiedServiceEndpoint.Host

relayHTTPRequest := &http.Request{
Method: request.Method,
Header: request.Header,
URL: destinationURL,
Host: destinationURL.Host,
Body: requestBodyReader,
}

// Send the relay request to the native service.
httpResponse, err := http.DefaultClient.Do(relayHTTPRequest)
if err != nil {
return nil, err
}

// Build the relay response from the native service response
// Use relayRequest.Meta.SessionHeader on the relayResponse session header since it was verified to be valid
// and has to be the same as the relayResponse session header.
relayResponse, err := jsrv.newRelayResponse(httpResponse, relayRequest.Meta.SessionHeader)

Check failure on line 157 in pkg/relayer/proxy/jsonrpc.go

View workflow job for this annotation

GitHub Actions / build

relayRequest.Meta.SessionHeader undefined (type *"pocket/x/service/types".RelayRequestMetadata has no field or method SessionHeader)
if err != nil {
return nil, err
}

return &types.Relay{Req: relayRequest, Res: relayResponse}, nil
}

// sendRelayResponse marshals the relay response and sends it to the client.
func (j *jsonRPCServer) sendRelayResponse(relayResponse *types.RelayResponse, writer http.ResponseWriter) error {
relayResponseBz, err := relayResponse.Marshal()
if err != nil {
return err
}

_, err = writer.Write(relayResponseBz)
return err
}
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
29 changes: 11 additions & 18 deletions pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@
accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"golang.org/x/sync/errgroup"

// TODO_INCOMPLETE(@red-0ne): Import the appropriate block client interface once available.
// blocktypes "pocket/pkg/client"
blocktypes "pocket/pkg/client"
"pocket/pkg/observable"
"pocket/pkg/observable/channel"
"pocket/x/service/types"
Expand All @@ -34,8 +33,7 @@

// blocksClient is the client used to get the block at the latest height from the blockchain
// and be notified of new incoming blocks. It is used to update the current session data.
// TODO_INCOMPLETE(@red-0ne): Uncomment once the BlockClient interface is available.
// blockClient blocktypes.BlockClient
blockClient blocktypes.BlockClient

Check failure on line 36 in pkg/relayer/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / build

undefined: blocktypes.BlockClient

// accountsQuerier is the querier used to get account data (e.g. app publicKey) from the blockchain,
// which, in the context of the RelayerProxy, is used to verify the relay request signatures.
Expand Down Expand Up @@ -63,6 +61,12 @@
// servedRelaysProducer is a channel that emits the relays that have been served so that the
// servedRelays observable can fan out the notifications to its subscribers.
servedRelaysProducer chan<- *types.Relay

// clientCtx is the Cosmos' client context used to build the needed query clients and unmarshal their replies.
clientCtx sdkclient.Context

// supplierAddress is the address of the supplier that the relayer proxy is running for.
supplierAddress string
}

func NewRelayerProxy(
Expand All @@ -71,17 +75,15 @@
keyName string,
keyring keyring.Keyring,
proxiedServicesEndpoints servicesEndpointsMap,
// TODO_INCOMPLETE(@red-0ne): Uncomment once the BlockClient interface is available.
// blockClient blocktypes.BlockClient,
blockClient blocktypes.BlockClient,

Check failure on line 78 in pkg/relayer/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / build

undefined: blocktypes.BlockClient
) RelayerProxy {
accountQuerier := accounttypes.NewQueryClient(clientCtx)
supplierQuerier := suppliertypes.NewQueryClient(clientCtx)
sessionQuerier := sessiontypes.NewQueryClient(clientCtx)
servedRelays, servedRelaysProducer := channel.NewObservable[*types.Relay]()

return &relayerProxy{
// TODO_INCOMPLETE(@red-0ne): Uncomment once the BlockClient interface is available.
// blockClient: blockClient,
blockClient: blockClient,
keyName: keyName,
keyring: keyring,
accountsQuerier: accountQuerier,
Expand All @@ -90,6 +92,7 @@
proxiedServicesEndpoints: proxiedServicesEndpoints,
servedRelays: servedRelays,
servedRelaysProducer: servedRelaysProducer,
clientCtx: clientCtx,
}
}

Expand Down Expand Up @@ -137,13 +140,3 @@
func (rp *relayerProxy) ServedRelays() observable.Observable[*types.Relay] {
return rp.servedRelays
}

// VerifyRelayRequest is a shared method used by RelayServers to check the relay request signature and session validity.
func (rp *relayerProxy) VerifyRelayRequest(relayRequest *types.RelayRequest) (isValid bool, err error) {
panic("TODO: implement relayerProxy.VerifyRelayRequest")
}

// SignRelayResponse is a shared method used by RelayServers to sign the relay response.
func (rp *relayerProxy) SignRelayResponse(relayResponse *types.RelayResponse) ([]byte, error) {
panic("TODO: implement relayerProxy.SignRelayResponse")
}
57 changes: 57 additions & 0 deletions pkg/relayer/proxy/relay_builders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package proxy

import (
"io"
"net/http"

"pocket/x/service/types"
sessiontypes "pocket/x/session/types"
)

// newRelayRequest builds a RelayRequest from an http.Request.
func (j *jsonRPCServer) newRelayRequest(request *http.Request) (*types.RelayRequest, error) {
requestBz, err := io.ReadAll(request.Body)
if err != nil {
return nil, err
}

var relayRequest types.RelayRequest
if err := relayRequest.Unmarshal(requestBz); err != nil {
return nil, err
}

return &relayRequest, nil
}

// newRelayResponse builds a RelayResponse from an http.Response and a SessionHeader.
// It also signs the RelayResponse and assigns it to RelayResponse.Meta.SupplierSignature.
// If the response has a non-nil body, it will be parsed as a JSONRPCResponsePayload.
func (j *jsonRPCServer) newRelayResponse(
response *http.Response,
sessionHeader *sessiontypes.SessionHeader,
) (*types.RelayResponse, error) {
relayResponse := &types.RelayResponse{
Meta: &types.RelayResponseMetadata{SessionHeader: sessionHeader},

Check failure on line 34 in pkg/relayer/proxy/relay_builders.go

View workflow job for this annotation

GitHub Actions / build

unknown field SessionHeader in struct literal of type "pocket/x/service/types".RelayResponseMetadata
}

responseBz, err := io.ReadAll(response.Body)
if err != nil {
return nil, err
}

jsonRPCResponse := &types.JSONRPCResponsePayload{}
if err := jsonRPCResponse.Unmarshal(responseBz); err != nil {
return nil, err
}

relayResponse.Payload = &types.RelayResponse_JsonRpcPayload{JsonRpcPayload: jsonRPCResponse}

// Sign the relay response and add the signature to the relay response metadata
signature, err := j.relayerProxy.SignRelayResponse(relayResponse)
if err != nil {
return nil, err
}
relayResponse.Meta.SupplierSignature = signature

return relayResponse, nil
}
22 changes: 22 additions & 0 deletions pkg/relayer/proxy/relay_signer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package proxy
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

import (
"github.com/cometbft/cometbft/crypto"

"pocket/x/service/types"
)

// SignRelayResponse is a shared method used by the RelayServers to sign a RelayResponse.Payload.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// It uses the keyring and keyName to sign the payload and returns the signature.
func (rp *relayerProxy) SignRelayResponse(relayResponse *types.RelayResponse) ([]byte, error) {
var payloadBz []byte
_, err := relayResponse.Payload.MarshalTo(payloadBz)
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, err
}

hash := crypto.Sha256(payloadBz)
signature, _, err := rp.keyring.Sign(rp.keyName, hash)

return signature, err
}
Loading
Loading