Skip to content

Commit

Permalink
Fix crash in tdb_matrix_with_ids when there was an empty partition (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
jparismorgan authored Jun 17, 2024
1 parent c42b9a8 commit 82cc11e
Show file tree
Hide file tree
Showing 2 changed files with 133 additions and 94 deletions.
152 changes: 59 additions & 93 deletions src/include/detail/linalg/tdb_partitioned_matrix.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ class tdbPartitionedMatrix
, relevant_parts_(relevant_parts)
, squashed_indices_(size(relevant_parts_) + 1)
, last_resident_part_{0} {
scoped_timer _{tdb_func__ + " " + partitioned_vectors_uri_};
if (relevant_parts_.size() >= indices.size()) {
throw std::runtime_error(
"Invalid partitioning, relevant_parts_ size (" +
Expand All @@ -287,9 +288,15 @@ class tdbPartitionedMatrix
std::to_string(indices.size()) + ")");
}

total_num_parts_ = size(relevant_parts_);
tiledb_datatype_t attr_type =
partitioned_vectors_schema_.attribute(0).type();
if (attr_type != tiledb::impl::type_to_tiledb<T>::tiledb_type) {
throw std::runtime_error(
"Attribute type mismatch: " + std::to_string(attr_type) + " != " +
std::to_string(tiledb::impl::type_to_tiledb<T>::tiledb_type));
}

scoped_timer _{tdb_func__ + " " + partitioned_vectors_uri_};
total_num_parts_ = size(relevant_parts_);

auto cell_order = partitioned_vectors_schema_.cell_order();
auto tile_order = partitioned_vectors_schema_.tile_order();
Expand Down Expand Up @@ -420,47 +427,23 @@ class tdbPartitionedMatrix
" != " + std::to_string(max_resident_parts_ + 1));
}

// The number of resident partitions.
// In a previous load() we may have read in some partitions. Start from
// where we left off:
// - The initial partition number of the resident partitions.
const index_type first_resident_part = last_resident_part_;
// - The initial index numbers of the resident columns.
const index_type first_resident_col = last_resident_col_;

