diff --git a/apteryx.c b/apteryx.c index 620250c..b36acf6 100644 --- a/apteryx.c +++ b/apteryx.c @@ -37,15 +37,8 @@ /* Configuration */ bool apteryx_debug = false; /* Debug enabled */ static const char *default_url = APTERYX_SERVER; /* Default path to Apteryx database */ -static int ref_count = 0; /* Library reference count */ static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; /* Protect globals */ -static rpc_instance rpc = NULL; /* RPC Service */ -static bool bound = false; /* Do we have a listen socket open */ -static bool have_callbacks = false; /* Have we ever registered any callbacks */ -static pthread_mutex_t pending_watches_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t no_pending_watches = PTHREAD_COND_INITIALIZER; -static int pending_watch_count = 0; /* Callback */ typedef struct _cb_t @@ -56,8 +49,42 @@ typedef struct _cb_t void *fn; void *data; } cb_t; -static uint64_t next_ref = 0; -static GList *cb_list = NULL; + +/* Apteryx client data */ +struct apteryx_client +{ + int ref_count; /* Client reference count */ + rpc_instance rpc; /* RPC Service */ + bool bound; /* Do we have a listen socket open */ + bool have_callbacks; /* Have we ever registered any callbacks */ + + pthread_mutex_t pending_watches_lock; + pthread_cond_t no_pending_watches; + int pending_watch_count; + + uint64_t next_ref; + GList *cb_list; +}; + +static struct apteryx_client *_client_data = NULL; + +/** + * Sets the client data ptr. + */ +static void +_apteryx_client_set_ptr (struct apteryx_client *client_data) +{ + _client_data = client_data; +} + +/** + * Returns the client data ptr. Won't try to create a client if it doesn't exist. + */ +static struct apteryx_client * +_apteryx_client_get_ptr (void) +{ + return _client_data; +} static void * call_callback (uint64_t ref, const char *path, const char *value) @@ -67,9 +94,14 @@ call_callback (uint64_t ref, const char *path, const char *value) void *fn = NULL; void *data = NULL; bool val = false; + struct apteryx_client *client_data; pthread_mutex_lock (&lock); - for (iter = g_list_first (cb_list); iter; iter = g_list_next (iter)) + /* We should never come into this function with client not initialized */ + client_data = _apteryx_client_get_ptr (); + ASSERT ((client_data), return NULL, "CB: Not initialised\n"); + + for (iter = g_list_first (client_data->cb_list); iter; iter = g_list_next (iter)) { cb = (cb_t *) iter->data; if (cb->ref == ref) @@ -172,6 +204,10 @@ handle_watch (rpc_message msg) uint64_t ref; const char *path; const char *value; + struct apteryx_client *client_data; + + client_data = _apteryx_client_get_ptr (); + ASSERT ((client_data), return false, "WATCH CB: Not initialised\n"); /* Parse the parameters */ ref = rpc_msg_decode_uint64 (msg); @@ -183,16 +219,16 @@ handle_watch (rpc_message msg) DEBUG ("WATCH CB \"%s\" = \"%s\" (0x%"PRIx64")\n", path, value, ref); - pthread_mutex_lock (&pending_watches_lock); - ++pending_watch_count; - pthread_mutex_unlock (&pending_watches_lock); + pthread_mutex_lock (&client_data->pending_watches_lock); + ++client_data->pending_watch_count; + pthread_mutex_unlock (&client_data->pending_watches_lock); /* Call callback */ call_callback (ref, path, value); - pthread_mutex_lock (&pending_watches_lock); - if (--pending_watch_count == 0) - pthread_cond_signal(&no_pending_watches); - pthread_mutex_unlock (&pending_watches_lock); + pthread_mutex_lock (&client_data->pending_watches_lock); + if (--client_data->pending_watch_count == 0) + pthread_cond_signal(&client_data->no_pending_watches); + pthread_mutex_unlock (&client_data->pending_watches_lock); rpc_msg_reset (msg); return true; } @@ -204,6 +240,10 @@ handle_validate (rpc_message msg) uint64_t ref; const char *path; const char *value; + struct apteryx_client *client_data; + + client_data = _apteryx_client_get_ptr (); + ASSERT ((client_data), return false, "VALIDATE CB: Not initialised\n"); /* Parse the parameters */ ref = rpc_msg_decode_uint64 (msg); @@ -216,14 +256,14 @@ handle_validate (rpc_message msg) DEBUG ("VALIDATE CB \"%s\" = \"%s\" (0x%"PRIx64")\n", path, value, ref); /* We want to wait for all pending watches to be processed */ - pthread_mutex_lock (&pending_watches_lock); - if (pending_watch_count) + pthread_mutex_lock (&client_data->pending_watches_lock); + if (client_data->pending_watch_count) { - pthread_cond_wait (&no_pending_watches, &pending_watches_lock); - pthread_mutex_unlock (&pending_watches_lock); + pthread_cond_wait (&client_data->no_pending_watches, &client_data->pending_watches_lock); + pthread_mutex_unlock (&client_data->pending_watches_lock); } else - pthread_mutex_unlock (&pending_watches_lock); + pthread_mutex_unlock (&client_data->pending_watches_lock); /* Process callback */ result = (uint32_t) (size_t) call_callback (ref, path, value); @@ -280,79 +320,170 @@ msg_handler (rpc_message msg) return false; } -bool -apteryx_init (bool debug_enabled) +/** + * The global lock must be held when calling this function. + */ +static bool +_apteryx_init_callback_server (struct apteryx_client *client_data) { + char *uri = NULL; + + /* Bind to the default uri for this client */ + if (asprintf (&uri, APTERYX_SERVER".%"PRIu64, (uint64_t) getpid ()) <= 0 + || !rpc_server_bind (client_data->rpc, uri, uri)) + { + ERROR ("Failed to bind uri %s\n", uri); + free (uri); + return false; + } + DEBUG ("Bound to uri %s\n", uri); + client_data->bound = true; + free (uri); + return true; +} + +/** + * The global lock must be held when calling this function. + */ +static struct apteryx_client * +_apteryx_init_client (void) +{ + static struct apteryx_client *client_data = NULL; + + client_data = calloc (1, sizeof (struct apteryx_client)); + ASSERT ((client_data), return NULL, "Init: Failed to allocate client\n"); + + pthread_mutex_init (&client_data->pending_watches_lock, NULL); + pthread_cond_init (&client_data->no_pending_watches, NULL); + /* Increment refcount */ - pthread_mutex_lock (&lock); - ref_count++; - apteryx_debug |= debug_enabled; - if (ref_count == 1) + client_data->ref_count = 0; + + return client_data; +} + +/** + * The global lock must be held when calling this function. + */ +static bool +_apteryx_init_internal (void) +{ + struct apteryx_client *client_data; + + client_data = _apteryx_client_get_ptr (); + if (!client_data) { - char * uri = NULL; + client_data = _apteryx_init_client (); + _apteryx_client_set_ptr (client_data); + } - /* Create RPC instance */ - rpc = rpc_init (RPC_CLIENT_TIMEOUT_US, msg_handler); - if (rpc == NULL) + if (!client_data) + { + ERROR ("Init: Failed to init client data\n"); + return false; + } + else + { + client_data->ref_count++; + if (client_data->ref_count == 1) { - ERROR ("Init: Failed to initialise RPC service\n"); - ref_count--; - pthread_mutex_unlock (&lock); - return false; - } + client_data->rpc = rpc_init (RPC_CLIENT_TIMEOUT_US, msg_handler); + if (client_data->rpc == NULL) + { + ERROR ("Init: Failed to initialise RPC service\n"); + client_data->ref_count--; + return false; + } - /* Only need to bind if we have previously added callbacks */ - if (have_callbacks) - { - /* Bind to the default uri for this client */ - if (asprintf ((char **) &uri, APTERYX_SERVER".%"PRIu64, (uint64_t) getpid ()) <= 0 - || !rpc_server_bind (rpc, uri, uri)) + if (client_data->have_callbacks && !_apteryx_init_callback_server (client_data)) { - ERROR ("Failed to bind uri %s\n", uri); - ref_count--; - pthread_mutex_unlock (&lock); - free ((void*) uri); + client_data->ref_count--; return false; } - DEBUG ("Bound to uri %s\n", uri); - bound = true; - free ((void*) uri); + + /* Ready to go */ + DEBUG ("Init: Initialised\n"); } } + + return true; +} + +bool +apteryx_init (bool debug_enabled) +{ + bool ret; + + pthread_mutex_lock (&lock); + apteryx_debug |= debug_enabled; + + ret = _apteryx_init_internal (); pthread_mutex_unlock (&lock); /* Ready to go */ - if (ref_count == 1) - DEBUG ("Init: Initialised\n"); - return true; + return ret; +} + +/** + * Get the apteryx client data. If the client is not initialized, the call will initialize + * and return a new client. + */ +static struct apteryx_client * +_apteryx_client_get (void) +{ + struct apteryx_client *client_data; + + pthread_mutex_lock (&lock); + client_data = _apteryx_client_get_ptr (); + if (!client_data || client_data->ref_count == 0) + { + _apteryx_init_internal (); + client_data = _apteryx_client_get_ptr (); + } + pthread_mutex_unlock (&lock); + + return client_data; } bool apteryx_shutdown (void) { - if (ref_count <= 0) + struct apteryx_client *client_data; + + pthread_mutex_lock (&lock); + client_data = _apteryx_client_get_ptr (); + + if (!client_data || client_data->ref_count == 0) { DEBUG ("SHUTDOWN: Not initialised\n"); + pthread_mutex_unlock (&lock); return false; } /* Decrement ref count */ - pthread_mutex_lock (&lock); - ref_count--; - pthread_mutex_unlock (&lock); + client_data->ref_count--; /* Check if there are still other users */ - if (ref_count > 0) + if (client_data->ref_count > 0) { - DEBUG ("SHUTDOWN: More users (refcount=%d)\n", ref_count); + DEBUG ("SHUTDOWN: More users (refcount=%d)\n", client_data->ref_count); + pthread_mutex_unlock (&lock); return true; } + pthread_mutex_unlock (&lock); + /* Shutdown */ DEBUG ("SHUTDOWN: Shutting down\n"); - rpc_shutdown (rpc); - bound = false; + /* Holding the lock during this call may cause deadlock */ + rpc_shutdown (client_data->rpc); + + client_data->rpc = NULL; + client_data->bound = false; + // Callbacks are not cleaned up here - there are potentially still callbacks in Apteryx. + DEBUG ("SHUTDOWN: Shutdown\n"); + return true; } @@ -360,16 +491,18 @@ apteryx_shutdown (void) bool apteryx_shutdown_force (void) { - while (ref_count > 0) - apteryx_shutdown (); + while (apteryx_shutdown ()); + return true; } int apteryx_process (bool poll) { - ASSERT ((ref_count > 0), return false, "PROCESS: Not initialised\n"); - return rpc_server_process (rpc, poll); + struct apteryx_client *client_data = _apteryx_client_get (); + + ASSERT ((client_data && client_data->ref_count > 0), return false, "PROCESS: Not initialised\n"); + return rpc_server_process (client_data->rpc, poll); } bool @@ -377,8 +510,9 @@ apteryx_bind (const char *url) { char path[PATH_MAX]; bool result; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return false, "BIND: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "BIND: Not initialised\n"); ASSERT (url, return false, "BIND: Invalid parameters\n"); DEBUG ("BIND: %s\n", url); @@ -395,8 +529,9 @@ bool apteryx_unbind (const char *url) { char path[PATH_MAX]; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return false, "UNBIND: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "UNBIND: Not initialised\n"); ASSERT (url, return false, "UNBIND: Invalid parameters\n"); DEBUG ("UNBIND: %s\n", url); @@ -414,8 +549,9 @@ apteryx_prune (const char *path) rpc_client rpc_client; rpc_message_t msg = {}; int32_t result; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return false, "PRUNE: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "PRUNE: Not initialised\n"); ASSERT (path, return false, "PRUNE: Invalid parameters\n"); DEBUG ("PRUNE: %s\n", path); @@ -430,7 +566,7 @@ apteryx_prune (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("PRUNE: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -443,7 +579,7 @@ apteryx_prune (const char *path) { ERROR ("PRUNE: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return false; } @@ -454,7 +590,7 @@ apteryx_prune (const char *path) DEBUG ("PRUNE: Error response: %s\n", strerror (-result)); errno = result; } - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Success */ @@ -465,21 +601,14 @@ bool apteryx_dump (const char *path, FILE *fp) { char *value = NULL; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return false, "DUMP: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "DUMP: Not initialised\n"); ASSERT (path, return false, "DUMP: Invalid parameters\n"); ASSERT (fp, return false, "DUMP: Invalid parameters\n"); DEBUG ("DUMP: %s\n", path); - /* Check initialised */ - if (ref_count <= 0) - { - ERROR ("DUMP: not initialised!\n"); - assert(ref_count > 0); - return false; - } - if (strlen (path) > 0 && (value = apteryx_get (path))) { fprintf (fp, "%-64s%s\n", path, value); @@ -509,8 +638,9 @@ apteryx_set_full (const char *path, const char *value, uint64_t ts, bool ack) rpc_client rpc_client; rpc_message_t msg = {}; int result = -ETIMEDOUT; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return false, "SET: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "SET: Not initialised\n"); ASSERT (path, return false, "SET: Invalid parameters\n"); DEBUG ("SET: %s = %s\n", path, value); @@ -526,7 +656,7 @@ apteryx_set_full (const char *path, const char *value, uint64_t ts, bool ack) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("SET: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -544,7 +674,7 @@ apteryx_set_full (const char *path, const char *value, uint64_t ts, bool ack) { ERROR ("SET: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return false; } @@ -555,7 +685,7 @@ apteryx_set_full (const char *path, const char *value, uint64_t ts, bool ack) DEBUG ("SET: Error response: %s\n", strerror (-result)); errno = result; } - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Success */ @@ -628,8 +758,9 @@ apteryx_get (const char *path) char *value = NULL; rpc_client rpc_client; rpc_message_t msg = {}; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return NULL, "GET: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "GET: Not initialised\n"); ASSERT (path, return NULL, "GET: Invalid parameters\n"); DEBUG ("GET: %s\n", path); @@ -645,7 +776,7 @@ apteryx_get (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("GET: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -658,7 +789,7 @@ apteryx_get (const char *path) { ERROR ("GET: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -666,7 +797,7 @@ apteryx_get (const char *path) if (value) value = strdup (value); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); DEBUG (" = %s\n", value); @@ -936,8 +1067,9 @@ apteryx_set_tree_full (GNode* root, uint64_t ts, bool wait_for_completion) rpc_client rpc_client; rpc_message_t msg = {}; int32_t result = 0; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return false, "SET_TREE: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "SET_TREE: Not initialised\n"); ASSERT (root, return false, "SET_TREE: Invalid parameters\n"); DEBUG ("SET_TREE: %d paths\n", g_node_n_nodes (root, G_TRAVERSE_LEAVES)); @@ -957,7 +1089,7 @@ apteryx_set_tree_full (GNode* root, uint64_t ts, bool wait_for_completion) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("SET_TREE: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -977,7 +1109,7 @@ apteryx_set_tree_full (GNode* root, uint64_t ts, bool wait_for_completion) { ERROR ("SET_TREE: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); root->data = old_root_name; free (url); return false; @@ -989,7 +1121,7 @@ apteryx_set_tree_full (GNode* root, uint64_t ts, bool wait_for_completion) DEBUG ("SET_TREE: Error response: %s\n", strerror (-result)); errno = result; } - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Reinstate original root name */ @@ -1060,8 +1192,9 @@ apteryx_get_tree (const char *path) GNode *root = NULL; int slen = strlen (path); char *value; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return NULL, "GET_TREE: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "GET_TREE: Not initialised\n"); ASSERT (path, return NULL, "GET_TREE: Invalid parameters\n"); DEBUG ("GET_TREE: %s\n", path); @@ -1077,7 +1210,7 @@ apteryx_get_tree (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("GET_TREE: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1090,7 +1223,7 @@ apteryx_get_tree (const char *path) { ERROR ("GET_TREE: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -1118,7 +1251,7 @@ apteryx_get_tree (const char *path) DEBUG (" = (null)\n"); } rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); return root; } @@ -1144,8 +1277,9 @@ apteryx_query (GNode *root) char *old_root_name = NULL; char *value = NULL; GNode *rroot = NULL; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return NULL, "QUERY: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "QUERY: Not initialised\n"); ASSERT (root, return NULL, "QUERY: Invalid parameters\n"); DEBUG ("QUERY\n"); @@ -1169,7 +1303,7 @@ apteryx_query (GNode *root) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("QUERY: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1186,7 +1320,7 @@ apteryx_query (GNode *root) { ERROR ("QUERY: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -1217,7 +1351,7 @@ apteryx_query (GNode *root) root->data = old_root_name; rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); return rroot; } @@ -1229,8 +1363,9 @@ apteryx_search (const char *path) rpc_client rpc_client; rpc_message_t msg = {}; GList *paths = NULL; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return NULL, "SEARCH: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "SEARCH: Not initialised\n"); ASSERT (path, return NULL, "SEARCH: Invalid parameters\n"); DEBUG ("SEARCH: %s\n", path); @@ -1267,7 +1402,7 @@ apteryx_search (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("SEARCH: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1280,7 +1415,7 @@ apteryx_search (const char *path) { ERROR ("SEARCH: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -1290,7 +1425,7 @@ apteryx_search (const char *path) paths = g_list_prepend (paths, (gpointer) strdup (path)); } rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Result */ @@ -1338,8 +1473,9 @@ apteryx_find (const char *path, const char *value) rpc_message_t msg = {}; GList *paths = NULL; char *tmp_path = NULL; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return NULL, "FIND: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "FIND: Not initialised\n"); ASSERT (path, return NULL, "FIND: Invalid parameters\n"); ASSERT (value, return NULL, "FIND: Invalid parameters\n"); @@ -1380,7 +1516,7 @@ apteryx_find (const char *path, const char *value) *strrchr (tmp_path, '*') = '\0'; /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("FIND: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1396,7 +1532,7 @@ apteryx_find (const char *path, const char *value) { ERROR ("FIND: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (tmp_path); free (url); return NULL; @@ -1407,7 +1543,7 @@ apteryx_find (const char *path, const char *value) paths = g_list_prepend (paths, (gpointer) strdup (path)); } rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (tmp_path); free (url); @@ -1423,8 +1559,9 @@ apteryx_find_tree (GNode *root) rpc_message_t msg = {}; const char *path = APTERYX_NAME (root); GList *paths = NULL; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return NULL, "FIND_TREE: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return NULL, "FIND_TREE: Not initialised\n"); ASSERT (path, return NULL, "FIND_TREE: Invalid parameters\n"); DEBUG ("FIND_TREE: %s\n", path); @@ -1452,7 +1589,7 @@ apteryx_find_tree (GNode *root) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("FIND_TREE: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1466,7 +1603,7 @@ apteryx_find_tree (GNode *root) { ERROR ("FIND_TREE: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return NULL; } @@ -1476,7 +1613,7 @@ apteryx_find_tree (GNode *root) paths = g_list_prepend (paths, (gpointer) strdup (path)); } rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); /* Result */ @@ -1489,37 +1626,25 @@ add_callback (const char *type, const char *path, void *fn, bool value, void *da size_t pid = getpid (); char _path[PATH_MAX]; cb_t *cb; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return false, "ADD_CB: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "ADD_CB: Not initialised\n"); ASSERT (type, return false, "ADD_CB: Invalid type\n"); ASSERT (path, return false, "ADD_CB: Invalid path\n"); ASSERT (fn, return false, "ADD_CB: Invalid callback\n"); cb = calloc (1, sizeof (cb_t)); - cb->ref = next_ref++; + cb->ref = client_data->next_ref++; cb->path = strdup (path); cb->fn = fn; cb->value = value; cb->data = data; pthread_mutex_lock (&lock); - cb_list = g_list_prepend (cb_list, (void *) cb); - if (!bound) + client_data->cb_list = g_list_prepend (client_data->cb_list, (void *) cb); + if (!client_data->bound) { - char * uri = NULL; - - /* Bind to the default uri for this client */ - if (asprintf ((char **) &uri, APTERYX_SERVER".%"PRIu64, (uint64_t) getpid ()) <= 0 - || !rpc_server_bind (rpc, uri, uri)) - { - ERROR ("Failed to bind uri %s\n", uri); - pthread_mutex_unlock (&lock); - free ((void*) uri); - return false; - } - DEBUG ("Bound to uri %s\n", uri); - free ((void*) uri); - bound = true; + _apteryx_init_callback_server (client_data); } pthread_mutex_unlock (&lock); @@ -1528,7 +1653,7 @@ add_callback (const char *type, const char *path, void *fn, bool value, void *da return false; if (!apteryx_set (_path, path)) return false; - have_callbacks = true; + client_data->have_callbacks = true; return true; } @@ -1539,19 +1664,20 @@ delete_callback (const char *type, const char *path, void *fn) uint64_t ref; GList *iter; cb_t *cb; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return false, "DEL_CB: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return false, "DEL_CB: Not initialised\n"); ASSERT (type, return false, "DEL_CB: Invalid type\n"); ASSERT (path, return false, "DEL_CB: Invalid path\n"); ASSERT (fn, return false, "DEL_CB: Invalid callback\n"); pthread_mutex_lock (&lock); - for (iter = g_list_first (cb_list); iter; iter = g_list_next (iter)) + for (iter = g_list_first (client_data->cb_list); iter; iter = g_list_next (iter)) { cb = (cb_t *) iter->data; if (cb->fn == fn && strcmp (cb->path, path) == 0) { - cb_list = g_list_remove (cb_list, cb); + client_data->cb_list = g_list_remove (client_data->cb_list, cb); break; } cb = NULL; @@ -1653,8 +1779,9 @@ apteryx_timestamp (const char *path) uint64_t value = 0; rpc_client rpc_client; rpc_message_t msg = {}; + struct apteryx_client *client_data = _apteryx_client_get (); - ASSERT ((ref_count > 0), return 0, "TIMESTAMP: Not initialised\n"); + ASSERT ((client_data && client_data->ref_count > 0), return 0, "TIMESTAMP: Not initialised\n"); ASSERT (path, return 0, "TIMESTAMP: Invalid parameters\n"); DEBUG ("TIMESTAMP: %s\n", path); @@ -1672,7 +1799,7 @@ apteryx_timestamp (const char *path) } /* IPC */ - rpc_client = rpc_client_connect (rpc, url); + rpc_client = rpc_client_connect (client_data->rpc, url); if (!rpc_client) { ERROR ("TIMESTAMP: Path(%s) Failed to connect to server: %s\n", path, strerror (errno)); @@ -1685,15 +1812,58 @@ apteryx_timestamp (const char *path) { ERROR ("TIMESTAMP: No response\n"); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, false); + rpc_client_release (client_data->rpc, rpc_client, false); free (url); return 0; } value = rpc_msg_decode_uint64 (&msg); rpc_msg_reset (&msg); - rpc_client_release (rpc, rpc_client, true); + rpc_client_release (client_data->rpc, rpc_client, true); free (url); DEBUG (" = %"PRIu64"\n", value); return value; } + +static void +_apteryx_atfork_prepare (void) +{ + pthread_mutex_lock (&lock); +} + +static void +_apteryx_atfork_parent (void) +{ + pthread_mutex_unlock (&lock); +} + +static void +_apteryx_atfork_child (void) +{ + // Throw away all data related to the parent process. The assumption is that exec will + // be called shortly anyway, replacing the memory space, so memory leak doesn't matter. + _client_data = NULL; + pthread_mutex_unlock (&lock); +} + +/** + * Register fork handlers to ensure that we can gracefully handle forking + */ +__attribute__ ((constructor)) void +apteryx_load (void) +{ + pthread_atfork (_apteryx_atfork_prepare, _apteryx_atfork_parent, _apteryx_atfork_child); +} + +/** + * When the Apteryx library is unloaded or a process is exited, this will be called + * automatically and will shut down all remaining users. This function will also get + * called after a forked process unloads the library, but apteryx_atfork_child should have + * been called to make this safe. + */ +__attribute__ ((destructor)) void +apteryx_unload (void) +{ + DEBUG ("UNLOAD: Shutting down remaining users\n"); + apteryx_shutdown_force (); +} diff --git a/test.c b/test.c index b9b9985..d91931d 100644 --- a/test.c +++ b/test.c @@ -70,20 +70,6 @@ assert_apteryx_empty (void) return ret; } -void -test_init () -{ - const char *path = TEST_PATH"/entity/zones/private/name"; - char *value = NULL; - - apteryx_shutdown_force (); - CU_ASSERT (apteryx_set (path, "private") == FALSE); - CU_ASSERT ((value = apteryx_get (path)) == NULL); - CU_ASSERT (apteryx_set (path, NULL) == FALSE); - CU_ASSERT (assert_apteryx_empty ()); - apteryx_init (apteryx_debug); -} - void test_set_get () { @@ -5009,7 +4995,6 @@ suite_clean (void) static CU_TestInfo tests_api[] = { { "doc example", test_docs }, - { "initialisation", test_init }, { "set and get", test_set_get }, { "set with ack", test_set_with_ack }, { "raw byte streams", test_set_get_raw },