Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix train/predict bugs in PairwiseANN #271

Merged
merged 1 commit into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 37 additions & 38 deletions pecos/ann/pairwise/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,23 +48,24 @@ class PredParams(pecos.BaseParams):
"""Prediction Parameters of PairwiseANN class

Attributes:
topk (int): maximum number of candidates (sorted by distances, nearest first) return by the searcher per query
batch_size (int): maximum number of (input, label) pairs te be inference on for the Searchers
only_topk (int): maximum number of candidates (sorted by distances, nearest first) return by kNN
"""

topk: int = 10
batch_size: int = 1024
only_topk: int = 10

class Searchers(object):
def __init__(self, model, max_batch_size=256, max_only_topk=10, num_searcher=1):
def __init__(self, model, pred_params, num_searcher=1):
self.searchers_ptr = model.fn_dict["searchers_create"](
model.model_ptr,
num_searcher,
)
self.destruct_fn = model.fn_dict["searchers_destruct"]

# searchers also hold the memory of returned np.ndarray
self.max_batch_size = max_batch_size
self.max_only_topk = max_only_topk
max_nnz = max_batch_size * max_only_topk
self.pred_params = pred_params
max_nnz = pred_params.batch_size * pred_params.only_topk
self.Imat = np.zeros(max_nnz, dtype=np.uint32)
self.Mmat = np.zeros(max_nnz, dtype=np.uint32)
self.Dmat = np.zeros(max_nnz, dtype=np.float32)
Expand Down Expand Up @@ -214,11 +215,18 @@ def save(self, model_folder):
c_model_dir = f"{model_folder}/c_model"
self.fn_dict["save"](self.model_ptr, c_char_p(c_model_dir.encode("utf-8")))

def searchers_create(self, max_batch_size=256, max_only_topk=10, num_searcher=1):
def get_pred_params(self):
"""Return a deep copy of prediction parameters

Returns:
copied_pred_params (dict): Prediction parameters.
"""
return copy.deepcopy(self.pred_params)

def searchers_create(self, pred_params=None, num_searcher=1):
"""create searchers that pre-allocate intermediate variables (e.g., topk_queue)
Args:
max_batch_size (int): the maximum batch size for the input/label pairs to be inference
max_only_topk (int): the maximum only topk for the kNN to return
pred_params (Pairwise.PredParams, optional): instance of pecos.ann.pairwise.Pairwise.PredParams
num_searcher: number of searcher for multi-thread inference
Returns:
PairwiseANN.Searchers: the pre-allocated PairwiseANN.Searchers (class object)
Expand All @@ -227,31 +235,25 @@ def searchers_create(self, max_batch_size=256, max_only_topk=10, num_searcher=1)
raise ValueError("self.model_ptr must exist before using searchers_create()")
if num_searcher <= 0:
raise ValueError("num_searcher={} <= 0 is NOT valid".format(num_searcher))
return PairwiseANN.Searchers(self, max_batch_size, max_only_topk, num_searcher)

def get_pred_params(self):
"""Return a deep copy of prediction parameters