// 1. Calculate the number of resident partitions to load.
size_t num_resident_parts{0};
// The offset of the first partitions in the resident vectors.
// Should be equal to first element of part_view_.
index_type resident_part_offset{0};
// The initial partition number of the resident partitions.
index_type first_resident_part{0};
// The initial index numbers of the resident columns.
index_type first_resident_col{0};
{
const size_t attr_idx = 0;
auto attr = partitioned_vectors_schema_.attribute(attr_idx);

std::string attr_name = attr.name();
tiledb_datatype_t attr_type = attr.type();
if (attr_type != tiledb::impl::type_to_tiledb<T>::tiledb_type) {
throw std::runtime_error(
"Attribute type mismatch: " + std::to_string(attr_type) + " != " +
std::to_string(tiledb::impl::type_to_tiledb<T>::tiledb_type));
}

/*
* Fit as many partitions as we can into column_capacity_
*/

// In a previous load() we may have read in some partitions. Start from
// where we left off.
first_resident_col = last_resident_col_;
first_resident_part = last_resident_part_;

// Now our goal is to calculate the number of columns (i.e. vectors) that
// we can read in, and set num_resident_cols_ to that.
// we can read in, and set num_resident_cols_ to that. We want to fit as
// many partitions as we can into column_capacity_.
last_resident_part_ = first_resident_part;
for (size_t i = first_resident_part; i < total_num_parts_; ++i) {
auto next_part_size = squashed_indices_[i + 1] - squashed_indices_[i];

// Continue if this partition is empty
if (next_part_size == 0) {
continue;
}

if (last_resident_col_ + next_part_size >
first_resident_col + column_capacity_) {
break;
Expand All @@ -482,24 +465,23 @@ class tdbPartitionedMatrix

// This is the number of partitions we will read in.
num_resident_parts = last_resident_part_ - first_resident_part;
resident_part_offset = first_resident_part;
if (num_resident_parts > max_resident_parts_) {
throw std::runtime_error(
"Invalid partitioning, num_resident_parts " +
std::to_string(num_resident_parts) + " > " +
std::to_string(max_resident_parts_));
}

if (num_resident_cols_ == 0) {
return false;
}
if ((num_resident_cols_ == 0 && num_resident_parts != 0) ||
(num_resident_cols_ != 0 && num_resident_parts == 0)) {
throw std::runtime_error(
"Invalid partitioning, " + std::to_string(num_resident_cols_) +
" resident cols and " + std::to_string(num_resident_parts) +
" resident parts");
}
if (num_resident_cols_ == 0) {
return false;
}

if (this->part_index_.size() != max_resident_parts_ + 1) {
throw std::runtime_error(
Expand All @@ -508,19 +490,24 @@ class tdbPartitionedMatrix
") != max_resident_parts_ + 1 (" +
std::to_string(max_resident_parts_ + 1) + ")");
}
}

/*
* Set up the subarray to read the partitions
*/
// 2. Load the vectors and IDs.
{
// a. Set up the vectors subarray.
auto attr = partitioned_vectors_schema_.attribute(0);
std::string attr_name = attr.name();
tiledb::Subarray subarray(ctx_, *(this->partitioned_vectors_array_));

// For a 128 dimension vector, Dimension 0 will go from 0 to 127.
auto dimension = num_array_rows_;
subarray.add_range(0, 0, (int)dimension - 1);

/**
* Read in the next batch of partitions
*/
// b. Set up the IDs subarray.
auto ids_attr = ids_schema_.attribute(0);
std::string ids_attr_name = ids_attr.name();
tiledb::Subarray ids_subarray(ctx_, *partitioned_ids_array_);

// b. Read in the next batch of partitions
size_t col_count = 0;
for (size_t j = first_resident_part; j < last_resident_part_; ++j) {
size_t start = master_indices_[relevant_parts_[j]];
Expand All @@ -531,81 +518,47 @@ class tdbPartitionedMatrix
}
col_count += len;
subarray.add_range(1, (int)start, (int)stop - 1);
ids_subarray.add_range(0, (int)start, (int)stop - 1);
}
if (col_count != last_resident_col_ - first_resident_col) {
throw std::runtime_error("Column count mismatch");
}

auto cell_order = partitioned_vectors_schema_.cell_order();
auto layout_order = cell_order;

// c. Execute the vectors query.
tiledb::Query query(ctx_, *(this->partitioned_vectors_array_));

auto ptr = this->data();
query.set_subarray(subarray)
.set_layout(layout_order)
.set_layout(partitioned_vectors_schema_.cell_order())
.set_data_buffer(attr_name, ptr, col_count * dimension);
// tiledb_helpers::submit_query(tdb_func__, partitioned_vectors_uri_,
// query);
query.submit();
tiledb_helpers::submit_query(tdb_func__, partitioned_vectors_uri_, query);
_memory_data.insert_entry(tdb_func__, col_count * dimension * sizeof(T));

// assert(tiledb::Query::Status::COMPLETE == query.query_dstatus());
auto qs = query.query_status();
// @todo Handle incomplete queries.
if (tiledb::Query::Status::COMPLETE != query.query_status()) {
throw std::runtime_error("Query status is not complete -- fix me");
}
}

// Repeat for ids -- use separate scopes for partitions and ids to keep from
// cross pollinating identifiers
// @todo -- combine these two blocks
{
auto ids_attr_idx = 0;

auto ids_attr = ids_schema_.attribute(ids_attr_idx);
std::string ids_attr_name = ids_attr.name();

tiledb::Subarray ids_subarray(ctx_, *partitioned_ids_array_);

size_t ids_col_count = 0;
for (size_t j = first_resident_part; j < last_resident_part_; ++j) {
size_t start = master_indices_[relevant_parts_[j]];
size_t stop = master_indices_[relevant_parts_[j] + 1];
size_t len = stop - start;
if (len == 0) {
continue;
}
ids_col_count += len;
ids_subarray.add_range(0, (int)start, (int)stop - 1);
}
if (ids_col_count != last_resident_col_ - first_resident_col) {
throw std::runtime_error("Column count mismatch");
}

// d. Execute the IDs query.
tiledb::Query ids_query(ctx_, *partitioned_ids_array_);

auto ids_ptr = this->ids_.data();
ids_query.set_subarray(ids_subarray)
.set_data_buffer(ids_attr_name, ids_ptr, ids_col_count);
.set_data_buffer(ids_attr_name, ids_ptr, col_count);
tiledb_helpers::submit_query(tdb_func__, partitioned_ids_uri_, ids_query);
_memory_data.insert_entry(tdb_func__, ids_col_count * sizeof(T));
_memory_data.insert_entry(tdb_func__, col_count * sizeof(T));

// assert(tiledb::Query::Status::COMPLETE == query.query_status());
if (tiledb::Query::Status::COMPLETE != ids_query.query_status()) {
throw std::runtime_error("Query status is not complete -- fix me");
}
}

/*
* Copy indices for resident partitions into Base::part_index_
* resident_part_offset will be the first index into squashed
* Also [first_resident_part, last_resident_part_)
*/
auto sub = squashed_indices_[resident_part_offset];
// 3. Copy indices for resident partitions into Base::part_index_
// first_resident_part will be the first index into squashed
// Also [first_resident_part, last_resident_part_)
auto sub = squashed_indices_[first_resident_part];
for (size_t i = 0; i < num_resident_parts + 1; ++i) {
this->part_index_[i] = squashed_indices_[i + resident_part_offset] - sub;
this->part_index_[i] = squashed_indices_[i + first_resident_part] - sub;
}

this->num_vectors_ = num_resident_cols_;
Expand All @@ -626,6 +579,19 @@ class tdbPartitionedMatrix
partitioned_ids_array_->close();
}
}

