Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/incubator-datasketches-cpp…
Browse files Browse the repository at this point in the history
… into 2.1.x-incubating
  • Loading branch information
jmalkin committed Aug 14, 2020
2 parents 4c0edd3 + fa95167 commit c1a6f8e
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 116 deletions.
37 changes: 22 additions & 15 deletions kll/include/kll_quantile_calculator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,38 @@ namespace datasketches {

template <typename T, typename C, typename A>
class kll_quantile_calculator {
typedef typename std::allocator_traits<A>::template rebind_alloc<uint32_t> AllocU32;
typedef typename std::allocator_traits<A>::template rebind_alloc<uint64_t> AllocU64;
public:
// assumes that all levels are sorted including level 0
kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n);
~kll_quantile_calculator();
T get_quantile(double fraction) const;

private:
using AllocU32 = typename std::allocator_traits<A>::template rebind_alloc<uint32_t>;
using vector_u32 = std::vector<uint32_t, AllocU32>;
using Entry = std::pair<T, uint64_t>;
using AllocEntry = typename std::allocator_traits<A>::template rebind_alloc<Entry>;
using Container = std::vector<Entry, AllocEntry>;
uint64_t n_;
T* items_;
uint64_t* weights_;
uint32_t* levels_;
uint8_t levels_size_;
uint8_t num_levels_;
vector_u32 levels_;
Container entries_;

void populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
void populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels);
T approximately_answer_positional_query(uint64_t pos) const;
static void convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size);
void convert_to_preceding_cummulative();
uint32_t chunk_containing_pos(uint64_t pos) const;
uint32_t search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const;
static void merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items);
static void merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
static void merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
static uint64_t pos_of_phi(double phi, uint64_t n);
static uint32_t chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos);
static uint32_t search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r);
static void blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels);
static void blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels);
static void tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2);

template<typename Comparator>
struct compare_pair_by_first {
template<typename Entry1, typename Entry2>
bool operator()(Entry1&& a, Entry2&& b) const {
return Comparator()(std::forward<Entry1>(a).first, std::forward<Entry2>(b).first);
}
};
};

} /* namespace datasketches */
Expand Down
159 changes: 69 additions & 90 deletions kll/include/kll_quantile_calculator_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,22 @@

#include <memory>
#include <cmath>
#include <algorithm>

#include "kll_helper.hpp"

