Skip to content

Commit

Permalink
write path for ingesting transction wbwi
Browse files Browse the repository at this point in the history
  • Loading branch information
cbi42 committed Nov 18, 2024
1 parent 17b1c70 commit 40ae05a
Show file tree
Hide file tree
Showing 22 changed files with 925 additions and 97 deletions.
4 changes: 4 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ class ColumnFamilyData {
mem_ = new_mem;
}

void AssignMemtableID(ReadOnlyMemTable* new_imm) {
new_imm->SetID(++last_memtable_id_);
}

// calculate the oldest log needed for the durability of this column family
uint64_t OldestLogToKeep();

Expand Down
42 changes: 40 additions & 2 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
#include "rocksdb/transaction_log.h"
#include "rocksdb/user_write_callback.h"
#include "rocksdb/utilities/replayer.h"
#include "rocksdb/utilities/write_batch_with_index.h"
#include "rocksdb/write_buffer_manager.h"
#include "table/merging_iterator.h"
#include "util/autovector.h"
Expand Down Expand Up @@ -1509,6 +1510,19 @@ class DBImpl : public DB {

void EraseThreadStatusDbInfo() const;

// For CFs that has updates in `wbwi`, their memtable will be switched,
// and `wbwi` will be added as the latest immutable memtable.
//
// REQUIRES: this thread is currently at the front of the main writer queue.
// @param prep_log refers to the WAL that contains prepare record
// for the transaction based on wbwi.
// @param assigned_seqno Sequence number for the ingested memtable.
// @param the value of versions_->LastSequence() after the write ingests
// `wbwi` is done.
Status IngestWBWI(std::shared_ptr<WriteBatchWithIndex> wbwi,
SequenceNumber assigned_seqno, uint64_t min_prep_log,
SequenceNumber last_seqno);

// If disable_memtable is set the application logic must guarantee that the
// batch will still be skipped from memtable during the recovery. An excption
// to this is seq_per_batch_ mode, in which since each batch already takes one
Expand All @@ -1524,14 +1538,26 @@ class DBImpl : public DB {
// batch_cnt is expected to be non-zero in seq_per_batch mode and
// indicates the number of sub-patches. A sub-patch is a subset of the write
// batch that does not have duplicate keys.
// `callback` is called before WAL write.
// See more in comment above WriteCallback::Callback().
// pre_release_callback is called after WAL write and before memtable write.
// See more in comment above PreReleaseCallback::Callback().
// post_memtable_callback is called after memtable write but before publishing
// the sequence number to readers.
//
// The main write queue. This is the only write queue that updates
// LastSequence. When using one write queue, the same sequence also indicates
// the last published sequence.
Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
UserWriteCallback* user_write_cb = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
bool disable_memtable = false, uint64_t* seq_used = nullptr,
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr,
PostMemTableCallback* post_memtable_callback = nullptr);
PostMemTableCallback* post_memtable_callback = nullptr,
std::shared_ptr<WriteBatchWithIndex> wbwi = nullptr,
uint64_t min_prep_log = 0);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
Expand Down Expand Up @@ -2055,7 +2081,19 @@ class DBImpl : public DB {

// Switches the current live memtable to immutable/read-only memtable.
// A new WAL is created if the current WAL is not empty.
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context);
// If `new_imm` is not nullptr, it will be added as the newest immutable
// memtable, if and only if OK status is returned.
// `last_seqno` needs to be provided if `new_imm` is not nullptr. It is
// the value of versions_->LastSequence() after the write that ingests new_imm
// is done.
//
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
// REQUIRES: this thread is currently at the front of the 2nd writer queue if
// two_write_queues_ is true (This is to simplify the reasoning.)
Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context,
ReadOnlyMemTable* new_imm = nullptr,
SequenceNumber last_seqno = 0);

// Select and output column families qualified for atomic flush in
// `selected_cfds`. If `provided_candidate_cfds` is non-empty, it will be used
Expand Down
7 changes: 3 additions & 4 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,9 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
}
}
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number,
*next_sequence);

if (!status.ok() || old_log_record) {
if (status.IsNotSupported()) {
Expand Down Expand Up @@ -1405,10 +1408,6 @@ Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
if (corrupted_wal_found != nullptr) {
*corrupted_wal_found = true;
}
ROCKS_LOG_INFO(immutable_db_options_.info_log,
"Point in time recovered to log #%" PRIu64
" seq #%" PRIu64,
wal_number, *next_sequence);
} else {
assert(immutable_db_options_.wal_recovery_mode ==
WALRecoveryMode::kTolerateCorruptedTailRecords ||
Expand Down
Loading

0 comments on commit 40ae05a

Please sign in to comment.