Skip to content

Commit

Permalink
Refactored how SQS and SNS clients references are initialized and lif…
Browse files Browse the repository at this point in the history
…etime managed
  • Loading branch information
ramonsmits committed Nov 27, 2024
1 parent 1f34143 commit 789afca
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public static SqsTransport PrepareSqsTransport(bool supportsPublishSubscribe = t
var transport = new SqsTransport(
ClientFactories.CreateSqsClient(),
ClientFactories.CreateSnsClient(),
externallyManaged: false,
supportsPublishSubscribe: supportsPublishSubscribe
)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ public Publisher() =>
c.UsePersistence<TestingInMemoryPersistence, StorageType.Subscriptions>().UseStorage(subscriptionStorage);

// the default value is int.MaxValue which can lead to ephemeral port exhaustion due to the massive parallel publish
c.ConfigureSqsTransport().SqsClient = ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500);
c.ConfigureSqsTransport().SnsClient = ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500);
c.ConfigureSqsTransport().SetupSqsClient(ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500), false);
c.ConfigureSqsTransport().SetupSnsClient(ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500), false);

c.OnEndpointSubscribed<Context>((s, context) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ public Publisher() =>
c.UsePersistence<TestingInMemoryPersistence, StorageType.Subscriptions>().UseStorage(subscriptionStorage);

// the default value is int.MaxValue which can lead to ephemeral port exhaustion due to the massive parallel publish
c.ConfigureSqsTransport().SqsClient = ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500);
c.ConfigureSqsTransport().SnsClient = ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500);
c.ConfigureSqsTransport().SetupSqsClient(ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500), false);
c.ConfigureSqsTransport().SetupSnsClient(ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500), false);

c.OnEndpointSubscribed<Context>((s, context) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ public Publisher() =>
c.UsePersistence<TestingInMemoryPersistence, StorageType.Subscriptions>().UseStorage(subscriptionStorage);

// the default value is int.MaxValue which can lead to ephemeral port exhaustion due to the massive parallel publish
c.ConfigureSqsTransport().SqsClient = ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500);
c.ConfigureSqsTransport().SnsClient = ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500);
c.ConfigureSqsTransport().SetupSqsClient(ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500), false);
c.ConfigureSqsTransport().SetupSnsClient(ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500), false);

c.OnEndpointSubscribed<Context>((s, context) =>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ public Publisher() =>
c.UsePersistence<TestingInMemoryPersistence, StorageType.Subscriptions>().UseStorage(subscriptionStorage);

// the default value is int.MaxValue which can lead to ephemeral port exhaustion due to the massive parallel publish
c.ConfigureSqsTransport().SqsClient = ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500);
c.ConfigureSqsTransport().SnsClient = ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500);
c.ConfigureSqsTransport().SetupSqsClient(ClientFactories.CreateSqsClient(cfg => cfg.MaxConnectionsPerServer = 500), false);
c.ConfigureSqsTransport().SetupSnsClient(ClientFactories.CreateSnsClient(cfg => cfg.MaxConnectionsPerServer = 500), false);

