Skip to content

Commit

Permalink
Let retry policy to decide what to do with potentially executed non i…
Browse files Browse the repository at this point in the history
…dempotent queries
  • Loading branch information
sylwiaszunejko committed Dec 18, 2024
1 parent a7433d8 commit 83d3210
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 1 deletion.
12 changes: 12 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1936,3 +1936,15 @@ func (e *QueryError) Error() string {
func (e *QueryError) Unwrap() error {
return e.err
}

type PotentiallyExecutedNotIdempotentError struct {
err error
}

func (e *PotentiallyExecutedNotIdempotentError) Error() string {
return fmt.Sprintf("Potentially executed not idempotent query: %s", e.err.Error())
}

func (e *PotentiallyExecutedNotIdempotentError) Unwrap() error {
return e.err
}
4 changes: 4 additions & 0 deletions conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,10 @@ func (t *testRetryPolicy) Attempt(qry RetryableQuery) bool {
return qry.Attempts() <= t.NumRetries
}
func (t *testRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return Retry
}

Expand Down
21 changes: 21 additions & 0 deletions policies.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ func (s *SimpleRetryPolicy) AttemptLWT(q RetryableQuery) bool {
}

func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return RetryNextHost
}

Expand All @@ -168,6 +172,10 @@ func (s *SimpleRetryPolicy) GetRetryType(err error) RetryType {
// even timeouts if other clients send statements touching the same
// partition to the original node at the same time.
func (s *SimpleRetryPolicy) GetRetryTypeLWT(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return Retry
}

Expand Down Expand Up @@ -208,6 +216,10 @@ func getExponentialTime(min time.Duration, max time.Duration, attempts int) time
}

func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return RetryNextHost
}

Expand All @@ -216,6 +228,10 @@ func (e *ExponentialBackoffRetryPolicy) GetRetryType(err error) RetryType {
// even timeouts if other clients send statements touching the same
// partition to the original node at the same time.
func (e *ExponentialBackoffRetryPolicy) GetRetryTypeLWT(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}
return Retry
}

Expand Down Expand Up @@ -250,6 +266,11 @@ func (d *DowngradingConsistencyRetryPolicy) Attempt(q RetryableQuery) bool {
}

func (d *DowngradingConsistencyRetryPolicy) GetRetryType(err error) RetryType {
var executedErr *PotentiallyExecutedNotIdempotentError
if errors.As(err, &executedErr) {
return Rethrow
}

switch t := err.(type) {
case *RequestErrUnavailable:
if t.Alive > 0 {
Expand Down
2 changes: 1 addition & 1 deletion query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
lastErr = iter.err

if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted && !qry.IsIdempotent() {
return iter
lastErr = &PotentiallyExecutedNotIdempotentError{err: customErr.err}
}
}
}
Expand Down

0 comments on commit 83d3210

Please sign in to comment.