Skip to content

Commit

Permalink
Let retry policy to decide what to do with potentially executed non i…
Browse files Browse the repository at this point in the history
…dempotent queries
  • Loading branch information
sylwiaszunejko committed Dec 23, 2024
1 parent ad5eff6 commit 3f4d826
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
4 changes: 4 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,10 @@ func (t *testRetryPolicy) Attempt(qry RetryableQuery) bool {
return qry.Attempts() <= t.NumRetries
}
func (t *testRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *QueryError
if errors.As(err, &executedErr) && executedErr.potentiallyExecuted {
return Rethrow
}
return Retry
}

Expand Down
24 changes: 24 additions & 0 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func (s *SimpleRetryPolicy) AttemptLWT(q RetryableQuery) bool {
}

func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *QueryError
if errors.As(err, &executedErr) && executedErr.potentiallyExecuted && !executedErr.isIdempotent {
return Rethrow
}
return RetryNextHost
}

Expand All @@ -168,6 +172,10 @@ func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
// even timeouts if other clients send statements touching the same
// partition to the original node at the same time.
func (s *SimpleRetryPolicy) GetRetryTypeLWT(err error) RetryType {
var executedErr *QueryError
if errors.As(err, &executedErr) && executedErr.potentiallyExecuted && !executedErr.isIdempotent {
return Rethrow
}
return Retry
}

Expand Down Expand Up @@ -208,6 +216,10 @@ func getExponentialTime(min time.Duration, max time.Duration, attempts int) time
}

func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *QueryError
if errors.As(err, &executedErr) && executedErr.potentiallyExecuted && !executedErr.isIdempotent {
return Rethrow
}
return RetryNextHost
}

Expand All @@ -216,6 +228,10 @@ func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
// even timeouts if other clients send statements touching the same
// partition to the original node at the same time.
func (e *ExponentialBackoffRetryPolicy) GetRetryTypeLWT(err error) RetryType {
var executedErr *QueryError
if errors.As(err, &executedErr) && executedErr.potentiallyExecuted && !executedErr.isIdempotent {
return Rethrow
}
return Retry
}

Expand Down Expand Up @@ -250,6 +266,14 @@ func (d *DowngradingConsistencyRetryPolicy) Attempt(q RetryableQuery) bool {
}

func (d *DowngradingConsistencyRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *QueryError
if errors.As(err, &executedErr) {
err = executedErr.err
if executedErr.potentiallyExecuted && !executedErr.isIdempotent {
return Rethrow
}
}

switch t := err.(type) {
case *RequestErrUnavailable:
if t.Alive > 0 {
Expand Down
17 changes: 13 additions & 4 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gocql

import (
"context"
"errors"
"sync"
"time"
)
Expand Down Expand Up @@ -116,6 +117,7 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
var lastErr error
var iter *Iter
var conn *Conn
var potentiallyExecuted bool
for selectedHost != nil {
host := selectedHost.Info()
if host == nil || !host.IsUp() {
Expand Down Expand Up @@ -144,11 +146,16 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
iter = q.attemptQuery(ctx, qry, conn)
iter.host = selectedHost.Info()
// Update host
switch iter.err {
case context.Canceled, context.DeadlineExceeded, ErrNotFound:
switch {
case errors.Is(iter.err, context.Canceled),
errors.Is(iter.err, context.DeadlineExceeded),
errors.Is(iter.err, ErrNotFound):
// those errors represents logical errors, they should not count
// toward removing a node from the pool
selectedHost.Mark(nil)
if potentiallyExecuted && !qry.IsIdempotent() {
iter.err = &QueryError{err: iter.err, potentiallyExecuted: true, isIdempotent: false}
}
return iter
default:
selectedHost.Mark(iter.err)
Expand All @@ -162,8 +169,10 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne

lastErr = iter.err

if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted && !qry.IsIdempotent() {
return iter
if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted {
customErr.isIdempotent = qry.IsIdempotent()
lastErr = customErr
potentiallyExecuted = true
}
}
}
Expand Down

0 comments on commit 3f4d826

Please sign in to comment.