From 343179ca4deabe1a2ea4e478b6de437b6c90ea37 Mon Sep 17 00:00:00 2001 From: taymindis Date: Sat, 15 Sep 2018 18:07:04 +0800 Subject: [PATCH] added on new feature --- example.c | 204 ++++++++++++++++++++++++++++++++++++------------------ lfstack.c | 99 +++++++++++++++----------- lfstack.h | 5 ++ 3 files changed, 203 insertions(+), 105 deletions(-) diff --git a/example.c b/example.c index 2df9fbe..0e12bf1 100644 --- a/example.c +++ b/example.c @@ -9,28 +9,96 @@ #include "lfstack.h" +typedef void (*test_function)(pthread_t*); + void one_push_and_multi_pop(pthread_t *threads); void one_pop_and_multi_push(pthread_t *threads); void multi_push_pop(pthread_t *threads); void* worker_push_pop(void *); void* worker_push(void *); void* worker_pop(void *); -void* worker_single_pop(void *); +void* worker_pushingle_c(void *); + +/**Testing must**/ +void one_push_and_multi_pop_must(pthread_t *threads); +void one_pop_must_and_multi_push(pthread_t *threads); +void multi_push_pop_must(pthread_t *threads); +void* worker_push_pop_must(void *); +void* worker_push_must(void *); +void* worker_pop_must(void *); +void* worker_single_pop_must(void *); + +void running_test(test_function testfn); struct timeval tv1, tv2; #define total_put 50000 -int nthreads = 8; //sysconf(_SC_NPROCESSORS_ONLN); // Linux +#define total_running_loop 50 +int nthreads = 4; int one_thread = 1; int nthreads_exited = 0; lfstack_t *mystack; + +void* worker_pop_must(void *arg) { + int i = 0; + int *int_data; + int total_loop = total_put * (*(int*)arg); + while (i++ < total_loop) { + /*Pop*/ + int_data = lfstack_pop_must(mystack); + // printf("%d\n", *int_data); + + free(int_data); + } + __sync_add_and_fetch(&nthreads_exited, 1); + return 0; +} + +void* worker_single_pop_must(void *arg) { + int i = 0; + int *int_data; + int total_loop = total_put * (*(int*)arg); + while (i++ < total_loop) { + /*Pop*/ + int_data = lfstack_single_pop_must(mystack); + // printf("%d\n", *int_data); + + free(int_data); + } + 
__sync_add_and_fetch(&nthreads_exited, 1); + return 0; +} + +void* worker_push_pop_must(void *arg) +{ + int i = 0; + int *int_data; + while (i < total_put) { + int_data = (int*)malloc(sizeof(int)); + assert(int_data != NULL); + *int_data = i++; + /*Push*/ + while (lfstack_push(mystack, int_data)) { + printf("ENQ FULL?\n"); + } + + /*Pop*/ + int_data = lfstack_pop_must(mystack); + // printf("%d\n", *int_data); + free(int_data); + } + __sync_add_and_fetch(&nthreads_exited, 1); + return 0; +} + void* worker_pop(void *arg) { int i = 0; int *int_data; int total_loop = total_put * (*(int*)arg); while (i++ < total_loop) { + /*Pop*/ while ((int_data = lfstack_pop(mystack)) == NULL) { - + lfstack_sleep(1); } // printf("%d\n", *int_data); @@ -40,15 +108,17 @@ void* worker_pop(void *arg) { return 0; } -void* worker_single_pop(void *arg) { +void* worker_pushingle_c(void *arg) { int i = 0; int *int_data; int total_loop = total_put * (*(int*)arg); while (i++ < total_loop) { + /*Pop*/ while ((int_data = lfstack_single_pop(mystack)) == NULL) { - + lfstack_sleep(1); } // printf("%d\n", *int_data); + free(int_data); } __sync_add_and_fetch(&nthreads_exited, 1); @@ -64,9 +134,10 @@ void* worker_push(void *arg) int_data = (int*)malloc(sizeof(int)); assert(int_data != NULL); *int_data = i; + /*Push*/ while (lfstack_push(mystack, int_data)) { - // printf("PUSH FULL?\n"); + // printf("ENQ FULL?\n"); } } // __sync_add_and_fetch(&nthreads_exited, 1); @@ -82,12 +153,14 @@ void* worker_push_pop(void *arg) int_data = (int*)malloc(sizeof(int)); assert(int_data != NULL); *int_data = i++; + /*Push*/ while (lfstack_push(mystack, int_data)) { - printf("PUSH FULL?\n"); + printf("ENQ FULL?\n"); } + /*Pop*/ while ((int_data = lfstack_pop(mystack)) == NULL) { - // printf("POP EMPTY? 
%zu\n", lfstack_size(mystack)); + lfstack_sleep(1); } // printf("%d\n", *int_data); free(int_data); @@ -97,18 +170,17 @@ void* worker_push_pop(void *arg) } #define join_threads \ -for (i = 0; i < nthreads; i++)\ -pthread_join(threads[i], NULL);\ -printf("current size= %d\n", (int) lfstack_size(mystack) ) +for (i = 0; i < nthreads; i++) {\ +pthread_join(threads[i], NULL); \ +} #define detach_thread_and_loop \ for (i = 0; i < nthreads; i++)\ pthread_detach(threads[i]);\ while ( nthreads_exited < nthreads ) \ - lfstack_sleep(2);\ + lfstack_sleep(10);\ if(lfstack_size(mystack) != 0){\ -lfstack_sleep(2);\ -printf("current size= %zu\n", lfstack_size(mystack) );\ +lfstack_sleep(10);\ } void multi_push_pop(pthread_t *threads) { @@ -121,13 +193,14 @@ void multi_push_pop(pthread_t *threads) { join_threads; // detach_thread_and_loop; } + void one_pop_and_multi_push(pthread_t *threads) { printf("-----------%s---------------\n", "one_pop_and_multi_push"); int i; for (i = 0; i < nthreads; i++) pthread_create(threads + i, NULL, worker_push, &one_thread); - worker_single_pop(&nthreads); + worker_pushingle_c(&nthreads); join_threads; // detach_thread_and_loop; @@ -147,90 +220,89 @@ void one_push_and_multi_pop(pthread_t *threads) { #pragma GCC diagnostic pop } -int main(void) -{ - int n; - mystack = malloc(sizeof (lfstack_t)); - if (lfstack_init(mystack) == -1) - return -1; +void one_pop_must_and_multi_push(pthread_t *threads) { + printf("-----------%s---------------\n", "one_pop_must_and_multi_push"); + int i; + for (i = 0; i < nthreads; i++) + pthread_create(threads + i, NULL, worker_push, &one_thread); - for (n = 0; n < 30; n++) { - printf("Current running at =%d, ", n); - nthreads_exited = 0; + worker_single_pop_must(&nthreads); - /* Spawn threads. */ - pthread_t threads[nthreads]; - printf("Using %d thread%s.\n", nthreads, nthreads == 1 ? 
"" : "s"); - printf("Total requests %d \n", total_put); - gettimeofday(&tv1, NULL); + join_threads; + // detach_thread_and_loop; +} - multi_push_pop(threads); +void one_push_and_multi_pop_must(pthread_t *threads) { + printf("-----------%s---------------\n", "one_push_and_multi_pop_must"); + int i; + for (i = 0; i < nthreads; i++) + pthread_create(threads + i, NULL, worker_pop_must, &one_thread); - gettimeofday(&tv2, NULL); - printf ("Total time = %f seconds\n", - (double) (tv2.tv_usec - tv1.tv_usec) / 1000000 + - (double) (tv2.tv_sec - tv1.tv_sec)); + worker_push(&nthreads); + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wimplicit-function-declaration" + detach_thread_and_loop; +#pragma GCC diagnostic pop - //getchar(); - lfstack_sleep(1); - assert ( 0 == lfstack_size(mystack) && "Error, all element should be pop out but not"); +} + +void multi_push_pop_must(pthread_t *threads) { + printf("-----------%s---------------\n", "multi_push_pop_must"); + int i; + for (i = 0; i < nthreads; i++) { + pthread_create(threads + i, NULL, worker_push_pop_must, NULL); } - for (n = 0; n < 30; n++) { + join_threads; + // detach_thread_and_loop; +} + +void running_test(test_function testfn) { + int n; + for (n = 0; n < total_running_loop; n++) { printf("Current running at =%d, ", n); nthreads_exited = 0; - /* Spawn threads. */ pthread_t threads[nthreads]; printf("Using %d thread%s.\n", nthreads, nthreads == 1 ? 
"" : "s"); printf("Total requests %d \n", total_put); gettimeofday(&tv1, NULL); - one_push_and_multi_pop(threads); + testfn(threads); gettimeofday(&tv2, NULL); printf ("Total time = %f seconds\n", (double) (tv2.tv_usec - tv1.tv_usec) / 1000000 + (double) (tv2.tv_sec - tv1.tv_sec)); - //getchar(); - lfstack_sleep(1); - assert ( 0 == lfstack_size(mystack) && "Error, all element should be pop out but not"); + lfstack_sleep(10); + assert ( 0 == lfstack_size(mystack) && "Error, all stack should be consumed but not"); } +} - for (n = 0; n < 30; n++) { - printf("Current running at =%d, ", n); - nthreads_exited = 0; +int main(void) { + mystack = malloc(sizeof (lfstack_t)); + if (lfstack_init(mystack) == -1) + return -1; - /* Spawn threads. */ - pthread_t threads[nthreads]; - printf("Using %d thread%s.\n", nthreads, nthreads == 1 ? "" : "s"); - printf("Total requests %d \n", total_put); - gettimeofday(&tv1, NULL); + running_test(one_push_and_multi_pop); + running_test(one_push_and_multi_pop_must); - one_pop_and_multi_push(threads); + running_test(one_pop_and_multi_push); + running_test(one_pop_must_and_multi_push); - gettimeofday(&tv2, NULL); - printf ("Total time = %f seconds\n", - (double) (tv2.tv_usec - tv1.tv_usec) / 1000000 + - (double) (tv2.tv_sec - tv1.tv_sec)); + running_test(multi_push_pop); + running_test(multi_push_pop_must); - //getchar(); - lfstack_sleep(1); - assert ( 0 == lfstack_size(mystack) && "Error, all element should be pop out but not"); - } - printf("Take a 4 seconds sleep \n"); - sleep(4); - printf("Flush all the inactive memory \n"); - lfstack_flush(mystack); - // printf("Check the memory usage, it should all flushed, press any key to exit \n"); - // getchar(); lfstack_destroy(mystack); + // sleep(3); free(mystack); + printf("Test Pass!\n"); return 0; } diff --git a/lfstack.c b/lfstack.c index c7cfebb..f60b756 100644 --- a/lfstack.c +++ b/lfstack.c @@ -36,12 +36,12 @@ #include // for usleep #include -#define __LFQ_VAL_COMPARE_AND_SWAP 
__sync_val_compare_and_swap -#define __LFQ_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap -#define __LFQ_FETCH_AND_ADD __sync_fetch_and_add -#define __LFQ_ADD_AND_FETCH __sync_add_and_fetch -#define __LFQ_YIELD_THREAD sched_yield -#define __LFQ_SYNC_MEMORY __sync_synchronize +#define __LFS_VAL_COMPARE_AND_SWAP __sync_val_compare_and_swap +#define __LFS_BOOL_COMPARE_AND_SWAP __sync_bool_compare_and_swap +#define __LFS_FETCH_AND_ADD __sync_fetch_and_add +#define __LFS_ADD_AND_FETCH __sync_add_and_fetch +#define __LFS_YIELD_THREAD sched_yield +#define __LFS_SYNC_MEMORY __sync_synchronize #else @@ -51,13 +51,13 @@ inline BOOL __SYNC_BOOL_CAS(LONG64 volatile *dest, LONG64 input, LONG64 comparand) { return InterlockedCompareExchangeNoFence64(dest, input, comparand) == comparand; } -#define __LFQ_VAL_COMPARE_AND_SWAP(dest, comparand, input) \ +#define __LFS_VAL_COMPARE_AND_SWAP(dest, comparand, input) \ InterlockedCompareExchangeNoFence64((LONG64 volatile *)dest, (LONG64)input, (LONG64)comparand) -#define __LFQ_BOOL_COMPARE_AND_SWAP(dest, comparand, input) \ +#define __LFS_BOOL_COMPARE_AND_SWAP(dest, comparand, input) \ __SYNC_BOOL_CAS((LONG64 volatile *)dest, (LONG64)input, (LONG64)comparand) -#define __LFQ_FETCH_AND_ADD InterlockedExchangeAddNoFence64 -#define __LFQ_ADD_AND_FETCH InterlockedAddNoFence64 -#define __LFQ_SYNC_MEMORY MemoryBarrier +#define __LFS_FETCH_AND_ADD InterlockedExchangeAddNoFence64 +#define __LFS_ADD_AND_FETCH InterlockedAddNoFence64 +#define __LFS_SYNC_MEMORY MemoryBarrier #else #ifndef asm @@ -66,21 +66,21 @@ inline BOOL __SYNC_BOOL_CAS(LONG64 volatile *dest, LONG64 input, LONG64 comparan inline BOOL __SYNC_BOOL_CAS(LONG volatile *dest, LONG input, LONG comparand) { return InterlockedCompareExchangeNoFence(dest, input, comparand) == comparand; } -#define __LFQ_VAL_COMPARE_AND_SWAP(dest, comparand, input) \ +#define __LFS_VAL_COMPARE_AND_SWAP(dest, comparand, input) \ InterlockedCompareExchangeNoFence((LONG volatile *)dest, (LONG)input, 
(LONG)comparand) -#define __LFQ_BOOL_COMPARE_AND_SWAP(dest, comparand, input) \ +#define __LFS_BOOL_COMPARE_AND_SWAP(dest, comparand, input) \ __SYNC_BOOL_CAS((LONG volatile *)dest, (LONG)input, (LONG)comparand) -#define __LFQ_FETCH_AND_ADD InterlockedExchangeAddNoFence -#define __LFQ_ADD_AND_FETCH InterlockedAddNoFence -#define __LFQ_SYNC_MEMORY() asm mfence +#define __LFS_FETCH_AND_ADD InterlockedExchangeAddNoFence +#define __LFS_ADD_AND_FETCH InterlockedAddNoFence +#define __LFS_SYNC_MEMORY() asm mfence #endif #include -#define __LFQ_YIELD_THREAD SwitchToThread +#define __LFS_YIELD_THREAD SwitchToThread #endif #include "lfstack.h" -#define DEF_LFQ_ASSIGNED_SPIN 2048 +#define DEF_LFS_ASSIGNED_SPIN 2048 #if defined __GNUC__ || defined __CYGWIN__ || defined __MINGW32__ || defined __APPLE__ #define lfs_time_t long @@ -101,7 +101,6 @@ struct lfstack_cas_node_s { lfs_time_t _deactivate_tm; }; -//static lfstack_cas_node_t* __lfq_assigned(lfstack_t *); static void __lfs_recycle_free(lfstack_t *, lfstack_cas_node_t*); static void _lfs_check_free(lfstack_t *); static void *_pop(lfstack_t *); @@ -115,12 +114,12 @@ _pop(lfstack_t *lfs) { for (;;) { head = lfs->head; - __LFQ_SYNC_MEMORY(); + __LFS_SYNC_MEMORY(); /** ABA PROBLEM? 
in order to solve this, I use time free to avoid realloc the same aligned address **/ if (lfs->head == head) { prev = head->prev; if (prev) { - if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfs->head, head, prev)) { + if (__LFS_BOOL_COMPARE_AND_SWAP(&lfs->head, head, prev)) { val = head->value; break; } @@ -132,10 +131,10 @@ _pop(lfstack_t *lfs) { } } __lfs_recycle_free(lfs, head); - __LFQ_YIELD_THREAD(); + __LFS_YIELD_THREAD(); _done: // __asm volatile("" ::: "memory"); - __LFQ_SYNC_MEMORY(); + __LFS_SYNC_MEMORY(); _lfs_check_free(lfs); return val; } @@ -147,11 +146,11 @@ _single_pop(lfstack_t *lfs) { for (;;) { head = lfs->head; - __LFQ_SYNC_MEMORY(); + __LFS_SYNC_MEMORY(); if (lfs->head == head) { prev = head->prev; if (prev) { - if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfs->head, head, prev)) { + if (__LFS_BOOL_COMPARE_AND_SWAP(&lfs->head, head, prev)) { val = head->value; free(head); break; @@ -176,9 +175,9 @@ _push(lfstack_t *lfs, void* value) { new_head->value = value; new_head->nextfree = NULL; for (;;) { - __LFQ_SYNC_MEMORY(); + __LFS_SYNC_MEMORY(); new_head->prev = head = lfs->head; - if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfs->head, head, new_head)) { + if (__LFS_BOOL_COMPARE_AND_SWAP(&lfs->head, head, new_head)) { // always check any free value _lfs_check_free(lfs); return 0; @@ -194,17 +193,17 @@ __lfs_recycle_free(lfstack_t *lfs, lfstack_cas_node_t* freenode) { lfstack_cas_node_t *freed; do { freed = lfs->move_free; - } while (!__LFQ_BOOL_COMPARE_AND_SWAP(&freed->nextfree, NULL, freenode) ); + } while (!__LFS_BOOL_COMPARE_AND_SWAP(&freed->nextfree, NULL, freenode) ); lfs_get_curr_time(&freenode->_deactivate_tm); - __LFQ_BOOL_COMPARE_AND_SWAP(&lfs->move_free, freed, freenode); + __LFS_BOOL_COMPARE_AND_SWAP(&lfs->move_free, freed, freenode); } static void _lfs_check_free(lfstack_t *lfs) { lfs_time_t curr_time; - if (__LFQ_BOOL_COMPARE_AND_SWAP(&lfs->in_free_mode, 0, 1)) { + if (__LFS_BOOL_COMPARE_AND_SWAP(&lfs->in_free_mode, 0, 1)) { lfs_get_curr_time(&curr_time); 
lfstack_cas_node_t *rtfree = lfs->root_free, *nextfree; while ( rtfree && (rtfree != lfs->move_free) ) { @@ -218,9 +217,9 @@ _lfs_check_free(lfstack_t *lfs) { } } lfs->root_free = rtfree; - __LFQ_BOOL_COMPARE_AND_SWAP(&lfs->in_free_mode, 1, 0); + __LFS_BOOL_COMPARE_AND_SWAP(&lfs->in_free_mode, 1, 0); } - __LFQ_SYNC_MEMORY(); + __LFS_SYNC_MEMORY(); } int @@ -281,18 +280,18 @@ lfstack_push(lfstack_t *lfs, void *value) { if (_push(lfs, value)) { return -1; } - __LFQ_ADD_AND_FETCH(&lfs->size, 1); + __LFS_ADD_AND_FETCH(&lfs->size, 1); return 0; } void* lfstack_pop(lfstack_t *lfs) { void *v; - if (//__LFQ_ADD_AND_FETCH(&lfs->size, 0) && + if (//__LFS_ADD_AND_FETCH(&lfs->size, 0) && (v = _pop(lfs)) ) { - __LFQ_FETCH_AND_ADD(&lfs->size, -1); + __LFS_FETCH_AND_ADD(&lfs->size, -1); return v; } // Rest the thread for other thread, to avoid keep looping force @@ -300,15 +299,26 @@ lfstack_pop(lfstack_t *lfs) { return NULL; } +void* +lfstack_pop_must(lfstack_t *lfs) { + void *v; + while ( !(v = _pop(lfs)) ) { + // Rest the thread for other thread, to avoid keep looping force + lfstack_sleep(1); + } + __LFS_FETCH_AND_ADD(&lfs->size, -1); + return v; +} + /**This is only applicable when only single thread consume only**/ void* lfstack_single_pop(lfstack_t *lfs) { void *v; - if (//__LFQ_ADD_AND_FETCH(&lfs->size, 0) && + if (//__LFS_ADD_AND_FETCH(&lfs->size, 0) && (v = _single_pop(lfs)) ) { - __LFQ_FETCH_AND_ADD(&lfs->size, -1); + __LFS_FETCH_AND_ADD(&lfs->size, -1); return v; } // Rest the thread for other thread, to avoid keep looping force @@ -316,12 +326,23 @@ lfstack_single_pop(lfstack_t *lfs) { return NULL; } +void* +lfstack_single_pop_must(lfstack_t *lfs) { + void *v; + while ( !(v = _single_pop(lfs)) ) { + // Rest the thread for other thread, to avoid keep looping force + lfstack_sleep(1); + } + __LFS_FETCH_AND_ADD(&lfs->size, -1); + return v; +} + size_t lfstack_size(lfstack_t *lfs) { - return __LFQ_ADD_AND_FETCH(&lfs->size, 0); + return __LFS_ADD_AND_FETCH(&lfs->size, 0); 
} -void +void lfstack_sleep(unsigned int milisec) { #if defined __GNUC__ || defined __CYGWIN__ || defined __MINGW32__ || defined __APPLE__ #pragma GCC diagnostic push diff --git a/lfstack.h b/lfstack.h index cf9d13b..adc1cf8 100644 --- a/lfstack.h +++ b/lfstack.h @@ -60,6 +60,11 @@ extern int lfstack_init(lfstack_t *lfstack); extern int lfstack_push(lfstack_t *lfstack, void *value); extern void* lfstack_pop(lfstack_t *lfstack); extern void* lfstack_single_pop(lfstack_t *lfstack); + +/** Loops until a value has been popped, sleeping 1ms whenever none is available, to avoid high CPU usage **/ +extern void* lfstack_pop_must(lfstack_t *lfstack); +extern void* lfstack_single_pop_must(lfstack_t *lfstack); + /*** lfstack_flush to flush all the inacitve element ***/ extern void lfstack_flush(lfstack_t *lfstack); extern void lfstack_destroy(lfstack_t *lfstack);