diff --git a/src/blake3/blake3.c b/src/blake3/blake3.c
index 22566962..acf5c556 100644
--- a/src/blake3/blake3.c
+++ b/src/blake3/blake3.c
@@ -212,7 +212,7 @@ static size_t blake3_compress_parents_parallel(blake3_host_state *hs, const uint
 	unsigned int simd_degree_or_2;
 	size_t parents_array_len = 0;
 
-	simd_degree_or_2 = hs->simd_degree_or_2;
+	simd_degree_or_2 = hs->simd_degree < 2 ? 2 : hs->simd_degree;
 
 	assert(2 <= num_chaining_values);
 	assert(num_chaining_values <= 2 * simd_degree_or_2);
@@ -242,11 +242,11 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t *input, size_t input_len, const uint32_t key[8], uint64_t chunk_counter, uint8_t flags, uint8_t *out);
 
-static void blake3_compress_subtree_wide_thread(struct _blake3_thread *t, void *arg)
+static void blake3_compress_subtree_wide_thread(void *arg)
 {
 	blake3_compress_subtree_state *s = arg;
 
-	s->n = blake3_compress_subtree_wide(t->hs, s->input, s->input_len, s->key, s->chunk_counter, s->flags, s->out);
+	s->n = blake3_compress_subtree_wide(s->hs, s->input, s->input_len, s->key, s->chunk_counter, s->flags, s->out);
 }
 
 // The wide helper function returns (writes out) an array of chaining values
@@ -274,13 +274,14 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
 	uint8_t *cv_array, *left_cvs, *right_cvs;
 	bool left_mt, right_mt;
 	blake3_compress_subtree_state left_state, right_state;
+	blake3_thread_work left_work, right_work;
 	blake3_thread *left_thread, *right_thread;
 	int rc;
 
 	(void)rc;
 
 	simd_degree = hs->simd_degree;
-	simd_degree_or_2 = hs->simd_degree_or_2;
+	simd_degree_or_2 = hs->simd_degree < 2 ? 2 : hs->simd_degree;
 
 	/* for multithreading to be worth it has to be this big */
 	mt_degree = hs->mt_degree;
@@ -344,14 +345,14 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
 
 	// get a thread (if possible)
 	if (left_mt) {
-		left_thread = blake3_host_state_thread_pool_reserve(hs, false);
+		left_thread = blake3_thread_pool_reserve(hs->tp, false);
 		// fprintf(stderr, "%s: left_thread #%d ctr=%lu chunks=%lu\n", __func__, left_thread ? (int)left_thread->id : -1,
 		//	left_chunk_counter, left_input_len / BLAKE3_CHUNK_LEN);
 	} else
 		left_thread = NULL;
 
 	if (right_mt) {
-		right_thread = blake3_host_state_thread_pool_reserve(hs, false);
+		right_thread = blake3_thread_pool_reserve(hs->tp, false);
 		// fprintf(stderr, "%s: right_thread #%d ctr=%lu chunks=%lu\n", __func__, right_thread ? (int)right_thread->id : -1,
 		//	right_chunk_counter, right_input_len / BLAKE3_CHUNK_LEN);
 	} else
@@ -361,8 +362,10 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
 		left_n = blake3_compress_subtree_wide(hs, left_input, left_input_len, key, left_chunk_counter, flags, left_cvs);
 		assert(left_n > 0);
 		left_thread = NULL;
+		left_work.fn = NULL;
+		left_work.arg = NULL;
 	} else {
-
+		left_state.hs = hs;
 		left_state.input = left_input;
 		left_state.input_len = left_input_len;
 		left_state.key = key;
@@ -371,7 +374,10 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
 		left_state.out = left_cvs;
 		left_state.n = 0;
 
-		rc = blake3_thread_submit(left_thread, blake3_compress_subtree_wide_thread, &left_state);
+		left_work.fn = blake3_compress_subtree_wide_thread;
+		left_work.arg = &left_state;
+
+		rc = blake3_thread_submit_work(left_thread, &left_work);
 		assert(!rc);
 	}
 
@@ -380,7 +386,11 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
 		assert(right_n > 0);
 		right_thread = NULL;
+		right_work.fn = NULL;
+		right_work.arg = NULL;
 	} else {
+
+		right_state.hs = hs;
 		right_state.input = right_input;
 		right_state.input_len = right_input_len;
 		right_state.key = key;
@@ -389,19 +399,24 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
 		right_state.out = right_cvs;
 		right_state.n = 0;
 
-		rc = blake3_thread_submit(right_thread, blake3_compress_subtree_wide_thread, &right_state);
+		right_work.fn = blake3_compress_subtree_wide_thread;
+		right_work.arg = &right_state;
+
+		rc = blake3_thread_submit_work(right_thread, &right_work);
 		assert(!rc);
 	}
 
 	if (left_thread) {
-		blake3_thread_wait_result_and_release(left_thread);
-		blake3_host_state_thread_pool_unreserve(hs, left_thread);
+		blake3_thread_wait_work(left_thread);
+		blake3_thread_pool_unreserve(hs->tp, left_thread);
+		left_thread = NULL;
 		left_n = left_state.n;
 	}
 
 	if (right_thread) {
-		blake3_thread_wait_result_and_release(right_thread);
-		blake3_host_state_thread_pool_unreserve(hs, right_thread);
+		blake3_thread_wait_work(right_thread);
+		blake3_thread_pool_unreserve(hs->tp, right_thread);
+		right_thread = NULL;
 		right_n = right_state.n;
 	}
@@ -436,7 +451,7 @@ static void blake3_compress_subtree_to_parent_node(blake3_host_state *hs, const
 	uint8_t *cv_array, *out_array;
 	size_t simd_degree_or_2, num_cvs;
 
-	simd_degree_or_2 = hs->simd_degree_or_2;
+	simd_degree_or_2 = hs->simd_degree < 2 ? 2 : hs->simd_degree;
 
 	cv_array = alloca(simd_degree_or_2 * BLAKE3_OUT_LEN);
 	out_array = alloca(simd_degree_or_2 * BLAKE3_OUT_LEN / 2);
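
[Reviewer note, not part of the patch: the blake3.c hunks above replace the old
blake3_thread_submit()/blake3_thread_wait_result_and_release() calls with a
caller-owned blake3_thread_work descriptor handed to a thread reserved from
hs->tp. A minimal sketch of that pattern follows. It assumes blake3_thread_work
carries just { fn, arg }, that the descriptor must stay alive until
blake3_thread_wait_work() returns (as the on-stack left_work/right_work usage
suggests), and that reserve(tp, false) returns NULL when no thread is free.
do_work() and example_submit() are illustrative names, not from the tree.]

#include "blake3_internal.h"

static void do_work(void *arg)			/* hypothetical work body */
{
	int *done = arg;

	*done = 1;
}

static void example_submit(blake3_thread_pool *tp)
{
	blake3_thread_work work;
	blake3_thread *t;
	int done = 0;

	t = blake3_thread_pool_reserve(tp, false);	/* non-blocking reserve */
	if (!t) {
		do_work(&done);				/* synchronous fallback, as in blake3_compress_subtree_wide() */
		return;
	}

	work.fn = do_work;
	work.arg = &done;
	(void)blake3_thread_submit_work(t, &work);	/* hand the descriptor to the reserved thread */

	(void)blake3_thread_wait_work(t);		/* block until do_work() has run */
	blake3_thread_pool_unreserve(tp, t);		/* return the thread to the pool */
}
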
diff --git a/src/blake3/blake3_host_state.c b/src/blake3/blake3_host_state.c
index 83d5b5ae..38ebcdac 100644
--- a/src/blake3/blake3_host_state.c
+++ b/src/blake3/blake3_host_state.c
@@ -16,43 +16,6 @@
 #include "blake3_impl.h"
 #include "blake3_internal.h"
-#if 0
-static void test_thread_fn(struct _blake3_thread *t, void *arg)
-{
-	int *p = arg;
-
-	(*p)++;
-}
-
-void test_threads(blake3_host_state *hs)
-{
-	blake3_thread **threads;
-	unsigned int i, count;
-	int test_count;
-
-	count = hs->num_threads;
-	threads = alloca(count * sizeof(*threads));
-
-	test_count = 0;
-
-	for (i = 0; i < count; i++) {
-		threads[i] = blake3_thread_pool_reserve(hs, true);
-		(void)blake3_thread_submit(threads[i], test_thread_fn, &test_count);
-	}
-
-	for (i = 0; i < count; i++)
-		(void)blake3_thread_wait_result(threads[i]);
-
-	fprintf(stderr, "%s: test_count=%d\n", __func__, test_count);
-
-	for (i = 0; i < count; i++) {
-		blake3_thread_release(threads[i]);
-		blake3_thread_pool_unreserve(hs, threads[i]);
-	}
-
-}
-#endif
-
 static uint64_t supported_gpu_backends(void)
 {
 	return 0;
@@ -409,8 +372,6 @@ static int select_backends(blake3_host_state *hs)
 	if (hs->simd_degree < hs->compress_in_place_be->simd_degree)
 		hs->simd_degree = hs->compress_in_place_be->simd_degree;
 
-	hs->simd_degree_or_2 = hs->simd_degree < 2 ? 2 : hs->simd_degree;
-
 	hs->mt_degree = hs->cfg.mt_degree > 0 ? hs->cfg.mt_degree : 128; /* 128K default */
 
 	return 0;
@@ -418,8 +379,6 @@ int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)
 {
-	blake3_thread *t;
-	unsigned int i;
 	long scval;
 	int rc;
@@ -443,8 +402,8 @@ int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)
 	assert(!rc);
 
 	if (!hs->cfg.no_mthread) {
-
-		hs->num_threads = hs->cfg.num_threads ? hs->cfg.num_threads : hs->num_cpus;
+		hs->num_threads = hs->cfg.num_threads ? hs->cfg.num_threads : (hs->num_cpus * 3) / 2;
+#if 0
 
 		/* maximum number of threads for now */
 		if (hs->num_threads > 64)
@@ -466,11 +425,12 @@ int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)
 			rc = blake3_thread_init(hs, t);
 			assert(rc == 0);
 		}
-
-#if 0
-	test_threads(hs);
-	test_threads(hs);
+#else
+		hs->tp = blake3_thread_pool_create(hs->num_threads);
+		assert(hs->tp);
 #endif
+	} else {
+		hs->tp = NULL;
 	}
 
 	if (hs->cfg.debug) {
@@ -479,7 +439,6 @@ int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)
 		fprintf(stderr, "multithreading: %s\n", hs->cfg.no_mthread ? "false" : "true");
 		fprintf(stderr, "num_threads: %u\n", hs->num_threads);
 		fprintf(stderr, "simd_degree: %u\n", hs->simd_degree);
-		fprintf(stderr, "simd_degree_or_2: %u\n", hs->simd_degree_or_2);
 		fprintf(stderr, "mt_degree: %u\n", hs->mt_degree);
 		fprintf(stderr, "supported_backends:\n");
@@ -499,11 +458,9 @@ int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)
 
 void blake3_host_state_cleanup(blake3_host_state *hs)
 {
-	blake3_thread *t;
-	unsigned int i;
-
 	assert(hs);
 
+#if 0
 	if (hs->threads) {
 		for (i = 0; i < hs->num_threads; i++) {
@@ -515,6 +472,10 @@ void blake3_host_state_cleanup(blake3_host_state *hs)
 		hs->threads = NULL;
 		hs->num_threads = 0;
 	}
+#else
+	if (hs->tp)
+		blake3_thread_pool_destroy(hs->tp);
+#endif
 }
 
 struct _blake3_host_state *blake3_host_state_create(const blake3_host_config *cfg)
diff --git a/src/blake3/blake3_internal.h b/src/blake3/blake3_internal.h
index eeaf0803..63f40a7f 100644
--- a/src/blake3/blake3_internal.h
+++ b/src/blake3/blake3_internal.h
@@ -202,6 +202,7 @@ static inline blake3_output_t blake3_chunk_state_output(const blake3_chunk_state
 
 // the state for the thread when doing compress subtree
 typedef struct {
+	struct _blake3_host_state *hs;
 	// inputs
 	const uint8_t *input;
 	size_t input_len;
@@ -280,10 +281,12 @@ typedef struct _blake3_host_state {
 	blake3_compress_in_place_f compress_in_place;
 
 	unsigned int simd_degree;
-	unsigned int simd_degree_or_2;
 	unsigned int mt_degree;
 	unsigned int num_threads;
+
+	struct _blake3_thread_pool *tp; // thread pool
+#if 0
 
 	struct _blake3_thread *threads;
 	pthread_mutex_t lock_thread_free; // should not be needed at the end if we use atomics
 	pthread_cond_t cond_thread_free;
@@ -292,18 +295,9 @@ typedef struct _blake3_host_state {
 #else
 	uint64_t thread_free;
 #endif
-} blake3_host_state;
-
-int blake3_thread_init(blake3_host_state *hs, blake3_thread *t);
-void blake3_thread_shutdown(blake3_thread *t);
-
-int blake3_thread_submit(blake3_thread *t, blake3_thread_fn fn, void *arg);
-int blake3_thread_wait_result(blake3_thread *t);
-void blake3_thread_release(blake3_thread *t);
-int blake3_thread_wait_result_and_release(blake3_thread *t);
+#endif
 
-blake3_thread *blake3_host_state_thread_pool_reserve(blake3_host_state *hs, bool wait);
-void blake3_host_state_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t);
+} blake3_host_state;
 
 int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg);
 void blake3_host_state_cleanup(blake3_host_state *hs);
@@ -313,5 +307,11 @@ void blake3_hasher_init_keyed(blake3_host_state *hs, blake3_hasher *self, const
 void blake3_hasher_init_derive_key(blake3_host_state *hs, blake3_hasher *self, const char *context);
 void blake3_hasher_init_derive_key_raw(blake3_host_state *hs, blake3_hasher *self, const void *context, size_t context_len);
 
+blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads);
+void blake3_thread_pool_destroy(blake3_thread_pool *tp);
+blake3_thread *blake3_thread_pool_reserve(blake3_thread_pool *tp, bool wait);
+void blake3_thread_pool_unreserve(blake3_thread_pool *tp, blake3_thread *t);
+int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work);
+int blake3_thread_wait_work(blake3_thread *t);
 #endif
diff --git a/src/blake3/blake3_thread.c b/src/blake3/blake3_thread.c
index f18e821e..5940121a 100644
--- a/src/blake3/blake3_thread.c
+++ b/src/blake3/blake3_thread.c
@@ -16,390 +16,6 @@
 #include "blake3_impl.h"
 #include "blake3_internal.h"
 
-void *blake3_thread_start_fn(void *arg)
-{
-	blake3_thread *t = arg;
-	blake3_host_state *hs;
-	unsigned int state;
-	int rc;
-
-	(void)rc;
-	(void)hs;
-	assert(t);
-	hs = t->hs;
-	assert(hs);
-
-	assert(atomic_load(&t->state) == B3TS_STARTUP);
-
-	/* verify that thread state is sane */
-	assert(t >= hs->threads && t < hs->threads + hs->num_threads);
-	assert(t->id < hs->num_threads);
-
-	// fprintf(stderr, "%s: starting #%d\n", __func__, t->id);
-
-	pthread_mutex_lock(&t->lock);
-
-	/* change state and signal the user */
-	atomic_store(&t->state, B3TS_WAITING);
-	pthread_cond_signal(&t->cond);
-
-	for (;;) {
-
-		/* wait here until we get handed a function to execute */
-		while ((state = atomic_load(&t->state)) == B3TS_WAITING) {
-			rc = pthread_cond_wait(&t->cond, &t->lock);
-			assert(!rc);
-		}
-
-		/* shutting down? */
-		if (state == B3TS_SHUTTINGDOWN)
-			break;
-submitted:
-		/* only submitted allowed here */
-		assert(state == B3TS_SUBMITTED);
-		assert(t->fn);
-
-		/* we're now processing */
-		atomic_store(&t->state, B3TS_PROCESSING);
-		pthread_mutex_unlock(&t->lock);
-
-		/* execute out of the lock */
-		t->fn(t, atomic_load(&t->arg));
-
-		pthread_mutex_lock(&t->lock);
-
-		/* shutting down? */
-		if (atomic_load(&t->state) == B3TS_SHUTTINGDOWN)
-			break;
-
-		assert(atomic_load(&t->state) == B3TS_PROCESSING);
-
-		/* we're done... change state and signal all */
-		atomic_store(&t->state, B3TS_DONE);
-		pthread_cond_signal(&t->cond);
-
-		/* wait here until the results are consumed */
-		while ((state = atomic_load(&t->state)) == B3TS_DONE) {
-			rc = pthread_cond_wait(&t->cond, &t->lock);
-			assert(!rc);
-		}
-
-		/* shutting down? */
-		if (state == B3TS_SHUTTINGDOWN)
-			break;
-
-		/* user must consume the data */
-		assert(state == B3TS_RELEASE || state == B3TS_SUBMITTED);
-
-		/* we went from done to submitted, it's cool */
-		if (state == B3TS_SUBMITTED)
-			goto submitted;
-
-		t->fn = NULL;
-
-		/* and now we're back to waiting */
-		atomic_store(&t->state, B3TS_WAITING);
-		rc = pthread_cond_signal(&t->cond);
-		assert(rc == 0);
-	}
-
-	atomic_store(&t->state, B3TS_SHUTDOWN);
-	pthread_mutex_unlock(&t->lock);
-
-	return NULL;
-}
-
-int blake3_thread_init(blake3_host_state *hs, blake3_thread *t)
-{
-	int rc;
-
-	(void)rc;
-	t->hs = hs;
-
-	t->id = t - hs->threads;
-
-	rc = pthread_mutex_init(&t->lock, NULL);
-	assert(!rc);
-	rc = pthread_cond_init(&t->cond, NULL);
-	assert(!rc);
-
-	atomic_store(&t->state, B3TS_STARTUP);
-
-	rc = pthread_create(&t->tid, NULL, blake3_thread_start_fn, t);
-	assert(rc == 0);
-
-	/* wait until it's waiting */
-	rc = pthread_mutex_lock(&t->lock);
-	assert(!rc);
-	while (t->state != B3TS_WAITING) {
-		rc = pthread_cond_wait(&t->cond, &t->lock);
-		assert(!rc);
-	}
-	rc = pthread_mutex_unlock(&t->lock);
-	assert(!rc);
-
-	return 0;
-}
-
-void blake3_thread_shutdown(blake3_thread *t)
-{
-	blake3_host_state *hs;
-	int rc;
-
-	(void)rc;
-	(void)hs;
-
-	assert(t);
-	hs = t->hs;
-	assert(hs);
-
-	/* verify that thread state is sane */
-	assert(t >= hs->threads && t < hs->threads + hs->num_threads);
-	assert(t->id < hs->num_threads);
-
-	rc = pthread_mutex_lock(&t->lock);
-	assert(rc == 0);
-	while (t->state != B3TS_WAITING) {
-		rc = pthread_cond_wait(&t->cond, &t->lock);
-		assert(!rc);
-	}
-
-	atomic_store(&t->state, B3TS_SHUTTINGDOWN);
-
-	rc = pthread_cond_signal(&t->cond);
-	assert(rc == 0);
-
-	rc = pthread_mutex_unlock(&t->lock);
-	assert(rc == 0);
-
-	rc = pthread_join(t->tid, NULL);
-	assert(rc == 0);
-}
-
-blake3_thread *blake3_host_state_thread_pool_reserve_portable(blake3_host_state *hs, bool wait)
-{
-	blake3_thread *t = NULL;
-	unsigned int slot, state;
-	int rc;
-
-	(void)rc;
-	(void)state;
-
-	assert(hs);
-
-	rc = pthread_mutex_lock(&hs->lock_thread_free);
-	assert(!rc);
-	if (hs->thread_free == 0 && !wait)
-		goto out;
-
-	while (hs->thread_free == 0) {
-		rc = pthread_cond_wait(&hs->cond_thread_free, &hs->lock_thread_free);
-		assert(!rc);
-	}
-	slot = highest_one(hs->thread_free);
-	assert(slot < hs->num_threads);
-	t = hs->threads + slot;
-
-	assert((state = atomic_load(&t->state)) == B3TS_WAITING || state == B3TS_RELEASE);
-
-	/* clear the slot */
-	hs->thread_free &= ~((uint64_t)1 << slot);
-
-out:
-	pthread_mutex_unlock(&hs->lock_thread_free);
-
-	return t;
-}
-
-void blake3_host_state_thread_pool_unreserve_portable(blake3_host_state *hs, blake3_thread *t)
-{
-	unsigned int slot;
-	int rc;
-
-	(void)rc;
-	slot = t->id;
-
-	rc = pthread_mutex_lock(&hs->lock_thread_free);
-	assert(!rc);
-
-	hs->thread_free |= ((uint64_t)1 << slot);
-	pthread_cond_signal(&hs->cond_thread_free);
-
-	pthread_mutex_unlock(&hs->lock_thread_free);
-}
-
-#if defined(__GNUC__) || defined(__clang__)
-
-blake3_thread *blake3_host_state_thread_pool_reserve(blake3_host_state *hs, bool wait)
-{
-	blake3_thread *t;
-	unsigned int slot, state;
-	uint64_t v, exp;
-
-	(void)state;
-
-	v = atomic_load(&hs->thread_free);
-	while (v) {
-		slot = highest_one(v);
-		exp = v; /* expecting the previous value */
-		v &= ~((uint64_t)1 << slot); /* clear this bit */
-		if (atomic_compare_exchange_strong(&hs->thread_free, &exp, v)) {
-			t = hs->threads + slot;
-			assert((state = atomic_load(&t->state)) == B3TS_WAITING || state == B3TS_RELEASE);
-			return t;
-		}
-		v = exp;
-	}
-
-	if (!wait)
-		return NULL;
-
-	/* fallback to the portable case */
-	return blake3_host_state_thread_pool_reserve_portable(hs, wait);
-}
-
-void blake3_host_state_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t)
-{
-	atomic_fetch_or(&hs->thread_free, (uint64_t)1 << t->id);
-	pthread_cond_broadcast(&hs->cond_thread_free);
-}
-
-#else
-
-blake3_thread *blake3_host_state_thread_pool_reserve(blake3_host_state *hs, bool wait)
-{
-	return blake3_host_state_thread_pool_reserve_portable(hs, wait);
-}
-
-void blake3_host_state_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t)
-{
-	blake3_host_state_thread_pool_unreserve_portable(hs, t);
-}
-
-#endif
-
-int blake3_thread_submit(blake3_thread *t, blake3_thread_fn fn, void *arg)
-{
-	blake3_host_state *hs;
-	int rc;
-
-	(void)rc;
-	(void)hs;
-
-	assert(t);
-	hs = t->hs;
-	assert(hs);
-
-	/* verify that thread state is sane */
-	assert(t >= hs->threads && t < hs->threads + hs->num_threads);
-	assert(t->id < hs->num_threads);
-
-	// fprintf(stderr, "%s: submit #%d\n", __func__, t->id);
-
-	rc = pthread_mutex_lock(&t->lock);
-	assert(rc == 0);
-	while (t->state != B3TS_WAITING && t->state != B3TS_RELEASE) {
-		rc = pthread_cond_wait(&t->cond, &t->lock);
-		assert(!rc);
-	}
-
-	atomic_store(&t->state, B3TS_SUBMITTED);
-	t->fn = fn;
-	t->arg = arg;
-
-	rc = pthread_cond_signal(&t->cond);
-	assert(rc == 0);
-
-	rc = pthread_mutex_unlock(&t->lock);
-	assert(rc == 0);
-
-	return 0;
-}
-
-int blake3_thread_wait_result(blake3_thread *t)
-{
-	blake3_host_state *hs;
-	unsigned int state;
-	int rc;
-
-	(void)rc;
-	(void)hs;
-
-	assert(t);
-	hs = t->hs;
-	assert(hs);
-
-	/* verify that thread state is sane */
-	assert(t >= hs->threads && t < hs->threads + hs->num_threads);
-	assert(t->id < hs->num_threads);
-
-	// fprintf(stderr, "%s: wait_result #%d\n", __func__, t->id);
-
-	rc = pthread_mutex_lock(&t->lock);
-	assert(rc == 0);
-	while ((state = atomic_load(&t->state)) == B3TS_SUBMITTED || state == B3TS_PROCESSING) {
-		rc = pthread_cond_wait(&t->cond, &t->lock);
-		assert(!rc);
-	}
-
-	assert(state == B3TS_DONE);
-
-	rc = pthread_mutex_unlock(&t->lock);
-	assert(rc == 0);
-
-	return 0;
-}
-
-void blake3_thread_release(blake3_thread *t)
-{
-	blake3_host_state *hs;
-	int rc;
-
-	(void)rc;
-	(void)hs;
-
-	assert(t);
-	hs = t->hs;
-	assert(hs);
-
-	/* verify that thread state is sane */
-	assert(t >= hs->threads && t < hs->threads + hs->num_threads);
-	assert(t->id < hs->num_threads);
-
-	// fprintf(stderr, "%s: wait_result #%d\n", __func__, t->id);
-
-#if 0
-	rc = pthread_mutex_lock(&t->lock);
-	assert(rc == 0);
-#endif
-
-	assert(atomic_load(&t->state) == B3TS_DONE);
-
-	atomic_store(&t->state, B3TS_RELEASE);
-	rc = pthread_cond_signal(&t->cond);
-	assert(rc == 0);
-
-#if 0
-	rc = pthread_mutex_unlock(&t->lock);
-	assert(rc == 0);
-#endif
-}
-
-int blake3_thread_wait_result_and_release(blake3_thread *t)
-{
-	int rc;
-
-	(void)rc;
-
-	rc = blake3_thread_wait_result(t);
-	assert(!rc);
-
-	blake3_thread_release(t);
-
-	return 0;
-}
-
-#define B3FN_SHUTDOWN ((blake3_thread_fn)(void *)-1)
 #define B3WORK_SHUTDOWN ((const blake3_thread_work *)(void *)-1)
 
 void *blake3_worker_thread(void *arg)
@@ -673,6 +289,31 @@ void blake3_thread_pool_cleanup(blake3_thread_pool *tp)
 	memset(tp, 0, sizeof(*tp));
 }
 
+blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads)
+{
+	blake3_thread_pool *tp;
+	int rc;
+
+	(void)rc;
+	tp = malloc(sizeof(*tp));
+	if (!tp)
+		return NULL;
+
+	rc = blake3_thread_pool_init(tp, num_threads);
+	assert(!rc);
+
+	return tp;
+}
+
+void blake3_thread_pool_destroy(blake3_thread_pool *tp)
+{
+	if (!tp)
+		return;
+
+	blake3_thread_pool_cleanup(tp);
+	free(tp);
+}
+
 static void test_worker_thread_fn(void *arg)
 {
 	int *p = arg;
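
[Reviewer note, not part of the patch: the test_threads() helper removed from
blake3_host_state.c exercised the old per-host-state thread API. A roughly
equivalent smoke test against the pool API now declared in blake3_internal.h
could look like the sketch below. It assumes blake3_thread_pool_reserve(tp, true)
blocks until a thread is free, that one outstanding blake3_thread_work per
reserved thread is the intended usage, and that blake3_thread_wait_work()
synchronizes with the worker's write. test_fn() and test_pool() are
illustrative names, not from the tree.]

#include <assert.h>
#include <stdlib.h>

#include "blake3_internal.h"

static void test_fn(void *arg)
{
	int *p = arg;

	(*p)++;				/* each work item gets its own counter */
}

static void test_pool(unsigned int count)
{
	blake3_thread_pool *tp = blake3_thread_pool_create(count);
	blake3_thread **threads = calloc(count, sizeof(*threads));
	blake3_thread_work *works = calloc(count, sizeof(*works));
	int *counters = calloc(count, sizeof(*counters));
	unsigned int i;

	assert(tp && threads && works && counters);

	for (i = 0; i < count; i++) {
		threads[i] = blake3_thread_pool_reserve(tp, true);	/* wait for a free thread */
		works[i].fn = test_fn;
		works[i].arg = &counters[i];
		(void)blake3_thread_submit_work(threads[i], &works[i]);
	}

	for (i = 0; i < count; i++) {
		(void)blake3_thread_wait_work(threads[i]);
		blake3_thread_pool_unreserve(tp, threads[i]);
		assert(counters[i] == 1);
	}

	free(counters);
	free(works);
	free(threads);
	blake3_thread_pool_destroy(tp);
}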
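
[Reviewer note, assumption only: the blake3_thread_pool_create()/_destroy() pair
added above is a thin malloc/free wrapper around blake3_thread_pool_init() and
blake3_thread_pool_cleanup(). If those two are exported and blake3_thread_pool
is a complete type outside blake3_thread.c, a caller that already owns storage
could embed the pool instead of heap-allocating it, roughly as below;
my_hash_ctx and its functions are hypothetical.]

#include "blake3_internal.h"

struct my_hash_ctx {				/* hypothetical embedding caller */
	blake3_thread_pool pool;		/* requires the struct definition to be visible */
	/* ... other per-context state ... */
};

static int my_hash_ctx_start(struct my_hash_ctx *ctx, unsigned int nthreads)
{
	/* same initialization path blake3_thread_pool_create() takes */
	return blake3_thread_pool_init(&ctx->pool, nthreads);
}

static void my_hash_ctx_stop(struct my_hash_ctx *ctx)
{
	blake3_thread_pool_cleanup(&ctx->pool);
}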