conflict detector(cdc): fix potential dead lock when a transaction generated multiple same hash (#10335) (#10367)

close #10334
ti-chi-bot authored Dec 28, 2023
1 parent ec1af7c commit 39aa61a
Showing 9 changed files with 210 additions and 108 deletions.
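The fix deduplicates the key hashes a transaction generates before handing them to the conflict detector. As a rough illustration of why that matters, the sketch below uses hypothetical types (not the real pkg/causality code) to show how two identical hashes from one transaction queue that transaction behind itself in a slot, which is the self dependency that could never resolve:

```go
package main

import "fmt"

// A minimal, hypothetical sketch of the deadlock this commit fixes; the
// types below are illustrative and not the real pkg/causality code.
// Each key hash claims a slot (hash % numSlots), and a transaction may only
// run once every earlier claim in each of its slots has finished. If one
// transaction's rows produce the same hash twice, its second claim queues
// behind its own first claim, so it waits on itself forever.
func main() {
	const numSlots = 8
	// slots[i] holds the IDs of transactions queued on slot i.
	slots := make([][]int, numSlots)

	// One transaction whose rows happen to hash to the same value twice.
	txnID := 1
	hashes := []uint64{9, 9}
	for _, h := range hashes {
		slots[h%numSlots] = append(slots[h%numSlots], txnID)
	}

	// Slot 1 now contains [1 1]: txn 1 is waiting for the entry ahead of it,
	// which is txn 1 itself. Deduplicating the hashes (as sortAndDedupHashes
	// below does) removes this self dependency.
	fmt.Println(slots[9%numSlots])
}
```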
43 changes: 38 additions & 5 deletions cdc/sinkv2/eventsink/txn/event.go
@@ -41,11 +41,44 @@ func (e *txnEvent) OnConflictResolved() {
e.conflictResolved = time.Now()
}

// ConflictKeys implements causality.txnEvent interface.
func (e *txnEvent) ConflictKeys(numSlots uint64) []uint64 {
keys := genTxnKeys(e.TxnCallbackableEvent.Event)
sort.Slice(keys, func(i, j int) bool { return keys[i]%numSlots < keys[j]%numSlots })
return keys
// GenSortedDedupKeysHash implements causality.txnEvent interface.
func (e *txnEvent) GenSortedDedupKeysHash(numSlots uint64) []uint64 {
hashes := genTxnKeys(e.TxnCallbackableEvent.Event)

// Sort and dedup hashes.
// Sort hashes by `hash % numSlots` to avoid deadlock, and then dedup
// hashes so that the same txn will not check conflicts with the same hash twice,
// preventing a potential cyclic self dependency in the causality dependency
// graph.
return sortAndDedupHashes(hashes, numSlots)
}

func sortAndDedupHashes(hashes []uint64, numSlots uint64) []uint64 {
if len(hashes) == 0 {
return nil
}

// Sort hashes by `hash % numSlots` to avoid deadlock.
sort.Slice(hashes, func(i, j int) bool { return hashes[i]%numSlots < hashes[j]%numSlots })

// Dedup hashes
last := hashes[0]
j := 1
for i, hash := range hashes {
if i == 0 {
// Skip the first element; start checking for duplicates from the second one.
continue
}
if hash == last {
continue
}
last = hash
hashes[j] = hash
j++
}
hashes = hashes[:j]

return hashes
}

// genTxnKeys returns hash keys for `txn`.
27 changes: 27 additions & 0 deletions cdc/sinkv2/eventsink/txn/event_test.go
@@ -203,3 +203,30 @@ func TestGenKeys(t *testing.T) {
require.Equal(t, tc.expected, keys)
}
}

func TestSortAndDedupHash(t *testing.T) {
// If a transaction contains multiple rows, these rows may generate the same hash
// in some rare cases. We should dedup these hashes to avoid unnecessary self cyclic
// dependency in the causality dependency graph.
t.Parallel()
testCases := []struct {
hashes []uint64
expected []uint64
}{{
// No duplicate hashes
hashes: []uint64{1, 2, 3, 4, 5},
expected: []uint64{1, 2, 3, 4, 5},
}, {
// Duplicate hashes
hashes: []uint64{1, 2, 3, 4, 5, 1, 2, 3, 4, 5},
expected: []uint64{1, 2, 3, 4, 5},
}, {
// Has hash values larger than the slot count; should sort by `hash % numSlots` first.
hashes: []uint64{4, 9, 9, 3},
expected: []uint64{9, 3, 4},
}}

for _, tc := range testCases {
require.Equal(t, tc.expected, sortAndDedupHashes(tc.hashes, 8))
}
}
40 changes: 21 additions & 19 deletions cdc/sinkv2/eventsink/txn/worker.go
@@ -29,7 +29,7 @@ import (

type txnWithNotifier struct {
*txnEvent
wantMore func()
postTxnExecuted func()
}

type worker struct {
@@ -49,9 +49,9 @@ type worker struct {
metricTxnWorkerHandledRows prometheus.Counter

// Fields only used in the background loop.
flushInterval time.Duration
hasPending bool
wantMoreCallbacks []func()
flushInterval time.Duration
hasPending bool
postTxnExecutedCallbacks []func()
}

func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *worker {
@@ -72,17 +72,19 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w
metricTxnWorkerBusyRatio: txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerHandledRows: txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),

flushInterval: backend.MaxFlushInterval(),
hasPending: false,
wantMoreCallbacks: make([]func(), 0, 1024),
flushInterval: backend.MaxFlushInterval(),
hasPending: false,
postTxnExecutedCallbacks: make([]func(), 0, 1024),
}
}

// Add adds a txnEvent to the worker.
// The worker will call unlock() when it's ready to receive more events.
// In other words, it maybe advances the conflict detector.
func (w *worker) Add(txn *txnEvent, unlock func()) {
w.txnCh.In() <- txnWithNotifier{txn, unlock}
// The worker will call postTxnExecuted() after the txn has been executed.
// The postTxnExecuted callback removes the txn's Node from the conflict detector's
// dependency graph and resolves related dependencies for the transactions
// that depend on this executed txn.
func (w *worker) Add(txn *txnEvent, postTxnExecuted func()) {
w.txnCh.In() <- txnWithNotifier{txn, postTxnExecuted}
}

func (w *worker) close() {
@@ -153,15 +155,15 @@ func (w *worker) onEvent(txn txnWithNotifier) bool {
// The table where the event comes from is in stopping, so it's safe
// to drop the event directly.
txn.txnEvent.Callback()
// Still necessary to append the wantMore callback into the pending list.
w.wantMoreCallbacks = append(w.wantMoreCallbacks, txn.wantMore)
// Still necessary to append the callbacks into the pending list.
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
return false
}

w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds())
w.metricQueueDuration.Observe(time.Since(txn.start).Seconds())
w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows)))
w.wantMoreCallbacks = append(w.wantMoreCallbacks, txn.wantMore)
w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
return w.backend.OnTxnEvent(txn.txnEvent.TxnCallbackableEvent)
}

Expand All @@ -184,13 +186,13 @@ func (w *worker) doFlush(flushTimeSlice *time.Duration) error {
return err
}
// Flush successfully, call callbacks to notify conflict detector.
for _, wantMore := range w.wantMoreCallbacks {
wantMore()
for _, postTxnExecuted := range w.postTxnExecutedCallbacks {
postTxnExecuted()
}
w.wantMoreCallbacks = w.wantMoreCallbacks[:0]
if cap(w.wantMoreCallbacks) > 1024 {
w.postTxnExecutedCallbacks = w.postTxnExecutedCallbacks[:0]
if cap(w.postTxnExecutedCallbacks) > 1024 {
// Resize the buffer if it's too big.
w.wantMoreCallbacks = make([]func(), 0, 1024)
w.postTxnExecutedCallbacks = make([]func(), 0, 1024)
}
}

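For context on the renaming from wantMore to postTxnExecuted, the following sketch (a hypothetical miniWorker, not the real sinkv2 txn worker) shows the intended flow: callbacks are buffered while transactions are pending and only invoked after a successful flush, at which point the conflict detector can drop the executed transaction's node and unblock its dependents:

```go
package main

import "fmt"

// A minimal sketch (hypothetical miniWorker type, not the real sinkv2 txn
// worker) of the renamed callback flow: callbacks are buffered per txn and
// only run after a successful flush.
type miniWorker struct {
	pending                  []string
	postTxnExecutedCallbacks []func()
}

func (w *miniWorker) Add(txn string, postTxnExecuted func()) {
	w.pending = append(w.pending, txn)
	w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, postTxnExecuted)
}

func (w *miniWorker) doFlush() {
	fmt.Println("flushed:", w.pending)
	w.pending = w.pending[:0]
	// Callbacks fire only after the flush succeeded.
	for _, postTxnExecuted := range w.postTxnExecutedCallbacks {
		postTxnExecuted()
	}
	w.postTxnExecutedCallbacks = w.postTxnExecutedCallbacks[:0]
}

func main() {
	w := &miniWorker{}
	w.Add("txn-1", func() { fmt.Println("txn-1: node removed, dependents resolved") })
	w.Add("txn-2", func() { fmt.Println("txn-2: node removed, dependents resolved") })
	w.doFlush()
}
```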
31 changes: 20 additions & 11 deletions pkg/causality/conflict_detector.go
@@ -44,8 +44,8 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
}

type txnFinishedEvent struct {
node *internal.Node
conflictKeys []uint64
node *internal.Node
sortedKeysHash []uint64
}

// NewConflictDetector creates a new ConflictDetector.
@@ -73,20 +73,29 @@ func NewConflictDetector[Worker worker[Txn], Txn txnEvent](

// Add pushes a transaction to the ConflictDetector.
//
// NOTE: if multiple threads access this concurrently, Txn.ConflictKeys must be sorted.
// NOTE: if multiple threads access this concurrently,
// the hashes returned by Txn.GenSortedDedupKeysHash must be sorted by slot index.
func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
conflictKeys := txn.ConflictKeys(d.numSlots)
sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
node := internal.NewNode()
node.OnResolved = func(workerID int64) {
unlock := func() {
// This callback is called after the transaction is executed.
postTxnExecuted := func() {
// After this transaction is executed, we can remove the node from the graph,
// and resolve related dependencies for the transactions that depend on this
// executed transaction.
node.Remove()
d.garbageNodes.In() <- txnFinishedEvent{node, conflictKeys}

// Send this node to garbageNodes so it can be GCed from the slots if this
// node still occupies any related slots.
d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
}
d.sendToWorker(txn, unlock, workerID)
// Send this txn to related worker as soon as all dependencies are resolved.
d.sendToWorker(txn, postTxnExecuted, workerID)
}
node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
d.slots.Add(node, conflictKeys)
d.slots.Add(node, sortedKeysHash)
}

// Close closes the ConflictDetector.
Expand All @@ -110,18 +119,18 @@ func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
}
case event := <-d.garbageNodes.Out():
if event.node != nil {
d.slots.Free(event.node, event.conflictKeys)
d.slots.Free(event.node, event.sortedKeysHash)
}
}
}
}

// sendToWorker should not call txn.Callback if it returns an error.
func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, unlock func(), workerID int64) {
func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, postTxnExecuted func(), workerID int64) {
if workerID < 0 {
panic("must assign with a valid workerID")
}
txn.OnConflictResolved()
worker := d.workers[workerID]
worker.Add(txn, unlock)
worker.Add(txn, postTxnExecuted)
}
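The requirement that GenSortedDedupKeysHash return hashes sorted by slot index follows the usual consistent-ordering argument for deadlock avoidance. A minimal sketch, assuming hypothetical per-slot mutexes rather than the real internal.Slots implementation:

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// claimSlots claims every slot touched by a set of key hashes in ascending
// `hash % numSlots` order, skipping duplicates, and returns a release func.
// This mirrors the sorted, deduplicated hashes the conflict detector expects.
func claimSlots(slots []sync.Mutex, hashes []uint64) (release func()) {
	numSlots := uint64(len(slots))
	indexes := make([]uint64, 0, len(hashes))
	for _, h := range hashes {
		indexes = append(indexes, h%numSlots)
	}
	// Sort ascending and dedup, mirroring GenSortedDedupKeysHash.
	sort.Slice(indexes, func(i, j int) bool { return indexes[i] < indexes[j] })
	locked := make([]uint64, 0, len(indexes))
	for i, idx := range indexes {
		if i > 0 && idx == indexes[i-1] {
			continue // never lock the same slot twice
		}
		slots[idx].Lock()
		locked = append(locked, idx)
	}
	return func() {
		for _, idx := range locked {
			slots[idx].Unlock()
		}
	}
}

func main() {
	slots := make([]sync.Mutex, 8)
	var wg sync.WaitGroup
	for _, hashes := range [][]uint64{{4, 9, 9, 3}, {3, 4, 11}} {
		wg.Add(1)
		go func(hs []uint64) {
			defer wg.Done()
			release := claimSlots(slots, hs)
			defer release()
			fmt.Println("executing txn with hashes", hs)
		}(hashes)
	}
	wg.Wait()
}
```

Because both goroutines claim slots in ascending index order and never claim a slot twice, neither can hold a slot the other needs next, so the program always terminates.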