c.OnEndpointSubscribed<Context>((s, context) =>
{
Expand Down
67 changes: 24 additions & 43 deletions src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,23 @@ public partial class SqsTransport : TransportDefinition
/// <summary>
/// SQS client for the transport.
/// </summary>
public IAmazonSQS SqsClient
{
get => sqsClient;
//For legacy API shim
internal set
{
ArgumentNullException.ThrowIfNull(value);

sqsClient = value;
externallyManagedSqsClient = true;
}
}
public IAmazonSQS SqsClient => sqsClient.Instance;

/// <summary>
/// SNS client for the transport.
/// </summary>
public IAmazonSimpleNotificationService SnsClient
public IAmazonSimpleNotificationService SnsClient => snsClient.Instance;

internal void SetupSqsClient(IAmazonSQS sqsClient, bool externallyManaged)
{
get => snsClient;
//For legacy API shim
internal set
{
ArgumentNullException.ThrowIfNull(value);
ArgumentNullException.ThrowIfNull(sqsClient);
this.sqsClient = (sqsClient, externallyManaged);
}

snsClient = value;
externallyManagedSnsClient = true;
}
internal void SetupSnsClient(IAmazonSimpleNotificationService snsClient, bool externallyManaged)
{
ArgumentNullException.ThrowIfNull(snsClient);
this.snsClient = (snsClient, externallyManaged);
}

/// <summary>
Expand Down Expand Up @@ -214,10 +204,8 @@ public void MapEvent(Type subscribedEventType, Type publishedEventType)
/// Creates a new instance of the SQS transport definition.
/// </summary>
public SqsTransport(IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsClient)
: base(TransportTransactionMode.ReceiveOnly, true, true, true)
: this(sqsClient, snsClient, externallyManaged: true)
{
SqsClient = sqsClient;
SnsClient = snsClient;
}

/// <summary>
Expand All @@ -226,10 +214,8 @@ public SqsTransport(IAmazonSQS sqsClient, IAmazonSimpleNotificationService snsCl
/// Uses SQS and SNS clients created using a default constructor (based on the the settings from the environment)
/// </summary>
public SqsTransport()
: base(TransportTransactionMode.ReceiveOnly, true, true, true)
: this(DefaultClientFactories.SqsFactory(), DefaultClientFactories.SnsFactory(), externallyManaged: false)
{
sqsClient = DefaultClientFactories.SqsFactory();
snsClient = DefaultClientFactories.SnsFactory();
}

/// <summary>
Expand All @@ -248,16 +234,16 @@ bool enableDelayedDelivery
supportsTTBR: true
)
{
// Use properties to ensure `externallyManagedSqsClient` is set
SqsClient = sqsClient;
SnsClient = snsClient;
SetupSqsClient(sqsClient, true);
SetupSnsClient(snsClient, true);
}

// Only invoke when not using external SQS and SNS clients
internal SqsTransport(
IAmazonSQS sqsClient,
IAmazonSimpleNotificationService snsClient,
bool supportsPublishSubscribe,
bool externallyManaged,
bool supportsPublishSubscribe = true,
bool enableDelayedDelivery = true
)
: base(
Expand All @@ -267,8 +253,8 @@ internal SqsTransport(
supportsTTBR: true
)
{
this.sqsClient = sqsClient;
this.snsClient = snsClient;
SetupSqsClient(sqsClient, externallyManaged);
SetupSnsClient(snsClient, externallyManaged);
}

/// <summary>
Expand Down Expand Up @@ -302,8 +288,8 @@ public override async Task<TransportInfrastructure> Initialize(HostSettings host
QueueDelayTime,
topicNamePrefix,
DoNotWrapOutgoingMessages,
!externallyManagedSqsClient,
!externallyManagedSnsClient,
!sqsClient.ExternallyManaged,
!snsClient.ExternallyManaged,
!SupportsDelayedDelivery
);

Expand Down Expand Up @@ -350,16 +336,11 @@ static void AssertQueueNameGeneratorIdempotent(Func<string, string, string> gene
readonly EventToTopicsMappings eventToTopicsMappings = new EventToTopicsMappings();
readonly EventToEventsMappings eventToEventsMappings = new EventToEventsMappings();

static readonly TransportTransactionMode[] SupportedTransactionModes = {
TransportTransactionMode.None,
TransportTransactionMode.ReceiveOnly
};
static readonly TransportTransactionMode[] SupportedTransactionModes = { TransportTransactionMode.None, TransportTransactionMode.ReceiveOnly };

static readonly TimeSpan MaxTimeToLiveUpperBound = TimeSpan.FromDays(14);
static readonly TimeSpan MaxTimeToLiveLowerBound = TimeSpan.FromSeconds(60);
IAmazonSQS sqsClient;
IAmazonSimpleNotificationService snsClient;
bool externallyManagedSqsClient;
bool externallyManagedSnsClient;
(IAmazonSQS Instance, bool ExternallyManaged) sqsClient;
(IAmazonSimpleNotificationService Instance, bool ExternallyManaged) snsClient;
}
}
14 changes: 12 additions & 2 deletions src/NServiceBus.Transport.SQS/SqsTransportSettings.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,12 @@ public static TransportExtensions<SqsTransport> UseTransport<T>(this EndpointCon
Message = "The configuration has been moved to SqsTransport class.")]
public static TransportExtensions<SqsTransport> ClientFactory(this TransportExtensions<SqsTransport> transportExtensions, Func<IAmazonSQS> factory)
{
transportExtensions.Transport.SqsClient = factory();
// ===================================
// WHEN REMOVING THIS OBSOLETE CODE:
//
// Refactor "SetupSqsClient"
// ===================================
transportExtensions.Transport.SetupSqsClient(factory(), true);
return transportExtensions;
}

Expand All @@ -52,7 +57,12 @@ public static TransportExtensions<SqsTransport> ClientFactory(this TransportExte
Message = "The configuration has been moved to SqsTransport class.")]
public static TransportExtensions<SqsTransport> ClientFactory(this TransportExtensions<SqsTransport> transportExtensions, Func<IAmazonSimpleNotificationService> factory)
{
transportExtensions.Transport.SnsClient = factory();
// ===================================
// WHEN REMOVING THIS OBSOLETE CODE:
//
// Refactor "SetupSnsClient"
// ===================================
transportExtensions.Transport.SetupSnsClient(factory(), true);
return transportExtensions;
}

Expand Down

0 comments on commit 789afca

Please sign in to comment.