Skip to content

Commit

Permalink
Fix MassTransit Sql Transport migration issue
Browse files Browse the repository at this point in the history
  • Loading branch information
ImoutoChan committed Oct 20, 2024
1 parent 9c1c068 commit 097b0d5
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Source/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<PackageVersion Include="Mackiovello.Maybe" Version="1.0.0"/>
<PackageVersion Include="MahApps.Metro" Version="2.4.10"/>
<PackageVersion Include="MahApps.Metro.IconPacks" Version="5.0.1"/>
<PackageVersion Include="MassTransit.SqlTransport.PostgreSQL" Version="8.2.5"/>
<PackageVersion Include="MassTransit.SqlTransport.PostgreSQL" Version="8.3.0"/>
<PackageVersion Include="MediatR" Version="12.4.1"/>
<PackageVersion Include="Microsoft-WindowsAPICodePack-Core" Version="1.1.5"/>
<PackageVersion Include="Microsoft-WindowsAPICodePack-Shell" Version="1.1.5"/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
using System.Reflection;
using MassTransit;
using MassTransit.SqlTransport;
using MassTransit.SqlTransport.PostgreSql;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Npgsql;

namespace ImoutoRebirth.Common.MassTransit;
Expand Down Expand Up @@ -34,6 +37,10 @@ public static IServiceCollection AddSqlMassTransit(
});
services.AddPostgresMigrationHostedService();

services.RemoveAll<ISqlTransportDatabaseMigrator>();
services.AddTransient<PostgresDatabaseMigrator>();
services.AddTransient<ISqlTransportDatabaseMigrator, UpgradablePostgresDatabaseMigrator>();

services.AddMassTransit(
x =>
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using System.Transactions;
using Dapper;
using MassTransit;
using MassTransit.SqlTransport;
using MassTransit.SqlTransport.PostgreSql;
using Microsoft.Extensions.Logging;

namespace ImoutoRebirth.Common.MassTransit;

internal class UpgradablePostgresDatabaseMigrator : ISqlTransportDatabaseMigrator
{
private const string DropViewsAndFunctionsSql =
"""
DO $$
DECLARE
r RECORD;
schema_name TEXT := '{0}';
BEGIN
FOR r IN (
SELECT table_name
FROM information_schema.views
WHERE table_schema = schema_name
) LOOP
EXECUTE format('DROP VIEW IF EXISTS %I.%I CASCADE', schema_name, r.table_name);
END LOOP;
FOR r IN (
SELECT routine_name, specific_name
FROM information_schema.routines
WHERE routine_schema = schema_name
AND routine_type = 'FUNCTION'
) LOOP
EXECUTE format('DROP FUNCTION IF EXISTS %I.%I CASCADE', schema_name, r.routine_name);
END LOOP;
END $$;
""";

private readonly ISqlTransportDatabaseMigrator _defaultMigrator;
private readonly ILogger<UpgradablePostgresDatabaseMigrator> _logger;

public UpgradablePostgresDatabaseMigrator(
PostgresDatabaseMigrator defaultMigrator,
ILogger<UpgradablePostgresDatabaseMigrator> logger)
{
_defaultMigrator = defaultMigrator;
_logger = logger;
}

public async Task CreateInfrastructure(SqlTransportOptions options, CancellationToken ct)
{
try
{
await _defaultMigrator.CreateInfrastructure(options, ct);
}
catch (Exception e)
{
_logger.LogWarning(
e,
"Unable to migrate infrastructure for MassTransit. Retrying after removing functions and views.");

using var transaction = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled);

await DropObsoleteViewsAndFunctions(options, ct);
await _defaultMigrator.CreateInfrastructure(options, ct);

transaction.Complete();
}
}

private async Task DropObsoleteViewsAndFunctions(SqlTransportOptions options, CancellationToken ct)
{
await using var connection = PostgresSqlTransportConnection.GetDatabaseConnection(options);
await connection.Open(ct).ConfigureAwait(false);

try
{
await connection.Connection
.ExecuteScalarAsync<int>(string.Format(DropViewsAndFunctionsSql, options.Schema))
.ConfigureAwait(false);

_logger.LogDebug("Functions and views were removed in schema {Schema}", options.Schema);
}
finally
{
await connection.Close().ConfigureAwait(false);
}
}

public Task CreateDatabase(SqlTransportOptions options, CancellationToken ct)
=> _defaultMigrator.CreateDatabase(options, ct);

public Task DeleteDatabase(SqlTransportOptions options, CancellationToken ct)
=> _defaultMigrator.DeleteDatabase(options, ct);
}

0 comments on commit 097b0d5

Please sign in to comment.