-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[AppGate] Implement the MVP AppGateServer (#108)
Co-authored-by: h5law <[email protected]>
- Loading branch information
Showing
23 changed files
with
1,214 additions
and
91 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,147 @@ | ||
package cmd | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"log" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"os/signal" | ||
|
||
"cosmossdk.io/depinject" | ||
cosmosclient "github.com/cosmos/cosmos-sdk/client" | ||
"github.com/cosmos/cosmos-sdk/client/flags" | ||
"github.com/spf13/cobra" | ||
|
||
"github.com/pokt-network/poktroll/pkg/appgateserver" | ||
blockclient "github.com/pokt-network/poktroll/pkg/client/block" | ||
eventsquery "github.com/pokt-network/poktroll/pkg/client/events_query" | ||
) | ||
|
||
var ( | ||
flagSigningKey string | ||
flagSelfSigning bool | ||
flagListeningEndpoint string | ||
flagCometWebsocketUrl string | ||
) | ||
|
||
func AppGateServerCmd() *cobra.Command { | ||
cmd := &cobra.Command{ | ||
Use: "appgate-server", | ||
Short: "Starts the AppGate server", | ||
Long: `Starts the AppGate server that listens for incoming relay requests and handles | ||
the necessary on-chain interactions (sessions, suppliers, etc) to receive the | ||
respective relay response. | ||
-- App Mode (Flag)- - | ||
If the server is started with a defined '--self-signing' flag, it will behave | ||
as an Application. Any incoming requests will be signed by using the private | ||
key and ring associated with the '--signing-key' flag. | ||
-- Gateway Mode (Flag)-- | ||
If the '--self-signing' flag is not provided, the server will behave as a Gateway. | ||
It will sign relays on behalf of any Application sending it relays, provided | ||
that the address associated with '--signing-key' has been delegated to. This is | ||
necessary for the application<->gateway ring signature to function. | ||
-- App Mode (HTTP) -- | ||
If an application doesn't provide the '--self-signing' flag, it can still send | ||
relays to the AppGate server and function as an Application, provided that: | ||
1. Each request contains the '?senderAddress=[address]' query parameter | ||
2. The key associated with the '--signing-key' flag belongs to the address | ||
provided in the request, otherwise the ring signature will not be valid.`, | ||
Args: cobra.NoArgs, | ||
RunE: runAppGateServer, | ||
} | ||
|
||
cmd.Flags().StringVar(&flagSigningKey, "signing-key", "", "The name of the key that will be used to sign relays") | ||
cmd.Flags().StringVar(&flagListeningEndpoint, "listening-endpoint", "http://localhost:42069", "The host and port that the appgate server will listen on") | ||
cmd.Flags().StringVar(&flagCometWebsocketUrl, "comet-websocket-url", "ws://localhost:36657/websocket", "The URL of the comet websocket endpoint to communicate with the pocket blockchain") | ||
cmd.Flags().BoolVar(&flagSelfSigning, "self-signing", false, "Whether the server should sign all incoming requests with its own ring (for applications)") | ||
|
||
cmd.Flags().String(flags.FlagKeyringBackend, "", "Select keyring's backend (os|file|kwallet|pass|test)") | ||
cmd.Flags().String(flags.FlagNode, "tcp://localhost:36657", "The URL of the comet tcp endpoint to communicate with the pocket blockchain") | ||
|
||
return cmd | ||
} | ||
|
||
func runAppGateServer(cmd *cobra.Command, _ []string) error { | ||
// Create a context that is canceled when the command is interrupted | ||
ctx, cancelCtx := context.WithCancel(cmd.Context()) | ||
defer cancelCtx() | ||
|
||
// Handle interrupts in a goroutine. | ||
go func() { | ||
sigCh := make(chan os.Signal, 1) | ||
signal.Notify(sigCh, os.Interrupt) | ||
|
||
// Block until we receive an interrupt or kill signal (OS-agnostic) | ||
<-sigCh | ||
log.Println("INFO: Interrupt signal received, shutting down...") | ||
|
||
// Signal goroutines to stop | ||
cancelCtx() | ||
}() | ||
|
||
// Parse the listening endpoint. | ||
listeningUrl, err := url.Parse(flagListeningEndpoint) | ||
if err != nil { | ||
return fmt.Errorf("failed to parse listening endpoint: %w", err) | ||
} | ||
|
||
// Setup the AppGate server dependencies. | ||
appGateServerDeps, err := setupAppGateServerDependencies(cmd, ctx, flagCometWebsocketUrl) | ||
if err != nil { | ||
return fmt.Errorf("failed to setup AppGate server dependencies: %w", err) | ||
} | ||
|
||
log.Println("INFO: Creating AppGate server...") | ||
|
||
// Create the AppGate server. | ||
appGateServer, err := appgateserver.NewAppGateServer( | ||
appGateServerDeps, | ||
appgateserver.WithSigningInformation(&appgateserver.SigningInformation{ | ||
// provide the name of the key to use for signing all incoming requests | ||
SigningKeyName: flagSigningKey, | ||
// provide whether the appgate server should sign all incoming requests | ||
// with its own ring (for applications) or not (for gateways) | ||
SelfSigning: flagSelfSigning, | ||
}), | ||
appgateserver.WithListeningUrl(listeningUrl), | ||
) | ||
if err != nil { | ||
return fmt.Errorf("failed to create AppGate server: %w", err) | ||
} | ||
|
||
log.Printf("INFO: Starting AppGate server, listening on %s...", listeningUrl.String()) | ||
|
||
// Start the AppGate server. | ||
if err := appGateServer.Start(ctx); err != nil && !errors.Is(err, http.ErrServerClosed) { | ||
return fmt.Errorf("failed to start app gate server: %w", err) | ||
} else if errors.Is(err, http.ErrServerClosed) { | ||
log.Println("INFO: AppGate server stopped") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func setupAppGateServerDependencies(cmd *cobra.Command, ctx context.Context, cometWebsocketUrl string) (depinject.Config, error) { | ||
// Retrieve the client context for the chain interactions. | ||
clientCtx := cosmosclient.GetClientContextFromCmd(cmd) | ||
|
||
// Create the events client. | ||
eventsQueryClient := eventsquery.NewEventsQueryClient(flagCometWebsocketUrl) | ||
|
||
// Create the block client. | ||
log.Printf("INFO: Creating block client, using comet websocket URL: %s...", flagCometWebsocketUrl) | ||
deps := depinject.Supply(eventsQueryClient) | ||
blockClient, err := blockclient.NewBlockClient(ctx, deps, flagCometWebsocketUrl) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create block client: %w", err) | ||
} | ||
|
||
// Return the dependencie config. | ||
return depinject.Supply(clientCtx, blockClient), nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package appgateserver | ||
|
||
import ( | ||
"context" | ||
"log" | ||
"net/url" | ||
|
||
sessiontypes "github.com/pokt-network/poktroll/x/session/types" | ||
sharedtypes "github.com/pokt-network/poktroll/x/shared/types" | ||
) | ||
|
||
// TODO_IMPROVE: This implements a naive greedy approach that defaults to the | ||
// first available supplier. Future optimizations (e.g. Quality-of-Service) can be introduced here. | ||
// TODO(@h5law): Look into different endpoint selection depending on their suitability. | ||
// getRelayerUrl gets the URL of the relayer for the given service. | ||
func (app *appGateServer) getRelayerUrl( | ||
ctx context.Context, | ||
serviceId string, | ||
rpcType sharedtypes.RPCType, | ||
session *sessiontypes.Session, | ||
) (supplierUrl *url.URL, supplierAddress string, err error) { | ||
for _, supplier := range session.Suppliers { | ||
for _, service := range supplier.Services { | ||
// Skip services that don't match the requested serviceId. | ||
if service.Service.Id != serviceId { | ||
continue | ||
} | ||
|
||
for _, endpoint := range service.Endpoints { | ||
// Return the first endpoint url that matches the JSON RPC RpcType. | ||
if endpoint.RpcType == rpcType { | ||
supplierUrl, err := url.Parse(endpoint.Url) | ||
if err != nil { | ||
log.Printf("error parsing url: %s", err) | ||
continue | ||
} | ||
return supplierUrl, supplier.Address, nil | ||
} | ||
} | ||
} | ||
} | ||
|
||
// Return an error if no relayer endpoints were found. | ||
return nil, "", ErrAppGateNoRelayEndpoints | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
package appgateserver | ||
|
||
import sdkerrors "cosmossdk.io/errors" | ||
|
||
var ( | ||
codespace = "appgateserver" | ||
ErrAppGateInvalidRelayResponseSignature = sdkerrors.Register(codespace, 1, "invalid relay response signature") | ||
ErrAppGateNoRelayEndpoints = sdkerrors.Register(codespace, 2, "no relay endpoints found") | ||
ErrAppGateInvalidRequestURL = sdkerrors.Register(codespace, 3, "invalid request URL") | ||
ErrAppGateMissingAppAddress = sdkerrors.Register(codespace, 4, "missing application address") | ||
ErrAppGateMissingSigningInformation = sdkerrors.Register(codespace, 5, "missing app client signing information") | ||
ErrAppGateMissingListeningEndpoint = sdkerrors.Register(codespace, 6, "missing app client listening endpoint") | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
package appgateserver | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"io" | ||
"log" | ||
"net/http" | ||
|
||
"github.com/cometbft/cometbft/crypto" | ||
|
||
"github.com/pokt-network/poktroll/x/service/types" | ||
sharedtypes "github.com/pokt-network/poktroll/x/shared/types" | ||
) | ||
|
||
// handleJSONRPCRelay handles JSON RPC relay requests. | ||
// It does everything from preparing, signing and sending the request. | ||
// It then blocks on the response to come back and forward it to the provided writer. | ||
func (app *appGateServer) handleJSONRPCRelay( | ||
ctx context.Context, | ||
appAddress, serviceId string, | ||
request *http.Request, | ||
writer http.ResponseWriter, | ||
) error { | ||
// Read the request body bytes. | ||
payloadBz, err := io.ReadAll(request.Body) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Create the relay request payload. | ||
relayRequestPayload := &types.RelayRequest_JsonRpcPayload{} | ||
relayRequestPayload.JsonRpcPayload.Unmarshal(payloadBz) | ||
|
||
session, err := app.getCurrentSession(ctx, appAddress, serviceId) | ||
if err != nil { | ||
return err | ||
} | ||
log.Printf("DEBUG: Current session ID: %s", session.SessionId) | ||
|
||
// Get a supplier URL and address for the given service and session. | ||
supplierUrl, supplierAddress, err := app.getRelayerUrl(ctx, serviceId, sharedtypes.RPCType_JSON_RPC, session) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Create the relay request. | ||
relayRequest := &types.RelayRequest{ | ||
Meta: &types.RelayRequestMetadata{ | ||
SessionHeader: session.Header, | ||
Signature: nil, // signature added below | ||
}, | ||
Payload: relayRequestPayload, | ||
} | ||
|
||
// Get the application's signer. | ||
signer, err := app.getRingSingerForAppAddress(ctx, appAddress) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Hash and sign the request's signable bytes. | ||
signableBz, err := relayRequest.GetSignableBytes() | ||
if err != nil { | ||
return err | ||
} | ||
|
||
hash := crypto.Sha256(signableBz) | ||
signature, err := signer.Sign(hash) | ||
if err != nil { | ||
return err | ||
} | ||
relayRequest.Meta.Signature = signature | ||
|
||
// Marshal the relay request to bytes and create a reader to be used as an HTTP request body. | ||
relayRequestBz, err := relayRequest.Marshal() | ||
if err != nil { | ||
return err | ||
} | ||
relayRequestReader := io.NopCloser(bytes.NewReader(relayRequestBz)) | ||
|
||
// Create the HTTP request to send the request to the relayer. | ||
relayHTTPRequest := &http.Request{ | ||
Method: request.Method, | ||
Header: request.Header, | ||
URL: supplierUrl, | ||
Body: relayRequestReader, | ||
} | ||
|
||
// Perform the HTTP request to the relayer. | ||
log.Printf("DEBUG: Sending signed relay request to %s", supplierUrl) | ||
relayHTTPResponse, err := http.DefaultClient.Do(relayHTTPRequest) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Read the response body bytes. | ||
relayResponseBz, err := io.ReadAll(relayHTTPResponse.Body) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Unmarshal the response bytes into a RelayResponse. | ||
relayResponse := &types.RelayResponse{} | ||
if err := relayResponse.Unmarshal(relayResponseBz); err != nil { | ||
return err | ||
} | ||
|
||
// Verify the response signature. We use the supplier address that we got from | ||
// the getRelayerUrl function since this is the address we are expecting to sign the response. | ||
// TODO_TECHDEBT: if the RelayResponse is an internal error response, we should not verify the signature | ||
// as in some relayer early failures, it may not be signed by the supplier. | ||
// TODO_IMPROVE: Add more logging & telemetry so we can get visibility and signal into | ||
// failed responses. | ||
log.Println("DEBUG: Verifying signed relay response from...") | ||
if err := app.verifyResponse(ctx, supplierAddress, relayResponse); err != nil { | ||
return err | ||
} | ||
|
||
// Marshal the response payload to bytes to be sent back to the application. | ||
var responsePayloadBz []byte | ||
if _, err = relayResponse.Payload.MarshalTo(responsePayloadBz); err != nil { | ||
return err | ||
} | ||
|
||
// Reply with the RelayResponse payload. | ||
if _, err := writer.Write(relayRequestBz); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} |
Oops, something went wrong.