Skip to content

Commit

Permalink
Implement apteryx_watch_tree_masked
Browse files Browse the repository at this point in the history
Allows users to create a watch callback that masks
sets made by itself as defined by namespace and pid.

Flags are now passed to apteryxd via the GUID.
RPC messages contain the sender ns/pid.
Use macros to alow users to activate the required
functionality without them calling the _full fn.
  • Loading branch information
carlgsmith committed Apr 14, 2024
1 parent 21bffd3 commit 710fc6d
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 74 deletions.
26 changes: 8 additions & 18 deletions apteryx.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ typedef struct _cb_t
bool value;
void *fn;
void *data;
uint32_t flags;
uint64_t flags;
guint timeout_ms;
} cb_t;
static uint64_t next_ref = 0;
Expand Down Expand Up @@ -2003,7 +2003,7 @@ apteryx_find_tree (GNode *root)
}

bool
add_callback (const char *type, const char *path, void *fn, bool value, void *data, uint32_t flags, uint64_t timeout_ms)
add_callback (const char *type, const char *path, void *fn, bool value, void *data, uint64_t flags, uint64_t timeout_ms)
{
size_t pid = getpid ();
char _path[PATH_MAX];
Expand Down Expand Up @@ -2044,7 +2044,7 @@ add_callback (const char *type, const char *path, void *fn, bool value, void *da
pthread_mutex_unlock (&lock);

if (sprintf (_path, "%s/"APTERYX_GUID_FORMAT,
type, getns (), (uint64_t)pid, cb->ref, (uint64_t)g_str_hash (path)) <= 0)
type, getns (), (uint64_t)pid, cb->ref, cb->flags, (uint64_t)g_str_hash (path)) <= 0)
return false;
if (!apteryx_set (_path, path))
return false;
Expand All @@ -2057,6 +2057,7 @@ delete_callback (const char *type, const char *path, void *fn, void *data)
{
char _path[PATH_MAX];
uint64_t ref;
uint64_t flags;
GList *iter;
cb_t *cb;

Expand All @@ -2079,11 +2080,12 @@ delete_callback (const char *type, const char *path, void *fn, void *data)
pthread_mutex_unlock (&lock);
ASSERT (cb, return false, "CB: not found (%s)\n", path);
ref = cb->ref;
flags = cb->flags;
free ((void *) cb->path);
free (cb);

if (sprintf (_path, "%s/"APTERYX_GUID_FORMAT,
type, getns (), (uint64_t)getpid (), ref, (uint64_t)g_str_hash (path)) <= 0)
type, getns (), (uint64_t)getpid (), ref, flags, (uint64_t)g_str_hash (path)) <= 0)
return false;
if (!apteryx_set (_path, NULL))
return false;
Expand Down Expand Up @@ -2115,9 +2117,9 @@ apteryx_unwatch (const char *path, apteryx_watch_callback cb)
}

bool
apteryx_watch_tree (const char *path, apteryx_watch_tree_callback cb)
apteryx_watch_tree_full (const char *path, apteryx_watch_tree_callback cb, int flags, guint quiet_ms)
{
return add_callback (APTERYX_WATCHERS_PATH, path, (void *)cb, true, NULL, 1, 0);
return add_callback (APTERYX_WATCHERS_PATH, path, (void *)cb, true, NULL, (WATCH_F_TREE_CB | flags), quiet_ms);
}

bool
Expand All @@ -2126,18 +2128,6 @@ apteryx_unwatch_tree (const char *path, apteryx_watch_tree_callback cb)
return delete_callback (APTERYX_WATCHERS_PATH, path, (void *)cb, NULL);
}

bool
apteryx_watch_tree_full (const char *path, apteryx_watch_tree_callback cb, guint quiet_ms)
{
return add_callback (APTERYX_WATCHERS_PATH, path, (void *)cb, true, NULL, 1, quiet_ms);
}

bool
apteryx_unwatch_tree_full (const char *path, apteryx_watch_tree_callback cb)
{
return delete_callback (APTERYX_WATCHERS_PATH, path, (void *)cb, NULL);
}

