Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process raw messages #2341

Merged
merged 12 commits into from
Nov 10, 2023
Merged

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -34,44 +34,6 @@ await NativeEndpoint.SendTo<Receiver>(new Dictionary<string, MessageAttributeVal
Assert.AreEqual("Hello!", context.MessageReceived);
}

[Test]
public async Task Should_fail_when_messagetypefullname_not_present()
{
using var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;
try
{
await Scenario.Define<Context>()
.WithEndpoint<Receiver>(c =>
{
c.CustomConfig((cfg, ctx) =>
{
ctx.ErrorQueueAddress = cfg.GetSettings().ErrorQueueAddress();
});
c.When(async (session, ctx) =>
{
await NativeEndpoint.SendTo<Receiver>(new Dictionary<string, MessageAttributeValue>
{
// unfortunately only the message id attribute is preserved when moving to the poison queue
{
Headers.MessageId, new MessageAttributeValue {DataType = "String", StringValue = ctx.TestRunId.ToString()}
}
}, MessageToSend);
_ = NativeEndpoint.ConsumePoisonQueue(ctx.TestRunId, ctx.ErrorQueueAddress, _ =>
{
ctx.MessageMovedToPoisonQueue = true;
}, cancellationToken);
}).DoNotFailOnErrorMessages();
})
.Done(c => c.MessageMovedToPoisonQueue)
.Run();
}
finally
{
cancellationTokenSource.Cancel();
}
}

