diff --git a/src/Modules/Administration/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs b/src/Modules/Administration/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs index 7a7bd4cc..0b08398a 100644 --- a/src/Modules/Administration/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs +++ b/src/Modules/Administration/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs @@ -26,12 +26,13 @@ public ProcessInboxCommandHandler(IMediator mediator, ISqlConnectionFactory sqlC public async Task Handle(ProcessInboxCommand command, CancellationToken cancellationToken) { var connection = this._sqlConnectionFactory.GetOpenConnection(); - const string sql = "SELECT " + - "[InboxMessage].[Id], " + - "[InboxMessage].[Type], " + - "[InboxMessage].[Data] " + - "FROM [administration].[InboxMessages] AS [InboxMessage] " + - "WHERE [InboxMessage].[ProcessedDate] IS NULL"; + string sql = "SELECT " + + $"[InboxMessage].[Id] AS [{nameof(InboxMessageDto.Id)}], " + + $"[InboxMessage].[Type] AS [{nameof(InboxMessageDto.Type)}], " + + $"[InboxMessage].[Data] AS [{nameof(InboxMessageDto.Data)}] " + + "FROM [administration].[InboxMessages] AS [InboxMessage] " + + "WHERE [InboxMessage].[ProcessedDate] IS NULL " + + "ORDER BY [InboxMessage].[OccurredOn]"; var messages = await connection.QueryAsync(sql); diff --git a/src/Modules/Administration/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs b/src/Modules/Administration/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs index 4a46e842..e48d8c42 100644 --- a/src/Modules/Administration/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs +++ b/src/Modules/Administration/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs @@ -34,7 +34,8 @@ public async Task Handle(ProcessInternalCommandsCommand command, Cancellat $"[Command].[Type] AS [{nameof(InternalCommandDto.Type)}], " + $"[Command].[Data] AS [{nameof(InternalCommandDto.Data)}] " + "FROM [administration].[InternalCommands] AS [Command] " + - "WHERE [Command].[ProcessedDate] IS NULL"; + "WHERE [Command].[ProcessedDate] IS NULL " + + "ORDER BY [Command].[EnqueueDate]"; var commands = await connection.QueryAsync(sql); diff --git a/src/Modules/Administration/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs b/src/Modules/Administration/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs index 445ca02e..bac3e0ff 100644 --- a/src/Modules/Administration/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs +++ b/src/Modules/Administration/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs @@ -36,12 +36,13 @@ public ProcessOutboxCommandHandler( public async Task Handle(ProcessOutboxCommand command, CancellationToken cancellationToken) { var connection = this._sqlConnectionFactory.GetOpenConnection(); - const string sql = "SELECT " + - "[OutboxMessage].[Id], " + - "[OutboxMessage].[Type], " + - "[OutboxMessage].[Data] " + + string sql = "SELECT " + + $"[OutboxMessage].[Id] AS [{nameof(OutboxMessageDto.Id)}], " + + $"[OutboxMessage].[Type] AS [{nameof(OutboxMessageDto.Type)}], " + + $"[OutboxMessage].[Data] AS [{nameof(OutboxMessageDto.Data)}] " + "FROM [administration].[OutboxMessages] AS [OutboxMessage] " + - "WHERE [OutboxMessage].[ProcessedDate] IS NULL"; + "WHERE [OutboxMessage].[ProcessedDate] IS NULL " + + "ORDER BY [OutboxMessage].[OccurredOn]"; var messages = await connection.QueryAsync(sql); var messagesList = messages.AsList(); diff --git a/src/Modules/Meetings/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs b/src/Modules/Meetings/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs index 5f944a5a..a5dc51ab 100644 --- a/src/Modules/Meetings/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs +++ b/src/Modules/Meetings/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs @@ -25,12 +25,13 @@ public ProcessInboxCommandHandler(IMediator mediator, ISqlConnectionFactory sqlC public async Task Handle(ProcessInboxCommand command, CancellationToken cancellationToken) { var connection = this._sqlConnectionFactory.GetOpenConnection(); - const string sql = "SELECT " + - "[InboxMessage].[Id], " + - "[InboxMessage].[Type], " + - "[InboxMessage].[Data] " + + string sql = "SELECT " + + $"[InboxMessage].[Id] AS [{nameof(InboxMessageDto.Id)}], " + + $"[InboxMessage].[Type] AS [{nameof(InboxMessageDto.Type)}], " + + $"[InboxMessage].[Data] AS [{nameof(InboxMessageDto.Data)}] " + "FROM [meetings].[InboxMessages] AS [InboxMessage] " + - "WHERE [InboxMessage].[ProcessedDate] IS NULL"; + "WHERE [InboxMessage].[ProcessedDate] IS NULL " + + "ORDER BY [InboxMessage].[OccurredOn]"; var messages = await connection.QueryAsync(sql); diff --git a/src/Modules/Meetings/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs b/src/Modules/Meetings/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs index c3418815..7e303d56 100644 --- a/src/Modules/Meetings/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs +++ b/src/Modules/Meetings/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs @@ -32,7 +32,8 @@ public async Task Handle(ProcessInternalCommandsCommand command, Cancellat $"[Command].[Type] AS [{nameof(InternalCommandDto.Type)}], " + $"[Command].[Data] AS [{nameof(InternalCommandDto.Data)}] " + "FROM [meetings].[InternalCommands] AS [Command] " + - "WHERE [Command].[ProcessedDate] IS NULL"; + "WHERE [Command].[ProcessedDate] IS NULL " + + "ORDER BY [Command].[EnqueueDate]"; var commands = await connection.QueryAsync(sql); var internalCommandsList = commands.AsList(); diff --git a/src/Modules/Meetings/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs b/src/Modules/Meetings/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs index 81a574c7..c3075495 100644 --- a/src/Modules/Meetings/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs +++ b/src/Modules/Meetings/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs @@ -35,12 +35,13 @@ public ProcessOutboxCommandHandler( public async Task Handle(ProcessOutboxCommand command, CancellationToken cancellationToken) { var connection = this._sqlConnectionFactory.GetOpenConnection(); - const string sql = "SELECT " + - "[OutboxMessage].[Id], " + - "[OutboxMessage].[Type], " + - "[OutboxMessage].[Data] " + - "FROM [meetings].[OutboxMessages] AS [OutboxMessage] " + - "WHERE [OutboxMessage].[ProcessedDate] IS NULL"; + string sql = "SELECT " + + $"[OutboxMessage].[Id] AS [{nameof(OutboxMessageDto.Id)}], " + + $"[OutboxMessage].[Type] AS [{nameof(OutboxMessageDto.Type)}], " + + $"[OutboxMessage].[Data] AS [{nameof(OutboxMessageDto.Data)}] " + + "FROM [meetings].[OutboxMessages] AS [OutboxMessage] " + + "WHERE [OutboxMessage].[ProcessedDate] IS NULL " + + "ORDER BY [OutboxMessage].[OccurredOn]"; var messages = await connection.QueryAsync(sql); var messagesList = messages.AsList(); diff --git a/src/Modules/Payments/Infrastructure/AggregateStore/SqlStreamAggregateStore.cs b/src/Modules/Payments/Infrastructure/AggregateStore/SqlStreamAggregateStore.cs index d9ba86b3..dccd1f81 100644 --- a/src/Modules/Payments/Infrastructure/AggregateStore/SqlStreamAggregateStore.cs +++ b/src/Modules/Payments/Infrastructure/AggregateStore/SqlStreamAggregateStore.cs @@ -56,9 +56,11 @@ public async Task Load(AggregateId aggregateId) IList domainEvents = new List(); ReadStreamPage readStreamPage; + int position = StreamVersion.Start; + int take = 100; do { - readStreamPage = await _streamStore.ReadStreamForwards(streamId, StreamVersion.Start, maxCount: 100); + readStreamPage = await _streamStore.ReadStreamForwards(streamId, position, take); var messages = readStreamPage.Messages; foreach (var streamMessage in messages) { @@ -68,6 +70,8 @@ public async Task Load(AggregateId aggregateId) domainEvents.Add(domainEvent); } + + position += take; } while (!readStreamPage.IsEnd); diff --git a/src/Modules/Payments/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs b/src/Modules/Payments/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs index 33197d92..3716a727 100644 --- a/src/Modules/Payments/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs +++ b/src/Modules/Payments/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs @@ -25,12 +25,13 @@ public ProcessInboxCommandHandler(IMediator mediator, ISqlConnectionFactory sqlC public async Task Handle(ProcessInboxCommand command, CancellationToken cancellationToken) { var connection = this._sqlConnectionFactory.GetOpenConnection(); - const string sql = "SELECT " + - "[InboxMessage].[Id], " + - "[InboxMessage].[Type], " + - "[InboxMessage].[Data] " + + string sql = "SELECT " + + $"[InboxMessage].[Id] AS [{nameof(InboxMessageDto.Id)}], " + + $"[InboxMessage].[Type] AS [{nameof(InboxMessageDto.Type)}], " + + $"[InboxMessage].[Data] AS [{nameof(InboxMessageDto.Data)}] " + "FROM [payments].[InboxMessages] AS [InboxMessage] " + - "WHERE [InboxMessage].[ProcessedDate] IS NULL"; + "WHERE [InboxMessage].[ProcessedDate] IS NULL " + + "ORDER BY [InboxMessage].[OccurredOn]"; var messages = await connection.QueryAsync(sql); diff --git a/src/Modules/Payments/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs b/src/Modules/Payments/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs index 62d59d01..0fac3192 100644 --- a/src/Modules/Payments/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs +++ b/src/Modules/Payments/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs @@ -29,7 +29,8 @@ public async Task Handle(ProcessInternalCommandsCommand command, Cancellat $"[Command].[Type] AS [{nameof(InternalCommandDto.Type)}], " + $"[Command].[Data] AS [{nameof(InternalCommandDto.Data)}] " + "FROM [payments].[InternalCommands] AS [Command] " + - "WHERE [Command].[ProcessedDate] IS NULL"; + "WHERE [Command].[ProcessedDate] IS NULL " + + "ORDER BY [Command].[EnqueueDate]"; var commands = await connection.QueryAsync(sql); var internalCommandsList = commands.AsList(); diff --git a/src/Modules/Payments/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs b/src/Modules/Payments/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs index 28ca8d3d..df223cde 100644 --- a/src/Modules/Payments/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs +++ b/src/Modules/Payments/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs @@ -35,12 +35,13 @@ public ProcessOutboxCommandHandler( public async Task Handle(ProcessOutboxCommand command, CancellationToken cancellationToken) { var connection = this._sqlConnectionFactory.GetOpenConnection(); - const string sql = "SELECT " + - "[OutboxMessage].[Id], " + - "[OutboxMessage].[Type], " + - "[OutboxMessage].[Data] " + - "FROM [payments].[OutboxMessages] AS [OutboxMessage] " + - "WHERE [OutboxMessage].[ProcessedDate] IS NULL"; + string sql = "SELECT " + + $"[OutboxMessage].[Id] AS [{nameof(OutboxMessageDto.Id)}], " + + $"[OutboxMessage].[Type] AS [{nameof(OutboxMessageDto.Type)}], " + + $"[OutboxMessage].[Data] AS [{nameof(OutboxMessageDto.Data)}] " + + "FROM [payments].[OutboxMessages] AS [OutboxMessage] " + + "WHERE [OutboxMessage].[ProcessedDate] IS NULL " + + "ORDER BY [OutboxMessage].[OccurredOn]"; var messages = await connection.QueryAsync(sql); var messagesList = messages.AsList(); diff --git a/src/Modules/UserAccess/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs b/src/Modules/UserAccess/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs index 6c738158..6e62efc8 100644 --- a/src/Modules/UserAccess/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs +++ b/src/Modules/UserAccess/Infrastructure/Configuration/Processing/Inbox/ProcessInboxCommandHandler.cs @@ -25,12 +25,13 @@ public ProcessInboxCommandHandler(IMediator mediator, ISqlConnectionFactory sqlC public async Task Handle(ProcessInboxCommand command, CancellationToken cancellationToken) { var connection = this._sqlConnectionFactory.GetOpenConnection(); - const string sql = "SELECT " + - "[InboxMessage].[Id], " + - "[InboxMessage].[Type], " + - "[InboxMessage].[Data] " + - "FROM [users].[InboxMessages] AS [InboxMessage] " + - "WHERE [InboxMessage].[ProcessedDate] IS NULL"; + string sql = "SELECT " + + $"[InboxMessage].[Id] AS [{nameof(InboxMessageDto.Id)}], " + + $"[InboxMessage].[Type] AS [{nameof(InboxMessageDto.Type)}], " + + $"[InboxMessage].[Data] AS [{nameof(InboxMessageDto.Data)}] " + + "FROM [users].[InboxMessages] AS [InboxMessage] " + + "WHERE [InboxMessage].[ProcessedDate] IS NULL " + + "ORDER BY [InboxMessage].[OccurredOn]"; var messages = await connection.QueryAsync(sql); diff --git a/src/Modules/UserAccess/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs b/src/Modules/UserAccess/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs index 420e0861..5f8a79c3 100644 --- a/src/Modules/UserAccess/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs +++ b/src/Modules/UserAccess/Infrastructure/Configuration/Processing/InternalCommands/ProcessInternalCommandsCommandHandler.cs @@ -31,7 +31,8 @@ public async Task Handle(ProcessInternalCommandsCommand command, Cancellat $"[Command].[Type] AS [{nameof(InternalCommandDto.Type)}], " + $"[Command].[Data] AS [{nameof(InternalCommandDto.Data)}] " + "FROM [users].[InternalCommands] AS [Command] " + - "WHERE [Command].[ProcessedDate] IS NULL"; + "WHERE [Command].[ProcessedDate] IS NULL " + + "ORDER BY [Command].[EnqueueDate]"; var commands = await connection.QueryAsync(sql); var internalCommandsList = commands.AsList(); diff --git a/src/Modules/UserAccess/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs b/src/Modules/UserAccess/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs index 76a36ce6..02f58b14 100644 --- a/src/Modules/UserAccess/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs +++ b/src/Modules/UserAccess/Infrastructure/Configuration/Processing/Outbox/ProcessOutboxCommandHandler.cs @@ -36,12 +36,13 @@ public ProcessOutboxCommandHandler( public async Task Handle(ProcessOutboxCommand command, CancellationToken cancellationToken) { var connection = this._sqlConnectionFactory.GetOpenConnection(); - const string sql = "SELECT " + - "[OutboxMessage].[Id], " + - "[OutboxMessage].[Type], " + - "[OutboxMessage].[Data] " + - "FROM [users].[OutboxMessages] AS [OutboxMessage] " + - "WHERE [OutboxMessage].[ProcessedDate] IS NULL"; + string sql = "SELECT " + + $"[OutboxMessage].[Id] AS [{nameof(OutboxMessageDto.Id)}], " + + $"[OutboxMessage].[Type] AS [{nameof(OutboxMessageDto.Type)}], " + + $"[OutboxMessage].[Data] AS [{nameof(OutboxMessageDto.Data)}] " + + "FROM [users].[OutboxMessages] AS [OutboxMessage] " + + "WHERE [OutboxMessage].[ProcessedDate] IS NULL " + + "ORDER BY [OutboxMessage].[OccurredOn]"; var messages = await connection.QueryAsync(sql); var messagesList = messages.AsList();