From 097b0d50fe0b010f095675472fe62ce63a62a7bc Mon Sep 17 00:00:00 2001 From: ImoutoChan Date: Sun, 20 Oct 2024 19:14:40 +0500 Subject: [PATCH] Fix MassTransit Sql Transport migration issue --- Source/Directory.Packages.props | 2 +- .../MassTransitSqlExtensions.cs | 7 ++ .../UpgradablePostgresDatabaseMigrator.cs | 94 +++++++++++++++++++ 3 files changed, 102 insertions(+), 1 deletion(-) create mode 100644 Source/ImoutoRebirth.Common/ImoutoRebirth.Common.MassTransit/UpgradablePostgresDatabaseMigrator.cs diff --git a/Source/Directory.Packages.props b/Source/Directory.Packages.props index cabd7baf..95df4008 100644 --- a/Source/Directory.Packages.props +++ b/Source/Directory.Packages.props @@ -20,7 +20,7 @@ - + diff --git a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.MassTransit/MassTransitSqlExtensions.cs b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.MassTransit/MassTransitSqlExtensions.cs index 8add9e39..e971dd11 100644 --- a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.MassTransit/MassTransitSqlExtensions.cs +++ b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.MassTransit/MassTransitSqlExtensions.cs @@ -1,7 +1,10 @@ using System.Reflection; using MassTransit; +using MassTransit.SqlTransport; +using MassTransit.SqlTransport.PostgreSql; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using Npgsql; namespace ImoutoRebirth.Common.MassTransit; @@ -34,6 +37,10 @@ public static IServiceCollection AddSqlMassTransit( }); services.AddPostgresMigrationHostedService(); + services.RemoveAll(); + services.AddTransient(); + services.AddTransient(); + services.AddMassTransit( x => { diff --git a/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.MassTransit/UpgradablePostgresDatabaseMigrator.cs b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.MassTransit/UpgradablePostgresDatabaseMigrator.cs new file mode 100644 index 00000000..f74dc910 --- /dev/null +++ b/Source/ImoutoRebirth.Common/ImoutoRebirth.Common.MassTransit/UpgradablePostgresDatabaseMigrator.cs @@ -0,0 +1,94 @@ +using System.Transactions; +using Dapper; +using MassTransit; +using MassTransit.SqlTransport; +using MassTransit.SqlTransport.PostgreSql; +using Microsoft.Extensions.Logging; + +namespace ImoutoRebirth.Common.MassTransit; + +internal class UpgradablePostgresDatabaseMigrator : ISqlTransportDatabaseMigrator +{ + private const string DropViewsAndFunctionsSql = + """ + DO $$ + DECLARE + r RECORD; + schema_name TEXT := '{0}'; + BEGIN + FOR r IN ( + SELECT table_name + FROM information_schema.views + WHERE table_schema = schema_name + ) LOOP + EXECUTE format('DROP VIEW IF EXISTS %I.%I CASCADE', schema_name, r.table_name); + END LOOP; + + FOR r IN ( + SELECT routine_name, specific_name + FROM information_schema.routines + WHERE routine_schema = schema_name + AND routine_type = 'FUNCTION' + ) LOOP + EXECUTE format('DROP FUNCTION IF EXISTS %I.%I CASCADE', schema_name, r.routine_name); + END LOOP; + END $$; + """; + + private readonly ISqlTransportDatabaseMigrator _defaultMigrator; + private readonly ILogger _logger; + + public UpgradablePostgresDatabaseMigrator( + PostgresDatabaseMigrator defaultMigrator, + ILogger logger) + { + _defaultMigrator = defaultMigrator; + _logger = logger; + } + + public async Task CreateInfrastructure(SqlTransportOptions options, CancellationToken ct) + { + try + { + await _defaultMigrator.CreateInfrastructure(options, ct); + } + catch (Exception e) + { + _logger.LogWarning( + e, + "Unable to migrate infrastructure for MassTransit. Retrying after removing functions and views."); + + using var transaction = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled); + + await DropObsoleteViewsAndFunctions(options, ct); + await _defaultMigrator.CreateInfrastructure(options, ct); + + transaction.Complete(); + } + } + + private async Task DropObsoleteViewsAndFunctions(SqlTransportOptions options, CancellationToken ct) + { + await using var connection = PostgresSqlTransportConnection.GetDatabaseConnection(options); + await connection.Open(ct).ConfigureAwait(false); + + try + { + await connection.Connection + .ExecuteScalarAsync(string.Format(DropViewsAndFunctionsSql, options.Schema)) + .ConfigureAwait(false); + + _logger.LogDebug("Functions and views were removed in schema {Schema}", options.Schema); + } + finally + { + await connection.Close().ConfigureAwait(false); + } + } + + public Task CreateDatabase(SqlTransportOptions options, CancellationToken ct) + => _defaultMigrator.CreateDatabase(options, ct); + + public Task DeleteDatabase(SqlTransportOptions options, CancellationToken ct) + => _defaultMigrator.DeleteDatabase(options, ct); +}