Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ml service] Move the duplicated structure and functions of the extension_service and training_offloading_services to ml_service. #573

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 22 additions & 105 deletions c/src/ml-api-service-extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,30 +34,6 @@ typedef enum
ML_EXTENSION_TYPE_MAX
} ml_extension_type_e;

/**
* @brief Internal enumeration for the node type in pipeline.
*/
typedef enum
{
ML_EXTENSION_NODE_TYPE_UNKNOWN = 0,
ML_EXTENSION_NODE_TYPE_INPUT = 1,
ML_EXTENSION_NODE_TYPE_OUTPUT = 2,

ML_EXTENSION_NODE_TYPE_MAX
} ml_extension_node_type_e;

/**
* @brief Internal structure of the node info in pipeline.
*/
typedef struct
{
gchar *name;
ml_extension_node_type_e type;
ml_tensors_info_h info;
void *handle;
void *mls;
} ml_extension_node_info_s;

/**
* @brief Internal structure of the message in ml-service extension handle.
*/
Expand Down Expand Up @@ -94,12 +70,12 @@ typedef struct
/**
* @brief Internal function to create node info in pipeline.
*/
static ml_extension_node_info_s *
static ml_service_node_info_s *
_ml_extension_node_info_new (ml_service_s * mls, const gchar * name,
ml_extension_node_type_e type)
ml_service_node_type_e type)
{
ml_extension_s *ext = (ml_extension_s *) mls->priv;
ml_extension_node_info_s *node_info;
ml_service_node_info_s *node_info;

if (!STR_IS_VALID (name)) {
_ml_error_report_return (NULL,
Expand All @@ -111,7 +87,7 @@ _ml_extension_node_info_new (ml_service_s * mls, const gchar * name,
"Cannot add duplicated node '%s' in ml-service pipeline.", name);
}

node_info = g_try_new0 (ml_extension_node_info_s, 1);
node_info = g_try_new0 (ml_service_node_info_s, 1);
if (!node_info) {
_ml_error_report_return (NULL,
"Failed to allocate new memory for node info in ml-service pipeline. Out of memory?");
Expand All @@ -132,7 +108,7 @@ _ml_extension_node_info_new (ml_service_s * mls, const gchar * name,
static void
_ml_extension_node_info_free (gpointer data)
{
ml_extension_node_info_s *node_info = (ml_extension_node_info_s *) data;
ml_service_node_info_s *node_info = (ml_service_node_info_s *) data;

if (!node_info)
return;
Expand All @@ -147,7 +123,7 @@ _ml_extension_node_info_free (gpointer data)
/**
* @brief Internal function to get the node info in ml-service extension.
*/
static ml_extension_node_info_s *
static ml_service_node_info_s *
_ml_extension_node_info_get (ml_extension_s * ext, const gchar * name)
{
if (!STR_IS_VALID (name))
Expand All @@ -156,65 +132,6 @@ _ml_extension_node_info_get (ml_extension_s * ext, const gchar * name)
return g_hash_table_lookup (ext->node_table, name);
}

/**
* @brief Internal function to invoke ml-service event for new data.
*/
static int
_ml_extension_invoke_event_new_data (ml_service_s * mls, const char *name,
const ml_tensors_data_h data)
{
ml_service_event_cb_info_s cb_info = { 0 };
ml_information_h info = NULL;
int status = ML_ERROR_NONE;

if (!mls || !data) {
_ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
"Failed to create ml-service event data, invalid parameter.");
}

_ml_service_get_event_cb_info (mls, &cb_info);

if (cb_info.cb) {
/* Create information handle for ml-service event. */
status = _ml_information_create (&info);
if (status != ML_ERROR_NONE)
goto done;

if (name) {
status = _ml_information_set (info, "name", (void *) name, NULL);
if (status != ML_ERROR_NONE)
goto done;
}

status = _ml_information_set (info, "data", (void *) data, NULL);
if (status == ML_ERROR_NONE)
cb_info.cb (ML_SERVICE_EVENT_NEW_DATA, info, cb_info.pdata);
}

done:
if (info)
ml_information_destroy (info);

if (status != ML_ERROR_NONE) {
_ml_error_report ("Failed to invoke 'new data' event.");
}

return status;
}

/**
* @brief Internal callback for sink node in pipeline description.
*/
static void
_ml_extension_pipeline_sink_cb (const ml_tensors_data_h data,
const ml_tensors_info_h info, void *user_data)
{
ml_extension_node_info_s *node_info = (ml_extension_node_info_s *) user_data;
ml_service_s *mls = (ml_service_s *) node_info->mls;

_ml_extension_invoke_event_new_data (mls, node_info->name, data);
}

/**
* @brief Internal function to release ml-service extension message.
*/
Expand Down Expand Up @@ -263,7 +180,7 @@ _ml_extension_msg_thread (gpointer data)
status = ml_single_invoke (ext->single, msg->input, &msg->output);

if (status == ML_ERROR_NONE) {
_ml_extension_invoke_event_new_data (mls, NULL, msg->output);
_ml_service_invoke_event_new_data (mls, NULL, msg->output);
} else {
_ml_error_report
("Failed to invoke the model in ml-service extension thread.");
Expand All @@ -272,11 +189,11 @@ _ml_extension_msg_thread (gpointer data)
}
case ML_EXTENSION_TYPE_PIPELINE:
{
ml_extension_node_info_s *node_info;
ml_service_node_info_s *node_info;

node_info = _ml_extension_node_info_get (ext, msg->name);

if (node_info && node_info->type == ML_EXTENSION_NODE_TYPE_INPUT) {
if (node_info && node_info->type == ML_SERVICE_NODE_TYPE_INPUT) {
/* The input data will be released in the pipeline. */
status = ml_pipeline_src_input_data (node_info->handle, msg->input,
ML_PIPELINE_BUF_POLICY_AUTO_FREE);
Expand Down Expand Up @@ -435,7 +352,7 @@ _ml_extension_conf_parse_single (ml_service_s * mls, JsonObject * single)
*/
static int
_ml_extension_conf_parse_pipeline_node (ml_service_s * mls, JsonNode * node,
ml_extension_node_type_e type)
ml_service_node_type_e type)
{
ml_extension_s *ext = (ml_extension_s *) mls->priv;
JsonArray *array = NULL;
Expand All @@ -451,7 +368,7 @@ _ml_extension_conf_parse_pipeline_node (ml_service_s * mls, JsonNode * node,

for (i = 0; i < n; i++) {
const gchar *name = NULL;
ml_extension_node_info_s *node_info;
ml_service_node_info_s *node_info;

if (array)
object = json_array_get_object_element (array, i);
Expand Down Expand Up @@ -481,13 +398,13 @@ _ml_extension_conf_parse_pipeline_node (ml_service_s * mls, JsonNode * node,
}

switch (type) {
case ML_EXTENSION_NODE_TYPE_INPUT:
case ML_SERVICE_NODE_TYPE_INPUT:
status = ml_pipeline_src_get_handle (ext->pipeline, name,
&node_info->handle);
break;
case ML_EXTENSION_NODE_TYPE_OUTPUT:
case ML_SERVICE_NODE_TYPE_OUTPUT:
status = ml_pipeline_sink_register (ext->pipeline, name,
_ml_extension_pipeline_sink_cb, node_info, &node_info->handle);
_ml_service_pipeline_sink_cb, node_info, &node_info->handle);
break;
default:
status = ML_ERROR_INVALID_PARAMETER;
Expand Down Expand Up @@ -545,7 +462,7 @@ _ml_extension_conf_parse_pipeline (ml_service_s * mls, JsonObject * pipe)
JsonNode *node = json_object_get_member (pipe, "input_node");

status = _ml_extension_conf_parse_pipeline_node (mls, node,
ML_EXTENSION_NODE_TYPE_INPUT);
ML_SERVICE_NODE_TYPE_INPUT);
if (status != ML_ERROR_NONE) {
_ml_error_report_return (status,
"Failed to parse configuration file, cannot get the input node.");
Expand All @@ -559,7 +476,7 @@ _ml_extension_conf_parse_pipeline (ml_service_s * mls, JsonObject * pipe)
JsonNode *node = json_object_get_member (pipe, "output_node");

status = _ml_extension_conf_parse_pipeline_node (mls, node,
ML_EXTENSION_NODE_TYPE_OUTPUT);
ML_SERVICE_NODE_TYPE_OUTPUT);
if (status != ML_ERROR_NONE) {
_ml_error_report_return (status,
"Failed to parse configuration file, cannot get the output node.");
Expand Down Expand Up @@ -766,11 +683,11 @@ _ml_service_extension_get_input_information (ml_service_s * mls,
break;
case ML_EXTENSION_TYPE_PIPELINE:
{
ml_extension_node_info_s *node_info;
ml_service_node_info_s *node_info;

node_info = _ml_extension_node_info_get (ext, name);

if (node_info && node_info->type == ML_EXTENSION_NODE_TYPE_INPUT) {
if (node_info && node_info->type == ML_SERVICE_NODE_TYPE_INPUT) {
status = _ml_tensors_info_create_from (node_info->info, info);
} else {
status = ML_ERROR_INVALID_PARAMETER;
Expand Down Expand Up @@ -801,11 +718,11 @@ _ml_service_extension_get_output_information (ml_service_s * mls,
break;
case ML_EXTENSION_TYPE_PIPELINE:
{
ml_extension_node_info_s *node_info;
ml_service_node_info_s *node_info;

node_info = _ml_extension_node_info_get (ext, name);

if (node_info && node_info->type == ML_EXTENSION_NODE_TYPE_OUTPUT) {
if (node_info && node_info->type == ML_SERVICE_NODE_TYPE_OUTPUT) {
status = _ml_tensors_info_create_from (node_info->info, info);
} else {
status = ML_ERROR_INVALID_PARAMETER;
Expand Down Expand Up @@ -859,7 +776,7 @@ _ml_service_extension_request (ml_service_s * mls, const char *name,
int status, len;

if (ext->type == ML_EXTENSION_TYPE_PIPELINE) {
ml_extension_node_info_s *node_info;
ml_service_node_info_s *node_info;

if (!STR_IS_VALID (name)) {
_ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
Expand All @@ -868,7 +785,7 @@ _ml_service_extension_request (ml_service_s * mls, const char *name,

node_info = _ml_extension_node_info_get (ext, name);

if (!node_info || node_info->type != ML_EXTENSION_NODE_TYPE_INPUT) {
if (!node_info || node_info->type != ML_SERVICE_NODE_TYPE_INPUT) {
_ml_error_report_return (ML_ERROR_INVALID_PARAMETER,
"The parameter, name '%s', is invalid, cannot find the input node from pipeline.",
name);
Expand Down
36 changes: 36 additions & 0 deletions c/src/ml-api-service-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,31 @@ typedef enum
ML_SERVICE_TYPE_MAX
} ml_service_type_e;

/**
* @brief Enumeration for the node type in pipeline.
*/
typedef enum
{
ML_SERVICE_NODE_TYPE_UNKNOWN = 0,
ML_SERVICE_NODE_TYPE_INPUT = 1,
ML_SERVICE_NODE_TYPE_OUTPUT = 2,
ML_SERVICE_NODE_TYPE_TRAINING = 3,

ML_EXTENSION_NODE_TYPE_MAX
} ml_service_node_type_e;

/**
* @brief Structure of the node info in pipeline.
*/
typedef struct
{
gchar *name;
ml_service_node_type_e type;
ml_tensors_info_h info;
void *handle;
void *mls;
} ml_service_node_info_s;

/**
* @brief Structure for ml-service event callback.
*/
Expand Down Expand Up @@ -125,6 +150,17 @@ int _ml_service_query_release_internal (ml_service_s *mls);
*/
const gchar * _ml_service_get_json_string_member (JsonObject *object, const gchar *member_name);

/**
* @brief Generating an ML service event and passing received data and event to a registered callback function.
*/
int _ml_service_invoke_event_new_data (ml_service_s * mls, const char *name, const ml_tensors_data_h data);

/**
* @brief Callback for sink node in pipeline description.
songgot marked this conversation as resolved.
Show resolved Hide resolved
* Processes incoming data from pipeline sink element and forwards it to _ml_service_invoke_event_new_data().
*/
void _ml_service_pipeline_sink_cb (const ml_tensors_data_h data, const ml_tensors_info_h info, void *user_data);

#ifdef __cplusplus
}
#endif /* __cplusplus */
Expand Down
Loading
Loading