Skip to content

Commit

Permalink
fix dedup, fix ww (#20382)
Browse files Browse the repository at this point in the history
Fix dedup. Fix getting the origin commit ts: use the data row offset instead of the tombstone's physical row offset.

Approved by: @XuPeng-SH
  • Loading branch information
jiangxinmeng1 authored Nov 26, 2024
1 parent f051da3 commit 8a79546
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 23 deletions.
49 changes: 49 additions & 0 deletions pkg/vm/engine/tae/db/test/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10187,6 +10187,55 @@ func TestDeleteAndMerge(t *testing.T) {
t.Log(tae.Catalog.SimplePPString(3))
}

// TestDeleteAndMerge2 interleaves a merge, a delete and an append:
//
//  1. Ten mocked rows are appended and compacted.
//  2. A merge task over every non-appendable object is executed inside a
//     transaction that is left uncommitted for now.
//  3. While the merge is pending, a second transaction deletes the row whose
//     sort-key value sits at index 3 of the batch and commits.
//  4. The append transaction is started before the merge transaction commits.
//  5. The merge transaction commits, blocks are compacted again, and the
//     append transaction writes bats[3] and must commit without error.
//
// NOTE(review): presumably bat.Split(10) yields one row per piece, so bats[3]
// re-inserts the key deleted in step 3 — confirm against Split.
func TestDeleteAndMerge2(t *testing.T) {
	ctx := context.Background()

	opts := config.WithLongScanAndCKPOpts(nil)
	tae := testutil.NewTestEngine(ctx, ModuleName, t, opts)
	defer tae.Close()

	schema := catalog.MockSchemaAll(3, 2)
	schema.Extra.BlockMaxRows = 5
	schema.Extra.ObjectMaxBlocks = 256
	tae.BindSchema(schema)

	bat := catalog.MockBatch(schema, 10)
	bats := bat.Split(10)
	defer bat.Close()
	tae.CreateRelAndAppend(bat, true)
	tae.CompactBlocks(true)

	// Collect every non-appendable object as a merge input.
	mergeTxn, rel := tae.GetRelation()
	var mergeInputs []*catalog.ObjectEntry
	for it := rel.MakeObjectIt(false); it.Next(); {
		entry := it.GetObject().GetMeta().(*catalog.ObjectEntry)
		if entry.IsAppendable() {
			continue
		}
		mergeInputs = append(mergeInputs, entry)
	}

	// Run the merge, but hold back the commit of its transaction.
	task, err := jobs.NewMergeObjectsTask(nil, mergeTxn, mergeInputs, tae.Runtime, 0, false)
	assert.NoError(t, err)
	assert.NoError(t, task.OnExec(ctx))

	// Delete one row and commit while the merge transaction is still open.
	deleteTxn, deleteRel := tae.GetRelation()
	key := bat.Vecs[schema.GetSingleSortKeyIdx()].Get(3)
	assert.NoError(t, deleteRel.DeleteByFilter(ctx, handle.NewEQFilter(key)))
	assert.NoError(t, deleteTxn.Commit(ctx))

	// The append transaction must start before the merge commits.
	var appendTxn txnif.AsyncTxn
	appendTxn, err = tae.StartTxn(nil)
	assert.NoError(t, err)

	assert.NoError(t, mergeTxn.Commit(ctx))
	tae.CompactBlocks(true)
	tae.DoAppendWithTxn(bats[3], appendTxn, false)

	assert.NoError(t, appendTxn.Commit(ctx))
	t.Log(tae.Catalog.SimplePPString(3))
}

func TestTransferInMerge2(t *testing.T) {
ctx := context.Background()

Expand Down
29 changes: 26 additions & 3 deletions pkg/vm/engine/tae/tables/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/data"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/tables/updates"
)

Expand Down Expand Up @@ -329,10 +328,34 @@ func (obj *baseObject) containsWithLoad(
if isAblk {
dedupFn = containers.MakeForeachVectorOp(
keys.GetType().Oid, containsAlkFunctions, data.Vecs[0], keys, obj.LoadPersistedCommitTS, txn,
func(vrowID any) *model.TransDels {
// Dedup callback: looks up the timestamp recorded in the transfer-deletes
// map for the data row identified by vrowID.
//
// vrowID must hold a types.Rowid (it is type-asserted). The block id and
// the row offset are both taken from that row id. commitTS is only used
// for logging here.
//
// Returns txnif.ErrTxnWWConflict when the transfer-deletes map has no
// entry for the block, or no mapping for the row offset; otherwise
// returns the mapped timestamp.
func(vrowID any, commitTS types.TS) (types.TS, error) {
	rowID := vrowID.(types.Rowid)
	blkID := rowID.BorrowBlockID()
	dels := obj.rt.TransferDelsMap.GetDelsForBlk(*blkID)
	if dels == nil {
		// Field key is a literal name, not a format string; keep it
		// identical to the other Dedup logs so they can be queried together.
		logutil.Info("Dedup-WW",
			zap.String("txn", txn.Repr()),
			zap.String("data row id", rowID.String()),
			zap.String("commit ts", commitTS.ToString()),
		)
		return types.TS{}, txnif.ErrTxnWWConflict
	}
	// Index the mapping by the row offset carried inside the row id.
	row := rowID.GetRowOffset()
	ts, ok := dels.Mapping[int(row)]
	if !ok {
		logutil.Info("Dedup-WW",
			zap.String("txn", txn.Repr()),
			zap.String("data row id", rowID.String()),
			zap.String("commit ts", commitTS.ToString()),
		)
		return types.TS{}, txnif.ErrTxnWWConflict
	}
	logutil.Info("Dedup",
		zap.String("txn", txn.Repr()),
		zap.String("data row id", rowID.String()),
		zap.String("commit ts", commitTS.ToString()),
	)
	return ts, nil
},
)
} else {
Expand Down
25 changes: 5 additions & 20 deletions pkg/vm/engine/tae/tables/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/compute"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/model"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -162,7 +161,7 @@ func parseAGetDuplicateRowIDsArgs(args ...any) (

func parseAContainsArgs(args ...any) (
vec containers.Vector, rowIDs containers.Vector,
scanFn func(uint16) (vec containers.Vector, err error), txn txnif.TxnReader, delsFn func(rowID any) *model.TransDels,
scanFn func(uint16) (vec containers.Vector, err error), txn txnif.TxnReader, delsFn func(rowID any, ts types.TS) (types.TS, error),
) {
vec = args[0].(containers.Vector)
if args[1] != nil {
Expand All @@ -175,7 +174,7 @@ func parseAContainsArgs(args ...any) (
txn = args[3].(txnif.TxnReader)
}
if args[4] != nil {
delsFn = args[4].(func(rowID any) *model.TransDels)
delsFn = args[4].(func(rowID any, ts types.TS) (types.TS, error))
}
return
}
Expand Down Expand Up @@ -407,23 +406,9 @@ func containsABlkFuncFactory[T types.FixedSizeT](comp func(T, T) int) func(args
commitTS := tsVec.Get(row).(types.TS)
startTS := txn.GetStartTS()
if commitTS.GT(&startTS) {
dels := delsFn(v1)
if dels == nil {
logutil.Info("Dedup-WW",
zap.String("txn", txn.Repr()),
zap.Int("row offset", row),
zap.String("commit ts", commitTS.ToString()),
)
return txnif.ErrTxnWWConflict
}
ts, ok := dels.Mapping[row]
if !ok {
logutil.Info("Dedup-WW",
zap.String("txn", txn.Repr()),
zap.Int("row offset", row),
zap.String("commit ts", commitTS.ToString()),
)
return txnif.ErrTxnWWConflict
ts, err := delsFn(v1, commitTS)
if err != nil {
return err
}
if ts.GT(&startTS) {
logutil.Info("Dedup-WW",
Expand Down

0 comments on commit 8a79546

Please sign in to comment.