Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Proxy] feat: implement relayer proxy full workflow #101

Merged
merged 25 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
006eb19
feat: add general proxy logic and server builder
red-0ne Oct 24, 2023
63f9467
fix: make var names and comments more consistent
red-0ne Oct 25, 2023
8f899fb
feat: add relay verification and signature
red-0ne Oct 25, 2023
69698c9
fix: interface assignment
red-0ne Oct 25, 2023
c81c3f6
feat: implement relayer proxy full workflow
red-0ne Oct 25, 2023
edbd628
chore: improve code comments
red-0ne Oct 25, 2023
08e1915
chore: change native terms to proxied
red-0ne Oct 26, 2023
4fd55e9
Merge branch 'feat/only-proxy-implementation' into feat/relayer-proxy
red-0ne Oct 26, 2023
1b835aa
chore: address change requests
red-0ne Oct 26, 2023
776e78c
chore: explain -32000 error code
red-0ne Oct 26, 2023
fbba106
chore: add log message on relaying success/failure
red-0ne Oct 26, 2023
2e41656
chore: Update AccountI/any unmarshaling comment
red-0ne Oct 26, 2023
4c6f7d9
Merge branch 'main' into feat/relayer-proxy
red-0ne Oct 28, 2023
b9dd8f3
Merge branch 'main' into feat/relayer-proxy
red-0ne Oct 28, 2023
b7fd6df
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 7, 2023
46066a1
chore: update import paths to use GitHub organization
red-0ne Nov 7, 2023
920978d
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 7, 2023
5041f43
chore: address change requests
red-0ne Nov 7, 2023
6ce1482
chore: add TODO for test implementation
red-0ne Nov 8, 2023
cae8969
chore: Refactor relay request & response methods in
red-0ne Nov 8, 2023
75e88c7
chore: Address request changes
red-0ne Nov 8, 2023
67273dd
chore: comment about current proxy signing approach
red-0ne Nov 8, 2023
3c94180
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 8, 2023
6e30e58
chore: Revert relay request & response signing and
red-0ne Nov 8, 2023
230b74c
Merge remote-tracking branch 'origin/main' into feat/relayer-proxy
red-0ne Nov 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/relayer/proxy/error_reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ import (
)

