-
Notifications
You must be signed in to change notification settings - Fork 200
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#213 Postgres: Add support for transaction-scoped advisory locks with external transactions #222
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -46,11 +46,12 @@ private PostgresAdvisoryLock(bool isShared) | |
|
||
// Our acquire command will use SET LOCAL to set up statement timeouts. This lasts until the end | ||
// of the current transaction instead of just the current batch if we're in a transaction. To make sure | ||
// we don't leak those settings, in the case of a transaction we first set up a save point which we can | ||
// we don't leak those settings, in the case of a transaction, we first set up a save point which we can | ||
// later roll back (taking the settings changes with it but NOT the lock). Because we can't confidently | ||
// roll back a save point without knowing that it has been set up, we start the save point in its own | ||
// query before we try-catch | ||
var needsSavePoint = await HasTransactionAsync(connection).ConfigureAwait(false); | ||
// query before we try-catch. | ||
var needsSavePoint = await ShouldDefineSavePoint(connection).ConfigureAwait(false); | ||
|
||
if (needsSavePoint) | ||
{ | ||
using var setSavePointCommand = connection.CreateCommand(); | ||
|
@@ -124,9 +125,7 @@ private PostgresAdvisoryLock(bool isShared) | |
async ValueTask RollBackTransactionTimeoutVariablesIfNeededAsync(bool acquired) | ||
{ | ||
if (needsSavePoint | ||
// For transaction scoped locks, we can't roll back the save point on success because that will roll | ||
// back our hold on the lock. It's ok to "leak" the savepoint in that case because it's an internally-owned | ||
// transaction/connection and the savepoint will be cleaned up with the disposal of the transaction. | ||
// For transaction scoped locks, we can't roll back the save point on success because that will roll back our hold on the lock. | ||
&& !(acquired && UseTransactionScopedLock(connection))) | ||
{ | ||
// attempt to clear the timeout variables we set | ||
|
@@ -182,13 +181,17 @@ private DatabaseCommand CreateAcquireCommand(DatabaseConnection connection, Post | |
return command; | ||
} | ||
|
||
private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection connection) | ||
private static async ValueTask<bool> ShouldDefineSavePoint(DatabaseConnection connection) | ||
{ | ||
if (connection.HasTransaction) { return true; } | ||
if (!connection.IsExernallyOwned) { return false; } | ||
// If the connection is internally-owned, we only define a save point if a transaction has been opened. | ||
if (!connection.IsExernallyOwned) { return connection.HasTransaction; } | ||
|
||
// If the connection is externally-owned with an established transaction, we don't want to pollute it with a save point | ||
// which we won't be able to release in case the lock will be acquired. | ||
if (connection.HasTransaction) { return false; } | ||
|
||
// If the connection is externally owned, then it might be part of a transaction that we can't | ||
// see. In that case, the only real way to detect it is to begin a new one | ||
// The externally-owned connection might still be part of a transaction that we can't see. | ||
// In that case, the only real way to detect it is to begin a new one. | ||
try | ||
{ | ||
await connection.BeginTransactionAsync().ConfigureAwait(false); | ||
|
@@ -199,6 +202,7 @@ private static async ValueTask<bool> HasTransactionAsync(DatabaseConnection conn | |
} | ||
|
||
await connection.DisposeTransactionAsync().ConfigureAwait(false); | ||
|
||
return false; | ||
} | ||
|
||
|
@@ -207,7 +211,13 @@ public ValueTask ReleaseAsync(DatabaseConnection connection, string resourceName | |
|
||
private async ValueTask ReleaseAsync(DatabaseConnection connection, PostgresAdvisoryLockKey key, bool isTry) | ||
{ | ||
Invariant.Require(!UseTransactionScopedLock(connection)); | ||
// For transaction scoped advisory locks, the lock can only be released by ending the transaction. | ||
// If the transaction is internally-owned, then the lock will be released when the transaction is disposed as part of the internal connection management. | ||
// If the transaction is externally-owned, then the lock will have to be released explicitly by the transaction initiator. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not comfortable with these semantics; it's just too different from how the other locks work to say that releasing the handle does not release the lock. This feels like the kind of thing that will be hard to discover, since correct-looking code will just be wrong and I don't like having to add The static utility method feels like a better model for what we're trying to do here, which is to apply a one-way change to a transaction without any notion of a returned disposable scope. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I can see why a static method in the API may be a better option in this case. I currently have some worries regarding how exactly it will be implemented, but I'll try. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @madelson, I've finally managed to look into the static utility methods. I still didn't add summay comments for the static methods, and I need to revert the change in the ReleaseAsync method in the PostgresAdvisoryLock class, but please take a look at the recent changes and tell me if I am on the right track. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hi @madelson, do you think you will have time to look into my changes soon? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Tzachi009 apologies for the long delay. The new static methods look like they're on the right track. I left a few comments. |
||
if (UseTransactionScopedLock(connection)) | ||
{ | ||
return; | ||
} | ||
|
||
using var command = connection.CreateCommand(); | ||
command.SetCommandText($"SELECT pg_catalog.pg_advisory_unlock{(this._isShared ? "_shared" : string.Empty)}({AddKeyParametersAndGetKeyArguments(command, key)})"); | ||
|
@@ -235,10 +245,9 @@ private static string AddKeyParametersAndGetKeyArguments(DatabaseCommand command | |
} | ||
|
||
private static bool UseTransactionScopedLock(DatabaseConnection connection) => | ||
// This implementation (similar to what we do for SQL Server) is based on the fact that we only create transactions on | ||
// internally-owned connections when doing transaction-scoped locking, and we only support transaction-scoped locking on | ||
// internally-owned connections (since there's no explicit release). | ||
!connection.IsExernallyOwned && connection.HasTransaction; | ||
// Transaction-scoped locking is supported on both externally-owned and internally-owned connections, | ||
// as long as the connection has a transaction. | ||
connection.HasTransaction; | ||
|
||
private static string AddPGLocksFilterParametersAndGetFilterExpression(DatabaseCommand command, PostgresAdvisoryLockKey key) | ||
{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
using Medallion.Threading.Internal; | ||
using System.Data; | ||
|
||
namespace Medallion.Threading.Postgres; | ||
|
||
public partial class PostgresDistributedLock | ||
{ | ||
public static bool TryAcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) | ||
{ | ||
if (key == null) { throw new ArgumentNullException(nameof(key)); } | ||
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } | ||
|
||
var connection = new PostgresDatabaseConnection(transaction); | ||
|
||
var handle = DistributedLockHelpers.TryAcquire(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken); | ||
|
||
return handle != null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can use the
|
||
} | ||
|
||
public static void AcquireWithTransaction(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) | ||
{ | ||
if (key == null) { throw new ArgumentNullException(nameof(key)); } | ||
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } | ||
|
||
var connection = new PostgresDatabaseConnection(transaction); | ||
|
||
DistributedLockHelpers.Acquire(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken); | ||
} | ||
|
||
public static ValueTask<bool> TryAcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan timeout = default, CancellationToken cancellationToken = default) | ||
{ | ||
if (key == null) { throw new ArgumentNullException(nameof(key)); } | ||
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } | ||
|
||
var connection = new PostgresDatabaseConnection(transaction); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I feel like we can dispose this object after the acquire operation (just move the creation inside the helper function and add an async using block). |
||
|
||
return TryAcquireAsync(); | ||
|
||
async ValueTask<bool> TryAcquireAsync() | ||
{ | ||
var handle = await DistributedLockHelpers.TryAcquireAsync(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. All awaits need ConfigureAwait(false) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we want to call |
||
|
||
return handle != null; | ||
} | ||
} | ||
|
||
public static ValueTask AcquireWithTransactionAsync(PostgresAdvisoryLockKey key, IDbTransaction transaction, TimeSpan? timeout = null, CancellationToken cancellationToken = default) | ||
{ | ||
if (key == null) { throw new ArgumentNullException(nameof(key)); } | ||
if (transaction == null) { throw new ArgumentNullException(nameof(transaction)); } | ||
|
||
var connection = new PostgresDatabaseConnection(transaction); | ||
|
||
return DistributedLockHelpers.AcquireAsync(PostgresAdvisoryLock.ExclusiveLock, connection, key.ToString(), timeout, cancellationToken).ConvertToVoid(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd expect the implementation of this to be something like:
|
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
#nullable enable | ||
static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> bool | ||
static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransaction(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> void | ||
static Medallion.Threading.Postgres.PostgresDistributedLock.TryAcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan timeout = default(System.TimeSpan), System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<bool> | ||
static Medallion.Threading.Postgres.PostgresDistributedLock.AcquireWithTransactionAsync(Medallion.Threading.Postgres.PostgresAdvisoryLockKey! key, System.Data.IDbTransaction! transaction, System.TimeSpan? timeout = null, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's remove these helper methods. They're just calling
strategy.TryAcquire(Async)
which can be called directly byPostgresDistributedLock