Implement Memory-mapped MLModel #275

Merged · 2 commits · Feb 8, 2024
259 changes: 259 additions & 0 deletions pecos/core/base.py
@@ -527,6 +527,7 @@ def __init__(self, dirname, soname, forced_rebuild=False):
self.clib_float32 = corelib.load_dynamic_library(
dirname, soname + "_float32", forced_rebuild=forced_rebuild
)
self.link_mlmodel_methods()
self.link_xlinear_methods()
self.link_sparse_operations()
self.link_clustering()
@@ -537,6 +538,264 @@ def __init__(self, dirname, soname, forced_rebuild=False):
self.link_mmap_valstore_methods()
self.link_calibrator_methods()

def link_mlmodel_methods(self):
"""
Specify C-lib's MLModel methods argument and return type.
"""
# compile mmap model
arg_list = [c_char_p, c_char_p]
corelib.fillprototype(self.clib_float32.c_mlmodel_compile_mmap_model, None, arg_list)
# load mmap model
res_list = c_void_p
arg_list = [c_char_p, c_bool]
corelib.fillprototype(self.clib_float32.c_mlmodel_load_mmap_model, res_list, arg_list)
# destruct mmap model
arg_list = [c_void_p]
corelib.fillprototype(self.clib_float32.c_mlmodel_destruct_model, None, arg_list)
# get in attr (nr_labels, nr_codes, nr_features)
res_list = c_uint32
arg_list = [c_void_p, c_char_p]
corelib.fillprototype(self.clib_float32.c_mlmodel_get_int_attr, res_list, arg_list)

# Interface of sparse prediction
arg_list = [
c_void_p,
POINTER(ScipyCsrF32),
POINTER(ScipyCsrF32),
c_char_p,
c_uint32,
c_int,
ScipyCompressedSparseAllocator.CFUNCTYPE,
]
corelib.fillprototype(self.clib_float32.c_mlmodel_predict_csr_f32, None, arg_list)
# Interface of dense prediction
arg_list = [
c_void_p,
POINTER(ScipyDrmF32),
POINTER(ScipyCsrF32),
c_char_p,
c_uint32,
c_int,
ScipyCompressedSparseAllocator.CFUNCTYPE,
]
corelib.fillprototype(self.clib_float32.c_mlmodel_predict_drm_f32, None, arg_list)

# Interface of sparse prediction for selected outputs
arg_list = [
c_void_p,
POINTER(ScipyCsrF32),
POINTER(ScipyCsrF32),
POINTER(ScipyCsrF32),
c_char_p,
c_int,
ScipyCompressedSparseAllocator.CFUNCTYPE,
]
corelib.fillprototype(
self.clib_float32.c_mlmodel_predict_on_selected_outputs_csr_f32, None, arg_list
)
# Interface of dense prediction for selected outputs
arg_list = [
c_void_p,
POINTER(ScipyDrmF32),
POINTER(ScipyCsrF32),
POINTER(ScipyCsrF32),
c_char_p,
c_int,
ScipyCompressedSparseAllocator.CFUNCTYPE,
]
corelib.fillprototype(
self.clib_float32.c_mlmodel_predict_on_selected_outputs_drm_f32, None, arg_list
)

def mlmodel_compile_mmap_model(self, npz_folder, mmap_folder):
"""
Compile MLModel from npz format to memory-mapped format
for faster loading.
Args:
npz_folder (str): The source folder path for mlmodel npz model.
mmap_folder (str): The destination folder path for mlmodel mmap model.
"""
self.clib_float32.c_mlmodel_compile_mmap_model(
c_char_p(npz_folder.encode("utf-8")), c_char_p(mmap_folder.encode("utf-8"))
)

def mlmodel_load_mmap(self, folder, lazy_load=False):
"""
Load MLModel in read-only mmap mode for prediction.

Args:
folder (str): The mmap folder path for mlmodel.
lazy_load (bool): Whether to lazy-load, i.e., load when needed (True)
or fully load the model before returning (False).

Return:
cmodel (ptr): The pointer to mlmodel.
"""
cmodel = self.clib_float32.c_mlmodel_load_mmap_model(
c_char_p(folder.encode("utf-8")), c_bool(lazy_load)
)
return cmodel

def mlmodel_destruct_model(self, c_model):
"""
Destruct mlmodel.

Args:
c_model (ptr): The pointer to mlmodel.
"""
self.clib_float32.c_mlmodel_destruct_model(c_model)

def mlmodel_get_int_attr(self, c_model, attr):
"""
Get int attribute from C mlmodel.

Args:
c_model (ptr): The C mlmodel pointer.
attr (str): The attribute name to get.

Return:
int_attr (int): The int attribute under given name.
"""
assert attr in {
"nr_labels",
"nr_codes",
"nr_features",
}, f"attr {attr} not implemented"
return self.clib_float32.c_mlmodel_get_int_attr(c_model, c_char_p(attr.encode("utf-8")))

def mlmodel_predict(
self,
c_model,
X,
csr_codes,
overriden_post_processor_str,
overriden_only_topk,
threads,
pred_alloc,
):
"""
Performs a full prediction using the given model and queries.

Args:
c_model (c_pointer): A C pointer to the model to use for prediction.
This pointer is returned by the c_mlmodel_load_mmap_model in corelib.clib_float32.
X: The query matrix (admissible formats are smat.csr_matrix,
np.ndarray, ScipyCsrF32, or ScipyDrmF32). Note that if this is smat.csr_matrix,
the matrix must have sorted indices. You can call sort_indices() to ensure this.
csr_codes (smat.csr_matrix or ScipyCsrF32): The prediction for the previous layer.
None if this is the first layer.
overriden_post_processor_str (string): Overrides the post processor to use by name. Use
None for model defaults.
overriden_only_topk (uint): Overrides the number of results to return for each query. Use
None for model defaults.
threads (int): Sets the number of threads to use in computation. Use
-1 to use the maximum amount of available threads.
pred_alloc (ScipyCompressedSparseAllocator): The allocator to store the result in.
"""
clib = self.clib_float32

if isinstance(X, smat.csr_matrix):
if not X.has_sorted_indices:
raise ValueError("Query matrix does not have sorted indices!")
X = ScipyCsrF32.init_from(X)
elif isinstance(X, np.ndarray):
X = ScipyDrmF32.init_from(X)

if isinstance(X, ScipyCsrF32):
c_predict = clib.c_mlmodel_predict_csr_f32
elif isinstance(X, ScipyDrmF32):
c_predict = clib.c_mlmodel_predict_drm_f32
else:
raise NotImplementedError("type(X) = {} not implemented".format(type(X)))

if csr_codes is not None:
# Check that the csr_code is of valid shape
nr_codes = clib.c_mlmodel_get_int_attr(c_model, c_char_p("nr_codes".encode("utf-8")))
if csr_codes.shape[0] != X.shape[0]:
raise ValueError("Instance dimension of query and csr_codes matrix do not match")
if csr_codes.shape[1] != nr_codes:
raise ValueError("Label dimension of csr_codes and C matrix do not match")
csr_codes = ScipyCsrF32.init_from(csr_codes)

c_predict(
c_model,
byref(X),
byref(csr_codes) if csr_codes is not None else None,
overriden_post_processor_str.encode("utf-8") if overriden_post_processor_str else None,
overriden_only_topk if overriden_only_topk else 0,
threads,
pred_alloc.cfunc,
)

def mlmodel_predict_on_selected_outputs(
self,
c_model,
X,
selected_outputs_csr,
csr_codes,
overriden_post_processor_str,
threads,
pred_alloc,
):
"""
Performs a select prediction using the given model and queries.

Args:
c_model (c_pointer): A C pointer to the model to use for prediction.
This pointer is returned by the c_mlmodel_load_mmap_model in corelib.clib_float32.
X: The query matrix (admissible formats are smat.csr_matrix,
np.ndarray, ScipyCsrF32, or ScipyDrmF32). Note that if this is smat.csr_matrix,
the matrix must have sorted indices. You can call sort_indices() to ensure this.
selected_outputs_csr (smat.csr_matrix): The selected outputs to predict.
csr_codes (smat.csr_matrix or ScipyCsrF32): The prediction for the previous layer.
None if this is the first layer.
overriden_post_processor_str (string): Overrides the post processor to use by name. Use
None for model defaults.
threads (int): Sets the number of threads to use in computation. Use
-1 to use the maximum amount of available threads.
pred_alloc (ScipyCompressedSparseAllocator): The allocator to store the result in.
"""
clib = self.clib_float32

if isinstance(X, smat.csr_matrix):
if not X.has_sorted_indices:
raise ValueError("Query matrix does not have sorted indices!")
X = ScipyCsrF32.init_from(X)
elif isinstance(X, np.ndarray):
X = ScipyDrmF32.init_from(X)

if not isinstance(selected_outputs_csr, smat.csr_matrix):
raise ValueError(
"type(selected_outputs_csr) = {} not implemented".format(type(selected_outputs_csr))
)
selected_outputs_csr = ScipyCsrF32.init_from(selected_outputs_csr)

if isinstance(X, ScipyCsrF32):
c_predict = clib.c_mlmodel_predict_on_selected_outputs_csr_f32
elif isinstance(X, ScipyDrmF32):
c_predict = clib.c_mlmodel_predict_on_selected_outputs_drm_f32
else:
raise NotImplementedError("type(X) = {} not implemented".format(type(X)))

if csr_codes is not None:
# Check that the csr_code is of valid shape
nr_codes = clib.c_mlmodel_get_int_attr(c_model, c_char_p("nr_codes".encode("utf-8")))
if csr_codes.shape[0] != X.shape[0]:
raise ValueError("Instance dimension of query and csr_codes matrix do not match")
if csr_codes.shape[1] != nr_codes:
raise ValueError("Label dimension of csr_codes and C matrix do not match")
csr_codes = ScipyCsrF32.init_from(csr_codes)

c_predict(
c_model,
byref(X),
byref(selected_outputs_csr),
byref(csr_codes) if csr_codes is not None else None,
overriden_post_processor_str.encode("utf-8") if overriden_post_processor_str else None,
threads,
pred_alloc.cfunc,
)

def link_xlinear_methods(self):
"""
Specify C-lib's Xlinear methods argument and return type.
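
Reviewer note: a minimal usage sketch of the new Python wrappers above. It assumes `clib` and `ScipyCompressedSparseAllocator` are importable from `pecos.core` and that the allocator exposes the assembled result via `get()`, as in the existing xlinear prediction path; folder paths and shapes are placeholders.

```python
# Sketch only (not part of this diff): compile, load, predict with, and destruct
# an mmap MLModel through the new corelib wrappers. Paths are hypothetical.
import scipy.sparse as smat
from pecos.core import clib, ScipyCompressedSparseAllocator

npz_folder = "./mlmodel_npz"    # hypothetical source folder (npz format)
mmap_folder = "./mlmodel_mmap"  # hypothetical destination folder (mmap format)

# One-time conversion from npz format to the memory-mapped format.
clib.mlmodel_compile_mmap_model(npz_folder, mmap_folder)

# Load in read-only mmap mode; lazy_load=True defers reading until prediction touches the data.
c_model = clib.mlmodel_load_mmap(mmap_folder, lazy_load=True)
nr_features = clib.mlmodel_get_int_attr(c_model, "nr_features")

# Query matrix must be float32 CSR with sorted indices (or a dense float32 ndarray).
X = smat.random(4, nr_features, density=0.1, format="csr", dtype="float32")
X.sort_indices()

pred_alloc = ScipyCompressedSparseAllocator()
clib.mlmodel_predict(
    c_model,
    X,
    csr_codes=None,                     # first/only layer: no previous-layer prediction
    overriden_post_processor_str=None,  # use the model default post-processor
    overriden_only_topk=10,
    threads=-1,                         # use all available threads
    pred_alloc=pred_alloc,
)
Y_pred = pred_alloc.get()  # assumed accessor returning the allocated csr_matrix

# Release the underlying C model when done.
clib.mlmodel_destruct_model(c_model)
```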
88 changes: 87 additions & 1 deletion pecos/core/libpecos.cpp
@@ -27,7 +27,92 @@
// C Interface of Types/Structures can be found in utils/matrix.hpp

extern "C" {
// ==== C Interface of XMC Models ====
// ==== C Interface of MLModels ====
// Only implemented for w_matrix_t = pecos::csc_t
typedef pecos::csc_t MLMODEL_MAT_T;
void c_mlmodel_compile_mmap_model(const char* model_path, const char* mmap_model_path) {
auto model = new pecos::MLModel<MLMODEL_MAT_T>(model_path, 0);
model->save_mmap(mmap_model_path);
delete model;
}
void* c_mlmodel_load_mmap_model(const char* model_path, const bool lazy_load) {
auto mlm = new pecos::MLModel<MLMODEL_MAT_T>(model_path, 0, lazy_load);
return static_cast<void*>(mlm);
}
void c_mlmodel_destruct_model(void* ptr) {
pecos::MLModel<MLMODEL_MAT_T>* mlm = static_cast<pecos::MLModel<MLMODEL_MAT_T>*>(ptr);
delete mlm;
}
// Allowed attr: nr_labels, nr_codes, nr_features
uint32_t c_mlmodel_get_int_attr(void* ptr, const char* attr) {
pecos::MLModel<MLMODEL_MAT_T>* mlm = static_cast<pecos::MLModel<MLMODEL_MAT_T>*>(ptr);
return mlm->get_int_attr(attr);
}

#define C_MLMODEL_PREDICT(SUFFIX, PY_MAT, C_MAT) \
void c_mlmodel_predict ## SUFFIX( \
void* ptr, \
const PY_MAT* input_x, \
const ScipyCsrF32* csr_codes, \
const char* overridden_post_processor, \
const uint32_t overridden_only_topk, \
const int num_threads, \
py_sparse_allocator_t pred_alloc) { \
pecos::MLModel<MLMODEL_MAT_T>* mlm = static_cast<pecos::MLModel<MLMODEL_MAT_T>*>(ptr); \
C_MAT X(input_x); \
pecos::csr_t prev_layer_pred; \
bool no_prev_pred; \
if (csr_codes) { \
prev_layer_pred = pecos::csr_t(csr_codes).deep_copy(); \
no_prev_pred = false; \
} else { \
prev_layer_pred.fill_ones(X.rows, mlm->code_count()); \
no_prev_pred = true; \
} \
pecos::csr_t cur_layer_pred; \
mlm->predict(X, prev_layer_pred, no_prev_pred, \
overridden_only_topk, overridden_post_processor, \
cur_layer_pred, num_threads); \
cur_layer_pred.create_pycsr(pred_alloc); \
cur_layer_pred.free_underlying_memory(); \
prev_layer_pred.free_underlying_memory(); \
}
C_MLMODEL_PREDICT(_csr_f32, ScipyCsrF32, pecos::csr_t)
C_MLMODEL_PREDICT(_drm_f32, ScipyDrmF32, pecos::drm_t)

#define C_MLMODEL_PREDICT_ON_SELECTED_OUTPUTS(SUFFIX, PY_MAT, C_MAT) \
void c_mlmodel_predict_on_selected_outputs ## SUFFIX( \
void* ptr, \
const PY_MAT* input_x, \
const ScipyCsrF32* selected_outputs_csr, \
const ScipyCsrF32* csr_codes, \
const char* overridden_post_processor, \
const int num_threads, \
py_sparse_allocator_t pred_alloc) { \
pecos::MLModel<MLMODEL_MAT_T>* mlm = static_cast<pecos::MLModel<MLMODEL_MAT_T>*>(ptr); \
C_MAT X(input_x); \
pecos::csr_t curr_outputs_csr = pecos::csr_t(selected_outputs_csr).deep_copy(); \
pecos::csr_t prev_layer_pred; \
bool no_prev_pred; \
if (csr_codes) { \
prev_layer_pred = pecos::csr_t(csr_codes).deep_copy(); \
no_prev_pred = false; \
} else { \
prev_layer_pred.fill_ones(X.rows, mlm->code_count()); \
no_prev_pred = true; \
} \
pecos::csr_t cur_layer_pred; \
mlm->predict_on_selected_outputs(X, curr_outputs_csr, prev_layer_pred, no_prev_pred, \
overridden_post_processor, cur_layer_pred, num_threads); \
cur_layer_pred.create_pycsr(pred_alloc); \
cur_layer_pred.free_underlying_memory(); \
curr_outputs_csr.free_underlying_memory(); \
prev_layer_pred.free_underlying_memory(); \
}
C_MLMODEL_PREDICT_ON_SELECTED_OUTPUTS(_csr_f32, ScipyCsrF32, pecos::csr_t)
C_MLMODEL_PREDICT_ON_SELECTED_OUTPUTS(_drm_f32, ScipyDrmF32, pecos::drm_t)

// ==== C Interface of XLinearModels ====
void* c_xlinear_load_model_from_disk(const char* model_path) {
auto model = new pecos::HierarchicalMLModel(model_path);
return static_cast<void*>(model);
@@ -49,6 +134,7 @@ extern "C" {
// Only implemented for bin_search_chunked
auto model = new pecos::HierarchicalMLModel(model_path, pecos::layer_type_t::LAYER_TYPE_BINARY_SEARCH_CHUNKED);
model->save_mmap(mmap_model_path);
delete model;
}

void c_xlinear_destruct_model(void* ptr) {
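
Reviewer note: a companion sketch for the selected-outputs path, which dispatches to the macro-generated c_mlmodel_predict_on_selected_outputs_csr_f32 / _drm_f32 entry points above. Same assumptions as the previous sketch; the selection pattern and query matrix are hypothetical.

```python
# Sketch only (not part of this diff): score a hand-picked set of labels per query.
import numpy as np
import scipy.sparse as smat
from pecos.core import clib, ScipyCompressedSparseAllocator

c_model = clib.mlmodel_load_mmap("./mlmodel_mmap", lazy_load=False)  # eager load
nr_features = clib.mlmodel_get_int_attr(c_model, "nr_features")
nr_labels = clib.mlmodel_get_int_attr(c_model, "nr_labels")

# Dense float32 queries exercise the _drm_f32 entry point.
X = np.random.rand(2, nr_features).astype(np.float32)

# Hypothetical selection: request scores for labels 0 and 1 of every query.
rows = np.repeat(np.arange(2), 2)
cols = np.tile(np.arange(2), 2)
vals = np.ones(4, dtype=np.float32)
selected_outputs_csr = smat.csr_matrix((vals, (rows, cols)), shape=(2, nr_labels))

pred_alloc = ScipyCompressedSparseAllocator()
clib.mlmodel_predict_on_selected_outputs(
    c_model,
    X,
    selected_outputs_csr=selected_outputs_csr,
    csr_codes=None,                     # no previous-layer prediction
    overriden_post_processor_str=None,  # use the model default post-processor
    threads=-1,
    pred_alloc=pred_alloc,
)
Y_selected = pred_alloc.get()  # scores only at the selected (query, label) positions

clib.mlmodel_destruct_model(c_model)
```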