[Test]
public async Task Should_support_loading_body_from_s3()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace TransportTests.TransportMessageExtraction
namespace NServiceBus.Transport.SQS.Tests
{
using System;
using System.Collections.Generic;
Expand Down Expand Up @@ -39,6 +39,44 @@ static IEnumerable<TestCaseData> GenerateTestCases(bool passMessageIdInMessageAt
var nsbMessageIdPassedThroughHeaders = "NSB Message Id passed via headers";
bool passBodyInMessage = !pushBodyToS3;

#region Raw message with no headers at all
yield return TestCase(
"Non-JSON message",
native => native.WithBody("Body Contents"),
transport => transport.WithBody("Body Contents"),
expectedMessageId: nativeMessageId);

yield return TestCase(
"JSON without headers",
native => native.WithBody("{}"),
transport => transport.WithBody("{}"),
expectedMessageId: nativeMessageId);

yield return TestCase(
"JSON without headers property with wrong type",
native => native.WithBody(@"{ ""Headers"" : 6 }"),
transport => transport.WithBody(@"{ ""Headers"" : 6 }"),
expectedMessageId: nativeMessageId);

yield return TestCase(
"JSON with additional properties",
native => native.WithBody(@"{ ""Headers"" : {}, ""Unexpected"" : null}"),
transport => transport.WithBody(@"{ ""Headers"" : {}, ""Unexpected"" : null}"),
expectedMessageId: nativeMessageId);

yield return TestCase(
"JSON with empty headers",
native => native.WithBody(@"{ ""Headers"" : {} }"),
transport => transport.WithBody(@"{ ""Headers"" : {} }"),
expectedMessageId: nativeMessageId);

yield return TestCase(
"JSON with unrecognized headers",
native => native.WithBody(@"{ ""Headers"" : { ""SomeHeader"" : ""some value""} }"),
transport => transport.WithBody(@"{ ""Headers"" : { ""SomeHeader"" : ""some value""} }"),
expectedMessageId: nativeMessageId);
#endregion

#region NSB headers in message attribute tests
yield return TestCase(
"Transport headers in message attribute",
Expand Down Expand Up @@ -104,6 +142,7 @@ static IEnumerable<TestCaseData> GenerateTestCases(bool passMessageIdInMessageAt
var senderTransportMessage = new TransportMessageBuilder()
.If(passMessageIdInNsbHeaders, t => t
.WithHeader(Headers.MessageId, nsbMessageIdPassedThroughHeaders))
.WithHeader(Headers.EnclosedMessageTypes, "Enclosed message type")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NOTE: Message-Id is one of the headers that is used to decide whether this is a valid non-native message. If it's not included then we need the other header, which is enclosed message type

.If(passBodyInMessage, t => t
.WithBody("Body Contents"))
.If(pushBodyToS3, t => t
Expand All @@ -117,43 +156,13 @@ static IEnumerable<TestCaseData> GenerateTestCases(bool passMessageIdInMessageAt
transport => transport
// HINT: This is needed here because the serializer reads it and it gets a default (MAX). When it is deserialized it gets included
.WithHeader(TransportHeaders.TimeToBeReceived, TimeSpan.MaxValue.ToString())
.WithHeader(Headers.EnclosedMessageTypes, "Enclosed message type")
.If(passBodyInMessage, t => t
.WithBody("Body Contents"))
.If(pushBodyToS3, t => t
.WithS3BodyKey("S3 Body Key"))
);

#region Corrupted transport message tests
// HINT: These should all throw
yield return TestCase(
"Corrupted serialized transport message no headers",
native => native.WithBody(@"{
""Body"": ""Body Contents""
}"),
considerPoison: true
);

yield return TestCase(
"Fully corrupted serialized transport message",
native => native
.WithBody(@"{
""NonExistingProperty"": ""Does not matter""
}"),
considerPoison: true
);

yield return TestCase(
"Corrupted headers on serialized transport message",
native => native
.WithBody(@"{
""Headers"": ""NOT A JSON DICTIONARY""
}"),
considerPoison: true
);

// TODO: Add more test cases with malformed transport message objects
#endregion

#endregion

TestCaseData TestCase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Amazon.SQS;
using Amazon.SQS.Model;
using Transport;
using Transport.SQS;
Expand All @@ -13,7 +14,7 @@ public class Sending_poison_messages : NServiceBusTransportTest
{
[TestCase(TransportTransactionMode.None)]
[TestCase(TransportTransactionMode.ReceiveOnly)]
public async Task Should_move_to_error_queue(TransportTransactionMode transactionMode)
public async Task Should_move_to_error_queue_when_unwrapped_and_cannot_deserialize_headers(TransportTransactionMode transactionMode)
{
var onMessageCalled = false;
var onErrorCalled = false;
Expand All @@ -33,29 +34,39 @@ await StartPump(
return Task.FromResult(ErrorHandleResult.Handled);
}, transactionMode);

await SendPoisonMessage(InputQueueName);
using var sqsClient = ClientFactories.CreateSqsClient();
var queueUrl = await GetQueueUrl(sqsClient, InputQueueName);

var sendMessageRequest = new SendMessageRequest
{
QueueUrl = queueUrl,
MessageBody = UnwrappedAndNotRelevantPoisonMessageBody,
MessageAttributes =
{
[TransportHeaders.Headers] = new MessageAttributeValue
{
StringValue = "junk:this.will.fail.deserializing",
DataType = "String"
}
}
};

await sqsClient.SendMessageAsync(sendMessageRequest);

await CheckErrorQueue(ErrorQueueName, cancellationTokenSource.Token);

Assert.False(onErrorCalled, "Poison message should not invoke onError");
Assert.False(onMessageCalled, "Poison message should not invoke onMessage");
}

string PoisonMessageBody = "this is a poison message that won't deserialize to valid json";

async Task SendPoisonMessage(string inputQueueName)
static async Task<string> GetQueueUrl(IAmazonSQS sqsClient, string inputQueueName)
{
using var sqsClient = ClientFactories.CreateSqsClient();
var getQueueUrlResponse = await sqsClient.GetQueueUrlAsync(new GetQueueUrlRequest
{
QueueName = QueueCache.GetSqsQueueName(inputQueueName, SetupFixture.GetNamePrefix())
}).ConfigureAwait(false);

await sqsClient.SendMessageAsync(new SendMessageRequest
{
QueueUrl = getQueueUrlResponse.QueueUrl,
MessageBody = PoisonMessageBody
}).ConfigureAwait(false);
return getQueueUrlResponse.QueueUrl;
}

async Task CheckErrorQueue(string errorQueueName, CancellationToken cancellationToken)
Expand Down Expand Up @@ -87,7 +98,9 @@ async Task CheckErrorQueue(string errorQueueName, CancellationToken cancellation

Assert.NotNull(receiveMessageResponse);
Assert.AreEqual(1, receiveMessageResponse.Messages.Count);
Assert.AreEqual(PoisonMessageBody, receiveMessageResponse.Messages.Single().Body);
Assert.AreEqual(UnwrappedAndNotRelevantPoisonMessageBody, receiveMessageResponse.Messages.Single().Body);
}

const string UnwrappedAndNotRelevantPoisonMessageBody = "The body doesn't matter, this will be treated as an unwrapped message";
}
}
Loading