diff --git a/db/column_family.h b/db/column_family.h
index baaa9b90420..66802b4f284 100644
--- a/db/column_family.h
+++ b/db/column_family.h
@@ -392,6 +392,10 @@ class ColumnFamilyData {
     mem_ = new_mem;
   }
+  void AssignMemtableID(ReadOnlyMemTable* new_imm) {
+    new_imm->SetID(++last_memtable_id_);
+  }
+
   // calculate the oldest log needed for the durability of this column family
   uint64_t OldestLogToKeep();
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index f82d8761aab..8ca76faee4a 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -60,6 +60,7 @@
 #include "rocksdb/transaction_log.h"
 #include "rocksdb/user_write_callback.h"
 #include "rocksdb/utilities/replayer.h"
+#include "rocksdb/utilities/write_batch_with_index.h"
 #include "rocksdb/write_buffer_manager.h"
 #include "table/merging_iterator.h"
 #include "util/autovector.h"
@@ -1509,6 +1510,19 @@ class DBImpl : public DB {
   void EraseThreadStatusDbInfo() const;
+  // For CFs that have updates in `wbwi`, their memtable will be switched,
+  // and `wbwi` will be added as the latest immutable memtable.
+  //
+  // REQUIRES: this thread is currently at the front of the main writer queue.
+  // @param min_prep_log refers to the WAL that contains the prepare record
+  //   for the transaction based on `wbwi`.
+  // @param assigned_seqno Sequence number for the ingested memtable.
+  // @param last_seqno The value of versions_->LastSequence() after the write
+  //   that ingests `wbwi` is done.
+  Status IngestWBWI(std::shared_ptr<WriteBatchWithIndex> wbwi,
+                    SequenceNumber assigned_seqno, uint64_t min_prep_log,
+                    SequenceNumber last_seqno);
+
   // If disable_memtable is set the application logic must guarantee that the
   // batch will still be skipped from memtable during the recovery. An excption
   // to this is seq_per_batch_ mode, in which since each batch already takes one
@@ -1524,6 +1538,16 @@ class DBImpl : public DB {
   // batch_cnt is expected to be non-zero in seq_per_batch mode and
   // indicates the number of sub-patches. A sub-patch is a subset of the write
   // batch that does not have duplicate keys.
+  // `callback` is called before WAL write.
+  // See more in comment above WriteCallback::Callback().
+  // pre_release_callback is called after WAL write and before memtable write.
+  // See more in comment above PreReleaseCallback::Callback().
+  // post_memtable_callback is called after memtable write but before publishing
+  // the sequence number to readers.
+  //
+  // This write goes through the main write queue, which is the only write
+  // queue that updates LastSequence. When using one write queue, the same
+  // sequence also indicates the last published sequence.
   Status WriteImpl(const WriteOptions& options, WriteBatch* updates,
                    WriteCallback* callback = nullptr,
                    UserWriteCallback* user_write_cb = nullptr,
@@ -1531,7 +1555,9 @@ class DBImpl : public DB {
                    bool disable_memtable = false, uint64_t* seq_used = nullptr,
                    size_t batch_cnt = 0,
                    PreReleaseCallback* pre_release_callback = nullptr,
-                   PostMemTableCallback* post_memtable_callback = nullptr);
+                   PostMemTableCallback* post_memtable_callback = nullptr,
+                   std::shared_ptr<WriteBatchWithIndex> wbwi = nullptr,
+                   uint64_t min_prep_log = 0);
   Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
                             WriteCallback* callback = nullptr,
@@ -2055,7 +2081,19 @@ class DBImpl : public DB {
   // Switches the current live memtable to immutable/read-only memtable.
   // A new WAL is created if the current WAL is not empty.
- Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context); + // If `new_imm` is not nullptr, it will be added as the newest immutable + // memtable, if and only if OK status is returned. + // `last_seqno` needs to be provided if `new_imm` is not nullptr. It is + // the value of versions_->LastSequence() after the write that ingests new_imm + // is done. + // + // REQUIRES: mutex_ is held + // REQUIRES: this thread is currently at the front of the writer queue + // REQUIRES: this thread is currently at the front of the 2nd writer queue if + // two_write_queues_ is true (This is to simplify the reasoning.) + Status SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, + ReadOnlyMemTable* new_imm = nullptr, + SequenceNumber last_seqno = 0); // Select and output column families qualified for atomic flush in // `selected_cfds`. If `provided_candidate_cfds` is non-empty, it will be used diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index e902968249f..ec6d5c35091 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -1373,6 +1373,9 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, } } } + ROCKS_LOG_INFO(immutable_db_options_.info_log, + "Recovered to log #%" PRIu64 " seq #%" PRIu64, wal_number, + *next_sequence); if (!status.ok() || old_log_record) { if (status.IsNotSupported()) { @@ -1405,10 +1408,6 @@ Status DBImpl::RecoverLogFiles(const std::vector& wal_numbers, if (corrupted_wal_found != nullptr) { *corrupted_wal_found = true; } - ROCKS_LOG_INFO(immutable_db_options_.info_log, - "Point in time recovered to log #%" PRIu64 - " seq #%" PRIu64, - wal_number, *next_sequence); } else { assert(immutable_db_options_.wal_recovery_mode == WALRecoveryMode::kTolerateCorruptedTailRecords || diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 91a2f0f4c6b..ef3b4b4c647 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -12,6 +12,7 @@ #include "db/error_handler.h" #include "db/event_helpers.h" #include "logging/logging.h" +#include "memtable/wbwi_memtable.h" #include "monitoring/perf_context_imp.h" #include "options/options_helper.h" #include "test_util/sync_point.h" @@ -189,16 +190,98 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options, return s; } -// The main write queue. This is the only write queue that updates LastSequence. -// When using one write queue, the same sequence also indicates the last -// published sequence. +Status DBImpl::IngestWBWI(std::shared_ptr wbwi, + SequenceNumber assigned_seqno, uint64_t prep_log, + SequenceNumber last_seqno_after_ingest) { + // Keys in new memtable have seqno > last_seqno_after_ingest. + assert(assigned_seqno <= last_seqno_after_ingest); + // Keys in the current memtable have seqno <= LastSequence(). 
+  assert(assigned_seqno > versions_->LastSequence());
+  // wbwi is not available for reads yet
+  autovector<WBWIMemTable*> memtables;
+  autovector<ColumnFamilyData*> cfds;
+  const std::unordered_map<uint32_t, uint64_t>& cf_to_entry_count =
+      wbwi->GetColumnFamilyToEntryCount();
+
+  InstrumentedMutexLock lock(&mutex_);
+  ColumnFamilySet* cf_set = versions_->GetColumnFamilySet();
+
+  // Create WBWIMemTables
+  for (const auto [cf_id, count] : cf_to_entry_count) {
+    ColumnFamilyData* cfd = cf_set->GetColumnFamily(cf_id);
+    if (!cfd) {
+      for (auto mem : memtables) {
+        delete mem;
+      }
+      return Status::InvalidArgument(
+          "Invalid column family id from WriteBatchWithIndex: " +
+          std::to_string(cf_id));
+    }
+    WBWIMemTable* wbwi_memtable =
+        new WBWIMemTable(wbwi, cfd->user_comparator(), cf_id, cfd->ioptions(),
+                         cfd->GetLatestMutableCFOptions(), count);
+    wbwi_memtable->Ref();
+    wbwi_memtable->SetGlobalSequenceNumber(assigned_seqno);
+    // This is needed to keep the WAL that contains Prepare alive until
+    // committed data in this memtable is persisted.
+    wbwi_memtable->SetMinPrepLog(prep_log);
+    memtables.push_back(wbwi_memtable);
+    cfds.push_back(cfd);
+  }
+
+  // Stop writes to the DB by entering both write threads
+  WriteThread::Writer nonmem_w;
+  if (two_write_queues_) {
+    nonmem_write_thread_.EnterUnbatched(&nonmem_w, &mutex_);
+  }
+  WaitForPendingWrites();
+
+  // Switch memtable and add WBWIMemTables
+  Status s;
+  for (size_t i = 0; i < memtables.size(); ++i) {
+    assert(!immutable_db_options_.atomic_flush);
+    // NOTE: to support atomic flush, need to call
+    // SelectColumnFamiliesForAtomicFlush()
+    WriteContext write_context;
+    // TODO: do not switch on an empty memtable; may need to update metadata
+    // like NextLogNumber(), earliest_seqno and memtable id.
+    s = SwitchMemtable(cfds[i], &write_context, memtables[i],
+                       last_seqno_after_ingest);
+    if (!s.ok()) {
+      // Delete the WBWIMemTables that were not installed and resume writes
+      // before returning the error.
+      for (size_t j = i; j < memtables.size(); j++) {
+        delete memtables[j];
+      }
+      if (two_write_queues_) {
+        nonmem_write_thread_.ExitUnbatched(&nonmem_w);
+      }
+      return s;
+    }
+  }
+
+  // Resume writes to the DB
+  if (two_write_queues_) {
+    nonmem_write_thread_.ExitUnbatched(&nonmem_w);
+  }
+  // Trigger flushes for the new immutable memtables.
+  assert(s.ok());
+  for (const auto cfd : cfds) {
+    cfd->imm()->FlushRequested();
+    FlushRequest flush_req;
+    // TODO: a new flush reason for ingesting memtable
+    GenerateFlushRequest({cfd}, FlushReason::kExternalFileIngestion,
+                         &flush_req);
+    EnqueuePendingFlush(flush_req);
+  }
+  MaybeScheduleFlushOrCompaction();
+  return s;
+}
+
 Status DBImpl::WriteImpl(const WriteOptions& write_options,
                          WriteBatch* my_batch, WriteCallback* callback,
                          UserWriteCallback* user_write_cb, uint64_t* log_used,
                          uint64_t log_ref, bool disable_memtable,
                          uint64_t* seq_used, size_t batch_cnt,
                          PreReleaseCallback* pre_release_callback,
-                         PostMemTableCallback* post_memtable_callback) {
+                         PostMemTableCallback* post_memtable_callback,
+                         std::shared_ptr<WriteBatchWithIndex> wbwi,
+                         uint64_t prep_log) {
   assert(!seq_per_batch_ || batch_cnt != 0);
   assert(my_batch == nullptr || my_batch->Count() == 0 ||
          write_options.protection_bytes_per_key == 0 ||
@@ -287,6 +370,23 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
       return Status::NotSupported(
           "DeleteRange is not compatible with row cache.");
     }
+    if (wbwi) {
+      assert(prep_log > 0);
+      // Used only in WriteCommittedTxn::CommitInternal() with no `callback`.
+ assert(!callback); + if (immutable_db_options_.unordered_write) { + return Status::NotSupported( + "Ingesting WriteBatch does not support unordered_write"); + } + if (immutable_db_options_.enable_pipelined_write) { + return Status::NotSupported( + "Ingesting WriteBatch does not support pipelined_write"); + } + if (immutable_db_options_.atomic_flush) { + return Status::NotSupported( + "Ingesting WriteBatch does not support atomic_flush"); + } + } // Otherwise IsLatestPersistentState optimization does not make sense assert(!WriteBatchInternal::IsLatestPersistentState(my_batch) || disable_memtable); @@ -344,7 +444,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, PERF_TIMER_GUARD(write_pre_and_post_process_time); WriteThread::Writer w(write_options, my_batch, callback, user_write_cb, log_ref, disable_memtable, batch_cnt, - pre_release_callback, post_memtable_callback); + pre_release_callback, post_memtable_callback, + /*_ingest_wbwi=*/wbwi != nullptr); StopWatch write_sw(immutable_db_options_.clock, stats_, DB_WRITE); write_thread_.JoinBatchGroup(&w); @@ -441,6 +542,9 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, TEST_SYNC_POINT("DBImpl::WriteImpl:BeforeLeaderEnters"); last_batch_group_size_ = write_thread_.EnterAsBatchGroupLeader(&w, &write_group); + if (wbwi) { + assert(write_group.size == 1); + } IOStatus io_s; Status pre_release_cb_status; @@ -494,10 +598,24 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // Note about seq_per_batch_: either disableWAL is set for the entire write // group or not. In either case we inc seq for each write batch with no // failed callback. This means that there could be a batch with - // disalbe_memtable in between; although we do not write this batch to + // disable_memtable in between; although we do not write this batch to // memtable it still consumes a seq. Otherwise, if !seq_per_batch_, we inc // the seq per valid written key to mem. size_t seq_inc = seq_per_batch_ ? valid_batches : total_count; + if (wbwi) { + // Reserve sequence numbers for recovery. During recovery, + // transactions do not commit by ingesting WBWI. The sequence number + // associated with the commit entry in WAL is used as the starting + // sequence number for inserting into memtable. We need to reserve + // enough sequence numbers here (at least the number of operations + // in write batch) to assign to memtable entries for this transaction. + // This prevents updates in different transactions from using out-of-order + // sequence numbers or the same key+seqno. + // + // WBWI ingestion requires not grouping writes, so we don't need to + // consider incrementing sequence number for WBWI from other writers. 
+ seq_inc += wbwi->GetWriteBatch()->Count(); + } const bool concurrent_update = two_write_queues_; // Update stats while we are an exclusive group leader, so we know @@ -674,6 +792,22 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options, // handle exit, false means somebody else did should_exit_batch_group = write_thread_.CompleteParallelMemTableWriter(&w); } + if (wbwi) { + if (status.ok()) { + assert(versions_->LastSequence() + w.batch->Count() + + wbwi->GetWriteBatch()->Count() == + last_sequence); + // w.batch contains commit time batch updates + SequenceNumber assigned_seqno = + versions_->LastSequence() + w.batch->Count() + 1; + if (two_write_queues_) { + assert(assigned_seqno <= versions_->LastAllocatedSequence()); + } + status = IngestWBWI(wbwi, assigned_seqno, prep_log, last_sequence); + MemTableInsertStatusCheck(status); + } + } + if (should_exit_batch_group) { if (status.ok()) { for (auto* tmp_w : write_group) { @@ -2204,16 +2338,13 @@ void DBImpl::NotifyOnMemTableSealed(ColumnFamilyData* /*cfd*/, mutex_.Lock(); } -// REQUIRES: mutex_ is held -// REQUIRES: this thread is currently at the front of the writer queue -// REQUIRES: this thread is currently at the front of the 2nd writer queue if -// two_write_queues_ is true (This is to simplify the reasoning.) -Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { +Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context, + ReadOnlyMemTable* new_imm, + SequenceNumber last_seqno) { mutex_.AssertHeld(); assert(lock_wal_count_ == 0); // TODO: plumb Env::IOActivity, Env::IOPriority - const ReadOptions read_options; const WriteOptions write_options; log::Writer* new_log = nullptr; @@ -2249,6 +2380,7 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { const MutableCFOptions mutable_cf_options = *cfd->GetLatestMutableCFOptions(); // Set memtable_info for memtable sealed callback + // TODO: memtable_info for `new_imm` MemTableInfo memtable_info; memtable_info.cf_name = cfd->GetName(); memtable_info.first_seqno = cfd->mem()->GetFirstSequenceNumber(); @@ -2276,8 +2408,20 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { } } if (s.ok()) { - SequenceNumber seq = versions_->LastSequence(); - new_mem = cfd->ConstructNewMemtable(mutable_cf_options, seq); + // FIXME: from the comment for GetEarliestSequenceNumber(), any key with + // seqno >= earliest_seqno should be in this or later memtable. This means + // we should use LastSequence() + 1 or last_seqno + 1 here. And it needs to + // be incremented with file ingestion and other operations that consumes + // sequence number. + SequenceNumber seq; + if (new_imm) { + assert(last_seqno > versions_->LastSequence()); + seq = last_seqno; + } else { + seq = versions_->LastSequence(); + } + new_mem = + cfd->ConstructNewMemtable(mutable_cf_options, /*earliest_seq=*/seq); context->superversion_context.NewSuperVersion(); ROCKS_LOG_INFO(immutable_db_options_.info_log, @@ -2359,6 +2503,8 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) { versions_->PreComputeMinLogNumberWithUnflushedData(logfile_number_); if (min_wal_number_to_keep > versions_->GetWalSet().GetMinWalNumberToKeep()) { + // TODO: plumb Env::IOActivity, Env::IOPriority + const ReadOptions read_options; // Get a snapshot of the empty column families. // LogAndApply may release and reacquire db // mutex, during that period, column family may become empty (e.g. 
its
@@ -2416,6 +2562,18 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   cfd->mem()->SetNextLogNumber(logfile_number_);
   assert(new_mem != nullptr);
   cfd->imm()->Add(cfd->mem(), &context->memtables_to_free_);
+  if (new_imm) {
+    // Need to assign the memtable id here before SetMemtable() below assigns
+    // an id to the new live memtable
+    cfd->AssignMemtableID(new_imm);
+    // NOTE: new_imm and cfd->mem() reference the same WAL and have the same
+    // NextLogNumber(). They should be flushed together. For non-atomic-flush,
+    // we always try to flush all immutable memtables. For atomic flush, these
+    // two memtables will be marked eligible for flush in the same call to
+    // AssignAtomicFlushSeq().
+    new_imm->SetNextLogNumber(logfile_number_);
+    cfd->imm()->Add(new_imm, &context->memtables_to_free_);
+  }
   new_mem->Ref();
   cfd->SetMemtable(new_mem);
   InstallSuperVersionAndScheduleWork(cfd, &context->superversion_context,
@@ -2428,6 +2586,9 @@ Status DBImpl::SwitchMemtable(ColumnFamilyData* cfd, WriteContext* context) {
   // that is okay. If we did, it most likely means that s was already an error.
   // In any case, ignore any unchecked error for i_os here.
   io_s.PermitUncheckedError();
+  // We guarantee that if a non-ok status is returned, `new_imm` was not added
+  // to the db.
+  assert(s.ok());
   return s;
 }
diff --git a/db/db_test_util.cc b/db/db_test_util.cc
index 79532a13e6a..f9734e3a084 100644
--- a/db/db_test_util.cc
+++ b/db/db_test_util.cc
@@ -1593,42 +1593,74 @@ std::vector<uint64_t> DBTestBase::ListTableFiles(Env* env,
   return file_numbers;
 }
-void DBTestBase::VerifyDBFromMap(std::map<std::string, std::string> true_data,
-                                 size_t* total_reads_res, bool tailing_iter,
-                                 std::map<std::string, Status> status) {
+void DBTestBase::VerifyDBFromMap(
+    std::map<std::string, std::string> true_data, size_t* total_reads_res,
+    bool tailing_iter, ReadOptions* ro, ColumnFamilyHandle* cf,
+    std::unordered_set<std::string>* not_found) const {
+  ReadOptions temp_ro;
+  if (!ro) {
+    ro = &temp_ro;
+    ro->verify_checksums = true;
+  }
+  if (!cf) {
+    cf = db_->DefaultColumnFamily();
+  }
+
+  // Get
   size_t total_reads = 0;
+  std::string result;
+  for (auto& [k, v] : true_data) {
+    ASSERT_OK(db_->Get(*ro, cf, k, &result)) << "key is " << k;
+    ASSERT_EQ(v, result);
+    total_reads++;
+  }
+  if (not_found) {
+    for (const auto& k : *not_found) {
+      ASSERT_TRUE(db_->Get(*ro, cf, k, &result).IsNotFound()) << "key is " << k;
+    }
+  }
-  for (auto& kv : true_data) {
-    Status s = status[kv.first];
-    if (s.ok()) {
-      ASSERT_EQ(Get(kv.first), kv.second);
-    } else {
-      std::string value;
-      ASSERT_EQ(s, db_->Get(ReadOptions(), kv.first, &value));
+  // MultiGet
+  std::vector<Slice> key_slice;
+  for (const auto& [k, _] : true_data) {
+    key_slice.emplace_back(k);
+  }
+  std::vector<std::string> values;
+  std::vector<ColumnFamilyHandle*> cfs(key_slice.size(), cf);
+  std::vector<Status> status = db_->MultiGet(*ro, cfs, key_slice, &values);
+  total_reads += key_slice.size();
+  auto data_iter = true_data.begin();
+  for (size_t i = 0; i < key_slice.size(); ++i, ++data_iter) {
+    ASSERT_OK(status[i]);
+    ASSERT_EQ(values[i], data_iter->second);
+  }
+  // MultiGet - not found
+  if (not_found) {
+    key_slice.clear();
+    for (const auto& k : *not_found) {
+      key_slice.emplace_back(k);
+    }
+    cfs = std::vector<ColumnFamilyHandle*>(key_slice.size(), cf);
+    values.clear();
+    status = db_->MultiGet(*ro, cfs, key_slice, &values);
+    for (const auto& s : status) {
+      ASSERT_TRUE(s.IsNotFound());
     }
-    total_reads++;
   }
   // Normal Iterator
   {
     int iter_cnt = 0;
-    ReadOptions ro;
-    ro.total_order_seek = true;
-    Iterator* iter = db_->NewIterator(ro);
+    ReadOptions ro_ = *ro;
+    ro_.total_order_seek = true;
+    Iterator*
iter = db_->NewIterator(ro_, cf); // Verify Iterator::Next() iter_cnt = 0; - auto data_iter = true_data.begin(); + data_iter = true_data.begin(); Status s; - for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { + for (iter->SeekToFirst(); iter->Valid(); iter->Next(), ++data_iter) { ASSERT_EQ(iter->key().ToString(), data_iter->first); - Status current_status = status[data_iter->first]; - if (!current_status.ok()) { - s = current_status; - } - ASSERT_EQ(iter->status(), s); - if (current_status.ok()) { ASSERT_EQ(iter->value().ToString(), data_iter->second); - } iter_cnt++; total_reads++; } @@ -1639,20 +1671,13 @@ void DBTestBase::VerifyDBFromMap(std::map true_data, // Verify Iterator::Prev() // Use a new iterator to make sure its status is clean. - iter = db_->NewIterator(ro); + iter = db_->NewIterator(ro_, cf); iter_cnt = 0; s = Status::OK(); auto data_rev = true_data.rbegin(); for (iter->SeekToLast(); iter->Valid(); iter->Prev(), data_rev++) { ASSERT_EQ(iter->key().ToString(), data_rev->first); - Status current_status = status[data_rev->first]; - if (!current_status.ok()) { - s = current_status; - } - ASSERT_EQ(iter->status(), s); - if (current_status.ok()) { ASSERT_EQ(iter->value().ToString(), data_rev->second); - } iter_cnt++; total_reads++; } @@ -1660,12 +1685,20 @@ void DBTestBase::VerifyDBFromMap(std::map true_data, ASSERT_EQ(data_rev, true_data.rend()) << iter_cnt << " / " << true_data.size(); - // Verify Iterator::Seek() - for (const auto& kv : true_data) { - iter->Seek(kv.first); - ASSERT_EQ(kv.first, iter->key().ToString()); - ASSERT_EQ(kv.second, iter->value().ToString()); - total_reads++; + // Verify Iterator::Seek() and SeekForPrev() + for (const auto& [k, v] : true_data) { + for (bool prev : {false, true}) { + if (prev) { + iter->SeekForPrev(k); + } else { + iter->Seek(k); + } + ASSERT_TRUE(iter->Valid()); + ASSERT_OK(iter->status()); + ASSERT_EQ(iter->key(), k); + ASSERT_EQ(iter->value(), v); + ++total_reads; + } } delete iter; } @@ -1673,14 +1706,14 @@ void DBTestBase::VerifyDBFromMap(std::map true_data, if (tailing_iter) { // Tailing iterator int iter_cnt = 0; - ReadOptions ro; - ro.tailing = true; - ro.total_order_seek = true; - Iterator* iter = db_->NewIterator(ro); + ReadOptions ro_ = *ro; + ro_.tailing = true; + ro_.total_order_seek = true; + Iterator* iter = db_->NewIterator(ro_, cf); // Verify ForwardIterator::Next() iter_cnt = 0; - auto data_iter = true_data.begin(); + data_iter = true_data.begin(); for (iter->SeekToFirst(); iter->Valid(); iter->Next(), data_iter++) { ASSERT_EQ(iter->key().ToString(), data_iter->first); ASSERT_EQ(iter->value().ToString(), data_iter->second); diff --git a/db/db_test_util.h b/db/db_test_util.h index a7dc06659ec..424eb698cc7 100644 --- a/db/db_test_util.h +++ b/db/db_test_util.h @@ -1364,7 +1364,8 @@ class DBTestBase : public testing::Test { void VerifyDBFromMap( std::map true_data, size_t* total_reads_res = nullptr, bool tailing_iter = false, - std::map status = std::map()); + ReadOptions* ro = nullptr, ColumnFamilyHandle* cf = nullptr, + std::unordered_set* not_found = nullptr) const; void VerifyDBInternal( std::vector> true_data); diff --git a/db/flush_job.cc b/db/flush_job.cc index 69ce1030223..56d1869256a 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -624,7 +624,9 @@ Status FlushJob::MemPurge() { // Construct fragmented memtable range tombstones without mutex new_mem->ConstructFragmentedRangeTombstones(); db_mutex_->Lock(); - uint64_t new_mem_id = mems_[0]->GetID(); + // Take the newest id, so that memtables 
in MemtableList don't have + // out-of-order memtable ids. + uint64_t new_mem_id = mems_.back()->GetID(); new_mem->SetID(new_mem_id); new_mem->SetNextLogNumber(mems_[0]->GetNextLogNumber()); diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 05b085f878b..968bfe61fb1 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -32,6 +32,10 @@ class Mutex; class VersionSet; void MemTableListVersion::AddMemTable(ReadOnlyMemTable* m) { + if (!memlist_.empty()) { + // ID can be equal for MemPurge + assert(m->GetID() >= memlist_.front()->GetID()); + } memlist_.push_front(m); *parent_memtable_list_memory_usage_ += m->ApproximateMemoryUsage(); } @@ -410,7 +414,8 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, // ret is filled with memtables already sorted in increasing MemTable ID. // However, when the mempurge feature is activated, new memtables with older // IDs will be added to the memlist. - for (auto it = memlist.rbegin(); it != memlist.rend(); ++it) { + auto it = memlist.rbegin(); + for (; it != memlist.rend(); ++it) { ReadOnlyMemTable* m = *it; if (!atomic_flush && m->atomic_flush_seqno_ != kMaxSequenceNumber) { atomic_flush = true; @@ -439,6 +444,11 @@ void MemTableList::PickMemtablesToFlush(uint64_t max_memtable_id, break; } } + if (!ret->empty() && it != memlist.rend()) { + // Ingested memtable should be flushed together with the memtable before it + // since they map to the same WAL and have the same NextLogNumber(). + assert(strcmp((*it)->Name(), "WBWIMemTable") != 0); + } if (!atomic_flush || num_flush_not_started_ == 0) { flush_requested_ = false; // start-flush request is complete } diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 812c176d27b..2de350ae8da 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -330,6 +330,7 @@ TEST_F(MemTableListTest, GetTest) { // This is to make assert(memtable->IsFragmentedRangeTombstonesConstructed()) // in MemTableListVersion::GetFromList work. mem->ConstructFragmentedRangeTombstones(); + mem->SetID(1); list.Add(mem, &to_delete); SequenceNumber saved_seq = seq; @@ -338,6 +339,7 @@ TEST_F(MemTableListTest, GetTest) { WriteBufferManager wb2(options.db_write_buffer_size); MemTable* mem2 = new MemTable(cmp, ioptions, MutableCFOptions(options), &wb2, kMaxSequenceNumber, 0 /* column_family_id */); + mem2->SetID(2); mem2->Ref(); ASSERT_OK( diff --git a/db/write_thread.cc b/db/write_thread.cc index 87c82627249..bc4cc3c380a 100644 --- a/db/write_thread.cc +++ b/db/write_thread.cc @@ -523,8 +523,10 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader, // those are something else. 
They want to be alone
         (w->callback != nullptr && !w->callback->AllowWriteBatching()) ||
         // dont batch writes that don't want to be batched
-        (size + WriteBatchInternal::ByteSize(w->batch) > max_size)
+        (size + WriteBatchInternal::ByteSize(w->batch) > max_size) ||
         // Do not make batch too big
+        (leader->ingest_wbwi || w->ingest_wbwi)
+        // ingesting WBWI needs to be its own group
     ) {
       // remove from list
       w->link_older->link_newer = w->link_newer;
diff --git a/db/write_thread.h b/db/write_thread.h
index 16af946b124..42256970f41 100644
--- a/db/write_thread.h
+++ b/db/write_thread.h
@@ -148,6 +148,8 @@ class WriteThread {
     Writer* link_older;  // read/write only before linking, or as leader
     Writer* link_newer;  // lazy, read/write only before linking, or as leader

+    bool ingest_wbwi;
+
     Writer()
         : batch(nullptr),
           sync(false),
@@ -174,7 +176,8 @@ class WriteThread {
            WriteCallback* _callback, UserWriteCallback* _user_write_cb,
            uint64_t _log_ref, bool _disable_memtable, size_t _batch_cnt = 0,
            PreReleaseCallback* _pre_release_callback = nullptr,
-           PostMemTableCallback* _post_memtable_callback = nullptr)
+           PostMemTableCallback* _post_memtable_callback = nullptr,
+           bool _ingest_wbwi = false)
         : batch(_batch),
           // TODO: store a copy of WriteOptions instead of its seperated data
           // members
@@ -196,7 +199,8 @@ class WriteThread {
           write_group(nullptr),
           sequence(kMaxSequenceNumber),
           link_older(nullptr),
-          link_newer(nullptr) {}
+          link_newer(nullptr),
+          ingest_wbwi(_ingest_wbwi) {}

     ~Writer() {
       if (made_waitable) {
diff --git a/db_stress_tool/db_stress_common.h b/db_stress_tool/db_stress_common.h
index 4d59240d1a1..e276c6935e3 100644
--- a/db_stress_tool/db_stress_common.h
+++ b/db_stress_tool/db_stress_common.h
@@ -419,6 +419,7 @@ DECLARE_bool(inplace_update_support);
 DECLARE_uint32(uncache_aggressiveness);
 DECLARE_int32(test_ingest_standalone_range_deletion_one_in);
 DECLARE_bool(allow_unprepared_value);
+DECLARE_uint32(commit_bypass_memtable_one_in);

 constexpr long KB = 1024;
 constexpr int kRandomValueMaxFactor = 3;
diff --git a/db_stress_tool/db_stress_gflags.cc b/db_stress_tool/db_stress_gflags.cc
index 06147a188c5..6c7e22f68ea 100644
--- a/db_stress_tool/db_stress_gflags.cc
+++ b/db_stress_tool/db_stress_gflags.cc
@@ -1465,4 +1465,7 @@ DEFINE_bool(paranoid_memory_checks,
             ROCKSDB_NAMESPACE::Options().paranoid_memory_checks,
             "Sets CF option paranoid_memory_checks.");

+DEFINE_uint32(commit_bypass_memtable_one_in, 0,
+              "If greater than zero, transaction option will set "
+              "commit_bypass_memtable to true once per every N transactions "
+              "on average.");
 #endif  // GFLAGS
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index 039ef2f055e..2d43a550228 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -795,7 +795,7 @@ void StressTest::ProcessRecoveredPreparedTxnsHelper(Transaction* txn,
   }
 }
-Status StressTest::NewTxn(WriteOptions& write_opts,
+Status StressTest::NewTxn(WriteOptions& write_opts, ThreadState* thread,
                           std::unique_ptr<Transaction>* out_txn) {
   if (!FLAGS_use_txn) {
     return Status::InvalidArgument("NewTxn when FLAGS_use_txn is not set");
   }
@@ -811,6 +811,12 @@ Status StressTest::NewTxn(WriteOptions& write_opts,
       FLAGS_use_only_the_last_commit_time_batch_for_recovery;
   txn_options.lock_timeout = 600000;  // 10 min
   txn_options.deadlock_detect = true;
+  if (FLAGS_commit_bypass_memtable_one_in > 0) {
+    assert(FLAGS_txn_write_policy == 0);
+    assert(FLAGS_user_timestamp_size == 0);
+    txn_options.commit_bypass_memtable =
thread->rand.OneIn(FLAGS_commit_bypass_memtable_one_in); + } out_txn->reset(txn_db_->BeginTransaction(write_opts, txn_options)); auto istr = std::to_string(txn_id.fetch_add(1)); Status s = (*out_txn)->SetName("xid" + istr); @@ -870,7 +876,7 @@ Status StressTest::ExecuteTransaction( WriteOptions& write_opts, ThreadState* thread, std::function&& ops) { std::unique_ptr txn; - Status s = NewTxn(write_opts, &txn); + Status s = NewTxn(write_opts, thread, &txn); std::string try_again_messages; if (s.ok()) { for (int tries = 1;; ++tries) { diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h index cf6f174b8dd..1b338e8940a 100644 --- a/db_stress_tool/db_stress_test_base.h +++ b/db_stress_tool/db_stress_test_base.h @@ -138,7 +138,7 @@ class StressTest { SharedState* shared); // ExecuteTransaction is recommended instead - Status NewTxn(WriteOptions& write_opts, + Status NewTxn(WriteOptions& write_opts, ThreadState* thread, std::unique_ptr* out_txn); Status CommitTxn(Transaction& txn, ThreadState* thread = nullptr); diff --git a/db_stress_tool/multi_ops_txns_stress.cc b/db_stress_tool/multi_ops_txns_stress.cc index 574552c52e7..6ec42e4e7fb 100644 --- a/db_stress_tool/multi_ops_txns_stress.cc +++ b/db_stress_tool/multi_ops_txns_stress.cc @@ -544,6 +544,11 @@ Status MultiOpsTxnsStressTest::TestCustomOperations( // Should never reach here. assert(false); } + if (!s.ok()) { + fprintf(stderr, "Transaction failed %s\n", s.ToString().c_str()); + fflush(stderr); + thread->shared->SafeTerminate(); + } return s; } @@ -579,7 +584,7 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, std::string new_pk = Record::EncodePrimaryKey(new_a); std::unique_ptr txn; WriteOptions wopts; - Status s = NewTxn(wopts, &txn); + Status s = NewTxn(wopts, thread, &txn); if (!s.ok()) { assert(!txn); thread->stats.AddErrors(1); @@ -611,7 +616,12 @@ Status MultiOpsTxnsStressTest::PrimaryKeyUpdateTxn(ThreadState* thread, } auto& key_gen = key_gen_for_a_[thread->tid]; key_gen->UndoAllocation(new_a); - txn->Rollback().PermitUncheckedError(); + s = txn->Rollback(); + if (!s.ok()) { + fprintf(stderr, "Transaction rollback failed %s\n", s.ToString().c_str()); + fflush(stderr); + assert(false); + } }); ReadOptions ropts; @@ -699,7 +709,7 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, uint32_t new_c) { std::unique_ptr txn; WriteOptions wopts; - Status s = NewTxn(wopts, &txn); + Status s = NewTxn(wopts, thread, &txn); if (!s.ok()) { assert(!txn); thread->stats.AddErrors(1); @@ -735,7 +745,12 @@ Status MultiOpsTxnsStressTest::SecondaryKeyUpdateTxn(ThreadState* thread, } auto& key_gen = key_gen_for_c_[thread->tid]; key_gen->UndoAllocation(new_c); - txn->Rollback().PermitUncheckedError(); + s = txn->Rollback(); + if (!s.ok()) { + fprintf(stderr, "Transaction rollback failed %s\n", s.ToString().c_str()); + fflush(stderr); + assert(false); + } }); // TODO (yanqin) try SetSnapshotOnNextOperation(). 
We currently need to take @@ -901,7 +916,7 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, std::string pk_str = Record::EncodePrimaryKey(a); std::unique_ptr txn; WriteOptions wopts; - Status s = NewTxn(wopts, &txn); + Status s = NewTxn(wopts, thread, &txn); if (!s.ok()) { assert(!txn); thread->stats.AddErrors(1); @@ -927,7 +942,12 @@ Status MultiOpsTxnsStressTest::UpdatePrimaryIndexValueTxn(ThreadState* thread, } else { thread->stats.AddErrors(1); } - txn->Rollback().PermitUncheckedError(); + s = txn->Rollback(); + if (!s.ok()) { + fprintf(stderr, "Transaction rollback failed %s\n", s.ToString().c_str()); + fflush(stderr); + assert(false); + } }); ReadOptions ropts; ropts.rate_limiter_priority = @@ -982,7 +1002,7 @@ Status MultiOpsTxnsStressTest::PointLookupTxn(ThreadState* thread, std::unique_ptr txn; WriteOptions wopts; - Status s = NewTxn(wopts, &txn); + Status s = NewTxn(wopts, thread, &txn); if (!s.ok()) { assert(!txn); thread->stats.AddErrors(1); @@ -1026,7 +1046,7 @@ Status MultiOpsTxnsStressTest::RangeScanTxn(ThreadState* thread, std::unique_ptr txn; WriteOptions wopts; - Status s = NewTxn(wopts, &txn); + Status s = NewTxn(wopts, thread, &txn); if (!s.ok()) { assert(!txn); thread->stats.AddErrors(1); @@ -1384,6 +1404,12 @@ Status MultiOpsTxnsStressTest::CommitAndCreateTimestampedSnapshotIfNeeded( } else { s = txn.Commit(); } + if (!s.ok()) { + fprintf(stderr, "Txn %s commit failed with %s\n", txn.GetName().c_str(), + s.ToString().c_str()); + fflush(stderr); + } + assert(txn_db_); if (FLAGS_create_timestamped_snapshot_one_in > 0 && thread->rand.OneInOpt(50000)) { diff --git a/db_stress_tool/no_batched_ops_stress.cc b/db_stress_tool/no_batched_ops_stress.cc index cd14bac68a3..3b5341392ec 100644 --- a/db_stress_tool/no_batched_ops_stress.cc +++ b/db_stress_tool/no_batched_ops_stress.cc @@ -666,7 +666,7 @@ class NonBatchedOpsStressTest : public StressTest { if (FLAGS_rate_limit_auto_wal_flush) { wo.rate_limiter_priority = Env::IO_USER; } - Status s = NewTxn(wo, &txn); + Status s = NewTxn(wo, thread, &txn); if (!s.ok()) { fprintf(stderr, "NewTxn error: %s\n", s.ToString().c_str()); shared->SafeTerminate(); @@ -1127,7 +1127,7 @@ class NonBatchedOpsStressTest : public StressTest { write_options.rate_limiter_priority = Env::IO_USER; } - const Status s = NewTxn(write_options, &txn); + const Status s = NewTxn(write_options, thread, &txn); if (!s.ok()) { fprintf(stderr, "NewTxn error: %s\n", s.ToString().c_str()); thread->shared->SafeTerminate(); diff --git a/memtable/wbwi_memtable.h b/memtable/wbwi_memtable.h index 1daf5d77974..6e851d0bb84 100644 --- a/memtable/wbwi_memtable.h +++ b/memtable/wbwi_memtable.h @@ -108,6 +108,8 @@ class WBWIMemTableIterator final : public InternalIterator { return s_; } + bool IsValuePinned() const override { return true; } + private: static const std::unordered_map WriteTypeToValueTypeMap; @@ -146,12 +148,13 @@ class WBWIMemTable final : public ReadOnlyMemTable { WBWIMemTable(const std::shared_ptr& wbwi, const Comparator* cmp, uint32_t cf_id, const ImmutableOptions* immutable_options, - const MutableCFOptions* cf_options) + const MutableCFOptions* cf_options, uint64_t num_entries) : wbwi_(wbwi), comparator_(cmp), ikey_comparator_(comparator_), moptions_(*immutable_options, *cf_options), clock_(immutable_options->clock), + num_entries_(num_entries), cf_id_(cf_id) {} // No copying allowed @@ -240,7 +243,7 @@ class WBWIMemTable final : public ReadOnlyMemTable { // - verify number of entries processed during flush // - stats for 
estimate num entries and num entries in immutable memtables // - MemPurgeDecider - return 0; + return num_entries_; } uint64_t NumDeletion() const override { @@ -310,6 +313,10 @@ class WBWIMemTable final : public ReadOnlyMemTable { global_seqno_ = global_seqno; } + void SetMinPrepLog(uint64_t min_prep_log) { + min_prep_log_referenced_ = min_prep_log; + } + private: InternalIterator* NewIterator() const { assert(global_seqno_ != kMaxSequenceNumber); @@ -326,6 +333,7 @@ class WBWIMemTable final : public ReadOnlyMemTable { const ImmutableMemTableOptions moptions_; SystemClock* clock_; uint64_t min_prep_log_referenced_{0}; + uint64_t num_entries_; // WBWI can contains updates to multiple CFs. `cf_id_` determines which CF // this memtable is for. uint32_t cf_id_; diff --git a/tools/db_crashtest.py b/tools/db_crashtest.py index cd829cbd61c..2de2aeb28bf 100644 --- a/tools/db_crashtest.py +++ b/tools/db_crashtest.py @@ -669,6 +669,7 @@ def is_direct_io_supported(dbname): "txn_write_policy": 0, # TODO re-enable pipelined write. Not well tested atm "enable_pipelined_write": 0, + "commit_bypass_memtable_one_in": random.choice([0] * 4 + [100]), } multiops_wp_txn_params = { @@ -971,6 +972,18 @@ def finalize_and_sanitize(src_params): dest_params["lock_wal_one_in"] = 0 if dest_params.get("ingest_external_file_one_in") == 0 or dest_params.get("delrangepercent") == 0: dest_params["test_ingest_standalone_range_deletion_one_in"] = 0 + if dest_params.get("commit_bypass_memtable_one_in") > 0: + dest_params["enable_blob_files"] = 0 + dest_params["allow_setting_blob_options_dynamically"] = 0 + dest_params["atomic_flush"] = 0 + dest_params["allow_concurrent_memtable_write"] = 0 + dest_params["use_put_entity_one_in"] = 0 + dest_params["use_get_entity"] = 0 + dest_params["use_multi_get_entity"] = 0 + dest_params["use_merge"] = 0 + dest_params["use_full_merge_v1"] = 0 + dest_params["enable_pipelined_write"] = 0 + return dest_params diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc index e5ed49cda93..c02a0b7b801 100644 --- a/utilities/transactions/pessimistic_transaction.cc +++ b/utilities/transactions/pessimistic_transaction.cc @@ -846,6 +846,7 @@ Status WriteCommittedTxn::CommitInternal() { if (!needs_ts) { s = WriteBatchInternal::MarkCommit(working_batch, name_); } else { + assert(!commit_bypass_memtable_); assert(commit_timestamp_ != kMaxTxnTimestamp); char commit_ts_buf[sizeof(kMaxTxnTimestamp)]; EncodeFixed64(commit_ts_buf, commit_timestamp_); @@ -881,11 +882,14 @@ Status WriteCommittedTxn::CommitInternal() { // any operations appended to this working_batch will be ignored from WAL working_batch->MarkWalTerminationPoint(); - // insert prepared batch into Memtable only skipping WAL. - // Memtable will ignore BeginPrepare/EndPrepare markers - // in non recovery mode and simply insert the values - s = WriteBatchInternal::Append(working_batch, wb); - assert(s.ok()); + const bool bypass_memtable = commit_bypass_memtable_ && wb->Count() > 0; + if (!bypass_memtable) { + // insert prepared batch into Memtable only skipping WAL. 
+ // Memtable will ignore BeginPrepare/EndPrepare markers + // in non recovery mode and simply insert the values + s = WriteBatchInternal::Append(working_batch, wb); + assert(s.ok()); + } uint64_t seq_used = kMaxSequenceNumber; SnapshotCreationCallback snapshot_creation_cb(db_impl_, commit_timestamp_, @@ -899,12 +903,28 @@ Status WriteCommittedTxn::CommitInternal() { post_mem_cb = &snapshot_creation_cb; } } - s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr, - /*user_write_cb=*/nullptr, - /*log_used*/ nullptr, /*log_ref*/ log_number_, - /*disable_memtable*/ false, &seq_used, - /*batch_cnt=*/0, /*pre_release_callback=*/nullptr, - post_mem_cb); + assert(log_number_ > 0); + if (bypass_memtable) { + s = db_impl_->WriteImpl( + write_options_, working_batch, /*callback*/ nullptr, + /*user_write_cb=*/nullptr, + /*log_used*/ nullptr, /*log_ref*/ log_number_, + /*disable_memtable*/ false, &seq_used, + /*batch_cnt=*/0, /*pre_release_callback=*/nullptr, post_mem_cb, + /*wbwi=*/std::make_shared(std::move(write_batch_)), + /*min_prep_log=*/log_number_); + // Reset write_batch_ since it's accessed in transaction clean up and + // might be used for transaction reuse. + write_batch_ = WriteBatchWithIndex(cmp_, 0, true, 0, + write_options_.protection_bytes_per_key); + } else { + s = db_impl_->WriteImpl(write_options_, working_batch, /*callback*/ nullptr, + /*user_write_cb=*/nullptr, + /*log_used*/ nullptr, /*log_ref*/ log_number_, + /*disable_memtable*/ false, &seq_used, + /*batch_cnt=*/0, /*pre_release_callback=*/nullptr, + post_mem_cb); + } assert(!s.ok() || seq_used != kMaxSequenceNumber); if (s.ok()) { SetId(seq_used); diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index a089605d374..317813380d3 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -8100,6 +8100,474 @@ TEST_F(TransactionDBTest, FlushedLogWithPendingPrepareIsSynced) { } } +class CommitBypassMemtableTest : public DBTestBase, + public ::testing::WithParamInterface { + public: + CommitBypassMemtableTest() : DBTestBase("commit_bypass_memtable_test", true) { + SetUpTransactionDB(); + } + + protected: + TransactionDB* txn_db = nullptr; + Options options; + TransactionDBOptions txn_db_opts; + + void SetUpTransactionDB() { + options = CurrentOptions(); + options.create_if_missing = true; + options.allow_2pc = true; + options.two_write_queues = GetParam(); + // Avoid write stall + options.max_write_buffer_number = 8; + // Destroy the DB to recreate as a TransactionDB. + Close(); + Destroy(options, true); + + txn_db_opts.write_policy = TxnDBWritePolicy::WRITE_COMMITTED; + ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db)); + ASSERT_NE(txn_db, nullptr); + db_ = txn_db; + } +}; + +INSTANTIATE_TEST_CASE_P(, CommitBypassMemtableTest, testing::Bool()); + +// TODO: parameterize other tests in the file with commit_bypass_memtable +TEST_P(CommitBypassMemtableTest, SingleCFUpdate) { + // 10000 updates for one CF in a single transaction. + // Tests basic read before and after flush, with and w/o snapshot. 
+ for (bool disable_flush : {false, true}) { + SetUpTransactionDB(); + if (disable_flush) { + ASSERT_OK(db_->PauseBackgroundWork()); + } + + WriteOptions wopts; + std::unordered_set not_found_at_snapshot; + std::map snapshot_map; + std::map expected_map; + + for (int i = 0; i < 10000; i += 3) { + ASSERT_OK(db_->Put(wopts, Key(i), "foo")); + not_found_at_snapshot.insert(Key(i + 1)); + snapshot_map[Key(i)] = "foo"; + } + const Snapshot* snapshot = db_->GetSnapshot(); + + TransactionOptions txn_opts; + txn_opts.commit_bypass_memtable = true; + Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr); + std::unordered_set expected_not_found; + for (int i = 0; i < 10000; i += 2) { + std::string v = "val" + std::to_string(i); + txn1->Put(Key(i), v); + expected_map[Key(i)] = v; + txn1->Delete(Key(i + 1)); + expected_not_found.insert(Key(i + 1)); + } + ASSERT_OK(txn1->SetName("xid1")); + ASSERT_OK(txn1->Prepare()); + ASSERT_OK(txn1->Commit()); + delete txn1; + + ReadOptions ro_snapshot; + ro_snapshot.snapshot = snapshot; + // Verify at snapshot + VerifyDBFromMap(snapshot_map, nullptr, false, &ro_snapshot, nullptr, + ¬_found_at_snapshot); + // Verify latest state + VerifyDBFromMap(expected_map, nullptr, false, nullptr, nullptr, + &expected_not_found); + + db_->ReleaseSnapshot(snapshot); + if (disable_flush) { + ASSERT_OK(db_->ContinueBackgroundWork()); + } + ASSERT_OK(db_->Flush({})); + VerifyDBFromMap(expected_map, nullptr, false, nullptr, nullptr, + &expected_not_found); + } +} + +TEST_P(CommitBypassMemtableTest, SingleCFUpdateWithOverWrite) { + // Test the case where DB has base data and there are overwrites + // over the data in WBWI for one CF. + // + // live mem has k2 + // ingested wbwi has k2 del k4, del k5, k6 + // older imm mem has k3, k4, k5, k6 + // SST has k1, k2, k4 + for (bool single_del : {false, true}) { + SCOPED_TRACE("use single_del " + std::to_string(single_del)); + for (bool disable_flush : {false, true}) { + SCOPED_TRACE("disable_flush: " + std::to_string(disable_flush)); + SetUpTransactionDB(); + + std::map expected = {{"k1", "sst"}, + {"k2", "sst"}}; + WriteOptions wopts; + ASSERT_OK(txn_db->Put(wopts, "k1", "sst")); + ASSERT_OK(txn_db->Put(wopts, "k2", "sst")); + if (!single_del) { + // single del does not allow overwrite. We write to this key below. 
+ ASSERT_OK(txn_db->Put(wopts, "k4", "sst")); + expected["k4"] = "sst"; + } + ASSERT_OK(txn_db->Flush({})); + std::unordered_set not_found; + VerifyDBFromMap(expected, nullptr, false, nullptr, nullptr, ¬_found); + + if (disable_flush) { + ASSERT_OK(db_->PauseBackgroundWork()); + } + + // immutable mem + TransactionOptions topts; + Transaction* txn = txn_db->BeginTransaction(wopts, topts); + ASSERT_OK(txn->Put("k3", "imm")); + ASSERT_OK(txn->Put("k4", "imm")); + ASSERT_OK(txn->Put("k5", "imm")); + ASSERT_OK(txn->Put("k6", "imm")); + ASSERT_OK(txn->SetName("xid1")); + ASSERT_OK(txn->Prepare()); + ASSERT_OK(txn->Commit()); + auto dbimpl = static_cast_with_check(txn_db->GetRootDB()); + ASSERT_OK(dbimpl->TEST_SwitchMemtable()); + for (const auto k : {"k3", "k4", "k5", "k6"}) { + expected[k] = "imm"; + } + VerifyDBFromMap(expected, nullptr, false, nullptr, nullptr, ¬_found); + + uint64_t num_imm_mems; + ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable, + &num_imm_mems)); + ASSERT_EQ(num_imm_mems, 1); + + // ingest wbwi + const Snapshot* snapshot = txn_db->GetSnapshot(); + TransactionOptions topts_ingest; + topts_ingest.commit_bypass_memtable = true; + // reuse txn1 + txn = txn_db->BeginTransaction(wopts, topts_ingest, txn); + ASSERT_OK(txn->Put("k2", "wbwi")); + if (single_del) { + ASSERT_OK(txn->SingleDelete("k4")); + ASSERT_OK(txn->SingleDelete("k5")); + } else { + ASSERT_OK(txn->Delete("k4")); + ASSERT_OK(txn->Delete("k5")); + } + ASSERT_OK(txn->Put("k6", "wbwi")); + ASSERT_OK(txn->SetName("xid2")); + ASSERT_OK(txn->Prepare()); + VerifyDBFromMap(expected, nullptr, false, nullptr, nullptr, ¬_found); + + ASSERT_OK(txn->Commit()); + // TODO: need to check disable flush, flakuy? + ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable, + &num_imm_mems)); + ASSERT_EQ(num_imm_mems, 3); + + ReadOptions ro_snapshot; + ro_snapshot.snapshot = snapshot; + auto expected_at_snapshot = expected; + auto not_found_at_snapshot = not_found; + VerifyDBFromMap(expected, nullptr, false, &ro_snapshot, nullptr, + ¬_found); + not_found = {"k4", "k5"}; + expected.erase("k4"); + expected.erase("k5"); + expected["k2"] = "wbwi"; + expected["k6"] = "wbwi"; + VerifyDBFromMap(expected, nullptr, false, nullptr, nullptr, ¬_found); + + // live mem + txn = txn_db->BeginTransaction(wopts, topts, txn); + ASSERT_OK(txn->Put("k2", "live_mem")); + ASSERT_OK(txn->SetName("xid3")); + ASSERT_OK(txn->Prepare()); + ASSERT_OK(txn->Commit()); + expected["k2"] = "live_mem"; + VerifyDBFromMap(expected, nullptr, false, nullptr, nullptr, ¬_found); + + if (disable_flush) { + ASSERT_OK(db_->ContinueBackgroundWork()); + ASSERT_OK(db_->Flush({})); + } else { + ASSERT_OK(db_->WaitForCompact({})); + } + ASSERT_TRUE(txn_db->GetIntProperty(DB::Properties::kNumImmutableMemTable, + &num_imm_mems)); + ASSERT_EQ(num_imm_mems, 0); + + VerifyDBFromMap(expected, nullptr, false, nullptr, nullptr, ¬_found); + VerifyDBFromMap(expected_at_snapshot, nullptr, false, &ro_snapshot, + nullptr, ¬_found_at_snapshot); + txn_db->ReleaseSnapshot(snapshot); + } + } +} + +TEST_P(CommitBypassMemtableTest, MultiCFOverwrite) { + // mini-stress test multi CF update + const int kNumKeys = 10000; + for (bool disable_flush : {false, true}) { + SCOPED_TRACE("disable_flush: " + std::to_string(disable_flush)); + SetUpTransactionDB(); + if (disable_flush) { + txn_db->PauseBackgroundWork(); + } + std::vector cfs = {"puppy", "kitty", "meta"}; + CreateColumnFamilies(cfs, options); + ASSERT_EQ(handles_.size(), 3); + 
ASSERT_EQ(handles_[0]->GetName(), cfs[0]); + ASSERT_EQ(handles_[1]->GetName(), cfs[1]); + ASSERT_EQ(handles_[2]->GetName(), cfs[2]); + + std::map expected_cf0; + std::unordered_set not_found_cf0; + std::map expected_cf1; + std::unordered_set not_found_cf1; + WriteOptions wopts; + Random* rnd = Random::GetTLSInstance(); + // Some base data + for (int i = 0; i < kNumKeys; ++i) { + std::string val_cf0 = "cf0_sst" + std::to_string(i); + if (rnd->OneIn(2)) { + ASSERT_OK(txn_db->Put(wopts, handles_[0], Key(i), val_cf0)); + expected_cf0[Key(i)] = val_cf0; + } else { + not_found_cf0.insert(Key(i)); + } + std::string val_cf1 = "cf1_sst" + std::to_string(i); + ASSERT_OK(txn_db->Put(wopts, handles_[1], Key(i), val_cf1)); + expected_cf1[Key(i)] = val_cf1; + } + SCOPED_TRACE("Verify after SST"); + ASSERT_OK(txn_db->Put(wopts, handles_[2], "id", "0")); + std::map expected_cf2 = {{"id", "0"}}; + VerifyDBFromMap(expected_cf0, nullptr, false, nullptr, handles_[0], + ¬_found_cf0); + VerifyDBFromMap(expected_cf1, nullptr, false, nullptr, handles_[1], + ¬_found_cf1); + + if (!disable_flush) { + ASSERT_OK(txn_db->Flush({}, handles_[0])); + ASSERT_OK(txn_db->Flush({}, handles_[1])); + } + + const Snapshot* snapshot = nullptr; + std::map expected_cf0_snapshot; + std::unordered_set not_found_cf0_snapshot; + std::map expected_cf1_snapshot; + std::unordered_set not_found_cf1_snapshot; + std::map expected_cf2_snapshot; + auto init_snapshot_states = [&]() { + snapshot = txn_db->GetSnapshot(); + expected_cf0_snapshot = expected_cf0; + not_found_cf0_snapshot = not_found_cf0; + expected_cf1_snapshot = expected_cf1; + not_found_cf1_snapshot = not_found_cf1; + expected_cf2_snapshot = expected_cf2; + }; + if (rnd->OneIn(4)) { + init_snapshot_states(); + } + + // Transaction 1 + TransactionOptions topts; + topts.commit_bypass_memtable = true; + Transaction* txn1 = txn_db->BeginTransaction(wopts, topts); + auto fill_txn = [&](Transaction* txn) { + std::vector indices(kNumKeys); + std::iota(indices.begin(), indices.end(), 0); + RandomShuffle(indices.begin(), indices.end()); + for (int i : indices) { + // CF0 update + if (rnd->OneIn(4)) { + std::string val_cf0 = "cf0_wbwi" + std::to_string(i); + ASSERT_OK(txn->Put(handles_[0], Key(i), val_cf0)); + expected_cf0[Key(i)] = val_cf0; + not_found_cf0.erase(Key(i)); + } else if (rnd->OneIn(4)) { + ASSERT_OK(txn->Delete(handles_[0], Key(i))); + expected_cf0.erase(Key(i)); + not_found_cf0.insert(Key(i)); + } + + // CF1 update + if (rnd->OneIn(4)) { + ASSERT_OK(txn->SingleDelete(handles_[1], Key(i))); + expected_cf1.erase(Key(i)); + not_found_cf1.insert(Key(i)); + } + if (rnd->OneIn(4) && + not_found_cf1.find(Key(i)) != not_found_cf1.end()) { + // Can not overwrite when using SD. 
+ std::string val_cf1 = "cf1_wbwi" + std::to_string(i); + ASSERT_OK(txn->Put(handles_[1], Key(i), val_cf1)); + expected_cf1[Key(i)] = val_cf1; + not_found_cf1.erase(Key(i)); + } + } + }; + fill_txn(txn1); + ASSERT_OK(txn1->SetName("xid1")); + ASSERT_OK(txn1->Prepare()); + if (rnd->OneIn(2)) { + ASSERT_OK(txn1->GetCommitTimeWriteBatch()->Put(handles_[2], "id", "1")); + expected_cf2["id"] = "1"; + } + ASSERT_OK(txn1->Commit()); + delete txn1; + + uint64_t num_imm_mems = 0; + if (disable_flush) { + ASSERT_TRUE(txn_db->GetIntProperty( + handles_[0], DB::Properties::kNumImmutableMemTable, &num_imm_mems)); + ASSERT_EQ(num_imm_mems, 2); + + ASSERT_TRUE(txn_db->GetIntProperty( + handles_[1], DB::Properties::kNumImmutableMemTable, &num_imm_mems)); + ASSERT_EQ(num_imm_mems, 2); + ASSERT_TRUE(txn_db->GetIntProperty( + handles_[2], DB::Properties::kNumImmutableMemTable, &num_imm_mems)); + ASSERT_EQ(num_imm_mems, 0); + } + SCOPED_TRACE("Verify cf0 after txn1"); + VerifyDBFromMap(expected_cf0, nullptr, false, nullptr, handles_[0], + ¬_found_cf0); + SCOPED_TRACE("Verify cf1 after txn1"); + VerifyDBFromMap(expected_cf1, nullptr, false, nullptr, handles_[1], + ¬_found_cf1); + VerifyDBFromMap(expected_cf2, nullptr, false, nullptr, handles_[2]); + auto verify_at_snapshot = [&](const std::string& m) { + if (snapshot) { + SCOPED_TRACE("Verify at snapshot " + m); + ReadOptions ro; + ro.snapshot = snapshot; + SCOPED_TRACE("Verify cf0 after txn1"); + VerifyDBFromMap(expected_cf0_snapshot, nullptr, false, &ro, handles_[0], + ¬_found_cf0_snapshot); + SCOPED_TRACE("Verify cf1 after txn1"); + VerifyDBFromMap(expected_cf1_snapshot, nullptr, false, &ro, handles_[1], + ¬_found_cf1_snapshot); + VerifyDBFromMap(expected_cf2_snapshot, nullptr, false, &ro, + handles_[2]); + if (rnd->OneIn(2)) { + txn_db->ReleaseSnapshot(snapshot); + snapshot = nullptr; + } + } + }; + verify_at_snapshot("txn1"); + if (!snapshot && rnd->OneIn(4)) { + init_snapshot_states(); + } + + // Live memtable or another transaction with commit_bypass_memtable + TransactionOptions topts2; + topts2.commit_bypass_memtable = rnd->OneIn(2); + SCOPED_TRACE("txn2 commit_bypass_memtable = " + + std::to_string(topts2.commit_bypass_memtable)); + Transaction* txn2 = txn_db->BeginTransaction(wopts, topts2); + fill_txn(txn2); + txn2->SetName("xid2"); + ASSERT_OK(txn2->Prepare()); + if (rnd->OneIn(2)) { + ASSERT_OK(txn2->GetCommitTimeWriteBatch()->Put(handles_[2], "id", "2")); + expected_cf2["id"] = "2"; + } + ASSERT_OK(txn2->Commit()); + if (disable_flush) { + ASSERT_TRUE(txn_db->GetIntProperty( + handles_[0], DB::Properties::kNumImmutableMemTable, &num_imm_mems)); + ASSERT_EQ(num_imm_mems, topts2.commit_bypass_memtable ? 4 : 2); + + ASSERT_TRUE(txn_db->GetIntProperty( + handles_[1], DB::Properties::kNumImmutableMemTable, &num_imm_mems)); + ASSERT_EQ(num_imm_mems, topts2.commit_bypass_memtable ? 
4 : 2); + ASSERT_TRUE(txn_db->GetIntProperty( + handles_[2], DB::Properties::kNumImmutableMemTable, &num_imm_mems)); + ASSERT_EQ(num_imm_mems, 0); + } + + SCOPED_TRACE("Verify cf0 after txn2"); + VerifyDBFromMap(expected_cf0, nullptr, false, nullptr, handles_[0], + ¬_found_cf0); + SCOPED_TRACE("Verify cf1 after txn2"); + VerifyDBFromMap(expected_cf1, nullptr, false, nullptr, handles_[1], + ¬_found_cf1); + // cf2 should not have keys in not_found_cf1 either + VerifyDBFromMap(expected_cf2, nullptr, false, nullptr, handles_[2], + ¬_found_cf1); + verify_at_snapshot("txn2"); + if (!snapshot && rnd->OneIn(4)) { + init_snapshot_states(); + } + + // Verify all data is flushed + if (disable_flush) { + ASSERT_OK(db_->ContinueBackgroundWork()); + ASSERT_OK(db_->Flush({}, handles_[0])); + ASSERT_OK(db_->Flush({}, handles_[1])); + } else { + ASSERT_OK(db_->WaitForCompact({})); + } + + VerifyDBFromMap(expected_cf0, nullptr, false, nullptr, handles_[0], + ¬_found_cf0); + VerifyDBFromMap(expected_cf1, nullptr, false, nullptr, handles_[1], + ¬_found_cf1); + VerifyDBFromMap(expected_cf2, nullptr, false, nullptr, handles_[2], + ¬_found_cf1); + verify_at_snapshot("flush"); + if (snapshot) { + db_->ReleaseSnapshot(snapshot); + } + } +} + +TEST_P(CommitBypassMemtableTest, Recovery) { + // Test that ingested txn can be recovered. + // Implementation detail: this tests that write path reserves enough sequence + // number for the ingested memtables (see seq_inc in WriteImpl()). For + // example, suppose that we only assign one sequence number per ingested + // memtable. Then txn1 will be assigned seqno 1 and txn2 will be assigned 2. + // During recovery, we will not use memtable ingestion and will fall back + // to memtable insertion. The sequence number for the first key in a + // transaction is the commit sequence number. So for txn1, we will insert into + // memtable k2@1 and k1@2. And for txn2, we will insert k1@2 and k2@3. This + // results in duplicated key@seqno. 
+ WriteOptions wopts; + TransactionOptions txn_opts; + txn_opts.commit_bypass_memtable = true; + Transaction* txn1 = txn_db->BeginTransaction(wopts, txn_opts, nullptr); + txn1->SetName("xid1"); + txn1->Put("k2", "v2"); + txn1->Put("k1", "v1"); + txn1->Prepare(); + txn1->Commit(); + + // Test txn reuse code path + txn1 = txn_db->BeginTransaction(wopts, txn_opts, txn1); + txn1->SetName("xid2"); + txn1->Put("k1", "v3"); + txn1->Put("k2", "v4"); + txn1->Prepare(); + txn1->Commit(); + delete txn1; + + std::map expected = {{"k1", "v3"}, {"k2", "v4"}}; + VerifyDBFromMap(expected); + + ASSERT_OK(db_->Close()); + ASSERT_OK(TransactionDB::Open(options, txn_db_opts, dbname_, &txn_db)); + db_ = txn_db; + + VerifyDBFromMap(expected); +} + } // namespace ROCKSDB_NAMESPACE int main(int argc, char** argv) { diff --git a/utilities/write_batch_with_index/write_batch_with_index_test.cc b/utilities/write_batch_with_index/write_batch_with_index_test.cc index 113f84d9fe2..c866ef5b537 100644 --- a/utilities/write_batch_with_index/write_batch_with_index_test.cc +++ b/utilities/write_batch_with_index/write_batch_with_index_test.cc @@ -3417,6 +3417,31 @@ TEST_P(WriteBatchWithIndexTest, EntityReadSanityChecks) { } } +TEST_P(WriteBatchWithIndexTest, TrackCFAndEntryCountClear) { + Status s; + std::string value; + batch_->SetTrackCFAndEntryCount(true); + ASSERT_OK(batch_->Put("A", "val")); + ASSERT_OK(batch_->Delete("B")); + + ColumnFamilyHandleImplDummy cf1(/*id=*/1, BytewiseComparator()); + ASSERT_OK(batch_->Put(&cf1, "bar", "foo")); + + auto cf_id_to_count = batch_->GetColumnFamilyToEntryCount(); + ASSERT_EQ(2, cf_id_to_count.size()); + for (const auto [cf_id, count] : cf_id_to_count) { + if (cf_id == 0) { + ASSERT_EQ(2, count); + } else { + ASSERT_EQ(cf_id, 1); + ASSERT_EQ(1, count); + } + } + + batch_->Clear(); + ASSERT_TRUE(batch_->GetColumnFamilyToEntryCount().empty()); +} + INSTANTIATE_TEST_CASE_P(WBWI, WriteBatchWithIndexTest, testing::Bool()); std::string Get(const std::string& k, std::unique_ptr& wbwi_mem, @@ -3455,6 +3480,7 @@ TEST_F(WBWIMemTableTest, ReadFromWBWIMemtable) { Random rnd(301); auto wbwi = std::make_shared(cmp, 0, true, 0, 0); + wbwi->SetTrackCFAndEntryCount(true); std::vector> expected; expected.resize(10000); for (int i = 0; i < 10000; ++i) { @@ -3468,7 +3494,8 @@ TEST_F(WBWIMemTableTest, ReadFromWBWIMemtable) { RandomShuffle(expected.begin(), expected.end()); std::unique_ptr wbwi_mem{ new WBWIMemTable(wbwi, cmp, - /*cf_id=*/0, &immutable_opts, &mutable_cf_options)}; + /*cf_id=*/0, &immutable_opts, &mutable_cf_options, + /*num_entries=*/10000)}; ASSERT_TRUE(wbwi_mem->IsEmpty()); constexpr SequenceNumber visible_seq = 3; constexpr SequenceNumber non_visible_seq = 1; @@ -3540,7 +3567,6 @@ TEST_F(WBWIMemTableTest, ReadFromWBWIMemtable) { ASSERT_TRUE(val == Get(key, wbwi_mem, visible_seq, &found_final_value)); ASSERT_TRUE(found_final_value); } - // MultiGet int batch_size = 30; for (int i = 0; i < 10000; i += batch_size) { @@ -3668,7 +3694,8 @@ TEST_F(WBWIMemTableTest, ReadFromWBWIMemtable) { // Read from another CF std::unique_ptr meta_wbwi_mem{new WBWIMemTable( - wbwi, cmp, /*cf_id=*/1, &immutable_opts, &mutable_cf_options)}; + wbwi, cmp, /*cf_id=*/1, &immutable_opts, &mutable_cf_options, + /*num_entries=*/1)}; meta_wbwi_mem->SetGlobalSequenceNumber(assigned_seq); found_final_value = false; ASSERT_TRUE("foo" == Get(DBTestBase::Key(0), meta_wbwi_mem, visible_seq,
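Reviewer's note, not part of the patch: the tests above exercise the new TransactionOptions::commit_bypass_memtable flag through DBTestBase helpers. For readers who want the end-to-end shape of the feature in application code, here is a minimal, self-contained sketch that mirrors CommitBypassMemtableTest; the database path and transaction name are illustrative, and error handling is reduced to asserts.

// Minimal usage sketch for TransactionOptions::commit_bypass_memtable
// (WRITE_COMMITTED policy with two-phase commit), mirroring the tests above.
#include <cassert>
#include <string>

#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction.h"
#include "rocksdb/utilities/transaction_db.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  Options options;
  options.create_if_missing = true;
  options.allow_2pc = true;

  TransactionDBOptions txn_db_options;
  txn_db_options.write_policy = TxnDBWritePolicy::WRITE_COMMITTED;

  TransactionDB* txn_db = nullptr;
  // "/tmp/commit_bypass_example" is an illustrative path.
  Status s = TransactionDB::Open(options, txn_db_options,
                                 "/tmp/commit_bypass_example", &txn_db);
  assert(s.ok());

  WriteOptions wopts;
  TransactionOptions topts;
  // New option from this change: on Commit(), the prepared batch is ingested
  // as an immutable memtable (WBWIMemTable) instead of being re-inserted
  // key by key into the live memtable.
  topts.commit_bypass_memtable = true;

  Transaction* txn = txn_db->BeginTransaction(wopts, topts);
  assert(txn != nullptr);
  s = txn->SetName("example_xid");  // 2PC requires a named transaction
  assert(s.ok());
  s = txn->Put("k1", "v1");
  assert(s.ok());
  s = txn->Prepare();  // writes the prepare record to the WAL
  assert(s.ok());
  s = txn->Commit();   // commit path bypasses the memtable insert
  assert(s.ok());
  delete txn;

  std::string value;
  s = txn_db->Get(ReadOptions(), "k1", &value);
  assert(s.ok() && value == "v1");

  delete txn_db;
  return 0;
}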