Skip to content

Commit

Permalink
crosscluster/logical: commit-in-batch single row in kv writer
Browse files Browse the repository at this point in the history
Release note: none.
Epic: none.
  • Loading branch information
dt committed Aug 16, 2024
1 parent ab496aa commit 40943c6
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,7 @@ func (t *txnBatch) HandleBatch(

stats := batchStats{}
var err error
if len(batch) == 1 && !useKVWriter {
if len(batch) == 1 {
s, err := t.rp.ProcessRow(ctx, nil /* txn */, batch[0].KeyValue, batch[0].PrevValue)
if err != nil {
return stats, err
Expand Down
78 changes: 56 additions & 22 deletions pkg/ccl/crosscluster/logical/lww_kv_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,74 +89,108 @@ func newKVRowProcessor(
var originID1Options = &kvpb.WriteOptions{OriginID: 1}

func (p *kvRowProcessor) ProcessRow(
ctx context.Context, txn isql.Txn, kv roachpb.KeyValue, prevValue roachpb.Value,
ctx context.Context, txn isql.Txn, keyValue roachpb.KeyValue, prevValue roachpb.Value,
) (batchStats, error) {
if err := p.injectFailure(); err != nil {
return batchStats{}, err
}

var err error
kv.Key, err = keys.StripTenantPrefix(kv.Key)
keyValue.Key, err = keys.StripTenantPrefix(keyValue.Key)
if err != nil {
return batchStats{}, errors.Wrap(err, "stripping tenant prefix")
}

row, err := p.decoder.DecodeKV(ctx, kv, cdcevent.CurrentRow, kv.Value.Timestamp, false)
row, err := p.decoder.DecodeKV(ctx, keyValue, cdcevent.CurrentRow, keyValue.Value.Timestamp, false)
if err != nil {
p.lastRow = cdcevent.Row{}
return batchStats{}, errors.Wrap(err, "decoding KeyValue")
}
p.lastRow = row

var stats batchStats
if err := p.processParsedRow(ctx, txn, row, keyValue, prevValue); err != nil {
return batchStats{}, err
}
return batchStats{}, nil

}

func (p *kvRowProcessor) processParsedRow(
ctx context.Context, txn isql.Txn, row cdcevent.Row, k roachpb.KeyValue, prevValue roachpb.Value,
) error {
dstTableID, ok := p.dstBySrc[row.TableID]
if !ok {
return batchStats{}, errors.AssertionFailedf("replication configuration missing for table %d / %q", row.TableID, row.TableName)
return errors.AssertionFailedf("replication configuration missing for table %d / %q", row.TableID, row.TableName)
}

kvTxn := txn.KV()
if txn == nil {
if err := p.cfg.DB.KV().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
b := txn.NewBatch()
b.Header.WriteOptions = originID1Options

w, err := p.getWriter(ctx, dstTableID, kvTxn.ProvisionalCommitTimestamp())
if err != nil {
return stats, err
}
if err := p.addToBatch(ctx, txn, b, dstTableID, row, k, prevValue); err != nil {
return err
}
return txn.CommitInBatch(ctx, b)
}); err != nil {
return err
}

// This batch should only commit if it can do so prior to the expiration of
// the lease of the descriptor used to encode it.
if err := kvTxn.UpdateDeadline(ctx, w.leased.Expiration(ctx)); err != nil {
return stats, err
return nil
}

kvTxn := txn.KV()
b := kvTxn.NewBatch()

b.Header.WriteOptions = originID1Options

if err := p.addToBatch(ctx, kvTxn, b, dstTableID, row, k, prevValue); err != nil {
return err
}
return txn.KV().Run(ctx, b)
}

func (p *kvRowProcessor) addToBatch(
ctx context.Context,
txn *kv.Txn,
b *kv.Batch,
dstTableID descpb.ID,
row cdcevent.Row,
keyValue roachpb.KeyValue,
prevValue roachpb.Value,
) error {
w, err := p.getWriter(ctx, dstTableID, txn.ProvisionalCommitTimestamp())
if err != nil {
return err
}
// This batch should only commit if it can do so prior to the expiration of
// the lease of the descriptor used to encode it.
if err := txn.UpdateDeadline(ctx, w.leased.Expiration(ctx)); err != nil {
return err
}
if prevValue.IsPresent() {
prevRow, err := p.decoder.DecodeKV(ctx, roachpb.KeyValue{
Key: kv.Key,
Key: keyValue.Key,
Value: prevValue,
}, cdcevent.PrevRow, prevValue.Timestamp, false)
if err != nil {
return batchStats{}, err
return err
}

if row.IsDeleted() {
if err := w.deleteRow(ctx, b, prevRow, row); err != nil {
return stats, err
return err
}
} else {
if err := w.updateRow(ctx, b, prevRow, row); err != nil {
return stats, err
return err
}
}
} else {
if err := w.insertRow(ctx, b, row); err != nil {
return stats, err
return err
}
}

return stats, txn.KV().Run(ctx, b)
return nil
}

// GetLastRow implements the RowProcessor interface.
Expand Down

0 comments on commit 40943c6

Please sign in to comment.