// replyWithError builds a JSONRPCResponseError from the passed in error and writes it to the writer.
// TODO_IMPROVE: This method should be aware of the request id and use it in the response by having
// TODO_TECHDEBT: This method should be aware of the request id and use it in the response by having
// the caller pass it along with the error if available.
// TODO_IMPROVE: This method should be aware of the nature of the error to use the appropriate JSONRPC
// TODO_TECHDEBT: This method should be aware of the nature of the error to use the appropriate JSONRPC
// Code, Message and Data. Possibly by augmenting the passed in error with the adequate information.
func (j *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) {
relayResponse := &types.RelayResponse{
Expand All @@ -19,6 +19,7 @@ func (j *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) {
Id: make([]byte, 0),
Jsonrpc: "2.0",
Error: &types.JSONRPCResponseError{
// Using conventional error code indicating internal server error.
Code: -32000,
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
Message: err.Error(),
Data: nil,
Expand All @@ -33,8 +34,7 @@ func (j *jsonRPCServer) replyWithError(writer http.ResponseWriter, err error) {
return
}

_, err = writer.Write(relayResponseBz)
if err != nil {
if _, err = writer.Write(relayResponseBz); err != nil {
log.Printf("ERROR: failed writing relay response: %s", err)
return
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/relayer/proxy/errors.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package proxy

import errorsmod "cosmossdk.io/errors"
import sdkerrors "cosmossdk.io/errors"

var (
ErrUnsupportedRPCType = errorsmod.Register(codespace, 1, "unsupported rpc type")
ErrInvalidSignature = errorsmod.Register(codespace, 2, "invalid signature")
ErrInvalidSession = errorsmod.Register(codespace, 3, "invalid session")
ErrInvalidSupplier = errorsmod.Register(codespace, 4, "invalid supplier")
codespace = "relayer/proxy"
codespace = "relayer/proxy"
ErrUnsupportedRPCType = sdkerrors.Register(codespace, 1, "unsupported rpc type")
ErrInvalidRelayRequestSignature = sdkerrors.Register(codespace, 2, "invalid relay request signature")
ErrInvalidSession = sdkerrors.Register(codespace, 3, "invalid session")
ErrInvalidSupplier = sdkerrors.Register(codespace, 4, "invalid supplier")
)
5 changes: 3 additions & 2 deletions pkg/relayer/proxy/interface.go
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"pocket/pkg/observable"
"pocket/x/service/types"
sharedtypes "pocket/x/shared/types"
)

// RelayerProxy is the interface for the proxy that serves relays to the application.
Expand All @@ -26,7 +27,7 @@ type RelayerProxy interface {

// VerifyRelayRequest is a shared method used by RelayServers to check the
// relay request signature and session validity.
VerifyRelayRequest(ctx context.Context, relayRequest *types.RelayRequest, serviceId string) error
VerifyRelayRequest(ctx context.Context, relayRequest *types.RelayRequest, serviceId *sharedtypes.ServiceId) error

// SignRelayResponse is a shared method used by RelayServers to sign the relay response.
SignRelayResponse(relayResponse *types.RelayResponse) ([]byte, error)
Expand All @@ -41,5 +42,5 @@ type RelayServer interface {
Stop(ctx context.Context) error

// ServiceId returns the serviceId of the service.
ServiceId() string
ServiceId() *sharedtypes.ServiceId
}
94 changes: 49 additions & 45 deletions pkg/relayer/proxy/jsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,26 @@ import (
"bytes"
"context"
"io"
"log"
"net/http"
"net/url"

"pocket/x/service/types"
sharedtypes "pocket/x/shared/types"
)

var _ RelayServer = (*jsonRPCServer)(nil)

type jsonRPCServer struct {
// serviceId is the identifier of the service that the server is responsible for.
serviceId string
// serviceId is the id of the service that the server is responsible for.
serviceId *sharedtypes.ServiceId

// endpointUrl is the URL that the server listens to for incoming relay requests.
endpointUrl string
// serverEndpoint is the advertised endpoint configuration that the server uses to
// listen for incoming relay requests.
serverEndpoint *sharedtypes.SupplierEndpoint

// nativeServiceListenAddress is the address of the native service to which the server relays requests.
nativeServiceListenAddress string
// proxiedServiceEndpoint is the address of the proxied service that the server relays requests to.
proxiedServiceEndpoint url.URL

// server is the HTTP server that listens for incoming relay requests.
server *http.Server
Expand All @@ -34,87 +37,92 @@ type jsonRPCServer struct {
}

// NewJSONRPCServer creates a new HTTP server that listens for incoming relay requests
// and relays them to the supported native service.
// and forwards them to the supported proxied service endpoint.
// It takes the serviceId, endpointUrl, and the main RelayerProxy as arguments and returns
// a RelayServer that listens to incoming RelayRequests.
func NewJSONRPCServer(
serviceId string,
endpointUrl string,
nativeServiceListenAddress string,
serviceId *sharedtypes.ServiceId,
supplierEndpoint *sharedtypes.SupplierEndpoint,
proxiedServiceEndpoint url.URL,
servedRelaysProducer chan<- *types.Relay,
proxy RelayerProxy,
) RelayServer {
return &jsonRPCServer{
serviceId: serviceId,
endpointUrl: endpointUrl,
server: &http.Server{Addr: endpointUrl},
relayerProxy: proxy,
nativeServiceListenAddress: nativeServiceListenAddress,
servedRelaysProducer: servedRelaysProducer,
serviceId: serviceId,
serverEndpoint: supplierEndpoint,
server: &http.Server{Addr: supplierEndpoint.Url},
relayerProxy: proxy,
proxiedServiceEndpoint: proxiedServiceEndpoint,
servedRelaysProducer: servedRelaysProducer,
}
}

// Start starts the service server and returns an error if it fails.
// It also waits for the passed-in context to be done in order to shut down.
// It also waits for the passed in context to end before shutting down.
// This method is blocking and should be called in a goroutine.
func (j *jsonRPCServer) Start(ctx context.Context) error {
func (jsrv *jsonRPCServer) Start(ctx context.Context) error {
go func() {
<-ctx.Done()
j.server.Shutdown(ctx)
jsrv.server.Shutdown(ctx)
}()

return j.server.ListenAndServe()
return jsrv.server.ListenAndServe()
}

// Stop terminates the service server and returns an error if it fails.
func (j *jsonRPCServer) Stop(ctx context.Context) error {
return j.server.Shutdown(ctx)
func (jsrv *jsonRPCServer) Stop(ctx context.Context) error {
return jsrv.server.Shutdown(ctx)
}

// ServiceId returns the serviceId of the service.
func (j *jsonRPCServer) ServiceId() string {
return j.serviceId
// ServiceId returns the serviceId of the JSON-RPC service.
func (jsrv *jsonRPCServer) ServiceId() *sharedtypes.ServiceId {
return jsrv.serviceId
}

// ServeHTTP listens for incoming relay requests. It implements the respective
// method of the http.Handler interface. It is called by http.ListenAndServe()
// when jsonRPCServer is used as an http.Handler with an http.Server.
// (see https://pkg.go.dev/net/http#Handler)
func (j *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
func (jsrv *jsonRPCServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
ctx := request.Context()
// Relay the request to the native service and build the response that will be sent back to the client.
relay, err := j.serveHTTP(ctx, request)
// Relay the request to the proxied service and build the response that will be sent back to the client.
relay, err := jsrv.serveHTTP(ctx, request)
if err != nil {
// Reply with an error if relay response could not be built.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
j.replyWithError(writer, err)
jsrv.replyWithError(writer, err)
log.Printf("WARN: failed serving relay request: %s", err)
return
}

// Send the relay response to the client.
if err := j.sendRelayResponse(relay.Res, writer); err != nil {
j.replyWithError(writer, err)
if err := jsrv.sendRelayResponse(relay.Res, writer); err != nil {
jsrv.replyWithError(writer, err)
log.Print("WARN: failed sending relay response: %s", err)
return
}

red-0ne marked this conversation as resolved.
Show resolved Hide resolved
log.Print("INFO: relay request served successfully")
red-0ne marked this conversation as resolved.
Show resolved Hide resolved

// Emit the relay to the servedRelays observable.
j.servedRelaysProducer <- relay
jsrv.servedRelaysProducer <- relay
}

// serveHTTP holds the underlying logic of ServeHTTP.
func (j *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*types.Relay, error) {
func (jsrv *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*types.Relay, error) {
// Extract the relay request from the request body.
relayRequest, err := j.newRelayRequest(request)
relayRequest, err := jsrv.newRelayRequest(request)
if err != nil {
return nil, err
}

// Verify the relay request signature and session.
if err := j.relayerProxy.VerifyRelayRequest(ctx, relayRequest, j.serviceId); err != nil {
if err := jsrv.relayerProxy.VerifyRelayRequest(ctx, relayRequest, jsrv.serviceId); err != nil {
return nil, err
}

// Get the relayRequest payload's ReadCloser to add it to the http.Request
// Get the relayRequest payload's `io.ReadCloser` to add it to the http.Request
// that will be sent to the native service.
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
// (see https://pkg.go.dev/net/http#Request) Body field type.
var payloadBz []byte
if _, err = relayRequest.Payload.MarshalTo(payloadBz); err != nil {
return nil, err
Expand All @@ -127,7 +135,7 @@ func (j *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*
if err != nil {
return nil, err
}
destinationURL.Host = j.nativeServiceListenAddress
destinationURL.Host = jsrv.proxiedServiceEndpoint.Host

relayHTTPRequest := &http.Request{
Method: request.Method,
Expand All @@ -146,7 +154,7 @@ func (j *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*
// Build the relay response from the native service response
// Use relayRequest.Meta.SessionHeader on the relayResponse session header since it was verified to be valid
// and has to be the same as the relayResponse session header.
relayResponse, err := j.newRelayResponse(httpResponse, relayRequest.Meta.SessionHeader)
relayResponse, err := jsrv.newRelayResponse(httpResponse, relayRequest.Meta.SessionHeader)
if err != nil {
return nil, err
}
Expand All @@ -156,15 +164,11 @@ func (j *jsonRPCServer) serveHTTP(ctx context.Context, request *http.Request) (*

// sendRelayResponse marshals the relay response and sends it to the client.
func (j *jsonRPCServer) sendRelayResponse(relayResponse *types.RelayResponse, writer http.ResponseWriter) error {
relayResposeBz, err := relayResponse.Marshal()
if err != nil {
return err
}

_, err = writer.Write(relayResposeBz)
relayResponseBz, err := relayResponse.Marshal()
if err != nil {
return err
}

return nil
_, err = writer.Write(relayResponseBz)
return err
}
red-0ne marked this conversation as resolved.
Show resolved Hide resolved
52 changes: 27 additions & 25 deletions pkg/relayer/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import (
"context"
"net/url"

sdkclient "github.com/cosmos/cosmos-sdk/client"
"github.com/cosmos/cosmos-sdk/crypto/keyring"
accounttypes "github.com/cosmos/cosmos-sdk/x/auth/types"
"golang.org/x/sync/errgroup"

blocktypes "pocket/pkg/client"

Check failure on line 12 in pkg/relayer/proxy/proxy.go

View workflow job for this annotation

GitHub Actions / build

package pocket/pkg/client is not in GOROOT (/opt/hostedtoolcache/go/1.20.10/x64/src/pocket/pkg/client)
"pocket/pkg/observable"
"pocket/pkg/observable/channel"
"pocket/x/service/types"
Expand All @@ -19,8 +20,9 @@
var _ RelayerProxy = (*relayerProxy)(nil)

type (
serviceId = string
RelayServersMap = map[serviceId][]RelayServer
serviceId = string
relayServersMap = map[serviceId][]RelayServer
servicesEndpointsMap = map[serviceId]url.URL
)

type relayerProxy struct {
Expand All @@ -47,12 +49,11 @@

// advertisedRelayServers is a map of the services provided by the relayer proxy. Each provided service
// has the necessary information to start the server that listens for incoming relay requests and
// the client that relays the request to the supported native service.
advertisedRelayServers RelayServersMap
// the client that relays the request to the supported proxied service.
advertisedRelayServers relayServersMap

// nativeServiceListenAddresses is a map of the native services server's listen addresses
// that are supported by the relayer proxy.
nativeServicesListenAddress map[serviceId]string
// proxiedServicesEndpoints is a map of the proxied services endpoints that the relayer proxy supports.
proxiedServicesEndpoints servicesEndpointsMap

// servedRelays is an observable that notifies the miner about the relays that have been served.
servedRelays observable.Observable[*types.Relay]
Expand All @@ -73,7 +74,7 @@
clientCtx sdkclient.Context,
keyName string,
keyring keyring.Keyring,
nativeServicesListenAddress map[serviceId]string,
proxiedServicesEndpoints servicesEndpointsMap,
blockClient blocktypes.BlockClient,
) RelayerProxy {
accountQuerier := accounttypes.NewQueryClient(clientCtx)
Expand All @@ -82,50 +83,51 @@
servedRelays, servedRelaysProducer := channel.NewObservable[*types.Relay]()

return &relayerProxy{
blockClient: blockClient,
keyName: keyName,
keyring: keyring,
accountsQuerier: accountQuerier,
supplierQuerier: supplierQuerier,
sessionQuerier: sessionQuerier,
nativeServicesListenAddress: nativeServicesListenAddress,
servedRelays: servedRelays,
servedRelaysProducer: servedRelaysProducer,
clientCtx: clientCtx,
blockClient: blockClient,
keyName: keyName,
keyring: keyring,
accountsQuerier: accountQuerier,
supplierQuerier: supplierQuerier,
sessionQuerier: sessionQuerier,
proxiedServicesEndpoints: proxiedServicesEndpoints,
servedRelays: servedRelays,
servedRelaysProducer: servedRelaysProducer,
clientCtx: clientCtx,
}
}

// Start concurrently starts all advertised relay servers and returns an error if any of them fails to start.
// This method is blocking until all RelayServers are started.
func (rp *relayerProxy) Start(ctx context.Context) error {
// The provided services map is built from the supplier's on-chain advertised information,
// which is a runtime parameter that can be changed by the supplier.
// Build the provided services map at Start instead of NewRelayerProxy to avoid having to
// NOTE: We build the provided services map at Start instead of NewRelayerProxy to avoid having to
// return an error from the constructor.
err := rp.BuildProvidedServices(ctx)
if err != nil {
if err := rp.BuildProvidedServices(ctx); err != nil {
return err
}

startGroup, gctx := errgroup.WithContext(ctx)
startGroup, ctx := errgroup.WithContext(ctx)

for _, relayServer := range rp.advertisedRelayServers {
for _, svr := range relayServer {
server := svr // create a new variable scoped to the anonymous function
startGroup.Go(func() error { return server.Start(gctx) })
startGroup.Go(func() error { return server.Start(ctx) })
}
}

return startGroup.Wait()
}

// Stop concurrently stops all advertised relay servers and returns an error if any of them fails.
// This method is blocking until all RelayServers are stopped.
func (rp *relayerProxy) Stop(ctx context.Context) error {
stopGroup, gctx := errgroup.WithContext(ctx)
stopGroup, ctx := errgroup.WithContext(ctx)

for _, providedService := range rp.advertisedRelayServers {
for _, svr := range providedService {
server := svr // create a new variable scoped to the anonymous function
stopGroup.Go(func() error { return server.Stop(gctx) })
stopGroup.Go(func() error { return server.Stop(ctx) })
}
}

Expand Down
Loading
Loading