bool
apteryx_validate (const char *path, apteryx_validate_callback cb)
{
Expand Down
34 changes: 16 additions & 18 deletions apteryx.h
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,13 @@ bool apteryx_index (const char *path, apteryx_index_callback cb);
/** No longer provide search results for a root path */
bool apteryx_unindex (const char *path, apteryx_index_callback cb);

/**
* Flags to configure the watch behaviour
*/
#define WATCH_F_DEFAULT 0 /* Default behaviour */
#define WATCH_F_TREE_CB (1 << 0) /* Internal use only */
#define WATCH_F_MASK_MYSELF (1 << 1) /* No watch callbacks if it is me (ns:pid) that does the set */

/**
* Callback function to be called when a
* watched value changes.
Expand Down Expand Up @@ -608,33 +615,24 @@ bool apteryx_unwatch (const char *path, apteryx_watch_callback cb);
typedef bool (*apteryx_watch_tree_callback) (GNode *root);

/**
* Watch for changes in the path
* Supports *(wildcard) at the end of path for all children under this path
* Supports /(level) at the end of path for children only under this current path (one level down)
* Whenever a change occurs in a watched path, cb is called with the changed
* tree of changes that occurred in one transaction (e.g. an apteryx_set_tree)
* @param path path to the value to be watched
* @param cb function to call when the value changes
* @return true on successful registration
*/
bool apteryx_watch_tree (const char *path, apteryx_watch_tree_callback cb);
/** UnWatch for changes in the path */
bool apteryx_unwatch_tree (const char *path, apteryx_watch_tree_callback cb);

/**
* Watch for changes in the path but only notify after a period of quiet
* Watch for changes in the path and pass teh callback a tree of changes
* Supports *(wildcard) at the end of path for all children under this path
* Supports /(level) at the end of path for children only under this current path (one level down)
* Whenever a change occurs in a monitor path, cb is called with the
* longest common path to all recent changes
* @param path path to the value to be monitored
* @param cb function to call when the value changes
* @param quiet_ms only notify after this period of quiet time in milliseconds (0=no timeout)
* @param flags to change the watch behaviour
* @param quiet_ms aggregate sets and notify only after this period of quiet time in milliseconds (0=no timeout)
* @return true on successful registration
*/
bool apteryx_watch_tree_full (const char *path, apteryx_watch_tree_callback cb, guint quiet_ms);
bool apteryx_watch_tree_full (const char *path, apteryx_watch_tree_callback cb, int flags, guint quiet_ms);
#define apteryx_watch_tree(p,c) apteryx_watch_tree_full(p, c, 0, 0)
#define apteryx_watch_tree_wait(p,c,ms) apteryx_watch_tree_full(p, c, 0, ms)
#define apteryx_watch_tree_masked(p,c) apteryx_watch_tree_full(p, c, WATCH_F_MASK_MYSELF, 0)
#define apteryx_watch_tree_wait_masked(p,c,ms) apteryx_watch_tree_full(p, c, WATCH_F_MASK_MYSELF, ms)
/** UnWatch for changes in the path */
bool apteryx_unwatch_tree_full (const char *path, apteryx_watch_tree_callback cb);
bool apteryx_unwatch_tree (const char *path, apteryx_watch_tree_callback cb);

/**
* Callback function to be called to validate a new value
Expand Down
3 changes: 2 additions & 1 deletion apteryxc.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ struct stat_t
uint64_t pid;
uint64_t callback;
uint64_t ns;
uint64_t flags;
uint64_t hash;
uint64_t count;
uint64_t min;
Expand Down Expand Up @@ -125,7 +126,7 @@ _parse_stats (GNode *node, gpointer data)
struct stat_t *stat = g_malloc0 (sizeof (struct stat_t));
stat->guid = g_strdup (APTERYX_NAME (node));
if (sscanf (APTERYX_NAME (node), APTERYX_GUID_FORMAT,
&stat->ns, &stat->pid, &stat->callback, &stat->hash) != 4 ||
&stat->ns, &stat->pid, &stat->callback, &stat->flags, &stat->hash) != 5 ||
sscanf (APTERYX_VALUE (node), "%" PRIu64 ",%" PRIu64 ",%" PRIu64 ",%" PRIu64 "",
&stat->count, &stat->min, &stat->avg, &stat->max) != 4 ||
stat->count == 0)
Expand Down
15 changes: 10 additions & 5 deletions apteryxd.c
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ send_watch_notification (cb_info_t *watcher, GList *paths, GList *values, int ac
}

static void
notify_watchers (GList *paths, GList *values, bool ack)
notify_watchers (GList *paths, GList *values, bool ack, uint64_t ns, uint64_t pid)
{
GList *common_watchers = NULL;
GList *used_watchers = NULL;
Expand All @@ -382,7 +382,9 @@ notify_watchers (GList *paths, GList *values, bool ack)
for (iter = common_watchers; iter; iter = g_list_next (iter))
{
cb_info_t *watcher = iter->data;

/* Skip watchers that don't want to be notified for their own sets */
if ((watcher->flags & WATCH_F_MASK_MYSELF) && watcher->ns == ns && watcher->id == pid)
continue;
if (watcher->id != getpid ())
{
send_watch_notification (watcher, paths, values, ack);
Expand Down Expand Up @@ -422,6 +424,9 @@ notify_watchers (GList *paths, GList *values, bool ack)
cb (path, value);
continue;
}
/* Skip watchers that don't want to be notified for their own sets */
if ((watcher->flags & WATCH_F_MASK_MYSELF) && watcher->ns == ns && watcher->id == pid)
continue;
GList *watch_paths = g_list_append(NULL, (void *) path);
GList *watch_values = g_list_append(NULL, (void *) value);
send_watch_notification (watcher, watch_paths, watch_values, ack);
Expand Down Expand Up @@ -1273,7 +1278,7 @@ handle_set (rpc_message msg, bool ack)
GList *wpaths = g_list_append (NULL, (gpointer) path);
GList *wvalues = g_list_append (NULL, (gpointer) value);
GList *next;
notify_watchers (wpaths, wvalues, ack);
notify_watchers (wpaths, wvalues, ack, msg->ns, msg->pid);
g_list_free (wpaths);
g_list_free (wvalues);

Expand Down Expand Up @@ -1342,7 +1347,7 @@ handle_set (rpc_message msg, bool ack)
/* Notify watchers, if any are present */
if (config_tree_has_watchers (root_path))
{
notify_watchers (lists.paths, lists.values, ack);
notify_watchers (lists.paths, lists.values, ack, msg->ns, msg->pid);
}
}

