Skip to content

Commit

Permalink
Always consider retry policy when executing query
Browse files Browse the repository at this point in the history
  • Loading branch information
sylwiaszunejko committed Dec 18, 2024
1 parent 11bc473 commit a7433d8
Showing 1 changed file with 52 additions and 42 deletions.
94 changes: 52 additions & 42 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,45 +115,61 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne

var lastErr error
var iter *Iter
var conn *Conn
for selectedHost != nil {
host := selectedHost.Info()
if host == nil || !host.IsUp() {
selectedHost = hostIter()
continue
}

pool, ok := q.pool.getPool(host)
if !ok {
selectedHost = hostIter()
continue
}

conn := pool.Pick(selectedHost.Token(), qry)
if conn == nil {
selectedHost = hostIter()
continue
}

iter = q.attemptQuery(ctx, qry, conn)
iter.host = selectedHost.Info()
// Update host
switch iter.err {
case context.Canceled, context.DeadlineExceeded, ErrNotFound:
// those errors represents logical errors, they should not count
// toward removing a node from the pool
selectedHost.Mark(nil)
return iter
default:
selectedHost.Mark(iter.err)
}

// Exit if the query was successful
// or no retry policy defined
if iter.err == nil || rt == nil {
return iter
if rt == nil {
selectedHost = hostIter()
lastErr = ErrHostDown
continue
}
} else {
pool, ok := q.pool.getPool(host)
if !ok {
if rt == nil {
selectedHost = hostIter()
lastErr = ErrNoPool
continue
}
} else {
conn = pool.Pick(selectedHost.Token(), qry)
if conn == nil {
if rt == nil {
selectedHost = hostIter()
lastErr = ErrNoConnectionsInPool
continue
}
} else {
iter = q.attemptQuery(ctx, qry, conn)
iter.host = selectedHost.Info()
// Update host
switch iter.err {
case context.Canceled, context.DeadlineExceeded, ErrNotFound:
// those errors represents logical errors, they should not count
// toward removing a node from the pool
selectedHost.Mark(nil)
return iter
default:
selectedHost.Mark(iter.err)
}

// Exit if the query was successful
// or no retry policy defined
if iter.err == nil || rt == nil {
return iter
}

lastErr = iter.err

if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted && !qry.IsIdempotent() {
return iter
}
}
}
}

// or retry policy decides to not retry anymore
// Exit if retry policy decides to not retry anymore
if use_lwt_rt {
if !lwt_rt.AttemptLWT(qry) {
return iter
Expand All @@ -164,17 +180,11 @@ func (q *queryExecutor) do(ctx context.Context, qry ExecutableQuery, hostIter Ne
}
}

lastErr = iter.err

if customErr, ok := iter.err.(*QueryError); ok && customErr.potentiallyExecuted && !qry.IsIdempotent() {
return iter
}

var retry_type RetryType
if use_lwt_rt {
retry_type = lwt_rt.GetRetryTypeLWT(iter.err)
retry_type = lwt_rt.GetRetryTypeLWT(lastErr)
} else {
retry_type = rt.GetRetryType(iter.err)
retry_type = rt.GetRetryType(lastErr)
}

// If query is unsuccessful, check the error with RetryPolicy to retry
Expand Down

0 comments on commit a7433d8

Please sign in to comment.