Skip to content

Commit

Permalink
When not wrapping fallback to base64 encoding in case invalid SQS cha…
Browse files Browse the repository at this point in the history
…rs are being used (#2646)

* When not wrapping fallback to base64 encoding in case invalid SQS chars are being used (most straightforward impl ignoring perf)

* If only I would understand bools

* Better test

* Better assert

* Documentation

* Acceptance test

* Copy body

* Cleanup

* Header value with invalid chars works too

---------

Co-authored-by: danielmarbach <[email protected]>
  • Loading branch information
danielmarbach and danielmarbach authored Dec 2, 2024
1 parent c9e2e9e commit 291c8cc
Show file tree
Hide file tree
Showing 9 changed files with 190 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,39 +33,22 @@ public async Task Can_be_sent_and_processed()

class Sender : EndpointConfigurationBuilder
{
public Sender()
{
EndpointSetup<DefaultServer>(cfg => cfg.ConfigureSqsTransport().DoNotWrapOutgoingMessages = true);
}
public Sender() => EndpointSetup<DefaultServer>(cfg => cfg.ConfigureSqsTransport().DoNotWrapOutgoingMessages = true);

class DispatchControlMessageAtStartup : Feature
{
public DispatchControlMessageAtStartup()
{
EnableByDefault();
}
public DispatchControlMessageAtStartup() => EnableByDefault();

protected override void Setup(FeatureConfigurationContext context)
{
protected override void Setup(FeatureConfigurationContext context) =>
context.RegisterStartupTask(sp => new Startup(
sp.GetRequiredService<IMessageDispatcher>(),
sp.GetRequiredService<MyContext>())
);
}

class Startup : FeatureStartupTask
class Startup(IMessageDispatcher dispatcher, MyContext context) : FeatureStartupTask
{
readonly IMessageDispatcher dispatcher;
readonly MyContext context;

public Startup(IMessageDispatcher dispatcher, MyContext context)
{
this.dispatcher = dispatcher;
this.context = context;
}

protected override Task OnStart(IMessageSession session,
CancellationToken cancellationToken = new CancellationToken())
CancellationToken cancellationToken = default)
{
var transportOperations = new TransportOperations(
new TransportOperation(
Expand All @@ -86,57 +69,27 @@ protected override Task OnStart(IMessageSession session,
}

protected override Task OnStop(IMessageSession session,
CancellationToken cancellationToken = new CancellationToken()) => Task.CompletedTask;
CancellationToken cancellationToken = default) => Task.CompletedTask;
}
}
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver()
{
EndpointSetup<DefaultServer>();
}
public Receiver() => EndpointSetup<DefaultServer>(c => c.Pipeline.Register("CatchControlMessage", typeof(CatchControlMessageBehavior), "Catches control message"));

public class DoTheThing : Feature
class CatchControlMessageBehavior(MyContext myContext) : Behavior<IIncomingPhysicalMessageContext>
{
public DoTheThing()
public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
EnableByDefault();
}

protected override void Setup(FeatureConfigurationContext context)
{
context.Pipeline.Register<PipelineBehavior.Registration>();
}

class PipelineBehavior : Behavior<IIncomingPhysicalMessageContext>
{
readonly MyContext myContext;

public PipelineBehavior(MyContext myContext)
if (context.MessageHeaders.ContainsKey("MyControlMessage"))
{
this.myContext = myContext;
myContext.ControlMessageBodyLength = context.Message.Body.Length;
myContext.ControlMessageReceived = true;
return Task.CompletedTask;
}

public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
if (context.MessageHeaders.ContainsKey("MyControlMessage"))
{
myContext.ControlMessageBodyLength = context.Message.Body.Length;
myContext.ControlMessageReceived = true;
return Task.CompletedTask;
}

return next();
}

public class Registration : RegisterStep
{
public Registration() : base("CatchControlMessage", typeof(PipelineBehavior), "Catch control message")
{
}
}
return next();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
namespace NServiceBus.AcceptanceTests.Sending
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AcceptanceTesting;
using EndpointTemplates;
using Features;
using Microsoft.Extensions.DependencyInjection;
using NServiceBus.Pipeline;
using NServiceBus.Routing;
using NUnit.Framework;
using Transport;

class When_sending_messages_with_invalid_sqs_chars : NServiceBusAcceptanceTest
{
[Test]
public async Task Can_be_sent_and_processed()
{
var context = await Scenario.Define<MyContext>(ctx =>
{
ctx.DestinationQueueName = TestNameHelper.GetSqsQueueName("SendingMessagesWithInvalidSqsChars.Receiver", SetupFixture.NamePrefix);
ctx.ControlMessageId = Guid.NewGuid().ToString();
})
.WithEndpoint<Sender>()
.WithEndpoint<Receiver>()
.Done(ctx => ctx.ControlMessageReceived)
.Run();

Assert.That(context.ControlMessageBody, Is.Not.Empty);
}

class Sender : EndpointConfigurationBuilder
{
public Sender() => EndpointSetup<DefaultServer>(cfg => cfg.ConfigureSqsTransport().DoNotWrapOutgoingMessages = true);

class DispatchControlMessageAtStartup : Feature
{
public DispatchControlMessageAtStartup() => EnableByDefault();

protected override void Setup(FeatureConfigurationContext context) =>
context.RegisterStartupTask(sp => new Startup(
sp.GetRequiredService<IMessageDispatcher>(),
sp.GetRequiredService<MyContext>())
);

class Startup(IMessageDispatcher dispatcher, MyContext context) : FeatureStartupTask
{
protected override Task OnStart(IMessageSession session,
CancellationToken cancellationToken = default)
{
var transportOperations = new TransportOperations(
new TransportOperation(
new OutgoingMessage(
context.ControlMessageId,
new Dictionary<string, string>
{
[Headers.MessageId] = context.ControlMessageId
},
CreateBodyWithDisallowedCharacters()
),
new UnicastAddressTag(context.DestinationQueueName)
)
);
var transportTransaction = new TransportTransaction();
return dispatcher.Dispatch(transportOperations, transportTransaction, cancellationToken);
}

protected override Task OnStop(IMessageSession session,
CancellationToken cancellationToken = default) => Task.CompletedTask;
}
}

// See https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html
static byte[] CreateBodyWithDisallowedCharacters()
{
var disallowed = new List<int>(16559);

// Characters below #x9
disallowed.AddRange(Enumerable.Range(0x0, 0x9));

// Characters between #xB and #xC
disallowed.AddRange(Enumerable.Range(0xB, 2)); // #xB, #xC

// Characters between #xE and #x1F
disallowed.AddRange(Enumerable.Range(0xE, 0x20 - 0xE));

// Surrogate pairs (from #xD800 to #xDFFF) cannot be added because ConvertFromUtf32 throws
// disallowed.AddRange(Enumerable.Range(0xD800, 0xE000 - 0xD800));

// Characters greater than #x10FFFF
for (int i = 0x110000; i <= 0x10FFFF; i++)
{
disallowed.Add(i);
}

var byteList = new List<byte>(disallowed.Count * 4);
foreach (var codePoint in disallowed)
{
if (codePoint <= 0x10FFFF)
{
string charAsString = char.ConvertFromUtf32(codePoint);
byte[] utf8Bytes = Encoding.UTF8.GetBytes(charAsString);
byteList.AddRange(utf8Bytes);
}
}

return [.. byteList];
}
}

class Receiver : EndpointConfigurationBuilder
{
public Receiver() => EndpointSetup<DefaultServer>(c => c.Pipeline.Register("CatchControlMessage", typeof(CatchControlMessageBehavior), "Catches control message"));

class CatchControlMessageBehavior(MyContext myContext) : Behavior<IIncomingPhysicalMessageContext>
{
public override Task Invoke(IIncomingPhysicalMessageContext context, Func<Task> next)
{
if (context.MessageId == myContext.ControlMessageId)
{
myContext.ControlMessageBody = context.Message.Body.ToString();
myContext.ControlMessageReceived = true;
return Task.CompletedTask;
}

return next();
}
}
}

class MyContext : ScenarioContext
{
public string DestinationQueueName { get; set; }
public string ControlMessageId { get; set; }
public bool ControlMessageReceived { get; set; }
public string ControlMessageBody { get; set; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,4 @@
<Compile Remove="$(PkgNServiceBus_TransportTests_Sources)\**\When_multiple_messages_are_available_and_concurrency_is_lowered_after_pump_started.cs" />
</ItemGroup>

<!-- The following tests currently fail as we don't support binary serialization in SQS, except for ServiceControl Metrics messages -->
<!-- See https://github.com/Particular/NServiceBus.AmazonSQS/issues/2630 -->
<ItemGroup Condition="'$(PkgNServiceBus_TransportTests_Sources)' != ''">
<Compile Remove="$(PkgNServiceBus_TransportTests_Sources)\**\When_message_is_available.cs" />
</ItemGroup>

</Project>

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,38 +1,47 @@
namespace NServiceBus.TransportTests
{
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using NUnit.Framework;
using Transport;

public class Sending_metrics_messages : NServiceBusTransportTest
public class Sending_messages_with_invalid_sqs_chars : NServiceBusTransportTest
{
[TestCase(TransportTransactionMode.None)]
[TestCase(TransportTransactionMode.ReceiveOnly)]
public async Task Should_not_fail_when_using_do_not_wrap(
public async Task Should_receive_message(
TransportTransactionMode transactionMode)
{
var messageProcessed = CreateTaskCompletionSource<MessageContext>();
byte[] copyOfTheBody = null;

await StartPump(
(context, _) => messageProcessed.SetCompleted(context),
(context, _) =>
{
// This is crucial due to internal buffer pooling in SQS transport
copyOfTheBody = context.Body.ToArray();
return messageProcessed.SetCompleted(context);
},
(_, __) => Task.FromResult(ErrorHandleResult.Handled),
TransportTransactionMode.None);

var headers = new Dictionary<string, string>
{
{ Transport.SQS.Constants.MetricsMessageMetricTypeHeaderKey, "doesn't matter" },
{ Headers.ContentType, Transport.SQS.Constants.MetricsMessageContentTypeHeaderValue }
{ "SomeHeader", "header value with invalid chars: \0" },
};
var body = Guid.NewGuid().ToByteArray();

var body = "body with invalid chars: \0"u8.ToArray();

await SendMessage(InputQueueName, headers, body: body);

var messageContext = await messageProcessed.Task;

Assert.That(messageContext.Headers, Is.Not.Empty);
Assert.That(messageContext.Headers, Is.SupersetOf(headers));
Assert.Multiple(() =>
{
Assert.That(messageContext.Headers, Is.SupersetOf(headers));
Assert.That(copyOfTheBody, Is.EquivalentTo(body));
});
}
}
}
2 changes: 2 additions & 0 deletions src/NServiceBus.Transport.SQS/Configure/SqsTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ public Func<Type, string, string> TopicNameGenerator
/// NServiceBus headers will be sent as an Amazon message attribute.
/// Only turn this on if all your endpoints are version 6.1.0 or above.
/// </summary>
/// <remarks>In cases when the outgoing message contains characters that are not compliant with the <see href="https://www.w3.org/TR/REC-xml/#charsets">W3C specification
/// for characters</see> <see href="https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html">SQS requires</see> the payload is base64 encoded automatically.</remarks>
public bool DoNotWrapOutgoingMessages { get; set; }

/// <summary>
Expand Down
7 changes: 0 additions & 7 deletions src/NServiceBus.Transport.SQS/Constants.cs

This file was deleted.

Loading

0 comments on commit 291c8cc

Please sign in to comment.