From d1bcdce964e6a33a4baa74170937b4771769c0e6 Mon Sep 17 00:00:00 2001
From: Pantelis Antoniou
Date: Sun, 17 Sep 2023 18:12:43 +0300
Subject: [PATCH] new steal

Signed-off-by: Pantelis Antoniou
---
 src/blake3/blake3_thread.c | 173 +++++++++++++++++++++++++++++++++++++
 src/blake3/blake3_thread.h |  14 +++
 2 files changed, 187 insertions(+)

diff --git a/src/blake3/blake3_thread.c b/src/blake3/blake3_thread.c
index d15c8c73..2c1261c7 100644
--- a/src/blake3/blake3_thread.c
+++ b/src/blake3/blake3_thread.c
@@ -450,6 +450,9 @@ void blake3_thread_pool_cleanup(blake3_thread_pool *tp)
 		free(tp->threads);
 	}
 
+	if (tp->work_pool)
+		free(tp->work_pool);
+
 	if (tp->records) {
 		for (i = 0, wr = tp->records; i < tp->num_records; i++, wr++) {
 			/* XXX */
@@ -535,6 +538,12 @@ int blake3_thread_pool_init(blake3_thread_pool *tp, unsigned int num_threads)
 
 	memset(tp->records, 0, size);
 
+	tp->num_work_pool = num_threads;
+	tp->work_pool = malloc(sizeof(*tp->work_pool) * tp->num_work_pool);
+	if (!tp->work_pool)
+		goto err_out;
+	memset(tp->work_pool, 0, sizeof(*tp->work_pool) * tp->num_work_pool);
+
 	masks = (uint64_t *)(tp->records + tp->num_records);
 	tp->available = (void *)masks;
 	masks += num_records_words;
@@ -743,6 +752,7 @@ void blake3_thread_arg_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blak
 	blake3_thread_work_join(tp, works, count, check_fn);
 }
 
+#if 0
 static void *blake3_worker_thread_steal(void *arg)
 {
 	blake3_thread *t = arg;
@@ -894,6 +904,79 @@ static void *blake3_worker_thread_steal(void *arg)
 
 	return NULL;
 }
+#else
+static void *blake3_worker_thread_steal(void *arg)
+{
+	blake3_thread *t = arg;
+	blake3_thread_pool *tp;
+	const blake3_thread_work *work;
+	blake3_thread_work *w, *wexp;
+	_Atomic(blake3_thread_work *) *wpp, *wppe;
+	unsigned int k, pos;
+	bool stored;
+
+	/* store per thread info */
+	pthread_setspecific(t->tp->key, t);
+
+	fprintf(stderr, "%s: T#%u start; wait for go\n", __func__, t->id);
+
+	while ((work = blake3_worker_wait_for_work(t)) != B3WORK_SHUTDOWN) {
+
+		fprintf(stderr, "%s: T#%u got work %p\n", __func__, t->id, work);
+
+		if (work != B3WORK_ENTER_STEAL)
+			work->fn(work->arg);
+		blake3_worker_signal_work_done(t, work);
+
+		if (work == B3WORK_ENTER_STEAL) {
+			fprintf(stderr, "%s: T#%u entered steal mode\n", __func__, t->id);
+			tp = t->tp;
+
+			assert(tp);
+
+			fprintf(stderr, "%s: T#%u start from - %u\n", __func__, t->id, t->id % tp->num_work_pool);
+
+			/* this is going to be simple */
+			assert(tp->work_pool);
+			wpp = tp->work_pool + (t->id % tp->num_work_pool);
+			wppe = tp->work_pool + tp->num_work_pool;
+			assert(wpp < wppe);
+			pos = wpp - tp->work_pool;
+			assert(pos < tp->num_work_pool);
+
+			while (atomic_load(&tp->steal_mode)) {
+
+				pos = wpp - tp->work_pool;
+				assert(pos < tp->num_work_pool);
+
+				w = atomic_load(wpp);
+				if (w) {
+					wexp = w;
+					stored = atomic_compare_exchange_strong(wpp, &wexp, NULL);
+				} else
+					stored = false;
+				if (++wpp >= wppe)
+					wpp = tp->work_pool;
+
+				if (stored) {
+					fprintf(stderr, "%s: T#%u executing work from pool at pos #%u\n", __func__, t->id, pos);
+					w->fn(w->arg);
+					k = atomic_fetch_sub(&w->wp->work_left, 1);
+					if (k == 1) {
+						fprintf(stderr, "%s: T#%u last work\n", __func__, t->id);
+					}
+				} else
+					sched_yield();
+
+
+			}
+			fprintf(stderr, "%s: T#%u leaving steal mode\n", __func__, t->id);
+		}
+	}
+
+	return NULL;
+}
+#endif
 
 int blake3_thread_pool_enable_steal_mode(blake3_thread_pool *tp)
 {
@@ -929,6 +1012,7 @@ int blake3_thread_pool_enable_steal_mode(blake3_thread_pool *tp)
 	return 0;
 }
 
+#if 0
 void blake3_thread_work_join_steal(blake3_thread_pool *tp, const blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn)
 {
 	struct blake3_work_record *wr;
@@ -1096,6 +1180,95 @@ void blake3_thread_work_join_steal(blake3_thread_pool *tp, const blake3_thread_w
 
 	fprintf(stderr, "%s: T#%d ALL DONE\n", __func__, tid);
 }
+#else
+void blake3_thread_work_join_steal(blake3_thread_pool *tp, blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn)
+{
+	blake3_work_pool wp_local, *wp = &wp_local;
+	struct blake3_thread_work *w, *wexp;
+	_Atomic(blake3_thread_work *) *wpp, *wppe;
+	unsigned int i, j, k, pos;
+	bool stored;
+	blake3_thread *t;
+	int tid;
+
+	t = pthread_getspecific(tp->key);
+	tid = t ? (int)t->id : -1;
+	(void)tid;
+
+	/* just a single (or no) work, or no threads? execute directly, or no work record available */
+	if (work_count <= 1 || !tp || !tp->num_threads) {
+		fprintf(stderr, "%s: no parallel needed\n", __func__);
+		for (i = 0, w = works; i < work_count; i++, w++)
+			w->fn(w->arg);
+		return;
+	}
+
+	wp->work_left = work_count;
+
+	/* this is going to be simple */
+	wpp = tp->work_pool;
+	wppe = wpp + tp->num_work_pool;
+	pos = wpp - tp->work_pool;
+
+	(void)pos;
+
+	/* pump work, while there's still unassigned */
+	for (i = 0, w = works; i < work_count; i++, w++) {
+
+		fprintf(stderr, "%s: W#%u\n", __func__, i);
+
+		w->wp = wp;
+		stored = false;
+		for (j = 0; j < tp->num_work_pool && !stored; j++) {
+			wexp = NULL;
+			stored = atomic_compare_exchange_strong(wpp, &wexp, w);
+			pos = wpp - tp->work_pool;
+			if (++wpp >= wppe)
+				wpp = tp->work_pool;
+		}
+
+		/* unable to store? execute locally */
+		if (!stored) {
+			fprintf(stderr, "%s: W#%u not stored, executing direct\n", __func__, i);
+
+			w->fn(w->arg);
+			/* atomically decrease the amount of work left */
+			atomic_fetch_sub(&wp->work_left, 1);
+		} else {
+			fprintf(stderr, "%s: W#%u stored at pos #%u\n", __func__, i, pos);
+		}
+	}
+
+	fprintf(stderr, "%s: submission complete\n", __func__);
+
+	/* XXX this can be futex? */
+	while (atomic_load(&wp->work_left)) {
+
+		w = atomic_load(wpp);
+		if (w) {
+			wexp = w;
+			stored = atomic_compare_exchange_strong(wpp, &wexp, NULL);
+		} else
+			stored = false;
+		pos = wpp - tp->work_pool;
+		if (++wpp >= wppe)
+			wpp = tp->work_pool;
+
+		if (stored) {
+			fprintf(stderr, "%s: executing work from pool at pos #%u\n", __func__, pos);
+			w->fn(w->arg);
+			k = atomic_fetch_sub(&w->wp->work_left, 1);
+			if (k == 1) {
+				fprintf(stderr, "%s: last work\n", __func__);
+			}
+		} else
+			sched_yield();
+	}
+
+	fprintf(stderr, "%s: done\n", __func__);
+}
+#endif
+
 void blake3_thread_args_join_steal(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void **args, size_t count)
 {
diff --git a/src/blake3/blake3_thread.h b/src/blake3/blake3_thread.h
index 0f9675ad..86166cfb 100644
--- a/src/blake3/blake3_thread.h
+++ b/src/blake3/blake3_thread.h
@@ -15,9 +15,17 @@ struct blake3_thread_pool;
 typedef void (*blake3_work_exec_fn)(void *arg);
 typedef bool (*blake3_work_check_fn)(const void *arg);
 
+struct blake3_work_record;
+
+typedef struct blake3_work_pool {
+	_Atomic(unsigned int) work_left;
+} blake3_work_pool;
+
 typedef struct blake3_thread_work {
 	blake3_work_exec_fn fn;
 	void *arg;
+	struct blake3_work_pool *wp;
+	void *unused;
 } blake3_thread_work;
 
 typedef struct blake3_work_record {
@@ -64,6 +72,8 @@ typedef struct blake3_thread_pool {
 	_Atomic(uint64_t) *submit;
 	_Atomic(bool) steal_mode;
 	pthread_key_t key;
+	unsigned int num_work_pool;
+	_Atomic(struct blake3_thread_work *) *work_pool;
 } blake3_thread_pool;
 
 blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads);
@@ -83,7 +93,11 @@ void blake3_thread_pool_unreserve_work_record(blake3_work_record *wr);
 
 int blake3_thread_pool_enable_steal_mode(blake3_thread_pool *tp);
 
+#if 0
 void blake3_thread_join_steal(blake3_thread_pool *tp, const blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn);
+#else
+void blake3_thread_join_steal(blake3_thread_pool *tp, blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn);
+#endif
 void blake3_thread_args_join_steal(blake3_thread_pool *tp, blake3_work_exec_fn, blake3_work_check_fn check_fn, void **args, size_t count);
 void blake3_thread_arg_array_join_steal(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *args, size_t argsize, size_t count);
 void blake3_thread_arg_join_steal(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *arg, size_t count);
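
Note (reviewer illustration, not part of the patch): the new steal path reduces to a small lock-free hand-off pattern: a fixed array of _Atomic work pointers (tp->work_pool, one slot per thread), a producer that publishes each work item by compare-exchanging a NULL slot to the item and runs it directly if every slot is taken, workers that claim a slot by compare-exchanging it back to NULL, and a shared atomic work_left counter that signals completion. The standalone C sketch below shows that pattern in isolation; the names (slot_publish, slot_claim, NSLOTS, worker, say) are illustrative only and it deliberately omits the pool integration (blake3_worker_wait_for_work, B3WORK_ENTER_STEAL signalling, the per-thread start offset). It should build with something like: cc -std=c11 -pthread sketch.c

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <sched.h>
#include <pthread.h>

typedef struct work {
	void (*fn)(void *arg);
	void *arg;
	_Atomic(unsigned int) *left;	/* shared "work remaining" counter, like wp->work_left */
} work;

#define NSLOTS 4
static _Atomic(work *) slots[NSLOTS];	/* one slot per worker, like tp->work_pool */
static _Atomic(bool) stealing = true;	/* like tp->steal_mode */

/* producer side: CAS a NULL slot to the item; false means the pool is full */
static bool slot_publish(work *w)
{
	for (unsigned int i = 0; i < NSLOTS; i++) {
		work *expected = NULL;
		if (atomic_compare_exchange_strong(&slots[i], &expected, w))
			return true;
	}
	return false;
}

/* worker side: CAS a non-NULL slot back to NULL to take ownership */
static work *slot_claim(void)
{
	for (unsigned int i = 0; i < NSLOTS; i++) {
		work *w = atomic_load(&slots[i]);
		if (w && atomic_compare_exchange_strong(&slots[i], &w, NULL))
			return w;
	}
	return NULL;
}

static void *worker(void *unused)
{
	(void)unused;
	while (atomic_load(&stealing)) {
		work *w = slot_claim();
		if (!w) {
			sched_yield();	/* nothing to steal right now */
			continue;
		}
		w->fn(w->arg);
		atomic_fetch_sub(w->left, 1);	/* mirrors atomic_fetch_sub(&w->wp->work_left, 1) */
	}
	return NULL;
}

static void say(void *arg)
{
	printf("work %ld\n", (long)(intptr_t)arg);
}

int main(void)
{
	pthread_t tids[2];
	_Atomic(unsigned int) left = 8;
	work items[8];
	int i;

	for (i = 0; i < 2; i++)
		pthread_create(&tids[i], NULL, worker, NULL);

	for (i = 0; i < 8; i++) {
		items[i] = (work){ .fn = say, .arg = (void *)(intptr_t)i, .left = &left };
		if (!slot_publish(&items[i])) {
			/* no free slot: execute directly, as the patch does */
			items[i].fn(items[i].arg);
			atomic_fetch_sub(&left, 1);
		}
	}

	while (atomic_load(&left))	/* wait for the stolen work to drain */
		sched_yield();

	atomic_store(&stealing, false);
	for (i = 0; i < 2; i++)
		pthread_join(tids[i], NULL);
	return 0;
}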