Skip to content

Commit

Permalink
Track messages that successfully completed the message or error pipel…
Browse files Browse the repository at this point in the history
…ine but failed to get acknowledged due to expired leases in receiveonly mode (#1044)

* Track messages that successfully completed the message or error pipeline but failed to get acknowledged due to expired leases in receiveonly mode (#1034)

* Update src/AcceptanceTests/Receiving/When_message_visibility_expired.cs

Co-authored-by: Travis Nickels <[email protected]>

---------

Co-authored-by: Travis Nickels <[email protected]>
  • Loading branch information
danielmarbach and TravisNickels authored Sep 25, 2024
1 parent f883fdd commit bf938e9
Show file tree
Hide file tree
Showing 12 changed files with 471 additions and 106 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@
<ItemGroup>
<PackageReference Include="Azure.Identity" Version="1.12.0" />
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus.AcceptanceTests.Sources" Version="9.1.1" GeneratePathProperty="true" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>
Expand Down
109 changes: 109 additions & 0 deletions src/AcceptanceTests/Receiving/When_message_visibility_expired.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
namespace NServiceBus.Transport.AzureServiceBus.AcceptanceTests
{
using System;
using System.Linq;
using System.Threading.Tasks;
using AcceptanceTesting;
using Azure.Messaging.ServiceBus;
using NServiceBus.AcceptanceTests;
using NServiceBus.AcceptanceTests.EndpointTemplates;
using NUnit.Framework;

public class When_message_visibility_expired : NServiceBusAcceptanceTest
{
[Test]
public async Task Should_complete_message_on_next_receive_when_pipeline_successful()
{
var ctx = await Scenario.Define<Context>()
.WithEndpoint<Receiver>(b =>
{
b.CustomConfig(c =>
{
// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

[Test]
public async Task Should_complete_message_on_next_receive_when_error_pipeline_handled_the_message()
{
var ctx = await Scenario.Define<Context>(c =>
{
c.ShouldThrow = true;
})
.WithEndpoint<Receiver>(b =>
{
b.DoNotFailOnErrorMessages();
b.CustomConfig(c =>
{
var recoverability = c.Recoverability();
recoverability.AddUnrecoverableException<InvalidOperationException>();

// Limiting the concurrency for this test to make sure messages that are made available again are
// not concurrently processed. This is not necessary for the test to pass but it makes
// reasoning about the test easier.
c.LimitMessageProcessingConcurrencyTo(1);
});
b.When((session, _) => session.SendLocal(new MyMessage()));
})
.Done(c => c.NativeMessageId is not null && c.Logs.Any(l => WasMarkedAsSuccessfullyCompleted(l, c)))
.Run();

var items = ctx.Logs.Where(l => WasMarkedAsSuccessfullyCompleted(l, ctx)).ToArray();

Assert.That(items, Is.Not.Empty);
}

static bool WasMarkedAsSuccessfullyCompleted(ScenarioContext.LogItem l, Context c)
=> l.Message.StartsWith($"Received message with id '{c.NativeMessageId}' was marked as successfully completed");

class Context : ScenarioContext
{
public bool ShouldThrow { get; set; }

public string NativeMessageId { get; set; }
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver() => EndpointSetup<DefaultServer>(c =>
{
var transport = c.ConfigureTransport<AzureServiceBusTransport>();
// Explicitly setting the transport transaction mode to ReceiveOnly because the message
// tracking only is implemented for this mode.
transport.TransportTransactionMode = TransportTransactionMode.ReceiveOnly;
});
}

public class MyMessage : IMessage;

class MyMessageHandler(Context testContext) : IHandleMessages<MyMessage>
{
public async Task Handle(MyMessage message, IMessageHandlerContext context)
{
var messageEventArgs = context.Extensions.Get<ProcessMessageEventArgs>();
// By abandoning the message, the message will be "immediately available" for retrieval again and effectively the message pump
// has lost the message visibility timeout because any Complete or Abandon will be rejected by the azure service bus.
var serviceBusReceivedMessage = context.Extensions.Get<ServiceBusReceivedMessage>();
await messageEventArgs.AbandonMessageAsync(serviceBusReceivedMessage);

testContext.NativeMessageId = serviceBusReceivedMessage.MessageId;

if (testContext.ShouldThrow)
{
throw new InvalidOperationException("Simulated exception");
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>
Expand Down
27 changes: 25 additions & 2 deletions src/Tests/FakeProcessor.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
#nullable enable

namespace NServiceBus.Transport.AzureServiceBus.Tests
{
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
Expand All @@ -21,7 +24,27 @@ public class FakeProcessor : ServiceBusProcessor
return Task.CompletedTask;
}

public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver receiver = null, CancellationToken cancellationToken = default)
=> OnProcessMessageAsync(new ProcessMessageEventArgs(message, receiver ?? new FakeReceiver(), cancellationToken));
public Task ProcessMessage(ServiceBusReceivedMessage message, ServiceBusReceiver? receiver = null, CancellationToken cancellationToken = default)
{
var eventArgs = new CustomProcessMessageEventArgs(message, receiver ?? new FakeReceiver(), cancellationToken);
receivedMessageToEventArgs.Add(message, eventArgs);
return OnProcessMessageAsync(eventArgs);
}

readonly ConditionalWeakTable<ServiceBusReceivedMessage, CustomProcessMessageEventArgs>
receivedMessageToEventArgs = [];

sealed class CustomProcessMessageEventArgs : ProcessMessageEventArgs
{
public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, CancellationToken cancellationToken) : base(message, receiver, cancellationToken)
{
}

public CustomProcessMessageEventArgs(ServiceBusReceivedMessage message, ServiceBusReceiver receiver, string identifier, CancellationToken cancellationToken) : base(message, receiver, identifier, cancellationToken)
{
}

public Task RaiseMessageLockLost(MessageLockLostEventArgs args, CancellationToken cancellationToken = default) => OnMessageLockLostAsync(args);
}
}
}
12 changes: 10 additions & 2 deletions src/Tests/FakeReceiver.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
namespace NServiceBus.Transport.AzureServiceBus.Tests
{
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
Expand All @@ -9,25 +10,32 @@ public class FakeReceiver : ServiceBusReceiver
{
readonly List<(ServiceBusReceivedMessage, IDictionary<string, object> propertiesToModify)> abandonedMessages = [];
readonly List<ServiceBusReceivedMessage> completedMessages = [];
readonly List<ServiceBusReceivedMessage> completingMessages = [];

public Func<ServiceBusReceivedMessage, CancellationToken, Task> CompleteMessageCallback = (_, _) => Task.CompletedTask;

public IReadOnlyCollection<(ServiceBusReceivedMessage, IDictionary<string, object> propertiesToModify)> AbandonedMessages
=> abandonedMessages;

public IReadOnlyCollection<ServiceBusReceivedMessage> CompletedMessages
=> completedMessages;

public IReadOnlyCollection<ServiceBusReceivedMessage> CompletingMessages
=> completingMessages;

public override Task AbandonMessageAsync(ServiceBusReceivedMessage message, IDictionary<string, object> propertiesToModify = null,
CancellationToken cancellationToken = default)
{
abandonedMessages.Add((message, propertiesToModify ?? new Dictionary<string, object>(0)));
return Task.CompletedTask;
}

public override Task CompleteMessageAsync(ServiceBusReceivedMessage message,
public override async Task CompleteMessageAsync(ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
completingMessages.Add(message);
await CompleteMessageCallback(message, cancellationToken);
completedMessages.Add(message);
return Task.CompletedTask;
}
}
}
12 changes: 8 additions & 4 deletions src/Tests/NServiceBus.Transport.AzureServiceBus.Tests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,18 @@

<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.18.1" />
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.10.0" />
<PackageReference Include="BitFaster.Caching" Version="2.5.2" />
<PackageReference Include="NServiceBus" Version="9.1.1" />
<PackageReference Include="NServiceBus.Testing" Version="9.0.0" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
<PackageReference Include="Particular.Approvals" Version="1.0.0" />
<PackageReference Include="PublicApiGenerator" Version="11.1.0" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="GitHubActionsTestLogger" Version="2.4.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.11.1" />
<PackageReference Include="NUnit" Version="3.14.0" />
<PackageReference Include="NUnit3TestAdapter" Version="4.6.0" />
</ItemGroup>

</Project>
137 changes: 137 additions & 0 deletions src/Tests/Receiving/MessagePumpTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ namespace NServiceBus.Transport.AzureServiceBus.Tests.Receiving
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
Expand Down Expand Up @@ -61,6 +62,142 @@ await pump.Initialize(new PushRuntimeSettings(1), (context, token) =>
Assert.That(pumpWasCalled, Is.False);
}

[Test]
public async Task Should_complete_message_on_next_receive_receiveonly_mode_when_pipeline_successful_but_completion_failed_due_to_expired_lease()
{
var fakeClient = new FakeServiceBusClient();
var fakeReceiver = new FakeReceiver();
var onMessageCalled = 0;
var onErrorCalled = 0;

var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.ReceiveOnly }, "receiveAddress",
new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null);

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled());

await pump.Initialize(new PushRuntimeSettings(1), (_, _) =>
{
onMessageCalled++;
return Task.CompletedTask;
},
(_, _) =>
{
onErrorCalled++;
return Task.FromResult(ErrorHandleResult.Handled);
}, CancellationToken.None);
await pump.StartReceive();

var firstReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));
var secondReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));

fakeReceiver.CompleteMessageCallback = (message, _) => message == firstReceivedMessage ?
Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) :
Task.CompletedTask;

var fakeProcessor = fakeClient.Processors["receiveAddress"];
await fakeProcessor.ProcessMessage(firstReceivedMessage, fakeReceiver);
await fakeProcessor.ProcessMessage(secondReceivedMessage, fakeReceiver);

Assert.That(fakeReceiver.CompletedMessages, Does.Not.Contain(firstReceivedMessage));
Assert.That(fakeReceiver.CompletedMessages, Does.Contain(secondReceivedMessage));
Assert.That(fakeReceiver.AbandonedMessages, Is.Empty);
Assert.That(onMessageCalled, Is.EqualTo(1));
Assert.That(onErrorCalled, Is.Zero);
}

[Test]
public async Task Should_abandon_message_in_atomic_mode_when_pipeline_successful_but_completion_failed_due_to_expired_lease()
{
var fakeClient = new FakeServiceBusClient();
var fakeReceiver = new FakeReceiver();
var onMessageCalled = 0;
var onErrorCalled = 0;

var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.SendsAtomicWithReceive }, "receiveAddress",
new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null);

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled());

