fix(core): fix duplicate mutation entries for count index (#9208) (#9209)

Due to duplicate entries in the count index, a wrong count was sometimes reported. The wrong count in turn triggered the "transaction is too old" error. This diff removes the duplicate entries, fixing the issue.
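To see why a duplicate mutation entry inflates the count, here is a minimal standalone Go sketch. The posting struct and length helper below are illustrative stand-ins rather than dgraph types; only the Set/Del/Ovr contributions (+1, -1, 0) mirror the accounting in posting/list.go in this commit.

package main

import "fmt"

// Illustrative constants matching the mutation-layer ops in posting/list.go:
// Set contributes +1 to the length, Del contributes -1, Ovr contributes 0.
const (
	Set uint32 = 0x01
	Del uint32 = 0x02
	Ovr uint32 = 0x03
)

type posting struct{ Op uint32 }

// length applies mutation-layer postings on top of the number of uids
// already committed in the immutable pack.
func length(committed int, mutations []posting) int {
	n := committed
	for _, p := range mutations {
		switch p.Op {
		case Set:
			n++
		case Del:
			n--
		}
		// Ovr overwrites an existing uid and leaves the count unchanged.
	}
	return n
}

func main() {
	// One uid is already committed and the same uid is SET again.
	// Stored as Set (the old behavior) the count index reads 2.
	fmt.Println(length(1, []posting{{Op: Set}})) // 2: the duplicate entry
	// Stored as Ovr (this fix) the count stays 1.
	fmt.Println(length(1, []posting{{Op: Ovr}})) // 1: correct
}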
harshil-goel authored Nov 5, 2024
1 parent 2d1ccb6 commit 27450c1
Showing 10 changed files with 637 additions and 404 deletions.
12 changes: 11 additions & 1 deletion dgraph/cmd/debug/run.go
@@ -533,7 +533,10 @@ func lookup(db *badger.DB) {
if err != nil {
log.Fatal(err)
}
fmt.Fprintf(&buf, " Length: %d", pl.Length(math.MaxUint64, 0))
pl.RLock()
c, _, _ := pl.GetLength(math.MaxUint64)
pl.RUnlock()
fmt.Fprintf(&buf, " Length: %d", c)

splits := pl.PartSplits()
isMultiPart := len(splits) > 0
@@ -611,6 +614,13 @@ func printKeys(db *badger.DB) {
}

var sz, deltaCount int64
pl, err := posting.GetNew(key, db, opt.readTs)
if err == nil {
pl.RLock()
c, _, _ := pl.GetLength(math.MaxUint64)
fmt.Fprintf(&buf, " countValue: [%d]", c)
pl.RUnlock()
}
LOOP:
for ; itr.ValidForPrefix(prefix); itr.Next() {
item := itr.Item()
75 changes: 35 additions & 40 deletions posting/index.go
@@ -233,6 +233,18 @@ type countParams struct {
reverse bool
}

// When updating an edge on a predicate with a count index, an edge whose posting already exists must be written with OVR instead of SET; a second SET entry would skew the count.
func shouldAddCountEdge(found bool, edge *pb.DirectedEdge) bool {
if found {
if edge.Op != pb.DirectedEdge_DEL {
edge.Op = pb.DirectedEdge_OVR
}
return true
} else {
return edge.Op != pb.DirectedEdge_DEL
}
}

func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
hasCountIndex bool, edge *pb.DirectedEdge) (countParams, error) {
countBefore, countAfter := 0, 0
@@ -242,12 +254,14 @@ func (txn *Txn) addReverseMutationHelper(ctx context.Context, plist *List,
defer plist.Unlock()
if hasCountIndex {
countBefore, found, _ = plist.getPostingAndLengthNoSort(txn.StartTs, 0, edge.ValueId)
if countBefore == -1 {
if countBefore < 0 {
return emptyCountParams, errors.Wrapf(ErrTsTooOld, "Adding reverse mutation helper count")
}
}
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
return emptyCountParams, err
if !(hasCountIndex && !shouldAddCountEdge(found, edge)) {
if err := plist.addMutationInternal(ctx, txn, edge); err != nil {
return emptyCountParams, err
}
}
if hasCountIndex {
countAfter = countAfterMutation(countBefore, found, edge.Op)
@@ -311,7 +325,7 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd
// entries for this key in the index are removed.
pred, ok := schema.State().Get(ctx, t.Attr)
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
t.Op == pb.DirectedEdge_SET && t.ValueId != 0
t.Op != pb.DirectedEdge_DEL && t.ValueId != 0
if isSingleUidUpdate {
dataKey := x.DataKey(t.Attr, t.Entity)
dataList, err := getFn(dataKey)
@@ -458,7 +472,7 @@ func (txn *Txn) updateCount(ctx context.Context, params countParams) error {
}

func countAfterMutation(countBefore int, found bool, op pb.DirectedEdge_Op) int {
if !found && op == pb.DirectedEdge_SET {
if !found && op != pb.DirectedEdge_DEL {
return countBefore + 1
} else if found && op == pb.DirectedEdge_DEL {
return countBefore - 1
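
Taken together, shouldAddCountEdge and countAfterMutation keep the count stable when an existing posting is written again. Below is a standalone sketch of the three interesting cases; the edge type and op constants are re-declared locally for illustration rather than imported from dgraph.

package main

import "fmt"

type op int

const (
	opSet op = iota
	opDel
	opOvr
)

type edge struct{ Op op }

// Mirrors shouldAddCountEdge above: an edge whose posting already exists is
// rewritten to OVR (unless it is a DEL); a DEL for a posting that was never
// there is dropped entirely.
func shouldAddCountEdge(found bool, e *edge) bool {
	if found {
		if e.Op != opDel {
			e.Op = opOvr
		}
		return true
	}
	return e.Op != opDel
}

// Mirrors countAfterMutation above: only a first-time add increments the
// count and only a delete of an existing posting decrements it.
func countAfterMutation(before int, found bool, o op) int {
	if !found && o != opDel {
		return before + 1
	} else if found && o == opDel {
		return before - 1
	}
	return before
}

func main() {
	// SET of a uid that is already present: rewritten to OVR, count unchanged.
	e := &edge{Op: opSet}
	add := shouldAddCountEdge(true, e)
	fmt.Println(add, e.Op == opOvr, countAfterMutation(1, true, e.Op)) // true true 1

	// SET of a new uid: added as SET, count goes from 0 to 1.
	e = &edge{Op: opSet}
	add = shouldAddCountEdge(false, e)
	fmt.Println(add, countAfterMutation(0, false, e.Op)) // true 1

	// DEL of a uid that was never there: mutation skipped, count unchanged.
	e = &edge{Op: opDel}
	add = shouldAddCountEdge(false, e)
	fmt.Println(add, countAfterMutation(0, false, e.Op)) // false 0
}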
@@ -531,8 +545,10 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo
}
}

if err = l.addMutationInternal(ctx, txn, t); err != nil {
return val, found, emptyCountParams, err
if !(hasCountIndex && !shouldAddCountEdge(found && currPost.Op != Del, t)) {
if err = l.addMutationInternal(ctx, txn, t); err != nil {
return val, found, emptyCountParams, err
}
}

if found && doUpdateIndex {
@@ -596,7 +612,7 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge,
return err
}
}
if edge.Op == pb.DirectedEdge_SET {
if edge.Op != pb.DirectedEdge_DEL {
val = types.Val{
Tid: types.TypeID(edge.ValueType),
Value: edge.Value,
@@ -895,15 +911,13 @@ func (r *rebuilder) Run(ctx context.Context) error {
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

var txn *Txn

tmpWriter := tmpDB.NewManagedWriteBatch()
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
stream.Prefix = r.prefix
//TODO We need to create a single transaction irrespective of the type of the predicate
if pred.ValueType == pb.Posting_VFLOAT {
txn = NewTxn(r.startTs)
x.AssertTrue(false)
}
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
// We should return quickly if the context is no longer valid.
@@ -923,44 +937,25 @@ func (r *rebuilder) Run(ctx context.Context) error {
return nil, errors.Wrapf(err, "error reading posting list from disk")
}

// We are using different transactions in each call to KeyToList function. This could
// be a problem for computing reverse count indexes if deltas for same key are added
// in different transactions. Such a case doesn't occur for now.
// TODO: Maybe we can always use txn initialized in rebuilder.Run().
streamTxn := txn
if streamTxn == nil {
streamTxn = NewTxn(r.startTs)
}
edges, err := r.fn(pk.Uid, l, streamTxn)
kvs, err := l.Rollup(nil, r.startTs)
if err != nil {
return nil, err
}

if txn != nil {
kvs := make([]*bpb.KV, 0, len(edges))
for _, edge := range edges {
version := atomic.AddUint64(&counter, 1)
key := x.DataKey(edge.Attr, edge.Entity)
pl, err := txn.GetFromDelta(key)
if err != nil {
return &bpb.KVList{}, nil
}
data := pl.getMutation(r.startTs)
kv := bpb.KV{
Key: x.DataKey(edge.Attr, edge.Entity),
Value: data,
UserMeta: []byte{BitDeltaPosting},
Version: version,
}
kvs = append(kvs, &kv)
}
return &bpb.KVList{Kv: kvs}, nil
for _, kv := range kvs {
version := atomic.AddUint64(&counter, 1)
kv.Version = version
}

streamTxn := NewTxn(r.startTs)
_, err = r.fn(pk.Uid, l, streamTxn)
if err != nil {
return nil, err
}

// Convert data into deltas.
streamTxn.Update()
// txn.cache.Lock() is not required because we are the only one making changes to txn.
kvs := make([]*bpb.KV, 0, len(streamTxn.cache.deltas))
for key, data := range streamTxn.cache.deltas {
version := atomic.AddUint64(&counter, 1)
kv := bpb.KV{
31 changes: 24 additions & 7 deletions posting/list.go
@@ -55,10 +55,12 @@ var (
)

const (
// Set means overwrite in mutation layer. It contributes 0 in Length.
// Set means set in mutation layer. It contributes 1 in Length.
Set uint32 = 0x01
// Del means delete in mutation layer. It contributes -1 in Length.
Del uint32 = 0x02
// Ovr means overwrite in mutation layer. It contributes 0 in Length.
Ovr uint32 = 0x03

// BitSchemaPosting signals that the value stores a schema or type.
BitSchemaPosting byte = 0x01
@@ -305,6 +307,8 @@ func NewPosting(t *pb.DirectedEdge) *pb.Posting {
switch t.Op {
case pb.DirectedEdge_SET:
op = Set
case pb.DirectedEdge_OVR:
op = Ovr
case pb.DirectedEdge_DEL:
op = Del
default:
@@ -340,7 +344,7 @@ func hasDeleteAll(mpost *pb.Posting) bool {
// Ensure that you either abort the uncommitted postings or commit them before calling me.
func (l *List) updateMutationLayer(mpost *pb.Posting, singleUidUpdate bool) error {
l.AssertLock()
x.AssertTrue(mpost.Op == Set || mpost.Op == Del)
x.AssertTrue(mpost.Op == Set || mpost.Op == Del || mpost.Op == Ovr)

// If we have a delete all, then we replace the map entry with just one.
if hasDeleteAll(mpost) {
@@ -529,7 +533,7 @@ func (l *List) addMutationInternal(ctx context.Context, txn *Txn, t *pb.Directed
}
pred, ok := schema.State().Get(ctx, t.Attr)
isSingleUidUpdate := ok && !pred.GetList() && pred.GetValueType() == pb.Posting_UID &&
pk.IsData() && mpost.Op == Set && mpost.PostingType == pb.Posting_REF
pk.IsData() && mpost.Op != Del && mpost.PostingType == pb.Posting_REF

if err != l.updateMutationLayer(mpost, isSingleUidUpdate) {
return errors.Wrapf(err, "cannot update mutation layer of key %s with value %+v",
@@ -555,6 +559,10 @@ func (l *List) getPosting(startTs uint64) *pb.PostingList {
return nil
}

func (l *List) GetPosting(startTs uint64) *pb.PostingList {
return l.getPosting(startTs)
}

// getMutation returns a marshaled version of posting list mutation stored internally.
func (l *List) getMutation(startTs uint64) []byte {
l.RLock()
@@ -817,31 +825,40 @@ func (l *List) IsEmpty(readTs, afterUid uint64) (bool, error) {
return count == 0, nil
}

func (l *List) GetLength(readTs uint64) (int, bool, *pb.Posting) {
return l.getPostingAndLengthNoSort(readTs, 0, 0)
}

func (l *List) getPostingAndLengthNoSort(readTs, afterUid, uid uint64) (int, bool, *pb.Posting) {
l.AssertRLock()

dec := codec.Decoder{Pack: l.plist.Pack}
uids := dec.Seek(uid, codec.SeekStart)
length := codec.ExactLen(l.plist.Pack)
found := len(uids) > 0 && uids[0] == uid
found_ts := uint64(0)

for _, plist := range l.mutationMap {
for _, mpost := range plist.Postings {
ts := mpost.CommitTs
if mpost.StartTs == readTs {
ts = mpost.StartTs
}
if (mpost.CommitTs > 0 && mpost.CommitTs <= readTs) || (mpost.StartTs == readTs) {
if hasDeleteAll(mpost) {
found = false
length = 0
continue
}
if mpost.Uid == uid {
found = (mpost.Op == Set)
if mpost.Uid == uid && found_ts < ts {
found = (mpost.Op != Del)
found_ts = ts
}
if mpost.Op == Set {
length += 1
} else {
} else if mpost.Op == Del {
length -= 1
}

}
}
}
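
The found_ts bookkeeping added above makes the presence check latest-mutation-wins for the target uid, so the result no longer depends on the order in which the mutation map is walked. A simplified standalone illustration (local types only, no pack decoding or delete-all handling):

package main

import "fmt"

type mpost struct {
	Uid uint64
	Del bool   // true for a delete, false for a set/overwrite
	Ts  uint64 // commit (or start) timestamp visible at readTs
}

// foundAt reports whether uid is present after the visible mutations,
// letting the mutation with the highest timestamp decide.
func foundAt(uid uint64, visible []mpost) bool {
	found := false
	var foundTs uint64
	for _, p := range visible {
		if p.Uid == uid && foundTs < p.Ts {
			found = !p.Del
			foundTs = p.Ts
		}
	}
	return found
}

func main() {
	// uid 7 is set at ts=5 and deleted at ts=9: the later delete wins.
	fmt.Println(foundAt(7, []mpost{{7, false, 5}, {7, true, 9}})) // false
	// The answer does not change if the mutations are visited in another order.
	fmt.Println(foundAt(7, []mpost{{7, true, 9}, {7, false, 5}})) // false
}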
4 changes: 4 additions & 0 deletions posting/mvcc.go
@@ -555,6 +555,10 @@ func PostingListCacheEnabled() bool {
return lCache != nil
}

func GetNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
return getNew(key, pstore, readTs)
}

func getNew(key []byte, pstore *badger.DB, readTs uint64) (*List, error) {
if PostingListCacheEnabled() {
l, ok := lCache.Get(key)
1 change: 1 addition & 0 deletions protos/pb.proto
@@ -237,6 +237,7 @@ message DirectedEdge {
enum Op {
SET = 0;
DEL = 1;
OVR = 2;
}
Op op = 8;
repeated api.Facet facets = 9;
