Skip to content

Commit

Permalink
more cleanups
Browse files Browse the repository at this point in the history
Signed-off-by: Pantelis Antoniou <[email protected]>
  • Loading branch information
pantoniou committed Sep 6, 2023
1 parent 6977fe5 commit 59c09f6
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 461 deletions.
43 changes: 29 additions & 14 deletions src/blake3/blake3.c
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ static size_t blake3_compress_parents_parallel(blake3_host_state *hs, const uint
unsigned int simd_degree_or_2;
size_t parents_array_len = 0;

simd_degree_or_2 = hs->simd_degree_or_2;
simd_degree_or_2 = hs->simd_degree < 2 ? 2 : hs->simd_degree;

assert(2 <= num_chaining_values);
assert(num_chaining_values <= 2 * simd_degree_or_2);
Expand Down Expand Up @@ -242,11 +242,11 @@ static size_t blake3_compress_parents_parallel(blake3_host_state *hs, const uint

static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t *input, size_t input_len, const uint32_t key[8], uint64_t chunk_counter, uint8_t flags, uint8_t *out);

static void blake3_compress_subtree_wide_thread(struct _blake3_thread *t, void *arg)
static void blake3_compress_subtree_wide_thread(void *arg)
{
blake3_compress_subtree_state *s = arg;

s->n = blake3_compress_subtree_wide(t->hs, s->input, s->input_len, s->key, s->chunk_counter, s->flags, s->out);
s->n = blake3_compress_subtree_wide(s->hs, s->input, s->input_len, s->key, s->chunk_counter, s->flags, s->out);
}

// The wide helper function returns (writes out) an array of chaining values
Expand Down Expand Up @@ -274,13 +274,14 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
uint8_t *cv_array, *left_cvs, *right_cvs;
bool left_mt, right_mt;
blake3_compress_subtree_state left_state, right_state;
blake3_thread_work left_work, right_work;
blake3_thread *left_thread, *right_thread;
int rc;

(void)rc;

simd_degree = hs->simd_degree;
simd_degree_or_2 = hs->simd_degree_or_2;
simd_degree_or_2 = hs->simd_degree < 2 ? 2 : hs->simd_degree;

/* for multithreading to be worth it has to be this big */
mt_degree = hs->mt_degree;
Expand Down Expand Up @@ -344,14 +345,14 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t

// get a thread (if possible)
if (left_mt) {
left_thread = blake3_host_state_thread_pool_reserve(hs, false);
left_thread = blake3_thread_pool_reserve(hs->tp, false);
// fprintf(stderr, "%s: left_thread #%d ctr=%lu chunks=%lu\n", __func__, left_thread ? (int)left_thread->id : -1,
// left_chunk_counter, left_input_len / BLAKE3_CHUNK_LEN);
} else
left_thread = NULL;

if (right_mt) {
right_thread = blake3_host_state_thread_pool_reserve(hs, false);
right_thread = blake3_thread_pool_reserve(hs->tp, false);
// fprintf(stderr, "%s: right_thread #%d ctr=%lu chunks=%lu\n", __func__, right_thread ? (int)right_thread->id : -1,
// right_chunk_counter, right_input_len / BLAKE3_CHUNK_LEN);
} else
Expand All @@ -361,8 +362,10 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
left_n = blake3_compress_subtree_wide(hs, left_input, left_input_len, key, left_chunk_counter, flags, left_cvs);
assert(left_n > 0);
left_thread = NULL;
left_work.fn = NULL;
left_work.arg = NULL;
} else {

left_state.hs = hs;
left_state.input = left_input;
left_state.input_len = left_input_len;
left_state.key = key;
Expand All @@ -371,7 +374,10 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
left_state.out = left_cvs;
left_state.n = 0;

rc = blake3_thread_submit(left_thread, blake3_compress_subtree_wide_thread, &left_state);
left_work.fn = blake3_compress_subtree_wide_thread;
left_work.arg = &left_state;

rc = blake3_thread_submit_work(left_thread, &left_work);
assert(!rc);
}

Expand All @@ -380,7 +386,11 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
assert(right_n > 0);

