Skip to content

Commit

Permalink
feat: add events + test to check if projections / producer are missin…
Browse files Browse the repository at this point in the history
…g events
  • Loading branch information
ArneD committed Nov 19, 2024
1 parent 2f3c89d commit e746932
Show file tree
Hide file tree
Showing 21 changed files with 460 additions and 68 deletions.
18 changes: 9 additions & 9 deletions paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,15 @@ nuget Be.Vlaanderen.Basisregisters.Projector 15.1.0

nuget Be.Vlaanderen.Basisregisters.Crab 4.0.0

nuget Be.Vlaanderen.Basisregisters.GrAr.Common 21.14.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Contracts 21.14.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Edit 21.14.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Import 21.14.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Legacy 21.14.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.14.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance 21.14.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 21.14.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.14.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Common 21.15.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Contracts 21.15.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Edit 21.15.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Import 21.15.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Legacy 21.15.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.15.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance 21.15.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm 21.15.0
nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.15.0

nuget Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple 5.1.0
nuget Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer 5.1.0
Expand Down
28 changes: 14 additions & 14 deletions paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -247,18 +247,18 @@ NUGET
Autofac.Extensions.DependencyInjection (>= 9.0)
Be.Vlaanderen.Basisregisters.EventHandling (6.0)
Be.Vlaanderen.Basisregisters.Generators.Guid.Deterministic (4.0)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.15)
Be.Vlaanderen.Basisregisters.AggregateSource (>= 9.0.1)
Be.Vlaanderen.Basisregisters.CommandHandling (>= 9.0.1)
NetTopologySuite (>= 2.5)
NodaTime (>= 3.1.11)
Be.Vlaanderen.Basisregisters.GrAr.Contracts (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Edit (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Contracts (21.15)
Be.Vlaanderen.Basisregisters.GrAr.Edit (21.15)
NetTopologySuite (>= 2.5)
Be.Vlaanderen.Basisregisters.GrAr.Extracts (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Extracts (21.15)
Be.Vlaanderen.Basisregisters.Api (>= 21.0)
Be.Vlaanderen.Basisregisters.Shaperon (>= 10.0.2)
Be.Vlaanderen.Basisregisters.GrAr.Import (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Import (21.15)
Autofac (>= 8.0)
Be.Vlaanderen.Basisregisters.AggregateSource.SqlStreamStore (>= 9.0.1)
Be.Vlaanderen.Basisregisters.CommandHandling (>= 9.0.1)
Expand All @@ -273,28 +273,28 @@ NUGET
Serilog (>= 3.1.1)
Serilog.Extensions.Logging (>= 8.0)
System.Threading.Tasks.Dataflow (>= 8.0)
Be.Vlaanderen.Basisregisters.GrAr.Legacy (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Legacy (21.15)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.15)
Be.Vlaanderen.Basisregisters.Utilities.Rfc3339DateTimeOffset (>= 4.0)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.GrAr.Oslo (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Oslo (21.15)
Be.Vlaanderen.Basisregisters.AspNetCore.Mvc.Formatters.Json (>= 5.0)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.15)
Be.Vlaanderen.Basisregisters.Utilities.Rfc3339DateTimeOffset (>= 4.0)
Microsoft.Extensions.Configuration (>= 8.0)
Microsoft.Extensions.Http.Polly (>= 8.0.3)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.15)
Be.Vlaanderen.Basisregisters.CommandHandling (>= 9.0.1)
Be.Vlaanderen.Basisregisters.Crab (>= 4.0)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.15)
Microsoft.CSharp (>= 4.7)
Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Provenance.AcmIdm (21.15)
Be.Vlaanderen.Basisregisters.Auth.AcmIdm (>= 2.0)
Be.Vlaanderen.Basisregisters.CommandHandling (>= 9.0.1)
Be.Vlaanderen.Basisregisters.Crab (>= 4.0)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.14)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.15)
Be.Vlaanderen.Basisregisters.GrAr.Provenance (21.15)
Microsoft.CSharp (>= 4.7)
Be.Vlaanderen.Basisregisters.MessageHandling.AwsSqs.Simple (5.1)
AWSSDK.Core (>= 3.7.302.15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ namespace StreetNameRegistry.Producer.Snapshot.Oslo
using System.Threading;
using System.Threading.Tasks;
using AllStream.Events;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
Expand Down Expand Up @@ -271,6 +272,29 @@ await snapshotManager.FindMatchingSnapshot(
{
await Produce($"{osloNamespace}/{message.Message.PersistentLocalId}", message.Message.PersistentLocalId.ToString(),"{}", message.Position, ct);
});


When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityNisCodeWasChanged>>(async (_, message, ct) =>
{
foreach (var persistentLocalId in message.Message.StreetNamePersistentLocalIds)
{
await Produce($"{osloNamespace}/{persistentLocalId}", persistentLocalId.ToString(),"{}", message.Position, ct);
}
});

When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityBecameCurrent>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityFacilityLanguageWasAdded>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityFacilityLanguageWasRemoved>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityOfficialLanguageWasAdded>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityOfficialLanguageWasRemoved>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityWasCorrectedToCurrent>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityWasCorrectedToRetired>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityWasImported>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityWasMerged>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityWasNamed>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityWasRetired>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<StreetNameHomonymAdditionsWereCorrected>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<StreetNameHomonymAdditionsWereRemoved>>(DoNothing);
}

