diff --git a/events/dag_sync.go b/events/dag_sync.go index b8f0137ba2..d6150c9118 100644 --- a/events/dag_sync.go +++ b/events/dag_sync.go @@ -31,6 +31,4 @@ type DAGMerge struct { // Wg is a wait group that can be used to synchronize the merge, // allowing the caller to optionnaly block until the merge is complete. Wg *sync.WaitGroup - // RetryCount is the number of times this merge has been retried due to a conflict. - RetryCount int } diff --git a/internal/db/merge.go b/internal/db/merge.go index 63a728e38c..ef10ea62c4 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -49,19 +49,26 @@ func (db *db) handleMerges(ctx context.Context, merges events.Subscription[event queue.add(merge.DocID) defer queue.done(merge.DocID) + var err error // retry merge up to max txn retries for i := 0; i < db.MaxTxnRetries(); i++ { - err := db.executeMerge(ctx, merge) + err = db.executeMerge(ctx, merge) if errors.Is(err, badger.ErrTxnConflict) { continue // retry merge } - if err != nil { - log.ErrorContextE(ctx, "Failed to execute merge", err, corelog.Any("Merge", merge)) - } - if merge.Wg != nil { - merge.Wg.Done() - } - break // merge completed + break // merge success or error + } + + if err != nil { + log.ErrorContextE( + ctx, + "Failed to execute merge", + err, + corelog.Any("Error", err), + corelog.Any("Event", merge)) + } + if merge.Wg != nil { + merge.Wg.Done() } }() }