Skip to content

Commit

Permalink
node/rpc/sync: cancel sync sub when internal msg buffer is full (#596)
Browse files Browse the repository at this point in the history
  • Loading branch information
bas-vk authored Aug 1, 2024
1 parent 38988a5 commit 33c4f69
Show file tree
Hide file tree
Showing 5 changed files with 322 additions and 52 deletions.
145 changes: 145 additions & 0 deletions core/node/rpc/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1563,3 +1563,148 @@ func TestStreamSyncPingPong(t *testing.T) {
return slices.Equal(pings, pongs)
}, 20*time.Second, 100*time.Millisecond, "didn't receive all pongs in reasonable time or out of order")
}

func TestSyncSubscriptionWithTooSlowClient(t *testing.T) {
t.SkipNow()

var (
req = require.New(t)
services = newServiceTester(t, serviceTesterOpts{numNodes: 5, start: true})
client0 = services.testClient(0)
client1 = services.testClient(1)
ctx = services.ctx
wallets []*crypto.Wallet
users []*protocol.SyncCookie
channels []*protocol.SyncCookie
)

// create users that will join and add messages to channels.
for range 10 {
// Create user streams
wallet, err := crypto.NewWallet(ctx)
req.NoError(err, "new wallet")
syncCookie, _, err := createUser(ctx, wallet, client0, nil)
req.NoError(err, "create user")

_, _, err = createUserDeviceKeyStream(ctx, wallet, client0, nil)
req.NoError(err)

wallets = append(wallets, wallet)
users = append(users, syncCookie)
}

// create a space and several channels in it
spaceID := testutils.FakeStreamId(STREAM_SPACE_BIN)
resspace, _, err := createSpace(ctx, wallets[0], client0, spaceID, nil)
req.NoError(err)
req.NotNil(resspace, "create space sync cookie")

// create enough channels that they will be distributed among local and remote nodes
for range TestStreams {
channelId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
channel, _, err := createChannel(ctx, wallets[0], client0, spaceID, channelId, nil)
req.NoError(err)
req.NotNil(channel, "nil create channel sync cookie")
channels = append(channels, channel)
}

// subscribe to channel updates
fmt.Printf("sync streams on node %s\n", services.nodes[1].address)
syncPos := append(users, channels...)
syncRes, err := client1.SyncStreams(ctx, connect.NewRequest(&protocol.SyncStreamsRequest{SyncPos: syncPos}))
req.NoError(err, "sync streams")

syncRes.Receive()
syncID := syncRes.Msg().SyncId
t.Logf("subscription %s created on node: %s", syncID, services.nodes[1].address)

subCancelled := make(chan struct{})

go func() {
// don't read from syncRes and simulate an inactive client.
// the node should drop the subscription when the internal buffer is full with events it can't deliver
<-time.After(10 * time.Second)

_, err := client1.PingSync(ctx, connect.NewRequest(&protocol.PingSyncRequest{SyncId: syncID, Nonce: "ping"}))
req.Error(err, "sync must have been dropped")

close(subCancelled)
}()

// users join channels
channelsCount := len(channels)
for i, wallet := range wallets[1:] {
for c := range channelsCount {
channel := channels[c]

miniBlockHashResp, err := client1.GetLastMiniblockHash(
ctx,
connect.NewRequest(&protocol.GetLastMiniblockHashRequest{StreamId: users[i+1].StreamId}))

req.NoError(err, "get last miniblock hash")

channelId, _ := StreamIdFromBytes(channel.GetStreamId())
userJoin, err := events.MakeEnvelopeWithPayload(
wallet,
events.Make_UserPayload_Membership(protocol.MembershipOp_SO_JOIN, channelId, nil, spaceID[:]),
miniBlockHashResp.Msg.GetHash(),
)
req.NoError(err)

resp, err := client1.AddEvent(
ctx,
connect.NewRequest(
&protocol.AddEventRequest{
StreamId: users[i+1].StreamId,
Event: userJoin,
},
),
)

req.NoError(err)
req.Nil(resp.Msg.GetError())
}
}

// send a bunch of messages and ensure that the sync op is cancelled because the client buffer becomes full
for i := range 10000 {
wallet := wallets[rand.Int()%len(wallets)]
channel := channels[rand.Int()%len(channels)]
msgContents := fmt.Sprintf("msg #%d", i)

getStreamResp, err := client1.GetStream(ctx, connect.NewRequest(&protocol.GetStreamRequest{
StreamId: channel.GetStreamId(),
Optional: false,
}))
req.NoError(err)

message, err := events.MakeEnvelopeWithPayload(
wallet,
events.Make_ChannelPayload_Message(msgContents),
getStreamResp.Msg.GetStream().GetNextSyncCookie().GetPrevMiniblockHash(),
)
req.NoError(err)

_, err = client1.AddEvent(
ctx,
connect.NewRequest(
&protocol.AddEventRequest{
StreamId: channel.GetStreamId(),
Event: message,
},
),
)

req.NoError(err)
}

req.Eventuallyf(func() bool {
select {
case <-subCancelled:
fmt.Println("sub cancelled as expected")
return true
default:
return false
}
}, 20*time.Second, 100*time.Millisecond, "subscription not cancelled in reasonable time")
}
46 changes: 33 additions & 13 deletions core/node/rpc/sync/client/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"github.com/river-build/river/core/node/dlog"
"sync"

