From 4414996d53bf1a04cc534825ff5fb1884cee37da Mon Sep 17 00:00:00 2001 From: Tzachi Aviran Date: Wed, 10 Jul 2024 15:02:02 +0300 Subject: [PATCH 1/2] Add initial support locking with external transaction for Postgres --- .../PostgresAdvisoryLock.cs | 41 +++++++++++-------- .../PostgresDistributedLock.cs | 17 ++++++++ .../PostgresDistributedReaderWriterLock.cs | 11 +++++ ...tgresDistributedSynchronizationProvider.cs | 14 +++++++ .../PublicAPI.Unshipped.txt | 3 ++ 5 files changed, 70 insertions(+), 16 deletions(-) diff --git a/src/DistributedLock.Postgres/PostgresAdvisoryLock.cs b/src/DistributedLock.Postgres/PostgresAdvisoryLock.cs index 2a8f9617..8684aa5b 100644 --- a/src/DistributedLock.Postgres/PostgresAdvisoryLock.cs +++ b/src/DistributedLock.Postgres/PostgresAdvisoryLock.cs @@ -46,11 +46,12 @@ private PostgresAdvisoryLock(bool isShared) // Our acquire command will use SET LOCAL to set up statement timeouts. This lasts until the end // of the current transaction instead of just the current batch if we're in a transaction. To make sure - // we don't leak those settings, in the case of a transaction we first set up a save point which we can + // we don't leak those settings, in the case of a transaction, we first set up a save point which we can // later roll back (taking the settings changes with it but NOT the lock). Because we can't confidently // roll back a save point without knowing that it has been set up, we start the save point in its own - // query before we try-catch - var needsSavePoint = await HasTransactionAsync(connection).ConfigureAwait(false); + // query before we try-catch. + var needsSavePoint = await ShouldDefineSavePoint(connection).ConfigureAwait(false); + if (needsSavePoint) { using var setSavePointCommand = connection.CreateCommand(); @@ -124,9 +125,7 @@ private PostgresAdvisoryLock(bool isShared) async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync(bool acquired) { if (needsSavePoint - // For transaction scoped locks, we can't roll back the save point on success because that will roll - // back our hold on the lock. It's ok to "leak" the savepoint in that case because it's an internally-owned - // transaction/connection and the savepoint will be cleaned up with the disposal of the transaction. + // For transaction scoped locks, we can't roll back the save point on success because that will roll back our hold on the lock. && !(acquired && UseTransactionScopedLock(connection))) { // attempt to clear the timeout variables we set @@ -182,13 +181,17 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post return command; } - private static async ValueTask HasTransactionAsync(DatabaseConnection connection) + private static async ValueTask ShouldDefineSavePoint(DatabaseConnection connection) { - if (connection.HasTransaction) { return true; } - if (!connection.IsExernallyOwned) { return false; } + // If the connection is internally-owned, we only define a save point if a transaction has been opened. + if (!connection.IsExernallyOwned) { return connection.HasTransaction; } + + // If the connection is externally-owned with an established transaction, we don't want to pollute it with a save point + // which we won't be able to release in case the lock will be acquired. + if (connection.HasTransaction) { return false; } - // If the connection is externally owned, then it might be part of a transaction that we can't - // see. In that case, the only real way to detect it is to begin a new one + // The externally-owned connection might still be part of a transaction that we can't see. + // In that case, the only real way to detect it is to begin a new one. try { await connection.BeginTransactionAsync().ConfigureAwait(false); @@ -199,6 +202,7 @@ private static async ValueTask HasTransactionAsync(DatabaseConnection conn } await connection.DisposeTransactionAsync().ConfigureAwait(false); + return false; } @@ -207,7 +211,13 @@ public ValueTask ReleaseAsync(DatabaseConnection connection, string resourceName private async ValueTask ReleaseAsync(DatabaseConnection connection, PostgresAdvisoryLockKey key, bool isTry) { - Invariant.Require(!UseTransactionScopedLock(connection)); + // For transaction scoped advisory locks, the lock can only be released by ending the transaction. + // If the transaction is internally-owned, then the lock will be released when the transaction is disposed as part of the internal connection management. + // If the transaction is externally-owned, then the lock will have to be released explicitly by the transaction initiator. + if (UseTransactionScopedLock(connection)) + { + return; + } using var command = connection.CreateCommand(); command.SetCommandText($"SELECT pg_catalog.pg_advisory_unlock{(this._isShared ? "_shared" : string.Empty)}({AddKeyParametersAndGetKeyArguments(command, key)})"); @@ -235,10 +245,9 @@ private static string AddKeyParametersAndGetKeyArguments(DatabaseCommand command } private static bool UseTransactionScopedLock(DatabaseConnection connection) => - // This implementation (similar to what we do for SQL Server) is based on the fact that we only create transactions on - // internally-owned connections when doing transaction-scoped locking, and we only support transaction-scoped locking on - // internally-owned connections (since there's no explicit release). - !connection.IsExernallyOwned && connection.HasTransaction; + // Transaction-scoped locking is supported on both externally-owned and internally-owned connections, + // as long as the connection has a transaction. + connection.HasTransaction; private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseCommand command, PostgresAdvisoryLockKey key) { diff --git a/src/DistributedLock.Postgres/PostgresDistributedLock.cs b/src/DistributedLock.Postgres/PostgresDistributedLock.cs index dda68386..2163aca2 100644 --- a/src/DistributedLock.Postgres/PostgresDistributedLock.cs +++ b/src/DistributedLock.Postgres/PostgresDistributedLock.cs @@ -32,6 +32,17 @@ public PostgresDistributedLock(PostgresAdvisoryLockKey key, IDbConnection connec { } + /// + /// Constructs a lock with the given (effectively the lock name) and . + /// + /// The provided will be used to connect to the database and will provide lock scope. It is assumed to be externally managed and + /// will not be committed or rolled back. + /// + public PostgresDistributedLock(PostgresAdvisoryLockKey key, IDbTransaction transaction) + : this(key, CreateInternalLock(key, transaction)) + { + } + #if NET7_0_OR_GREATER /// /// Constructs a lock with the given (effectively the lock name) and , @@ -78,6 +89,12 @@ internal static IDbDistributedLock CreateInternalLock(PostgresAdvisoryLockKey ke return new DedicatedConnectionOrTransactionDbDistributedLock(key.ToString(), () => new PostgresDatabaseConnection(connection)); } + internal static IDbDistributedLock CreateInternalLock(PostgresAdvisoryLockKey key, IDbTransaction transaction) + { + if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } + return new DedicatedConnectionOrTransactionDbDistributedLock(key.ToString(), () => new PostgresDatabaseConnection(transaction)); + } + #if NET7_0_OR_GREATER internal static IDbDistributedLock CreateInternalLock(PostgresAdvisoryLockKey key, DbDataSource dbDataSource, Action? options) { diff --git a/src/DistributedLock.Postgres/PostgresDistributedReaderWriterLock.cs b/src/DistributedLock.Postgres/PostgresDistributedReaderWriterLock.cs index 8a713cc1..73777841 100644 --- a/src/DistributedLock.Postgres/PostgresDistributedReaderWriterLock.cs +++ b/src/DistributedLock.Postgres/PostgresDistributedReaderWriterLock.cs @@ -32,6 +32,17 @@ public PostgresDistributedReaderWriterLock(PostgresAdvisoryLockKey key, IDbConne { } + /// + /// Constructs a lock with the given (effectively the lock name) and . + /// + /// The provided will be used to connect to the database and will provide lock scope. It is assumed to be externally managed and + /// will not be committed or rolled back. + /// + public PostgresDistributedReaderWriterLock(PostgresAdvisoryLockKey key, IDbTransaction transaction) + : this(key, PostgresDistributedLock.CreateInternalLock(key, transaction)) + { + } + #if NET7_0_OR_GREATER /// /// Constructs a lock with the given (effectively the lock name) and , diff --git a/src/DistributedLock.Postgres/PostgresDistributedSynchronizationProvider.cs b/src/DistributedLock.Postgres/PostgresDistributedSynchronizationProvider.cs index bcb9f600..fa151084 100644 --- a/src/DistributedLock.Postgres/PostgresDistributedSynchronizationProvider.cs +++ b/src/DistributedLock.Postgres/PostgresDistributedSynchronizationProvider.cs @@ -36,6 +36,20 @@ public PostgresDistributedSynchronizationProvider(IDbConnection connection) this._readerWriterLockFactory = key => new PostgresDistributedReaderWriterLock(key, connection); } + /// + /// Constructs a provider which connects to Postgres using the provided . + /// + /// The provided will be used to connect to the database and will provide lock scope. It is assumed to be externally managed and + /// will not be committed or rolled back. + /// + public PostgresDistributedSynchronizationProvider(IDbTransaction transaction) + { + if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } + + this._lockFactory = key => new PostgresDistributedLock(key, transaction); + this._readerWriterLockFactory = key => new PostgresDistributedReaderWriterLock(key, transaction); + } + #if NET7_0_OR_GREATER /// /// Constructs a provider which connects to Postgres using the provided and . diff --git a/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt b/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt index e69de29b..79508f8f 100644 --- a/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt +++ b/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt @@ -0,0 +1,3 @@ +Medallion.Threading.Postgres.PostgresDistributedLock.PostgresDistributedLock(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction) -> void +Medallion.Threading.Postgres.PostgresDistributedReaderWriterLock.PostgresDistributedReaderWriterLock(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction) -> void +Medallion.Threading.Postgres.PostgresDistributedSynchronizationProvider.PostgresDistributedSynchronizationProvider(System.Data.IDbTransaction! transaction) -> void \ No newline at end of file From ad1c932579c3ba50fbf84435505c4aa322c6ebe0 Mon Sep 17 00:00:00 2001 From: Tzachi Aviran Date: Sun, 18 Aug 2024 10:32:26 +0300 Subject: [PATCH 2/2] Add support for static utility methods --- .../Internal/DistributedLockHelpers.cs | 27 ++++++++- src/DistributedLock.Core/Internal/Helpers.cs | 2 +- .../PostgresDistributedLock.Extensions.cs | 56 +++++++++++++++++++ .../PostgresDistributedLock.cs | 17 ------ .../PostgresDistributedReaderWriterLock.cs | 11 ---- ...tgresDistributedSynchronizationProvider.cs | 14 ----- .../PublicAPI.Unshipped.txt | 8 ++- 7 files changed, 88 insertions(+), 47 deletions(-) create mode 100644 src/DistributedLock.Postgres/PostgresDistributedLock.Extensions.cs diff --git a/src/DistributedLock.Core/Internal/DistributedLockHelpers.cs b/src/DistributedLock.Core/Internal/DistributedLockHelpers.cs index 7427b731..3b997933 100644 --- a/src/DistributedLock.Core/Internal/DistributedLockHelpers.cs +++ b/src/DistributedLock.Core/Internal/DistributedLockHelpers.cs @@ -1,4 +1,5 @@ -using System.Security.Cryptography; +using Medallion.Threading.Internal.Data; +using System.Security.Cryptography; using System.Text; namespace Medallion.Threading.Internal; @@ -133,6 +134,30 @@ public static bool TryUpgradeToWriteLock(IDistributedLockUpgradeableHandle handl SyncViaAsync.Run(t => t.handle.TryUpgradeToWriteLockAsync(t.timeout, t.cancellationToken), (handle, timeout, cancellationToken)); #endregion + #region ---- IDbSynchronizationStrategy implementations ---- + public static ValueTask TryAcquireAsync(IDbSynchronizationStrategy strategy, DatabaseConnection connection, string resourceName, TimeSpan timeout, CancellationToken cancellationToken) + where THandle : class => + strategy.TryAcquireAsync(connection, resourceName, timeout, cancellationToken); + + public static ValueTask AcquireAsync(IDbSynchronizationStrategy strategy, DatabaseConnection connection, string resourceName, TimeSpan? timeout, CancellationToken cancellationToken) + where THandle : class => + strategy.TryAcquireAsync(connection, resourceName, timeout, cancellationToken).ThrowTimeoutIfNull(); + + public static THandle Acquire(IDbSynchronizationStrategy strategy, DatabaseConnection connection, string resourceName, TimeSpan? timeout, CancellationToken cancellationToken) + where THandle : class => + SyncViaAsync.Run( + state => AcquireAsync(state.strategy, state.connection, state.resourceName, state.timeout, state.cancellationToken), + (strategy, connection, resourceName, timeout, cancellationToken) + ); + + public static THandle? TryAcquire(IDbSynchronizationStrategy strategy, DatabaseConnection connection, string resourceName, TimeSpan timeout, CancellationToken cancellationToken) + where THandle : class => + SyncViaAsync.Run( + state => TryAcquireAsync(strategy, connection, resourceName, timeout, cancellationToken), + (strategy, connection, resourceName, timeout, cancellationToken) + ); + #endregion + private static Exception LockTimeout(string? @object = null) => new TimeoutException($"Timeout exceeded when trying to acquire the {@object ?? "lock"}"); public static async ValueTask ThrowTimeoutIfNull(this ValueTask task, string? @object = null) where T : class => diff --git a/src/DistributedLock.Core/Internal/Helpers.cs b/src/DistributedLock.Core/Internal/Helpers.cs index d77cd1d0..7868c08d 100644 --- a/src/DistributedLock.Core/Internal/Helpers.cs +++ b/src/DistributedLock.Core/Internal/Helpers.cs @@ -28,7 +28,7 @@ public readonly struct TaskConversion public readonly struct TaskConversion { } - internal static async ValueTask ConvertToVoid(this ValueTask task) => await task.ConfigureAwait(false); + public static async ValueTask ConvertToVoid(this ValueTask task) => await task.ConfigureAwait(false); public static ValueTask AsValueTask(this Task task) => new(task); public static ValueTask AsValueTask(this Task task) => new(task); diff --git a/src/DistributedLock.Postgres/PostgresDistributedLock.Extensions.cs b/src/DistributedLock.Postgres/PostgresDistributedLock.Extensions.cs new file mode 100644 index 00000000..deee7272 --- /dev/null +++ b/src/DistributedLock.Postgres/PostgresDistributedLock.Extensions.cs @@ -0,0 +1,56 @@ +using Medallion.Threading.Internal; +using System.Data; + +namespace Medallion.Threading.Postgres; + +public partial class PostgresDistributedLock +{ + public static bool TryAcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) + { + if (key == null) { throw new ArgumentNullException(nameof(key)); } + if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } + + var connection = new PostgresDatabaseConnection(transaction); + + var handle = DistributedLockHelpers.TryAcquire(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken); + + return handle != null; + } + + public static void AcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) + { + if (key == null) { throw new ArgumentNullException(nameof(key)); } + if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } + + var connection = new PostgresDatabaseConnection(transaction); + + DistributedLockHelpers.Acquire(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken); + } + + public static ValueTask TryAcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) + { + if (key == null) { throw new ArgumentNullException(nameof(key)); } + if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } + + var connection = new PostgresDatabaseConnection(transaction); + + return TryAcquireAsync(); + + async ValueTask TryAcquireAsync() + { + var handle = await DistributedLockHelpers.TryAcquireAsync(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken); + + return handle != null; + } + } + + public static ValueTask AcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) + { + if (key == null) { throw new ArgumentNullException(nameof(key)); } + if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } + + var connection = new PostgresDatabaseConnection(transaction); + + return DistributedLockHelpers.AcquireAsync(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken).ConvertToVoid(); + } +} diff --git a/src/DistributedLock.Postgres/PostgresDistributedLock.cs b/src/DistributedLock.Postgres/PostgresDistributedLock.cs index 2163aca2..dda68386 100644 --- a/src/DistributedLock.Postgres/PostgresDistributedLock.cs +++ b/src/DistributedLock.Postgres/PostgresDistributedLock.cs @@ -32,17 +32,6 @@ public PostgresDistributedLock(PostgresAdvisoryLockKey key, IDbConnection connec { } - /// - /// Constructs a lock with the given (effectively the lock name) and . - /// - /// The provided will be used to connect to the database and will provide lock scope. It is assumed to be externally managed and - /// will not be committed or rolled back. - /// - public PostgresDistributedLock(PostgresAdvisoryLockKey key, IDbTransaction transaction) - : this(key, CreateInternalLock(key, transaction)) - { - } - #if NET7_0_OR_GREATER /// /// Constructs a lock with the given (effectively the lock name) and , @@ -89,12 +78,6 @@ internal static IDbDistributedLock CreateInternalLock(PostgresAdvisoryLockKey ke return new DedicatedConnectionOrTransactionDbDistributedLock(key.ToString(), () => new PostgresDatabaseConnection(connection)); } - internal static IDbDistributedLock CreateInternalLock(PostgresAdvisoryLockKey key, IDbTransaction transaction) - { - if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } - return new DedicatedConnectionOrTransactionDbDistributedLock(key.ToString(), () => new PostgresDatabaseConnection(transaction)); - } - #if NET7_0_OR_GREATER internal static IDbDistributedLock CreateInternalLock(PostgresAdvisoryLockKey key, DbDataSource dbDataSource, Action? options) { diff --git a/src/DistributedLock.Postgres/PostgresDistributedReaderWriterLock.cs b/src/DistributedLock.Postgres/PostgresDistributedReaderWriterLock.cs index 73777841..8a713cc1 100644 --- a/src/DistributedLock.Postgres/PostgresDistributedReaderWriterLock.cs +++ b/src/DistributedLock.Postgres/PostgresDistributedReaderWriterLock.cs @@ -32,17 +32,6 @@ public PostgresDistributedReaderWriterLock(PostgresAdvisoryLockKey key, IDbConne { } - /// - /// Constructs a lock with the given (effectively the lock name) and . - /// - /// The provided will be used to connect to the database and will provide lock scope. It is assumed to be externally managed and - /// will not be committed or rolled back. - /// - public PostgresDistributedReaderWriterLock(PostgresAdvisoryLockKey key, IDbTransaction transaction) - : this(key, PostgresDistributedLock.CreateInternalLock(key, transaction)) - { - } - #if NET7_0_OR_GREATER /// /// Constructs a lock with the given (effectively the lock name) and , diff --git a/src/DistributedLock.Postgres/PostgresDistributedSynchronizationProvider.cs b/src/DistributedLock.Postgres/PostgresDistributedSynchronizationProvider.cs index fa151084..bcb9f600 100644 --- a/src/DistributedLock.Postgres/PostgresDistributedSynchronizationProvider.cs +++ b/src/DistributedLock.Postgres/PostgresDistributedSynchronizationProvider.cs @@ -36,20 +36,6 @@ public PostgresDistributedSynchronizationProvider(IDbConnection connection) this._readerWriterLockFactory = key => new PostgresDistributedReaderWriterLock(key, connection); } - /// - /// Constructs a provider which connects to Postgres using the provided . - /// - /// The provided will be used to connect to the database and will provide lock scope. It is assumed to be externally managed and - /// will not be committed or rolled back. - /// - public PostgresDistributedSynchronizationProvider(IDbTransaction transaction) - { - if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } - - this._lockFactory = key => new PostgresDistributedLock(key, transaction); - this._readerWriterLockFactory = key => new PostgresDistributedReaderWriterLock(key, transaction); - } - #if NET7_0_OR_GREATER /// /// Constructs a provider which connects to Postgres using the provided and . diff --git a/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt b/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt index 79508f8f..00b28778 100644 --- a/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt +++ b/src/DistributedLock.Postgres/PublicAPI.Unshipped.txt @@ -1,3 +1,5 @@ -Medallion.Threading.Postgres.PostgresDistributedLock.PostgresDistributedLock(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction) -> void -Medallion.Threading.Postgres.PostgresDistributedReaderWriterLock.PostgresDistributedReaderWriterLock(Medallion.Threading.Postgres.PostgresAdvisoryLockKey key, System.Data.IDbTransaction! transaction) -> void -Medallion.Threading.Postgres.PostgresDistributedSynchronizationProvider.PostgresDistributedSynchronizationProvider(System.Data.IDbTransaction! transaction) -> void \ No newline at end of file +#nullable enable +static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> bool +static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void +static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask +static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask \ No newline at end of file