Expand Down Expand Up @@ -2356,7 +2361,7 @@ handle_prune (rpc_message msg)
if (validation_result >= 0)
{
/* Call watchers for each pruned path */
notify_watchers (paths, NULL, false);
notify_watchers (paths, NULL, false, msg->ns, msg->pid);
}

/* Release validation lock - this is a sensitive value */
Expand Down
21 changes: 11 additions & 10 deletions callbacks.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ static pthread_mutex_t tree_lock = PTHREAD_MUTEX_INITIALIZER;

cb_info_t *
cb_create (struct callback_node *tree_root, const char *guid, const char *path,
uint64_t id, uint64_t callback, uint64_t ns)
uint64_t id, uint64_t callback, uint64_t ns, uint64_t flags)
{
cb_info_t *cb = (cb_info_t *) g_malloc0 (sizeof (cb_info_t));
cb->active = true;
Expand All @@ -46,6 +46,7 @@ cb_create (struct callback_node *tree_root, const char *guid, const char *path,
cb->id = id;
cb->uri = g_strdup_printf (APTERYX_CLIENT, cb->ns, cb->id);
cb->ref = callback;
cb->flags = flags;
g_atomic_int_set (&cb->refcnt, 1);

g_atomic_int_inc (&cb->refcnt);
Expand Down Expand Up @@ -643,7 +644,7 @@ test_cb_match ()
cb_info_t *cb = NULL;
/* Wildcard in path */
struct callback_node *watches_list = cb_init ();
cb = cb_create (watches_list, "tester", "/firewall/rules/*/app", 1, 0, 0);
cb = cb_create (watches_list, "tester", "/firewall/rules/*/app", 1, 0, 0, 0);
cb_release (cb);
matches = cb_match (watches_list, "/firewall/rules/10/app");
CU_ASSERT (matches != NULL);
Expand All @@ -658,7 +659,7 @@ test_cb_match ()

/* directory */
watches_list = cb_init ();
cb = cb_create (watches_list, "tester", "/firewall/rules/10/", 2, 0, 0);
cb = cb_create (watches_list, "tester", "/firewall/rules/10/", 2, 0, 0, 0);
cb_release (cb);

matches = cb_match (watches_list, "/firewall/rules/10/app");
Expand All @@ -673,7 +674,7 @@ test_cb_match ()
cb_shutdown (watches_list);

watches_list = cb_init ();
cb = cb_create (watches_list, "tester", "/firewall/rules/10/app", 3, 0, 0);
cb = cb_create (watches_list, "tester", "/firewall/rules/10/app", 3, 0, 0, 0);
cb_release (cb);
matches = cb_match (watches_list, "/firewall/rules/10/app");
CU_ASSERT (matches != NULL);
Expand All @@ -687,7 +688,7 @@ test_cb_match ()
cb_shutdown (watches_list);

watches_list = cb_init ();
cb = cb_create (watches_list, "tester", "/firewall/rules/10", 4, 0, 0);
cb = cb_create (watches_list, "tester", "/firewall/rules/10", 4, 0, 0, 0);
cb_release (cb);

matches = cb_match (watches_list, "/firewall/rules/10/app");
Expand All @@ -701,7 +702,7 @@ test_cb_match ()
cb_shutdown (watches_list);

watches_list = cb_init ();
cb = cb_create (watches_list, "tester", "/firewall/rules/*", 5, 0, 0);
cb = cb_create (watches_list, "tester", "/firewall/rules/*", 5, 0, 0, 0);
cb_release (cb);

matches = cb_match (watches_list, "/firewall/rules/10/app");
Expand All @@ -725,7 +726,7 @@ test_cb_release ()
{
cb_info_t *cb;
struct callback_node *watches_list = cb_init ();
cb = cb_create (watches_list, "abc", "/test", 1, 0, 0);
cb = cb_create (watches_list, "abc", "/test", 1, 0, 0, 0);
cb_release (cb);
CU_ASSERT (g_atomic_int_get (&cb->refcnt) == 1);
cb_release (cb);
Expand All @@ -738,7 +739,7 @@ test_cb_disable ()
{
cb_info_t *cb;
struct callback_node *watches_list = cb_init ();
cb = cb_create (watches_list, "abc", "/test", 1, 0, 0);
cb = cb_create (watches_list, "abc", "/test", 1, 0, 0, 0);

cb_disable (cb);
CU_ASSERT (!hashtree_empty (&watches_list->hashtree_node));
Expand Down Expand Up @@ -770,7 +771,7 @@ match_perf_test (PERF_TEST_INDEX index)
{
sprintf (path, "/database/test%d/test%d", i, i);
sprintf (guid, "%zX", (size_t) g_str_hash (path));
cb = cb_create (watches_list, guid, path, 1, 0, 0);
cb = cb_create (watches_list, guid, path, 1, 0, 0, 0);
cb_release (cb);
}
CU_ASSERT (!hashtree_empty (&watches_list->hashtree_node));
Expand Down Expand Up @@ -821,7 +822,7 @@ _cb_exist_locking_thrasher (void *list)
while (test_running)
{
cb_info_t *cb =
cb_create (test_list, "tester", "/test/callback/path/down/*/someplace", 1, 0, 0);
cb_create (test_list, "tester", "/test/callback/path/down/*/someplace", 1, 0, 0, 0);
cb_release (cb);
/* remove this callback */
cb_release (cb);
Expand Down
28 changes: 14 additions & 14 deletions config.c
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ static cb_info_t *
update_callback (struct callback_node *list, const char *guid, const char *value)
{
cb_info_t *cb;
uint64_t ns, pid, callback, hash;
uint64_t ns, pid, callback, flags, hash;

/* Parse callback info from the encoded guid */
if (sscanf (guid, APTERYX_GUID_FORMAT, &ns, &pid, &callback, &hash) != 4)
if (sscanf (guid, APTERYX_GUID_FORMAT, &ns, &pid, &callback, &flags, &hash) != 5)
{
ERROR ("Invalid GUID (%s)\n", guid ? : "NULL");
return NULL;
Expand Down Expand Up @@ -116,7 +116,7 @@ update_callback (struct callback_node *list, const char *guid, const char *value
cb_disable (cb);
cb_release (cb);
}
cb = cb_create (list, guid, value, pid, callback, ns);
cb = cb_create (list, guid, value, pid, callback, ns, flags);

/* This will either replace the entry removed above, or add a new one. */
pthread_rwlock_wrlock (&guid_lock);
Expand Down Expand Up @@ -417,55 +417,55 @@ config_init (void)

/* Debug set */
cb = cb_create (watch_list, "debug", APTERYX_DEBUG_PATH,
(uint64_t) getpid (), (uint64_t) (size_t) handle_debug_set, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_debug_set, 0, 0);
cb_release (cb);

/* Counters */
cb = cb_create (index_list, "counters", APTERYX_COUNTERS "/",
(uint64_t) getpid (), (uint64_t) (size_t) handle_counters_index, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_counters_index, 0, 0);
cb_release (cb);
cb = cb_create (provide_list, "counters", APTERYX_COUNTERS "/",
(uint64_t) getpid (), (uint64_t) (size_t) handle_counters_get, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_counters_get, 0, 0);
cb_release (cb);

/* Statistics */
cb = cb_create (refresh_list, "statistics", APTERYX_STATISTICS "/*",
(uint64_t) getpid (), (uint64_t) (size_t) handle_statistics_refresh, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_statistics_refresh, 0, 0);
cb_release (cb);

/* Sockets */
cb = cb_create (watch_list, "sockets", APTERYX_SOCKETS_PATH "/",
(uint64_t) getpid (), (uint64_t) (size_t) handle_sockets_set, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_sockets_set, 0, 0);
cb_release (cb);

/* Indexers */
cb = cb_create (watch_list, "indexers", APTERYX_INDEXERS_PATH "/",
(uint64_t) getpid (), (uint64_t) (size_t) handle_indexers_set, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_indexers_set, 0, 0);
cb_release (cb);

/* Watchers */
cb = cb_create (watch_list, "watchers", APTERYX_WATCHERS_PATH "/",
(uint64_t) getpid (), (uint64_t) (size_t) handle_watchers_set, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_watchers_set, 0, 0);
cb_release (cb);

/* Refeshers */
cb = cb_create (watch_list, "refreshers", APTERYX_REFRESHERS_PATH "/",
(uint64_t) getpid (), (uint64_t) (size_t) handle_refreshers_set, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_refreshers_set, 0, 0);
cb_release (cb);

/* Providers */
cb = cb_create (watch_list, "providers", APTERYX_PROVIDERS_PATH "/",
(uint64_t) getpid (), (uint64_t) (size_t) handle_providers_set, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_providers_set, 0, 0);
cb_release (cb);

/* Validators */
cb = cb_create (watch_list, "validators", APTERYX_VALIDATORS_PATH "/",
(uint64_t) getpid (), (uint64_t) (size_t) handle_validators_set, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_validators_set, 0, 0);
cb_release (cb);

/* Proxies */
cb = cb_create (watch_list, "proxies", APTERYX_PROXIES_PATH "/",
(uint64_t) getpid (), (uint64_t) (size_t) handle_proxies_set, 0);
(uint64_t) getpid (), (uint64_t) (size_t) handle_proxies_set, 0, 0);
cb_release (cb);
if (!cb)
{
Expand Down
Loading

0 comments on commit 710fc6d

Please sign in to comment.