namespace datasketches {

template <typename T, typename C, typename A>
kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n) {
n_ = n;
kll_quantile_calculator<T, C, A>::kll_quantile_calculator(const T* items, const uint32_t* levels, uint8_t num_levels, uint64_t n):
n_(n), levels_(num_levels + 1)
{
const uint32_t num_items = levels[num_levels] - levels[0];
items_ = A().allocate(num_items);
weights_ = AllocU64().allocate(num_items + 1); // one more is intentional
levels_size_ = num_levels + 1;
levels_ = AllocU32().allocate(levels_size_);
populate_from_sketch(items, num_items, levels, num_levels);
blocky_tandem_merge_sort(items_, weights_, num_items, levels_, num_levels_);
convert_to_preceding_cummulative(weights_, num_items + 1);
}

template <typename T, typename C, typename A>
kll_quantile_calculator<T, C, A>::~kll_quantile_calculator() {
const uint32_t num_items = levels_[num_levels_] - levels_[0];
for (uint32_t i = 0; i < num_items; i++) items_[i].~T();
A().deallocate(items_, num_items);
AllocU64().deallocate(weights_, num_items + 1);
AllocU32().deallocate(levels_, levels_size_);
entries_.reserve(num_items);
populate_from_sketch(items, levels, num_levels);
merge_sorted_blocks(entries_, levels_.data(), levels_.size() - 1, num_items);
if (!is_sorted(entries_.begin(), entries_.end(), compare_pair_by_first<C>())) throw std::logic_error("entries must be sorted");
convert_to_preceding_cummulative();
}

template <typename T, typename C, typename A>
Expand All @@ -55,42 +46,43 @@ T kll_quantile_calculator<T, C, A>::get_quantile(double fraction) const {
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
kll_helper::copy_construct<T>(items, levels[0], levels[num_levels], items_, 0);
uint8_t src_level = 0;
uint8_t dst_level = 0;
void kll_quantile_calculator<T, C, A>::populate_from_sketch(const T* items, const uint32_t* levels, uint8_t num_levels) {
size_t src_level = 0;
size_t dst_level = 0;
uint64_t weight = 1;
uint32_t offset = levels[0];
while (src_level < num_levels) {
const uint32_t from_index(levels[src_level] - offset);
const uint32_t to_index(levels[src_level + 1] - offset); // exclusive
if (from_index < to_index) { // skip empty levels
std::fill(&weights_[from_index], &weights_[to_index], weight);
for (uint32_t i = from_index; i < to_index; ++i) {
entries_.push_back(Entry(items[i + offset], weight));
}
levels_[dst_level] = from_index;
levels_[dst_level + 1] = to_index;
dst_level++;
}
src_level++;
weight *= 2;
}
weights_[num_items] = 0;
num_levels_ = dst_level;
if (levels_.size() > static_cast<size_t>(dst_level + 1)) levels_.resize(dst_level + 1);
}

template <typename T, typename C, typename A>
T kll_quantile_calculator<T, C, A>::approximately_answer_positional_query(uint64_t pos) const {
if (pos >= n_) throw std::logic_error("position out of range");
const uint32_t weights_size(levels_[num_levels_] + 1);
const uint32_t index = chunk_containing_pos(weights_, weights_size, pos);
return items_[index];
const uint32_t num_items = levels_[levels_.size() - 1];
if (pos > entries_[num_items - 1].second) return entries_[num_items - 1].first;
const uint32_t index = chunk_containing_pos(pos);
return entries_[index].first;
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative(uint64_t* weights, uint32_t weights_size) {
uint64_t subtotal(0);
for (uint32_t i = 0; i < weights_size; i++) {
const uint32_t new_subtotal = subtotal + weights[i];
weights[i] = subtotal;
void kll_quantile_calculator<T, C, A>::convert_to_preceding_cummulative() {
uint64_t subtotal = 0;
for (auto& entry: entries_) {
const uint32_t new_subtotal = subtotal + entry.second;
entry.second = subtotal;
subtotal = new_subtotal;
}
}
Expand All @@ -102,87 +94,74 @@ uint64_t kll_quantile_calculator<T, C, A>::pos_of_phi(double phi, uint64_t n) {
}

template <typename T, typename C, typename A>
uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t* weights, uint32_t weights_size, uint64_t pos) {
if (weights_size <= 1) throw std::logic_error("weights array too short"); // weights_ contains an "extra" position
const uint32_t nominal_length(weights_size - 1);
if (pos < weights[0]) throw std::logic_error("position too small");
if (pos >= weights[nominal_length]) throw std::logic_error("position too large");
return search_for_chunk_containing_pos(weights, pos, 0, nominal_length);
uint32_t kll_quantile_calculator<T, C, A>::chunk_containing_pos(uint64_t pos) const {
if (entries_.size() < 1) throw std::logic_error("array too short");
if (pos < entries_[0].second) throw std::logic_error("position too small");
if (pos > entries_[entries_.size() - 1].second) throw std::logic_error("position too large");
return search_for_chunk_containing_pos(pos, 0, entries_.size());
}

template <typename T, typename C, typename A>
uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(const uint64_t* arr, uint64_t pos, uint32_t l, uint32_t r) {
uint32_t kll_quantile_calculator<T, C, A>::search_for_chunk_containing_pos(uint64_t pos, uint32_t l, uint32_t r) const {
if (l + 1 == r) {
return l;
}
const uint32_t m(l + (r - l) / 2);
if (arr[m] <= pos) {
return search_for_chunk_containing_pos(arr, pos, m, r);
if (entries_[m].second <= pos) {
return search_for_chunk_containing_pos(pos, m, r);
}
return search_for_chunk_containing_pos(arr, pos, l, m);
return search_for_chunk_containing_pos(pos, l, m);
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort(T* items, uint64_t* weights, uint32_t num_items, const uint32_t* levels, uint8_t num_levels) {
void kll_quantile_calculator<T, C, A>::merge_sorted_blocks(Container& entries, const uint32_t* levels, uint8_t num_levels, uint32_t num_items) {
if (num_levels == 1) return;

// move the input in preparation for the "ping-pong" reduction strategy
auto tmp_items_deleter = [num_items](T* ptr) {
for (uint32_t i = 0; i < num_items; i++) ptr[i].~T();
A().deallocate(ptr, num_items);
};
std::unique_ptr<T, decltype(tmp_items_deleter)> tmp_items(A().allocate(num_items), tmp_items_deleter);
kll_helper::move_construct<T>(items, 0, num_items, tmp_items.get(), 0, false); // do not destroy since the items will be moved back
auto tmp_weights_deleter = [num_items](uint64_t* ptr) { AllocU64().deallocate(ptr, num_items); };
std::unique_ptr<uint64_t[], decltype(tmp_weights_deleter)> tmp_weights(AllocU64().allocate(num_items), tmp_weights_deleter); // don't need the extra one here
std::copy(weights, &weights[num_items], tmp_weights.get());
blocky_tandem_merge_sort_recursion(tmp_items.get(), tmp_weights.get(), items, weights, levels, 0, num_levels);
Container temporary;
temporary.reserve(num_items);
merge_sorted_blocks_direct(entries, temporary, levels, 0, num_levels);
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::blocky_tandem_merge_sort_recursion(T* items_src, uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level, uint8_t num_levels) {
void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_direct(Container& orig, Container& temp, const uint32_t* levels,
uint8_t starting_level, uint8_t num_levels) {
if (num_levels == 1) return;
const uint8_t num_levels_1 = num_levels / 2;
const uint8_t num_levels_2 = num_levels - num_levels_1;
if (num_levels_1 < 1) throw std::logic_error("level above 0 expected");
if (num_levels_2 < num_levels_1) throw std::logic_error("wrong order of levels");
const uint8_t starting_level_1 = starting_level;
const uint8_t starting_level_2 = starting_level + num_levels_1;
// swap roles of src and dst
blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_1, num_levels_1);
blocky_tandem_merge_sort_recursion(items_dst, weights_dst, items_src, weights_src, levels, starting_level_2, num_levels_2);
tandem_merge(items_src, weights_src, items_dst, weights_dst, levels, starting_level_1, num_levels_1, starting_level_2, num_levels_2);
const auto chunk_begin = temp.begin() + temp.size();
merge_sorted_blocks_reversed(orig, temp, levels, starting_level_1, num_levels_1);
merge_sorted_blocks_reversed(orig, temp, levels, starting_level_2, num_levels_2);
const uint32_t num_items_1 = levels[starting_level_1 + num_levels_1] - levels[starting_level_1];
std::merge(
std::make_move_iterator(chunk_begin), std::make_move_iterator(chunk_begin + num_items_1),
std::make_move_iterator(chunk_begin + num_items_1), std::make_move_iterator(temp.end()),
orig.begin() + levels[starting_level], compare_pair_by_first<C>()
);
temp.erase(chunk_begin, temp.end());
}

template <typename T, typename C, typename A>
void kll_quantile_calculator<T, C, A>::tandem_merge(const T* items_src, const uint64_t* weights_src, T* items_dst, uint64_t* weights_dst, const uint32_t* levels, uint8_t starting_level_1, uint8_t num_levels_1, uint8_t starting_level_2, uint8_t num_levels_2) {
const auto from_index_1 = levels[starting_level_1];
const auto to_index_1 = levels[starting_level_1 + num_levels_1]; // exclusive
const auto from_index_2 = levels[starting_level_2];
const auto to_index_2 = levels[starting_level_2 + num_levels_2]; // exclusive
auto i_src_1 = from_index_1;
auto i_src_2 = from_index_2;
auto i_dst = from_index_1;

while ((i_src_1 < to_index_1) && (i_src_2 < to_index_2)) {
if (C()(items_src[i_src_1], items_src[i_src_2])) {
items_dst[i_dst] = std::move(items_src[i_src_1]);
weights_dst[i_dst] = weights_src[i_src_1];
i_src_1++;
} else {
items_dst[i_dst] = std::move(items_src[i_src_2]);
weights_dst[i_dst] = weights_src[i_src_2];
i_src_2++;
}
i_dst++;
}
if (i_src_1 < to_index_1) {
std::move(&items_src[i_src_1], &items_src[to_index_1], &items_dst[i_dst]);
std::copy(&weights_src[i_src_1], &weights_src[to_index_1], &weights_dst[i_dst]);
} else if (i_src_2 < to_index_2) {
std::move(&items_src[i_src_2], &items_src[to_index_2], &items_dst[i_dst]);
std::copy(&weights_src[i_src_2], &weights_src[to_index_2], &weights_dst[i_dst]);
void kll_quantile_calculator<T, C, A>::merge_sorted_blocks_reversed(Container& orig, Container& temp, const uint32_t* levels,
uint8_t starting_level, uint8_t num_levels) {
if (num_levels == 1) {
std::move(orig.begin() + levels[starting_level], orig.begin() + levels[starting_level + 1], std::back_inserter(temp));
return;
}
const uint8_t num_levels_1 = num_levels / 2;
const uint8_t num_levels_2 = num_levels - num_levels_1;
const uint8_t starting_level_1 = starting_level;
const uint8_t starting_level_2 = starting_level + num_levels_1;
merge_sorted_blocks_direct(orig, temp, levels, starting_level_1, num_levels_1);
merge_sorted_blocks_direct(orig, temp, levels, starting_level_2, num_levels_2);
std::merge(
std::make_move_iterator(orig.begin() + levels[starting_level_1]),
std::make_move_iterator(orig.begin() + levels[starting_level_1 + num_levels_1]),
std::make_move_iterator(orig.begin() + levels[starting_level_2]),
std::make_move_iterator(orig.begin() + levels[starting_level_2 + num_levels_2]),
std::back_inserter(temp),
compare_pair_by_first<C>()
);
}

} /* namespace datasketches */
Expand Down
23 changes: 12 additions & 11 deletions kll/include/kll_sketch_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,9 @@ std::vector<T, A> kll_sketch<T, C, S, A>::get_quantiles(size_t num) const {
template<typename T, typename C, typename S, typename A>
double kll_sketch<T, C, S, A>::get_rank(const T& value) const {
if (is_empty()) return std::numeric_limits<double>::quiet_NaN();
uint8_t level(0);
uint64_t weight(1);
uint64_t total(0);
uint8_t level = 0;
uint64_t weight = 1;
uint64_t total = 0;
while (level < num_levels_) {
const auto from_index(levels_[level]);
const auto to_index(levels_[level + 1]); // exclusive
Expand Down Expand Up @@ -391,7 +391,7 @@ void kll_sketch<T, C, S, A>::serialize(std::ostream& os) const {
os.write(reinterpret_cast<const char*>(&flags_byte), sizeof(flags_byte));
os.write((char*)&k_, sizeof(k_));
os.write((char*)&m_, sizeof(m_));
const uint8_t unused(0);
const uint8_t unused = 0;
os.write(reinterpret_cast<const char*>(&unused), sizeof(unused));
if (is_empty()) return;
if (!is_single_item) {
Expand All @@ -412,7 +412,7 @@ vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const
const size_t size = header_size_bytes + get_serialized_size_bytes();
vector_u8<A> bytes(size);
uint8_t* ptr = bytes.data() + header_size_bytes;
uint8_t* end_ptr = ptr + size;
const uint8_t* end_ptr = ptr + size;
const uint8_t preamble_ints(is_empty() || is_single_item ? PREAMBLE_INTS_SHORT : PREAMBLE_INTS_FULL);
ptr += copy_to_mem(&preamble_ints, ptr, sizeof(preamble_ints));
const uint8_t serial_version(is_single_item ? SERIAL_VERSION_2 : SERIAL_VERSION_1);
Expand All @@ -427,7 +427,7 @@ vector_u8<A> kll_sketch<T, C, S, A>::serialize(unsigned header_size_bytes) const
ptr += copy_to_mem(&flags_byte, ptr, sizeof(flags_byte));
ptr += copy_to_mem(&k_, ptr, sizeof(k_));
ptr += copy_to_mem(&m_, ptr, sizeof(m_));
const uint8_t unused(0);
const uint8_t unused = 0;
ptr += copy_to_mem(&unused, ptr, sizeof(unused));
if (!is_empty()) {
if (!is_single_item) {
Expand Down Expand Up @@ -776,8 +776,8 @@ vector_d<A> kll_sketch<T, C, S, A>::get_PMF_or_CDF(const T* split_points, uint32
if (is_empty()) return vector_d<A>();
kll_helper::validate_values<T, C>(split_points, size);
vector_d<A> buckets(size + 1, 0);
uint8_t level(0);
uint64_t weight(1);
uint8_t level = 0;
uint64_t weight = 1;
while (level < num_levels_) {
const auto from_index = levels_[level];
const auto to_index = levels_[level + 1]; // exclusive
Expand Down Expand Up @@ -871,8 +871,9 @@ void kll_sketch<T, C, S, A>::merge_higher_levels(O&& other, uint64_t final_n) {
const uint32_t free_space_at_bottom = result.final_capacity - result.final_num_items;
kll_helper::move_construct<T>(workbuf.get(), outlevels[0], outlevels[0] + result.final_num_items, items_, free_space_at_bottom, true);

if (levels_.size() < (result.final_num_levels + 1)) {
levels_.resize(result.final_num_levels + 1);
const size_t new_levels_size = result.final_num_levels + 1;
if (levels_.size() < new_levels_size) {
levels_.resize(new_levels_size);
}
const uint32_t offset = free_space_at_bottom - outlevels[0];
for (uint8_t lvl = 0; lvl < levels_.size(); lvl++) { // includes the "extra" index
Expand Down Expand Up @@ -1027,7 +1028,7 @@ string<A> kll_sketch<T, C, S, A>::to_string(bool print_levels, bool print_items)

if (print_items) {
os << "### KLL sketch data:" << std::endl;
uint8_t level(0);
uint8_t level = 0;
while (level < num_levels_) {
const uint32_t from_index = levels_[level];
const uint32_t to_index = levels_[level + 1]; // exclusive
Expand Down

0 comments on commit c1a6f8e

Please sign in to comment.