From 064e140115ce612a8298d5228ee8d610a48817d7 Mon Sep 17 00:00:00 2001 From: Luuk Paulussen Date: Thu, 28 Sep 2017 11:03:26 +1300 Subject: [PATCH 1/5] Encapsulate client global data in a structure. This is preparatory to cleaning up correctly after a fork, and not requiring apteryx_init to be called. * Some unreachable code in apteryx_dump is removed. (prior ASSERT performs this check) * apteryx_shutdown_force is changed to just keep calling apteryx_shutdown until it returns false, rather than relying on refcount. This means that technically it will be called once more than necessary. Signed-off-by: Luuk Paulussen --- apteryx.c | 309 +++++++++++++++++++++++++++++++++++------------------- 1 file changed, 203 insertions(+), 106 deletions(-) diff --git a/apteryx.c b/apteryx.c index 620250c..64c9036 100644 --- a/apteryx.c +++ b/apteryx.c @@ -37,15 +37,8 @@ /* Configuration */ bool apteryx_debug = false; /* Debug enabled */ static const char *default_url = APTERYX_SERVER; /* Default path to Apteryx database */ -static int ref_count = 0; /* Library reference count */ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; /* Protect globals */ -static rpc_instance rpc = NULL; /* RPC Service */ -static bool bound = false; /* Do we have a listen socket open */ -static bool have_callbacks = false; /* Have we ever registered any callbacks */ -static pthread_mutex_t pending_watches_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t no_pending_watches = PTHREAD_COND_INITIALIZER; -static int pending_watch_count = 0; /* Callback */ typedef struct _cb_t @@ -56,8 +49,42 @@ typedef struct _cb_t void *fn; void *data; } cb_t; -static uint64_t next_ref = 0; -static GList *cb_list = NULL; + +/* Apteryx client data */ +struct apteryx_client +{ + int ref_count; /* Client reference count */ + rpc_instance rpc; /* RPC Service */ + bool bound; /* Do we have a listen socket open */ + bool have_callbacks; /* Have we ever registered any callbacks */ + + pthread_mutex_t pending_watches_lock; + pthread_cond_t no_pending_watches; + int pending_watch_count; + + uint64_t next_ref; + GList *cb_list; +}; + +static struct apteryx_client *_client_data = NULL; + +/** + * Sets the client data ptr. + */ +static void +_apteryx_client_set_ptr (struct apteryx_client *client_data) +{ + _client_data = client_data; +} + +/** + * Returns the client data ptr. Won't try to create a client if it doesn't exist. + */ +static struct apteryx_client * +_apteryx_client_get_ptr (void) +{ + return _client_data; +} static void * call_callback (uint64_t ref, const char *path, const char *value) @@ -67,9 +94,14 @@ call_callback (uint64_t ref, const char *path, const char *value) void *fn = NULL; void *data = NULL; bool val = false; + struct apteryx_client *client_data; pthread_mutex_lock (&lock); - for (iter = g_list_first (cb_list); iter; iter = g_list_next (iter)) + /* We should never come into this function with client not initialized */ + client_data = _apteryx_client_get_ptr (); + ASSERT ((client_data), return NULL, "CB: Not initialised\n"); + + for (iter = g_list_first (client_data->cb_list); iter; iter = g_list_next (iter)) { cb = (cb_t *) iter->data; if (cb->ref == ref) @@ -172,6 +204,10 @@ handle_watch (rpc_message msg) uint64_t ref; const char *path; const char *value; + struct apteryx_client *client_data; + + client_data = _apteryx_client_get_ptr (); + ASSERT ((client_data), return false, "WATCH CB: Not initialised\n"); /* Parse the parameters */ ref = rpc_msg_decode_uint64 (msg); @@ -183,16 +219,16 @@ handle_watch (rpc_message msg) DEBUG ("WATCH CB \"%s\" = \"%s\" (0x%"PRIx64")\n", path, value, ref); - pthread_mutex_lock (&pending_watches_lock); - ++pending_watch_count; - pthread_mutex_unlock (&pending_watches_lock); + pthread_mutex_lock (&client_data->pending_watches_lock); + ++client_data->pending_watch_count; + pthread_mutex_unlock (&client_data->pending_watches_lock); /* Call callback */ call_callback (ref, path, value); - pthread_mutex_lock (&pending_watches_lock); - if (--pending_watch_count == 0) - pthread_cond_signal(&no_pending_watches); - pthread_mutex_unlock (&pending_watches_lock); + pthread_mutex_lock (&client_data->pending_watches_lock); + if (--client_data->pending_watch_count == 0) + pthread_cond_signal(&client_data->no_pending_watches); + pthread_mutex_unlock (&client_data->pending_watches_lock); rpc_msg_reset (msg); return true; } @@ -204,6 +240,10 @@ handle_validate (rpc_message msg) uint64_t ref; const char *path; const char *value; + struct apteryx_client *client_data; + + client_data = _apteryx_client_get_ptr (); + ASSERT ((client_data), return false, "VALIDATE CB: Not initialised\n"); /* Parse the parameters */ ref = rpc_msg_decode_uint64 (msg); @@ -216,14 +256,14 @@ handle_validate (rpc_message msg) DEBUG ("VALIDATE CB \"%s\" = \"%s\" (0x%"PRIx64")\n", path, value, ref); /* We want to wait for all pending watches to be processed */ - pthread_mutex_lock (&pending_watches_lock); - if (pending_watch_count) + pthread_mutex_lock (&client_data->pending_watches_lock); + if (client_data->pending_watch_count) { - pthread_cond_wait (&no_pending_watches, &pending_watches_lock); - pthread_mutex_unlock (&pending_watches_lock); + pthread_cond_wait (&client_data->no_pending_watches, &client_data->pending_watches_lock); + pthread_mutex_unlock (&client_data->pending_watches_lock); } else - pthread_mutex_unlock (&pending_watches_lock); + pthread_mutex_unlock (&client_data->pending_watches_lock); /* Process callback */ result = (uint32_t) (size_t) call_callback (ref, path, value); @@ -280,49 +320,84 @@ msg_handler (rpc_message msg) return false; } +/** + * The global lock must be held when calling this function. + */ +static struct apteryx_client * +_apteryx_init_client (void) +{ + static struct apteryx_client *client_data = NULL; + + client_data = calloc (1, sizeof (struct apteryx_client)); + ASSERT ((client_data), return NULL, "Init: Failed to allocate client\n"); + + pthread_mutex_init (&client_data->pending_watches_lock, NULL); + pthread_cond_init (&client_data->no_pending_watches, NULL); + + /* Increment refcount */ + client_data->ref_count = 0; + + return client_data; +} + bool apteryx_init (bool debug_enabled) { - /* Increment refcount */ + struct apteryx_client *client_data; + pthread_mutex_lock (&lock); - ref_count++; + client_data = _apteryx_client_get_ptr (); + if (!client_data) + { + client_data = _apteryx_init_client (); + _apteryx_client_set_ptr (client_data); + } + + if (!client_data) + { + ERROR ("Init: Failed to init client data\n"); + return false; + } + + /* Increment refcount */ + client_data->ref_count++; apteryx_debug |= debug_enabled; - if (ref_count == 1) + if (client_data->ref_count == 1) { char * uri = NULL; /* Create RPC instance */ - rpc = rpc_init (RPC_CLIENT_TIMEOUT_US, msg_handler); - if (rpc == NULL) + client_data->rpc = rpc_init (RPC_CLIENT_TIMEOUT_US, msg_handler); + if (client_data->rpc == NULL) { ERROR ("Init: Failed to initialise RPC service\n"); - ref_count--; + client_data->ref_count--; pthread_mutex_unlock (&lock); return false; } /* Only need to bind if we have previously added callbacks */ - if (have_callbacks) + if (client_data->have_callbacks) { /* Bind to the default uri for this client */ if (asprintf ((char **) &uri, APTERYX_SERVER".%"PRIu64, (uint64_t) getpid ()) <= 0 - || !rpc_server_bind (rpc, uri, uri)) + || !rpc_server_bind (client_data->rpc, uri, uri)) { ERROR ("Failed to bind uri %s\n", uri); - ref_count--; + client_data->ref_count--; pthread_mutex_unlock (&lock); free ((void*) uri); return false; } DEBUG ("Bound to uri %s\n", uri); - bound = true; + client_data->bound = true; free ((void*) uri); } } pthread_mutex_unlock (&lock); /* Ready to go */ - if (ref_count == 1) + if (client_data->ref_count == 1) DEBUG ("Init: Initialised\n"); return true; } @@ -330,29 +405,42 @@ apteryx_init (bool debug_enabled) bool apteryx_shutdown (void) { - if (ref_count <= 0) + struct apteryx_client *client_data; + + pthread_mutex_lock (&lock); + client_data = _apteryx_client_get_ptr (); + + if (!client_data || client_data->ref_count == 0) { DEBUG ("SHUTDOWN: Not initialised\n"); + pthread_mutex_unlock (&lock); return false; } /* Decrement ref count */ - pthread_mutex_lock (&lock); - ref_count--; - pthread_mutex_unlock (&lock); + client_data->ref_count--; /* Check if there are still other users */ - if (ref_count > 0) + if (client_data->ref_count > 0) { - DEBUG ("SHUTDOWN: More users (refcount=%d)\n", ref_count); + DEBUG ("SHUTDOWN: More users (refcount=%d)\n", client_data->ref_count); + pthread_mutex_unlock (&lock); return true; } + pthread_mutex_unlock (&lock); + /* Shutdown */ DEBUG ("SHUTDOWN: Shutting down\n"); - rpc_shutdown (rpc); - bound = false; + /* Holding the lock during this call may cause deadlock */ + rpc_shutdown (client_data->rpc); + + client_data->rpc = NULL; + client_data->bound = false; + // Callbacks are not cleaned up here - there are potentially still callbacks in Apteryx. + DEBUG ("SHUTDOWN: Shutdown\n"); + return true; } @@ -360,16 +448,18 @@ apteryx_shutdown (void) bool apteryx_shutdown_force (void) { - while (ref_count > 0) - apteryx_shutdown (); + while (apteryx_shutdown ()); + return true; } int apteryx_process (bool poll) { - ASSERT ((ref_count > 0), return false, "PROCESS: Not initialised\n"); - return rpc_server_process (rpc, poll); + struct apteryx_client *client_data = _apteryx_client_get_ptr (); + + ASSERT ((client_data && client_data->ref_count > 0), return false, "PROCESS: Not initialised\n"); + return rpc_server_process (client_data->rpc, poll); } bool @@ -377,8 +467,9 @@ apteryx_bind (const char *url) { char path[PATH_MAX]; bool result; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return false, "BIND: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "BIND: Not initialised\n"); ASSERT (url, return false, "BIND: Invalid parameters\n"); DEBUG ("BIND: %s\n", url); @@ -395,8 +486,9 @@ bool apteryx_unbind (const char *url) { char path[PATH_MAX]; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return false, "UNBIND: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "UNBIND: Not initialised\n"); ASSERT (url, return false, "UNBIND: Invalid parameters\n"); DEBUG ("UNBIND: %s\n", url); @@ -414,8 +506,9 @@ apteryx_prune (const char *path) rpc_client rpc_client; rpc_message_t msg = {}; int32_t result; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return false, "PRUNE: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "PRUNE: Not initialised\n"); ASSERT (path, return false, "PRUNE: Invalid parameters\n"); DEBUG ("PRUNE: %s\n", path); @@ -430,7 +523,7 @@ apteryx_prune (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("PRUNE: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -443,7 +536,7 @@ apteryx_prune (const char *path) { ERROR ("PRUNE: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return false; } @@ -454,7 +547,7 @@ apteryx_prune (const char *path) DEBUG ("PRUNE: Error response: %s\n", strerror (-result)); errno = result; } - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Success */ @@ -465,21 +558,14 @@ bool apteryx_dump (const char *path, FILE *fp) { char *value = NULL; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return false, "DUMP: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "DUMP: Not initialised\n"); ASSERT (path, return false, "DUMP: Invalid parameters\n"); ASSERT (fp, return false, "DUMP: Invalid parameters\n"); DEBUG ("DUMP: %s\n", path); - /* Check initialised */ - if (ref_count <= 0) - { - ERROR ("DUMP: not initialised!\n"); - assert(ref_count > 0); - return false; - } - if (strlen (path) > 0 && (value = apteryx_get (path))) { fprintf (fp, "%-64s%s\n", path, value); @@ -509,8 +595,9 @@ apteryx_set_full (const char *path, const char *value, uint64_t ts, bool ack) rpc_client rpc_client; rpc_message_t msg = {}; int result = -ETIMEDOUT; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return false, "SET: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "SET: Not initialised\n"); ASSERT (path, return false, "SET: Invalid parameters\n"); DEBUG ("SET: %s = %s\n", path, value); @@ -526,7 +613,7 @@ apteryx_set_full (const char *path, const char *value, uint64_t ts, bool ack) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("SET: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -544,7 +631,7 @@ apteryx_set_full (const char *path, const char *value, uint64_t ts, bool ack) { ERROR ("SET: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return false; } @@ -555,7 +642,7 @@ apteryx_set_full (const char *path, const char *value, uint64_t ts, bool ack) DEBUG ("SET: Error response: %s\n", strerror (-result)); errno = result; } - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Success */ @@ -628,8 +715,9 @@ apteryx_get (const char *path) char *value = NULL; rpc_client rpc_client; rpc_message_t msg = {}; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return NULL, "GET: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "GET: Not initialised\n"); ASSERT (path, return NULL, "GET: Invalid parameters\n"); DEBUG ("GET: %s\n", path); @@ -645,7 +733,7 @@ apteryx_get (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("GET: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -658,7 +746,7 @@ apteryx_get (const char *path) { ERROR ("GET: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -666,7 +754,7 @@ apteryx_get (const char *path) if (value) value = strdup (value); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); DEBUG (" = %s\n", value); @@ -936,8 +1024,9 @@ apteryx_set_tree_full (GNode* root, uint64_t ts, bool wait_for_completion) rpc_client rpc_client; rpc_message_t msg = {}; int32_t result = 0; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return false, "SET_TREE: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "SET_TREE: Not initialised\n"); ASSERT (root, return false, "SET_TREE: Invalid parameters\n"); DEBUG ("SET_TREE: %d paths\n", g_node_n_nodes (root, G_TRAVERSE_LEAVES)); @@ -957,7 +1046,7 @@ apteryx_set_tree_full (GNode* root, uint64_t ts, bool wait_for_completion) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("SET_TREE: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -977,7 +1066,7 @@ apteryx_set_tree_full (GNode* root, uint64_t ts, bool wait_for_completion) { ERROR ("SET_TREE: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); root->data = old_root_name; free (url); return false; @@ -989,7 +1078,7 @@ apteryx_set_tree_full (GNode* root, uint64_t ts, bool wait_for_completion) DEBUG ("SET_TREE: Error response: %s\n", strerror (-result)); errno = result; } - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Reinstate original root name */ @@ -1060,8 +1149,9 @@ apteryx_get_tree (const char *path) GNode *root = NULL; int slen = strlen (path); char *value; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return NULL, "GET_TREE: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "GET_TREE: Not initialised\n"); ASSERT (path, return NULL, "GET_TREE: Invalid parameters\n"); DEBUG ("GET_TREE: %s\n", path); @@ -1077,7 +1167,7 @@ apteryx_get_tree (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("GET_TREE: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1090,7 +1180,7 @@ apteryx_get_tree (const char *path) { ERROR ("GET_TREE: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -1118,7 +1208,7 @@ apteryx_get_tree (const char *path) DEBUG (" = (null)\n"); } rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); return root; } @@ -1144,8 +1234,9 @@ apteryx_query (GNode *root) char *old_root_name = NULL; char *value = NULL; GNode *rroot = NULL; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return NULL, "QUERY: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "QUERY: Not initialised\n"); ASSERT (root, return NULL, "QUERY: Invalid parameters\n"); DEBUG ("QUERY\n"); @@ -1169,7 +1260,7 @@ apteryx_query (GNode *root) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("QUERY: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1186,7 +1277,7 @@ apteryx_query (GNode *root) { ERROR ("QUERY: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -1217,7 +1308,7 @@ apteryx_query (GNode *root) root->data = old_root_name; rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); return rroot; } @@ -1229,8 +1320,9 @@ apteryx_search (const char *path) rpc_client rpc_client; rpc_message_t msg = {}; GList *paths = NULL; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return NULL, "SEARCH: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "SEARCH: Not initialised\n"); ASSERT (path, return NULL, "SEARCH: Invalid parameters\n"); DEBUG ("SEARCH: %s\n", path); @@ -1267,7 +1359,7 @@ apteryx_search (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("SEARCH: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1280,7 +1372,7 @@ apteryx_search (const char *path) { ERROR ("SEARCH: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -1290,7 +1382,7 @@ apteryx_search (const char *path) paths = g_list_prepend (paths, (gpointer) strdup (path)); } rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Result */ @@ -1338,8 +1430,9 @@ apteryx_find (const char *path, const char *value) rpc_message_t msg = {}; GList *paths = NULL; char *tmp_path = NULL; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return NULL, "FIND: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "FIND: Not initialised\n"); ASSERT (path, return NULL, "FIND: Invalid parameters\n"); ASSERT (value, return NULL, "FIND: Invalid parameters\n"); @@ -1380,7 +1473,7 @@ apteryx_find (const char *path, const char *value) *strrchr (tmp_path, '*') = '\0'; /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("FIND: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1396,7 +1489,7 @@ apteryx_find (const char *path, const char *value) { ERROR ("FIND: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (tmp_path); free (url); return NULL; @@ -1407,7 +1500,7 @@ apteryx_find (const char *path, const char *value) paths = g_list_prepend (paths, (gpointer) strdup (path)); } rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (tmp_path); free (url); @@ -1423,8 +1516,9 @@ apteryx_find_tree (GNode *root) rpc_message_t msg = {}; const char *path = APTERYX_NAME (root); GList *paths = NULL; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return NULL, "FIND_TREE: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "FIND_TREE: Not initialised\n"); ASSERT (path, return NULL, "FIND_TREE: Invalid parameters\n"); DEBUG ("FIND_TREE: %s\n", path); @@ -1452,7 +1546,7 @@ apteryx_find_tree (GNode *root) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("FIND_TREE: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1466,7 +1560,7 @@ apteryx_find_tree (GNode *root) { ERROR ("FIND_TREE: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -1476,7 +1570,7 @@ apteryx_find_tree (GNode *root) paths = g_list_prepend (paths, (gpointer) strdup (path)); } rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Result */ @@ -1489,28 +1583,29 @@ add_callback (const char *type, const char *path, void *fn, bool value, void *da size_t pid = getpid (); char _path[PATH_MAX]; cb_t *cb; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return false, "ADD_CB: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "ADD_CB: Not initialised\n"); ASSERT (type, return false, "ADD_CB: Invalid type\n"); ASSERT (path, return false, "ADD_CB: Invalid path\n"); ASSERT (fn, return false, "ADD_CB: Invalid callback\n"); cb = calloc (1, sizeof (cb_t)); - cb->ref = next_ref++; + cb->ref = client_data->next_ref++; cb->path = strdup (path); cb->fn = fn; cb->value = value; cb->data = data; pthread_mutex_lock (&lock); - cb_list = g_list_prepend (cb_list, (void *) cb); - if (!bound) + client_data->cb_list = g_list_prepend (client_data->cb_list, (void *) cb); + if (!client_data->bound) { char * uri = NULL; /* Bind to the default uri for this client */ if (asprintf ((char **) &uri, APTERYX_SERVER".%"PRIu64, (uint64_t) getpid ()) <= 0 - || !rpc_server_bind (rpc, uri, uri)) + || !rpc_server_bind (client_data->rpc, uri, uri)) { ERROR ("Failed to bind uri %s\n", uri); pthread_mutex_unlock (&lock); @@ -1519,7 +1614,7 @@ add_callback (const char *type, const char *path, void *fn, bool value, void *da } DEBUG ("Bound to uri %s\n", uri); free ((void*) uri); - bound = true; + client_data->bound = true; } pthread_mutex_unlock (&lock); @@ -1528,7 +1623,7 @@ add_callback (const char *type, const char *path, void *fn, bool value, void *da return false; if (!apteryx_set (_path, path)) return false; - have_callbacks = true; + client_data->have_callbacks = true; return true; } @@ -1539,19 +1634,20 @@ delete_callback (const char *type, const char *path, void *fn) uint64_t ref; GList *iter; cb_t *cb; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return false, "DEL_CB: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "DEL_CB: Not initialised\n"); ASSERT (type, return false, "DEL_CB: Invalid type\n"); ASSERT (path, return false, "DEL_CB: Invalid path\n"); ASSERT (fn, return false, "DEL_CB: Invalid callback\n"); pthread_mutex_lock (&lock); - for (iter = g_list_first (cb_list); iter; iter = g_list_next (iter)) + for (iter = g_list_first (client_data->cb_list); iter; iter = g_list_next (iter)) { cb = (cb_t *) iter->data; if (cb->fn == fn && strcmp (cb->path, path) == 0) { - cb_list = g_list_remove (cb_list, cb); + client_data->cb_list = g_list_remove (client_data->cb_list, cb); break; } cb = NULL; @@ -1653,8 +1749,9 @@ apteryx_timestamp (const char *path) uint64_t value = 0; rpc_client rpc_client; rpc_message_t msg = {}; + struct apteryx_client *client_data = _apteryx_client_get_ptr (); - ASSERT ((ref_count > 0), return 0, "TIMESTAMP: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return 0, "TIMESTAMP: Not initialised\n"); ASSERT (path, return 0, "TIMESTAMP: Invalid parameters\n"); DEBUG ("TIMESTAMP: %s\n", path); @@ -1672,7 +1769,7 @@ apteryx_timestamp (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("TIMESTAMP: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1685,13 +1782,13 @@ apteryx_timestamp (const char *path) { ERROR ("TIMESTAMP: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return 0; } value = rpc_msg_decode_uint64 (&msg); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); DEBUG (" = %"PRIu64"\n", value); From 245f9572c06561d8e07521e4b143d0099537c40e Mon Sep 17 00:00:00 2001 From: Luuk Paulussen Date: Thu, 28 Sep 2017 11:21:40 +1300 Subject: [PATCH 2/5] Refactor contents of apteryx_init * Move the client init code done inside the lock to _apteryx_init_internal * Split the code to start the callback server into a separate function and call from _apteryx_init_internal and from add_callback Signed-off-by: Luuk Paulussen --- apteryx.c | 103 +++++++++++++++++++++++++++++------------------------- 1 file changed, 56 insertions(+), 47 deletions(-) diff --git a/apteryx.c b/apteryx.c index 64c9036..8d0cc3b 100644 --- a/apteryx.c +++ b/apteryx.c @@ -320,6 +320,28 @@ msg_handler (rpc_message msg) return false; } +/** + * The global lock must be held when calling this function. + */ +static bool +_apteryx_init_callback_server (struct apteryx_client *client_data) +{ + char *uri = NULL; + + /* Bind to the default uri for this client */ + if (asprintf (&uri, APTERYX_SERVER".%"PRIu64, (uint64_t) getpid ()) <= 0 + || !rpc_server_bind (client_data->rpc, uri, uri)) + { + ERROR ("Failed to bind uri %s\n", uri); + free (uri); + return false; + } + DEBUG ("Bound to uri %s\n", uri); + client_data->bound = true; + free (uri); + return true; +} + /** * The global lock must be held when calling this function. */ @@ -340,12 +362,14 @@ _apteryx_init_client (void) return client_data; } -bool -apteryx_init (bool debug_enabled) +/** + * The global lock must be held when calling this function. + */ +static bool +_apteryx_init_internal (void) { struct apteryx_client *client_data; - pthread_mutex_lock (&lock); client_data = _apteryx_client_get_ptr (); if (!client_data) { @@ -358,48 +382,46 @@ apteryx_init (bool debug_enabled) ERROR ("Init: Failed to init client data\n"); return false; } - - /* Increment refcount */ - client_data->ref_count++; - apteryx_debug |= debug_enabled; - if (client_data->ref_count == 1) + else { - char * uri = NULL; - - /* Create RPC instance */ - client_data->rpc = rpc_init (RPC_CLIENT_TIMEOUT_US, msg_handler); - if (client_data->rpc == NULL) + client_data->ref_count++; + if (client_data->ref_count == 1) { - ERROR ("Init: Failed to initialise RPC service\n"); - client_data->ref_count--; - pthread_mutex_unlock (&lock); - return false; - } + client_data->rpc = rpc_init (RPC_CLIENT_TIMEOUT_US, msg_handler); + if (client_data->rpc == NULL) + { + ERROR ("Init: Failed to initialise RPC service\n"); + client_data->ref_count--; + return false; + } - /* Only need to bind if we have previously added callbacks */ - if (client_data->have_callbacks) - { - /* Bind to the default uri for this client */ - if (asprintf ((char **) &uri, APTERYX_SERVER".%"PRIu64, (uint64_t) getpid ()) <= 0 - || !rpc_server_bind (client_data->rpc, uri, uri)) + if (client_data->have_callbacks && !_apteryx_init_callback_server (client_data)) { - ERROR ("Failed to bind uri %s\n", uri); client_data->ref_count--; - pthread_mutex_unlock (&lock); - free ((void*) uri); return false; } - DEBUG ("Bound to uri %s\n", uri); - client_data->bound = true; - free ((void*) uri); + + /* Ready to go */ + DEBUG ("Init: Initialised\n"); } } + + return true; +} + +bool +apteryx_init (bool debug_enabled) +{ + bool ret; + + pthread_mutex_lock (&lock); + apteryx_debug |= debug_enabled; + + ret = _apteryx_init_internal (); pthread_mutex_unlock (&lock); /* Ready to go */ - if (client_data->ref_count == 1) - DEBUG ("Init: Initialised\n"); - return true; + return ret; } bool @@ -1601,20 +1623,7 @@ add_callback (const char *type, const char *path, void *fn, bool value, void *da client_data->cb_list = g_list_prepend (client_data->cb_list, (void *) cb); if (!client_data->bound) { - char * uri = NULL; - - /* Bind to the default uri for this client */ - if (asprintf ((char **) &uri, APTERYX_SERVER".%"PRIu64, (uint64_t) getpid ()) <= 0 - || !rpc_server_bind (client_data->rpc, uri, uri)) - { - ERROR ("Failed to bind uri %s\n", uri); - pthread_mutex_unlock (&lock); - free ((void*) uri); - return false; - } - DEBUG ("Bound to uri %s\n", uri); - free ((void*) uri); - client_data->bound = true; + _apteryx_init_callback_server (client_data); } pthread_mutex_unlock (&lock); From dd5b0349260f17351f1b3d83c4da2db7a3e5fbf6 Mon Sep 17 00:00:00 2001 From: Luuk Paulussen Date: Thu, 28 Sep 2017 11:43:25 +1300 Subject: [PATCH 3/5] Gracefully handle fork When a process linked to an initialized apteryx client library forks, one of the threads and all of the internal data and socket handles are copied to the new process. All locks are left in their current state. This means that in the child process, the state of the client is indeterminate. Register pthread_atfork handlers to take the global lock and release it, so that we at least have one lock we know is safe in the child process. Also, set client data to NULL in the child process, so that it can no longer interfere with any sockets, etc. created by the parent. Currently, if the child calls apteryx_shutdown, the parent process loses its callback socket. This breaks the test_double_fork unit test. That test expects to be able to set/get after a fork without reinitialising. Signed-off-by: Luuk Paulussen --- apteryx.c | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/apteryx.c b/apteryx.c index 8d0cc3b..e783afe 100644 --- a/apteryx.c +++ b/apteryx.c @@ -1803,3 +1803,33 @@ apteryx_timestamp (const char *path) DEBUG (" = %"PRIu64"\n", value); return value; } + +static void +_apteryx_atfork_prepare (void) +{ + pthread_mutex_lock (&lock); +} + +static void +_apteryx_atfork_parent (void) +{ + pthread_mutex_unlock (&lock); +} + +static void +_apteryx_atfork_child (void) +{ + // Throw away all data related to the parent process. The assumption is that exec will + // be called shortly anyway, replacing the memory space, so memory leak doesn't matter. + _client_data = NULL; + pthread_mutex_unlock (&lock); +} + +/** + * Register fork handlers to ensure that we can gracefully handle forking + */ +__attribute__ ((constructor)) void +apteryx_load (void) +{ + pthread_atfork (_apteryx_atfork_prepare, _apteryx_atfork_parent, _apteryx_atfork_child); +} From 3a68db0a0d1dfe2e79d05c2dea457ca974e58e4b Mon Sep 17 00:00:00 2001 From: Luuk Paulussen Date: Thu, 28 Sep 2017 11:46:13 +1300 Subject: [PATCH 4/5] Automatically clean up Apteryx connections on library unload. This will cause any RPC connections and Unix socket descriptors to be cleaned up. This will also be called when a forked process unloads the library, but as we have set the client_data to NULL in _apteryx_atfork_child, this will have no negative effect. Signed-off-by: Luuk Paulussen --- apteryx.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/apteryx.c b/apteryx.c index e783afe..1b35ef6 100644 --- a/apteryx.c +++ b/apteryx.c @@ -1833,3 +1833,16 @@ apteryx_load (void) { pthread_atfork (_apteryx_atfork_prepare, _apteryx_atfork_parent, _apteryx_atfork_child); } + +/** + * When the Apteryx library is unloaded or a process is exited, this will be called + * automatically and will shut down all remaining users. This function will also get + * called after a forked process unloads the library, but apteryx_atfork_child should have + * been called to make this safe. + */ +__attribute__ ((destructor)) void +apteryx_unload (void) +{ + DEBUG ("UNLOAD: Shutting down remaining users\n"); + apteryx_shutdown_force (); +} From e0086c829ab5cf51c7035452e587df31129681dc Mon Sep 17 00:00:00 2001 From: Luuk Paulussen Date: Thu, 28 Sep 2017 11:56:08 +1300 Subject: [PATCH 5/5] Remove the need to call apteryx_init before doing an apteryx call Add a function that gets the apteryx client data if it exists and initialises it if it does not. Use this function in the apteryx client APIs. The test_init function is removed as its assertions are no longer valid. The test_double_fork test now passes again. Signed-off-by: Luuk Paulussen --- apteryx.c | 53 +++++++++++++++++++++++++++++++++++++---------------- test.c | 15 --------------- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/apteryx.c b/apteryx.c index 1b35ef6..b36acf6 100644 --- a/apteryx.c +++ b/apteryx.c @@ -424,6 +424,27 @@ apteryx_init (bool debug_enabled) return ret; } +/** + * Get the apteryx client data. If the client is not initialized, the call will initialize + * and return a new client. + */ +static struct apteryx_client * +_apteryx_client_get (void) +{ + struct apteryx_client *client_data; + + pthread_mutex_lock (&lock); + client_data = _apteryx_client_get_ptr (); + if (!client_data || client_data->ref_count == 0) + { + _apteryx_init_internal (); + client_data = _apteryx_client_get_ptr (); + } + pthread_mutex_unlock (&lock); + + return client_data; +} + bool apteryx_shutdown (void) { @@ -478,7 +499,7 @@ apteryx_shutdown_force (void) int apteryx_process (bool poll) { - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return false, "PROCESS: Not initialised\n"); return rpc_server_process (client_data->rpc, poll); @@ -489,7 +510,7 @@ apteryx_bind (const char *url) { char path[PATH_MAX]; bool result; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return false, "BIND: Not initialised\n"); ASSERT (url, return false, "BIND: Invalid parameters\n"); @@ -508,7 +529,7 @@ bool apteryx_unbind (const char *url) { char path[PATH_MAX]; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return false, "UNBIND: Not initialised\n"); ASSERT (url, return false, "UNBIND: Invalid parameters\n"); @@ -528,7 +549,7 @@ apteryx_prune (const char *path) rpc_client rpc_client; rpc_message_t msg = {}; int32_t result; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return false, "PRUNE: Not initialised\n"); ASSERT (path, return false, "PRUNE: Invalid parameters\n"); @@ -580,7 +601,7 @@ bool apteryx_dump (const char *path, FILE *fp) { char *value = NULL; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return false, "DUMP: Not initialised\n"); ASSERT (path, return false, "DUMP: Invalid parameters\n"); @@ -617,7 +638,7 @@ apteryx_set_full (const char *path, const char *value, uint64_t ts, bool ack) rpc_client rpc_client; rpc_message_t msg = {}; int result = -ETIMEDOUT; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return false, "SET: Not initialised\n"); ASSERT (path, return false, "SET: Invalid parameters\n"); @@ -737,7 +758,7 @@ apteryx_get (const char *path) char *value = NULL; rpc_client rpc_client; rpc_message_t msg = {}; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return NULL, "GET: Not initialised\n"); ASSERT (path, return NULL, "GET: Invalid parameters\n"); @@ -1046,7 +1067,7 @@ apteryx_set_tree_full (GNode* root, uint64_t ts, bool wait_for_completion) rpc_client rpc_client; rpc_message_t msg = {}; int32_t result = 0; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return false, "SET_TREE: Not initialised\n"); ASSERT (root, return false, "SET_TREE: Invalid parameters\n"); @@ -1171,7 +1192,7 @@ apteryx_get_tree (const char *path) GNode *root = NULL; int slen = strlen (path); char *value; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return NULL, "GET_TREE: Not initialised\n"); ASSERT (path, return NULL, "GET_TREE: Invalid parameters\n"); @@ -1256,7 +1277,7 @@ apteryx_query (GNode *root) char *old_root_name = NULL; char *value = NULL; GNode *rroot = NULL; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return NULL, "QUERY: Not initialised\n"); ASSERT (root, return NULL, "QUERY: Invalid parameters\n"); @@ -1342,7 +1363,7 @@ apteryx_search (const char *path) rpc_client rpc_client; rpc_message_t msg = {}; GList *paths = NULL; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return NULL, "SEARCH: Not initialised\n"); ASSERT (path, return NULL, "SEARCH: Invalid parameters\n"); @@ -1452,7 +1473,7 @@ apteryx_find (const char *path, const char *value) rpc_message_t msg = {}; GList *paths = NULL; char *tmp_path = NULL; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return NULL, "FIND: Not initialised\n"); ASSERT (path, return NULL, "FIND: Invalid parameters\n"); @@ -1538,7 +1559,7 @@ apteryx_find_tree (GNode *root) rpc_message_t msg = {}; const char *path = APTERYX_NAME (root); GList *paths = NULL; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return NULL, "FIND_TREE: Not initialised\n"); ASSERT (path, return NULL, "FIND_TREE: Invalid parameters\n"); @@ -1605,7 +1626,7 @@ add_callback (const char *type, const char *path, void *fn, bool value, void *da size_t pid = getpid (); char _path[PATH_MAX]; cb_t *cb; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return false, "ADD_CB: Not initialised\n"); ASSERT (type, return false, "ADD_CB: Invalid type\n"); @@ -1643,7 +1664,7 @@ delete_callback (const char *type, const char *path, void *fn) uint64_t ref; GList *iter; cb_t *cb; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return false, "DEL_CB: Not initialised\n"); ASSERT (type, return false, "DEL_CB: Invalid type\n"); @@ -1758,7 +1779,7 @@ apteryx_timestamp (const char *path) uint64_t value = 0; rpc_client rpc_client; rpc_message_t msg = {}; - struct apteryx_client *client_data = _apteryx_client_get_ptr (); + struct apteryx_client *client_data = _apteryx_client_get (); ASSERT ((client_data && client_data->ref_count > 0), return 0, "TIMESTAMP: Not initialised\n"); ASSERT (path, return 0, "TIMESTAMP: Invalid parameters\n"); diff --git a/test.c b/test.c index b9b9985..d91931d 100644 --- a/test.c +++ b/test.c @@ -70,20 +70,6 @@ assert_apteryx_empty (void) return ret; } -void -test_init () -{ - const char *path = TEST_PATH"/entity/zones/private/name"; - char *value = NULL; - - apteryx_shutdown_force (); - CU_ASSERT (apteryx_set (path, "private") == FALSE); - CU_ASSERT ((value = apteryx_get (path)) == NULL); - CU_ASSERT (apteryx_set (path, NULL) == FALSE); - CU_ASSERT (assert_apteryx_empty ()); - apteryx_init (apteryx_debug); -} - void test_set_get () { @@ -5009,7 +4995,6 @@ suite_clean (void) static CU_TestInfo tests_api[] = { { "doc example", test_docs }, - { "initialisation", test_init }, { "set and get", test_set_get }, { "set with ack", test_set_with_ack }, { "raw byte streams", test_set_get_raw },