/**
 * Debugging aid: dumps this matrix's partitioning state to std::cout.
 *
 * First delegates to debug_partitioned_matrix() for this object, then prints
 * the three index vectors via debug_vector(), and finally prints the scalar
 * bookkeeping members one per line in the form "# <name>: <value>".
 *
 * @param msg Forwarded unchanged to debug_partitioned_matrix().
 * @param max_size Forwarded unchanged to debug_partitioned_matrix() and
 * debug_vector(); presumably caps how many elements get printed -- confirm
 * against those helpers.
 */
void debug_tdb_partitioned_matrix(const std::string& msg, size_t max_size) {
  debug_partitioned_matrix(*this, msg, max_size);

  // Index vectors describing the partitioning.
  debug_vector(master_indices_, "# master_indices_", max_size);
  debug_vector(relevant_parts_, "# relevant_parts_", max_size);
  debug_vector(squashed_indices_, "# squashed_indices_", max_size);

  // Scalar state. Each line ends with std::endl, so it is flushed right away.
  const auto print_scalar = [](const char* label, const auto& value) {
    std::cout << label << value << std::endl;
  };
  print_scalar("# total_num_parts_: ", total_num_parts_);
  print_scalar("# last_resident_part_: ", last_resident_part_);
  print_scalar("# column_capacity_: ", column_capacity_);
  print_scalar("# num_resident_cols_: ", num_resident_cols_);
  print_scalar("# last_resident_col_: ", last_resident_col_);
  print_scalar("# max_resident_parts_: ", max_resident_parts_);
}
};