await pump.Initialize(new PushRuntimeSettings(1), (_, _) =>
{
onMessageCalled++;
return Task.CompletedTask;
},
(_, _) =>
{
onErrorCalled++;
return Task.FromResult(ErrorHandleResult.Handled);
}, CancellationToken.None);
await pump.StartReceive();

var receivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));

fakeReceiver.CompleteMessageCallback = (message, _) => message == receivedMessage ?
Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) :
Task.CompletedTask;

var fakeProcessor = fakeClient.Processors["receiveAddress"];
await fakeProcessor.ProcessMessage(receivedMessage, fakeReceiver);

Assert.Multiple(() =>
{
Assert.That(fakeReceiver.AbandonedMessages.Select((tuple, _) => { var (message, _) = tuple; return message; })
.ToList(), Does.Contain(receivedMessage));
Assert.That(fakeReceiver.CompletedMessages, Is.Empty);
Assert.That(onMessageCalled, Is.EqualTo(1));
Assert.That(onErrorCalled, Is.EqualTo(1));
});
}

[Test]
public async Task Should_complete_message_on_next_receive_receiveonly_mode_when_error_pipeline_successful_but_completion_failed_due_to_expired_lease()
{
var fakeClient = new FakeServiceBusClient();
var fakeReceiver = new FakeReceiver();
var onMessageCalled = 0;
var onErrorCalled = 0;

var pump = new MessagePump(fakeClient, new AzureServiceBusTransport { TransportTransactionMode = TransportTransactionMode.ReceiveOnly }, "receiveAddress",
new ReceiveSettings("TestReceiver", new QueueAddress("receiveAddress"), false, false, "error"), (s, exception, arg3) => { }, null);

using var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var pumpExecutingTaskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
await using var _ = cancellationTokenSource.Token.Register(() => pumpExecutingTaskCompletionSource.TrySetCanceled());

await pump.Initialize(new PushRuntimeSettings(1), (_, _) =>
{
onMessageCalled++;
return Task.FromException<InvalidOperationException>(new InvalidOperationException());
},
(_, _) =>
{
onErrorCalled++;
return Task.FromResult(ErrorHandleResult.Handled);
}, CancellationToken.None);
await pump.StartReceive();

var firstReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));
var secondReceivedMessage = ServiceBusModelFactory.ServiceBusReceivedMessage(messageId: "SomeId", lockedUntil: DateTimeOffset.UtcNow.AddSeconds(60));

fakeReceiver.CompleteMessageCallback = (message, _) => message == firstReceivedMessage ?
Task.FromException(new ServiceBusException("Lock Lost", reason: ServiceBusFailureReason.MessageLockLost)) :
Task.CompletedTask;

var fakeProcessor = fakeClient.Processors["receiveAddress"];
await fakeProcessor.ProcessMessage(firstReceivedMessage, fakeReceiver);
await fakeProcessor.ProcessMessage(secondReceivedMessage, fakeReceiver);

Assert.That(fakeReceiver.CompletedMessages, Does.Not.Contain(firstReceivedMessage));
Assert.That(fakeReceiver.CompletedMessages, Does.Contain(secondReceivedMessage));
Assert.That(fakeReceiver.AbandonedMessages, Is.Empty);
Assert.That(onMessageCalled, Is.EqualTo(1));
Assert.That(onErrorCalled, Is.EqualTo(1));
}

[Test]
public async Task Should_abandon_message_upon_failure_with_retry_required()
{
Expand Down
Loading

0 comments on commit bf938e9

Please sign in to comment.