Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#213 Postgres: Add support for transaction-scoped advisory locks with external transactions #222

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion src/DistributedLock.Core/Internal/DistributedLockHelpers.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Security.Cryptography;
using Medallion.Threading.Internal.Data;
using System.Security.Cryptography;
using System.Text;

namespace Medallion.Threading.Internal;
Expand Down Expand Up @@ -133,6 +134,30 @@ public static bool TryUpgradeToWriteLock(IDistributedLockUpgradeableHandle handl
SyncViaAsync.Run(t => t.handle.TryUpgradeToWriteLockAsync(t.timeout, t.cancellationToken), (handle, timeout, cancellationToken));
#endregion

#region ---- IDbSynchronizationStrategy implementations ----
public static ValueTask<THandle?> TryAcquireAsync<THandle>(IDbSynchronizationStrategy<THandle> strategy, DatabaseConnection connection, string resourceName, TimeSpan timeout, CancellationToken cancellationToken)
where THandle : class =>
strategy.TryAcquireAsync(connection, resourceName, timeout, cancellationToken);

public static ValueTask<THandle> AcquireAsync<THandle>(IDbSynchronizationStrategy<THandle> strategy, DatabaseConnection connection, string resourceName, TimeSpan? timeout, CancellationToken cancellationToken)
where THandle : class =>
strategy.TryAcquireAsync(connection, resourceName, timeout, cancellationToken).ThrowTimeoutIfNull();

public static THandle Acquire<THandle>(IDbSynchronizationStrategy<THandle> strategy, DatabaseConnection connection, string resourceName, TimeSpan? timeout, CancellationToken cancellationToken)
where THandle : class =>
SyncViaAsync.Run(
state => AcquireAsync(state.strategy, state.connection, state.resourceName, state.timeout, state.cancellationToken),
(strategy, connection, resourceName, timeout, cancellationToken)
);

public static THandle? TryAcquire<THandle>(IDbSynchronizationStrategy<THandle> strategy, DatabaseConnection connection, string resourceName, TimeSpan timeout, CancellationToken cancellationToken)
where THandle : class =>
SyncViaAsync.Run(
state => TryAcquireAsync(strategy, connection, resourceName, timeout, cancellationToken),
(strategy, connection, resourceName, timeout, cancellationToken)
);
#endregion
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's remove these helper methods. They're just calling strategy.TryAcquire(Async) which can be called directly by PostgresDistributedLock


private static Exception LockTimeout(string? @object = null) => new TimeoutException($"Timeout exceeded when trying to acquire the {@object ?? "lock"}");

public static async ValueTask<T> ThrowTimeoutIfNull<T>(this ValueTask<T?> task, string? @object = null) where T : class =>
Expand Down
2 changes: 1 addition & 1 deletion src/DistributedLock.Core/Internal/Helpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public readonly struct TaskConversion

public readonly struct TaskConversion<TTo> { }

internal static async ValueTask ConvertToVoid<TResult>(this ValueTask<TResult> task) => await task.ConfigureAwait(false);
public static async ValueTask ConvertToVoid<TResult>(this ValueTask<TResult> task) => await task.ConfigureAwait(false);

public static ValueTask<T> AsValueTask<T>(this Task<T> task) => new(task);
public static ValueTask AsValueTask(this Task task) => new(task);
Expand Down
41 changes: 25 additions & 16 deletions src/DistributedLock.Postgres/PostgresAdvisoryLock.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,12 @@ private PostgresAdvisoryLock(bool isShared)

// Our acquire command will use SET LOCAL to set up statement timeouts. This lasts until the end
// of the current transaction instead of just the current batch if we're in a transaction. To make sure
// we don't leak those settings, in the case of a transaction we first set up a save point which we can
// we don't leak those settings, in the case of a transaction, we first set up a save point which we can
// later roll back (taking the settings changes with it but NOT the lock). Because we can't confidently
// roll back a save point without knowing that it has been set up, we start the save point in its own
// query before we try-catch
var needsSavePoint = await HasTransactionAsync(connection).ConfigureAwait(false);
// query before we try-catch.
var needsSavePoint = await ShouldDefineSavePoint(connection).ConfigureAwait(false);

if (needsSavePoint)
{
using var setSavePointCommand = connection.CreateCommand();
Expand Down Expand Up @@ -124,9 +125,7 @@ private PostgresAdvisoryLock(bool isShared)
async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync(bool acquired)
{
if (needsSavePoint
// For transaction scoped locks, we can't roll back the save point on success because that will roll
// back our hold on the lock. It's ok to "leak" the savepoint in that case because it's an internally-owned
// transaction/connection and the savepoint will be cleaned up with the disposal of the transaction.
// For transaction scoped locks, we can't roll back the save point on success because that will roll back our hold on the lock.
&& !(acquired && UseTransactionScopedLock(connection)))
{
// attempt to clear the timeout variables we set
Expand Down Expand Up @@ -182,13 +181,17 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post
return command;
}

private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection connection)
private static async ValueTask<bool> ShouldDefineSavePoint(DatabaseConnection connection)
{
if (connection.HasTransaction) { return true; }
if (!connection.IsExernallyOwned) { return false; }
// If the connection is internally-owned, we only define a save point if a transaction has been opened.
if (!connection.IsExernallyOwned) { return connection.HasTransaction; }

// If the connection is externally-owned with an established transaction, we don't want to pollute it with a save point
// which we won't be able to release in case the lock will be acquired.
if (connection.HasTransaction) { return false; }

// If the connection is externally owned, then it might be part of a transaction that we can't
// see. In that case, the only real way to detect it is to begin a new one
// The externally-owned connection might still be part of a transaction that we can't see.
// In that case, the only real way to detect it is to begin a new one.
try
{
await connection.BeginTransactionAsync().ConfigureAwait(false);
Expand All @@ -199,6 +202,7 @@ private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection conn
}

await connection.DisposeTransactionAsync().ConfigureAwait(false);

return false;
}

Expand All @@ -207,7 +211,13 @@ public ValueTask ReleaseAsync(DatabaseConnection connection, string resourceName

private async ValueTask ReleaseAsync(DatabaseConnection connection, PostgresAdvisoryLockKey key, bool isTry)
{
Invariant.Require(!UseTransactionScopedLock(connection));
// For transaction scoped advisory locks, the lock can only be released by ending the transaction.
// If the transaction is internally-owned, then the lock will be released when the transaction is disposed as part of the internal connection management.
// If the transaction is externally-owned, then the lock will have to be released explicitly by the transaction initiator.
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not comfortable with these semantics; it's just too different from how the other locks work to say that releasing the handle does not release the lock. This feels like the kind of thing that will be hard to discover, since correct-looking code will just be wrong and I don't like having to add except with pg externally-owned-transaction-scoped-locks! to all the generic code examples.

The static utility method feels like a better model for what we're trying to do here, which is to apply a one-way change to a transaction without any notion of a returned disposable scope.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I can see why a static method in the API may be a better option in this case. I currently have some worries regarding how exactly it will be implemented, but I'll try.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @madelson, I've finally managed to look into the static utility methods. I still didn't add summay comments for the static methods, and I need to revert the change in the ReleaseAsync method in the PostgresAdvisoryLock class, but please take a look at the recent changes and tell me if I am on the right track.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @madelson, do you think you will have time to look into my changes soon?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Tzachi009 apologies for the long delay. The new static methods look like they're on the right track. I left a few comments.

if (UseTransactionScopedLock(connection))
{
return;
}

using var command = connection.CreateCommand();
command.SetCommandText($"SELECT pg_catalog.pg_advisory_unlock{(this._isShared ? "_shared" : string.Empty)}({AddKeyParametersAndGetKeyArguments(command, key)})");
Expand Down Expand Up @@ -235,10 +245,9 @@ private static string AddKeyParametersAndGetKeyArguments(DatabaseCommand command
}

private static bool UseTransactionScopedLock(DatabaseConnection connection) =>
// This implementation (similar to what we do for SQL Server) is based on the fact that we only create transactions on
// internally-owned connections when doing transaction-scoped locking, and we only support transaction-scoped locking on
// internally-owned connections (since there's no explicit release).
!connection.IsExernallyOwned && connection.HasTransaction;
// Transaction-scoped locking is supported on both externally-owned and internally-owned connections,
// as long as the connection has a transaction.
connection.HasTransaction;

private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseCommand command, PostgresAdvisoryLockKey key)
{
Expand Down
56 changes: 56 additions & 0 deletions src/DistributedLock.Postgres/PostgresDistributedLock.Extensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
using Medallion.Threading.Internal;
using System.Data;

namespace Medallion.Threading.Postgres;

public partial class PostgresDistributedLock
{
public static bool TryAcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default)
{
if (key == null) { throw new ArgumentNullException(nameof(key)); }
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); }

var connection = new PostgresDatabaseConnection(transaction);

var handle = DistributedLockHelpers.TryAcquire(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken);

return handle != null;
Copy link
Owner

@madelson madelson Dec 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use the SyncViaAsync class to unify the sync/async implementations here. E.g. I think this method should just be:

return SyncViaAsync.Run(
    state => TryAcquireWithTransactionAsync(state.key, state.transaction, state.timeout, state.cancellationToken),
    (key, transaction, timeout, cancellationToken);

}

public static void AcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default)
{
if (key == null) { throw new ArgumentNullException(nameof(key)); }
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); }

var connection = new PostgresDatabaseConnection(transaction);

DistributedLockHelpers.Acquire(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken);
}