private async Task FindAndProduce(
Expand Down Expand Up @@ -304,5 +328,7 @@ private async Task Produce(
throw new InvalidOperationException(result.Error + Environment.NewLine + result.ErrorReason);
}
}

private static Task DoNothing<T>(ProducerContext context, Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<T> envelope, CancellationToken ct) where T: IMessage => Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -197,5 +197,13 @@ public static Contracts.StreetNameHomonymAdditionsWereRemoved ToContract(

public static Contracts.StreetNameWasRemovedV2 ToContract(this StreetNameWasRemovedV2 message) =>
new Contracts.StreetNameWasRemovedV2(message.MunicipalityId.ToString("D"), message.PersistentLocalId, message.Provenance.ToContract());

public static Contracts.MunicipalityNisCodeWasChanged ToContract(
this MunicipalityNisCodeWasChanged message) =>
new Contracts.MunicipalityNisCodeWasChanged(
message.MunicipalityId.ToString("D"),
message.NisCode,
message.StreetNamePersistentLocalIds,
message.Provenance.ToContract());
}
}
35 changes: 34 additions & 1 deletion src/StreetNameRegistry.Producer/ProducerMigrateProjections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Be.Vlaanderen.Basisregisters.GrAr.Contracts;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
Expand Down Expand Up @@ -141,6 +142,26 @@ await Produce(message.Message.PersistentLocalId, message.Message.ToContract(), m
await Produce(message.Message.PersistentLocalId, message.Message.ToContract(), message.Position,
ct);
});

When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityNisCodeWasChanged>>(
async (_, message, ct) =>
{
await Produce(message.Message.MunicipalityId.ToString(), message.Message.ToContract(), message.Position, ct);
});

When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityBecameCurrent>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityFacilityLanguageWasAdded>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityFacilityLanguageWasRemoved>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityOfficialLanguageWasAdded>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityOfficialLanguageWasRemoved>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityWasCorrectedToCurrent>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityWasCorrectedToRetired>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityWasImported>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityWasMerged>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityWasNamed>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.MunicipalityWasRetired>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.StreetNameHomonymAdditionsWereCorrected>>(DoNothing);
When<Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<MunicipalityDomain.StreetNameHomonymAdditionsWereRemoved>>(DoNothing);
}

private async Task Produce<T>(
Expand All @@ -149,9 +170,19 @@ private async Task Produce<T>(
long storePosition,
CancellationToken cancellationToken = default)
where T : class, IQueueMessage
{
await Produce(persistentLocalId.ToString(), message, storePosition, cancellationToken);
}

private async Task Produce<T>(
string messageKey,
T message,
long storePosition,
CancellationToken cancellationToken = default)
where T : class, IQueueMessage
{
var result = await _producer.ProduceJsonMessage(
new MessageKey(persistentLocalId.ToString()),
new MessageKey(messageKey),
message,
new List<MessageHeader> { new MessageHeader(MessageHeader.IdempotenceKey, storePosition.ToString()) },
cancellationToken);
Expand All @@ -162,5 +193,7 @@ private async Task Produce<T>(
result.ErrorReason); //TODO: create custom exception
}
}

private static Task DoNothing<T>(ProducerContext context, Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore.Envelope<T> envelope, CancellationToken ct) where T: IMessage => Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ namespace StreetNameRegistry.Projections.Extract.StreetNameExtract
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Be.Vlaanderen.Basisregisters.GrAr.Common;
using Be.Vlaanderen.Basisregisters.GrAr.Extracts;
Expand Down Expand Up @@ -312,7 +314,7 @@ await context.FindAndUpdateStreetNameExtract(
}, ct);
});

When<Envelope<MunicipalityNisCodeWasChanged>>(async (context, message, ct) =>
When<Envelope<MunicipalityNisCodeWasChanged>>(async (context, message, _) =>
{
var streetNames = context
.StreetNameExtractV2
Expand All @@ -325,10 +327,24 @@ await context.FindAndUpdateStreetNameExtract(
UpdateRecord(streetName, i => i.gemeenteid.Value = message.Message.NisCode);
}
});

When<Envelope<MunicipalityBecameCurrent>>(DoNothing);
When<Envelope<MunicipalityFacilityLanguageWasAdded>>(DoNothing);
When<Envelope<MunicipalityFacilityLanguageWasRemoved>>(DoNothing);
When<Envelope<MunicipalityOfficialLanguageWasAdded>>(DoNothing);
When<Envelope<MunicipalityOfficialLanguageWasRemoved>>(DoNothing);
When<Envelope<MunicipalityWasCorrectedToCurrent>>(DoNothing);
When<Envelope<MunicipalityWasCorrectedToRetired>>(DoNothing);
When<Envelope<MunicipalityWasImported>>(DoNothing);
When<Envelope<MunicipalityWasMerged>>(DoNothing);
When<Envelope<MunicipalityWasNamed>>(DoNothing);
When<Envelope<MunicipalityWasRetired>>(DoNothing);
When<Envelope<StreetNameHomonymAdditionsWereCorrected>>(DoNothing);
When<Envelope<StreetNameHomonymAdditionsWereRemoved>>(DoNothing);
}