/**
Expand Down
75 changes: 74 additions & 1 deletion src/include/test/unit_tdb_partitioned_matrix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "cpos.h"
#include "detail/linalg/matrix.h"
#include "detail/linalg/tdb_io.h"
#include "detail/linalg/tdb_matrix_with_ids.h"
#include "detail/linalg/tdb_partitioned_matrix.h"
#include "mdspan/mdspan.hpp"

Expand Down Expand Up @@ -85,7 +86,6 @@ std::vector<std::vector<T>> generateSubsets(int num_parts) {
}

TEST_CASE("can load correctly", "[tdb_partitioned_matrix]") {
return;
tiledb::Context ctx;
tiledb::VFS vfs(ctx);

Expand Down Expand Up @@ -316,3 +316,76 @@ TEST_CASE("test different combinations", "[tdb_partitioned_matrix]") {
}
}
}

// Regression test: building a partitioned matrix whose partition index
// contains an empty partition, and then loading it batch by batch, must not
// crash.
TEST_CASE(
    "tdb_partitioned_matrix: empty partition", "[tdb_partitioned_matrix]") {
  using feature_type = uint64_t;
  using id_type = uint64_t;
  using part_index_type = uint64_t;
  using matrix_type =
      tdbColMajorPartitionedMatrix<feature_type, id_type, part_index_type>;

  tiledb::Context ctx;
  tiledb::VFS vfs(ctx);

  // Both arrays are stored under the system temporary directory.
  const auto tmp_root = std::filesystem::temp_directory_path();
  std::string partitioned_vectors_uri =
      (tmp_root / "partitioned_vectors").string();
  std::string ids_uri = (tmp_root / "ids").string();

  const size_t num_vectors = 10000;
  const size_t dimensions = 128;

  // Setup data. Scoped so the in-memory copies are released before the
  // partitioned matrix is opened.
  {
    // Remove any array left behind at the given location.
    const auto remove_stale_dir = [&vfs](const std::string& uri) {
      if (vfs.is_dir(uri)) {
        vfs.remove_dir(uri);
      }
    };
    remove_stale_dir(partitioned_vectors_uri);
    remove_stale_dir(ids_uri);

    // Every component of vector `col` holds the value `col`.
    ColMajorMatrix<feature_type> vectors(dimensions, num_vectors);
    for (size_t col = 0; col < num_vectors; ++col) {
      for (size_t row = 0; row < dimensions; ++row) {
        vectors(row, col) = col;
      }
    }
    write_matrix(ctx, vectors, partitioned_vectors_uri);

    // The id of vector `k` is `k`.
    std::vector<id_type> ids(num_vectors);
    for (size_t k = 0; k < num_vectors; ++k) {
      ids[k] = k;
    }
    write_vector(ctx, ids, ids_uri);
  }

  // An empty part shows up as two neighbouring entries of `indices` with the
  // same value (see the repeated 6338 below); this must not crash. The values
  // were captured from a run of
  // `api_ivf_flat_index: read index and query infinite and finite - finite out
  // of core, 1000, nprobe: 32, max_iter: 8`, which used to crash with them.
  std::vector<part_index_type> relevant_parts = {
      1,  2,  3,  4,  5,  6,  7,  8,  9,  10, 11, 12, 13, 14, 15, 16,
      18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 31, 32, 33, 34,
      35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
      51, 52, 53, 55, 56, 57, 58, 60, 61, 62, 63, 64, 65, 66, 67, 68,
      69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84,
      85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99};
  std::vector<part_index_type> indices = {
      0,    1,    116,  215,  318,  418,  600,  662,  862,  1041, 1176, 1248,
      1349, 1488, 1612, 1754, 1877, 1878, 1880, 2028, 2135, 2228, 2328, 2330,
      2464, 2526, 2682, 2785, 2911, 3059, 3191, 3192, 3266, 3395, 3516, 3607,
      3757, 3758, 3861, 3998, 4100, 4306, 4446, 4618, 4733, 4838, 4958, 5112,
      5169, 5277, 5372, 5466, 5653, 5729, 5810, 5811, 5977, 6056, 6057, 6266,
      6269, 6337, 6338, 6338, 6437, 6570, 6660, 6727, 6820, 6900, 7004, 7138,
      7139, 7220, 7227, 7339, 7414, 7539, 7695, 7781, 8004, 8095, 8161, 8235,
      8320, 8389, 8495, 8619, 8769, 8840, 9043, 9088, 9183, 9241, 9293, 9425,
      9548, 9625, 9743, 9880, 10000};

  matrix_type matrix(
      ctx, partitioned_vectors_uri, indices, ids_uri, relevant_parts, 1000);

  // Keep loading until load() reports nothing more; every successful load
  // must expose a non-empty, correctly dimensioned batch.
  bool loaded = matrix.load();
  while (loaded) {
    CHECK(matrix.num_vectors() > 0);
    CHECK(matrix.num_partitions() > 0);
    CHECK(_cpo::dimensions(matrix) == dimensions);
    loaded = matrix.load();
  }
}

0 comments on commit 82cc11e

Please sign in to comment.