Skip to content

Commit

Permalink
[EventsReplayClient] Fix Replay Client Bugs (#267)
Browse files Browse the repository at this point in the history
  • Loading branch information
h5law authored Dec 23, 2023
1 parent ec93520 commit 06f8040
Show file tree
Hide file tree
Showing 25 changed files with 371 additions and 106 deletions.
28 changes: 20 additions & 8 deletions docs/static/openapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47176,7 +47176,7 @@ paths:
service:
title: >-
The Service for which the application is
configured for
configured
type: object
properties:
id:
Expand Down Expand Up @@ -47243,7 +47243,7 @@ paths:
service:
title: >-
The Service for which the supplier is
configured for
configured
type: object
properties:
id:
Expand Down Expand Up @@ -47931,12 +47931,14 @@ paths:
- GRPC
- WEBSOCKET
- JSON_RPC
- REST
default: UNKNOWN_RPC
description: |-
- UNKNOWN_RPC: Undefined RPC type
- GRPC: gRPC
- WEBSOCKET: WebSocket
- JSON_RPC: JSON-RPC
- REST: REST
configs:
type: array
items:
Expand Down Expand Up @@ -48080,12 +48082,14 @@ paths:
- GRPC
- WEBSOCKET
- JSON_RPC
- REST
default: UNKNOWN_RPC
description: |-
- UNKNOWN_RPC: Undefined RPC type
- GRPC: gRPC
- WEBSOCKET: WebSocket
- JSON_RPC: JSON-RPC
- REST: REST
configs:
type: array
items:
Expand Down Expand Up @@ -77536,7 +77540,7 @@ definitions:
type: object
properties:
service:
title: The Service for which the application is configured for
title: The Service for which the application is configured
type: object
properties:
id:
Expand Down Expand Up @@ -77600,7 +77604,7 @@ definitions:
type: object
properties:
service:
title: The Service for which the supplier is configured for
title: The Service for which the supplier is configured
type: object
properties:
id:
Expand Down Expand Up @@ -77790,7 +77794,7 @@ definitions:
type: object
properties:
service:
title: The Service for which the application is configured for
title: The Service for which the application is configured
type: object
properties:
id:
Expand Down Expand Up @@ -77852,7 +77856,7 @@ definitions:
type: object
properties:
service:
title: The Service for which the supplier is configured for
title: The Service for which the supplier is configured
type: object
properties:
id:
Expand Down Expand Up @@ -78032,6 +78036,7 @@ definitions:
- GRPC
- WEBSOCKET
- JSON_RPC
- REST
default: UNKNOWN_RPC
description: |-
- UNKNOWN_RPC: Undefined RPC type
Expand Down Expand Up @@ -78066,7 +78071,7 @@ definitions:
type: object
properties:
service:
title: The Service for which the supplier is configured for
title: The Service for which the supplier is configured
type: object
properties:
id:
Expand Down Expand Up @@ -78099,6 +78104,7 @@ definitions:
- GRPC
- WEBSOCKET
- JSON_RPC
- REST
default: UNKNOWN_RPC
description: |-
- UNKNOWN_RPC: Undefined RPC type
Expand Down Expand Up @@ -78156,6 +78162,7 @@ definitions:
- GRPC
- WEBSOCKET
- JSON_RPC
- REST
default: UNKNOWN_RPC
description: |-
- UNKNOWN_RPC: Undefined RPC type
Expand Down Expand Up @@ -78195,7 +78202,7 @@ definitions:
type: object
properties:
service:
title: The Service for which the supplier is configured for
title: The Service for which the supplier is configured
type: object
properties:
id:
Expand Down Expand Up @@ -78227,6 +78234,7 @@ definitions:
- GRPC
- WEBSOCKET
- JSON_RPC
- REST
default: UNKNOWN_RPC
description: |-
- UNKNOWN_RPC: Undefined RPC type
Expand Down Expand Up @@ -78481,12 +78489,14 @@ definitions:
- GRPC
- WEBSOCKET
- JSON_RPC
- REST
default: UNKNOWN_RPC
description: |-
- UNKNOWN_RPC: Undefined RPC type
- GRPC: gRPC
- WEBSOCKET: WebSocket
- JSON_RPC: JSON-RPC
- REST: REST
configs:
type: array
items:
Expand Down Expand Up @@ -78661,12 +78671,14 @@ definitions:
- GRPC
- WEBSOCKET
- JSON_RPC
- REST
default: UNKNOWN_RPC
description: |-
- UNKNOWN_RPC: Undefined RPC type
- GRPC: gRPC
- WEBSOCKET: WebSocket
- JSON_RPC: JSON-RPC
- REST: REST
configs:
type: array
items:
Expand Down
4 changes: 2 additions & 2 deletions pkg/appgateserver/cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ func setupAppGateServerDependencies(
supplierFuncs := []config.SupplierFn{
config.NewSupplyLoggerFromCtx(ctx),
config.NewSupplyEventsQueryClientFn(queryNodeURL.Host), // leaf
config.NewSupplyBlockClientFn(queryNodeURL.Host), // leaf
config.NewSupplyBlockClientFn(), // leaf
config.NewSupplyQueryClientContextFn(queryNodeURL.String()), // leaf
config.NewSupplyAccountQuerierFn(), // leaf
config.NewSupplyApplicationQuerierFn(), // leaf
config.NewSupplySessionQuerierFn(), // leaf
config.NewSupplyRingCacheFn(),
config.NewSupplyPOKTRollSDKFn(queryNodeURL, appGateConfig.SigningKey),
config.NewSupplyPOKTRollSDKFn(appGateConfig.SigningKey),
}

return config.SupplyConfig(ctx, cmd, supplierFuncs)
Expand Down
2 changes: 0 additions & 2 deletions pkg/client/block/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,13 @@ const (
func NewBlockClient(
ctx context.Context,
deps depinject.Config,
cometWebsocketURL string,
) (client.BlockClient, error) {
client, err := events.NewEventsReplayClient[
client.Block,
client.EventsObservable[client.Block],
](
ctx,
deps,
cometWebsocketURL,
committedBlocksQuery,
newCometBlockEventFactoryFn(),
defaultBlocksReplayLimit,
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/block/client_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build integration

package block_test

import (
Expand All @@ -19,6 +17,7 @@ import (
const blockIntegrationSubTimeout = 5 * time.Second

func TestBlockClient_LastNBlocks(t *testing.T) {
t.Skip("TODO(@h5law): Figure out how to subscribe to events on the simulated localnet")
ctx := context.Background()

blockClient := testblock.NewLocalnetClient(ctx, t)
Expand All @@ -29,6 +28,7 @@ func TestBlockClient_LastNBlocks(t *testing.T) {
}

func TestBlockClient_BlocksObservable(t *testing.T) {
t.Skip("TODO(@h5law): Figure out how to subscribe to events on the simulated localnet")
ctx := context.Background()

blockClient := testblock.NewLocalnetClient(ctx, t)
Expand Down
3 changes: 1 addition & 2 deletions pkg/client/block/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/block"
"github.com/pokt-network/poktroll/testutil/testclient"
"github.com/pokt-network/poktroll/testutil/testclient/testeventsquery"
)

Expand Down Expand Up @@ -53,7 +52,7 @@ func TestBlockClient(t *testing.T) {
deps := depinject.Supply(eventsQueryClient)

// Set up block client.
blockClient, err := block.NewBlockClient(ctx, deps, testclient.CometLocalWebsocketURL)
blockClient, err := block.NewBlockClient(ctx, deps)
require.NoError(t, err)
require.NotNil(t, blockClient)

Expand Down
2 changes: 0 additions & 2 deletions pkg/client/delegation/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,13 @@ const (
func NewDelegationClient(
ctx context.Context,
deps depinject.Config,
cometWebsocketURL string,
) (client.DelegationClient, error) {
client, err := events.NewEventsReplayClient[
client.Redelegation,
client.EventsObservable[client.Redelegation],
](
ctx,
deps,
cometWebsocketURL,
delegationEventQuery,
newRedelegationEventFactoryFn(),
defaultRedelegationsReplayLimit,
Expand Down
16 changes: 7 additions & 9 deletions pkg/client/delegation/client_integration_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build integration

package delegation_test

// TODO(@h5law): Figure out how to use real components of the localnet
Expand All @@ -8,7 +6,7 @@ package delegation_test
// - Delegate to the gateway
// - Undelegate from the gateway
// Currently this test doesn't work, because (I think) it is using a mock
// keeper etc and this isnt actually interacting with the localnet where
// keeper etc and this isn't actually interacting with the localnet where
// the DelegationClient is listening for events from.

import (
Expand All @@ -34,10 +32,10 @@ const (
)

// TODO_UPNEXT(@h5law): Figure out the correct way to subscribe to events on the
// simulated localnet. Currently this test doesn't work. Although the block client
// subscribes it doesn't receive any events.
// simulated localnet. Currently this test doesn't work. Although the delegation
// client subscribes it doesn't receive any events.
func TestDelegationClient_RedelegationsObservables(t *testing.T) {
t.SkipNow()
t.Skip("TODO(@h5law): Figure out how to subscribe to events on the simulated localnet")
// Create the network with 2 applications and 1 gateway
net, appAddresses, gatewayAddr := createNetworkWithApplicationsAndGateways(t)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -46,7 +44,7 @@ func TestDelegationClient_RedelegationsObservables(t *testing.T) {
// Create the delegation client
evtQueryClient := events.NewEventsQueryClient("ws://localhost:26657/websocket")
deps := depinject.Supply(evtQueryClient)
delegationClient, err := delegation.NewDelegationClient(ctx, deps, "ws://localhost:26657/websocket")
delegationClient, err := delegation.NewDelegationClient(ctx, deps)
require.NoError(t, err)
require.NotNil(t, delegationClient)
t.Cleanup(func() {
Expand Down Expand Up @@ -74,7 +72,7 @@ func TestDelegationClient_RedelegationsObservables(t *testing.T) {
// of the Redelegation event alternates between app1 and app2
if previousRedelegation != nil {
require.NotEqual(t, previousRedelegation.GetAppAddress(), change.GetAppAddress())
if previousRedelegation.AppAddress() == appAddresses[0] {
if previousRedelegation.GetAppAddress() == appAddresses[0] {
require.Equal(t, appAddresses[1], change.GetAppAddress())
} else {
require.Equal(t, appAddresses[0], change.GetAppAddress())
Expand Down Expand Up @@ -129,7 +127,7 @@ func TestDelegationClient_RedelegationsObservables(t *testing.T) {
}

// createNetworkWithApplicationsAndGateways creates a network with 2 applications
// and 1 gateway. It returns the network with all accoutns initialized via a
// and 1 gateway. It returns the network with all accounts initialized via a
// transaction from the first validator.
func createNetworkWithApplicationsAndGateways(
t *testing.T,
Expand Down
3 changes: 1 addition & 2 deletions pkg/client/delegation/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/pokt-network/poktroll/pkg/client"
"github.com/pokt-network/poktroll/pkg/client/delegation"
"github.com/pokt-network/poktroll/testutil/sample"
"github.com/pokt-network/poktroll/testutil/testclient"
"github.com/pokt-network/poktroll/testutil/testclient/testeventsquery"
apptypes "github.com/pokt-network/poktroll/x/application/types"
)
Expand Down Expand Up @@ -47,7 +46,7 @@ func TestDelegationClient(t *testing.T) {

// Set up delegation client.
// NB: the URL passed to `NewDelegationClient` is irrelevant here because `eventsQueryClient` is a mock.
delegationClient, err := delegation.NewDelegationClient(ctx, deps, testclient.CometLocalWebsocketURL)
delegationClient, err := delegation.NewDelegationClient(ctx, deps)
require.NoError(t, err)
require.NotNil(t, delegationClient)

Expand Down
2 changes: 1 addition & 1 deletion pkg/client/delegation/godoc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package delegation contains a light wrapper of the EventsReplayClient[DeelgateeChange]
// Package delegation contains a light wrapper of the EventsReplayClient[Redelegation]
// generic which listens for redelegation events on chain and emits them
// through a ReplayObservable. This enables consumers to listen for on-chain
// application redelegation events and react to them asynchronously.
Expand Down
1 change: 1 addition & 0 deletions pkg/client/events/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ var (
ErrEventsConnClosed = sdkerrors.Register(codespace, 2, "connection closed")
ErrEventsSubscribe = sdkerrors.Register(codespace, 3, "failed to subscribe to events")
ErrEventsUnmarshalEvent = sdkerrors.Register(codespace, 4, "failed to unmarshal event bytes")
ErrEventsConsClosed = sdkerrors.Register(codespace, 5, "eventsqueryclient connection closed")
)
5 changes: 3 additions & 2 deletions pkg/client/events/query_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ func (eqc *eventsQueryClient) goPublishEventsBz(
}

eqc.close()

return
}

Expand All @@ -244,8 +245,8 @@ func (eqc *eventsQueryClient) goUnsubscribeOnDone(
// Wait for the context to be done.
<-ctx.Done()
// Only close the eventsBytes for the given query.
eqc.eventsBytesAndConnsMu.RLock()
defer eqc.eventsBytesAndConnsMu.RUnlock()
eqc.eventsBytesAndConnsMu.Lock()
defer eqc.eventsBytesAndConnsMu.Unlock()

if eventsBzConn, ok := eqc.eventsBytesAndConns[query]; ok {
// Unsubscribe all observers of the given query's eventsBzConn's observable
Expand Down
Loading

0 comments on commit 06f8040

Please sign in to comment.