diff --git a/cdc/sink/dmlsink/txn/event.go b/cdc/sink/dmlsink/txn/event.go
index e46abbfec60..00805917ef0 100644
--- a/cdc/sink/dmlsink/txn/event.go
+++ b/cdc/sink/dmlsink/txn/event.go
@@ -41,11 +41,44 @@ func (e *txnEvent) OnConflictResolved() {
 	e.conflictResolved = time.Now()
 }
 
-// ConflictKeys implements causality.txnEvent interface.
-func (e *txnEvent) ConflictKeys(numSlots uint64) []uint64 {
-	keys := genTxnKeys(e.TxnCallbackableEvent.Event)
-	sort.Slice(keys, func(i, j int) bool { return keys[i]%numSlots < keys[j]%numSlots })
-	return keys
+// GenSortedDedupKeysHash implements causality.txnEvent interface.
+func (e *txnEvent) GenSortedDedupKeysHash(numSlots uint64) []uint64 {
+	hashes := genTxnKeys(e.TxnCallbackableEvent.Event)
+
+	// Sort and dedup hashes.
+	// Sort hashes by `hash % numSlots` to avoid deadlock, and then dedup
+	// hashes, so that the same txn is not checked for conflict with the same
+	// hash twice, which prevents a potential cyclic self dependency in the
+	// causality dependency graph.
+	return sortAndDedupHashes(hashes, numSlots)
+}
+
+func sortAndDedupHashes(hashes []uint64, numSlots uint64) []uint64 {
+	if len(hashes) == 0 {
+		return nil
+	}
+
+	// Sort hashes by `hash % numSlots` to avoid deadlock.
+	sort.Slice(hashes, func(i, j int) bool { return hashes[i]%numSlots < hashes[j]%numSlots })
+
+	// Dedup hashes.
+	last := hashes[0]
+	j := 1
+	for i, hash := range hashes {
+		if i == 0 {
+			// Skip the first one; start checking for duplicates from the 2nd one.
+			continue
+		}
+		if hash == last {
+			continue
+		}
+		last = hash
+		hashes[j] = hash
+		j++
+	}
+	hashes = hashes[:j]
+
+	return hashes
 }
 
 // genTxnKeys returns hash keys for `txn`.
diff --git a/cdc/sink/dmlsink/txn/event_test.go b/cdc/sink/dmlsink/txn/event_test.go
index 11126fb5f3a..45999132d59 100644
--- a/cdc/sink/dmlsink/txn/event_test.go
+++ b/cdc/sink/dmlsink/txn/event_test.go
@@ -203,3 +203,30 @@ func TestGenKeys(t *testing.T) {
 		require.Equal(t, tc.expected, keys)
 	}
 }
+
+func TestSortAndDedupHash(t *testing.T) {
+	// If a transaction contains multiple rows, these rows may generate the same hash
+	// in some rare cases. We should dedup these hashes to avoid an unnecessary cyclic
+	// self dependency in the causality dependency graph.
+	t.Parallel()
+	testCases := []struct {
+		hashes   []uint64
+		expected []uint64
+	}{{
+		// No duplicate hashes.
+		hashes:   []uint64{1, 2, 3, 4, 5},
+		expected: []uint64{1, 2, 3, 4, 5},
+	}, {
+		// Duplicate hashes.
+		hashes:   []uint64{1, 2, 3, 4, 5, 1, 2, 3, 4, 5},
+		expected: []uint64{1, 2, 3, 4, 5},
+	}, {
+		// Hash values larger than the slot count; sorted by `hash % numSlots` first.
+		hashes:   []uint64{4, 9, 9, 3},
+		expected: []uint64{9, 3, 4},
+	}}
+
+	for _, tc := range testCases {
+		require.Equal(t, tc.expected, sortAndDedupHashes(tc.hashes, 8))
+	}
+}
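The sort-and-dedup step added above can be exercised on its own. The following is a minimal, standalone sketch (the function name, the sample hashes and the slot count are illustrative, not taken from the patch): ordering by `hash % numSlots` means every transaction acquires its slot locks in one global order, so two transactions can never hold slots while waiting on each other, and dropping duplicate hashes keeps a transaction from depending on itself.

    package main

    import (
        "fmt"
        "sort"
    )

    // sortAndDedupBySlot mirrors the behavior described above: order hashes by
    // their slot index, then drop exact duplicates so a txn never ends up
    // depending on itself.
    func sortAndDedupBySlot(hashes []uint64, numSlots uint64) []uint64 {
        if len(hashes) == 0 {
            return nil
        }
        sort.Slice(hashes, func(i, j int) bool { return hashes[i]%numSlots < hashes[j]%numSlots })
        deduped := hashes[:1]
        for _, hash := range hashes[1:] {
            if hash != deduped[len(deduped)-1] {
                deduped = append(deduped, hash)
            }
        }
        return deduped
    }

    func main() {
        // With 8 slots, 9 maps to slot 1, 3 to slot 3 and 4 to slot 4,
        // so the sorted, deduped result is [9 3 4].
        fmt.Println(sortAndDedupBySlot([]uint64{4, 9, 9, 3}, 8))
    }

Running it prints [9 3 4], matching the last case in TestSortAndDedupHash.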
diff --git a/cdc/sink/dmlsink/txn/worker.go b/cdc/sink/dmlsink/txn/worker.go
index 673a53b5adf..6fae294ddf3 100644
--- a/cdc/sink/dmlsink/txn/worker.go
+++ b/cdc/sink/dmlsink/txn/worker.go
@@ -29,7 +29,7 @@ import (
 
 type txnWithNotifier struct {
 	*txnEvent
-	wantMore func()
+	postTxnExecuted func()
 }
 
 type worker struct {
@@ -49,9 +49,9 @@ type worker struct {
 	metricTxnWorkerHandledRows prometheus.Counter
 
 	// Fields only used in the background loop.
-	flushInterval     time.Duration
-	hasPending        bool
-	wantMoreCallbacks []func()
+	flushInterval            time.Duration
+	hasPending               bool
+	postTxnExecutedCallbacks []func()
 }
 
 func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,
@@ -73,17 +73,19 @@ func newWorker(ctx context.Context, changefeedID model.ChangeFeedID,
 		metricTxnWorkerBusyRatio: txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
 		metricTxnWorkerHandledRows: txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
 
-		flushInterval:     backend.MaxFlushInterval(),
-		hasPending:        false,
-		wantMoreCallbacks: make([]func(), 0, 1024),
+		flushInterval:            backend.MaxFlushInterval(),
+		hasPending:               false,
+		postTxnExecutedCallbacks: make([]func(), 0, 1024),
 	}
 }
 
 // Add adds a txnEvent to the worker.
-// The worker will call unlock() when it's ready to receive more events.
-// In other words, it maybe advances the conflict detector.
-func (w *worker) Add(txn *txnEvent, unlock func()) {
-	w.txnCh.In() <- txnWithNotifier{txn, unlock}
+// The worker will call postTxnExecuted() after the txn is executed.
+// postTxnExecuted removes the txn-related Node from the conflict detector's
+// dependency graph and resolves related dependencies for those transactions
+// which depend on this executed txn.
+func (w *worker) Add(txn *txnEvent, postTxnExecuted func()) {
+	w.txnCh.In() <- txnWithNotifier{txn, postTxnExecuted}
 }
 
 func (w *worker) close() {
@@ -154,15 +156,15 @@ func (w *worker) onEvent(txn txnWithNotifier) bool {
 		// The table where the event comes from is in stopping, so it's safe
 		// to drop the event directly.
 		txn.txnEvent.Callback()
-		// Still necessary to append the wantMore callback into the pending list.
-		w.wantMoreCallbacks = append(w.wantMoreCallbacks, txn.wantMore)
+		// Still necessary to append the callbacks into the pending list.
+		w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
 		return false
 	}
 
 	w.metricConflictDetectDuration.Observe(txn.conflictResolved.Sub(txn.start).Seconds())
 	w.metricQueueDuration.Observe(time.Since(txn.start).Seconds())
 	w.metricTxnWorkerHandledRows.Add(float64(len(txn.Event.Rows)))
-	w.wantMoreCallbacks = append(w.wantMoreCallbacks, txn.wantMore)
+	w.postTxnExecutedCallbacks = append(w.postTxnExecutedCallbacks, txn.postTxnExecuted)
 	return w.backend.OnTxnEvent(txn.txnEvent.TxnCallbackableEvent)
 }
 
@@ -185,13 +187,13 @@ func (w *worker) doFlush(flushTimeSlice *time.Duration) error {
 			return err
 		}
 		// Flush successfully, call callbacks to notify conflict detector.
-		for _, wantMore := range w.wantMoreCallbacks {
-			wantMore()
+		for _, postTxnExecuted := range w.postTxnExecutedCallbacks {
+			postTxnExecuted()
 		}
-		w.wantMoreCallbacks = w.wantMoreCallbacks[:0]
-		if cap(w.wantMoreCallbacks) > 1024 {
+		w.postTxnExecutedCallbacks = w.postTxnExecutedCallbacks[:0]
+		if cap(w.postTxnExecutedCallbacks) > 1024 {
 			// Resize the buffer if it's too big.
-			w.wantMoreCallbacks = make([]func(), 0, 1024)
+			w.postTxnExecutedCallbacks = make([]func(), 0, 1024)
 		}
 	}
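The worker above defers every `postTxnExecuted` callback until the whole batch has been flushed successfully. A toy illustration of that pattern, with made-up names rather than the patch's types:

    package main

    import "fmt"

    type txn struct{ id int }

    // batcher collects txns and defers their completion callbacks until the
    // whole batch has been flushed, in the spirit of postTxnExecutedCallbacks.
    type batcher struct {
        pending   []txn
        callbacks []func()
    }

    func (b *batcher) add(t txn, postTxnExecuted func()) {
        b.pending = append(b.pending, t)
        b.callbacks = append(b.callbacks, postTxnExecuted)
    }

    func (b *batcher) flush() {
        fmt.Printf("flushed %d txns\n", len(b.pending))
        b.pending = b.pending[:0]
        // Only after the flush succeeds are the callbacks invoked, so the
        // conflict detector releases dependencies of executed txns only.
        for _, cb := range b.callbacks {
            cb()
        }
        b.callbacks = b.callbacks[:0]
    }

    func main() {
        b := &batcher{}
        b.add(txn{id: 1}, func() { fmt.Println("txn 1 dependencies released") })
        b.add(txn{id: 2}, func() { fmt.Println("txn 2 dependencies released") })
        b.flush()
    }

The point of the design is that dependencies are released only once the conflicting transaction is durably written, never when it is merely queued.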
diff --git a/pkg/causality/conflict_detector.go b/pkg/causality/conflict_detector.go
index 641dde8b87c..f1f7495e77c 100644
--- a/pkg/causality/conflict_detector.go
+++ b/pkg/causality/conflict_detector.go
@@ -44,8 +44,8 @@ type ConflictDetector[Worker worker[Txn], Txn txnEvent] struct {
 }
 
 type txnFinishedEvent struct {
-	node         *internal.Node
-	conflictKeys []uint64
+	node           *internal.Node
+	sortedKeysHash []uint64
 }
 
 // NewConflictDetector creates a new ConflictDetector.
@@ -73,20 +73,29 @@
 
 // Add pushes a transaction to the ConflictDetector.
 //
-// NOTE: if multiple threads access this concurrently, Txn.ConflictKeys must be sorted.
+// NOTE: if multiple threads access this concurrently, the hashes returned by
+// Txn.GenSortedDedupKeysHash must be sorted by the slot index.
 func (d *ConflictDetector[Worker, Txn]) Add(txn Txn) {
-	conflictKeys := txn.ConflictKeys(d.numSlots)
+	sortedKeysHash := txn.GenSortedDedupKeysHash(d.numSlots)
 	node := internal.NewNode()
 	node.OnResolved = func(workerID int64) {
-		unlock := func() {
+		// This callback is called after the transaction is executed.
+		postTxnExecuted := func() {
+			// After this transaction is executed, we can remove the node from the graph,
+			// and resolve related dependencies for those transactions which depend on
+			// this executed transaction.
 			node.Remove()
-			d.garbageNodes.In() <- txnFinishedEvent{node, conflictKeys}
+
+			// Send this node to garbageNodes to GC it from the slots if this node still
+			// occupies any related slots.
+			d.garbageNodes.In() <- txnFinishedEvent{node, sortedKeysHash}
 		}
-		d.sendToWorker(txn, unlock, workerID)
+		// Send this txn to a related worker as soon as all dependencies are resolved.
+		d.sendToWorker(txn, postTxnExecuted, workerID)
 	}
 	node.RandWorkerID = func() int64 { return d.nextWorkerID.Add(1) % int64(len(d.workers)) }
 	node.OnNotified = func(callback func()) { d.notifiedNodes.In() <- callback }
-	d.slots.Add(node, conflictKeys)
+	d.slots.Add(node, sortedKeysHash)
 }
 
 // Close closes the ConflictDetector.
@@ -110,18 +119,18 @@ func (d *ConflictDetector[Worker, Txn]) runBackgroundTasks() {
 			}
 		case event := <-d.garbageNodes.Out():
 			if event.node != nil {
-				d.slots.Free(event.node, event.conflictKeys)
+				d.slots.Free(event.node, event.sortedKeysHash)
 			}
 		}
 	}
 }
 
 // sendToWorker should not call txn.Callback if it returns an error.
-func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, unlock func(), workerID int64) {
+func (d *ConflictDetector[Worker, Txn]) sendToWorker(txn Txn, postTxnExecuted func(), workerID int64) {
 	if workerID < 0 {
 		panic("must assign with a valid workerID")
 	}
 	txn.OnConflictResolved()
 	worker := d.workers[workerID]
-	worker.Add(txn, unlock)
+	worker.Add(txn, postTxnExecuted)
 }
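To make the control flow in `Add` concrete, here is a small self-contained rendition of how `OnResolved` wires up `postTxnExecuted` (illustrative names only; the real `internal.Node`, worker pool and garbage-collection channel are omitted):

    package main

    import "fmt"

    // node is a toy stand-in for internal.Node: it only keeps the callback
    // that fires once all of the node's dependencies are resolved.
    type node struct {
        onResolved func(workerID int64)
    }

    func main() {
        done := make(chan struct{})

        n := &node{}
        n.onResolved = func(workerID int64) {
            // postTxnExecuted runs after the worker has executed the txn; in
            // the real code it removes the node from the dependency graph and
            // queues it for garbage collection from the slots.
            postTxnExecuted := func() {
                fmt.Println("node removed, slots will be freed")
                close(done)
            }
            fmt.Printf("txn sent to worker %d\n", workerID)
            // The worker calls postTxnExecuted after a successful flush.
            postTxnExecuted()
        }

        // Pretend all dependencies were resolved and worker 0 was chosen.
        n.onResolved(0)
        <-done
    }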
diff --git a/pkg/causality/internal/node.go b/pkg/causality/internal/node.go
index c9c7a3a29b0..cfcd5fc2ff8 100644
--- a/pkg/causality/internal/node.go
+++ b/pkg/causality/internal/node.go
@@ -48,7 +48,7 @@ type Node struct {
 
 	// Immutable fields.
 	id int64
-	// Set the callback that the node is resolved.
+	// Called when all dependencies are resolved.
 	OnResolved func(id workerID)
 	// Set the id generator to get a random ID.
 	RandWorkerID func() workerID
@@ -56,10 +56,10 @@
 	OnNotified func(callback func())
 
 	// Following fields are used for notifying a node's dependers lock-free.
-	totalDependees    int32
-	resolvedDependees int32
-	removedDependees  int32
-	resolvedList      []int64
+	totalDependencies    int32
+	resolvedDependencies int32
+	removedDependencies  int32
+	resolvedList         []int64
 
 	// Following fields are protected by `mu`.
 	mu sync.Mutex
@@ -87,9 +87,9 @@ func NewNode() (ret *Node) {
 	ret.id = genNextNodeID()
 	ret.OnResolved = nil
 	ret.RandWorkerID = nil
-	ret.totalDependees = 0
-	ret.resolvedDependees = 0
-	ret.removedDependees = 0
+	ret.totalDependencies = 0
+	ret.resolvedDependencies = 0
+	ret.removedDependencies = 0
 	ret.resolvedList = nil
 	ret.assignedTo = unassigned
 	ret.removed = false
@@ -105,36 +105,43 @@ func (n *Node) NodeID() int64 {
 }
 
 // DependOn implements interface internal.SlotNode.
-func (n *Node) DependOn(unresolvedDeps map[int64]*Node, resolvedDeps int) {
-	resolvedDependees, removedDependees := int32(0), int32(0)
+func (n *Node) DependOn(dependencyNodes map[int64]*Node, noDependencyKeyCnt int) {
+	resolvedDependencies, removedDependencies := int32(0), int32(0)
 
 	depend := func(target *Node) {
 		if target == nil {
 			// For a given Node, every dependency corresponds to a target.
 			// If target is nil it means the dependency doesn't conflict
 			// with any other nodes. However it's still necessary to track
-			// it because Node.tryResolve needs to know it.
-			resolvedDependees = stdatomic.AddInt32(&n.resolvedDependees, 1)
-			stdatomic.StoreInt64(&n.resolvedList[resolvedDependees-1], assignedToAny)
-			removedDependees = stdatomic.AddInt32(&n.removedDependees, 1)
+			// it because Node.tryResolve needs to count the number of
+			// resolved dependencies.
+			resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
+			stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], assignedToAny)
+			removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
 			return
 		}
 
 		if target.id == n.id {
 			panic("you cannot depend on yourself")
 		}
-		// Lock target and insert `n` into target.dependers.
+
+		// The target node might be removed or modified in other places, for example
+		// after its corresponding transaction has been executed.
 		target.mu.Lock()
 		defer target.mu.Unlock()
 
 		if target.assignedTo != unassigned {
 			// The target has already been assigned to a worker.
-			resolvedDependees = stdatomic.AddInt32(&n.resolvedDependees, 1)
-			stdatomic.StoreInt64(&n.resolvedList[resolvedDependees-1], target.assignedTo)
+			// In this case, record the worker ID in `resolvedList`, so that this node
+			// can probably be sent to the same worker and executed sequentially.
+			resolvedDependencies = stdatomic.AddInt32(&n.resolvedDependencies, 1)
+			stdatomic.StoreInt64(&n.resolvedList[resolvedDependencies-1], target.assignedTo)
 		}
+
+		// Add the node to the target's dependers if the target has not been removed.
 		if target.removed {
 			// The target has already been removed.
-			removedDependees = stdatomic.AddInt32(&n.removedDependees, 1)
+			removedDependencies = stdatomic.AddInt32(&n.removedDependencies, 1)
 		} else if _, exist := target.getOrCreateDependers().ReplaceOrInsert(n); exist {
 			// Should never depend on a target redundantly.
 			panic("should never exist")
@@ -143,23 +150,24 @@
 
 	// Re-allocate ID in `DependOn` instead of creating the node, because the node can be
 	// pending in slots after it's created.
+	// TODO: why is a new ID generated here?
 	n.id = genNextNodeID()
 
-	// `totalDependees` and `resolvedList` must be initialized before depending on any targets.
-	n.totalDependees = int32(len(unresolvedDeps) + resolvedDeps)
-	n.resolvedList = make([]int64, 0, n.totalDependees)
-	for i := 0; i < int(n.totalDependees); i++ {
+	// `totalDependencies` and `resolvedList` must be initialized before depending on any targets.
+	n.totalDependencies = int32(len(dependencyNodes) + noDependencyKeyCnt)
+	n.resolvedList = make([]int64, 0, n.totalDependencies)
+	for i := 0; i < int(n.totalDependencies); i++ {
 		n.resolvedList = append(n.resolvedList, unassigned)
 	}
 
-	for _, target := range unresolvedDeps {
-		depend(target)
+	for _, node := range dependencyNodes {
+		depend(node)
 	}
-	for i := 0; i < resolvedDeps; i++ {
+	for i := 0; i < noDependencyKeyCnt; i++ {
 		depend(nil)
 	}
 
-	n.maybeResolve(resolvedDependees, removedDependees)
+	n.maybeResolve(resolvedDependencies, removedDependencies)
 }
 
 // Remove implements interface internal.SlotNode.
@@ -171,8 +179,8 @@ func (n *Node) Remove() {
 	if n.dependers != nil {
 		// `mu` must be holded during accessing dependers.
 		n.dependers.Ascend(func(node *Node) bool {
-			removedDependees := stdatomic.AddInt32(&node.removedDependees, 1)
-			node.maybeResolve(0, removedDependees)
+			removedDependencies := stdatomic.AddInt32(&node.removedDependencies, 1)
+			node.maybeResolve(0, removedDependencies)
 			return true
 		})
 		n.dependers.Clear(true)
@@ -217,9 +225,9 @@ func (n *Node) assignTo(workerID int64) bool {
 	if n.dependers != nil {
 		// `mu` must be holded during accessing dependers.
 		n.dependers.Ascend(func(node *Node) bool {
-			resolvedDependees := stdatomic.AddInt32(&node.resolvedDependees, 1)
-			stdatomic.StoreInt64(&node.resolvedList[resolvedDependees-1], n.assignedTo)
-			node.maybeResolve(resolvedDependees, 0)
+			resolvedDependencies := stdatomic.AddInt32(&node.resolvedDependencies, 1)
+			stdatomic.StoreInt64(&node.resolvedList[resolvedDependencies-1], n.assignedTo)
+			node.maybeResolve(resolvedDependencies, 0)
 			return true
 		})
 	}
@@ -227,14 +235,16 @@ func (n *Node) assignTo(workerID int64) bool {
 	return true
 }
 
-func (n *Node) maybeResolve(resolvedDependees, removedDependees int32) {
-	if workerNum, ok := n.tryResolve(resolvedDependees, removedDependees); ok {
+func (n *Node) maybeResolve(resolvedDependencies, removedDependencies int32) {
+	if workerNum, ok := n.tryResolve(resolvedDependencies, removedDependencies); ok {
 		if workerNum < 0 {
 			panic("Node.tryResolve must return a valid worker ID")
 		}
 		if n.OnNotified != nil {
+			// Notify the conflict detector's background goroutine to assign the node to the worker asynchronously.
 			n.OnNotified(func() { n.assignTo(workerNum) })
 		} else {
+			// Assign the node to the worker directly.
 			n.assignTo(workerNum)
 		}
 	}
@@ -244,39 +254,50 @@
 // Returns (_, false) if there is a conflict,
 // returns (rand, true) if there is no conflict,
 // returns (N, true) if only worker N can be used.
-func (n *Node) tryResolve(resolvedDependees, removedDependees int32) (int64, bool) {
-	assignedTo, resolved := n.doResolve(resolvedDependees, removedDependees)
+func (n *Node) tryResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
+	assignedTo, resolved := n.doResolve(resolvedDependencies, removedDependencies)
 	if resolved && assignedTo == assignedToAny {
 		assignedTo = n.RandWorkerID()
 	}
 	return assignedTo, resolved
 }
 
-func (n *Node) doResolve(resolvedDependees, removedDependees int32) (int64, bool) {
-	if n.totalDependees == 0 {
+func (n *Node) doResolve(resolvedDependencies, removedDependencies int32) (int64, bool) {
+	if n.totalDependencies == 0 {
 		// No conflicts, can select any workers.
 		return assignedToAny, true
 	}
 
-	if resolvedDependees == n.totalDependees {
+	if resolvedDependencies == n.totalDependencies {
 		firstDep := stdatomic.LoadInt64(&n.resolvedList[0])
 		hasDiffDep := false
-		for i := 1; i < int(n.totalDependees); i++ {
+		for i := 1; i < int(n.totalDependencies); i++ {
 			curr := stdatomic.LoadInt64(&n.resolvedList[i])
+			// // TODO: simplify the assign-to logic; only resolve dependency nodes after
+			// // the corresponding transactions are executed.
+			// //
+			// // In DependOn, depend(nil) sets resolvedList[i] to assignedToAny
+			// // for keys that have no dependency.
+			// if curr == assignedToAny {
+			// 	continue
+			// }
 			if firstDep != curr {
 				hasDiffDep = true
 				break
 			}
 		}
 		if !hasDiffDep {
-			// If all dependees are assigned to one same worker, we can assign
-			// this node to the same worker directly.
+			// If all dependency nodes are assigned to the same worker, we can assign
+			// this node to the same worker directly, and they will execute sequentially.
+			// On the other hand, if dependency nodes are assigned to different workers,
+			// this node has to wait until all dependency transactions are executed and
+			// all dependency nodes are removed.
 			return firstDep, true
 		}
 	}
 
-	// All dependees are removed, so assign the node to any worker is fine.
-	if removedDependees == n.totalDependees {
+	// All dependencies are removed, so assigning the node to any worker is fine.
+	if removedDependencies == n.totalDependencies {
 		return assignedToAny, true
 	}
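The decision implemented by `doResolve` can be restated in a simplified, single-threaded form (illustrative only; the atomics and the `resolvedList` bookkeeping of the real `Node` are omitted):

    package main

    import "fmt"

    const assignedToAny = int64(-2)

    // tryResolve mirrors the decision in Node.doResolve: resolvedList holds the
    // worker ID of each resolved dependency; resolved/removed are the counters.
    func tryResolve(resolvedList []int64, resolved, removed, total int) (int64, bool) {
        if total == 0 {
            // No conflicts at all: any worker works.
            return assignedToAny, true
        }
        if resolved == total {
            allSame := true
            for _, w := range resolvedList[1:] {
                if w != resolvedList[0] {
                    allSame = false
                    break
                }
            }
            if allSame {
                // Every dependency went to the same worker: follow it, so the
                // conflicting txns execute sequentially on that worker.
                return resolvedList[0], true
            }
        }
        if removed == total {
            // All conflicting txns have been executed and removed: any worker works.
            return assignedToAny, true
        }
        return 0, false
    }

    func main() {
        fmt.Println(tryResolve([]int64{3, 3}, 2, 0, 2)) // 3 true: follow worker 3
        fmt.Println(tryResolve([]int64{1, 2}, 2, 0, 2)) // 0 false: must keep waiting
        fmt.Println(tryResolve([]int64{1, 2}, 2, 2, 2)) // -2 true: deps removed, any worker
    }

The three calls correspond to the three outcomes documented above: follow the shared worker, keep waiting, or pick any worker once every conflicting transaction has been removed.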
diff --git a/pkg/causality/internal/slots.go b/pkg/causality/internal/slots.go
index cc70a0c414b..d6d131e02c8 100644
--- a/pkg/causality/internal/slots.go
+++ b/pkg/causality/internal/slots.go
@@ -28,7 +28,7 @@ type SlotNode[T any] interface {
 	// NodeID tells the node's ID.
 	NodeID() int64
 	// Construct a dependency on `others`.
-	DependOn(unresolvedDeps map[int64]T, resolvedDeps int)
+	DependOn(dependencyNodes map[int64]T, noDependencyKeyCnt int)
 	// Remove the node itself and notify all dependers.
 	Remove()
 	// Free the node itself and remove it from the graph.
@@ -56,32 +56,43 @@ func NewSlots[E SlotNode[E]](numSlots uint64) *Slots[E] {
 }
 
 // Add adds an elem to the slots and calls DependOn for elem.
-func (s *Slots[E]) Add(elem E, keys []uint64) {
-	unresolvedDeps := make(map[int64]E, len(keys))
-	resolvedDeps := 0
+func (s *Slots[E]) Add(elem E, hashes []uint64) {
+	dependencyNodes := make(map[int64]E, len(hashes))
+	noDependencyCnt := 0
 	var lastSlot uint64 = math.MaxUint64
-	for _, key := range keys {
-		slotIdx := getSlot(key, s.numSlots)
+	for _, hash := range hashes {
+		// Lock the slot that the node belongs to.
+		slotIdx := getSlot(hash, s.numSlots)
 		if lastSlot != slotIdx {
 			s.slots[slotIdx].mu.Lock()
 			lastSlot = slotIdx
 		}
-		if tail, ok := s.slots[slotIdx].nodes[key]; ok {
-			prevID := tail.NodeID()
-			unresolvedDeps[prevID] = tail
+
+		// If a node already occupies the same hash in this slot, we conflict with it.
+		// Add the conflicting node to dependencyNodes.
+		if prevNode, ok := s.slots[slotIdx].nodes[hash]; ok {
+			prevID := prevNode.NodeID()
+			// If multiple hashes conflict with the same node, we only need to
+			// depend on that node once.
+			dependencyNodes[prevID] = prevNode
 		} else {
-			resolvedDeps += 1
+			noDependencyCnt += 1
 		}
-		s.slots[slotIdx].nodes[key] = elem
+		// Add this node to the slot, so that newly arriving nodes with the same hash
+		// will depend on this node.
+		s.slots[slotIdx].nodes[hash] = elem
 	}
-	elem.DependOn(unresolvedDeps, resolvedDeps)
+
+	// Construct the dependency graph based on the collected `dependencyNodes`, with the
+	// corresponding slots still locked.
+	elem.DependOn(dependencyNodes, noDependencyCnt)
 
 	// Lock those slots one by one and then unlock them one by one, so that
 	// we can avoid 2 transactions get executed interleaved.
 	lastSlot = math.MaxUint64
-	for _, key := range keys {
-		slotIdx := getSlot(key, s.numSlots)
+	for _, hash := range hashes {
+		slotIdx := getSlot(hash, s.numSlots)
 		if lastSlot != slotIdx {
 			s.slots[slotIdx].mu.Unlock()
 			lastSlot = slotIdx
@@ -90,18 +101,21 @@ func (s *Slots[E]) Add(elem E, keys []uint64) {
 }
 
 // Free removes an element from the Slots.
-func (s *Slots[E]) Free(elem E, keys []uint64) {
-	for _, key := range keys {
-		slotIdx := getSlot(key, s.numSlots)
+func (s *Slots[E]) Free(elem E, hashes []uint64) {
+	for _, hash := range hashes {
+		slotIdx := getSlot(hash, s.numSlots)
 		s.slots[slotIdx].mu.Lock()
-		if tail, ok := s.slots[slotIdx].nodes[key]; ok && tail.NodeID() == elem.NodeID() {
-			delete(s.slots[slotIdx].nodes, key)
+		// Remove the node from the slot.
+		// If the node is not in the slot, it has already been replaced by a new node
+		// with the same hash, and in that case we don't need to remove it.
+		if tail, ok := s.slots[slotIdx].nodes[hash]; ok && tail.NodeID() == elem.NodeID() {
+			delete(s.slots[slotIdx].nodes, hash)
 		}
 		s.slots[slotIdx].mu.Unlock()
 	}
	elem.Free()
 }
 
-func getSlot(key, numSlots uint64) uint64 {
-	return key % numSlots
+func getSlot(hash, numSlots uint64) uint64 {
+	return hash % numSlots
 }
diff --git a/pkg/causality/tests/integration_test.go b/pkg/causality/tests/integration_test.go
index 9ed509531b6..35de5533d1b 100644
--- a/pkg/causality/tests/integration_test.go
+++ b/pkg/causality/tests/integration_test.go
@@ -40,7 +40,7 @@ func TestConflictBasics(t *testing.T) {
 		numWorkers, numSlots, newUniformGenerator(workingSetSize, batchSize, numSlots),
 	).WithExecFunc(
 		func(txn *txnForTest) error {
-			for _, key := range txn.ConflictKeys(numSlots) {
+			for _, key := range txn.GenSortedDedupKeysHash(numSlots) {
 				// Access a position in the array without synchronization,
 				// so that if causality check is buggy, the Go race detection would fail.
 				conflictArray[key]++
diff --git a/pkg/causality/tests/worker.go b/pkg/causality/tests/worker.go
index 901cc24f57e..f78f406ba44 100644
--- a/pkg/causality/tests/worker.go
+++ b/pkg/causality/tests/worker.go
@@ -27,7 +27,7 @@ type txnForTest struct {
 
 func (t *txnForTest) OnConflictResolved() {}
 
-func (t *txnForTest) ConflictKeys(numSlots uint64) []uint64 {
+func (t *txnForTest) GenSortedDedupKeysHash(numSlots uint64) []uint64 {
 	return t.keys
 }
diff --git a/pkg/causality/worker.go b/pkg/causality/worker.go
index fb2234b2659..738ba889183 100644
--- a/pkg/causality/worker.go
+++ b/pkg/causality/worker.go
@@ -13,19 +13,15 @@
 
 package causality
 
-type (
-	conflictKey = uint64
-)
-
 type txnEvent interface {
 	// OnConflictResolved is called when the event leaves ConflictDetector.
 	OnConflictResolved()
 
-	// Keys are in range [0, math.MaxUint64) and must be deduped.
+	// Hashes are in range [0, math.MaxUint64) and must be deduped.
 	//
 	// NOTE: if the conflict detector is accessed by multiple threads concurrently,
-	// ConflictKeys must also be sorted based on `key % numSlots`.
-	ConflictKeys(numSlots uint64) []conflictKey
+	// the hashes returned by GenSortedDedupKeysHash must also be sorted based on `hash % numSlots`.
+	GenSortedDedupKeysHash(numSlots uint64) []uint64
 }
 
 type worker[Txn txnEvent] interface {
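Tying the `txnEvent` interface back to the slot bookkeeping in pkg/causality/internal/slots.go, here is a compact, self-contained model of what `Slots.Add` records per hash (a toy with string IDs instead of `SlotNode`s; locking and `Free` are left out):

    package main

    import "fmt"

    // slotTable is a toy version of internal.Slots: per slot, it remembers the
    // last node registered under each hash; a newly added node depends on
    // whatever it finds there.
    type slotTable struct {
        numSlots uint64
        nodes    []map[uint64]string // slot index -> (hash -> ID of the last node)
    }

    func newSlotTable(numSlots uint64) *slotTable {
        return &slotTable{numSlots: numSlots, nodes: make([]map[uint64]string, numSlots)}
    }

    // add registers nodeID under every hash and returns the IDs it must depend on.
    func (s *slotTable) add(nodeID string, hashes []uint64) map[string]struct{} {
        deps := make(map[string]struct{})
        for _, h := range hashes {
            slot := h % s.numSlots
            if s.nodes[slot] == nil {
                s.nodes[slot] = make(map[uint64]string)
            }
            if prev, ok := s.nodes[slot][h]; ok {
                // A previous node holds the same hash: depend on it (only once,
                // thanks to the map), mirroring Slots.Add.
                deps[prev] = struct{}{}
            }
            s.nodes[slot][h] = nodeID
        }
        return deps
    }

    func main() {
        table := newSlotTable(8)
        fmt.Println(table.add("txn-1", []uint64{9, 3}))  // map[] - no conflicts yet
        fmt.Println(table.add("txn-2", []uint64{9, 42})) // txn-2 reuses hash 9 -> depends on txn-1
    }

Note that txn-2 picks up a dependency only because it reuses hash 9; sharing a slot with a different hash is not, by itself, a conflict.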