Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure workers have correct sigmask #425

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions rpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,14 @@ work_destroy (gpointer data)
}

static void
worker_func (gpointer a, gpointer b)
worker_func (struct rpc_work_s *work, sigset_t *sigmask)
{
struct rpc_work_s *work = (struct rpc_work_s *)a;
sigset_t oldmask;

/* Process callbacks using the worker sigmask */
if (sigmask)
pthread_sigmask (SIG_SETMASK, sigmask, &oldmask);

if (work && work->cb)
{
if (work->cb (work->data) == G_SOURCE_REMOVE)
Expand All @@ -114,10 +119,6 @@ worker_func (gpointer a, gpointer b)
rpc_id id = work->id;
rpc_message msg = &work->msg;

/* Set the signal mask for the worker thread */
sigset_t *mask = (sigset_t *)b;
pthread_sigmask (SIG_SETMASK, mask, NULL);

/* TEST: force a delay here to change callback timing */
if (rpc_test_random_watch_delay)
usleep (rand() & RPC_TEST_DELAY_MASK);
Expand All @@ -128,7 +129,7 @@ worker_func (gpointer a, gpointer b)
{
DEBUG ("RPC[%i]: handler failed\n", sock->sock);
work_destroy (work);
return;
goto exit;
}

/* Send result */
Expand All @@ -144,6 +145,10 @@ worker_func (gpointer a, gpointer b)
}
work_destroy (work);
}
exit:
/* Restore the process sigmask */
if (sigmask)
pthread_sigmask (SIG_SETMASK, sigmask, &oldmask);
}

static gboolean
Expand All @@ -170,14 +175,18 @@ slow_callback_fn (gpointer arg1)
}
}
else
worker_func (arg1, NULL);
worker_func (arg1, &rpc->worker_sigmask);
g_atomic_int_dec_and_test (&rpc->slow_count);
return G_SOURCE_REMOVE;
}

static gpointer
slow_thread_fn (gpointer data)
{
/* Block all signals */
sigset_t set;
sigfillset (&set);
pthread_sigmask (SIG_BLOCK, &set, NULL);
rpc_instance rpc = (rpc_instance) data;
g_main_loop_run (rpc->slow_loop);
g_main_loop_unref (rpc->slow_loop);
Expand Down Expand Up @@ -552,7 +561,7 @@ rpc_server_process (rpc_instance rpc, bool poll)
/* Check for work and process it if required */
if (poll)
{
gpointer *work = g_async_queue_try_pop (rpc->queue);
struct rpc_work_s *work = (struct rpc_work_s *) g_async_queue_try_pop (rpc->queue);
if (work)
{
/* Process a single work job */
Expand Down
63 changes: 63 additions & 0 deletions test.c
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,22 @@ assert_apteryx_empty (void)
return true;
}

static inline void
dump_sigset_t (sigset_t *set)
{
int i = SIGRTMAX;
do {
int x = 0;
i -= 4;
if (sigismember(set, i+1)) x |= 1;
if (sigismember(set, i+2)) x |= 2;
if (sigismember(set, i+3)) x |= 4;
if (sigismember(set, i+4)) x |= 8;
printf ("%x", x);
} while (i >= 4);
printf ("\n");
}

void
test_init ()
{
Expand Down Expand Up @@ -2169,6 +2185,29 @@ test_watch_ack_thread ()
_path = NULL;
}

static bool
test_watch_sigmask_callback (const char *path, const char *value)
{
sigset_t empty;
sigset_t set;
sigemptyset (&empty);
pthread_sigmask (SIG_SETMASK, NULL, &set);
if (memcmp ((void *)&set, (void *)&empty, sizeof (sigset_t)) != 0)
dump_sigset_t (&set);
CU_ASSERT (memcmp ((void *)&set, (void *)&empty, sizeof (sigset_t)) == 0);
return true;
}

void
test_watch_sigmask ()
{
const char *path = TEST_PATH"/entity/zones/private/state";
CU_ASSERT (apteryx_watch (path, test_watch_sigmask_callback));
CU_ASSERT (apteryx_set_string (path, NULL, "down"));
CU_ASSERT (apteryx_unwatch (path, test_watch_sigmask_callback));
apteryx_set_string (path, NULL, NULL);
}

static bool
test_perf_watch_callback (const char *path, const char *value)
{
Expand Down Expand Up @@ -3209,6 +3248,28 @@ test_refresh_multiple_branches ()
CU_ASSERT (assert_apteryx_empty ());
}

static uint64_t
test_refresh_sigmask_callback (const char *path)
{
sigset_t empty;
sigset_t set;
sigemptyset (&empty);
pthread_sigmask (SIG_SETMASK, NULL, &set);
if (memcmp ((void *)&set, (void *)&empty, sizeof (sigset_t)) != 0)
dump_sigset_t (&set);
CU_ASSERT (memcmp ((void *)&set, (void *)&empty, sizeof (sigset_t)) == 0);
return _cb_timeout;
}

void
test_refresh_sigmask ()
{
const char *path = TEST_PATH"/interfaces/eth0/state";
CU_ASSERT (apteryx_refresh (path, test_refresh_sigmask_callback));
CU_ASSERT (apteryx_get (path) == NULL);
CU_ASSERT (apteryx_unrefresh (path, test_refresh_sigmask_callback));
}

/* If a refresher is called while traversing the database we can end up
* with lock contention when the refresher attempts to write to the
* database. The callback can be expected to be called more than once
Expand Down Expand Up @@ -10212,6 +10273,7 @@ static CU_TestInfo tests_api_watch[] = {
{ "watch rpc restart", test_watch_rpc_restart },
{ "watch myself blocked", test_watch_myself_blocked },
{ "watch and watch_with_ack in same thread", test_watch_ack_thread },
{ "watch called with correct sigmask", test_watch_sigmask},
CU_TEST_INFO_NULL,
};

Expand Down Expand Up @@ -10257,6 +10319,7 @@ static CU_TestInfo tests_api_refresh[] = {
{ "refresh more specific second", test_refresh_more_specific_second },
{ "refresh node substring", test_refresh_node_substring },
{ "refresh multiple branches", test_refresh_multiple_branches },
{ "refresh called with correct sigmask", test_refresh_sigmask },
CU_TEST_INFO_NULL,
};

Expand Down
Loading