Skip to content

Commit

Permalink
change flushmemtable lock range
Browse files Browse the repository at this point in the history
  • Loading branch information
KANIOYH committed Sep 24, 2024
1 parent b01bedd commit 5085612
Showing 1 changed file with 89 additions and 91 deletions.
180 changes: 89 additions & 91 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,112 +415,110 @@ func (db *DB) waitMemtableSpace() error {
//
//nolint:funlen
func (db *DB) flushMemtable(table *memtable) {
{
db.flushLock.Lock()
defer db.flushLock.Unlock()

sklIter := table.skl.NewIterator()
var deletedKeys [][]byte
var logRecords []*ValueLogRecord

// iterate all records in memtable, divide them into deleted keys and log records
// for every log record, we generate uuid.
for sklIter.SeekToFirst(); sklIter.Valid(); sklIter.Next() {
key, valueStruct := y.ParseKey(sklIter.Key()), sklIter.Value()
if valueStruct.Meta == LogRecordDeleted {
deletedKeys = append(deletedKeys, key)
} else {
logRecord := ValueLogRecord{key: key, value: valueStruct.Value, uid: uuid.New()}
logRecords = append(logRecords, &logRecord)
}
}
_ = sklIter.Close()
// log.Println("len del:",len(deletedKeys),len(logRecords))
db.flushLock.Lock()
defer db.flushLock.Unlock()

// write to value log, get the positions of keys
keyPos, err := db.vlog.writeBatch(logRecords)
if err != nil {
log.Println("vlog writeBatch failed:", err)
return
}
sklIter := table.skl.NewIterator()
var deletedKeys [][]byte
var logRecords []*ValueLogRecord

// sync the value log
if err = db.vlog.sync(); err != nil {
log.Println("vlog sync failed:", err)
return
// iterate all records in memtable, divide them into deleted keys and log records
// for every log record, we generate uuid.
for sklIter.SeekToFirst(); sklIter.Valid(); sklIter.Next() {
key, valueStruct := y.ParseKey(sklIter.Key()), sklIter.Value()
if valueStruct.Meta == LogRecordDeleted {
deletedKeys = append(deletedKeys, key)
} else {
logRecord := ValueLogRecord{key: key, value: valueStruct.Value, uid: uuid.New()}
logRecords = append(logRecords, &logRecord)
}
}
_ = sklIter.Close()
// log.Println("len del:",len(deletedKeys),len(logRecords))

// Add old key uuid into deprecatedtable, write all keys and positions to index.
var putMatchKeys []diskhash.MatchKeyFunc
if db.options.IndexType == Hash && len(keyPos) > 0 {
putMatchKeys = make([]diskhash.MatchKeyFunc, len(keyPos))
for i := range putMatchKeys {
putMatchKeys[i] = MatchKeyFunc(db, keyPos[i].key, nil, nil)
}
}
// write to value log, get the positions of keys
keyPos, err := db.vlog.writeBatch(logRecords)
if err != nil {
log.Println("vlog writeBatch failed:", err)
return
}

// Write all keys and positions to index.
oldKeyPostions, err := db.index.PutBatch(keyPos, putMatchKeys...)
if err != nil {
log.Println("index PutBatch failed:", err)
return
}
// sync the value log
if err = db.vlog.sync(); err != nil {
log.Println("vlog sync failed:", err)
return
}

// Add old key uuid into deprecatedtable
for _, oldKeyPostion := range oldKeyPostions {
db.vlog.setDeprecated(oldKeyPostion.partition, oldKeyPostion.uid)
// Add old key uuid into deprecatedtable, write all keys and positions to index.
var putMatchKeys []diskhash.MatchKeyFunc
if db.options.IndexType == Hash && len(keyPos) > 0 {
putMatchKeys = make([]diskhash.MatchKeyFunc, len(keyPos))
for i := range putMatchKeys {
putMatchKeys[i] = MatchKeyFunc(db, keyPos[i].key, nil, nil)
}
}

// Add deleted key uuid into deprecatedtable, and delete the deleted keys from index.
var deleteMatchKeys []diskhash.MatchKeyFunc
if db.options.IndexType == Hash && len(deletedKeys) > 0 {
deleteMatchKeys = make([]diskhash.MatchKeyFunc, len(deletedKeys))
for i := range deleteMatchKeys {
deleteMatchKeys[i] = MatchKeyFunc(db, deletedKeys[i], nil, nil)
}
}
// Write all keys and positions to index.
oldKeyPostions, err := db.index.PutBatch(keyPos, putMatchKeys...)
if err != nil {
log.Println("index PutBatch failed:", err)
return
}

// delete the deleted keys from index
if oldKeyPostions, err = db.index.DeleteBatch(deletedKeys, deleteMatchKeys...); err != nil {
log.Println("index DeleteBatch failed:", err)
return
}
// Add old key uuid into deprecatedtable
for _, oldKeyPostion := range oldKeyPostions {
db.vlog.setDeprecated(oldKeyPostion.partition, oldKeyPostion.uid)
}

// uuid into deprecatedtable
for _, oldKeyPostion := range oldKeyPostions {
db.vlog.setDeprecated(oldKeyPostion.partition, oldKeyPostion.uid)
// Add deleted key uuid into deprecatedtable, and delete the deleted keys from index.
var deleteMatchKeys []diskhash.MatchKeyFunc
if db.options.IndexType == Hash && len(deletedKeys) > 0 {
deleteMatchKeys = make([]diskhash.MatchKeyFunc, len(deletedKeys))
for i := range deleteMatchKeys {
deleteMatchKeys[i] = MatchKeyFunc(db, deletedKeys[i], nil, nil)
}
}

// sync the index
if err = db.index.Sync(); err != nil {
log.Println("index sync failed:", err)
return
}
// delete the deleted keys from index
if oldKeyPostions, err = db.index.DeleteBatch(deletedKeys, deleteMatchKeys...); err != nil {
log.Println("index DeleteBatch failed:", err)
return
}

// delete the wal
if err = table.deleteWAl(); err != nil {
log.Println("delete wal failed:", err)
return
}
// uuid into deprecatedtable
for _, oldKeyPostion := range oldKeyPostions {
db.vlog.setDeprecated(oldKeyPostion.partition, oldKeyPostion.uid)
}

// delete old memtable kept in memory
db.mu.Lock()
defer db.mu.Unlock()
if table == db.activeMem {
options := db.activeMem.options
options.tableID++
// open a new memtable for writing
table, err = openMemtable(options)
if err != nil {
panic("flush activate memtable wrong")
}
db.activeMem = table
// sync the index
if err = db.index.Sync(); err != nil {
log.Println("index sync failed:", err)
return
}

// delete the wal
if err = table.deleteWAl(); err != nil {
log.Println("delete wal failed:", err)
return
}

// delete old memtable kept in memory
db.mu.Lock()
defer db.mu.Unlock()
if table == db.activeMem {
options := db.activeMem.options
options.tableID++
// open a new memtable for writing
table, err = openMemtable(options)
if err != nil {
panic("flush activate memtable wrong")
}
db.activeMem = table
} else {
if len(db.immuMems) == 1 {
db.immuMems = db.immuMems[:0]
} else {
if len(db.immuMems) == 1 {
db.immuMems = db.immuMems[:0]
} else {
db.immuMems = db.immuMems[1:]
}
db.immuMems = db.immuMems[1:]
}
}

Expand Down

0 comments on commit 5085612

Please sign in to comment.