Merge pull request #20 from eventuate-tram/Improvements
Get public repo up-to-date with latest improvements
douggish authored Sep 20, 2023
2 parents 7bdda0d + 12ed210 commit ecd86a8
Showing 69 changed files with 1,556 additions and 374 deletions.
3 changes: 2 additions & 1 deletion .gitattributes
@@ -1,2 +1,3 @@
# Auto detect text files and perform LF normalization
* text=auto
* text=auto
*.sh text eol=lf
14 changes: 7 additions & 7 deletions .github/workflows/build.yml
@@ -14,12 +14,12 @@ jobs:

steps:
- name: Checkout source
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Setup .NET Core
uses: actions/setup-dotnet@v1
uses: actions/setup-dotnet@v3
with:
dotnet-version: 3.1.201
dotnet-version: 6.0.414

- name: Build
env:
@@ -29,7 +29,7 @@ jobs:
run: ./build.sh

- name: Upload bin folder
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v3
with:
name: bin
path: IO.Eventuate.Tram/bin
@@ -38,7 +38,7 @@ jobs:
run: dotnet test -c Release --no-build --verbosity normal --logger trx IO.Eventuate.Tram.UnitTests/

- name: Upload unit test results
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v3
if: always()
with:
name: unit-test-results
@@ -57,11 +57,11 @@ jobs:
docker stats --no-stream --all
- name: Upload integration test results
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v3
if: always()
with:
name: integration-test-results
path: IO.Eventuate.Tram.IntegrationTests/bin/Release/netcoreapp3.1/TestResults
path: IO.Eventuate.Tram.IntegrationTests/bin/Release/net6.0/TestResults

- name: Publish nuget package
# Don't publish nuget packages for builds triggered by pull requests (pull requests from forks won't have access to secrets anyway)
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -0,0 +1,15 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
- Initial port of Eventuate Tram in .NET supporting the following functionality:
- messaging - send and receive messages over named channels
- events - publish domain events and subscribe to domain events

[Unreleased]: https://github.com/eventuate-tram/eventuate-tram-core-dotnet/commits/HEAD
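To illustrate the messaging and domain-event support listed in the changelog above, here is a minimal publishing sketch. It relies only on the API surface visible in the tests in this commit (the IDomainEvent marker interface and IDomainEventPublisher.Publish(aggregateType, aggregateId, domainEvents)); the OrderCreated event and OrderService class are hypothetical, and the consumer-side wiring is omitted.

using System.Collections.Generic;
using IO.Eventuate.Tram.Events.Common;     // IDomainEvent
using IO.Eventuate.Tram.Events.Publisher;  // IDomainEventPublisher

// Hypothetical domain event; IDomainEvent is the marker interface used by the tests below.
public class OrderCreated : IDomainEvent
{
    public string OrderId { get; set; }
    public decimal Total { get; set; }
}

// Hypothetical service showing the publish call shape used by the integration tests:
// Publish(aggregateType, aggregateId, domainEvents). The event is written to the
// message (outbox) table and relayed to Kafka by the CDC service started in DevSetup.ps1.
public class OrderService
{
    private readonly IDomainEventPublisher _publisher;

    public OrderService(IDomainEventPublisher publisher)
    {
        _publisher = publisher;
    }

    public void CreateOrder(string orderId, decimal total)
    {
        _publisher.Publish("Order", orderId,
            new List<IDomainEvent> { new OrderCreated { OrderId = orderId, Total = total } });
    }
}
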
18 changes: 18 additions & 0 deletions DevSetup.ps1
@@ -0,0 +1,18 @@
Set-Item -Path Env:CDC_SERVICE_DOCKER_VERSION -Value ("0.6.0.RELEASE")
Push-Location -Path IO.Eventuate.Tram.IntegrationTests

docker compose down --remove-orphans
docker compose up -d mssql
docker compose up -d zookeeper
docker compose up -d kafka

Start-Sleep -Seconds 40
docker stats --no-stream

docker-compose up --exit-code-from kafka-setup kafka-setup
docker compose up --exit-code-from dbsetup dbsetup
docker compose up -d cdcservice

docker compose up -d kafka-ui

Pop-Location
@@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6.0</TargetFramework>

<IsPackable>false</IsPackable>

@@ -23,13 +23,13 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="3.1.3" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.3" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.3" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.0" />
<PackageReference Include="Confluent.Kafka" Version="1.4.0" />
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.16.1" />
<PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="6.0.16" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
<PackageReference Include="Confluent.Kafka" Version="2.1.1" />
<PackageReference Include="NUnit" Version="3.13.3" />
<PackageReference Include="NUnit3TestAdapter" Version="4.5.0" />
</ItemGroup>

<ItemGroup>
@@ -0,0 +1,91 @@
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using IO.Eventuate.Tram.Events.Common;
using IO.Eventuate.Tram.IntegrationTests.TestHelpers;
using IO.Eventuate.Tram.Local.Kafka.Consumer;
using NUnit.Framework;

namespace IO.Eventuate.Tram.IntegrationTests.TestFixtures;

public class BackpressureTests : IntegrationTestsBase
{
private const uint PauseThreshold = 20;
private const uint ResumeThreshold = 5;

[SetUp]
public async Task Setup()
{
await CleanupKafkaTopicsAsync();
// Initialize the backpressure properties
var properties = new EventuateKafkaConsumerConfigurationProperties
{
BackPressure =
{
PauseThreshold = PauseThreshold,
ResumeThreshold = ResumeThreshold
}
};
TestSetup("eventuate", false, properties);
await CleanupTestAsync();
}

[TearDown]
public void TearDown()
{
DisposeTestHost();
}

[Test]
public void PublishWithBackpressure_Send1000Messages_AllMessagesEventuallyProcessed()
{
// Arrange
uint messagesPerType = 250;
uint totalMessages = messagesPerType * 4;
TestMessageType1 msg1 = new TestMessageType1("Msg1", 1, 1.2);
TestMessageType2 msg2 = new TestMessageType2("Msg2", 2);
TestMessageType3 msg3 = new TestMessageType3("Msg3", 3);
TestMessageType4 msg4 = new TestMessageType4("Msg4", 4);

for (int x = 0; x < messagesPerType; x++)
{
GetTestPublisher().Publish(AggregateType12, AggregateType12, new List<IDomainEvent> { msg1 });
GetTestPublisher().Publish(AggregateType12, AggregateType12, new List<IDomainEvent> { msg2 });
GetTestPublisher().Publish(AggregateType34, AggregateType34, new List<IDomainEvent> { msg3 });
GetTestPublisher().Publish(AggregateType34, AggregateType34, new List<IDomainEvent> { msg4 });
}

// Act
TestEventConsumer consumer = GetTestConsumer();

// Allow time for messages to process
int count = 300;
while (consumer.TotalMessageCount() < totalMessages && count > 0)
{
TestContext.WriteLine($"TotalMessageCount: {consumer.TotalMessageCount()} ({count})");
Thread.Sleep(1000);
count--;
}
ShowTestResults();

// Assert
Assert.That(GetDbContext().Messages.Count(),
Is.EqualTo(totalMessages), "Number of messages produced");
Assert.That(GetDbContext().Messages.Count(msg => msg.Published == 0),
Is.EqualTo(0), "Number of unpublished messages");
foreach (var eventType in new[] { typeof(TestMessageType1), typeof(TestMessageType2), typeof(TestMessageType3), typeof(TestMessageType4) })
{
TestEventConsumer.EventStatistics eventStatistics = consumer.GetEventStatistics(eventType);
Assert.That(eventStatistics.MessageCount,
Is.EqualTo(messagesPerType), $"Number of {eventType.Name} messages received by consumer");
Assert.That(eventStatistics.ReceivedMessages.Count,
Is.EqualTo(messagesPerType), $"Number of received {eventType.Name} messages");
}

Assert.That(consumer.TotalMessageCount(),
Is.EqualTo(totalMessages), "Total number of messages received by consumer");
Assert.That(GetDbContext().ReceivedMessages.Count(msg => msg.MessageId != null),
Is.EqualTo(totalMessages), "Number of received messages");
}
}
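The backpressure knobs exercised by this test are plain properties on EventuateKafkaConsumerConfigurationProperties. Below is a minimal sketch of constructing the same configuration outside a test; the threshold values are arbitrary, and the assumed semantics (pause consumption once the unprocessed backlog exceeds PauseThreshold, resume when it falls back to ResumeThreshold) are inferred from this test rather than stated anywhere in the diff.

using IO.Eventuate.Tram.Local.Kafka.Consumer;

// Sketch: backpressure thresholds for the Kafka consumer. Assumed behaviour
// (inferred from the test above): partitions are paused once the number of
// unprocessed messages exceeds PauseThreshold and resumed when the backlog
// drops back to ResumeThreshold.
var consumerConfigProperties = new EventuateKafkaConsumerConfigurationProperties
{
    BackPressure =
    {
        PauseThreshold = 20,
        ResumeThreshold = 5
    }
};
// Pass consumerConfigProperties wherever the consumer is configured; the test
// host does this via SetConsumerConfigProperties(consumerConfigProperties).
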
@@ -1,15 +1,14 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using IO.Eventuate.Tram.Database;
using IO.Eventuate.Tram.Events.Publisher;
using IO.Eventuate.Tram.IntegrationTests.TestHelpers;
using IO.Eventuate.Tram.Local.Kafka.Consumer;
using IO.Eventuate.Tram.Messaging.Common;
using IO.Eventuate.Tram.Messaging.Consumer;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@@ -24,11 +24,16 @@ public class IntegrationTestsBase
private string _subscriberId = "xx";
protected const string AggregateType12 = "TestMessage12Topic";
protected const string AggregateType34 = "TestMessage34Topic";
protected const string AggregateTypeDelay = "TestMessageDelayTopic";
protected const string TestPartitionAssignmentTopic1 = "TestPartitionAssignmentTopic1";
protected const string TestPartitionAssignmentTopic2 = "TestPartitionAssignmentTopic2";
protected string EventuateDatabaseSchemaName = "eventuate";
public static string PingFileName = "ping.txt";

protected TestSettings TestSettings;

private static IHost _host;
private static IServiceScope _testServiceScope;
private static EventuateTramDbContext _dbContext;
private static IDomainEventPublisher _domainEventPublisher;
private static TestEventConsumer _testEventConsumer;
@@ -57,45 +61,46 @@ protected void TestSetup(string schema, bool withInterceptor, EventuateKafkaCons
{
EventuateDatabaseSchemaName = schema;
_subscriberId = Guid.NewGuid().ToString();

// Clear the ping file
File.WriteAllText(PingFileName, string.Empty);

if (_host == null)
{
_host = SetupTestHost(withInterceptor, consumerConfigProperties);
_dbContext = _host.Services.GetService<EventuateTramDbContext>();
_domainEventPublisher = _host.Services.GetService<IDomainEventPublisher>();
_testEventConsumer = _host.Services.GetService<TestEventConsumer>();
_interceptor = (TestMessageInterceptor)_host.Services.GetService<IMessageInterceptor>();
var scopeFactory = _host.Services.GetRequiredService<IServiceScopeFactory>();
_testServiceScope = scopeFactory.CreateScope();
_dbContext = _testServiceScope.ServiceProvider.GetRequiredService<EventuateTramDbContext>();
_domainEventPublisher = _testServiceScope.ServiceProvider.GetRequiredService<IDomainEventPublisher>();
_testEventConsumer = _testServiceScope.ServiceProvider.GetRequiredService<TestEventConsumer>();
_interceptor = (TestMessageInterceptor)_testServiceScope.ServiceProvider.GetService<IMessageInterceptor>();
}
}

protected void CleanupTest()
protected async Task CleanupTestAsync()
{
ClearDb(GetDbContext(), EventuateDatabaseSchemaName);
await ClearDbAsync(GetDbContext(), EventuateDatabaseSchemaName);
GetTestConsumer().Reset();
GetTestMessageInterceptor()?.Reset();
}

protected async Task CleanupKafka()
protected async Task CleanupKafkaTopicsAsync()
{
var config = new AdminClientConfig();
config.BootstrapServers = TestSettings.KafkaBootstrapServers;
try
{
using (var admin = new AdminClientBuilder(config).Build())
{
await admin.DeleteTopicsAsync(new[] {AggregateType12, AggregateType34});
}
}
catch (DeleteTopicsException e)
using var admin = new AdminClientBuilder(config).Build();
Metadata kafkaMetadata = admin.GetMetadata(TimeSpan.FromSeconds(10));
foreach (var topic in new[] {AggregateType12, AggregateType34, AggregateTypeDelay, TestPartitionAssignmentTopic1, TestPartitionAssignmentTopic2})
{
// Don't fail if topic wasn't found (nothing to delete)
if (e.Results.Where(r => r.Error.IsError).All(r => r.Error.Code == ErrorCode.UnknownTopicOrPart))
TopicMetadata paMessagesMetadata = kafkaMetadata.Topics.Find(t => t.Topic.Equals(topic));
if (paMessagesMetadata != null)
{
TestContext.WriteLine(e.Message);
await admin.DeleteRecordsAsync(paMessagesMetadata.Partitions.Select(p => new TopicPartitionOffset(new TopicPartition(
topic, p.PartitionId), Offset.End)));
}
else
{
throw;
TestContext.WriteLine($"Topic {topic} did not exist.");
}
}
}
@@ -109,6 +114,7 @@ protected void ShowTestResults()
TestContext.WriteLine(" Dispatcher Id: {0}", _subscriberId);
TestContext.WriteLine(" Aggregate Type 12: {0}", AggregateType12);
TestContext.WriteLine(" Aggregate Type 34: {0}", AggregateType34);
TestContext.WriteLine(" Aggregate Type Delay: {0}", AggregateTypeDelay);

TestContext.WriteLine("Test Results");
TestContext.WriteLine(" N Messages in DB: {0}", _dbContext.Messages.Count());
Expand Down Expand Up @@ -166,7 +172,7 @@ protected IHost SetupTestHost(bool withInterceptor, EventuateKafkaConsumerConfig
provider =>
{
var consumer = provider.GetRequiredService<TestEventConsumer>();
return consumer.DomainEventHandlers(AggregateType12, AggregateType34);
return consumer.DomainEventHandlers(AggregateType12, AggregateType34, AggregateTypeDelay);
})
.SetConsumerConfigProperties(consumerConfigProperties)
.Build<TestEventConsumer>(withInterceptor);
@@ -179,8 +185,7 @@ protected void DisposeTestHost()
if (_host == null)
return;

var messageConsumer = _host.Services.GetService<IMessageConsumer>();
messageConsumer.Close();
_testServiceScope.Dispose();
_host.StopAsync().Wait();
_host.Dispose();
_host = null;
@@ -209,10 +214,10 @@ protected EventuateTramDbContext GetDbContext()
return _dbContext;
}

protected void ClearDb(EventuateTramDbContext dbContext, String eventuateDatabaseSchemaName)
protected static async Task ClearDbAsync(EventuateTramDbContext dbContext, String eventuateDatabaseSchemaName)
{
dbContext.Database.ExecuteSqlRaw(String.Format("Delete from [{0}].[message]", eventuateDatabaseSchemaName));
dbContext.Database.ExecuteSqlRaw(String.Format("Delete from [{0}].[received_messages]", eventuateDatabaseSchemaName));
await dbContext.Database.ExecuteSqlRawAsync($"Delete from [{eventuateDatabaseSchemaName}].[message]");
await dbContext.Database.ExecuteSqlRawAsync($"Delete from [{eventuateDatabaseSchemaName}].[received_messages]");
}
}
}