Returns:
copied_pred_params (dict): Prediction parameters.
"""
return copy.deepcopy(self.pred_params)
pred_params = self.get_pred_params() if pred_params is None else pred_params
return PairwiseANN.Searchers(self, pred_params, num_searcher)

def predict(self, input_feat, label_keys, searchers, pred_params=None, is_same_input=False):
def predict(self, input_feat, label_keys, searchers, is_same_input=False):
"""predict with multi-thread. The searchers are required to be provided.
Args:
input_feat (numpy.array or smat.csr_matrix): input feature matrix (first key) to find kNN.
if is_same_input == False, the shape should be (batch_size, feat_dim)
if is_same_input == True, the shape should be (1, feat_dim)
label_keys (numpy.array): the label keys (second key) to find kNN. The shape should be (batch_size, ).
searchers (c_void_p): pointer to C/C++ vector<pecos::ann::PairwiseANN:Searcher>. Created by PairwiseANN.searchers_create().
pred_params (Pairwise.PredParams, optional): instance of pecos.ann.pairwise.Pairwise.PredParams.
if is_same_input == False, the shape should be (batch_size, feat_dim).
if is_same_input == True, the shape should be (1, feat_dim).
label_keys (numpy.array): the label keys (second key) to find kNN.
The shape should be (batch_size, ).
searchers (c_void_p): pointer to C/C++ vector<pecos::ann::PairwiseANN:Searcher>.
Created by PairwiseANN.searchers_create().
is_same_input (bool): whether to use the same first row of X to do prediction.
For real-time inference with same input query, set is_same_input = True.
For batch prediction with varying input querues, set is_same_input = False.
Returns:
Imat (np.array): returned kNN input key indices. Shape of (batch_size, topk)
Mmat (np.array): returned kNN masking array. 1/0 mean value is or is not presented. Shape of (batch_size, topk)
Mmat (np.array): returned kNN masking array. 1/0 mean value IS/ISNOT presented. Shape of (batch_size, topk)
Dmat (np.array): returned kNN distance array. Shape of (batch_size, topk)
Vmat (np.array): returned kNN value array. Shape of (batch_size, topk)
"""
Expand All @@ -273,19 +275,16 @@ def predict(self, input_feat, label_keys, searchers, pred_params=None, is_same_i
if not is_same_input and input_feat_py.rows != label_keys.shape[0]:
raise ValueError(f"input_feat_py.rows != label_keys.shape[0]")

batch_size = label_keys.shape[0]
pred_params = self.get_pred_params() if pred_params is None else pred_params
only_topk = pred_params.topk
cur_nnz = batch_size * only_topk
if batch_size > searchers.max_batch_size:
raise ValueError(f"cur_batch_size > searchers.max_batch_size")
if only_topk > searchers.max_only_topk:
raise ValueError(f"cur_only_topk > searchers.max_only_topk")
cur_bsz = label_keys.shape[0]
if cur_bsz > searchers.pred_params.batch_size:
raise ValueError(f"cur_batch_size > searchers.batch_size!")
only_topk = searchers.pred_params.only_topk
cur_nnz = cur_bsz * only_topk

searchers.reset(cur_nnz)
self.fn_dict["predict"](
searchers.ctypes(),
batch_size,
cur_bsz,
only_topk,
input_feat_py,
label_keys.ctypes.data_as(POINTER(c_uint32)),
Expand All @@ -295,8 +294,8 @@ def predict(self, input_feat, label_keys, searchers, pred_params=None, is_same_i
searchers.Vmat.ctypes.data_as(POINTER(c_float)),
c_bool(is_same_input),
)
Imat = searchers.Imat[:cur_nnz].reshape(batch_size, only_topk)
Mmat = searchers.Mmat[:cur_nnz].reshape(batch_size, only_topk)
Dmat = searchers.Dmat[:cur_nnz].reshape(batch_size, only_topk)
Vmat = searchers.Vmat[:cur_nnz].reshape(batch_size, only_topk)
Imat = searchers.Imat[:cur_nnz].reshape(cur_bsz, only_topk)
Mmat = searchers.Mmat[:cur_nnz].reshape(cur_bsz, only_topk)
Dmat = searchers.Dmat[:cur_nnz].reshape(cur_bsz, only_topk)
Vmat = searchers.Vmat[:cur_nnz].reshape(cur_bsz, only_topk)
return Imat, Mmat, Dmat, Vmat
35 changes: 22 additions & 13 deletions pecos/core/ann/pairwise.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ namespace ann {
mmap_s.fput_one<index_type>(X.cols);
mmap_s.fput_one<mem_index_type>(nnz);
mmap_s.fput_multiple<value_type>(X.val, nnz);
}
}

template<class MAT_T>
void load_mat(MAT_T &X, mmap_util::MmapStore& mmap_s) {
Expand All @@ -99,7 +99,7 @@ namespace ann {
X.cols = mmap_s.fget_one<index_type>();
auto nnz = mmap_s.fget_one<mem_index_type>();
X.val = mmap_s.fget_multiple<value_type>(nnz);
}
}

template <typename T1, typename T2>
struct KeyValPair {
Expand All @@ -111,16 +111,16 @@ namespace ann {
bool operator<(const KeyValPair<T1, T2>& other) const { return input_key_dist < other.input_key_dist; }
bool operator>(const KeyValPair<T1, T2>& other) const { return input_key_dist > other.input_key_dist; }
};
// PairwiseANN Interface

// PairwiseANN Interface
template<class FeatVec_T, class MAT_T>
struct PairwiseANN {
typedef FeatVec_T feat_vec_t;
typedef MAT_T mat_t;
typedef pecos::ann::KeyValPair<index_type, value_type> pair_t;
typedef pecos::ann::heap_t<pair_t, std::less<pair_t>> max_heap_t;

struct Searcher {
struct Searcher {
typedef PairwiseANN<feat_vec_t, mat_t> pairwise_ann_t;

const pairwise_ann_t* pairwise_ann;
Expand All @@ -132,8 +132,8 @@ namespace ann {

max_heap_t& predict_single(const feat_vec_t& query_vec, const index_type label_key, index_type topk) {
return pairwise_ann->predict_single(query_vec, label_key, topk, *this);
}
};
}
};

Searcher create_searcher() const {
return Searcher(this);
Expand All @@ -143,7 +143,7 @@ namespace ann {
index_type num_input_keys; // N
index_type num_label_keys; // L
index_type feat_dim; // d

// matrices
pecos::csc_t Y_csc; // shape of [N, L]
mat_t X_trn; // shape of [N, d]
Expand All @@ -152,7 +152,14 @@ namespace ann {
pecos::mmap_util::MmapStore mmap_store;

// destructor
~PairwiseANN() {}
~PairwiseANN() {
// If mmap_store is not open for read, then the memory of Y/X is owned by this class
// Thus, we need to explicitly free the underlying memory of Y/X during destructor
if (!mmap_store.is_open_for_read()) {
this->Y_csc.free_underlying_memory();
this->X_trn.free_underlying_memory();
}
}

static nlohmann::json load_config(const std::string& filepath) {
std::ifstream loadfile(filepath);
Expand Down Expand Up @@ -215,7 +222,7 @@ namespace ann {
save_mat(X_trn, mmap_s);
mmap_s.close();
}

void load(const std::string& model_dir, bool lazy_load = false) {
auto config = load_config(model_dir + "/config.json");
std::string version = config.find("version") != config.end() ? config["version"] : "not found";
Expand Down Expand Up @@ -248,9 +255,11 @@ namespace ann {
this->num_input_keys = Y_csc.rows;
this->num_label_keys = Y_csc.cols;
this->feat_dim = X_trn.cols;
// matrices
this->Y_csc = Y_csc;
this->X_trn = X_trn;
// Deepcopy the memory of X/Y.
// Otherwise, after Python API of PairwiseANN.train(),
// the input matrices pX/pY can be modified or deleted.
this->Y_csc = Y_csc.deep_copy();
this->X_trn = X_trn.deep_copy();
}

max_heap_t& predict_single(
Expand Down
6 changes: 3 additions & 3 deletions pecos/core/libpecos.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,7 +543,7 @@ extern "C" {
void c_pairwise_ann_predict ## SUFFIX( \
void* searchers_ptr, \
uint32_t batch_size, \
uint32_t topk, \
uint32_t only_topk, \
const PY_MAT* pQ, \
uint32_t* label_keys, \
uint32_t* ret_Imat, \
Expand All @@ -559,9 +559,9 @@ extern "C" {
int tid = omp_get_thread_num(); \
auto input_key = (is_same_input ? 0 : bidx); \
auto label_key = label_keys[bidx]; \
auto& ret_pairs = searchers[tid].predict_single(Q_tst.get_row(input_key), label_key, topk); \
auto& ret_pairs = searchers[tid].predict_single(Q_tst.get_row(input_key), label_key, only_topk); \
for (uint32_t k=0; k < ret_pairs.size(); k++) { \
uint64_t offset = static_cast<uint64_t>(bidx) * topk; \
uint64_t offset = static_cast<uint64_t>(bidx) * only_topk; \
ret_Imat[offset + k] = ret_pairs[k].input_key_idx; \
ret_Dmat[offset + k] = ret_pairs[k].input_key_dist; \
ret_Vmat[offset + k] = ret_pairs[k].input_label_val; \
Expand Down
30 changes: 29 additions & 1 deletion pecos/core/utils/matrix.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ namespace pecos {
if(touched_indices[i] < len) {
touched_indices[write_pos] = touched_indices[i];
write_pos += 1;
}
}
}
nr_touch = write_pos;
}
Expand Down Expand Up @@ -540,6 +540,34 @@ namespace pecos {
mem_index_type get_nnz() const {
return static_cast<mem_index_type>(rows) * static_cast<mem_index_type>(cols);
}

// Frees the underlying memory of the matrix (i.e., col_ptr, row_idx, and val arrays)
// Every function in the inference code that returns a matrix has allocated memory, and
// therefore one should call this function to free that memory.
void free_underlying_memory() {
if (val) {
delete[] val;
val = nullptr;
}
}

// Creates a deep copy of this matrix
// This allocates memory, so one should call free_underlying_memory on the copy when
// one is finished using it.
drm_t deep_copy() const {
mem_index_type nnz = get_nnz();
drm_t res;
res.allocate(rows, cols, nnz);
std::memcpy(res.val, val, sizeof(value_type) * nnz);
return res;
}

void allocate(index_type rows, index_type cols, mem_index_type nnz) {
this->rows = rows;
this->cols = cols;
val = new value_type[nnz];
}

};

struct dcm_t { // Dense Column Majored Matrix
Expand Down
Loading
Loading