diff --git a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go index 3d8bba21365d..9c1e5edb0229 100644 --- a/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go +++ b/pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go @@ -931,7 +931,7 @@ func (t *txnBatch) HandleBatch( stats := batchStats{} var err error - if len(batch) == 1 && !useKVWriter { + if len(batch) == 1 { s, err := t.rp.ProcessRow(ctx, nil /* txn */, batch[0].KeyValue, batch[0].PrevValue) if err != nil { return stats, err diff --git a/pkg/ccl/crosscluster/logical/lww_kv_processor.go b/pkg/ccl/crosscluster/logical/lww_kv_processor.go index ec2b2ee6dc39..39675354d3de 100644 --- a/pkg/ccl/crosscluster/logical/lww_kv_processor.go +++ b/pkg/ccl/crosscluster/logical/lww_kv_processor.go @@ -89,74 +89,108 @@ func newKVRowProcessor( var originID1Options = &kvpb.WriteOptions{OriginID: 1} func (p *kvRowProcessor) ProcessRow( - ctx context.Context, txn isql.Txn, kv roachpb.KeyValue, prevValue roachpb.Value, + ctx context.Context, txn isql.Txn, keyValue roachpb.KeyValue, prevValue roachpb.Value, ) (batchStats, error) { if err := p.injectFailure(); err != nil { return batchStats{}, err } var err error - kv.Key, err = keys.StripTenantPrefix(kv.Key) + keyValue.Key, err = keys.StripTenantPrefix(keyValue.Key) if err != nil { return batchStats{}, errors.Wrap(err, "stripping tenant prefix") } - row, err := p.decoder.DecodeKV(ctx, kv, cdcevent.CurrentRow, kv.Value.Timestamp, false) + row, err := p.decoder.DecodeKV(ctx, keyValue, cdcevent.CurrentRow, keyValue.Value.Timestamp, false) if err != nil { p.lastRow = cdcevent.Row{} return batchStats{}, errors.Wrap(err, "decoding KeyValue") } p.lastRow = row - var stats batchStats + if err := p.processParsedRow(ctx, txn, row, keyValue, prevValue); err != nil { + return batchStats{}, err + } + return batchStats{}, nil + +} +func (p *kvRowProcessor) processParsedRow( + ctx context.Context, txn isql.Txn, row cdcevent.Row, k roachpb.KeyValue, prevValue roachpb.Value, +) error { dstTableID, ok := p.dstBySrc[row.TableID] if !ok { - return batchStats{}, errors.AssertionFailedf("replication configuration missing for table %d / %q", row.TableID, row.TableName) + return errors.AssertionFailedf("replication configuration missing for table %d / %q", row.TableID, row.TableName) } - kvTxn := txn.KV() + if txn == nil { + if err := p.cfg.DB.KV().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + b := txn.NewBatch() + b.Header.WriteOptions = originID1Options - w, err := p.getWriter(ctx, dstTableID, kvTxn.ProvisionalCommitTimestamp()) - if err != nil { - return stats, err - } + if err := p.addToBatch(ctx, txn, b, dstTableID, row, k, prevValue); err != nil { + return err + } + return txn.CommitInBatch(ctx, b) + }); err != nil { + return err + } - // This batch should only commit if it can do so prior to the expiration of - // the lease of the descriptor used to encode it. - if err := kvTxn.UpdateDeadline(ctx, w.leased.Expiration(ctx)); err != nil { - return stats, err + return nil } + kvTxn := txn.KV() b := kvTxn.NewBatch() - b.Header.WriteOptions = originID1Options + if err := p.addToBatch(ctx, kvTxn, b, dstTableID, row, k, prevValue); err != nil { + return err + } + return txn.KV().Run(ctx, b) +} + +func (p *kvRowProcessor) addToBatch( + ctx context.Context, + txn *kv.Txn, + b *kv.Batch, + dstTableID descpb.ID, + row cdcevent.Row, + keyValue roachpb.KeyValue, + prevValue roachpb.Value, +) error { + w, err := p.getWriter(ctx, dstTableID, txn.ProvisionalCommitTimestamp()) + if err != nil { + return err + } + // This batch should only commit if it can do so prior to the expiration of + // the lease of the descriptor used to encode it. + if err := txn.UpdateDeadline(ctx, w.leased.Expiration(ctx)); err != nil { + return err + } if prevValue.IsPresent() { prevRow, err := p.decoder.DecodeKV(ctx, roachpb.KeyValue{ - Key: kv.Key, + Key: keyValue.Key, Value: prevValue, }, cdcevent.PrevRow, prevValue.Timestamp, false) if err != nil { - return batchStats{}, err + return err } if row.IsDeleted() { if err := w.deleteRow(ctx, b, prevRow, row); err != nil { - return stats, err + return err } } else { if err := w.updateRow(ctx, b, prevRow, row); err != nil { - return stats, err + return err } } } else { if err := w.insertRow(ctx, b, row); err != nil { - return stats, err + return err } } - - return stats, txn.KV().Run(ctx, b) + return nil } // GetLastRow implements the RowProcessor interface.