private void UpdateHomoniemtv(StreetNameExtractItemV2 streetName, List<StreetNameHomonymAddition> homonymAdditions)
=> UpdateRecord(streetName, record =>
=> UpdateRecord(streetName, _ =>
{
foreach (var streetNameHomonymAddition in homonymAdditions)
{
Expand Down Expand Up @@ -357,7 +373,7 @@ private void UpdateHomoniemtv(StreetNameExtractItemV2 streetName, List<StreetNam
});

private void UpdateStraatnm(StreetNameExtractItemV2 streetName, IDictionary<Language, string> streetNameNames)
=> UpdateRecord(streetName, record =>
=> UpdateRecord(streetName, _ =>
{
foreach (var (language, streetNameName) in streetNameNames)
{
Expand Down Expand Up @@ -403,5 +419,7 @@ private void UpdateRecord(StreetNameExtractItemV2 municipality, Action<StreetNam

municipality.DbaseRecord = record.ToBytes(_encoding);
}

private static Task DoNothing<T>(ExtractContext context, Envelope<T> envelope, CancellationToken ct) where T: IMessage => Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@
{
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector;
using Be.Vlaanderen.Basisregisters.ProjectionHandling.SqlStreamStore;
using Converters;
Expand Down Expand Up @@ -255,6 +258,22 @@ await context.FindAndUpdateStreetNameLatestItem(message.Message.PersistentLocalI
item.VersionTimestamp = message.Message.Provenance.Timestamp;
}, ct);
});

When<Envelope<MunicipalityBecameCurrent>>(DoNothing);
When<Envelope<MunicipalityFacilityLanguageWasAdded>>(DoNothing);
When<Envelope<MunicipalityFacilityLanguageWasRemoved>>(DoNothing);
When<Envelope<MunicipalityOfficialLanguageWasAdded>>(DoNothing);
When<Envelope<MunicipalityOfficialLanguageWasRemoved>>(DoNothing);
When<Envelope<MunicipalityWasCorrectedToCurrent>>(DoNothing);
When<Envelope<MunicipalityWasCorrectedToRetired>>(DoNothing);
When<Envelope<MunicipalityWasImported>>(DoNothing);
When<Envelope<MunicipalityWasMerged>>(DoNothing);
When<Envelope<MunicipalityWasNamed>>(DoNothing);
When<Envelope<MunicipalityWasRetired>>(DoNothing);
When<Envelope<StreetNameHomonymAdditionsWereCorrected>>(DoNothing);
When<Envelope<StreetNameHomonymAdditionsWereRemoved>>(DoNothing);
}

private static Task DoNothing<T>(IntegrationContext context, Envelope<T> envelope, CancellationToken ct) where T: IMessage => Task.CompletedTask;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
using Municipality;
using StreetName.Events;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Be.Vlaanderen.Basisregisters.EventHandling;
using Microsoft.EntityFrameworkCore;

[ConnectedProjectionName("Integratie straatnaam versie")]
Expand Down Expand Up @@ -511,6 +514,22 @@ await context.NewStreetNameVersion(message.Message.PersistentLocalId, message, i
item.IsRemoved = true;
}, ct);
});

When<Envelope<MunicipalityBecameCurrent>>(DoNothing);
When<Envelope<MunicipalityFacilityLanguageWasAdded>>(DoNothing);
When<Envelope<MunicipalityFacilityLanguageWasRemoved>>(DoNothing);
When<Envelope<MunicipalityOfficialLanguageWasAdded>>(DoNothing);
When<Envelope<MunicipalityOfficialLanguageWasRemoved>>(DoNothing);
When<Envelope<MunicipalityWasCorrectedToCurrent>>(DoNothing);
When<Envelope<MunicipalityWasCorrectedToRetired>>(DoNothing);
When<Envelope<MunicipalityWasImported>>(DoNothing);
When<Envelope<MunicipalityWasMerged>>(DoNothing);
When<Envelope<MunicipalityWasNamed>>(DoNothing);
When<Envelope<MunicipalityWasRetired>>(DoNothing);
When<Envelope<StreetNameHomonymAdditionsWereCorrected>>(DoNothing);
When<Envelope<StreetNameHomonymAdditionsWereRemoved>>(DoNothing);
}

private static Task DoNothing<T>(IntegrationContext context, Envelope<T> envelope, CancellationToken ct) where T: IMessage => Task.CompletedTask;
}
}
Loading

0 comments on commit e746932

Please sign in to comment.