From 33c4f69dd20259997890502b734bdd9610bad4ff Mon Sep 17 00:00:00 2001
From: bas-vk
Date: Thu, 1 Aug 2024 14:38:02 +0200
Subject: [PATCH] node/rpc/sync: cancel sync sub when internal msg buffer is full (#596)

---
 core/node/rpc/service_test.go           | 145 ++++++++++++++++++++++++
 core/node/rpc/sync/client/local.go      |  46 +++++---
 core/node/rpc/sync/client/remote.go     |  92 +++++++++++----
 core/node/rpc/sync/client/syncer_set.go |  32 ++++--
 core/node/rpc/sync/operation.go         |  59 ++++++++--
 5 files changed, 322 insertions(+), 52 deletions(-)

diff --git a/core/node/rpc/service_test.go b/core/node/rpc/service_test.go
index 2f81122ce..c81b5c4e0 100644
--- a/core/node/rpc/service_test.go
+++ b/core/node/rpc/service_test.go
@@ -1563,3 +1563,148 @@ func TestStreamSyncPingPong(t *testing.T) {
 		return slices.Equal(pings, pongs)
 	}, 20*time.Second, 100*time.Millisecond, "didn't receive all pongs in reasonable time or out of order")
 }
+
+func TestSyncSubscriptionWithTooSlowClient(t *testing.T) {
+	t.SkipNow()
+
+	var (
+		req      = require.New(t)
+		services = newServiceTester(t, serviceTesterOpts{numNodes: 5, start: true})
+		client0  = services.testClient(0)
+		client1  = services.testClient(1)
+		ctx      = services.ctx
+		wallets  []*crypto.Wallet
+		users    []*protocol.SyncCookie
+		channels []*protocol.SyncCookie
+	)
+
+	// create users that will join and add messages to channels.
+	for range 10 {
+		// Create user streams
+		wallet, err := crypto.NewWallet(ctx)
+		req.NoError(err, "new wallet")
+		syncCookie, _, err := createUser(ctx, wallet, client0, nil)
+		req.NoError(err, "create user")
+
+		_, _, err = createUserDeviceKeyStream(ctx, wallet, client0, nil)
+		req.NoError(err)
+
+		wallets = append(wallets, wallet)
+		users = append(users, syncCookie)
+	}
+
+	// create a space and several channels in it
+	spaceID := testutils.FakeStreamId(STREAM_SPACE_BIN)
+	resspace, _, err := createSpace(ctx, wallets[0], client0, spaceID, nil)
+	req.NoError(err)
+	req.NotNil(resspace, "create space sync cookie")
+
+	// create enough channels that they will be distributed among local and remote nodes
+	for range TestStreams {
+		channelId := testutils.FakeStreamId(STREAM_CHANNEL_BIN)
+		channel, _, err := createChannel(ctx, wallets[0], client0, spaceID, channelId, nil)
+		req.NoError(err)
+		req.NotNil(channel, "nil create channel sync cookie")
+		channels = append(channels, channel)
+	}
+
+	// subscribe to channel updates
+	fmt.Printf("sync streams on node %s\n", services.nodes[1].address)
+	syncPos := append(users, channels...)
+	syncRes, err := client1.SyncStreams(ctx, connect.NewRequest(&protocol.SyncStreamsRequest{SyncPos: syncPos}))
+	req.NoError(err, "sync streams")
+
+	syncRes.Receive()
+	syncID := syncRes.Msg().SyncId
+	t.Logf("subscription %s created on node: %s", syncID, services.nodes[1].address)
+
+	subCancelled := make(chan struct{})
+
+	go func() {
+		// don't read from syncRes to simulate an inactive client.
+		// the node should drop the subscription when the internal buffer is full of events it can't deliver
+		<-time.After(10 * time.Second)
+
+		_, err := client1.PingSync(ctx, connect.NewRequest(&protocol.PingSyncRequest{SyncId: syncID, Nonce: "ping"}))
+		req.Error(err, "sync must have been dropped")
+
+		close(subCancelled)
+	}()
+
+	// users join channels
+	channelsCount := len(channels)
+	for i, wallet := range wallets[1:] {
+		for c := range channelsCount {
+			channel := channels[c]
+
+			miniBlockHashResp, err := client1.GetLastMiniblockHash(
+				ctx,
+				connect.NewRequest(&protocol.GetLastMiniblockHashRequest{StreamId: users[i+1].StreamId}))
+
+			req.NoError(err, "get last miniblock hash")
+
+			channelId, _ := StreamIdFromBytes(channel.GetStreamId())
+			userJoin, err := events.MakeEnvelopeWithPayload(
+				wallet,
+				events.Make_UserPayload_Membership(protocol.MembershipOp_SO_JOIN, channelId, nil, spaceID[:]),
+				miniBlockHashResp.Msg.GetHash(),
+			)
+			req.NoError(err)
+
+			resp, err := client1.AddEvent(
+				ctx,
+				connect.NewRequest(
+					&protocol.AddEventRequest{
+						StreamId: users[i+1].StreamId,
+						Event:    userJoin,
+					},
+				),
+			)
+
+			req.NoError(err)
+			req.Nil(resp.Msg.GetError())
+		}
+	}
+
+	// send a bunch of messages and ensure that the sync op is cancelled because the client buffer becomes full
+	for i := range 10000 {
+		wallet := wallets[rand.Int()%len(wallets)]
+		channel := channels[rand.Int()%len(channels)]
+		msgContents := fmt.Sprintf("msg #%d", i)
+
+		getStreamResp, err := client1.GetStream(ctx, connect.NewRequest(&protocol.GetStreamRequest{
+			StreamId: channel.GetStreamId(),
+			Optional: false,
+		}))
+		req.NoError(err)
+
+		message, err := events.MakeEnvelopeWithPayload(
+			wallet,
+			events.Make_ChannelPayload_Message(msgContents),
+			getStreamResp.Msg.GetStream().GetNextSyncCookie().GetPrevMiniblockHash(),
+		)
+		req.NoError(err)
+
+		_, err = client1.AddEvent(
+			ctx,
+			connect.NewRequest(
+				&protocol.AddEventRequest{
+					StreamId: channel.GetStreamId(),
+					Event:    message,
+				},
+			),
+		)
+
+		req.NoError(err)
+	}
+
+	req.Eventuallyf(func() bool {
+		select {
+		case <-subCancelled:
+			fmt.Println("sub cancelled as expected")
+			return true
+		default:
+			return false
+		}
+	}, 20*time.Second, 100*time.Millisecond, "subscription not cancelled in reasonable time")
+}
diff --git a/core/node/rpc/sync/client/local.go b/core/node/rpc/sync/client/local.go
index 4a6493f7e..b83e322fa 100644
--- a/core/node/rpc/sync/client/local.go
+++ b/core/node/rpc/sync/client/local.go
@@ -2,6 +2,7 @@ package client
 
 import (
 	"context"
+	"github.com/river-build/river/core/node/dlog"
 	"sync"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -12,7 +13,10 @@ import (
 )
 
 type localSyncer struct {
-	syncStreamCtx context.Context
+	globalSyncOpID string
+
+	syncStreamCtx      context.Context
+	cancelGlobalSyncOp context.CancelFunc
 
 	streamCache events.StreamCache
 	cookies     []*SyncCookie
@@ -25,18 +29,22 @@ type localSyncer struct {
 
 func newLocalSyncer(
 	ctx context.Context,
+	globalSyncOpID string,
+	cancelGlobalSyncOp context.CancelFunc,
 	localAddr common.Address,
 	streamCache events.StreamCache,
 	cookies []*SyncCookie,
 	messages chan<- *SyncStreamsResponse,
 ) (*localSyncer, error) {
 	return &localSyncer{
-		syncStreamCtx: ctx,
-		streamCache:   streamCache,
-		localAddr:     localAddr,
-		cookies:       cookies,
-		messages:      messages,
-		activeStreams: make(map[StreamId]events.SyncStream),
+		globalSyncOpID:     globalSyncOpID,
+		syncStreamCtx:      ctx,
+		cancelGlobalSyncOp: cancelGlobalSyncOp,
+		streamCache:        streamCache,
+		localAddr:          localAddr,
+		cookies:            cookies,
+		messages:           messages,
+		activeStreams:      make(map[StreamId]events.SyncStream),
 	}, nil
 }
 
@@ -84,9 +92,15 @@ func (s *localSyncer) RemoveStream(_ context.Context, streamID StreamId) (bool,
 
 // OnUpdate is called each time a new cookie is available for a stream
 func (s *localSyncer) OnUpdate(r *StreamAndCookie) {
-	s.messages <- &SyncStreamsResponse{
-		SyncOp: SyncOp_SYNC_UPDATE,
-		Stream: r,
+	select {
+	case s.messages <- &SyncStreamsResponse{SyncOp: SyncOp_SYNC_UPDATE, Stream: r}:
+		return
+	case <-s.syncStreamCtx.Done():
+		return
+	default:
+		log := dlog.FromCtx(s.syncStreamCtx)
+		log.Error("Cancel client sync operation - client buffer full", "syncId", s.globalSyncOpID)
+		s.cancelGlobalSyncOp()
 	}
 }
 
@@ -104,9 +118,15 @@ func (s *localSyncer) OnSyncError(error) {
 
 // OnStreamSyncDown is called when updates for a stream could not be given.
 func (s *localSyncer) OnStreamSyncDown(streamID StreamId) {
-	s.messages <- &SyncStreamsResponse{
-		SyncOp:   SyncOp_SYNC_DOWN,
-		StreamId: streamID[:],
+	select {
+	case s.messages <- &SyncStreamsResponse{SyncOp: SyncOp_SYNC_DOWN, StreamId: streamID[:]}:
+		return
+	case <-s.syncStreamCtx.Done():
+		return
+	default:
+		log := dlog.FromCtx(s.syncStreamCtx)
+		log.Error("Cancel client sync operation - client buffer full", "syncId", s.globalSyncOpID)
+		s.cancelGlobalSyncOp()
 	}
 }
 
diff --git a/core/node/rpc/sync/client/remote.go b/core/node/rpc/sync/client/remote.go
index 92bd6fcfb..1a4c04a2a 100644
--- a/core/node/rpc/sync/client/remote.go
+++ b/core/node/rpc/sync/client/remote.go
@@ -2,6 +2,7 @@ package client
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"sync"
 	"sync/atomic"
@@ -17,20 +18,22 @@ import (
 )
 
 type remoteSyncer struct {
-	syncStreamCtx    context.Context
-	syncStreamCancel context.CancelFunc
-	syncID           string
-	forwarderSyncID  string
-	remoteAddr       common.Address
-	client           protocolconnect.StreamServiceClient
-	cookies          []*SyncCookie
-	messages         chan<- *SyncStreamsResponse
-	streams          sync.Map
-	responseStream   *connect.ServerStreamForClient[SyncStreamsResponse]
+	cancelGlobalSyncOp context.CancelFunc
+	syncStreamCtx      context.Context
+	syncStreamCancel   context.CancelFunc
+	syncID             string
+	forwarderSyncID    string
+	remoteAddr         common.Address
+	client             protocolconnect.StreamServiceClient
+	cookies            []*SyncCookie
+	messages           chan<- *SyncStreamsResponse
+	streams            sync.Map
+	responseStream     *connect.ServerStreamForClient[SyncStreamsResponse]
 }
 
 func newRemoteSyncer(
 	ctx context.Context,
+	cancelGlobalSyncOp context.CancelFunc,
 	forwarderSyncID string,
 	remoteAddr common.Address,
 	client protocolconnect.StreamServiceClient,
@@ -66,14 +69,15 @@ func newRemoteSyncer(
 	}
 
 	s := &remoteSyncer{
-		forwarderSyncID:  forwarderSyncID,
-		syncStreamCtx:    syncStreamCtx,
-		syncStreamCancel: syncStreamCancel,
-		client:           client,
-		cookies:          cookies,
-		messages:         messages,
-		responseStream:   responseStream,
-		remoteAddr:       remoteAddr,
+		forwarderSyncID:    forwarderSyncID,
+		cancelGlobalSyncOp: cancelGlobalSyncOp,
+		syncStreamCtx:      syncStreamCtx,
+		syncStreamCancel:   syncStreamCancel,
+		client:             client,
+		cookies:            cookies,
+		messages:           messages,
+		responseStream:     responseStream,
+		remoteAddr:         remoteAddr,
 	}
 
 	s.syncID = responseStream.Msg().GetSyncId()
@@ -87,6 +91,8 @@ func newRemoteSyncer(
 }
 
 func (s *remoteSyncer) Run() {
+	log := dlog.FromCtx(s.syncStreamCtx)
+
 	defer s.responseStream.Close()
 
 	var latestMsgReceived atomic.Value
@@ -105,10 +111,23 @@ func (s *remoteSyncer) Run() {
 		res := s.responseStream.Msg()
 
 		if res.GetSyncOp() == SyncOp_SYNC_UPDATE {
-			s.messages <- res
+			if err := s.sendSyncStreamResponseToClient(res); err != nil {
+				if !errors.Is(err, context.Canceled) {
+					log.Error("Cancel remote sync with client", "remote", s.remoteAddr, "err", err)
+					s.cancelGlobalSyncOp()
+				}
+				return
+			}
 		} else if res.GetSyncOp() == SyncOp_SYNC_DOWN {
 			if streamID, err := StreamIdFromBytes(res.GetStreamId()); err == nil {
-				s.messages <- res
+				if err := s.sendSyncStreamResponseToClient(res); err != nil {
+					if !errors.Is(err, context.Canceled) {
+						log.Error("Cancel remote sync with client", "remote", s.remoteAddr, "err", err)
+						s.cancelGlobalSyncOp()
+					}
+					return
+				}
+
 				s.streams.Delete(streamID)
 			}
 		}
@@ -116,26 +135,45 @@ func (s *remoteSyncer) Run() {
 
 	// stream interrupted while client didn't cancel sync -> remote is unavailable
 	if s.syncStreamCtx.Err() == nil {
-		log := dlog.FromCtx(s.syncStreamCtx)
 		log.Info("remote node disconnected", "remote", s.remoteAddr)
 
 		s.streams.Range(func(key, value any) bool {
 			streamID := key.(StreamId)
-
 			log.Debug("stream down", "syncId", s.forwarderSyncID, "remote", s.remoteAddr, "stream", streamID)
-			s.messages <- &SyncStreamsResponse{
-				SyncOp:   SyncOp_SYNC_DOWN,
-				StreamId: streamID[:],
+
+			msg := &SyncStreamsResponse{SyncOp: SyncOp_SYNC_DOWN, StreamId: streamID[:]}
+
+			if err := s.sendSyncStreamResponseToClient(msg); err != nil {
+				log.Error("Cancel remote sync with client", "remote", s.remoteAddr, "err", err)
+				s.cancelGlobalSyncOp()
+				return false
 			}
+
 			return true
 		})
 	}
 }
 
+// sendSyncStreamResponseToClient tries to write msg to the client send message channel.
+// If the channel is full or the sync operation is cancelled, the function returns an error.
+func (s *remoteSyncer) sendSyncStreamResponseToClient(msg *SyncStreamsResponse) error {
+	select {
+	case s.messages <- msg:
+		return nil
+	case <-s.syncStreamCtx.Done():
+		return s.syncStreamCtx.Err()
+	default:
+		return RiverError(Err_BUFFER_FULL, "Client sync subscription message channel is full").
+			Tag("syncOpId", s.forwarderSyncID).
+			Func("sendSyncStreamResponseToClient")
+	}
+}
+
 // connectionAlive periodically pings remote to check if the connection is still alive.
 // if the remote can't be reached, the sync stream is canceled.
 func (s *remoteSyncer) connectionAlive(latestMsgReceived *atomic.Value) {
 	var (
+		log = dlog.FromCtx(s.syncStreamCtx)
 		// check every pingTicker if it's time to send a ping req to remote
 		pingTicker = time.NewTicker(3 * time.Second)
 		// don't send a ping req if there was activity within recentActivityInterval
@@ -151,6 +189,7 @@ func (s *remoteSyncer) connectionAlive(latestMsgReceived *atomic.Value) {
 			now := time.Now()
 			lastMsgRecv := latestMsgReceived.Load().(time.Time)
 			if lastMsgRecv.Add(recentActivityDeadline).Before(now) { // no recent activity -> conn dead
+				log.Warn("remote sync node timed out", "remote", s.remoteAddr)
 				s.syncStreamCancel()
 				return
 			}
@@ -164,6 +203,9 @@ func (s *remoteSyncer) connectionAlive(latestMsgReceived *atomic.Value) {
 				SyncId: s.syncID,
 				Nonce:  fmt.Sprintf("%d", now.Unix()),
 			})); err != nil {
+				if !errors.Is(err, context.Canceled) {
+					log.Error("ping sync failed", "remote", s.remoteAddr, "err", err)
+				}
 				s.syncStreamCancel()
 				return
 			}
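The select with a default branch introduced in local.go and remote.go above is the heart of this change: it turns what used to be a blocking channel send into a fail-fast probe of the client's message buffer. A minimal, self-contained sketch of the same idiom follows; the names (trySend, errBufferFull) and the tiny buffer size are illustrative and not part of the patch.

package main

import (
	"context"
	"errors"
	"fmt"
)

var errBufferFull = errors.New("subscriber buffer full")

// trySend never blocks the producer: it succeeds while the buffered channel
// has room, reports cancellation if ctx is already done, and otherwise fails
// fast so the caller can tear the subscription down.
func trySend(ctx context.Context, out chan<- string, msg string) error {
	select {
	case out <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	default:
		return errBufferFull
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	out := make(chan string, 2) // deliberately tiny; nobody is reading
	for i := 0; i < 3; i++ {
		if err := trySend(ctx, out, fmt.Sprintf("update #%d", i)); err != nil {
			fmt.Println("dropping slow subscriber:", err) // third send fails fast
			cancel()                                      // plays the role of cancelGlobalSyncOp
		}
	}
}

Because the default branch makes the send non-blocking, a slow consumer can never stall the producer; back-pressure is converted into an explicit cancellation of the whole sync operation instead.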
diff --git a/core/node/rpc/sync/client/syncer_set.go b/core/node/rpc/sync/client/syncer_set.go
index e3a9a8d84..582b70197 100644
--- a/core/node/rpc/sync/client/syncer_set.go
+++ b/core/node/rpc/sync/client/syncer_set.go
@@ -28,6 +28,8 @@ type (
 	SyncerSet struct {
 		// ctx is the root context for all syncers in this set and used to cancel them
 		ctx context.Context
+		// globalSyncOpCtxCancel cancels ctx
+		globalSyncOpCtxCancel context.CancelFunc
 		// syncID is the sync id as used between the client and this node
 		syncID string
 		// localNodeAddress is the node address for this stream node instance
 		localNodeAddress common.Address
@@ -42,6 +44,8 @@ type (
 		syncerTasks sync.WaitGroup
 		// muSyncers guards syncers and streamID2Syncer
 		muSyncers sync.Mutex
+		// stopped indicates whether the sync operation has been stopped
+		stopped bool
 		// syncers is the existing set of syncers, indexed by the syncer node address
 		syncers map[common.Address]StreamsSyncer
 		// streamID2Syncer maps from a stream to its syncer
@@ -75,6 +79,7 @@ func (cs SyncCookieSet) AsSlice() []*SyncCookie {
 // are streamed to the client.
 func NewSyncers(
 	ctx context.Context,
+	globalSyncOpCtxCancel context.CancelFunc,
 	syncID string,
 	streamCache events.StreamCache,
 	nodeRegistry nodes.NodeRegistry,
@@ -90,7 +95,8 @@ func NewSyncers(
 	// instantiate background syncers for sync operation
 	for nodeAddress, cookieSet := range cookies {
 		if nodeAddress == localNodeAddress { // stream managed by this node
-			syncer, err := newLocalSyncer(ctx, localNodeAddress, streamCache, cookieSet.AsSlice(), messages)
+			syncer, err := newLocalSyncer(
+				ctx, syncID, globalSyncOpCtxCancel, localNodeAddress, streamCache, cookieSet.AsSlice(), messages)
 			if err != nil {
 				return nil, nil, err
 			}
@@ -101,7 +107,8 @@ func NewSyncers(
 				return nil, nil, err
 			}
 
-			syncer, err := newRemoteSyncer(ctx, syncID, nodeAddress, client, cookieSet.AsSlice(), messages)
+			syncer, err := newRemoteSyncer(
+				ctx, globalSyncOpCtxCancel, syncID, nodeAddress, client, cookieSet.AsSlice(), messages)
 			if err != nil {
 				return nil, nil, err
 			}
@@ -135,8 +142,13 @@ func (ss *SyncerSet) Run() {
 	}
 	ss.muSyncers.Unlock()
 
-	<-ss.ctx.Done()       // sync cancelled by client or client conn dropped
-	ss.syncerTasks.Wait() // background syncers finished
+	<-ss.ctx.Done() // sync cancelled by client, client conn dropped or client send buffer full
+
+	ss.muSyncers.Lock()
+	ss.stopped = true
+	ss.muSyncers.Unlock()
+
+	ss.syncerTasks.Wait() // background syncers finished -> safe to close messages channel
 
 	close(ss.messages) // close will cause the sync operation to send the SYNC_CLOSE message to the client
 }
@@ -144,6 +156,10 @@ func (ss *SyncerSet) AddStream(ctx context.Context, nodeAddress common.Address,
 	ss.muSyncers.Lock()
 	defer ss.muSyncers.Unlock()
 
+	if ss.stopped {
+		return RiverError(Err_CANCELED, "Sync operation stopped", "syncId", ss.syncID)
+	}
+
 	if _, found := ss.streamID2Syncer[streamID]; found {
 		return nil // stream is already part of sync operation
 	}
@@ -163,7 +179,9 @@ func (ss *SyncerSet) AddStream(ctx context.Context, nodeAddress common.Address,
 		err error
 	)
 	if nodeAddress == ss.localNodeAddress {
-		if syncer, err = newLocalSyncer(ss.ctx, ss.localNodeAddress, ss.streamCache, []*SyncCookie{cookie}, ss.messages); err != nil {
+		if syncer, err = newLocalSyncer(
+			ss.ctx, ss.syncID, ss.globalSyncOpCtxCancel, ss.localNodeAddress,
+			ss.streamCache, []*SyncCookie{cookie}, ss.messages); err != nil {
 			return err
 		}
 	} else {
@@ -171,7 +189,7 @@ func (ss *SyncerSet) AddStream(ctx context.Context, nodeAddress common.Address,
 		if err != nil {
 			return err
 		}
-		if syncer, err = newRemoteSyncer(ss.ctx, ss.syncID, nodeAddress, client, []*SyncCookie{cookie}, ss.messages); err != nil {
+		if syncer, err = newRemoteSyncer(ss.ctx, ss.globalSyncOpCtxCancel, ss.syncID, nodeAddress, client, []*SyncCookie{cookie}, ss.messages); err != nil {
 			return err
 		}
 	}
@@ -188,10 +206,10 @@ func (ss *SyncerSet) startSyncer(syncer StreamsSyncer) {
 	ss.syncerTasks.Add(1)
 	go func() {
 		syncer.Run()
+		ss.syncerTasks.Done()
 		ss.muSyncers.Lock()
 		delete(ss.syncers, syncer.Address())
 		ss.muSyncers.Unlock()
-		ss.syncerTasks.Done()
 	}()
 }
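The stopped flag and the reworked Run above define a strict shutdown order: cancel ctx, mark the set stopped so AddStream rejects latecomers, wait on the WaitGroup until every syncer goroutine has exited, and only then close the shared messages channel. A condensed sketch of that ordering follows; the producer count, buffer size, and durations are arbitrary example values.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	messages := make(chan int, 16)

	var producers sync.WaitGroup
	for id := 0; id < 3; id++ {
		producers.Add(1)
		go func(id int) {
			defer producers.Done()
			for {
				select {
				case messages <- id:
					time.Sleep(10 * time.Millisecond)
				case <-ctx.Done():
					return // producers exit once the sync op is cancelled
				}
			}
		}(id)
	}

	time.AfterFunc(50*time.Millisecond, cancel) // e.g. the buffer-full cancellation

	go func() {
		producers.Wait() // every producer has returned, so no send can race
		close(messages)  // only now is closing the shared channel safe
	}()

	// the consumer drains until close, the moment Run would emit SYNC_CLOSE
	for range messages {
	}
	fmt.Println("messages channel closed cleanly")
}

Closing only after Wait is what prevents a panic from a send on a closed channel; the patch applies the same rule by moving syncerTasks.Done() ahead of the bookkeeping in startSyncer and waiting in Run before close(ss.messages).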
diff --git a/core/node/rpc/sync/operation.go b/core/node/rpc/sync/operation.go
index 77dc9eb3c..9853004f7 100644
--- a/core/node/rpc/sync/operation.go
+++ b/core/node/rpc/sync/operation.go
@@ -2,6 +2,8 @@ package sync
 
 import (
 	"context"
+	"github.com/river-build/river/core/node/dlog"
+	"time"
 
 	"connectrpc.com/connect"
 	"github.com/ethereum/go-ethereum/common"
@@ -21,7 +23,7 @@ type (
 		// ctx is the root context for this subscription; when it expires, the subscription
 		// and all background syncers are cancelled
 		ctx context.Context
-		// cancel sync operation
+		// cancel sync operation by expiring ctx
 		cancel context.CancelFunc
 		// commands holds incoming requests from the client to add/remove streams or cancel the sync
 		commands chan *subCommand
@@ -39,6 +41,7 @@ type (
 		RmStreamReq     *connect.Request[RemoveStreamFromSyncRequest]
 		AddStreamReq    *connect.Request[AddStreamToSyncRequest]
 		PingReq         *connect.Request[PingSyncRequest]
+		CancelReq       *connect.Request[CancelSyncRequest]
 		DebugDropStream shared.StreamId
 		reply           chan error
 	}
@@ -82,13 +85,17 @@ func (syncOp *StreamSyncOperation) Run(
 ) error {
 	defer syncOp.cancel()
 
+	log := dlog.FromCtx(syncOp.ctx).With("syncId", syncOp.SyncID)
+
 	cookies, err := client.ValidateAndGroupSyncCookies(req.Msg.GetSyncPos())
 	if err != nil {
 		return err
 	}
 
 	syncers, messages, err := client.NewSyncers(
-		syncOp.ctx, syncOp.SyncID, syncOp.streamCache, syncOp.nodeRegistry, syncOp.thisNodeAddress, cookies)
+		syncOp.ctx, syncOp.cancel, syncOp.SyncID, syncOp.streamCache,
+		syncOp.nodeRegistry, syncOp.thisNodeAddress, cookies)
+
 	if err != nil {
 		return err
 	}
@@ -98,7 +105,7 @@ func (syncOp *StreamSyncOperation) Run(
 	for {
 		select {
 		case msg, ok := <-messages:
-			if !ok { // messages is closed in syncers when syncOp.ctx is cancelled
+			if !ok {
 				_ = res.Send(&SyncStreamsResponse{
 					SyncId: syncOp.SyncID,
 					SyncOp: SyncOp_SYNC_CLOSE,
@@ -108,9 +115,13 @@ func (syncOp *StreamSyncOperation) Run(
 			msg.SyncId = syncOp.SyncID
 			if err = res.Send(msg); err != nil {
+				log.Error("Unable to send sync stream update to client", "err", err)
 				return err
 			}
 
+		case <-syncOp.ctx.Done():
+			return nil
+
 		case cmd := <-syncOp.commands:
 			if cmd.AddStreamReq != nil {
 				nodeAddress := common.BytesToAddress(cmd.AddStreamReq.Msg.GetSyncPos().GetNodeAddress())
@@ -136,6 +147,16 @@ func (syncOp *StreamSyncOperation) Run(
 				cmd.Reply(err)
 			} else if cmd.DebugDropStream != (shared.StreamId{}) {
 				cmd.Reply(syncers.DebugDropStream(cmd.Ctx, cmd.DebugDropStream))
+			} else if cmd.CancelReq != nil {
+				syncOp.cancel()
+
+				_ = res.Send(&SyncStreamsResponse{
+					SyncId: syncOp.SyncID,
+					SyncOp: SyncOp_SYNC_CLOSE,
+				})
+
+				cmd.Reply(nil)
+				return nil
 			}
 		}
 	}
@@ -184,16 +205,36 @@ func (syncOp *StreamSyncOperation) RemoveStreamFromSync(
 }
 
 func (syncOp *StreamSyncOperation) CancelSync(
-	_ context.Context,
+	ctx context.Context,
 	req *connect.Request[CancelSyncRequest],
 ) (*connect.Response[CancelSyncResponse], error) {
 	if req.Msg.GetSyncId() != syncOp.SyncID {
-		return nil, RiverError(Err_INVALID_ARGUMENT, "invalid syncId").Tag("syncId", req.Msg.GetSyncId())
+		return nil, RiverError(Err_INVALID_ARGUMENT, "invalid syncId").
+ Tag("syncId", req.Msg.GetSyncId()) + } + + cmd := &subCommand{ + Ctx: ctx, + CancelReq: req, + reply: make(chan error, 1), } - syncOp.cancel() + timeout := time.After(15 * time.Second) - return connect.NewResponse(&CancelSyncResponse{}), nil + select { + case syncOp.commands <- cmd: + select { + case err := <-cmd.reply: + if err == nil { + return connect.NewResponse(&CancelSyncResponse{}), nil + } + return nil, err + case <-timeout: + return nil, RiverError(Err_UNAVAILABLE, "sync operation command queue full") + } + case <-timeout: + return nil, RiverError(Err_UNAVAILABLE, "sync operation command queue full") + } } func (syncOp *StreamSyncOperation) PingSync( @@ -236,6 +277,10 @@ func (syncOp *StreamSyncOperation) process(cmd *subCommand) error { case <-syncOp.ctx.Done(): return RiverError(Err_CANCELED, "sync operation cancelled").Tags("syncId", syncOp.SyncID) } + case <-time.After(10 * time.Second): + err := RiverError(Err_DEADLINE_EXCEEDED, "sync operation command queue full").Tags("syncId", syncOp.SyncID) + dlog.FromCtx(syncOp.ctx).Error("Sync operation command queue full", "err", err) + return err case <-syncOp.ctx.Done(): return RiverError(Err_CANCELED, "sync operation cancelled").Tags("syncId", syncOp.SyncID) }