"github.com/ethereum/go-ethereum/common"
Expand All @@ -12,7 +13,10 @@ import (
)

type localSyncer struct {
syncStreamCtx context.Context
globalSyncOpID string

syncStreamCtx context.Context
cancelGlobalSyncOp context.CancelFunc

streamCache events.StreamCache
cookies []*SyncCookie
Expand All @@ -25,18 +29,22 @@ type localSyncer struct {

func newLocalSyncer(
ctx context.Context,
globalSyncOpID string,
cancelGlobalSyncOp context.CancelFunc,
localAddr common.Address,
streamCache events.StreamCache,
cookies []*SyncCookie,
messages chan<- *SyncStreamsResponse,
) (*localSyncer, error) {
return &localSyncer{
syncStreamCtx: ctx,
streamCache: streamCache,
localAddr: localAddr,
cookies: cookies,
messages: messages,
activeStreams: make(map[StreamId]events.SyncStream),
globalSyncOpID: globalSyncOpID,
syncStreamCtx: ctx,
cancelGlobalSyncOp: cancelGlobalSyncOp,
streamCache: streamCache,
localAddr: localAddr,
cookies: cookies,
messages: messages,
activeStreams: make(map[StreamId]events.SyncStream),
}, nil
}

Expand Down Expand Up @@ -84,9 +92,15 @@ func (s *localSyncer) RemoveStream(_ context.Context, streamID StreamId) (bool,

// OnUpdate is called each time a new cookie is available for a stream
func (s *localSyncer) OnUpdate(r *StreamAndCookie) {
s.messages <- &SyncStreamsResponse{
SyncOp: SyncOp_SYNC_UPDATE,
Stream: r,
select {
case s.messages <- &SyncStreamsResponse{SyncOp: SyncOp_SYNC_UPDATE, Stream: r}:
return
case <-s.syncStreamCtx.Done():
return
default:
log := dlog.FromCtx(s.syncStreamCtx)
log.Error("Cancel client sync operation - client buffer full", "syncId", s.globalSyncOpID)
s.cancelGlobalSyncOp()
}
}

Expand All @@ -104,9 +118,15 @@ func (s *localSyncer) OnSyncError(error) {

// OnStreamSyncDown is called when updates for a stream could not be given.
func (s *localSyncer) OnStreamSyncDown(streamID StreamId) {
s.messages <- &SyncStreamsResponse{
SyncOp: SyncOp_SYNC_DOWN,
StreamId: streamID[:],
select {
case s.messages <- &SyncStreamsResponse{SyncOp: SyncOp_SYNC_DOWN, StreamId: streamID[:]}:
return
case <-s.syncStreamCtx.Done():
return
default:
log := dlog.FromCtx(s.syncStreamCtx)
log.Error("Cancel client sync operation - client buffer full", "syncId", s.globalSyncOpID)
s.cancelGlobalSyncOp()
}
}

Expand Down
Loading

0 comments on commit 33c4f69

Please sign in to comment.