right_thread = NULL;
right_work.fn = NULL;
right_work.arg = NULL;
} else {

right_state.hs = hs;
right_state.input = right_input;
right_state.input_len = right_input_len;
right_state.key = key;
Expand All @@ -389,19 +399,24 @@ static size_t blake3_compress_subtree_wide(blake3_host_state *hs, const uint8_t
right_state.out = right_cvs;
right_state.n = 0;

rc = blake3_thread_submit(right_thread, blake3_compress_subtree_wide_thread, &right_state);
right_work.fn = blake3_compress_subtree_wide_thread;
right_work.arg = &right_state;

rc = blake3_thread_submit_work(right_thread, &right_work);
assert(!rc);
}

if (left_thread) {
blake3_thread_wait_result_and_release(left_thread);
blake3_host_state_thread_pool_unreserve(hs, left_thread);
blake3_thread_wait_work(left_thread);
blake3_thread_pool_unreserve(hs->tp, left_thread);
left_thread = NULL;

left_n = left_state.n;
}
if (right_thread) {
blake3_thread_wait_result_and_release(right_thread);
blake3_host_state_thread_pool_unreserve(hs, right_thread);
blake3_thread_wait_work(right_thread);
blake3_thread_pool_unreserve(hs->tp, right_thread);
right_thread = NULL;

right_n = right_state.n;
}
Expand Down Expand Up @@ -436,7 +451,7 @@ static void blake3_compress_subtree_to_parent_node(blake3_host_state *hs, const
uint8_t *cv_array, *out_array;
size_t simd_degree_or_2, num_cvs;

simd_degree_or_2 = hs->simd_degree_or_2;
simd_degree_or_2 = hs->simd_degree < 2 ? 2 : hs->simd_degree;

cv_array = alloca(simd_degree_or_2 * BLAKE3_OUT_LEN);
out_array = alloca(simd_degree_or_2 * BLAKE3_OUT_LEN / 2);
Expand Down
63 changes: 12 additions & 51 deletions src/blake3/blake3_host_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,43 +16,6 @@
#include "blake3_impl.h"
#include "blake3_internal.h"

#if 0
static void test_thread_fn(struct _blake3_thread *t, void *arg)
{
int *p = arg;

(*p)++;
}

void test_threads(blake3_host_state *hs)
{
blake3_thread **threads;
unsigned int i, count;
int test_count;

count = hs->num_threads;
threads = alloca(count * sizeof(*threads));

test_count = 0;

for (i = 0; i < count; i++) {
threads[i] = blake3_thread_pool_reserve(hs, true);
(void)blake3_thread_submit(threads[i], test_thread_fn, &test_count);
}

for (i = 0; i < count; i++)
(void)blake3_thread_wait_result(threads[i]);

fprintf(stderr, "%s: test_count=%d\n", __func__, test_count);

for (i = 0; i < count; i++) {
blake3_thread_release(threads[i]);
blake3_thread_pool_unreserve(hs, threads[i]);
}

}
#endif

static uint64_t supported_gpu_backends(void)
{
return 0;
Expand Down Expand Up @@ -409,17 +372,13 @@ static int select_backends(blake3_host_state *hs)
if (hs->simd_degree < hs->compress_in_place_be->simd_degree)
hs->simd_degree = hs->compress_in_place_be->simd_degree;

hs->simd_degree_or_2 = hs->simd_degree < 2 ? 2 : hs->simd_degree;

hs->mt_degree = hs->cfg.mt_degree > 0 ? hs->cfg.mt_degree : 128; /* 128K default */

return 0;
}

int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)
{
blake3_thread *t;
unsigned int i;
long scval;
int rc;

Expand All @@ -443,8 +402,8 @@ int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)
assert(!rc);

if (!hs->cfg.no_mthread) {

hs->num_threads = hs->cfg.num_threads ? hs->cfg.num_threads : hs->num_cpus;
hs->num_threads = hs->cfg.num_threads ? hs->cfg.num_threads : (hs->num_cpus * 3) / 2;
#if 0

/* maximum number of threads for now */
if (hs->num_threads > 64)
Expand All @@ -466,11 +425,12 @@ int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)
rc = blake3_thread_init(hs, t);
assert(rc == 0);
}

