Skip to content

Commit

Permalink
new steal
Browse files Browse the repository at this point in the history
Signed-off-by: Pantelis Antoniou <[email protected]>
  • Loading branch information
pantoniou committed Sep 17, 2023
1 parent 6d9f327 commit d1bcdce
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 0 deletions.
173 changes: 173 additions & 0 deletions src/blake3/blake3_thread.c
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,9 @@ void blake3_thread_pool_cleanup(blake3_thread_pool *tp)
free(tp->threads);
}

if (tp->work_pool)
free(tp->work_pool);

if (tp->records) {
for (i = 0, wr = tp->records; i < tp->num_records; i++, wr++) {
/* XXX */
Expand Down Expand Up @@ -535,6 +538,12 @@ int blake3_thread_pool_init(blake3_thread_pool *tp, unsigned int num_threads)

memset(tp->records, 0, size);

tp->num_work_pool = num_threads;
tp->work_pool = malloc(sizeof(*tp->work_pool) * tp->num_work_pool);
if (!tp->work_pool)
goto err_out;
memset(tp->work_pool, 0, sizeof(*tp->work_pool) * tp->num_work_pool);

masks = (uint64_t *)(tp->records + tp->num_records);
tp->available = (void *)masks;
masks += num_records_words;
Expand Down Expand Up @@ -743,6 +752,7 @@ void blake3_thread_arg_join(blake3_thread_pool *tp, blake3_work_exec_fn fn, blak
blake3_thread_work_join(tp, works, count, check_fn);
}

#if 0
static void *blake3_worker_thread_steal(void *arg)
{
blake3_thread *t = arg;
Expand Down Expand Up @@ -894,6 +904,79 @@ static void *blake3_worker_thread_steal(void *arg)

return NULL;
}
#else
/*
 * Worker thread entry point (steal-mode variant).
 *
 * Services the normal work queue until shutdown; when handed the
 * B3WORK_ENTER_STEAL marker it switches into a work-stealing loop that
 * scans the pool's shared work_pool slots, claiming entries with CAS
 * and executing them until steal_mode is cleared.
 *
 * arg is the blake3_thread * for this worker; always returns NULL.
 */
static void *blake3_worker_thread_steal(void *arg)
{
	blake3_thread *t = arg;
	blake3_thread_pool *tp;
	const blake3_thread_work *work;
	blake3_thread_work *w, *wexp;
	_Atomic(blake3_thread_work *) *wpp, *wppe;
	unsigned int k, pos;
	bool stored;

	/* store per thread info */
	pthread_setspecific(t->tp->key, t);

	fprintf(stderr, "%s: T#%u start; wait for go\n", __func__, t->id);

	while ((work = blake3_worker_wait_for_work(t)) != B3WORK_SHUTDOWN) {

		fprintf(stderr, "%s: T#%u got work %p\n", __func__, t->id, work);

		/* B3WORK_ENTER_STEAL is a control marker, not real work */
		if (work != B3WORK_ENTER_STEAL)
			work->fn(work->arg);
		/* NOTE(review): completion is signalled even for the
		 * ENTER_STEAL marker — presumably the submitter waits on
		 * this acknowledgement; confirm against the wait side. */
		blake3_worker_signal_work_done(t, work);

		if (work == B3WORK_ENTER_STEAL) {
			fprintf(stderr, "%s: T#%u entered steal mode\n", __func__, t->id);
			tp = t->tp;

			assert(tp);

			fprintf(stderr, "%s: T#%u start from - %u\n", __func__, t->id, t->id % tp->num_work_pool);

			/* this is going to be simple */
			assert(tp->work_pool);
			/* start the scan at a per-thread offset so workers
			 * don't all contend on slot 0 */
			wpp = tp->work_pool + (t->id % tp->num_work_pool);
			wppe = tp->work_pool + tp->num_work_pool;
			assert(wpp < wppe);
			pos = wpp - tp->work_pool;
			assert(pos < tp->num_work_pool);

			/* spin over the pool slots while steal mode is on */
			while (atomic_load(&tp->steal_mode)) {

				pos = wpp - tp->work_pool;
				assert(pos < tp->num_work_pool);

				/* try to claim the slot: swap the observed
				 * work pointer for NULL; a failed CAS means
				 * another thread took it first */
				w = atomic_load(wpp);
				if (w) {
					wexp = w;
					stored = atomic_compare_exchange_strong(wpp, &wexp, NULL);
				} else
					stored = false;
				/* advance circularly to the next slot */
				if (++wpp >= wppe)
					wpp = tp->work_pool;

				if (stored) {
					fprintf(stderr, "%s: T#%u executing work from pool at pos #%u\n", __func__, t->id, pos);
					w->fn(w->arg);
					/* fetch_sub returns the PREVIOUS value,
					 * so 1 means this was the batch's last
					 * outstanding work item */
					k = atomic_fetch_sub(&w->wp->work_left, 1);
					if (k == 1) {
						fprintf(stderr, "%s: T#%u last work\n", __func__, t->id);
					}
				} else
					/* nothing claimed this pass; yield the CPU
					 * instead of hot-spinning */
					sched_yield();


			}
			fprintf(stderr, "%s: T#%u leaving steal mode\n", __func__, t->id);
		}
	}

	return NULL;
}
#endif

int blake3_thread_pool_enable_steal_mode(blake3_thread_pool *tp)
{
Expand Down Expand Up @@ -929,6 +1012,7 @@ int blake3_thread_pool_enable_steal_mode(blake3_thread_pool *tp)
return 0;
}

#if 0
void blake3_thread_work_join_steal(blake3_thread_pool *tp, const blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn)
{
struct blake3_work_record *wr;
Expand Down Expand Up @@ -1096,6 +1180,95 @@ void blake3_thread_work_join_steal(blake3_thread_pool *tp, const blake3_thread_w

fprintf(stderr, "%s: T#%d ALL DONE\n", __func__, tid);
}
#else
/*
 * Submit a batch of works to the steal-mode pool and wait for completion.
 *
 * Each work is published into a free work_pool slot via CAS so that
 * worker threads in steal mode can claim it; works that cannot be
 * published (pool full) are executed directly by the caller.  The
 * caller then participates in draining the pool until the batch's
 * work_left counter reaches zero.
 *
 * tp          pool (may be NULL: all works run inline on the caller)
 * works       array of work_count items; their ->wp fields are set here
 * work_count  number of entries in works
 * check_fn    unused by this implementation (kept for API symmetry)
 *
 * Fix vs. original: tp->key was dereferenced via pthread_getspecific()
 * BEFORE the !tp guard, crashing on a NULL pool.  The guard now runs
 * first; the per-thread lookup (debug-only) happens after it.
 */
void blake3_thread_work_join_steal(blake3_thread_pool *tp, blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn)
{
	blake3_work_pool wp_local, *wp = &wp_local;
	struct blake3_thread_work *w, *wexp;
	_Atomic(blake3_thread_work *) *wpp, *wppe;
	unsigned int i, j, k, pos;
	bool stored;
	blake3_thread *t;
	int tid;

	(void)check_fn;		/* accepted for interface symmetry only */

	/* just a single (or no) work, or no threads? execute directly, or no work record available */
	if (work_count <= 1 || !tp || !tp->num_threads) {
		fprintf(stderr, "%s: no parallel needed\n", __func__);
		for (i = 0, w = works; i < work_count; i++, w++)
			w->fn(w->arg);
		return;
	}

	/* tp is known non-NULL here; tid is only for debug traces */
	t = pthread_getspecific(tp->key);
	tid = t ? (int)t->id : -1;
	(void)tid;

	/* NOTE(review): work_count (size_t) narrows to unsigned int here;
	 * fine for realistic batch sizes — confirm no >UINT_MAX batches */
	wp->work_left = work_count;

	/* this is going to be simple */
	wpp = tp->work_pool;
	wppe = wpp + tp->num_work_pool;
	pos = wpp - tp->work_pool;

	(void)pos;

	/* pump work, while there's still unassigned */
	for (i = 0, w = works; i < work_count; i++, w++) {

		fprintf(stderr, "%s: W#%u\n", __func__, i);

		w->wp = wp;
		stored = false;
		/* try every slot once: publish w into the first empty
		 * (NULL) slot we can CAS into */
		for (j = 0; j < tp->num_work_pool && !stored; j++) {
			wexp = NULL;
			stored = atomic_compare_exchange_strong(wpp, &wexp, w);
			pos = wpp - tp->work_pool;
			if (++wpp >= wppe)
				wpp = tp->work_pool;
		}

		/* unable to store? execute locally */
		if (!stored) {
			fprintf(stderr, "%s: W#%u not stored, executing direct\n", __func__, i);

			w->fn(w->arg);
			/* atomically decrease the amount of work left */
			atomic_fetch_sub(&wp->work_left, 1);
		} else {
			fprintf(stderr, "%s: W#%u stored at pos #%u\n", __func__, i, pos);
		}
	}

	fprintf(stderr, "%s: submission complete\n", __func__);

	/* drain: the caller helps execute pool entries until this batch
	 * is fully accounted for */
	/* XXX this can be futex? */
	while (atomic_load(&wp->work_left)) {

		/* claim a slot with CAS, same protocol as the workers */
		w = atomic_load(wpp);
		if (w) {
			wexp = w;
			stored = atomic_compare_exchange_strong(wpp, &wexp, NULL);
		} else
			stored = false;
		pos = wpp - tp->work_pool;
		if (++wpp >= wppe)
			wpp = tp->work_pool;

		if (stored) {
			fprintf(stderr, "%s: executing work from pool at pos #%u\n", __func__, pos);
			w->fn(w->arg);
			/* fetch_sub returns the previous value: 1 means
			 * this was the last outstanding work item.
			 * NOTE: the claimed work may belong to ANOTHER
			 * batch; we decrement ITS counter (w->wp). */
			k = atomic_fetch_sub(&w->wp->work_left, 1);
			if (k == 1) {
				fprintf(stderr, "%s: last work\n", __func__);
			}
		} else
			sched_yield();
	}

	fprintf(stderr, "%s: done\n", __func__);
}
#endif


void blake3_thread_args_join_steal(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void **args, size_t count)
{
Expand Down
14 changes: 14 additions & 0 deletions src/blake3/blake3_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,17 @@ struct blake3_thread_pool;
typedef void (*blake3_work_exec_fn)(void *arg);
typedef bool (*blake3_work_check_fn)(const void *arg);

struct blake3_work_record;

/*
 * Per-batch accounting for steal-mode submission: counts how many
 * works of one submitted batch have not yet been executed.
 */
typedef struct blake3_work_pool {
	_Atomic(unsigned int) work_left;	/* decremented once per completed work */
} blake3_work_pool;

/* A single unit of work: fn(arg), tagged with its owning batch. */
typedef struct blake3_thread_work {
	blake3_work_exec_fn fn;		/* function to execute */
	void *arg;			/* opaque argument passed to fn */
	struct blake3_work_pool *wp;	/* batch this work belongs to (steal mode) */
	void *unused;			/* reserved — presumably padding/future use; TODO confirm */
} blake3_thread_work;

typedef struct blake3_work_record {
Expand Down Expand Up @@ -64,6 +72,8 @@ typedef struct blake3_thread_pool {
_Atomic(uint64_t) *submit;
_Atomic(bool) steal_mode;
pthread_key_t key;
unsigned int num_work_pool;
_Atomic(struct blake3_thread_work *) *work_pool;
} blake3_thread_pool;

blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads);
Expand All @@ -83,7 +93,11 @@ void blake3_thread_pool_unreserve_work_record(blake3_work_record *wr);

int blake3_thread_pool_enable_steal_mode(blake3_thread_pool *tp);

#if 0
void blake3_thread_join_steal(blake3_thread_pool *tp, const blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn);
#else
/* NOTE(review): the active implementation in blake3_thread.c in this
 * commit is named blake3_thread_work_join_steal — this prototype's name
 * does not match it; confirm whether a separate blake3_thread_join_steal
 * definition exists or the declaration should be renamed.  The non-const
 * works parameter is required because the implementation writes ->wp. */
void blake3_thread_join_steal(blake3_thread_pool *tp, blake3_thread_work *works, size_t work_count, blake3_work_check_fn check_fn);
#endif
void blake3_thread_args_join_steal(blake3_thread_pool *tp, blake3_work_exec_fn, blake3_work_check_fn check_fn, void **args, size_t count);
void blake3_thread_arg_array_join_steal(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *args, size_t argsize, size_t count);
void blake3_thread_arg_join_steal(blake3_thread_pool *tp, blake3_work_exec_fn fn, blake3_work_check_fn check_fn, void *arg, size_t count);
Expand Down

0 comments on commit d1bcdce

Please sign in to comment.