Skip to content

Commit

Permalink
Track messages that successfully completed the message or error pipel…
Browse files Browse the repository at this point in the history
…ine but failed to get acknowledged due to expired leases in receiveonly mode (#1047)

* Track messages that successfully completed the message or error pipeline but failed to get acknowledged due to expired leases in receiveonly mode (#1034)

* Update src/AcceptanceTests/Receiving/When_message_visibility_expired.cs

Co-authored-by: Travis Nickels <[email protected]>

---------

Co-authored-by: Daniel Marbach <[email protected]>
Co-authored-by: Travis Nickels <[email protected]>
  • Loading branch information
3 people authored Sep 25, 2024
1 parent 21b04cb commit 7984165
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,17 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="Azure.Identity" Version="1.12.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="7.6.0" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
</ItemGroup>

<ItemGroup Label="Force the latest version of the transitive dependencies">
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.11.1" />
<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

</Project>
115 changes: 115 additions & 0 deletions src/AcceptanceTests/Receiving/When_message_visibility_expired.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
namespace NServiceBus.Transport.AzureServiceBus.AcceptanceTests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using Azure.Messaging.ServiceBus;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

public class When_message_visibility_expired : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_complete_message_on_next_receive_when_pipeline_successful()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.CustomConfig(c =>
{
// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.NativeMessageId != null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

[Test]
public async Task Should_complete_message_on_next_receive_when_error_pipeline_handled_the_message()
{
var ctx = await Scenario.Define<Context>(c =>
{
c.ShouldThrow = true;
})
.WithEndpoint<Receiver>(b =>
{
b.DoNotFailOnErrorMessages();
b.CustomConfig(c =>
{
var recoverability = c.Recoverability();
recoverability.AddUnrecoverableException<InvalidOperationException>();

// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.NativeMessageId != null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

static bool WasMarkedAsSuccessfullyCompleted(ScenarioContext.LogItem l, Context c)
=> l.Message.StartsWith($"Received message with id '{c.NativeMessageId}' was marked as successfully completed");

class Context : ScenarioContext
{
public bool ShouldThrow { get; set; }

public string NativeMessageId { get; set; }
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver() => EndpointSetup<DefaultServer>(c =>
{
var transport = c.ConfigureTransport();
// Explicitly setting the transport transaction mode to ReceiveOnly because the message
// tracking only is implemented for this mode.
transport.Transactions(TransportTransactionMode.ReceiveOnly);
});
}

public class MyMessage : IMessage
{
}

class MyMessageHandler : IHandleMessages<MyMessage>
{
readonly Context _testContext;

public MyMessageHandler(Context testContext) => _testContext = testContext;

public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
var messageReceiver = context.Extensions.Get<ServiceBusReceiver>();
// By abandoning the message, the message will be "immediately available" for retrieval again and effectively the message pump
// has lost the message visibility timeout because any Complete or Abandon will be rejected by the azure service bus.
var serviceBusReceivedMessage = context.Extensions.Get<ServiceBusReceivedMessage>();
await messageReceiver.AbandonMessageAsync(serviceBusReceivedMessage);

_testContext.NativeMessageId = serviceBusReceivedMessage.MessageId;

if (_testContext.ShouldThrow)
{
throw new InvalidOperationException("Simulated exception");
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
<ItemGroup>
<PackageReference Include="McMaster.Extensions.CommandLineUtils" Version="3.0.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.11.1" />
<PackageReference Include="Particular.Packaging" Version="2.3.0" PrivateAssets="All" />
<PackageReference Include="Azure.Identity" Version="1.12.0" />
<PackageReference Include="Particular.Packaging" Version="4.1.0" PrivateAssets="All" />
</ItemGroup>

</Project>
</Project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@

<ItemGroup>
<PackageReference Include="System.Linq.Async" Version="5.0.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

</Project>
17 changes: 9 additions & 8 deletions src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,19 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.3.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.9.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus" Version="7.2.3" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NServiceBus.Testing" Version="7.2.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
<PackageReference Include="Particular.Approvals" Version="0.3.0" />
<PackageReference Include="PublicApiGenerator" Version="10.2.0" />
<PackageReference Include="Particular.Approvals" Version="0.6.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
</ItemGroup>

<ItemGroup Label="Force the latest version of the transitive dependencies">
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.11.1" />
<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

</Project>
7 changes: 4 additions & 3 deletions src/Transport/NServiceBus.Transport.AzureServiceBus.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,15 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="[7.11.1, 8)" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="[7.11.1, 8.0.0)" />
<PackageReference Include="BitFaster.Caching" Version="[2.5.2, 3.0.0)" />
<PackageReference Include="NServiceBus" Version="[7.0.1, 8.0.0)" />
<PackageReference Include="Particular.Packaging" Version="2.3.0" PrivateAssets="All" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Fody" Version="6.6.0" PrivateAssets="All" />
<PackageReference Include="Fody" Version="6.8.0" PrivateAssets="All" />
<PackageReference Include="Obsolete.Fody" Version="5.3.0" PrivateAssets="All" />
<PackageReference Include="Particular.Packaging" Version="4.1.0" PrivateAssets="All" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net461'" Label="Needed for net461">
Expand Down
Loading

0 comments on commit 7984165

Please sign in to comment.