Skip to content

Commit

Permalink
Introducing new DeltaType, which is a map from key to objects as a vi…
Browse files Browse the repository at this point in the history
…ew for the serialized log; bugfixing currentDeltaToBytes() which should clear the delta.

TODO: smoke test and regression test.
  • Loading branch information
songweijia committed May 2, 2024
1 parent 52e98f9 commit 1e31d58
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 28 deletions.
27 changes: 27 additions & 0 deletions include/cascade/detail/delta_store_core.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,33 @@ class DeltaCascadeStoreCore : public mutils::ByteRepresentable,
#endif

public:
/**
* @class DeltaType
* @brief the delta type that are stored in a delta.
* 1) The first sizeof(std::vector<KT>::size_type) bytes is the number of VT objects in the
* delta, followed by specified number of
* 2) Serialized VT objects.
*/
class DeltaType : public mutils::ByteRepresentable {
public:
/* The objects */
std::unordered_map<KT,VT> objects;
/** @fn DeltaType
* @brief Constructor
*/
DeltaType();
virtual std::size_t to_bytes(uint8_t*) const override;
virtual void post_object(const std::function<void(uint8_t const* const, std::size_t)>&) const override;
virtual std::size_t bytes_size() const override;
virtual void ensure_registered(mutils::DeserializationManager&);
static std::unique_ptr<DeltaType> from_bytes(mutils::DeserializationManager*,const uint8_t* const);
static mutils::context_ptr<DeltaType> from_bytes_noalloc(
mutils::DeserializationManager*,
const uint8_t* const);
static mutils::context_ptr<const DeltaType> from_bytes_noalloc_const(
mutils::DeserializationManager*,
const uint8_t* const);
};
/** The delta is a list of keys for the objects that are changed by put or remove. */
std::vector<KT> delta;
/** The KV map */
Expand Down
85 changes: 79 additions & 6 deletions include/cascade/detail/delta_store_core_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,86 @@
namespace derecho {
namespace cascade {

template <typename KT, typename VT, KT* IK, VT* IV>
DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType::DeltaType() {}

template <typename KT, typename VT, KT* IK, VT* IV>
std::size_t DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType::to_bytes(uint8_t*) const {
dbg_default_warn("{} should not be called. It is not designed for serialization.",__PRETTY_FUNCTION__);
return 0;
}

template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType::post_object(
const std::function<void(uint8_t const* const buf, std::size_t)>&) const {
dbg_default_warn("{} should not be called. It is not designed for serialization.",__PRETTY_FUNCTION__);
}

template <typename KT, typename VT, KT* IK, VT* IV>
std::size_t DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType::bytes_size() const {
dbg_default_warn("{} should not be called. It is not designed for serialization.",__PRETTY_FUNCTION__);
return 0;
}

template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType::ensure_registered(mutils::DeserializationManager&) {}

template <typename KT, typename VT, KT* IK, VT* IV>
std::unique_ptr<typename DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType>
DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType::from_bytes(
mutils::DeserializationManager* dsm,const uint8_t* const v) {
size_t pos = 0;
std::size_t num_objects = *mutils::from_bytes_noalloc<std::size_t>(dsm,v + pos);
pos += mutils::bytes_size(num_objects);
auto pdelta = std::make_unique<DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType>();
while (num_objects--) {
auto po = mutils::from_bytes<VT>(dsm,v + pos);
pos += mutils::bytes_size(*po);
KT key = po->get_key_ref();
pdelta->objects.emplace(std::move(key),std::move(*po));
}
return pdelta;
}

template <typename KT, typename VT, KT* IK, VT* IV>
mutils::context_ptr<typename DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType>
DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType::from_bytes_noalloc(
mutils::DeserializationManager* dsm,const uint8_t* const v) {
size_t pos = 0;
std::size_t num_objects = *mutils::from_bytes_noalloc<std::size_t>(dsm,v + pos);
pos += mutils::bytes_size(num_objects);
auto* pdelta = new DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType();
while (num_objects--) {
auto po = mutils::from_bytes_noalloc<VT>(dsm,const_cast<uint8_t* const>(v) + pos);
pos += mutils::bytes_size(*po);
KT key = po->get_key_ref();
pdelta->objects.emplace(std::move(key),std::move(*po));
}
return mutils::context_ptr<DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType>(pdelta);
}

template <typename KT, typename VT, KT* IK, VT* IV>
mutils::context_ptr<const typename DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType>
DeltaCascadeStoreCore<KT, VT, IK, IV>::DeltaType::from_bytes_noalloc_const(
mutils::DeserializationManager* dsm,const uint8_t* const v) {
size_t pos = 0;
std::size_t num_objects = *mutils::from_bytes_noalloc<std::size_t>(dsm,v + pos);
pos += mutils::bytes_size(num_objects);
auto* pdelta = new DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType();
while (num_objects--) {
auto po = mutils::from_bytes_noalloc<VT>(dsm,const_cast<uint8_t* const>(v) + pos);
pos += mutils::bytes_size(*po);
KT key = po->get_key_ref();
pdelta->objects.emplace(std::move(key),std::move(*po));
}
return mutils::context_ptr<const DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType>(pdelta);
}

template <typename KT, typename VT, KT* IK, VT* IV>
size_t DeltaCascadeStoreCore<KT, VT, IK, IV>::currentDeltaSize() {
size_t delta_size = 0;
if (delta.size() > 0) {
delta_size += mutils::bytes_size(delta.size());
delta_size += mutils::bytes_size(static_cast<std::size_t>(delta.size()));
for (const auto& k:delta) {
delta_size+=mutils::bytes_size(this->kv_map[k]);
}
Expand All @@ -42,19 +117,17 @@ size_t DeltaCascadeStoreCore<KT, VT, IK, IV>::currentDeltaToBytes(uint8_t * cons
dbg_default_error("{}: failed because we need {} bytes for delta, but only a buffer with {} bytes given.\n",
__PRETTY_FUNCTION__, delta_size, buf_size);
}
size_t offset = mutils::to_bytes(delta.size(),buf);
size_t offset = mutils::to_bytes(static_cast<std::size_t>(delta.size()),buf);
for(const auto& k:delta) {
offset += mutils::to_bytes(this->kv_map[k],buf+offset);
}
delta.clear();
return offset;
}

template <typename KT, typename VT, KT* IK, VT* IV>
void DeltaCascadeStoreCore<KT, VT, IK, IV>::applyDelta(uint8_t const* const serialized_delta) {
auto num_objects =
*mutils::from_bytes<
std::result_of_t<decltype(&std::vector<KT>::size)(std::vector<KT>)>
>(nullptr,serialized_delta);
std::size_t num_objects = *mutils::from_bytes<std::size_t>(nullptr,serialized_delta);
size_t offset = mutils::bytes_size(num_objects);
while (num_objects--) {
offset +=
Expand Down
68 changes: 46 additions & 22 deletions include/cascade/detail/persistent_store_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,17 @@ const VT PersistentCascadeStore<KT, VT, IK, IV, ST>::get(const KT& key, const pe
#endif
return persistent_core->lockless_get(key);
} else {
return persistent_core.template getDelta<VT>(requested_version, exact, [this, key, requested_version, exact, ver](const VT& v) {
if(key == v.get_key_ref()) {
return persistent_core.template getDelta<typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType>(requested_version, exact,
[this, key, requested_version, exact, ver](const typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType& delta) {
if(delta.objects.find(key) != delta.objects.cend()) {
debug_leave_func_with_value("key:{} is found at version:0x{:x}", key, requested_version);
#if __cplusplus > 201703L
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_END, group,*IV,ver);
#else
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_END, group,*IV,ver);
#endif
return v;
// This return is a copy to make sure returned value does not rely on the data in the delta log.
return delta.objects.at(key);
} else {
if(exact) {
// return invalid object for EXACT search.
Expand All @@ -158,9 +160,9 @@ const VT PersistentCascadeStore<KT, VT, IK, IV, ST>::get(const KT& key, const pe
persistent::version_t target_version = o.version;
while (target_version > requested_version) {
target_version =
persistent_core.template getDelta<VT>(target_version,true,
[](const VT& v){
return v.previous_version_by_key;
persistent_core.template getDelta<typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType>(target_version,true,
[&key](const typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType& delta){
return delta.objects.at(key).previous_version_by_key;
});
}
if (target_version == persistent::INVALID_VERSION) {
Expand All @@ -177,9 +179,11 @@ const VT PersistentCascadeStore<KT, VT, IK, IV, ST>::get(const KT& key, const pe
#else
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_END, group,*IV,ver);
#endif
return persistent_core.template getDelta<VT>(target_version,true,
[](const VT& v){
return v;
return persistent_core.template getDelta<typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType>(target_version,true,
[&key](const typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType& delta){
// This return is a copy, which make sure the returned value does not rely on the data in
// the delta log.
return delta.objects.at(key);
});
}
}
Expand Down Expand Up @@ -300,10 +304,10 @@ uint64_t PersistentCascadeStore<KT, VT, IK, IV, ST>::get_size(const KT& key, con
#endif
return rvo_val;
} else {
return persistent_core.template getDelta<VT>(requested_version, exact, [this, key, requested_version, exact, ver](const VT& v) -> uint64_t {
if(key == v.get_key_ref()) {
return persistent_core.template getDelta<typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType>(requested_version, exact, [this, &key, requested_version, exact, ver](const typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType& delta) -> uint64_t {
if(delta.objects.find(key)!=delta.objects.cend()) {
debug_leave_func_with_value("key:{} is found at version:0x{:x}", key, requested_version);
uint64_t size = mutils::bytes_size(v);
uint64_t size = mutils::bytes_size(delta.objects.at(key));
#if __cplusplus > 201703L
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver);
#else
Expand All @@ -322,24 +326,44 @@ uint64_t PersistentCascadeStore<KT, VT, IK, IV, ST>::get_size(const KT& key, con
return 0ull;
} else {
// fall back to the slow path.
auto versioned_state_ptr = persistent_core.get(requested_version);
if(versioned_state_ptr->kv_map.find(key) != versioned_state_ptr->kv_map.end()) {
debug_leave_func_with_value("Reconstructed version:0x{:x} for key:{}", requested_version, key);
uint64_t size = mutils::bytes_size(versioned_state_ptr->kv_map.at(key));
// following the backward chain until its version is behind requested_version.
// TODO: We can introduce a per-key version index to achieve a better performance
// with a 64bit per log entry memory overhead.
VT o = persistent_core->lockless_get(key);
persistent::version_t target_version = o.version;
while (target_version > requested_version) {
target_version =
persistent_core.template getDelta<typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType>(target_version,true,
[&key](const typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType& delta){
if (delta.objects.find(key) != delta.objects.cend()) {
return delta.objects.at(key).previous_version_by_key;
}
return persistent::INVALID_VERSION;
});
}
if (target_version == persistent::INVALID_VERSION) {
#if __cplusplus > 201703L
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver);
#else
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver);
#endif
return size;
}
debug_leave_func_with_value("No data found for key:{} before version:0x{:x}", key, requested_version);
debug_leave_func_with_value("No data found for key:{} before version:0x{:x}", key, requested_version);
return 0ull;
} else {
auto size = persistent_core.template getDelta<typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType>(target_version,true,
[&key](const typename DeltaCascadeStoreCore<KT,VT,IK,IV>::DeltaType& delta){
if (delta.objects.find(key) != delta.objects.cend()) {
return static_cast<uint64_t>(mutils::bytes_size(delta.objects.at(key)));
}
return static_cast<uint64_t>(0ull);
});
#if __cplusplus > 201703L
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver);
LOG_TIMESTAMP_BY_TAG(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver);
#else
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver);
LOG_TIMESTAMP_BY_TAG_EXTRA(TLT_PERSISTENT_GET_SIZE_END, group,*IV,ver);
#endif
return 0ull;
return size;
}
}
}
});
Expand Down

0 comments on commit 1e31d58

Please sign in to comment.