-
Notifications
You must be signed in to change notification settings - Fork 12
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
c888c1d
commit 3cf4cd3
Showing
4 changed files
with
499 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
package e2e | ||
|
||
import ( | ||
"context" | ||
"encoding/json" | ||
"net" | ||
"net/http" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/gorilla/websocket" | ||
"github.com/stretchr/testify/require" | ||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/credentials/insecure" | ||
|
||
coretypes "github.com/cometbft/cometbft/rpc/core/types" | ||
rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" | ||
|
||
"github.com/pokt-network/poktroll/testutil/integration" | ||
) | ||
|
||
// E2EApp wraps an integration.App and provides both gRPC and WebSocket servers for end-to-end testing | ||
type E2EApp struct { | ||
*integration.App | ||
grpcServer *grpc.Server | ||
grpcListener net.Listener | ||
wsServer *http.Server | ||
wsListener net.Listener | ||
httpServer *http.Server | ||
httpListener net.Listener | ||
wsUpgrader websocket.Upgrader | ||
wsConnections map[*websocket.Conn]map[string]struct{} // maps connections to their subscribed event queries | ||
wsConnMutex sync.RWMutex | ||
blockEventChan chan *coretypes.ResultEvent | ||
} | ||
|
||
// NewE2EApp creates a new E2EApp instance with integration.App, gRPC, and WebSocket servers | ||
func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp { | ||
t.Helper() | ||
|
||
// Create the integration app | ||
app := integration.NewCompleteIntegrationApp(t, opts...) | ||
|
||
// Create listeners for gRPC, WebSocket, and HTTP | ||
grpcListener, err := net.Listen("tcp", "localhost:42069") | ||
require.NoError(t, err, "failed to create gRPC listener") | ||
|
||
wsListener, err := net.Listen("tcp", "localhost:6969") | ||
require.NoError(t, err, "failed to create WebSocket listener") | ||
|
||
httpListener, err := net.Listen("tcp", "localhost:42070") | ||
require.NoError(t, err, "failed to create HTTP listener") | ||
|
||
e2eApp := &E2EApp{ | ||
App: app, | ||
grpcListener: grpcListener, | ||
wsListener: wsListener, | ||
httpListener: httpListener, | ||
wsConnections: make(map[*websocket.Conn]map[string]struct{}), | ||
wsUpgrader: websocket.Upgrader{}, | ||
blockEventChan: make(chan *coretypes.ResultEvent, 1), | ||
} | ||
|
||
// Initialize and start gRPC server | ||
e2eApp.grpcServer = newGRPCServer(e2eApp, t) | ||
go func() { | ||
if err := e2eApp.grpcServer.Serve(grpcListener); err != nil { | ||
panic(err) | ||
} | ||
}() | ||
|
||
// Initialize and start WebSocket server | ||
e2eApp.wsServer = newWebSocketServer(e2eApp) | ||
go func() { | ||
if err := e2eApp.wsServer.Serve(wsListener); err != nil && err != http.ErrServerClosed { | ||
panic(err) | ||
} | ||
}() | ||
|
||
// Initialize and start HTTP server | ||
mux := http.NewServeMux() | ||
mux.HandleFunc("/", e2eApp.handleHTTP) | ||
e2eApp.httpServer = &http.Server{Handler: mux} | ||
go func() { | ||
if err := e2eApp.httpServer.Serve(httpListener); err != nil && err != http.ErrServerClosed { | ||
panic(err) | ||
} | ||
}() | ||
|
||
// Start event handling | ||
go e2eApp.handleBlockEvents(t) | ||
|
||
return e2eApp | ||
} | ||
|
||
// Close gracefully shuts down the E2EApp and its servers | ||
func (app *E2EApp) Close() error { | ||
app.grpcServer.GracefulStop() | ||
if err := app.wsServer.Close(); err != nil { | ||
return err | ||
} | ||
if err := app.httpServer.Close(); err != nil { | ||
return err | ||
} | ||
close(app.blockEventChan) | ||
return nil | ||
} | ||
|
||
// GetClientConn returns a gRPC client connection to the E2EApp's gRPC server | ||
func (app *E2EApp) GetClientConn(ctx context.Context) (*grpc.ClientConn, error) { | ||
return grpc.DialContext( | ||
ctx, | ||
app.grpcListener.Addr().String(), | ||
grpc.WithTransportCredentials(insecure.NewCredentials()), | ||
) | ||
} | ||
|
||
// GetWSEndpoint returns the WebSocket endpoint URL | ||
func (app *E2EApp) GetWSEndpoint() string { | ||
return "ws://" + app.wsListener.Addr().String() + "/websocket" | ||
} | ||
|
||
// handleHTTP handles incoming HTTP requests by responding with RPCResponse | ||
func (app *E2EApp) handleHTTP(w http.ResponseWriter, r *http.Request) { | ||
var req rpctypes.RPCRequest | ||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil { | ||
http.Error(w, err.Error(), http.StatusBadRequest) | ||
return | ||
} | ||
|
||
// Process the request - for now just return a basic response | ||
// TODO_IMPROVE: Implement proper CometBFT RPC endpoint handling | ||
response := rpctypes.RPCResponse{ | ||
JSONRPC: "2.0", | ||
ID: req.ID, | ||
Result: json.RawMessage(`{}`), | ||
} | ||
|
||
w.Header().Set("Content-Type", "application/json") | ||
json.NewEncoder(w).Encode(response) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package e2e | ||
|
||
import ( | ||
"testing" | ||
|
||
"cosmossdk.io/depinject" | ||
"cosmossdk.io/math" | ||
sdkclient "github.com/cosmos/cosmos-sdk/client" | ||
cosmostx "github.com/cosmos/cosmos-sdk/client/tx" | ||
"github.com/cosmos/cosmos-sdk/crypto/hd" | ||
"github.com/cosmos/cosmos-sdk/crypto/keyring" | ||
cosmostypes "github.com/cosmos/cosmos-sdk/types" | ||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/pokt-network/poktroll/app/volatile" | ||
"github.com/pokt-network/poktroll/pkg/client/block" | ||
"github.com/pokt-network/poktroll/pkg/client/events" | ||
"github.com/pokt-network/poktroll/pkg/client/query" | ||
"github.com/pokt-network/poktroll/pkg/client/tx" | ||
txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types" | ||
"github.com/pokt-network/poktroll/testutil/testclient" | ||
gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" | ||
) | ||
|
||
func TestNewE2EApp(t *testing.T) { | ||
app := NewE2EApp(t) | ||
|
||
blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:42070") | ||
require.NoError(t, err) | ||
|
||
deps := depinject.Supply(app.QueryHelper(), blockQueryClient) | ||
|
||
sharedQueryClient, err := query.NewSharedQuerier(deps) | ||
require.NoError(t, err) | ||
|
||
sharedParams, err := sharedQueryClient.GetParams(app.GetSdkCtx()) | ||
require.NoError(t, err) | ||
|
||
t.Logf("shared params: %+v", sharedParams) | ||
|
||
eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:6969/websocket") | ||
deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) | ||
blockClient, err := block.NewBlockClient(app.GetSdkCtx(), deps) | ||
require.NoError(t, err) | ||
|
||
keyRing := keyring.NewInMemory(app.GetCodec()) | ||
// TODO: add the gateway2 key... | ||
_, err = keyRing.NewAccount( | ||
"gateway2", | ||
"suffer wet jelly furnace cousin flip layer render finish frequent pledge feature economy wink like water disease final erase goat include apple state furnace", | ||
"", | ||
cosmostypes.FullFundraiserPath, | ||
hd.Secp256k1, | ||
) | ||
require.NoError(t, err) | ||
|
||
flagSet := testclient.NewLocalnetFlagSet(t) | ||
clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) | ||
|
||
txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) | ||
require.NoError(t, err) | ||
|
||
deps = depinject.Configs(deps, depinject.Supply(txtypes.Context(clientCtx), txFactory)) | ||
txContext, err := tx.NewTxContext(deps) | ||
require.NoError(t, err) | ||
|
||
deps = depinject.Configs(deps, depinject.Supply(blockClient, txContext)) | ||
txClient, err := tx.NewTxClient(app.GetSdkCtx(), deps, tx.WithSigningKeyName("gateway2")) | ||
require.NoError(t, err) | ||
|
||
eitherErr := txClient.SignAndBroadcast( | ||
app.GetSdkCtx(), | ||
gatewaytypes.NewMsgStakeGateway( | ||
"pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", | ||
cosmostypes.NewCoin(volatile.DenomuPOKT, math.NewInt(100000000)), | ||
), | ||
) | ||
err, errCh := eitherErr.SyncOrAsyncError() | ||
require.NoError(t, err) | ||
require.NoError(t, <-errCh) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
package e2e | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"strings" | ||
"testing" | ||
|
||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/reflection" | ||
"google.golang.org/protobuf/proto" | ||
|
||
"github.com/cosmos/cosmos-sdk/baseapp" | ||
sdk "github.com/cosmos/cosmos-sdk/types" | ||
) | ||
|
||
// newGRPCServer creates and configures a new gRPC server for the E2EApp | ||
func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server { | ||
grpcServer := grpc.NewServer() | ||
reflection.Register(grpcServer) | ||
|
||
forwarder := &grpcForwarderServer{ | ||
queryHelper: app.QueryHelper(), | ||
app: app, | ||
t: t, | ||
} | ||
|
||
grpcServer.RegisterService(&grpc.ServiceDesc{ | ||
ServiceName: "cosmos.Service", | ||
HandlerType: (*interface{})(nil), | ||
Methods: []grpc.MethodDesc{}, | ||
Streams: []grpc.StreamDesc{}, | ||
Metadata: "", | ||
}, forwarder) | ||
|
||
return grpcServer | ||
} | ||
|
||
// grpcForwarderServer implements a generic gRPC service that forwards all queries | ||
// to the queryHelper and messages to the app | ||
type grpcForwarderServer struct { | ||
queryHelper *baseapp.QueryServiceTestHelper | ||
app *E2EApp | ||
t *testing.T | ||
} | ||
|
||
// Invoke implements the grpc.Server interface and forwards all requests appropriately | ||
func (s *grpcForwarderServer) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { | ||
// Determine if this is a query or message based on the method name | ||
if isQuery(method) { | ||
return s.queryHelper.Invoke(ctx, method, args, reply) | ||
} | ||
|
||
// If it's not a query, treat it as a message | ||
msg, ok := args.(sdk.Msg) | ||
if !ok { | ||
return fmt.Errorf("expected sdk.Msg, got %T", args) | ||
} | ||
|
||
// Run the message through the app | ||
msgRes, err := s.app.RunMsg(s.t, msg) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
// Type assert the reply as a proto.Message | ||
protoReply, ok := reply.(proto.Message) | ||
if !ok { | ||
return fmt.Errorf("expected proto.Message, got %T", reply) | ||
} | ||
|
||
// Type assert the response as a proto.Message | ||
protoRes, ok := msgRes.(proto.Message) | ||
if !ok { | ||
return fmt.Errorf("expected proto.Message response, got %T", msgRes) | ||
} | ||
|
||
// Marshal the response to bytes | ||
resBz, err := proto.Marshal(protoRes) | ||
if err != nil { | ||
return fmt.Errorf("failed to marshal response: %w", err) | ||
} | ||
|
||
// Unmarshal into the reply | ||
return proto.Unmarshal(resBz, protoReply) | ||
} | ||
|
||
// NewStream implements the grpc.Server interface but is not used in this implementation | ||
func (s *grpcForwarderServer) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { | ||
return nil, fmt.Errorf("streaming is not supported") | ||
} | ||
|
||
// isQuery returns true if the method name indicates this is a query request | ||
func isQuery(method string) bool { | ||
return strings.Contains(method, ".Query/") | ||
} |
Oops, something went wrong.