#if 0
test_threads(hs);
test_threads(hs);
#else
hs->tp = blake3_thread_pool_create(hs->num_threads);
assert(hs->tp);
#endif
} else {
hs->tp = NULL;
}

if (hs->cfg.debug) {
Expand All @@ -479,7 +439,6 @@ int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)
fprintf(stderr, "multithreading: %s\n", hs->cfg.no_mthread ? "false" : "true");
fprintf(stderr, "num_threads: %u\n", hs->num_threads);
fprintf(stderr, "simd_degree: %u\n", hs->simd_degree);
fprintf(stderr, "simd_degree_or_2: %u\n", hs->simd_degree_or_2);
fprintf(stderr, "mt_degree: %u\n", hs->mt_degree);

fprintf(stderr, "supported_backends:\n");
Expand All @@ -499,11 +458,9 @@ int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg)

void blake3_host_state_cleanup(blake3_host_state *hs)
{
blake3_thread *t;
unsigned int i;

assert(hs);

#if 0
if (hs->threads) {

for (i = 0; i < hs->num_threads; i++) {
Expand All @@ -515,6 +472,10 @@ void blake3_host_state_cleanup(blake3_host_state *hs)
hs->threads = NULL;
hs->num_threads = 0;
}
#else
if (hs->tp)
blake3_thread_pool_destroy(hs->tp);
#endif
}

struct _blake3_host_state *blake3_host_state_create(const blake3_host_config *cfg)
Expand Down
24 changes: 12 additions & 12 deletions src/blake3/blake3_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ static inline blake3_output_t blake3_chunk_state_output(const blake3_chunk_state

// the state for the thread when doing compress subtree
typedef struct {
struct _blake3_host_state *hs;
// inputs
const uint8_t *input;
size_t input_len;
Expand Down Expand Up @@ -280,10 +281,12 @@ typedef struct _blake3_host_state {
blake3_compress_in_place_f compress_in_place;

unsigned int simd_degree;
unsigned int simd_degree_or_2;
unsigned int mt_degree;

unsigned int num_threads;

struct _blake3_thread_pool *tp; // thread pool
#if 0
struct _blake3_thread *threads;
pthread_mutex_t lock_thread_free; // should not be needed at the end if we use atomics
pthread_cond_t cond_thread_free;
Expand All @@ -292,18 +295,9 @@ typedef struct _blake3_host_state {
#else
uint64_t thread_free;
#endif
} blake3_host_state;

int blake3_thread_init(blake3_host_state *hs, blake3_thread *t);
void blake3_thread_shutdown(blake3_thread *t);

int blake3_thread_submit(blake3_thread *t, blake3_thread_fn fn, void *arg);
int blake3_thread_wait_result(blake3_thread *t);
void blake3_thread_release(blake3_thread *t);
int blake3_thread_wait_result_and_release(blake3_thread *t);
#endif

blake3_thread *blake3_host_state_thread_pool_reserve(blake3_host_state *hs, bool wait);
void blake3_host_state_thread_pool_unreserve(blake3_host_state *hs, blake3_thread *t);
} blake3_host_state;

int blake3_host_state_init(blake3_host_state *hs, const blake3_host_config *cfg);
void blake3_host_state_cleanup(blake3_host_state *hs);
Expand All @@ -313,5 +307,11 @@ void blake3_hasher_init_keyed(blake3_host_state *hs, blake3_hasher *self, const
void blake3_hasher_init_derive_key(blake3_host_state *hs, blake3_hasher *self, const char *context);
void blake3_hasher_init_derive_key_raw(blake3_host_state *hs, blake3_hasher *self, const void *context, size_t context_len);

blake3_thread_pool *blake3_thread_pool_create(unsigned int num_threads);
void blake3_thread_pool_destroy(blake3_thread_pool *tp);
blake3_thread *blake3_thread_pool_reserve(blake3_thread_pool *tp, bool wait);
void blake3_thread_pool_unreserve(blake3_thread_pool *tp, blake3_thread *t);
int blake3_thread_submit_work(blake3_thread *t, const blake3_thread_work *work);
int blake3_thread_wait_work(blake3_thread *t);

#endif
Loading

0 comments on commit 59c09f6

Please sign in to comment.