From b158e7a6df4af13a02ab4eaeff31fc7efcba3eb1 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 5 Dec 2024 10:30:12 -0500 Subject: [PATCH 1/6] Remove closed subs --- pkg/solana/client/multinode/client.go | 17 ++++++++++++++++- pkg/solana/client/multinode/client_test.go | 13 +++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/pkg/solana/client/multinode/client.go b/pkg/solana/client/multinode/client.go index 9753c26da..bf7db4e23 100644 --- a/pkg/solana/client/multinode/client.go +++ b/pkg/solana/client/multinode/client.go @@ -60,8 +60,24 @@ func (m *MultiNodeClient[RPC, HEAD]) LenSubs() int { return len(m.subs) } +// removeClosedSubscriptions removes any subscriptions that have been closed +func (m *MultiNodeClient[RPC, HEAD]) removeClosedSubscriptions() { + m.subsSliceMu.Lock() + defer m.subsSliceMu.Unlock() + for sub := range m.subs { + select { + case _, ok := <-sub.Err(): + if !ok { + delete(m.subs, sub) + } + default: + } + } +} + // RegisterSub adds the sub to the rpcClient list func (m *MultiNodeClient[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightCh chan struct{}) error { + defer m.removeClosedSubscriptions() m.subsSliceMu.Lock() defer m.subsSliceMu.Unlock() // ensure that the `sub` belongs to current life cycle of the `rpcClient` and it should not be killed due to @@ -72,7 +88,6 @@ func (m *MultiNodeClient[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightC return fmt.Errorf("failed to register subscription - all in-flight requests were canceled") default: } - // TODO: BCI-3358 - delete sub when caller unsubscribes. m.subs[sub] = struct{}{} return nil } diff --git a/pkg/solana/client/multinode/client_test.go b/pkg/solana/client/multinode/client_test.go index 5cb59f797..7b1992d20 100644 --- a/pkg/solana/client/multinode/client_test.go +++ b/pkg/solana/client/multinode/client_test.go @@ -94,6 +94,19 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { t.Fatal("failed to receive finalized head: ", ctx.Err()) } }) + + t.Run("Remove Closed Subscriptions", func(t *testing.T) { + _, sub1, err := c.SubscribeToHeads(tests.Context(t)) + require.NoError(t, err) + require.Equal(t, 1, c.LenSubs()) + sub1.Unsubscribe() + + _, sub2, err := c.SubscribeToHeads(tests.Context(t)) + require.NoError(t, err) + defer sub2.Unsubscribe() + // Ensure sub1 was removed since it was closed + require.Equal(t, 1, c.LenSubs()) + }) } type mockSub struct { From 16416ef23d3779521ba073a969c3c6babe8b5583 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 5 Dec 2024 11:21:40 -0500 Subject: [PATCH 2/6] Add wrapped subscription --- pkg/solana/client/multinode/client.go | 51 ++++++++++++++-------- pkg/solana/client/multinode/client_test.go | 21 ++++----- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/pkg/solana/client/multinode/client.go b/pkg/solana/client/multinode/client.go index bf7db4e23..62349fbef 100644 --- a/pkg/solana/client/multinode/client.go +++ b/pkg/solana/client/multinode/client.go @@ -37,6 +37,19 @@ type MultiNodeClient[RPC any, HEAD Head] struct { latestChainInfo ChainInfo } +// WrappedSubscription is used to ensure that the subscription is removed from the client when unsubscribed +type WrappedSubscription struct { + Subscription + removeSub func(sub Subscription) +} + +func (w *WrappedSubscription) Unsubscribe() { + w.Subscription.Unsubscribe() + if w.removeSub != nil { + w.removeSub(w) + } +} + func NewMultiNodeClient[RPC any, HEAD Head]( cfg *mnCfg.MultiNodeConfig, rpc *RPC, ctxTimeout time.Duration, log logger.Logger, latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error), @@ -60,24 +73,14 @@ func (m *MultiNodeClient[RPC, HEAD]) LenSubs() int { return len(m.subs) } -// removeClosedSubscriptions removes any subscriptions that have been closed -func (m *MultiNodeClient[RPC, HEAD]) removeClosedSubscriptions() { +func (m *MultiNodeClient[RPC, HEAD]) removeSubscription(sub Subscription) { m.subsSliceMu.Lock() defer m.subsSliceMu.Unlock() - for sub := range m.subs { - select { - case _, ok := <-sub.Err(): - if !ok { - delete(m.subs, sub) - } - default: - } - } + delete(m.subs, sub) } -// RegisterSub adds the sub to the rpcClient list -func (m *MultiNodeClient[RPC, HEAD]) RegisterSub(sub Subscription, stopInFLightCh chan struct{}) error { - defer m.removeClosedSubscriptions() +// registerSub adds the sub to the rpcClient list +func (m *MultiNodeClient[RPC, HEAD]) registerSub(sub Subscription, stopInFLightCh chan struct{}) error { m.subsSliceMu.Lock() defer m.subsSliceMu.Unlock() // ensure that the `sub` belongs to current life cycle of the `rpcClient` and it should not be killed due to @@ -148,13 +151,18 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-ch return nil, nil, err } - err := m.RegisterSub(&poller, chStopInFlight) + sub := &WrappedSubscription{ + Subscription: &poller, + removeSub: m.removeSubscription, + } + + err := m.registerSub(sub, chStopInFlight) if err != nil { - poller.Unsubscribe() + sub.Unsubscribe() return nil, nil, err } - return channel, &poller, nil + return channel, sub, nil } func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Context) (<-chan HEAD, Subscription, error) { @@ -176,13 +184,18 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Conte return nil, nil, err } - err := m.RegisterSub(&poller, chStopInFlight) + sub := &WrappedSubscription{ + Subscription: &poller, + removeSub: m.removeSubscription, + } + + err := m.registerSub(sub, chStopInFlight) if err != nil { poller.Unsubscribe() return nil, nil, err } - return channel, &poller, nil + return channel, sub, nil } func (m *MultiNodeClient[RPC, HEAD]) OnNewHead(ctx context.Context, requestCh <-chan struct{}, head HEAD) { diff --git a/pkg/solana/client/multinode/client_test.go b/pkg/solana/client/multinode/client_test.go index 7b1992d20..0869824c0 100644 --- a/pkg/solana/client/multinode/client_test.go +++ b/pkg/solana/client/multinode/client_test.go @@ -95,17 +95,18 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { } }) - t.Run("Remove Closed Subscriptions", func(t *testing.T) { + t.Run("Remove Subscription on Unsubscribe", func(t *testing.T) { _, sub1, err := c.SubscribeToHeads(tests.Context(t)) require.NoError(t, err) require.Equal(t, 1, c.LenSubs()) - sub1.Unsubscribe() - - _, sub2, err := c.SubscribeToHeads(tests.Context(t)) + _, sub2, err := c.SubscribeToFinalizedHeads(tests.Context(t)) require.NoError(t, err) - defer sub2.Unsubscribe() - // Ensure sub1 was removed since it was closed + require.Equal(t, 2, c.LenSubs()) + + sub1.Unsubscribe() require.Equal(t, 1, c.LenSubs()) + sub2.Unsubscribe() + require.Equal(t, 0, c.LenSubs()) }) } @@ -129,7 +130,7 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) { t.Run("registerSub", func(t *testing.T) { sub := newMockSub() - err := c.RegisterSub(sub, make(chan struct{})) + err := c.registerSub(sub, make(chan struct{})) require.NoError(t, err) require.Equal(t, 1, c.LenSubs()) c.UnsubscribeAllExcept() @@ -139,7 +140,7 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) { chStopInFlight := make(chan struct{}) close(chStopInFlight) sub := newMockSub() - err := c.RegisterSub(sub, chStopInFlight) + err := c.registerSub(sub, chStopInFlight) require.Error(t, err) require.Equal(t, true, sub.unsubscribed) }) @@ -148,9 +149,9 @@ func TestMultiNodeClient_RegisterSubs(t *testing.T) { chStopInFlight := make(chan struct{}) sub1 := newMockSub() sub2 := newMockSub() - err := c.RegisterSub(sub1, chStopInFlight) + err := c.registerSub(sub1, chStopInFlight) require.NoError(t, err) - err = c.RegisterSub(sub2, chStopInFlight) + err = c.registerSub(sub2, chStopInFlight) require.NoError(t, err) require.Equal(t, 2, c.LenSubs()) From fc002f3a81f0c881b734a0c3b9fbe4c6060d5045 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Thu, 5 Dec 2024 13:51:53 -0500 Subject: [PATCH 3/6] Fix deadlock --- pkg/solana/client/multinode/client.go | 3 +++ pkg/solana/client/multinode/client_test.go | 10 ++++++++++ 2 files changed, 13 insertions(+) diff --git a/pkg/solana/client/multinode/client.go b/pkg/solana/client/multinode/client.go index 62349fbef..cbc17fe07 100644 --- a/pkg/solana/client/multinode/client.go +++ b/pkg/solana/client/multinode/client.go @@ -272,7 +272,10 @@ func (m *MultiNodeClient[RPC, HEAD]) UnsubscribeAllExcept(subs ...Subscription) for sub := range m.subs { if _, keep := keepSubs[sub]; !keep { + // Release lock to avoid deadlock on unsubscribe + m.subsSliceMu.Unlock() sub.Unsubscribe() + m.subsSliceMu.Lock() delete(m.subs, sub) } } diff --git a/pkg/solana/client/multinode/client_test.go b/pkg/solana/client/multinode/client_test.go index 0869824c0..17953222a 100644 --- a/pkg/solana/client/multinode/client_test.go +++ b/pkg/solana/client/multinode/client_test.go @@ -108,6 +108,16 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { sub2.Unsubscribe() require.Equal(t, 0, c.LenSubs()) }) + + t.Run("Ensure no deadlock on UnsubscribeAll", func(t *testing.T) { + _, _, err := c.SubscribeToHeads(tests.Context(t)) + require.NoError(t, err) + require.Equal(t, 1, c.LenSubs()) + _, _, err = c.SubscribeToFinalizedHeads(tests.Context(t)) + require.NoError(t, err) + require.Equal(t, 2, c.LenSubs()) + c.UnsubscribeAllExcept() + }) } type mockSub struct { From b45500ae8c209e7afa84d153490e4208c35bbba7 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 6 Dec 2024 15:20:07 -0500 Subject: [PATCH 4/6] Update naming --- pkg/solana/client/multinode/client.go | 10 +++++----- pkg/solana/client/multinode/client_test.go | 1 + 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/solana/client/multinode/client.go b/pkg/solana/client/multinode/client.go index cbc17fe07..89ec11b71 100644 --- a/pkg/solana/client/multinode/client.go +++ b/pkg/solana/client/multinode/client.go @@ -37,13 +37,13 @@ type MultiNodeClient[RPC any, HEAD Head] struct { latestChainInfo ChainInfo } -// WrappedSubscription is used to ensure that the subscription is removed from the client when unsubscribed -type WrappedSubscription struct { +// ManagedSubscription is used to ensure that the subscription is removed from the client when unsubscribed +type ManagedSubscription struct { Subscription removeSub func(sub Subscription) } -func (w *WrappedSubscription) Unsubscribe() { +func (w *ManagedSubscription) Unsubscribe() { w.Subscription.Unsubscribe() if w.removeSub != nil { w.removeSub(w) @@ -151,7 +151,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-ch return nil, nil, err } - sub := &WrappedSubscription{ + sub := &ManagedSubscription{ Subscription: &poller, removeSub: m.removeSubscription, } @@ -184,7 +184,7 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Conte return nil, nil, err } - sub := &WrappedSubscription{ + sub := &ManagedSubscription{ Subscription: &poller, removeSub: m.removeSubscription, } diff --git a/pkg/solana/client/multinode/client_test.go b/pkg/solana/client/multinode/client_test.go index 17953222a..1cf915a45 100644 --- a/pkg/solana/client/multinode/client_test.go +++ b/pkg/solana/client/multinode/client_test.go @@ -117,6 +117,7 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { require.NoError(t, err) require.Equal(t, 2, c.LenSubs()) c.UnsubscribeAllExcept() + require.Equal(t, 0, c.LenSubs()) }) } From d2c881da7970cb9dcbfc37d559f32ef4be1d9d15 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Fri, 6 Dec 2024 15:29:28 -0500 Subject: [PATCH 5/6] Add managed subscription --- pkg/solana/client/multinode/client.go | 33 ++++++++------------------- pkg/solana/client/multinode/types.go | 13 +++++++++++ 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pkg/solana/client/multinode/client.go b/pkg/solana/client/multinode/client.go index 89ec11b71..cd5d32cef 100644 --- a/pkg/solana/client/multinode/client.go +++ b/pkg/solana/client/multinode/client.go @@ -37,19 +37,6 @@ type MultiNodeClient[RPC any, HEAD Head] struct { latestChainInfo ChainInfo } -// ManagedSubscription is used to ensure that the subscription is removed from the client when unsubscribed -type ManagedSubscription struct { - Subscription - removeSub func(sub Subscription) -} - -func (w *ManagedSubscription) Unsubscribe() { - w.Subscription.Unsubscribe() - if w.removeSub != nil { - w.removeSub(w) - } -} - func NewMultiNodeClient[RPC any, HEAD Head]( cfg *mnCfg.MultiNodeConfig, rpc *RPC, ctxTimeout time.Duration, log logger.Logger, latestBlock func(ctx context.Context, rpc *RPC) (HEAD, error), @@ -73,12 +60,6 @@ func (m *MultiNodeClient[RPC, HEAD]) LenSubs() int { return len(m.subs) } -func (m *MultiNodeClient[RPC, HEAD]) removeSubscription(sub Subscription) { - m.subsSliceMu.Lock() - defer m.subsSliceMu.Unlock() - delete(m.subs, sub) -} - // registerSub adds the sub to the rpcClient list func (m *MultiNodeClient[RPC, HEAD]) registerSub(sub Subscription, stopInFLightCh chan struct{}) error { m.subsSliceMu.Lock() @@ -95,6 +76,12 @@ func (m *MultiNodeClient[RPC, HEAD]) registerSub(sub Subscription, stopInFLightC return nil } +func (m *MultiNodeClient[RPC, HEAD]) removeSub(sub Subscription) { + m.subsSliceMu.Lock() + defer m.subsSliceMu.Unlock() + delete(m.subs, sub) +} + func (m *MultiNodeClient[RPC, HEAD]) LatestBlock(ctx context.Context) (HEAD, error) { // capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle ctx, cancel, chStopInFlight, rpc := m.AcquireQueryCtx(ctx, m.ctxTimeout) @@ -152,8 +139,8 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToHeads(ctx context.Context) (<-ch } sub := &ManagedSubscription{ - Subscription: &poller, - removeSub: m.removeSubscription, + Subscription: &poller, + onUnsubscribe: m.removeSub, } err := m.registerSub(sub, chStopInFlight) @@ -185,8 +172,8 @@ func (m *MultiNodeClient[RPC, HEAD]) SubscribeToFinalizedHeads(ctx context.Conte } sub := &ManagedSubscription{ - Subscription: &poller, - removeSub: m.removeSubscription, + Subscription: &poller, + onUnsubscribe: m.removeSub, } err := m.registerSub(sub, chStopInFlight) diff --git a/pkg/solana/client/multinode/types.go b/pkg/solana/client/multinode/types.go index 51b70e573..5762fcaf5 100644 --- a/pkg/solana/client/multinode/types.go +++ b/pkg/solana/client/multinode/types.go @@ -32,6 +32,19 @@ type Subscription interface { Err() <-chan error } +// ManagedSubscription is a Subscription which contains an onUnsubscribe callback +type ManagedSubscription struct { + Subscription + onUnsubscribe func(sub Subscription) +} + +func (w *ManagedSubscription) Unsubscribe() { + w.Subscription.Unsubscribe() + if w.onUnsubscribe != nil { + w.onUnsubscribe(w) + } +} + // RPCClient includes all the necessary generalized RPC methods used by Node to perform health checks type RPCClient[ CHAIN_ID ID, From ed7c35fad200e1740d071cb649d3826b5b7722c2 Mon Sep 17 00:00:00 2001 From: Dylan Tinianov Date: Wed, 11 Dec 2024 11:00:32 -0500 Subject: [PATCH 6/6] Fix tests --- pkg/solana/client/multinode/adaptor_test.go | 37 ++++++++++++++++----- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/pkg/solana/client/multinode/adaptor_test.go b/pkg/solana/client/multinode/adaptor_test.go index 94e551337..35361e121 100644 --- a/pkg/solana/client/multinode/adaptor_test.go +++ b/pkg/solana/client/multinode/adaptor_test.go @@ -116,6 +116,7 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { }) t.Run("Remove Subscription on Unsubscribe", func(t *testing.T) { + c := newTestClient(t) _, sub1, err := c.SubscribeToHeads(tests.Context(t)) require.NoError(t, err) require.Equal(t, 1, c.LenSubs()) @@ -130,6 +131,7 @@ func TestMultiNodeClient_HeadSubscriptions(t *testing.T) { }) t.Run("Ensure no deadlock on UnsubscribeAll", func(t *testing.T) { + c := newTestClient(t) _, _, err := c.SubscribeToHeads(tests.Context(t)) require.NoError(t, err) require.Equal(t, 1, c.LenSubs()) @@ -159,41 +161,60 @@ func (s *mockSub) Err() <-chan error { func TestMultiNodeClient_RegisterSubs(t *testing.T) { t.Run("registerSub", func(t *testing.T) { c := newTestClient(t) - sub := newMockSub() + mockSub := newMockSub() + sub := &ManagedSubscription{ + Subscription: mockSub, + onUnsubscribe: c.removeSub, + } err := c.registerSub(sub, make(chan struct{})) require.NoError(t, err) require.Equal(t, 1, c.LenSubs()) c.UnsubscribeAllExcept() + require.Equal(t, 0, c.LenSubs()) + require.Equal(t, true, mockSub.unsubscribed) }) t.Run("chStopInFlight returns error and unsubscribes", func(t *testing.T) { c := newTestClient(t) chStopInFlight := make(chan struct{}) close(chStopInFlight) - sub := newMockSub() + mockSub := newMockSub() + sub := &ManagedSubscription{ + Subscription: mockSub, + onUnsubscribe: c.removeSub, + } err := c.registerSub(sub, chStopInFlight) require.Error(t, err) - require.Equal(t, true, sub.unsubscribed) + require.Equal(t, true, mockSub.unsubscribed) }) t.Run("UnsubscribeAllExcept", func(t *testing.T) { c := newTestClient(t) chStopInFlight := make(chan struct{}) - sub1 := newMockSub() - sub2 := newMockSub() + mockSub1 := newMockSub() + sub1 := &ManagedSubscription{ + Subscription: mockSub1, + onUnsubscribe: c.removeSub, + } + mockSub2 := newMockSub() + sub2 := &ManagedSubscription{ + Subscription: mockSub2, + onUnsubscribe: c.removeSub, + } err := c.registerSub(sub1, chStopInFlight) require.NoError(t, err) err = c.registerSub(sub2, chStopInFlight) require.NoError(t, err) require.Equal(t, 2, c.LenSubs()) + // Ensure passed sub is not removed c.UnsubscribeAllExcept(sub1) require.Equal(t, 1, c.LenSubs()) - require.Equal(t, true, sub2.unsubscribed) - require.Equal(t, false, sub1.unsubscribed) + require.Equal(t, true, mockSub2.unsubscribed) + require.Equal(t, false, mockSub1.unsubscribed) c.UnsubscribeAllExcept() require.Equal(t, 0, c.LenSubs()) - require.Equal(t, true, sub1.unsubscribed) + require.Equal(t, true, mockSub1.unsubscribed) }) }