From f773f948b9ccb1c4dc51eb372bc2efd7df5c126e Mon Sep 17 00:00:00 2001 From: uhm0311 Date: Tue, 8 Mar 2022 18:59:56 +0900 Subject: [PATCH] ENHANCE: Enhance error handling when serverlist update failed #219 --- libmemcached/arcus.cc | 74 ++++++++++ libmemcached/arcus_priv.h | 11 ++ libmemcached/auto.cc | 22 +++ libmemcached/behavior.cc | 12 ++ libmemcached/collection.cc | 201 +++++++++++++++++++++++++++ libmemcached/connect.cc | 8 ++ libmemcached/constants.h | 1 + libmemcached/delete.cc | 16 +++ libmemcached/dump.cc | 6 + libmemcached/exist.cc | 10 ++ libmemcached/flush.cc | 12 ++ libmemcached/get.cc | 71 ++++++++++ libmemcached/hash.cc | 27 ++++ libmemcached/hash.h | 4 + libmemcached/hosts.cc | 128 +++++++++++++++++ libmemcached/memcached.cc | 4 + libmemcached/memcached.h | 4 + libmemcached/return.h | 4 + libmemcached/stats.cc | 7 + libmemcached/storage.cc | 16 +++ libmemcached/strerror.cc | 7 + libmemcached/util/pool.cc | 273 +++++++++++++++++++++++++++++++++++++ libmemcached/version.cc | 6 + tests/ketama.cc | 4 + 24 files changed, 928 insertions(+) diff --git a/libmemcached/arcus.cc b/libmemcached/arcus.cc index 26c6c6f2..4933a293 100644 --- a/libmemcached/arcus.cc +++ b/libmemcached/arcus.cc @@ -54,7 +54,11 @@ static inline void do_arcus_zk_watcher_cachelist(zhandle_t *zh, int type, int st * ARCUS_ZK_OPERATIONS */ static inline void do_arcus_zk_update_cachelist(memcached_st *mc, const struct String_vector *strings); +#ifdef CACHELIST_ERROR_HANDLING +static inline memcached_return_t do_arcus_zk_update_cachelist_by_string(memcached_st *mc, char *serverlist, const size_t size); +#else static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc, char *serverlist, const size_t size); +#endif static inline void do_arcus_zk_watch_and_update_cachelist(memcached_st *mc, bool *retry); /** @@ -418,7 +422,13 @@ arcus_return_t arcus_proxy_connect(memcached_st *mc, arcus_set_log_stream(mc, proxy->logfile); rc= do_arcus_proxy_connect(mc, proxy); +#ifdef CACHELIST_ERROR_HANDLING + if (rc != ARCUS_SUCCESS or arcus_server_check_for_update(mc) != MEMCACHED_SUCCESS) { + return ARCUS_ERROR; + } +#else arcus_server_check_for_update(mc); +#endif return rc; } @@ -725,15 +735,26 @@ static inline arcus_return_t do_arcus_zk_close(memcached_st *mc) * PUBLIC API * Check for cache server updates. */ +#ifdef CACHELIST_ERROR_HANDLING +memcached_return_t arcus_server_check_for_update(memcached_st *ptr) +#else void arcus_server_check_for_update(memcached_st *ptr) +#endif { +#ifdef CACHELIST_ERROR_HANDLING + memcached_return_t rc= MEMCACHED_SUCCESS; +#endif arcus_st *arcus; size_t size; uint32_t version; arcus= static_cast(memcached_get_server_manager(ptr)); if (not arcus) { +#ifdef CACHELIST_ERROR_HANDLING + return rc; +#else return; +#endif } if (arcus->proxy.data && arcus->proxy.data->version != arcus->proxy.current_version) @@ -751,10 +772,18 @@ void arcus_server_check_for_update(memcached_st *ptr) if (master && master->configure.version == ptr->configure.version) #endif { +#ifdef CACHELIST_ERROR_HANDLING + rc= do_arcus_zk_update_cachelist_by_string(master, arcus->proxy.data->serverlist, size); +#else do_arcus_zk_update_cachelist_by_string(master, arcus->proxy.data->serverlist, size); +#endif } } else { +#ifdef CACHELIST_ERROR_HANDLING + rc= do_arcus_zk_update_cachelist_by_string(ptr, arcus->proxy.data->serverlist, size); +#else do_arcus_zk_update_cachelist_by_string(ptr, arcus->proxy.data->serverlist, size); +#endif } arcus->proxy.current_version= version; } @@ -772,10 +801,17 @@ void arcus_server_check_for_update(memcached_st *ptr) { /* master's cache list was changed, update member's cache list */ pthread_mutex_lock(&lock_arcus); +#ifdef CACHELIST_ERROR_HANDLING + rc= memcached_pool_update_member(arcus->pool, ptr); +#else (void)memcached_pool_update_member(arcus->pool, ptr); +#endif pthread_mutex_unlock(&lock_arcus); } } +#ifdef CACHELIST_ERROR_HANDLING + return rc; +#endif } /** @@ -854,9 +890,15 @@ static inline int do_add_server_to_cachelist(struct arcus_zk_st *zkinfo, char *n return 0; } +#ifdef CACHELIST_ERROR_HANDLING +static inline memcached_return_t do_arcus_update_cachelist(memcached_st *mc, + memcached_server_info_st *serverinfo, + uint32_t servercount) +#else static inline void do_arcus_update_cachelist(memcached_st *mc, memcached_server_info_st *serverinfo, uint32_t servercount) +#endif { arcus_st *arcus= static_cast(memcached_get_server_manager(mc)); memcached_return_t error= MEMCACHED_SUCCESS; @@ -907,17 +949,34 @@ static inline void do_arcus_update_cachelist(memcached_st *mc, ZOO_LOG_WARN(("CACHE_LIST=UPDATED, to %s, cache_servers=%d in %d ms", arcus->zk.ensemble_list, mc->number_of_hosts, msec)); } else { +#ifdef CACHELIST_ERROR_HANDLING + ZOO_LOG_WARN(("CACHE_LIST=UPDATE_FAIL in %d ms. Reason=%s(%d)", msec, memcached_strerror(NULL, error), error)); +#else ZOO_LOG_WARN(("CACHE_LIST=UPDATE_FAIL in %d ms", msec)); +#endif } +#ifdef CACHELIST_ERROR_HANDLING + return error; +#endif } /** * Rebuild the memcached server list by string */ +#ifdef CACHELIST_ERROR_HANDLING +static inline memcached_return_t do_arcus_zk_update_cachelist_by_string(memcached_st *mc, + char *serverlist, + const size_t size) + +#else static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc, char *serverlist, const size_t size) +#endif { +#ifdef CACHELIST_ERROR_HANDLING + memcached_return_t rc= MEMCACHED_SUCCESS; +#endif arcus_st *arcus = static_cast(memcached_get_server_manager(mc)); uint32_t servercount= 0; memcached_server_info_st *serverinfo; @@ -925,7 +984,11 @@ static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc, serverinfo = static_cast(libmemcached_malloc(mc, sizeof(memcached_server_info_st)*(size+1))); if (not serverinfo) { +#ifdef CACHELIST_ERROR_HANDLING + return rc; +#else return; +#endif } strncpy(buffer, serverlist, ARCUS_MAX_PROXY_FILE_LENGTH); @@ -948,10 +1011,17 @@ static inline void do_arcus_zk_update_cachelist_by_string(memcached_st *mc, } pthread_mutex_lock(&lock_arcus); +#ifdef CACHELIST_ERROR_HANDLING + rc= do_arcus_update_cachelist(mc, serverinfo, servercount); +#else do_arcus_update_cachelist(mc, serverinfo, servercount); +#endif pthread_mutex_unlock(&lock_arcus); libmemcached_free(mc, serverinfo); +#ifdef CACHELIST_ERROR_HANDLING + return rc; +#endif } static inline int do_arcus_zk_process_reconnect(memcached_st *mc, bool *retry) @@ -1058,7 +1128,11 @@ static inline void do_arcus_zk_update_cachelist(memcached_st *mc, servercount++; /* valid znode name */ } } +#ifdef CACHELIST_ERROR_HANDLING + (void)do_arcus_update_cachelist(mc, serverinfo, servercount); +#else do_arcus_update_cachelist(mc, serverinfo, servercount); +#endif libmemcached_free(mc, serverinfo); } } while(0); diff --git a/libmemcached/arcus_priv.h b/libmemcached/arcus_priv.h index 208092e0..25ef02ba 100644 --- a/libmemcached/arcus_priv.h +++ b/libmemcached/arcus_priv.h @@ -125,14 +125,25 @@ struct memcached_server_info_st }; LIBMEMCACHED_API +#ifdef CACHELIST_ERROR_HANDLING +memcached_return_t arcus_server_check_for_update(memcached_st *ptr); +#else void arcus_server_check_for_update(memcached_st *ptr); +#endif #else /* LIBMEMCACHED_WITH_ZK_INTEGRATION */ +#ifdef CACHELIST_ERROR_HANDLING +static inline memcached_return_t arcus_server_check_for_update(memcached_st *) +{ + return MEMCACHED_SUCCESS; +} +#else static inline void arcus_server_check_for_update(memcached_st *) { /* Nothing */ } #endif +#endif /* LIBMEMCACHED_WITH_ZK_INTEGRATION */ #endif /* __LIBMEMCACHED_ARCUS_PRIV_H__ */ diff --git a/libmemcached/auto.cc b/libmemcached/auto.cc index e8ceca19..70f3229f 100644 --- a/libmemcached/auto.cc +++ b/libmemcached/auto.cc @@ -66,7 +66,13 @@ static memcached_return_t text_incr_decr(memcached_st *ptr, const time_t expiration, uint64_t *value) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif if (memcached_failed(memcached_key_test(*ptr, (const char **)&key, &key_length, 1))) { @@ -106,6 +112,11 @@ static memcached_return_t text_incr_decr(memcached_st *ptr, }; uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION @@ -176,7 +187,13 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd, { bool no_reply= ptr->flags.no_reply; +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif if (memcached_server_count(ptr) == 0) return memcached_set_error(*ptr, MEMCACHED_NO_SERVERS, MEMCACHED_AT); @@ -209,6 +226,11 @@ static memcached_return_t binary_incr_decr(memcached_st *ptr, uint8_t cmd, }; uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION diff --git a/libmemcached/behavior.cc b/libmemcached/behavior.cc index 16a61f01..1a93a4f6 100644 --- a/libmemcached/behavior.cc +++ b/libmemcached/behavior.cc @@ -199,7 +199,11 @@ memcached_return_t memcached_behavior_set(memcached_st *ptr, case MEMCACHED_BEHAVIOR_SORT_HOSTS: { ptr->flags.use_sort_hosts= bool(data); +#ifdef CACHELIST_ERROR_HANDLING + return run_distribution(ptr); +#else run_distribution(ptr); +#endif break; } @@ -487,9 +491,13 @@ memcached_return_t memcached_behavior_set_distribution(memcached_st *ptr, memcac } ptr->distribution= type; +#ifdef CACHELIST_ERROR_HANDLING + return run_distribution(ptr); +#else run_distribution(ptr); return MEMCACHED_SUCCESS; +#endif } return memcached_set_error(*ptr, MEMCACHED_INVALID_ARGUMENTS, MEMCACHED_AT, @@ -616,7 +624,11 @@ memcached_return_t memcached_bucket_set(memcached_st *self, if (memcached_failed(rc= memcached_virtual_bucket_create(self, host_map, forward_map, buckets, replicas))) { +#ifdef CACHELIST_ERROR_HANDLING + return memcached_behavior_set_distribution(self, old); +#else memcached_behavior_set_distribution(self, old); +#endif } return rc; diff --git a/libmemcached/collection.cc b/libmemcached/collection.cc index e8bd370c..5415296c 100644 --- a/libmemcached/collection.cc +++ b/libmemcached/collection.cc @@ -658,7 +658,13 @@ memcached_return_t memcached_set_attrs(memcached_st *ptr, const char *key, size_t key_length, memcached_coll_attrs_st *attrs) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -724,6 +730,11 @@ memcached_return_t memcached_set_attrs(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION @@ -773,7 +784,13 @@ memcached_return_t memcached_get_attrs(memcached_st *ptr, const char *key, size_t key_length, memcached_coll_attrs_st *attrs) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -792,6 +809,11 @@ memcached_return_t memcached_get_attrs(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); /* Request */ @@ -947,7 +969,13 @@ static memcached_return_t do_coll_create(memcached_st *ptr, memcached_coll_create_attrs_st *attributes, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -1002,6 +1030,11 @@ static memcached_return_t do_coll_create(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION @@ -1277,7 +1310,13 @@ static memcached_return_t do_coll_insert(memcached_st *ptr, memcached_coll_create_attrs_st *attributes, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -1379,6 +1418,11 @@ static memcached_return_t do_coll_insert(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION @@ -1439,7 +1483,13 @@ static memcached_return_t do_coll_delete(memcached_st *ptr, bool drop_if_empty, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -1614,6 +1664,11 @@ static memcached_return_t do_coll_delete(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION @@ -1671,7 +1726,13 @@ static memcached_return_t do_coll_get(memcached_st *ptr, memcached_coll_result_st *result, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -1698,6 +1759,11 @@ static memcached_return_t do_coll_get(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); /* For check space_separated_keys_is_supported */ @@ -1981,7 +2047,13 @@ static memcached_return_t do_coll_mget(memcached_st *ptr, memcached_coll_query_st *query, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, keys, key_length, number_of_keys); if (rc != MEMCACHED_SUCCESS) @@ -2102,6 +2174,12 @@ static memcached_return_t do_coll_mget(memcached_st *ptr, for (uint32_t x= 0; x and */ @@ -2250,7 +2328,13 @@ static memcached_return_t do_bop_find_position(memcached_st *ptr, memcached_coll_order_t order, size_t *position, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -2310,6 +2394,11 @@ static memcached_return_t do_bop_find_position(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); rc = memcached_vdo(instance, vector, 4, to_write); @@ -2354,7 +2443,13 @@ static memcached_return_t do_bop_get_by_position(memcached_st *ptr, memcached_coll_result_st *result, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -2409,6 +2504,11 @@ static memcached_return_t do_bop_get_by_position(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); rc = memcached_vdo(instance, vector, 4, to_write); @@ -2457,7 +2557,13 @@ static memcached_return_t do_bop_find_position_with_get(memcached_st *ptr, memcached_coll_result_st *result, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -2522,6 +2628,11 @@ static memcached_return_t do_bop_find_position_with_get(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); rc = memcached_vdo(instance, vector, 4, to_write); @@ -2569,7 +2680,13 @@ static memcached_return_t do_bop_smget(memcached_st *ptr, memcached_bop_query_st *query, memcached_coll_smget_result_st *result) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, keys, key_length, number_of_keys); if (rc != MEMCACHED_SUCCESS) @@ -2716,6 +2833,12 @@ static memcached_return_t do_bop_smget(memcached_st *ptr, for (uint32_t x= 0; x and */ @@ -2860,7 +2983,13 @@ static memcached_return_t do_coll_exist(memcached_st *ptr, const char *value, size_t value_length, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) @@ -2900,6 +3029,11 @@ static memcached_return_t do_coll_exist(memcached_st *ptr, /* Find a memcached */ uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); rc= memcached_vdo(instance, vector, 5, to_write); @@ -2940,13 +3074,24 @@ static memcached_return_t do_coll_piped_exist(memcached_st *ptr, const char *key const char * const *values, const size_t *values_length, memcached_return_t *responses, memcached_return_t *piped_rc) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) return rc; uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); /* preparation for pipe operations */ @@ -3013,13 +3158,24 @@ static memcached_return_t do_coll_piped_insert(memcached_st *ptr, const char *ke memcached_return_t *responses, memcached_return_t *piped_rc, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, &key, &key_length, 1); if (rc != MEMCACHED_SUCCESS) return rc; uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); /* preparation for pipe operations */ @@ -3163,7 +3319,13 @@ static memcached_return_t do_coll_piped_insert_bulk(memcached_st *ptr, memcached_return_t *piped_rc, memcached_coll_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc = before_query(ptr, keys, key_length, number_of_keys); if (rc != MEMCACHED_SUCCESS) @@ -3209,6 +3371,12 @@ static memcached_return_t do_coll_piped_insert_bulk(memcached_st *ptr, for (uint32_t x= 0; xroot)) { set_last_disconnected_host(server); +#ifdef CACHELIST_ERROR_HANDLING + memcached_return_t error= run_distribution(server->root); + if (error == MEMCACHED_SUCCESS) { + error= MEMCACHED_SERVER_MARKED_DEAD; + } + return memcached_set_error(*server, error, MEMCACHED_AT); +#else run_distribution((memcached_st *)server->root); return memcached_set_error(*server, MEMCACHED_SERVER_MARKED_DEAD, MEMCACHED_AT); +#endif } server->state= MEMCACHED_SERVER_STATE_IN_TIMEOUT; diff --git a/libmemcached/constants.h b/libmemcached/constants.h index 18686c64..139ce758 100644 --- a/libmemcached/constants.h +++ b/libmemcached/constants.h @@ -66,6 +66,7 @@ #define POOL_UPDATE_SERVERLIST 1 #define POOL_MORE_CONCURRENCY 1 #define KETAMA_HASH_COLLSION 1 +#define CACHELIST_ERROR_HANDLING 1 /* Public defines */ #define MEMCACHED_DEFAULT_PORT 11211 diff --git a/libmemcached/delete.cc b/libmemcached/delete.cc index 8f1de223..86659136 100644 --- a/libmemcached/delete.cc +++ b/libmemcached/delete.cc @@ -74,6 +74,11 @@ static inline memcached_return_t ascii_delete(memcached_st *ptr, }; uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION @@ -169,6 +174,11 @@ static inline memcached_return_t binary_delete(memcached_st *ptr, }; uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION @@ -256,7 +266,13 @@ memcached_return_t memcached_delete_by_key(memcached_st *ptr, const char *key, size_t key_length, time_t expiration) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc; if (memcached_failed(rc= initialize_query(ptr))) diff --git a/libmemcached/dump.cc b/libmemcached/dump.cc index 85baabf3..ee492275 100644 --- a/libmemcached/dump.cc +++ b/libmemcached/dump.cc @@ -29,7 +29,13 @@ static memcached_return_t ascii_dump(memcached_st *ptr, memcached_dump_fn *callb { memcached_return_t rc= MEMCACHED_SUCCESS; +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif for (uint32_t server_key= 0; server_key < memcached_server_count(ptr); server_key++) { diff --git a/libmemcached/exist.cc b/libmemcached/exist.cc index 5cdeb6ed..d83cc5bc 100644 --- a/libmemcached/exist.cc +++ b/libmemcached/exist.cc @@ -71,6 +71,11 @@ static memcached_return_t ascii_exist(memcached_st *memc, }; uint32_t server_key= memcached_generate_hash_with_redistribution(memc, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*memc, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, server_key); /* Send command header */ @@ -122,6 +127,11 @@ static memcached_return_t binary_exist(memcached_st *memc, }; uint32_t server_key= memcached_generate_hash_with_redistribution(memc, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*memc, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(memc, server_key); /* write the header */ diff --git a/libmemcached/flush.cc b/libmemcached/flush.cc index 20061317..b6f44b3f 100644 --- a/libmemcached/flush.cc +++ b/libmemcached/flush.cc @@ -67,7 +67,13 @@ memcached_return_t memcached_flush_by_prefix(memcached_st *ptr, { memcached_return_t rc; +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif if (memcached_failed(rc= initialize_query(ptr))) { @@ -116,7 +122,13 @@ memcached_return_t memcached_flush(memcached_st *ptr, time_t expiration) { memcached_return_t rc; +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif if (memcached_failed(rc= initialize_query(ptr))) { diff --git a/libmemcached/get.cc b/libmemcached/get.cc index aa0a263d..3b35a25d 100644 --- a/libmemcached/get.cc +++ b/libmemcached/get.cc @@ -186,6 +186,11 @@ static memcached_return_t ascii_get_by_key(memcached_st *ptr, else { server_key= memcached_generate_hash_with_redistribution(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif } instance= memcached_server_instance_fetch(ptr, server_key); @@ -218,7 +223,13 @@ static memcached_return_t simple_binary_mget(memcached_st *ptr, bool flush= (number_of_keys == 1); +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif /* If a server fails we warn about errors and start all over with sending keys @@ -235,6 +246,11 @@ static memcached_return_t simple_binary_mget(memcached_st *ptr, else { server_key= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif } memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); @@ -344,7 +360,13 @@ static memcached_return_t replication_binary_mget(memcached_st *ptr, uint32_t start= 0; uint64_t randomize_read= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_RANDOMIZE_REPLICA_READ); +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif if (randomize_read) start= (uint32_t)random() % (uint32_t)(ptr->number_of_replicas + 1); @@ -469,6 +491,13 @@ static memcached_return_t binary_mget_by_key(memcached_st *ptr, for (size_t x= 0; x < number_of_keys; x++) { hash[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]); +#ifdef CACHELIST_ERROR_HANDLING + if (hash[x] == UINT32_MAX) { + libmemcached_free(ptr, hash); + libmemcached_free(ptr, dead_servers); + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif } } @@ -479,7 +508,11 @@ static memcached_return_t binary_mget_by_key(memcached_st *ptr, libmemcached_free(ptr, hash); libmemcached_free(ptr, dead_servers); +#ifdef CACHELIST_ERROR_HANDLING + return rc; +#else return MEMCACHED_SUCCESS; +#endif } /* @@ -494,7 +527,14 @@ char *memcached_get_by_key(memcached_st *ptr, uint32_t *flags, memcached_return_t *error) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + *error= memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + return NULL; + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t unused; if (error == NULL) @@ -523,10 +563,20 @@ char *memcached_get_by_key(memcached_st *ptr, } bool is_group_key_set= false; +#ifdef CACHELIST_ERROR_HANDLING + uint32_t master_server_key= UINT32_MAX; /* 0 is a valid server id! */ +#else unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */ +#endif if (group_key and group_key_length) { master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (master_server_key == UINT32_MAX) { + *error= memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + return NULL; + } +#endif is_group_key_set= true; } @@ -650,11 +700,21 @@ memcached_return_t memcached_mget_by_key(memcached_st *ptr, const size_t *key_length, size_t number_of_keys) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc; bool failures_occured_in_sending= false; +#ifdef CACHELIST_ERROR_HANDLING + uint32_t master_server_key= UINT32_MAX; /* 0 is a valid server id! */ +#else unsigned int master_server_key= (unsigned int)-1; /* 0 is a valid server id! */ +#endif if (memcached_failed(rc= before_get_query(ptr, group_key, group_key_length, keys, key_length, number_of_keys))) @@ -674,6 +734,11 @@ memcached_return_t memcached_mget_by_key(memcached_st *ptr, if (group_key and group_key_length) { master_server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (master_server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif is_group_key_set= true; } @@ -690,6 +755,12 @@ memcached_return_t memcached_mget_by_key(memcached_st *ptr, for (uint32_t x= 0; x < number_of_keys; x++) { key_to_serverkey[x]= memcached_generate_hash_with_redistribution(ptr, keys[x], key_length[x]); +#ifdef CACHELIST_ERROR_HANDLING + if (key_to_serverkey[x] == UINT32_MAX) { + DEALLOCATE_ARRAY(ptr, key_to_serverkey); + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif } /* Prepare and */ diff --git a/libmemcached/hash.cc b/libmemcached/hash.cc index 1238d17c..b1c9782e 100644 --- a/libmemcached/hash.cc +++ b/libmemcached/hash.cc @@ -131,8 +131,15 @@ static inline uint32_t _generate_hash_wrapper(const memcached_st *ptr, const cha } } +#ifdef CACHELIST_ERROR_HANDLING +static inline memcached_return_t _regen_for_auto_eject(memcached_st *ptr) +#else static inline void _regen_for_auto_eject(memcached_st *ptr) +#endif { +#ifdef CACHELIST_ERROR_HANDLING + memcached_return_t rc= MEMCACHED_SUCCESS; +#endif if (_is_auto_eject_host(ptr) && ptr->ketama.next_distribution_rebuild) { struct timeval now; @@ -140,21 +147,41 @@ static inline void _regen_for_auto_eject(memcached_st *ptr) if (gettimeofday(&now, NULL) == 0 and now.tv_sec > ptr->ketama.next_distribution_rebuild) { +#ifdef CACHELIST_ERROR_HANDLING + rc= run_distribution(ptr); +#else run_distribution(ptr); +#endif } } +#ifdef CACHELIST_ERROR_HANDLING + return rc; +#endif } +#ifdef CACHELIST_ERROR_HANDLING +memcached_return_t memcached_autoeject(memcached_st *ptr) +{ + return _regen_for_auto_eject(ptr); +} +#else void memcached_autoeject(memcached_st *ptr) { _regen_for_auto_eject(ptr); } +#endif uint32_t memcached_generate_hash_with_redistribution(memcached_st *ptr, const char *key, size_t key_length) { uint32_t hash= _generate_hash_wrapper(ptr, key, key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (_regen_for_auto_eject(ptr) != MEMCACHED_SUCCESS) { + return UINT32_MAX; + } +#else _regen_for_auto_eject(ptr); +#endif return dispatch_host(ptr, hash); } diff --git a/libmemcached/hash.h b/libmemcached/hash.h index 4331454e..f04a4e56 100644 --- a/libmemcached/hash.h +++ b/libmemcached/hash.h @@ -59,7 +59,11 @@ LIBMEMCACHED_LOCAL uint32_t memcached_generate_hash_with_redistribution(memcached_st *ptr, const char *key, size_t key_length); LIBMEMCACHED_API +#ifdef CACHELIST_ERROR_HANDLING +memcached_return_t memcached_autoeject(memcached_st *ptr); +#else void memcached_autoeject(memcached_st *ptr); +#endif LIBMEMCACHED_API const char * libmemcached_string_hash(memcached_hash_t type); diff --git a/libmemcached/hosts.cc b/libmemcached/hosts.cc index ee21b7e7..3fc429ba 100644 --- a/libmemcached/hosts.cc +++ b/libmemcached/hosts.cc @@ -250,15 +250,39 @@ static memcached_return_t update_continuum(memcached_st *ptr) } #endif +#ifdef CACHELIST_ERROR_HANDLING + memcached_return_t rc= MEMCACHED_SUCCESS; + bool is_auto_ejecting; + + uint64_t is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); + uint32_t points_per_server= (uint32_t) (is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA + : MEMCACHED_POINTS_PER_SERVER); + memcached_ketama_info_st *new_ketama_info= NULL; + + size_t size= 0; + memcached_continuum_item_st *new_continuum= NULL; + + uint64_t total_weight= 0; + uint32_t total_server= 0; + uint32_t first_weight= 0; + bool all_weights_same= true; + + gettimeofday(&now, NULL); +#else if (gettimeofday(&now, NULL)) { return memcached_set_errno(*ptr, errno, MEMCACHED_AT); } +#endif list= memcached_server_list(ptr); /* count live servers (those without a retry delay set) */ +#ifdef CACHELIST_ERROR_HANDLING + is_auto_ejecting= _is_auto_eject_host(ptr); +#else bool is_auto_ejecting= _is_auto_eject_host(ptr); +#endif if (is_auto_ejecting) { live_servers= 0; @@ -289,29 +313,51 @@ static memcached_return_t update_continuum(memcached_st *ptr) return MEMCACHED_SUCCESS; } +#ifdef CACHELIST_ERROR_HANDLING + new_ketama_info= static_cast(libmemcached_malloc(ptr, sizeof(memcached_ketama_info_st))); +#else uint64_t is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); uint32_t points_per_server= (uint32_t) (is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER); memcached_ketama_info_st *new_ketama_info= static_cast(libmemcached_malloc(ptr, sizeof(memcached_ketama_info_st))); +#endif if (new_ketama_info == 0) { +#ifdef CACHELIST_ERROR_HANDLING + rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE; + goto continuum_error; +#else return MEMCACHED_MEMORY_ALLOCATION_FAILURE; +#endif } +#ifdef CACHELIST_ERROR_HANDLING + size= sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server; + new_continuum= static_cast(libmemcached_malloc(ptr, size)); +#else size_t size= sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server; memcached_continuum_item_st *new_continuum= static_cast(libmemcached_malloc(ptr, size)); +#endif if (new_continuum == 0) { libmemcached_free(ptr, new_ketama_info); +#ifdef CACHELIST_ERROR_HANDLING + rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE; + goto continuum_error; +#else return MEMCACHED_MEMORY_ALLOCATION_FAILURE; +#endif } +#ifdef CACHELIST_ERROR_HANDLING +#else uint64_t total_weight= 0; uint32_t total_server= 0; uint32_t first_weight= 0; bool all_weights_same= true; +#endif if (is_ketama_weighted) { for (uint32_t host_index = 0; host_index < memcached_server_count(ptr); ++host_index) @@ -386,8 +432,14 @@ static memcached_return_t update_continuum(memcached_st *ptr) if (sort_host_length >= MEMCACHED_MAX_HOST_SORT_LENGTH || sort_host_length < 0) { +#ifdef CACHELIST_ERROR_HANDLING + rc= memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, + memcached_literal_param("snprintf(MEMCACHED_MAX_HOST_SORT_LENGTH)")); + goto continuum_error; +#else return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("snprintf(MEMCACHED_MAX_HOST_SORT_LENGTH)")); +#endif } if (DEBUG) @@ -439,8 +491,16 @@ static memcached_return_t update_continuum(memcached_st *ptr) if (sort_host_length >= MEMCACHED_MAX_HOST_SORT_LENGTH || sort_host_length < 0) { +#ifdef CACHELIST_ERROR_HANDLING + rc= memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, + memcached_literal_param("snprintf(MEMCACHED_MAX_HOST_SORT_LENGTH)")); + libmemcached_free(ptr, new_ketama_info); + libmemcached_free(ptr, new_continuum); + goto continuum_error; +#else return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("snprintf(MEMCACHED_MAX_HOST_SORT_LENGTH)")); +#endif } if (is_ketama_weighted) @@ -515,7 +575,17 @@ static memcached_return_t update_continuum(memcached_st *ptr) } #endif +#ifdef CACHELIST_ERROR_HANDLING + ptr->ketama.is_invalid= false; + ptr->ketama.last_update_failed= 0; +#endif return MEMCACHED_SUCCESS; +#ifdef CACHELIST_ERROR_HANDLING +continuum_error: + ptr->ketama.is_invalid= true; + ptr->ketama.last_update_failed= time(NULL); + return rc; +#endif } #ifdef ENABLE_REPLICATION @@ -527,7 +597,10 @@ static memcached_return_t update_continuum_based_on_rgroups(memcached_st *ptr) uint32_t pointer_per_server= MEMCACHED_POINTS_PER_SERVER; uint32_t pointer_per_hash= 1; uint32_t live_servers= 0; +#ifdef CACHELIST_ERROR_HANDLING +#else struct timeval now; +#endif #ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION arcus_st *arcus= static_cast(memcached_get_server_manager(ptr)); @@ -542,10 +615,25 @@ static memcached_return_t update_continuum_based_on_rgroups(memcached_st *ptr) } #endif +#ifdef CACHELIST_ERROR_HANDLING + memcached_return_t rc= MEMCACHED_SUCCESS; + + uint64_t is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); + uint32_t points_per_server= (uint32_t) (is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA + : MEMCACHED_POINTS_PER_SERVER); + memcached_ketama_info_st *new_ketama_info= NULL; + + size_t size= 0; + memcached_continuum_item_st *new_continuum= NULL; + + uint64_t total_weight= 0; + bool all_weights_same= true; +#else if (gettimeofday(&now, NULL)) { return memcached_set_errno(*ptr, errno, MEMCACHED_AT); } +#endif assert(_is_auto_eject_host(ptr) != true); live_servers= memcached_server_count(ptr); @@ -555,27 +643,49 @@ static memcached_return_t update_continuum_based_on_rgroups(memcached_st *ptr) } list= memcached_rgroup_list(ptr); +#ifdef CACHELIST_ERROR_HANDLING + new_ketama_info= static_cast(libmemcached_malloc(ptr, sizeof(memcached_ketama_info_st))); +#else uint64_t is_ketama_weighted= memcached_behavior_get(ptr, MEMCACHED_BEHAVIOR_KETAMA_WEIGHTED); uint32_t points_per_server= (uint32_t) (is_ketama_weighted ? MEMCACHED_POINTS_PER_SERVER_KETAMA : MEMCACHED_POINTS_PER_SERVER); memcached_ketama_info_st *new_ketama_info= static_cast(libmemcached_malloc(ptr, sizeof(memcached_ketama_info_st))); +#endif if (new_ketama_info == 0) { +#ifdef CACHELIST_ERROR_HANDLING + rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE; + goto continuum_error; +#else return MEMCACHED_MEMORY_ALLOCATION_FAILURE; +#endif } +#ifdef CACHELIST_ERROR_HANDLING + size= sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server; + new_continuum= static_cast(libmemcached_malloc(ptr, size)); +#else size_t size= sizeof(memcached_continuum_item_st) * (live_servers + MEMCACHED_CONTINUUM_ADDITION) * points_per_server; memcached_continuum_item_st *new_continuum= static_cast(libmemcached_malloc(ptr, size)); +#endif if (new_continuum == 0) { libmemcached_free(ptr, new_ketama_info); +#ifdef CACHELIST_ERROR_HANDLING + rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE; + goto continuum_error; +#else return MEMCACHED_MEMORY_ALLOCATION_FAILURE; +#endif } +#ifdef CACHELIST_ERROR_HANDLING +#else uint64_t total_weight= 0; bool all_weights_same= true; +#endif if (is_ketama_weighted) { for (uint32_t host_index = 0; host_index < memcached_server_count(ptr); ++host_index) @@ -629,8 +739,16 @@ static memcached_return_t update_continuum_based_on_rgroups(memcached_st *ptr) pointer_index); if (sort_host_length >= MEMCACHED_MAX_HOST_SORT_LENGTH || sort_host_length < 0) { +#ifdef CACHELIST_ERROR_HANDLING + rc= memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, + memcached_literal_param("snprintf(MEMCACHED_MAX_HOST_SORT_LENGTH)")); + libmemcached_free(ptr, new_ketama_info); + libmemcached_free(ptr, new_continuum); + goto continuum_error; +#else return memcached_set_error(*ptr, MEMCACHED_MEMORY_ALLOCATION_FAILURE, MEMCACHED_AT, memcached_literal_param("snprintf(MEMCACHED_MAX_HOST_SORT_LENGTH)")); +#endif } if (DEBUG) @@ -708,7 +826,17 @@ static memcached_return_t update_continuum_based_on_rgroups(memcached_st *ptr) } #endif +#ifdef CACHELIST_ERROR_HANDLING + ptr->ketama.is_invalid= false; + ptr->ketama.last_update_failed= 0; +#endif return MEMCACHED_SUCCESS; +#ifdef CACHELIST_ERROR_HANDLING +continuum_error: + ptr->ketama.is_invalid= true; + ptr->ketama.last_update_failed= time(NULL); + return rc; +#endif } #endif diff --git a/libmemcached/memcached.cc b/libmemcached/memcached.cc index 8f2652fe..8843f76e 100644 --- a/libmemcached/memcached.cc +++ b/libmemcached/memcached.cc @@ -100,6 +100,10 @@ static inline bool _memcached_init(memcached_st *self) #ifdef ENABLE_REPLICATION self->flags.repl_enabled= false; #endif +#ifdef CACHELIST_ERROR_HANDLING + self->ketama.is_invalid= false; + self->ketama.last_update_failed= 0; +#endif self->virtual_bucket= NULL; diff --git a/libmemcached/memcached.h b/libmemcached/memcached.h index 4731406f..c7b7545a 100644 --- a/libmemcached/memcached.h +++ b/libmemcached/memcached.h @@ -194,6 +194,10 @@ struct memcached_st { bool weighted; time_t next_distribution_rebuild; memcached_ketama_info_st *info; +#ifdef CACHELIST_ERROR_HANDLING + bool is_invalid; + time_t last_update_failed; +#endif } ketama; struct memcached_virtual_bucket_t *virtual_bucket; diff --git a/libmemcached/return.h b/libmemcached/return.h index 2437a8e9..ee1a397e 100644 --- a/libmemcached/return.h +++ b/libmemcached/return.h @@ -141,6 +141,10 @@ enum memcached_return_t { #ifdef ENABLE_REPLICATION MEMCACHED_SWITCHOVER, MEMCACHED_REPL_SLAVE, +#endif +#ifdef CACHELIST_ERROR_HANDLING + MEMCACHED_INVALID_HASHRING, + MEMCACHED_INVALID_SERVERLIST, #endif MEMCACHED_MAXIMUM_RETURN /* Always add new error code before */ }; diff --git a/libmemcached/stats.cc b/libmemcached/stats.cc index fbc3b95f..29e1476a 100644 --- a/libmemcached/stats.cc +++ b/libmemcached/stats.cc @@ -487,7 +487,14 @@ memcached_stat_st *memcached_stat(memcached_st *self, char *args, memcached_retu memcached_return_t rc; +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(self) != MEMCACHED_SUCCESS) { + *error= memcached_set_error(*self, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + return NULL; + } +#else arcus_server_check_for_update(self); +#endif if (memcached_failed(rc= initialize_query(self))) { diff --git a/libmemcached/storage.cc b/libmemcached/storage.cc index aae5fd39..4beeac07 100644 --- a/libmemcached/storage.cc +++ b/libmemcached/storage.cc @@ -173,6 +173,11 @@ static memcached_return_t memcached_send_binary(memcached_st *ptr, flush= (bool) ((ptr->flags.buffer_requests && verb == SET_OP) ? 0 : 1); uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st server= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION @@ -328,6 +333,11 @@ static memcached_return_t memcached_send_ascii(memcached_st *ptr, } uint32_t server_key= memcached_generate_hash_with_redistribution(ptr, group_key, group_key_length); +#ifdef CACHELIST_ERROR_HANDLING + if (server_key == UINT32_MAX) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_HASHRING, MEMCACHED_AT); + } +#endif memcached_server_write_instance_st instance= memcached_server_instance_fetch(ptr, server_key); #ifdef ENABLE_REPLICATION @@ -402,7 +412,13 @@ static inline memcached_return_t memcached_send(memcached_st *ptr, uint64_t cas, memcached_storage_action_t verb) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc; if (memcached_failed(rc= initialize_query(ptr))) diff --git a/libmemcached/strerror.cc b/libmemcached/strerror.cc index 91a7ff0d..d673d122 100644 --- a/libmemcached/strerror.cc +++ b/libmemcached/strerror.cc @@ -317,6 +317,13 @@ const char *memcached_strerror(memcached_st *, memcached_return_t rc) return "REPLICATION REPL_SLAVE"; #endif +#ifdef CACHELIST_ERROR_HANDLING + case MEMCACHED_INVALID_HASHRING: + return "INVALID HASHRING"; + + case MEMCACHED_INVALID_SERVERLIST: + return "INVALID MEMCACHED"; +#endif default: case MEMCACHED_MAXIMUM_RETURN: return "INVALID memcached_return_t"; diff --git a/libmemcached/util/pool.cc b/libmemcached/util/pool.cc index d98d8e37..ec5ea07e 100644 --- a/libmemcached/util/pool.cc +++ b/libmemcached/util/pool.cc @@ -65,9 +65,17 @@ #include #ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION +#ifdef CACHELIST_ERROR_HANDLING +static memcached_return_t member_update_cachelist(memcached_st **memc, + memcached_pool_st* pool); +#else static memcached_return_t member_update_cachelist(memcached_st *memc, memcached_pool_st* pool); #endif +#endif +#ifdef CACHELIST_ERROR_HANDLING +static void *pool_thread_main(void *arg); +#endif struct memcached_pool_st { @@ -75,6 +83,10 @@ struct memcached_pool_st pthread_mutex_t mutex; pthread_cond_t cond; memcached_st *master; +#ifdef CACHELIST_ERROR_HANDLING + memcached_st *invalid_mc_head; + memcached_st *invalid_mc_tail; +#endif memcached_st *used_mc_head; memcached_st *used_mc_tail; memcached_st *used_bk_head; @@ -88,9 +100,17 @@ struct memcached_pool_st uint32_t cur_size; bool _owns_master; struct timespec _timeout; +#ifdef CACHELIST_ERROR_HANDLING + pthread_t _tid; + bool _thread_running; +#endif memcached_pool_st(memcached_st *master_arg, size_t max_arg) : master(master_arg), +#ifdef CACHELIST_ERROR_HANDLING + invalid_mc_head(NULL), + invalid_mc_tail(NULL), +#endif used_mc_head(NULL), used_mc_tail(NULL), used_bk_head(NULL), @@ -107,6 +127,9 @@ struct memcached_pool_st #endif cur_size(0), _owns_master(false) +#ifdef CACHELIST_ERROR_HANDLING + ,_thread_running(true) +#endif { pthread_mutex_init(&master_lock, NULL); pthread_mutex_init(&mutex, NULL); @@ -129,6 +152,14 @@ struct memcached_pool_st ~memcached_pool_st() { +#ifdef CACHELIST_ERROR_HANDLING + while (invalid_mc_head) + { + memcached_st *mc= invalid_mc_head; + invalid_mc_head= mc->mc_next; + memcached_free(mc); + } +#endif while (used_bk_head) { memcached_st *mc= used_bk_head; @@ -156,6 +187,10 @@ struct memcached_pool_st pthread_cond_destroy(&cond); delete [] mc_pool; delete [] bk_pool; +#ifdef CACHELIST_ERROR_HANDLING + _thread_running= false; + pthread_join(_tid, NULL); +#endif if (_owns_master) { memcached_free(master); @@ -195,6 +230,52 @@ struct memcached_pool_st } }; +#ifdef CACHELIST_ERROR_HANDLING +static memcached_st *invalid_list_get(memcached_pool_st* pool) +{ + memcached_st *mc; + + if ((mc= pool->invalid_mc_head) != NULL) + { + pool->invalid_mc_head= mc->mc_next; + if (pool->invalid_mc_head == NULL) { + pool->invalid_mc_tail= NULL; + } + (void)member_update_cachelist(&mc, pool); + return mc; + } + return mc; +} + +static void invalid_list_add(memcached_pool_st* pool, bool head, memcached_st *mc) +{ + if (head) + { + mc->mc_next= pool->invalid_mc_head; + pool->invalid_mc_head= mc; + if (pool->invalid_mc_tail == NULL) { + pool->invalid_mc_tail= mc; + } + } + else /* tail */ + { + mc->mc_next= NULL; + if (pool->invalid_mc_tail) { + pool->invalid_mc_tail->mc_next= mc; + } else { + pool->invalid_mc_head= mc; + } + pool->invalid_mc_tail= mc; + } + + if (pool->wait_count > 0) + { + /* we might have people waiting for a connection.. wake them up :-) */ + pthread_cond_broadcast(&pool->cond); + } +} +#endif + /* * used mc list functions */ @@ -202,6 +283,9 @@ static memcached_st *mc_list_get(memcached_pool_st* pool) { memcached_st *mc; +#ifdef CACHELIST_ERROR_HANDLING +do_action: +#endif #ifdef POOL_MORE_CONCURRENCY #ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION if ((mc= pool->used_bk_head) != NULL) @@ -210,7 +294,15 @@ static memcached_st *mc_list_get(memcached_pool_st* pool) if (pool->used_bk_head == NULL) { pool->used_bk_tail= NULL; } +#ifdef CACHELIST_ERROR_HANDLING + if (member_update_cachelist(&mc, pool) != MEMCACHED_SUCCESS) + { + invalid_list_add(pool, false, mc); + goto do_action; + } +#else (void)member_update_cachelist(mc, pool); +#endif return mc; } #endif @@ -255,12 +347,23 @@ static void mc_list_add(memcached_pool_st* pool, bool head, memcached_st *mc) static memcached_st *mc_pool_get(memcached_pool_st* pool) { +#ifdef CACHELIST_ERROR_HANDLING +do_action: +#endif #ifdef POOL_MORE_CONCURRENCY #ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION if (pool->bk_top > -1) { memcached_st *mc= pool->bk_pool[pool->bk_top--]; +#ifdef CACHELIST_ERROR_HANDLING + if (member_update_cachelist(&mc, pool) != MEMCACHED_SUCCESS) + { + invalid_list_add(pool, false, mc); + goto do_action; + } +#else (void)member_update_cachelist(mc, pool); +#endif return mc; } #endif @@ -321,6 +424,19 @@ bool memcached_pool_st::init(uint32_t initial) return false; } +#ifdef CACHELIST_ERROR_HANDLING + pthread_attr_t attr; + pthread_attr_init(&attr); + pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM); + if (pthread_create(&_tid, &attr, pool_thread_main, (void*)this) != 0) + { + ZOO_LOG_ERROR(("Cannot create pool manager thread: %s(%d)", strerror(errno), errno)); + delete [] mc_pool; + delete [] bk_pool; + return false; + } +#endif + /* Try to create the initial size of the pool. An allocation failure at this time is not fatal.. @@ -418,6 +534,13 @@ memcached_st* memcached_pool_st::fetch(memcached_return_t& rc) memcached_st* memcached_pool_st::fetch(const struct timespec& relative_time, memcached_return_t& rc) { +#ifdef CACHELIST_ERROR_HANDLING + if (master->ketama.is_invalid) + { + rc= MEMCACHED_INVALID_HASHRING; + return NULL; + } +#endif rc= MEMCACHED_SUCCESS; if (pthread_mutex_lock(&mutex)) @@ -445,8 +568,21 @@ memcached_st* memcached_pool_st::fetch(const struct timespec& relative_time, mem rc= MEMCACHED_MEMORY_ALLOCATION_FAILURE; break; } +#ifdef CACHELIST_ERROR_HANDLING + continue; +#endif + } +#ifdef CACHELIST_ERROR_HANDLING + if ((ret= invalid_list_get(this)) != NULL) + { + if (member_update_cachelist(&ret, this) != MEMCACHED_SUCCESS) { + rc= MEMCACHED_INVALID_SERVERLIST; + } + break; } +#else else /* cur_size == max_size */ +#endif { if (relative_time.tv_sec == 0 and relative_time.tv_nsec == 0) { @@ -514,13 +650,24 @@ bool memcached_pool_st::release(memcached_st *released, memcached_return_t& rc) #ifdef LIBMEMCACHED_WITH_ZK_INTEGRATION else if (compare_ketama_version(released) == false) { +#ifdef CACHELIST_ERROR_HANDLING + if (member_update_cachelist(&released, this) != MEMCACHED_SUCCESS) + { + invalid_list_add(this, false, released); + goto do_action; + } +#else (void)member_update_cachelist(released, this); +#endif } #endif #endif mc_list_add(this, true, released); /* true: to the head side */ +#ifdef CACHELIST_ERROR_HANDLING +do_action: +#endif (void)pthread_mutex_unlock(&mutex); return true; @@ -730,6 +877,33 @@ void memcached_pool_unlock(memcached_pool_st* pool) /* * static functions */ +#ifdef CACHELIST_ERROR_HANDLING +static memcached_return_t member_update_cachelist(memcached_st **memc, + memcached_pool_st* pool) +{ + memcached_st *clone= memcached_clone(NULL, *memc); + if (clone == NULL) + { + return MEMCACHED_MEMORY_ALLOCATION_FAILURE; + } + memcached_return_t rc= memcached_update_cachelist_with_master(*memc, pool->master); + if (rc == MEMCACHED_SUCCESS) + { + memcached_free(clone); +#ifdef UPDATE_HASH_RING_OF_FETCHED_MC + (*memc)->configure.ketama_version= pool->ketama_version(); +#else + (*memc)->configure.version= pool->version(); +#endif + } + else + { + memcached_free(*memc); + *memc= clone; + } + return rc; +} +#else static memcached_return_t member_update_cachelist(memcached_st *memc, memcached_pool_st* pool) { @@ -746,6 +920,7 @@ static memcached_return_t member_update_cachelist(memcached_st *memc, } return rc; } +#endif static int mc_list_remove_all(memcached_pool_st* pool) { @@ -767,6 +942,9 @@ static int mc_list_remove_all(memcached_pool_st* pool) static void mc_list_update_cachelist(memcached_pool_st* pool) { memcached_st *mc; +#ifdef CACHELIST_ERROR_HANDLING + memcached_return_t rc; +#endif #ifdef POOL_MORE_CONCURRENCY while (pool->used_bk_head) @@ -780,17 +958,50 @@ static void mc_list_update_cachelist(memcached_pool_st* pool) (void)pthread_mutex_unlock(&pool->mutex); /* update chachelist without pool lock */ +#ifdef CACHELIST_ERROR_HANDLING + rc= member_update_cachelist(&mc, pool); +#else (void)member_update_cachelist(mc, pool); +#endif (void)pthread_mutex_lock(&pool->mutex); /* push the mc into mc_list */ +#ifdef CACHELIST_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) + { + invalid_list_add(pool, false, mc); + } + else +#endif mc_list_add(pool, false, mc); /* false: to the tail side */ } #else mc= pool->used_mc_head; +#ifdef CACHELIST_ERROR_HANDLING + memcached_st *prev= NULL; +#endif while (mc) { +#ifdef CACHELIST_ERROR_HANDLING + if (member_update_cachelist(&mc, pool) != MEMCACHED_SUCCESS) + { + invalid_list_add(pool, false, mc); + if (prev != NULL) + { + prev->mc_next= mc->mc_next; + } + else + { + pool->used_mc_head= mc->mc_next; + } + } + else + { + prev= mc; + } +#else (void)member_update_cachelist(mc, pool); +#endif mc= mc->mc_next; } #endif @@ -800,6 +1011,9 @@ static void mc_pool_update_cachelist(memcached_pool_st* pool) { #ifdef POOL_MORE_CONCURRENCY memcached_st *mc; +#ifdef CACHELIST_ERROR_HANDLING + memcached_return_t rc; +#endif while (pool->bk_top > -1) { @@ -808,16 +1022,40 @@ static void mc_pool_update_cachelist(memcached_pool_st* pool) (void)pthread_mutex_unlock(&pool->mutex); /* update chachelist without pool lock */ +#ifdef CACHELIST_ERROR_HANDLING + rc= member_update_cachelist(&mc, pool); +#else (void)member_update_cachelist(mc, pool); +#endif (void)pthread_mutex_lock(&pool->mutex); /* push the mc into mc_pool */ +#ifdef CACHELIST_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) + { + invalid_list_add(pool, false, mc); + } + else +#endif mc_pool_add(pool, mc); } #else for (int xx= 0; xx <= pool->mc_top; ++xx) { +#ifdef CACHELIST_ERROR_HANDLING + if (member_update_cachelist(&pool->mc_pool[xx], pool) != MEMCACHED_SUCCESS) + { + invalid_list_add(pool, false, pool->mc_pool[xx]); + for (int yy= xx; yy < pool->mc_top; ++yy) + { + pool->mc_pool[yy]= pool->mc_pool[yy + 1]; + } + pool->mc_top--; + xx--; + } +#else (void)member_update_cachelist(pool->mc_pool[xx], pool); +#endif } #endif } @@ -893,6 +1131,14 @@ memcached_return_t memcached_pool_update_cachelist(memcached_pool_st *pool, (void)pthread_mutex_lock(&pool->mutex); rc= memcached_update_cachelist(pool->master, serverinfo, servercount, &serverlist_changed); +#ifdef CACHELIST_ERROR_HANDLING + if (rc != MEMCACHED_SUCCESS) + { + (void)pthread_mutex_unlock(&pool->mutex); + (void)pthread_mutex_unlock(&pool->master_lock); + return rc; + } +#endif if (init) { #ifdef UPDATE_HASH_RING_OF_FETCHED_MC @@ -967,7 +1213,11 @@ memcached_return_t memcached_pool_update_member(memcached_pool_st* pool, memcach if (mc->configure.version != pool->version()) #endif { +#ifdef CACHELIST_ERROR_HANDLING + rc= member_update_cachelist(&mc, pool); +#else (void)member_update_cachelist(mc, pool); +#endif } (void)pthread_mutex_unlock(&pool->mutex); return rc; @@ -1012,3 +1262,26 @@ uint16_t get_memcached_pool_size(memcached_pool_st* pool) #endif +#ifdef CACHELIST_ERROR_HANDLING +static void *pool_thread_main(void *arg) +{ + memcached_pool_st *pool= (memcached_pool_st*)arg; + while (pool->_thread_running) + { + if (pool->master->ketama.is_invalid + && pool->master->ketama.last_update_failed + && time(NULL) > pool->master->ketama.last_update_failed) + { + (void)pthread_mutex_lock(&pool->master_lock); + run_distribution(pool->master); + (void)pthread_mutex_unlock(&pool->master_lock); + } + if (pool->_thread_running) + { + sleep(1); + } + } + + return NULL; +} +#endif diff --git a/libmemcached/version.cc b/libmemcached/version.cc index d632be41..5f4afff5 100644 --- a/libmemcached/version.cc +++ b/libmemcached/version.cc @@ -236,7 +236,13 @@ memcached_return_t memcached_version(memcached_st *ptr) { if (ptr) { +#ifdef CACHELIST_ERROR_HANDLING + if (arcus_server_check_for_update(ptr) != MEMCACHED_SUCCESS) { + return memcached_set_error(*ptr, MEMCACHED_INVALID_SERVERLIST, MEMCACHED_AT); + } +#else arcus_server_check_for_update(ptr); +#endif memcached_return_t rc; if ((rc= initialize_query(ptr)) != MEMCACHED_SUCCESS) diff --git a/tests/ketama.cc b/tests/ketama.cc index f91d853e..c8518cd2 100644 --- a/tests/ketama.cc +++ b/tests/ketama.cc @@ -209,7 +209,11 @@ test_return_t auto_eject_hosts(memcached_st *trash) */ for (size_t x= 0; x < 99; x++) { +#ifdef CACHELIST_ERROR_HANDLING + test_compare(MEMCACHED_SUCCESS, memcached_autoeject(memc)); +#else memcached_autoeject(memc); +#endif uint32_t server_idx= memcached_generate_hash(memc, ketama_test_cases[x].key, strlen(ketama_test_cases[x].key)); test_true(server_idx != 2); }