Skip to content

Commit

Permalink
Fixes for messages processing. Fix for ES aggregate load.
Browse files Browse the repository at this point in the history
  • Loading branch information
kgrzybek committed Nov 7, 2020
1 parent 1a3f92a commit 7f2b763
Show file tree
Hide file tree
Showing 13 changed files with 66 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ public ProcessInboxCommandHandler(IMediator mediator, ISqlConnectionFactory sqlC
public async Task<Unit> Handle(ProcessInboxCommand command, CancellationToken cancellationToken)
{
var connection = this._sqlConnectionFactory.GetOpenConnection();
const string sql = "SELECT " +
"[InboxMessage].[Id], " +
"[InboxMessage].[Type], " +
"[InboxMessage].[Data] " +
"FROM [administration].[InboxMessages] AS [InboxMessage] " +
"WHERE [InboxMessage].[ProcessedDate] IS NULL";
string sql = "SELECT " +
$"[InboxMessage].[Id] AS [{nameof(InboxMessageDto.Id)}], " +
$"[InboxMessage].[Type] AS [{nameof(InboxMessageDto.Type)}], " +
$"[InboxMessage].[Data] AS [{nameof(InboxMessageDto.Data)}] " +
"FROM [administration].[InboxMessages] AS [InboxMessage] " +
"WHERE [InboxMessage].[ProcessedDate] IS NULL " +
"ORDER BY [InboxMessage].[OccurredOn]";

var messages = await connection.QueryAsync<InboxMessageDto>(sql);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public async Task<Unit> Handle(ProcessInternalCommandsCommand command, Cancellat
$"[Command].[Type] AS [{nameof(InternalCommandDto.Type)}], " +
$"[Command].[Data] AS [{nameof(InternalCommandDto.Data)}] " +
"FROM [administration].[InternalCommands] AS [Command] " +
"WHERE [Command].[ProcessedDate] IS NULL";
"WHERE [Command].[ProcessedDate] IS NULL " +
"ORDER BY [Command].[EnqueueDate]";

var commands = await connection.QueryAsync<InternalCommandDto>(sql);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public ProcessOutboxCommandHandler(
public async Task<Unit> Handle(ProcessOutboxCommand command, CancellationToken cancellationToken)
{
var connection = this._sqlConnectionFactory.GetOpenConnection();
const string sql = "SELECT " +
"[OutboxMessage].[Id], " +
"[OutboxMessage].[Type], " +
"[OutboxMessage].[Data] " +
string sql = "SELECT " +
$"[OutboxMessage].[Id] AS [{nameof(OutboxMessageDto.Id)}], " +
$"[OutboxMessage].[Type] AS [{nameof(OutboxMessageDto.Type)}], " +
$"[OutboxMessage].[Data] AS [{nameof(OutboxMessageDto.Data)}] " +
"FROM [administration].[OutboxMessages] AS [OutboxMessage] " +
"WHERE [OutboxMessage].[ProcessedDate] IS NULL";
"WHERE [OutboxMessage].[ProcessedDate] IS NULL " +
"ORDER BY [OutboxMessage].[OccurredOn]";

var messages = await connection.QueryAsync<OutboxMessageDto>(sql);
var messagesList = messages.AsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ public ProcessInboxCommandHandler(IMediator mediator, ISqlConnectionFactory sqlC
public async Task<Unit> Handle(ProcessInboxCommand command, CancellationToken cancellationToken)
{
var connection = this._sqlConnectionFactory.GetOpenConnection();
const string sql = "SELECT " +
"[InboxMessage].[Id], " +
"[InboxMessage].[Type], " +
"[InboxMessage].[Data] " +
string sql = "SELECT " +
$"[InboxMessage].[Id] AS [{nameof(InboxMessageDto.Id)}], " +
$"[InboxMessage].[Type] AS [{nameof(InboxMessageDto.Type)}], " +
$"[InboxMessage].[Data] AS [{nameof(InboxMessageDto.Data)}] " +
"FROM [meetings].[InboxMessages] AS [InboxMessage] " +
"WHERE [InboxMessage].[ProcessedDate] IS NULL";
"WHERE [InboxMessage].[ProcessedDate] IS NULL " +
"ORDER BY [InboxMessage].[OccurredOn]";

var messages = await connection.QueryAsync<InboxMessageDto>(sql);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ public async Task<Unit> Handle(ProcessInternalCommandsCommand command, Cancellat
$"[Command].[Type] AS [{nameof(InternalCommandDto.Type)}], " +
$"[Command].[Data] AS [{nameof(InternalCommandDto.Data)}] " +
"FROM [meetings].[InternalCommands] AS [Command] " +
"WHERE [Command].[ProcessedDate] IS NULL";
"WHERE [Command].[ProcessedDate] IS NULL " +
"ORDER BY [Command].[EnqueueDate]";
var commands = await connection.QueryAsync<InternalCommandDto>(sql);

var internalCommandsList = commands.AsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ public ProcessOutboxCommandHandler(
public async Task<Unit> Handle(ProcessOutboxCommand command, CancellationToken cancellationToken)
{
var connection = this._sqlConnectionFactory.GetOpenConnection();
const string sql = "SELECT " +
"[OutboxMessage].[Id], " +
"[OutboxMessage].[Type], " +
"[OutboxMessage].[Data] " +
"FROM [meetings].[OutboxMessages] AS [OutboxMessage] " +
"WHERE [OutboxMessage].[ProcessedDate] IS NULL";
string sql = "SELECT " +
$"[OutboxMessage].[Id] AS [{nameof(OutboxMessageDto.Id)}], " +
$"[OutboxMessage].[Type] AS [{nameof(OutboxMessageDto.Type)}], " +
$"[OutboxMessage].[Data] AS [{nameof(OutboxMessageDto.Data)}] " +
"FROM [meetings].[OutboxMessages] AS [OutboxMessage] " +
"WHERE [OutboxMessage].[ProcessedDate] IS NULL " +
"ORDER BY [OutboxMessage].[OccurredOn]";

var messages = await connection.QueryAsync<OutboxMessageDto>(sql);
var messagesList = messages.AsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,11 @@ public async Task<T> Load<T>(AggregateId<T> aggregateId)

IList<IDomainEvent> domainEvents = new List<IDomainEvent>();
ReadStreamPage readStreamPage;
int position = StreamVersion.Start;
int take = 100;
do
{
readStreamPage = await _streamStore.ReadStreamForwards(streamId, StreamVersion.Start, maxCount: 100);
readStreamPage = await _streamStore.ReadStreamForwards(streamId, position, take);
var messages = readStreamPage.Messages;
foreach (var streamMessage in messages)
{
Expand All @@ -68,6 +70,8 @@ public async Task<T> Load<T>(AggregateId<T> aggregateId)

domainEvents.Add(domainEvent);
}

position += take;
}
while (!readStreamPage.IsEnd);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ public ProcessInboxCommandHandler(IMediator mediator, ISqlConnectionFactory sqlC
public async Task<Unit> Handle(ProcessInboxCommand command, CancellationToken cancellationToken)
{
var connection = this._sqlConnectionFactory.GetOpenConnection();
const string sql = "SELECT " +
"[InboxMessage].[Id], " +
"[InboxMessage].[Type], " +
"[InboxMessage].[Data] " +
string sql = "SELECT " +
$"[InboxMessage].[Id] AS [{nameof(InboxMessageDto.Id)}], " +
$"[InboxMessage].[Type] AS [{nameof(InboxMessageDto.Type)}], " +
$"[InboxMessage].[Data] AS [{nameof(InboxMessageDto.Data)}] " +
"FROM [payments].[InboxMessages] AS [InboxMessage] " +
"WHERE [InboxMessage].[ProcessedDate] IS NULL";
"WHERE [InboxMessage].[ProcessedDate] IS NULL " +
"ORDER BY [InboxMessage].[OccurredOn]";

var messages = await connection.QueryAsync<InboxMessageDto>(sql);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ public async Task<Unit> Handle(ProcessInternalCommandsCommand command, Cancellat
$"[Command].[Type] AS [{nameof(InternalCommandDto.Type)}], " +
$"[Command].[Data] AS [{nameof(InternalCommandDto.Data)}] " +
"FROM [payments].[InternalCommands] AS [Command] " +
"WHERE [Command].[ProcessedDate] IS NULL";
"WHERE [Command].[ProcessedDate] IS NULL " +
"ORDER BY [Command].[EnqueueDate]";
var commands = await connection.QueryAsync<InternalCommandDto>(sql);

var internalCommandsList = commands.AsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ public ProcessOutboxCommandHandler(
public async Task<Unit> Handle(ProcessOutboxCommand command, CancellationToken cancellationToken)
{
var connection = this._sqlConnectionFactory.GetOpenConnection();
const string sql = "SELECT " +
"[OutboxMessage].[Id], " +
"[OutboxMessage].[Type], " +
"[OutboxMessage].[Data] " +
"FROM [payments].[OutboxMessages] AS [OutboxMessage] " +
"WHERE [OutboxMessage].[ProcessedDate] IS NULL";
string sql = "SELECT " +
$"[OutboxMessage].[Id] AS [{nameof(OutboxMessageDto.Id)}], " +
$"[OutboxMessage].[Type] AS [{nameof(OutboxMessageDto.Type)}], " +
$"[OutboxMessage].[Data] AS [{nameof(OutboxMessageDto.Data)}] " +
"FROM [payments].[OutboxMessages] AS [OutboxMessage] " +
"WHERE [OutboxMessage].[ProcessedDate] IS NULL " +
"ORDER BY [OutboxMessage].[OccurredOn]";

var messages = await connection.QueryAsync<OutboxMessageDto>(sql);
var messagesList = messages.AsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ public ProcessInboxCommandHandler(IMediator mediator, ISqlConnectionFactory sqlC
public async Task<Unit> Handle(ProcessInboxCommand command, CancellationToken cancellationToken)
{
var connection = this._sqlConnectionFactory.GetOpenConnection();
const string sql = "SELECT " +
"[InboxMessage].[Id], " +
"[InboxMessage].[Type], " +
"[InboxMessage].[Data] " +
"FROM [users].[InboxMessages] AS [InboxMessage] " +
"WHERE [InboxMessage].[ProcessedDate] IS NULL";
string sql = "SELECT " +
$"[InboxMessage].[Id] AS [{nameof(InboxMessageDto.Id)}], " +
$"[InboxMessage].[Type] AS [{nameof(InboxMessageDto.Type)}], " +
$"[InboxMessage].[Data] AS [{nameof(InboxMessageDto.Data)}] " +
"FROM [users].[InboxMessages] AS [InboxMessage] " +
"WHERE [InboxMessage].[ProcessedDate] IS NULL " +
"ORDER BY [InboxMessage].[OccurredOn]";

var messages = await connection.QueryAsync<InboxMessageDto>(sql);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ public async Task<Unit> Handle(ProcessInternalCommandsCommand command, Cancellat
$"[Command].[Type] AS [{nameof(InternalCommandDto.Type)}], " +
$"[Command].[Data] AS [{nameof(InternalCommandDto.Data)}] " +
"FROM [users].[InternalCommands] AS [Command] " +
"WHERE [Command].[ProcessedDate] IS NULL";
"WHERE [Command].[ProcessedDate] IS NULL " +
"ORDER BY [Command].[EnqueueDate]";
var commands = await connection.QueryAsync<InternalCommandDto>(sql);

var internalCommandsList = commands.AsList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,13 @@ public ProcessOutboxCommandHandler(
public async Task<Unit> Handle(ProcessOutboxCommand command, CancellationToken cancellationToken)
{
var connection = this._sqlConnectionFactory.GetOpenConnection();
const string sql = "SELECT " +
"[OutboxMessage].[Id], " +
"[OutboxMessage].[Type], " +
"[OutboxMessage].[Data] " +
"FROM [users].[OutboxMessages] AS [OutboxMessage] " +
"WHERE [OutboxMessage].[ProcessedDate] IS NULL";
string sql = "SELECT " +
$"[OutboxMessage].[Id] AS [{nameof(OutboxMessageDto.Id)}], " +
$"[OutboxMessage].[Type] AS [{nameof(OutboxMessageDto.Type)}], " +
$"[OutboxMessage].[Data] AS [{nameof(OutboxMessageDto.Data)}] " +
"FROM [users].[OutboxMessages] AS [OutboxMessage] " +
"WHERE [OutboxMessage].[ProcessedDate] IS NULL " +
"ORDER BY [OutboxMessage].[OccurredOn]";

var messages = await connection.QueryAsync<OutboxMessageDto>(sql);
var messagesList = messages.AsList();
Expand Down

0 comments on commit 7f2b763

Please sign in to comment.