From 3f4d8263ee9e5a92d376319d5b89a4df00086ab8 Mon Sep 17 00:00:00 2001 From: sylwiaszunejko Date: Wed, 18 Dec 2024 18:08:40 +0100 Subject: [PATCH] Let retry policy to decide what to do with potentially executed non idempotent queries --- conn_test.go | 4 ++++ policies.go | 24 ++++++++++++++++++++++++ query_executor.go | 17 +++++++++++++---- 3 files changed, 41 insertions(+), 4 deletions(-) diff --git a/conn_test.go b/conn_test.go index 0d4a46885..8bfc6a92a 100644 --- a/conn_test.go +++ b/conn_test.go @@ -456,6 +456,10 @@ func (t *testRetryPolicy) Attempt(qry RetryableQuery) bool { return qry.Attempts() <= t.NumRetries } func (t *testRetryPolicy) GetRetryType(err error) RetryType { + var executedErr *QueryError + if errors.As(err, &executedErr) && executedErr.potentiallyExecuted { + return Rethrow + } return Retry } diff --git a/policies.go b/policies.go index ca89aecba..3dfeb7229 100644 --- a/policies.go +++ b/policies.go @@ -160,6 +160,10 @@ func (s *SimpleRetryPolicy) AttemptLWT(q RetryableQuery) bool { } func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType { + var executedErr *QueryError + if errors.As(err, &executedErr) && executedErr.potentiallyExecuted && !executedErr.isIdempotent { + return Rethrow + } return RetryNextHost } @@ -168,6 +172,10 @@ func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType { // even timeouts if other clients send statements touching the same // partition to the original node at the same time. func (s *SimpleRetryPolicy) GetRetryTypeLWT(err error) RetryType { + var executedErr *QueryError + if errors.As(err, &executedErr) && executedErr.potentiallyExecuted && !executedErr.isIdempotent { + return Rethrow + } return Retry } @@ -208,6 +216,10 @@ func getExponentialTime(min time.Duration, max time.Duration, attempts int) time } func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType { + var executedErr *QueryError + if errors.As(err, &executedErr) && executedErr.potentiallyExecuted && !executedErr.isIdempotent { + return Rethrow + } return RetryNextHost } @@ -216,6 +228,10 @@ func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType { // even timeouts if other clients send statements touching the same // partition to the original node at the same time. func (e *ExponentialBackoffRetryPolicy) GetRetryTypeLWT(err error) RetryType { + var executedErr *QueryError + if errors.As(err, &executedErr) && executedErr.potentiallyExecuted && !executedErr.isIdempotent { + return Rethrow + } return Retry } @@ -250,6 +266,14 @@ func (d *DowngradingConsistencyRetryPolicy) Attempt(q RetryableQuery) bool { } func (d *DowngradingConsistencyRetryPolicy) GetRetryType(err error) RetryType { + var executedErr *QueryError + if errors.As(err, &executedErr) { + err = executedErr.err + if executedErr.potentiallyExecuted && !executedErr.isIdempotent { + return Rethrow + } + } + switch t := err.(type) { case *RequestErrUnavailable: if t.Alive > 0 { diff --git a/query_executor.go b/query_executor.go index 64b984168..4d8005117 100644 --- a/query_executor.go +++ b/query_executor.go @@ -2,6 +2,7 @@ package gocql import ( "context" + "errors" "sync" "time" ) @@ -116,6 +117,7 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne var lastErr error var iter *Iter var conn *Conn + var potentiallyExecuted bool for selectedHost != nil { host := selectedHost.Info() if host == nil || !host.IsUp() { @@ -144,11 +146,16 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne iter = q.attemptQuery(ctx, qry, conn) iter.host = selectedHost.Info() // Update host - switch iter.err { - case context.Canceled, context.DeadlineExceeded, ErrNotFound: + switch { + case errors.Is(iter.err, context.Canceled), + errors.Is(iter.err, context.DeadlineExceeded), + errors.Is(iter.err, ErrNotFound): // those errors represents logical errors, they should not count // toward removing a node from the pool selectedHost.Mark(nil) + if potentiallyExecuted && !qry.IsIdempotent() { + iter.err = &QueryError{err: iter.err, potentiallyExecuted: true, isIdempotent: false} + } return iter default: selectedHost.Mark(iter.err) @@ -162,8 +169,10 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne lastErr = iter.err - if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted && !qry.IsIdempotent() { - return iter + if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted { + customErr.isIdempotent = qry.IsIdempotent() + lastErr = customErr + potentiallyExecuted = true } } }