diff --git a/testutil/e2e/app.go b/testutil/e2e/app.go new file mode 100644 index 000000000..102d25323 --- /dev/null +++ b/testutil/e2e/app.go @@ -0,0 +1,141 @@ +package e2e + +import ( + "context" + "encoding/json" + "net" + "net/http" + "sync" + "testing" + + "github.com/gorilla/websocket" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + coretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + + "github.com/pokt-network/poktroll/testutil/integration" +) + +// E2EApp wraps an integration.App and provides both gRPC and WebSocket servers for end-to-end testing +type E2EApp struct { + *integration.App + grpcServer *grpc.Server + grpcListener net.Listener + wsServer *http.Server + wsListener net.Listener + httpServer *http.Server + httpListener net.Listener + wsUpgrader websocket.Upgrader + wsConnections map[*websocket.Conn]map[string]struct{} // maps connections to their subscribed event queries + wsConnMutex sync.RWMutex + blockEventChan chan *coretypes.ResultEvent +} + +// NewE2EApp creates a new E2EApp instance with integration.App, gRPC, and WebSocket servers +func NewE2EApp(t *testing.T, opts ...integration.IntegrationAppOptionFn) *E2EApp { + t.Helper() + + // Create the integration app + app := integration.NewCompleteIntegrationApp(t, opts...) + + // Create listeners for gRPC, WebSocket, and HTTP + grpcListener, err := net.Listen("tcp", "localhost:42069") + require.NoError(t, err, "failed to create gRPC listener") + + wsListener, err := net.Listen("tcp", "localhost:6969") + require.NoError(t, err, "failed to create WebSocket listener") + + httpListener, err := net.Listen("tcp", "localhost:42070") + require.NoError(t, err, "failed to create HTTP listener") + + e2eApp := &E2EApp{ + App: app, + grpcListener: grpcListener, + wsListener: wsListener, + httpListener: httpListener, + wsConnections: make(map[*websocket.Conn]map[string]struct{}), + wsUpgrader: websocket.Upgrader{}, + blockEventChan: make(chan *coretypes.ResultEvent, 1), + } + + // Initialize and start gRPC server + e2eApp.grpcServer = newGRPCServer(e2eApp, t) + go func() { + if err := e2eApp.grpcServer.Serve(grpcListener); err != nil { + panic(err) + } + }() + + // Initialize and start WebSocket server + e2eApp.wsServer = newWebSocketServer(e2eApp) + go func() { + if err := e2eApp.wsServer.Serve(wsListener); err != nil && err != http.ErrServerClosed { + panic(err) + } + }() + + // Initialize and start HTTP server + mux := http.NewServeMux() + mux.HandleFunc("/", e2eApp.handleHTTP) + e2eApp.httpServer = &http.Server{Handler: mux} + go func() { + if err := e2eApp.httpServer.Serve(httpListener); err != nil && err != http.ErrServerClosed { + panic(err) + } + }() + + // Start event handling + go e2eApp.handleBlockEvents(t) + + return e2eApp +} + +// Close gracefully shuts down the E2EApp and its servers +func (app *E2EApp) Close() error { + app.grpcServer.GracefulStop() + if err := app.wsServer.Close(); err != nil { + return err + } + if err := app.httpServer.Close(); err != nil { + return err + } + close(app.blockEventChan) + return nil +} + +// GetClientConn returns a gRPC client connection to the E2EApp's gRPC server +func (app *E2EApp) GetClientConn(ctx context.Context) (*grpc.ClientConn, error) { + return grpc.DialContext( + ctx, + app.grpcListener.Addr().String(), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) +} + +// GetWSEndpoint returns the WebSocket endpoint URL +func (app *E2EApp) GetWSEndpoint() string { + return "ws://" + app.wsListener.Addr().String() + "/websocket" +} + +// handleHTTP handles incoming HTTP requests by responding with RPCResponse +func (app *E2EApp) handleHTTP(w http.ResponseWriter, r *http.Request) { + var req rpctypes.RPCRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + // Process the request - for now just return a basic response + // TODO_IMPROVE: Implement proper CometBFT RPC endpoint handling + response := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + Result: json.RawMessage(`{}`), + } + + w.Header().Set("Content-Type", "application/json") + json.NewEncoder(w).Encode(response) +} diff --git a/testutil/e2e/app_test.go b/testutil/e2e/app_test.go new file mode 100644 index 000000000..515bebf5b --- /dev/null +++ b/testutil/e2e/app_test.go @@ -0,0 +1,81 @@ +package e2e + +import ( + "testing" + + "cosmossdk.io/depinject" + "cosmossdk.io/math" + sdkclient "github.com/cosmos/cosmos-sdk/client" + cosmostx "github.com/cosmos/cosmos-sdk/client/tx" + "github.com/cosmos/cosmos-sdk/crypto/hd" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + cosmostypes "github.com/cosmos/cosmos-sdk/types" + "github.com/stretchr/testify/require" + + "github.com/pokt-network/poktroll/app/volatile" + "github.com/pokt-network/poktroll/pkg/client/block" + "github.com/pokt-network/poktroll/pkg/client/events" + "github.com/pokt-network/poktroll/pkg/client/query" + "github.com/pokt-network/poktroll/pkg/client/tx" + txtypes "github.com/pokt-network/poktroll/pkg/client/tx/types" + "github.com/pokt-network/poktroll/testutil/testclient" + gatewaytypes "github.com/pokt-network/poktroll/x/gateway/types" +) + +func TestNewE2EApp(t *testing.T) { + app := NewE2EApp(t) + + blockQueryClient, err := sdkclient.NewClientFromNode("tcp://127.0.0.1:42070") + require.NoError(t, err) + + deps := depinject.Supply(app.QueryHelper(), blockQueryClient) + + sharedQueryClient, err := query.NewSharedQuerier(deps) + require.NoError(t, err) + + sharedParams, err := sharedQueryClient.GetParams(app.GetSdkCtx()) + require.NoError(t, err) + + t.Logf("shared params: %+v", sharedParams) + + eventsQueryClient := events.NewEventsQueryClient("ws://127.0.0.1:6969/websocket") + deps = depinject.Configs(deps, depinject.Supply(eventsQueryClient)) + blockClient, err := block.NewBlockClient(app.GetSdkCtx(), deps) + require.NoError(t, err) + + keyRing := keyring.NewInMemory(app.GetCodec()) + // TODO: add the gateway2 key... + _, err = keyRing.NewAccount( + "gateway2", + "suffer wet jelly furnace cousin flip layer render finish frequent pledge feature economy wink like water disease final erase goat include apple state furnace", + "", + cosmostypes.FullFundraiserPath, + hd.Secp256k1, + ) + require.NoError(t, err) + + flagSet := testclient.NewLocalnetFlagSet(t) + clientCtx := testclient.NewLocalnetClientCtx(t, flagSet).WithKeyring(keyRing) + + txFactory, err := cosmostx.NewFactoryCLI(clientCtx, flagSet) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(txtypes.Context(clientCtx), txFactory)) + txContext, err := tx.NewTxContext(deps) + require.NoError(t, err) + + deps = depinject.Configs(deps, depinject.Supply(blockClient, txContext)) + txClient, err := tx.NewTxClient(app.GetSdkCtx(), deps, tx.WithSigningKeyName("gateway2")) + require.NoError(t, err) + + eitherErr := txClient.SignAndBroadcast( + app.GetSdkCtx(), + gatewaytypes.NewMsgStakeGateway( + "pokt15w3fhfyc0lttv7r585e2ncpf6t2kl9uh8rsnyz", + cosmostypes.NewCoin(volatile.DenomuPOKT, math.NewInt(100000000)), + ), + ) + err, errCh := eitherErr.SyncOrAsyncError() + require.NoError(t, err) + require.NoError(t, <-errCh) +} diff --git a/testutil/e2e/grpc_server.go b/testutil/e2e/grpc_server.go new file mode 100644 index 000000000..e5bbfdd76 --- /dev/null +++ b/testutil/e2e/grpc_server.go @@ -0,0 +1,96 @@ +package e2e + +import ( + "context" + "fmt" + "strings" + "testing" + + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + "google.golang.org/protobuf/proto" + + "github.com/cosmos/cosmos-sdk/baseapp" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +// newGRPCServer creates and configures a new gRPC server for the E2EApp +func newGRPCServer(app *E2EApp, t *testing.T) *grpc.Server { + grpcServer := grpc.NewServer() + reflection.Register(grpcServer) + + forwarder := &grpcForwarderServer{ + queryHelper: app.QueryHelper(), + app: app, + t: t, + } + + grpcServer.RegisterService(&grpc.ServiceDesc{ + ServiceName: "cosmos.Service", + HandlerType: (*interface{})(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{}, + Metadata: "", + }, forwarder) + + return grpcServer +} + +// grpcForwarderServer implements a generic gRPC service that forwards all queries +// to the queryHelper and messages to the app +type grpcForwarderServer struct { + queryHelper *baseapp.QueryServiceTestHelper + app *E2EApp + t *testing.T +} + +// Invoke implements the grpc.Server interface and forwards all requests appropriately +func (s *grpcForwarderServer) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...grpc.CallOption) error { + // Determine if this is a query or message based on the method name + if isQuery(method) { + return s.queryHelper.Invoke(ctx, method, args, reply) + } + + // If it's not a query, treat it as a message + msg, ok := args.(sdk.Msg) + if !ok { + return fmt.Errorf("expected sdk.Msg, got %T", args) + } + + // Run the message through the app + msgRes, err := s.app.RunMsg(s.t, msg) + if err != nil { + return err + } + + // Type assert the reply as a proto.Message + protoReply, ok := reply.(proto.Message) + if !ok { + return fmt.Errorf("expected proto.Message, got %T", reply) + } + + // Type assert the response as a proto.Message + protoRes, ok := msgRes.(proto.Message) + if !ok { + return fmt.Errorf("expected proto.Message response, got %T", msgRes) + } + + // Marshal the response to bytes + resBz, err := proto.Marshal(protoRes) + if err != nil { + return fmt.Errorf("failed to marshal response: %w", err) + } + + // Unmarshal into the reply + return proto.Unmarshal(resBz, protoReply) +} + +// NewStream implements the grpc.Server interface but is not used in this implementation +func (s *grpcForwarderServer) NewStream(ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return nil, fmt.Errorf("streaming is not supported") +} + +// isQuery returns true if the method name indicates this is a query request +func isQuery(method string) bool { + return strings.Contains(method, ".Query/") +} diff --git a/testutil/e2e/ws_server.go b/testutil/e2e/ws_server.go new file mode 100644 index 000000000..c21187a28 --- /dev/null +++ b/testutil/e2e/ws_server.go @@ -0,0 +1,181 @@ +package e2e + +import ( + "encoding/json" + "net/http" + "strings" + "testing" + + "github.com/gorilla/websocket" + + coretypes "github.com/cometbft/cometbft/rpc/core/types" + rpctypes "github.com/cometbft/cometbft/rpc/jsonrpc/types" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/cosmos/cosmos-sdk/types/tx" +) + +// newWebSocketServer creates and configures a new WebSocket server for the E2EApp +func newWebSocketServer(app *E2EApp) *http.Server { + mux := http.NewServeMux() + mux.HandleFunc("/websocket", app.handleWebSocket) + return &http.Server{Handler: mux} +} + +// handleWebSocket handles incoming WebSocket connections and subscriptions +func (app *E2EApp) handleWebSocket(w http.ResponseWriter, r *http.Request) { + conn, err := app.wsUpgrader.Upgrade(w, r, nil) + if err != nil { + return + } + + app.wsConnMutex.Lock() + app.wsConnections[conn] = make(map[string]struct{}) + app.wsConnMutex.Unlock() + + go app.handleWebSocketConnection(conn) +} + +// handleWebSocketConnection handles messages from a specific WebSocket connection +func (app *E2EApp) handleWebSocketConnection(conn *websocket.Conn) { + defer func() { + app.wsConnMutex.Lock() + delete(app.wsConnections, conn) + app.wsConnMutex.Unlock() + conn.Close() + }() + + for { + _, message, err := conn.ReadMessage() + if err != nil { + return + } + + var req rpctypes.RPCRequest + if err := json.Unmarshal(message, &req); err != nil { + continue + } + + // Handle subscribe/unsubscribe requests + if req.Method == "subscribe" { + var params struct { + Query string `json:"query"` + } + if err := json.Unmarshal(req.Params, ¶ms); err != nil { + continue + } + + app.wsConnMutex.Lock() + app.wsConnections[conn][params.Query] = struct{}{} + app.wsConnMutex.Unlock() + + // Send subscription response + resp := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: req.ID, + // TODO_IN_THIS_COMMIT: generate a mock result... + Result: json.RawMessage("{}"), + } + if err := conn.WriteJSON(resp); err != nil { + return + } + } + } +} + +// handleBlockEvents coordinates block finalization with WebSocket event broadcasting +func (app *E2EApp) handleBlockEvents(t *testing.T) { + for event := range app.blockEventChan { + app.wsConnMutex.RLock() + for conn, queries := range app.wsConnections { + // Check if connection is subscribed to this event type + for query := range queries { + if eventMatchesQuery(event, query) { + // Marshal the event to JSON + eventJSON, err := json.Marshal(event) + if err != nil { + t.Logf("failed to marshal event: %v", err) + continue + } + + response := rpctypes.RPCResponse{ + JSONRPC: "2.0", + ID: nil, // Events don't have an ID + Result: json.RawMessage(eventJSON), + } + + if err := conn.WriteJSON(response); err != nil { + app.wsConnMutex.RUnlock() + app.wsConnMutex.Lock() + delete(app.wsConnections, conn) + app.wsConnMutex.Unlock() + app.wsConnMutex.RLock() + continue + } + } + } + } + app.wsConnMutex.RUnlock() + } +} + +// TODO_IN_THIS_COMMIT: also wrap RunMsgs... +// TODO_IN_THIS_COMMIT: godoc... +// Override RunMsg to also emit transaction events via WebSocket +func (app *E2EApp) RunMsg(t *testing.T, msg sdk.Msg) (tx.MsgResponse, error) { + msgRes, err := app.App.RunMsg(t, msg) + if err != nil { + return nil, err + } + + // Create and emit block event with transaction results + blockEvent := createBlockEvent(app.GetSdkCtx(), msgRes) + app.blockEventChan <- blockEvent + + return msgRes, nil +} + +// createBlockEvent creates a CometBFT-compatible event from transaction results +func createBlockEvent(ctx *sdk.Context, msgRes tx.MsgResponse) *coretypes.ResultEvent { + // Convert SDK events to map[string][]string format that CometBFT expects + events := make(map[string][]string) + for _, event := range ctx.EventManager().Events() { + // Each event type becomes a key, and its attributes become the values + for _, attr := range event.Attributes { + if events[event.Type] == nil { + events[event.Type] = make([]string, 0) + } + events[event.Type] = append(events[event.Type], string(attr.Value)) + } + } + + return &coretypes.ResultEvent{ + Query: "tm.event='NewBlock'", + Data: map[string]interface{}{ + "height": ctx.BlockHeight(), + "hash": ctx.BlockHeader().LastBlockId.Hash, + "events": events, + // Add other relevant block and transaction data here as needed + }, + Events: events, + } +} + +//// createTxEvent creates a CometBFT-compatible event from transaction results +//func createTxEvent(tx *coretypes.ResultTx, index int) *coretypes.ResultEvent { +// return &coretypes.ResultEvent{ +// Query: "tm.event='Tx'", +// Data: map[string]interface{}{ +// "height": ctx.BlockHeight(), +// "hash": ctx.BlockHeader().LastBlockId.Hash, +// "events": events, +// // Add other relevant block and transaction data here as needed +// }, +// Events: events, +// } +//} + +// eventMatchesQuery checks if an event matches a subscription query +func eventMatchesQuery(event *coretypes.ResultEvent, query string) bool { + // Basic implementation - should be expanded to handle more complex queries + return strings.Contains(query, event.Query) +}