public static ValueTask<bool> TryAcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default)
{
if (key == null) { throw new ArgumentNullException(nameof(key)); }
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); }

var connection = new PostgresDatabaseConnection(transaction);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we can dispose this object after the acquire operation (just move the creation inside the helper function and add an async using block).


return TryAcquireAsync();

async ValueTask<bool> TryAcquireAsync()
{
var handle = await DistributedLockHelpers.TryAcquireAsync(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All awaits need ConfigureAwait(false)

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want to call await handle.DisposeAsync().ConfigureAwait(false); here with a comment saying that for an externally-owned transaction the release is a noop but we want to dispose proactively to prevent the handle's managed finalizer (see ManagedFinalizerQueue) from running.


return handle != null;
}
}

public static ValueTask AcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default)
{
if (key == null) { throw new ArgumentNullException(nameof(key)); }
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); }

var connection = new PostgresDatabaseConnection(transaction);

return DistributedLockHelpers.AcquireAsync(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken).ConvertToVoid();
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd expect the implementation of this to be something like:

return TryAcquireWithTransactionAsync(...).ThrowTimeoutIfFalse(); // currently private in DistributedLockHelpers, can be made public

}
}
5 changes: 5 additions & 0 deletions src/DistributedLock.Postgres/PublicAPI.Unshipped.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#nullable enable
static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> bool
static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void
static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<bool>
static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask