
conflict detector(cdc): fix potential deadlock when a transaction generates multiple identical hashes #10335

Merged · 7 commits · Dec 27, 2023
43 changes: 38 additions & 5 deletions cdc/sink/dmlsink/txn/event.go
@@ -41,11 +41,44 @@ func (e *txnEvent) OnConflictResolved() {
e.conflictResolved = time.Now()
}

// ConflictKeys implements causality.txnEvent interface.
func (e *txnEvent) ConflictKeys(numSlots uint64) []uint64 {
keys := genTxnKeys(e.TxnCallbackableEvent.Event)
sort.Slice(keys, func(i, j int) bool { return keys[i]%numSlots < keys[j]%numSlots })
return keys
// GenSortedDedupKeysHash implements causality.txnEvent interface.
func (e *txnEvent) GenSortedDedupKeysHash(numSlots uint64) []uint64 {
hashes := genTxnKeys(e.TxnCallbackableEvent.Event)

// Sort hashes by `hash % numSlots` to avoid deadlock, and then dedup
// hashes, so the same txn will not check conflict with the same hash twice,
// which would otherwise create a cyclic self dependency in the causality
// dependency graph.
return sortAndDedupHashes(hashes, numSlots)
}

func sortAndDedupHashes(hashes []uint64, numSlots uint64) []uint64 {
if len(hashes) == 0 {
return nil
}

// Sort hashes by `hash % numSlots` to avoid deadlock.
sort.Slice(hashes, func(i, j int) bool { return hashes[i]%numSlots < hashes[j]%numSlots })

// Dedup hashes
last := hashes[0]
j := 1
for i, hash := range hashes {
if i == 0 {
// Skip the first element; start checking for duplicates from the second one.
continue
}
if hash == last {
continue
}
last = hash
hashes[j] = hash
j++
}
hashes = hashes[:j]

return hashes
}

// genTxnKeys returns hash keys for `txn`.
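For context outside the diff, the following self-contained snippet reproduces the new sortAndDedupHashes helper and shows what it returns for the {4, 9, 9, 3} case also covered by the tests below; the package main wrapper is only for illustration.

```go
package main

import (
	"fmt"
	"sort"
)

// sortAndDedupHashes mirrors the helper added in event.go: sort the key
// hashes by their slot index (hash % numSlots), then drop duplicates in
// place, so a transaction never occupies the same slot twice.
func sortAndDedupHashes(hashes []uint64, numSlots uint64) []uint64 {
	if len(hashes) == 0 {
		return nil
	}
	sort.Slice(hashes, func(i, j int) bool { return hashes[i]%numSlots < hashes[j]%numSlots })
	last := hashes[0]
	j := 1
	for i, hash := range hashes {
		if i == 0 || hash == last {
			continue
		}
		last = hash
		hashes[j] = hash
		j++
	}
	return hashes[:j]
}

func main() {
	// With 8 slots, 9 % 8 == 1 sorts before 3 and 4, and the duplicate 9 is dropped.
	fmt.Println(sortAndDedupHashes([]uint64{4, 9, 9, 3}, 8)) // [9 3 4]
}
```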
27 changes: 27 additions & 0 deletions cdc/sink/dmlsink/txn/event_test.go
@@ -203,3 +203,30 @@ func TestGenKeys(t *testing.T) {
require.Equal(t, tc.expected, keys)
}
}

func TestSortAndDedupHash(t *testing.T) {
// If a transaction contains multiple rows, these rows may generate the same hash
// in some rare cases. We should dedup these hashes to avoid an unnecessary cyclic
// self dependency in the causality dependency graph.
t.Parallel()
testCases := []struct {
hashes []uint64
expected []uint64
}{{
// No duplicate hashes
hashes: []uint64{1, 2, 3, 4, 5},
expected: []uint64{1, 2, 3, 4, 5},
}, {
// Duplicate hashes
hashes: []uint64{1, 2, 3, 4, 5, 1, 2, 3, 4, 5},
expected: []uint64{1, 2, 3, 4, 5},
}, {
// Contains hash values larger than the slot count; should sort by `hash % numSlots` first.
hashes: []uint64{4, 9, 9, 3},
expected: []uint64{9, 3, 4},
}}

for _, tc := range testCases {
require.Equal(t, tc.expected, sortAndDedupHashes(tc.hashes, 8))
}
}
40 changes: 21 additions & 19 deletions cdc/sink/dmlsink/txn/worker.go
@@ -29,7 +29,7 @@ import (

type txnWithNotifier struct {
*txnEvent
wantMore func()
postTxnExecuted func()
}

type worker struct {
@@ -49,9 +49,9 @@ type worker struct {
metricTxnWorkerHandledRows prometheus.Counter

// Fields only used in the background loop.
flushInterval time.Duration
hasPending bool
wantMoreCallbacks []func()
flushInterval time.Duration
hasPending bool
postTxnExecutedCallbacks []func()
}

func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,
@@ -73,17 +73,19 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,
metricTxnWorkerBusyRatio: txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerHandledRows: txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),

flushInterval: backend.MaxFlushInterval(),
hasPending: false,
wantMoreCallbacks: make([]func(), 0, 1024),
flushInterval: backend.MaxFlushInterval(),
hasPending: false,
postTxnExecutedCallbacks: make([]func(), 0, 1024),
}
}

// Add adds a txnEvent to the worker.
// The worker will call unlock() when it's ready to receive more events.
// In other words, it maybe advances the conflict detector.
func (w *worker) Add(txn *txnEvent, unlock func()) {
w.txnCh.In() <- txnWithNotifier{txn, unlock}
// The worker will call postTxnExecuted() after the txn is executed.
// postTxnExecuted removes the txn's node from the conflict detector's
// dependency graph and resolves dependencies for the transactions
// that depend on this executed txn.
func (w *worker) Add(txn *txnEvent, postTxnExecuted func()) {
w.txnCh.In() <- txnWithNotifier{txn, postTxnExecuted}
}

func (w *worker) close() {
@@ -154,15 +156,15 @@ func (w *worker) onEvent(txn txnWithNotifier) bool {
// The table where the event comes from is being stopped, so it's safe
// to drop the event directly.
txn.txnEvent.Callback()
// Still necessary to append the wantMore callback into the pending list.
w.wantMoreCallbacks = append(w.wantMoreCallbacks, txn.wantMore)
// Still necessary to append the callbacks into the pending list.
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
return false
}

w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds())
w.metricQueueDuration.Observe(time.Since(txn.start).Seconds())
w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows)))
w.wantMoreCallbacks = append(w.wantMoreCallbacks, txn.wantMore)
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
return w.backend.OnTxnEvent(txn.txnEvent.TxnCallbackableEvent)
}

@@ -185,13 +187,13 @@ func (w *worker) doFlush(flushTimeSlice *time.Duration) error {
return err
}
// Flush succeeded; call the callbacks to notify the conflict detector.
for _, wantMore := range w.wantMoreCallbacks {
wantMore()
for _, postTxnExecuted := range w.postTxnExecutedCallbacks {
postTxnExecuted()
}
w.wantMoreCallbacks = w.wantMoreCallbacks[:0]
if cap(w.wantMoreCallbacks) > 1024 {
w.postTxnExecutedCallbacks = w.postTxnExecutedCallbacks[:0]
if cap(w.postTxnExecutedCallbacks) > 1024 {
// Resize the buffer if it's too big.
w.wantMoreCallbacks = make([]func(), 0, 1024)
w.postTxnExecutedCallbacks = make([]func(), 0, 1024)
}
}

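For readers new to this code, the sketch below illustrates the callback-batching pattern used by the worker, with made-up types (miniWorker, the flush function) rather than the real ones: postTxnExecuted callbacks are buffered per event and invoked only after the backend flush succeeds, so a transaction's node is removed from the dependency graph strictly after the transaction is durable downstream.

```go
package main

import (
	"errors"
	"fmt"
)

// miniWorker is a made-up stand-in for the real txn worker: it only keeps the
// postTxnExecuted callbacks that must run after a successful flush.
type miniWorker struct {
	postTxnExecutedCallbacks []func()
}

// onEvent buffers the callback; in the real worker this happens per txn event.
func (w *miniWorker) onEvent(postTxnExecuted func()) {
	w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted)
}

// doFlush flushes the backend (here a plain function) and, only on success,
// fires the buffered callbacks so the conflict detector can release the
// transactions that were waiting on the flushed ones.
func (w *miniWorker) doFlush(flush func() error) error {
	if err := flush(); err != nil {
		// On error the callbacks are NOT invoked, so dependent txns stay blocked.
		return err
	}
	for _, postTxnExecuted := range w.postTxnExecutedCallbacks {
		postTxnExecuted()
	}
	w.postTxnExecutedCallbacks = w.postTxnExecutedCallbacks[:0]
	if cap(w.postTxnExecutedCallbacks) > 1024 {
		// Shrink the buffer if it grew too large, mirroring the real worker.
		w.postTxnExecutedCallbacks = make([]func(), 0, 1024)
	}
	return nil
}

func main() {
	w := &miniWorker{}
	w.onEvent(func() { fmt.Println("txn 1 released") })
	w.onEvent(func() { fmt.Println("txn 2 released") })

	_ = w.doFlush(func() error { return errors.New("flush failed") }) // nothing released
	_ = w.doFlush(func() error { return nil })                        // both released
}
```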
31 changes: 20 additions & 11 deletions pkg/causality/conflict_detector.go
@@ -44,8 +44,8 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
}

type txnFinishedEvent struct {
node *internal.Node
conflictKeys []uint64
node *internal.Node
sortedKeysHash []uint64
}

// NewConflictDetector creates a new ConflictDetector.
@@ -73,20 +73,29 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](

// Add pushes a transaction to the ConflictDetector.
//
// NOTE: if multiple threads access this concurrently, Txn.ConflictKeys must be sorted.
// NOTE: if multiple threads access this concurrently,
// the hashes returned by Txn.GenSortedDedupKeysHash must be sorted by the slot index.
func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
conflictKeys := txn.ConflictKeys(d.numSlots)
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode()
node.OnResolved = func(workerID int64) {
unlock := func() {
// This callback is called after the transaction is executed.
postTxnExecuted := func() {
// After this transaction is executed, we can remove the node from the graph
// and resolve dependencies for the transactions that depend on this
// executed transaction.
node.Remove()
d.garbageNodes.In() <- txnFinishedEvent{node, conflictKeys}

// Send this node to garbageNodes so it can be GCed from any slots it still
// occupies.
d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
}
d.sendToWorker(txn, unlock, workerID)
// Send this txn to the related worker as soon as all dependencies are resolved.
d.sendToWorker(txn, postTxnExecuted, workerID)
}
node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node, conflictKeys)
d.slots.Add(node, sortedKeysHash)
}

// Close closes the ConflictDetector.
@@ -110,18 +119,18 @@ func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
}
case event := <-d.garbageNodes.Out():
if event.node != nil {
d.slots.Free(event.node, event.conflictKeys)
d.slots.Free(event.node, event.sortedKeysHash)
}
}
}
}

// sendToWorker should not call txn.Callback if it returns an error.
func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, unlock func(), workerID int64) {
func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, postTxnExecuted func(), workerID int64) {
if workerID < 0 {
panic("must assign with a valid workerID")
}
txn.OnConflictResolved()
worker := d.workers[workerID]
worker.Add(txn, unlock)
worker.Add(txn, postTxnExecuted)
}
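To make the motivation for sorting and deduping concrete, here is a toy model with invented names (the real node/slot machinery lives in pkg/causality/internal): enqueueing a transaction on a slot makes it depend on the slot's previous tail, so a duplicated hash used to make a transaction depend on itself and never resolve.

```go
package main

import "fmt"

// toyDetector is a made-up model: each slot remembers the last transaction
// enqueued on it, and a newly added transaction depends on that tail.
type toyDetector struct {
	numSlots uint64
	tails    map[uint64]int // slot index -> ID of the last txn enqueued there
}

// add enqueues txnID on every slot derived from its key hashes and returns
// the set of txn IDs it has to wait for.
func (d *toyDetector) add(txnID int, keyHashes []uint64) map[int]bool {
	deps := map[int]bool{}
	for _, h := range keyHashes {
		slot := h % d.numSlots
		if prev, ok := d.tails[slot]; ok {
			deps[prev] = true
		}
		d.tails[slot] = txnID
	}
	return deps
}

func main() {
	// Without dedup, the second 9 sees the first 9 as the slot tail, so txn 1
	// ends up depending on itself and could never be resolved.
	d := &toyDetector{numSlots: 8, tails: map[uint64]int{}}
	fmt.Println("deps without dedup:", d.add(1, []uint64{9, 9})) // map[1:true]

	// With the hashes deduped (what GenSortedDedupKeysHash now guarantees),
	// there is no self dependency.
	d = &toyDetector{numSlots: 8, tails: map[uint64]int{}}
	fmt.Println("deps with dedup:", d.add(1, []uint64{9})) // map[]
}
```

Sorting every transaction's key hashes by `hash % numSlots` additionally gives all transactions the same slot-acquisition order, which is what rules out cyclic waits between different transactions.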