Skip to content

Commit

Permalink
fixes nanomsg#451 task finalization could be better/smarter (resolver)
Browse files Browse the repository at this point in the history
This changes nni_task_fini to always run synchronously, waiting
for the task to finish before cleaning up.  Much simpler code.

Additionally, we've refactored the resolver code to avoid the
use of taskqs, which added complexity and inefficiency.  The
approach of just allocating its own threads and a work queue
to process them turns out to be vastly simpler, and actually
reduces extra allocations and context switches.

wip

POSIX resolv threads.

(Taskqs are just overhead and complexity here.)

Windows resolver changes.

Task cleanup.

fix up windows mutex.
  • Loading branch information
gdamore committed May 18, 2018
1 parent 109a559 commit 846c300
Show file tree
Hide file tree
Showing 3 changed files with 209 additions and 161 deletions.
22 changes: 2 additions & 20 deletions src/core/taskq.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ struct nni_task {
nni_taskq * task_tq;
unsigned task_busy;
bool task_prep;
bool task_fini;
nni_mtx task_mtx;
nni_cv task_cv;
};
Expand Down Expand Up @@ -56,14 +55,6 @@ nni_taskq_thread(void *self)
nni_mtx_lock(&task->task_mtx);
task->task_busy--;
if (task->task_busy == 0) {
if (task->task_fini) {
task->task_fini = false;
nni_mtx_unlock(&task->task_mtx);
nni_task_fini(task);

nni_mtx_lock(&tq->tq_mtx);
continue;
}
nni_cv_wake(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
Expand Down Expand Up @@ -158,12 +149,6 @@ nni_task_exec(nni_task *task)
nni_mtx_lock(&task->task_mtx);
task->task_busy--;
if (task->task_busy == 0) {
if (task->task_fini) {
task->task_fini = false;
nni_mtx_unlock(&task->task_mtx);
nni_task_fini(task);
return;
}
nni_cv_wake(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
Expand Down Expand Up @@ -238,11 +223,8 @@ nni_task_fini(nni_task *task)
{
NNI_ASSERT(!nni_list_node_active(&task->task_node));
nni_mtx_lock(&task->task_mtx);
if (task->task_busy) {
// destroy later.
task->task_fini = true;
nni_mtx_unlock(&task->task_mtx);
return;
while (task->task_busy) {
nni_cv_wait(&task->task_cv);
}
nni_mtx_unlock(&task->task_mtx);
nni_cv_fini(&task->task_cv);
Expand Down
155 changes: 97 additions & 58 deletions src/platform/posix/posix_resolv_gai.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,11 @@
#define NNG_POSIX_RESOLV_CONCURRENCY 4
#endif

static nni_taskq *resolv_tq = NULL;
static nni_mtx resolv_mtx;
static nni_mtx resolv_mtx;
static nni_cv resolv_cv;
static bool resolv_fini;
static nni_list resolv_aios;
static nni_thr resolv_thrs[NNG_POSIX_RESOLV_CONCURRENCY];

typedef struct resolv_item resolv_item;
struct resolv_item {
Expand All @@ -48,45 +51,40 @@ struct resolv_item {
const char * serv;
int proto;
nni_aio * aio;
nni_task * task;
nng_sockaddr sa;
};

static void
resolv_finish(resolv_item *item, int rv)
{
nni_aio *aio;

if (((aio = item->aio) != NULL) &&
(nni_aio_get_prov_data(aio) == item)) {
nng_sockaddr *sa = nni_aio_get_input(aio, 0);
nni_aio_set_prov_data(aio, NULL);
item->aio = NULL;
memcpy(sa, &item->sa, sizeof(*sa));
nni_aio_finish(aio, rv, 0);
nni_task_fini(item->task);
NNI_FREE_STRUCT(item);
}
}

static void
resolv_cancel(nni_aio *aio, int rv)
{
resolv_item *item;

nni_mtx_lock(&resolv_mtx);
if ((item = nni_aio_get_prov_data(aio)) == NULL) {
// Already canceled?
nni_mtx_unlock(&resolv_mtx);
return;
}
nni_aio_set_prov_data(aio, NULL);
item->aio = NULL;
nni_mtx_unlock(&resolv_mtx);
if (nni_aio_list_active(aio)) {
// We have not been picked up by a resolver thread yet,
// so we can just discard everything.
nni_aio_list_remove(aio);
nni_mtx_unlock(&resolv_mtx);
NNI_FREE_STRUCT(item);
} else {
// This case indicates the resolver is still processing our
// node. We can discard our interest in the result, but we
// can't interrupt the resolver itself. (Too bad, name
// resolution is utterly synchronous for now.)
item->aio = NULL;
nni_mtx_unlock(&resolv_mtx);
}
nni_aio_finish_error(aio, rv);
}

static int
nni_posix_gai_errno(int rv)
posix_gai_errno(int rv)
{
switch (rv) {
case 0:
Expand Down Expand Up @@ -116,25 +114,14 @@ nni_posix_gai_errno(int rv)
}
}

static void
resolv_task(void *arg)
static int
resolv_task(resolv_item *item)
{
resolv_item * item = arg;
struct addrinfo hints;
struct addrinfo *results;
struct addrinfo *probe;
int rv;

nni_mtx_lock(&resolv_mtx);
if (item->aio == NULL) {
nni_mtx_unlock(&resolv_mtx);
// Caller canceled, and no longer cares about this.
nni_task_fini(item->task);
NNI_FREE_STRUCT(item);
return;
}
nni_mtx_unlock(&resolv_mtx);

results = NULL;

// We treat these all as IP addresses. The service and the
Expand All @@ -152,7 +139,7 @@ resolv_task(void *arg)

rv = getaddrinfo(item->name, item->serv, &hints, &results);
if (rv != 0) {
rv = nni_posix_gai_errno(rv);
rv = posix_gai_errno(rv);
goto done;
}

Expand Down Expand Up @@ -196,9 +183,7 @@ resolv_task(void *arg)
freeaddrinfo(results);
}

nni_mtx_lock(&resolv_mtx);
resolv_finish(item, rv);
nni_mtx_unlock(&resolv_mtx);
return (rv);
}

static void
Expand Down Expand Up @@ -232,13 +217,6 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
return;
}

if ((rv = nni_task_init(&item->task, resolv_tq, resolv_task, item)) !=
0) {
NNI_FREE_STRUCT(item);
nni_aio_finish_error(aio, rv);
return;
};

// NB: host and serv must remain valid until this is completed.
memset(&item->sa, 0, sizeof(item->sa));
item->passive = passive;
Expand All @@ -249,14 +227,19 @@ resolv_ip(const char *host, const char *serv, int passive, int family,
item->family = fam;

nni_mtx_lock(&resolv_mtx);
if ((rv = nni_aio_schedule(aio, resolv_cancel, item)) != 0) {
if (resolv_fini) {
rv = NNG_ECLOSED;
} else {
rv = nni_aio_schedule(aio, resolv_cancel, item);
}
if (rv != 0) {
nni_mtx_unlock(&resolv_mtx);
nni_task_fini(item->task);
NNI_FREE_STRUCT(item);
nni_aio_finish_error(aio, rv);
return;
}
nni_task_dispatch(item->task);
nni_list_append(&resolv_aios, aio);
nni_cv_wake1(&resolv_cv);
nni_mtx_unlock(&resolv_mtx);
}

Expand All @@ -274,27 +257,83 @@ nni_plat_udp_resolv(
resolv_ip(host, serv, passive, family, IPPROTO_UDP, aio);
}

void
resolv_worker(void *notused)
{

NNI_ARG_UNUSED(notused);

nni_mtx_lock(&resolv_mtx);
for (;;) {
nni_aio * aio;
resolv_item *item;
int rv;

if ((aio = nni_list_first(&resolv_aios)) == NULL) {
if (resolv_fini) {
break;
}
nni_cv_wait(&resolv_cv);
continue;
}

item = nni_aio_get_prov_data(aio);
nni_aio_list_remove(aio);

// Now attempt to do the work. This runs synchronously.
nni_mtx_unlock(&resolv_mtx);
rv = resolv_task(item);
nni_mtx_lock(&resolv_mtx);

// Check to make sure we were not canceled.
if ((aio = item->aio) != NULL) {
nng_sockaddr *sa = nni_aio_get_input(aio, 0);
nni_aio_set_prov_data(aio, NULL);
item->aio = NULL;
memcpy(sa, &item->sa, sizeof(*sa));
nni_aio_finish(aio, rv, 0);

NNI_FREE_STRUCT(item);
}
}
nni_mtx_unlock(&resolv_mtx);
}

int
nni_posix_resolv_sysinit(void)
{
int rv;

nni_mtx_init(&resolv_mtx);
nni_cv_init(&resolv_cv, &resolv_mtx);
nni_aio_list_init(&resolv_aios);

resolv_fini = false;

if ((rv = nni_taskq_init(&resolv_tq, 4)) != 0) {
nni_mtx_fini(&resolv_mtx);
return (rv);
for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
int rv = nni_thr_init(&resolv_thrs[i], resolv_worker, NULL);
if (rv != 0) {
nni_posix_resolv_sysfini();
return (rv);
}
}
for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
nni_thr_run(&resolv_thrs[i]);
}

return (0);
}

void
nni_posix_resolv_sysfini(void)
{
if (resolv_tq != NULL) {
nni_taskq_fini(resolv_tq);
resolv_tq = NULL;
nni_mtx_lock(&resolv_mtx);
resolv_fini = true;
nni_cv_wake(&resolv_cv);
nni_mtx_unlock(&resolv_mtx);

for (int i = 0; i < NNG_POSIX_RESOLV_CONCURRENCY; i++) {
nni_thr_fini(&resolv_thrs[i]);
}
nni_cv_fini(&resolv_cv);
nni_mtx_fini(&resolv_mtx);
}

Expand Down
Loading

0 comments on commit 846c300

Please sign in to comment.