-
Notifications
You must be signed in to change notification settings - Fork 28
/
pindex_bleve_scorch_rollback.go
145 lines (121 loc) · 4.27 KB
/
pindex_bleve_scorch_rollback.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
// Copyright 2018-Present Couchbase, Inc.
//
// Use of this software is governed by the Business Source License included
// in the file licenses/BSL-Couchbase.txt. As of the Change Date specified
// in that file, in accordance with the Business Source License, use of this
// software will be governed by the Apache License, Version 2.0, included in
// the file licenses/APL2.txt.
package cbft
import (
"encoding/binary"
"fmt"
"os"
"strconv"
"github.com/blevesearch/bleve/v2/index/scorch"
"github.com/couchbase/cbgt"
log "github.com/couchbase/clog"
)
// Scorch implementation sketch: fetch all available rollback points.
// Determine which point is of relevance to get to at or before the
// wanted rollbackSeq and vBucketUUID. If found, rollback to that
// particular point.
func (t *BleveDest) partialScorchRollbackLOCKED(sh *scorch.Scorch) (
bool, bool, error) {
if t == nil || t.rollbackInfo == nil {
return false, false, fmt.Errorf("pindex_bleve_scorch_rollback: invalid" +
" bleveDest or rollbackInfo")
}
partition := t.rollbackInfo.partition
vBucketUUID := t.rollbackInfo.vBucketUUID
rollbackSeq := t.rollbackInfo.rollbackSeq
seqMaxKey := []byte(partition)
// get vBucketMap/Opaque key
var vBucketMapKey []byte
if t.partitions[partition] != nil {
vBucketMapKey = t.partitions[partition].partitionOpaque
}
totSnapshotsExamined := 0
defer func() {
log.Printf("pindex_bleve_scorch_rollback: path: %s, totSnapshotExamined: %d",
t.path, totSnapshotsExamined)
}()
// close the scorch index as rollback works in offline.
err := t.closeLOCKED(false)
if err != nil {
return false, false, err
}
idxPath := t.path + string(os.PathSeparator) + "store"
rollbackPoints, err := scorch.RollbackPoints(idxPath)
if err != nil {
return true, false, err
}
for _, rollbackPoint := range rollbackPoints {
totSnapshotsExamined++
var tryRevert bool
tryRevert, err = scorchSnapshotAtOrBeforeSeq(t.path, rollbackPoint, seqMaxKey,
vBucketMapKey, rollbackSeq, vBucketUUID)
if err != nil {
return true, false, err
}
if tryRevert {
log.Printf("pindex_bleve_scorch_rollback: trying revert, path: %s", t.path)
if ServerlessMode {
v := rollbackPoint.GetInternal([]byte("TotBytesWritten"))
if v == nil {
// what's the right thing to do here?
return false, err == nil, err
}
if t.bindex != nil {
RollbackRefund(t.bindex.Name(), t.sourceName, binary.LittleEndian.Uint64(v))
}
}
err = scorch.Rollback(idxPath, rollbackPoint)
if err != nil {
log.Printf("pindex_bleve_scorch_rollback: Rollback failed, err: %v", err)
}
return true, err == nil, err
}
}
return true, false, err
}
// scorchSnapshotAtOrBeforeSeq returns true if the snapshot represents a seq
// number at or before the given seq number with a matching vBucket UUID.
func scorchSnapshotAtOrBeforeSeq(path string, rbp *scorch.RollbackPoint,
seqMaxKey, vBucketMapKey []byte,
seqMaxWant, vBucketUUIDWant uint64) (bool, error) {
v := rbp.GetInternal(seqMaxKey)
if v == nil {
return false, nil
}
if len(v) != 8 {
return false, fmt.Errorf("wrong len seqMaxKey: %s, v: %s", seqMaxKey, v)
}
seqMaxCurr := binary.BigEndian.Uint64(v[0:8])
// when no vBucketUUIDWant is given from Rollback
// then fallback to seqMaxCurr checks
if vBucketUUIDWant == 0 {
log.Printf("pindex_bleve_scorch_rollback: examining snapshot, path: %s,"+
" seqMaxKey: %s, seqMaxCurr: %d, seqMaxWant: %d",
path, seqMaxKey, seqMaxCurr, seqMaxWant)
return seqMaxCurr <= seqMaxWant, nil
}
// get the vBucketUUID
vbMap := rbp.GetInternal(vBucketMapKey)
if vbMap == nil {
log.Printf("pindex_bleve_scorch_rollback: No vBucketMap for vBucketMapKey: %s",
vBucketMapKey)
return false, nil
}
vBucketUUIDCurr, err := strconv.ParseUint(cbgt.ParseOpaqueToUUID(vbMap), 10, 64)
if err != nil {
log.Printf("pindex_bleve_scorch_rollback: ParseOpaqueToUUID failed for "+
"vbMap: %s, err: %s", vbMap, err)
return false, err
}
log.Printf("pindex_bleve_scorch_rollback: examining snapshot, path: %s,"+
" seqMaxKey: %s, seqMaxCurr: %d, seqMaxWant: %d"+
" vBucketMapKey: %s, vBucketUUIDCurr: %d, vBucketUUIDWant: %d",
path, seqMaxKey, seqMaxCurr, seqMaxWant, vBucketMapKey,
vBucketUUIDCurr, vBucketUUIDWant)
return (seqMaxCurr <= seqMaxWant && vBucketUUIDCurr == vBucketUUIDWant), nil
}