From 789afca0343481c13aad48977b8c5e6d335d1793 Mon Sep 17 00:00:00 2001 From: Ramon Smits Date: Wed, 27 Nov 2024 11:57:11 +0100 Subject: [PATCH] Refactored how SQS and SNS clients references are initialized and lifetime managed --- .../ConfigureEndpointSqsTransport.cs | 1 + ...ve_and_non_native_subscribers_in_a_loop.cs | 4 +- ...loop_in_the_context_of_incoming_message.cs | 4 +- ...ve_and_non_native_subscribers_in_a_loop.cs | 4 +- ...loop_in_the_context_of_incoming_message.cs | 4 +- .../Configure/SqsTransport.cs | 67 +++++++------------ .../SqsTransportSettings.cs | 14 +++- 7 files changed, 45 insertions(+), 53 deletions(-) diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/ConfigureEndpointSqsTransport.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/ConfigureEndpointSqsTransport.cs index e76e0f998..b098f99d3 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/ConfigureEndpointSqsTransport.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/ConfigureEndpointSqsTransport.cs @@ -46,6 +46,7 @@ public static SqsTransport PrepareSqsTransport(bool supportsPublishSubscribe = t var transport = new SqsTransport( ClientFactories.CreateSqsClient(), ClientFactories.CreateSnsClient(), + externallyManaged: false, supportsPublishSubscribe: supportsPublishSubscribe ) { diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop.cs index b4db52f56..1369d0730 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop.cs @@ -140,8 +140,8 @@ public Publisher() => c.UsePersistence().UseStorage(subscriptionStorage); // the default value is int.MaxValue which can lead to ephemeral port exhaustion due to the massive parallel publish - c.ConfigureSqsTransport().SqsClient = ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500); - c.ConfigureSqsTransport().SnsClient = ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500); + c.ConfigureSqsTransport().SetupSqsClient(ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500), false); + c.ConfigureSqsTransport().SetupSnsClient(ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500), false); c.OnEndpointSubscribed((s, context) => { diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs index 0ccd7579a..8af96d689 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_one_event_type_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs @@ -131,8 +131,8 @@ public Publisher() => c.UsePersistence().UseStorage(subscriptionStorage); // the default value is int.MaxValue which can lead to ephemeral port exhaustion due to the massive parallel publish - c.ConfigureSqsTransport().SqsClient = ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500); - c.ConfigureSqsTransport().SnsClient = ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500); + c.ConfigureSqsTransport().SetupSqsClient(ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500), false); + c.ConfigureSqsTransport().SetupSnsClient(ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500), false); c.OnEndpointSubscribed((s, context) => { diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop.cs index 248d739e3..afc131f39 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop.cs @@ -170,8 +170,8 @@ public Publisher() => c.UsePersistence().UseStorage(subscriptionStorage); // the default value is int.MaxValue which can lead to ephemeral port exhaustion due to the massive parallel publish - c.ConfigureSqsTransport().SqsClient = ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500); - c.ConfigureSqsTransport().SnsClient = ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500); + c.ConfigureSqsTransport().SetupSqsClient(ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500), false); + c.ConfigureSqsTransport().SetupSnsClient(ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500), false); c.OnEndpointSubscribed((s, context) => { diff --git a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs index 53b7ae141..a7a6f04aa 100644 --- a/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs +++ b/src/NServiceBus.Transport.SQS.AcceptanceTests/NativePubSub/HybridModeRateLimit/When_publishing_two_event_types_to_native_and_non_native_subscribers_in_a_loop_in_the_context_of_incoming_message.cs @@ -146,8 +146,8 @@ public Publisher() => c.UsePersistence().UseStorage(subscriptionStorage); // the default value is int.MaxValue which can lead to ephemeral port exhaustion due to the massive parallel publish - c.ConfigureSqsTransport().SqsClient = ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500); - c.ConfigureSqsTransport().SnsClient = ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500); + c.ConfigureSqsTransport().SetupSqsClient(ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500), false); + c.ConfigureSqsTransport().SetupSnsClient(ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500), false); c.OnEndpointSubscribed((s, context) => { diff --git a/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs b/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs index ec96e6e18..be5a0634c 100644 --- a/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs +++ b/src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs @@ -21,33 +21,23 @@ public partial class SqsTransport : TransportDefinition /// /// SQS client for the transport. /// - public IAmazonSQS SqsClient - { - get => sqsClient; - //For legacy API shim - internal set - { - ArgumentNullException.ThrowIfNull(value); - - sqsClient = value; - externallyManagedSqsClient = true; - } - } + public IAmazonSQS SqsClient => sqsClient.Instance; /// /// SNS client for the transport. /// - public IAmazonSimpleNotificationService SnsClient + public IAmazonSimpleNotificationService SnsClient => snsClient.Instance; + + internal void SetupSqsClient(IAmazonSQS sqsClient, bool externallyManaged) { - get => snsClient; - //For legacy API shim - internal set - { - ArgumentNullException.ThrowIfNull(value); + ArgumentNullException.ThrowIfNull(sqsClient); + this.sqsClient = (sqsClient, externallyManaged); + } - snsClient = value; - externallyManagedSnsClient = true; - } + internal void SetupSnsClient(IAmazonSimpleNotificationService snsClient, bool externallyManaged) + { + ArgumentNullException.ThrowIfNull(snsClient); + this.snsClient = (snsClient, externallyManaged); } /// @@ -214,10 +204,8 @@ public void MapEvent(Type subscribedEventType, Type publishedEventType) /// Creates a new instance of the SQS transport definition. /// public SqsTransport(IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsClient) - : base(TransportTransactionMode.ReceiveOnly, true, true, true) + : this(sqsClient, snsClient, externallyManaged: true) { - SqsClient = sqsClient; - SnsClient = snsClient; } /// @@ -226,10 +214,8 @@ public SqsTransport(IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsCl /// Uses SQS and SNS clients created using a default constructor (based on the the settings from the environment) /// public SqsTransport() - : base(TransportTransactionMode.ReceiveOnly, true, true, true) + : this(DefaultClientFactories.SqsFactory(), DefaultClientFactories.SnsFactory(), externallyManaged: false) { - sqsClient = DefaultClientFactories.SqsFactory(); - snsClient = DefaultClientFactories.SnsFactory(); } /// @@ -248,16 +234,16 @@ bool enableDelayedDelivery supportsTTBR: true ) { - // Use properties to ensure `externallyManagedSqsClient` is set - SqsClient = sqsClient; - SnsClient = snsClient; + SetupSqsClient(sqsClient, true); + SetupSnsClient(snsClient, true); } // Only invoke when not using external SQS and SNS clients internal SqsTransport( IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsClient, - bool supportsPublishSubscribe, + bool externallyManaged, + bool supportsPublishSubscribe = true, bool enableDelayedDelivery = true ) : base( @@ -267,8 +253,8 @@ internal SqsTransport( supportsTTBR: true ) { - this.sqsClient = sqsClient; - this.snsClient = snsClient; + SetupSqsClient(sqsClient, externallyManaged); + SetupSnsClient(snsClient, externallyManaged); } /// @@ -302,8 +288,8 @@ public override async Task Initialize(HostSettings host QueueDelayTime, topicNamePrefix, DoNotWrapOutgoingMessages, - !externallyManagedSqsClient, - !externallyManagedSnsClient, + !sqsClient.ExternallyManaged, + !snsClient.ExternallyManaged, !SupportsDelayedDelivery ); @@ -350,16 +336,11 @@ static void AssertQueueNameGeneratorIdempotent(Func gene readonly EventToTopicsMappings eventToTopicsMappings = new EventToTopicsMappings(); readonly EventToEventsMappings eventToEventsMappings = new EventToEventsMappings(); - static readonly TransportTransactionMode[] SupportedTransactionModes = { - TransportTransactionMode.None, - TransportTransactionMode.ReceiveOnly - }; + static readonly TransportTransactionMode[] SupportedTransactionModes = { TransportTransactionMode.None, TransportTransactionMode.ReceiveOnly }; static readonly TimeSpan MaxTimeToLiveUpperBound = TimeSpan.FromDays(14); static readonly TimeSpan MaxTimeToLiveLowerBound = TimeSpan.FromSeconds(60); - IAmazonSQS sqsClient; - IAmazonSimpleNotificationService snsClient; - bool externallyManagedSqsClient; - bool externallyManagedSnsClient; + (IAmazonSQS Instance, bool ExternallyManaged) sqsClient; + (IAmazonSimpleNotificationService Instance, bool ExternallyManaged) snsClient; } } \ No newline at end of file diff --git a/src/NServiceBus.Transport.SQS/SqsTransportSettings.cs b/src/NServiceBus.Transport.SQS/SqsTransportSettings.cs index 7225dca26..713aa687d 100644 --- a/src/NServiceBus.Transport.SQS/SqsTransportSettings.cs +++ b/src/NServiceBus.Transport.SQS/SqsTransportSettings.cs @@ -39,7 +39,12 @@ public static TransportExtensions UseTransport(this EndpointCon Message = "The configuration has been moved to SqsTransport class.")] public static TransportExtensions ClientFactory(this TransportExtensions transportExtensions, Func factory) { - transportExtensions.Transport.SqsClient = factory(); + // =================================== + // WHEN REMOVING THIS OBSOLETE CODE: + // + // Refactor "SetupSqsClient" + // =================================== + transportExtensions.Transport.SetupSqsClient(factory(), true); return transportExtensions; } @@ -52,7 +57,12 @@ public static TransportExtensions ClientFactory(this TransportExte Message = "The configuration has been moved to SqsTransport class.")] public static TransportExtensions ClientFactory(this TransportExtensions transportExtensions, Func factory) { - transportExtensions.Transport.SnsClient = factory(); + // =================================== + // WHEN REMOVING THIS OBSOLETE CODE: + // + // Refactor "SetupSnsClient" + // =================================== + transportExtensions.Transport.SetupSnsClient(factory(), true); return transportExtensions; }