diff --git a/include/neuron/metrics.h b/include/neuron/metrics.h index 0bfd993d0..12765c37c 100644 --- a/include/neuron/metrics.h +++ b/include/neuron/metrics.h @@ -26,11 +26,16 @@ extern "C" { #include +#include + #include "define.h" #include "type.h" #include "utils/rolling_counter.h" +#include "utils/utextend.h" #include "utils/uthash.h" +extern int64_t global_timestamp; + typedef enum { NEU_METRIC_TYPE_COUNTER, NEU_METRIC_TYPE_GAUAGE, @@ -249,6 +254,7 @@ typedef struct { // node metrics typedef struct { + pthread_mutex_t lock; // spin lock neu_node_type_e type; // node type char * name; // node name neu_metric_entry_t * entries; // node metric entries @@ -285,6 +291,16 @@ typedef struct { neu_metric_entry_t *registered_metrics; } neu_metrics_t; +void neu_metrics_init(); +void neu_metrics_add_node(const neu_adapter_t *adapter); +void neu_metrics_del_node(const neu_adapter_t *adapter); +int neu_metrics_register_entry(const char *name, const char *help, + neu_metric_type_e type); +void neu_metrics_unregister_entry(const char *name); + +typedef void (*neu_metrics_cb_t)(const neu_metrics_t *metrics, void *data); +void neu_metrics_visist(neu_metrics_cb_t cb, void *data); + static inline const char *neu_metric_type_str(neu_metric_type_e type) { if (NEU_METRIC_TYPE_COUNTER == type) { @@ -316,22 +332,41 @@ static inline void neu_group_metrics_free(neu_group_metrics_t *group_metrics) HASH_ITER(hh, group_metrics->entries, e, tmp) { HASH_DEL(group_metrics->entries, e); + neu_metrics_unregister_entry(e->name); neu_metric_entry_free(e); } free(group_metrics->name); free(group_metrics); } +static inline neu_node_metrics_t * +neu_node_metrics_new(neu_adapter_t *adapter, neu_node_type_e type, char *name) +{ + neu_node_metrics_t *node_metrics = + (neu_node_metrics_t *) calloc(1, sizeof(*node_metrics)); + if (NULL != node_metrics) { + pthread_mutex_init(&node_metrics->lock, NULL); + node_metrics->type = type; + node_metrics->name = name; + node_metrics->adapter = adapter; + // neu_metrics_add_node(adapter); + } + return node_metrics; +} + static inline void neu_node_metrics_free(neu_node_metrics_t *node_metrics) { if (NULL == node_metrics) { return; } + pthread_mutex_destroy(&node_metrics->lock); + neu_metric_entry_t *e = NULL, *etmp = NULL; HASH_ITER(hh, node_metrics->entries, e, etmp) { HASH_DEL(node_metrics->entries, e); + neu_metrics_unregister_entry(e->name); neu_metric_entry_free(e); } @@ -345,15 +380,148 @@ static inline void neu_node_metrics_free(neu_node_metrics_t *node_metrics) free(node_metrics); } -void neu_metrics_init(); -void neu_metrics_add_node(const neu_adapter_t *adapter); -void neu_metrics_del_node(const neu_adapter_t *adapter); -int neu_metrics_register_entry(const char *name, const char *help, - neu_metric_type_e type); -void neu_metrics_unregister_entry(const char *name); +static inline int neu_node_metrics_add(neu_node_metrics_t *node_metrics, + const char *group_name, const char *name, + const char *help, neu_metric_type_e type, + uint64_t init) +{ + int rv = 0; -typedef void (*neu_metrics_cb_t)(const neu_metrics_t *metrics, void *data); -void neu_metrics_visist(neu_metrics_cb_t cb, void *data); + if (0 > neu_metrics_register_entry(name, help, type)) { + return -1; + } + + pthread_mutex_lock(&node_metrics->lock); + if (NULL == group_name) { + rv = neu_metric_entries_add(&node_metrics->entries, name, help, type, + init); + } else { + neu_group_metrics_t *group_metrics = NULL; + HASH_FIND_STR(node_metrics->group_metrics, group_name, group_metrics); + if (NULL != group_metrics) { + rv = neu_metric_entries_add(&group_metrics->entries, name, help, + type, init); + } else { + group_metrics = + (neu_group_metrics_t *) calloc(1, sizeof(*group_metrics)); + if (NULL != group_metrics && + NULL != (group_metrics->name = strdup(group_name))) { + HASH_ADD_STR(node_metrics->group_metrics, name, group_metrics); + rv = neu_metric_entries_add(&group_metrics->entries, name, help, + type, init); + } else { + free(group_metrics); + rv = -1; + } + } + } + pthread_mutex_unlock(&node_metrics->lock); + + if (0 != rv) { + neu_metrics_unregister_entry(name); + } + return rv; +} + +static inline int neu_node_metrics_update(neu_node_metrics_t *node_metrics, + const char * group, + const char *metric_name, uint64_t n) +{ + neu_metric_entry_t *entry = NULL; + + pthread_mutex_lock(&node_metrics->lock); + if (NULL == group) { + HASH_FIND_STR(node_metrics->entries, metric_name, entry); + } else if (NULL != node_metrics->group_metrics) { + neu_group_metrics_t *g = NULL; + HASH_FIND_STR(node_metrics->group_metrics, group, g); + if (NULL != g) { + HASH_FIND_STR(g->entries, metric_name, entry); + } + } + + if (NULL == entry) { + pthread_mutex_unlock(&node_metrics->lock); + return -1; + } + + if (NEU_METRIC_TYPE_COUNTER == entry->type) { + entry->value += n; + } else if (NEU_METRIC_TYPE_ROLLING_COUNTER == entry->type) { + entry->value = + neu_rolling_counter_inc(entry->rcnt, global_timestamp, n); + } else { + entry->value = n; + } + pthread_mutex_unlock(&node_metrics->lock); + + return 0; +} + +static inline void neu_node_metrics_reset(neu_node_metrics_t *node_metrics) +{ + neu_metric_entry_t *entry = NULL; + + pthread_mutex_lock(&node_metrics->lock); + HASH_LOOP(hh, node_metrics->entries, entry) + { + entry->value = 0; + if (NEU_METRIC_TYPE_ROLLING_COUNTER == entry->type) { + neu_rolling_counter_reset(entry->rcnt); + } + } + + neu_group_metrics_t *g = NULL; + HASH_LOOP(hh, node_metrics->group_metrics, g) + { + HASH_LOOP(hh, g->entries, entry) + { + entry->value = 0; + if (NEU_METRIC_TYPE_ROLLING_COUNTER == entry->type) { + neu_rolling_counter_reset(entry->rcnt); + } + } + } + pthread_mutex_unlock(&node_metrics->lock); +} + +static inline int +neu_node_metrics_update_group(neu_node_metrics_t *node_metrics, + const char * group_name, + const char * new_group_name) +{ + int rv = -1; + neu_group_metrics_t *group_metrics = NULL; + + pthread_mutex_lock(&node_metrics->lock); + HASH_FIND_STR(node_metrics->group_metrics, group_name, group_metrics); + if (NULL != group_metrics) { + char *name = strdup(new_group_name); + if (NULL != name) { + HASH_DEL(node_metrics->group_metrics, group_metrics); + free(group_metrics->name); + group_metrics->name = name; + HASH_ADD_STR(node_metrics->group_metrics, name, group_metrics); + rv = 0; + } + } + pthread_mutex_unlock(&node_metrics->lock); + + return rv; +} + +static inline void neu_node_metrics_del_group(neu_node_metrics_t *node_metrics, + const char * group_name) +{ + neu_group_metrics_t *gm = NULL; + pthread_mutex_lock(&node_metrics->lock); + HASH_FIND_STR(node_metrics->group_metrics, group_name, gm); + if (NULL != gm) { + HASH_DEL(node_metrics->group_metrics, gm); + neu_group_metrics_free(gm); + } + pthread_mutex_unlock(&node_metrics->lock); +} #ifdef __cplusplus } diff --git a/include/neuron/utils/rolling_counter.h b/include/neuron/utils/rolling_counter.h index c0a7c010f..ed43e0528 100644 --- a/include/neuron/utils/rolling_counter.h +++ b/include/neuron/utils/rolling_counter.h @@ -37,9 +37,9 @@ extern "C" { typedef struct { uint64_t val; // accumulator uint64_t ts; // head time stamp in milliseconds - uint32_t res : 22; // time resolution in milliseconds + uint32_t res : 21; // time resolution in milliseconds uint32_t hd : 5; // head position - uint32_t n : 5; // number of counters + uint32_t n : 6; // number of counters uint32_t counts[]; // bins of counters } neu_rolling_counter_t; @@ -90,6 +90,15 @@ static inline uint64_t neu_rolling_counter_inc(neu_rolling_counter_t *counter, return counter->val; } +/** Reset the counter. + */ +static inline void neu_rolling_counter_reset(neu_rolling_counter_t *counter) +{ + counter->val = 0; + counter->hd = 0; + memset(counter->counts, 0, counter->n * sizeof(counter->counts[0])); +} + /** Return the counter value. * * NOTE: may return stale value if the counter is not updated frequent enough. diff --git a/plugins/restful/metric_handle.c b/plugins/restful/metric_handle.c index d1e56d972..d18884b05 100644 --- a/plugins/restful/metric_handle.c +++ b/plugins/restful/metric_handle.c @@ -19,8 +19,11 @@ #include +#include + #include "define.h" #include "metrics.h" +#include "plugin.h" #include "utils/asprintf.h" #include "utils/http.h" #include "utils/http_handler.h" @@ -164,8 +167,8 @@ static inline void gen_global_metrics(const neu_metrics_t *metrics, metrics->south_running_nodes, metrics->south_disconnected_nodes); } -static inline void -gen_single_node_metrics(const neu_node_metrics_t *node_metrics, FILE *stream) +static inline void gen_single_node_metrics(neu_node_metrics_t *node_metrics, + FILE * stream) { fprintf(stream, "# HELP node_type Driver(1) or APP(2)\n" @@ -175,8 +178,13 @@ gen_single_node_metrics(const neu_node_metrics_t *node_metrics, FILE *stream) neu_metric_entry_t *e = NULL; + pthread_mutex_lock(&node_metrics->lock); HASH_LOOP(hh, node_metrics->entries, e) { + if (NEU_METRIC_TYPE_ROLLING_COUNTER == e->type) { + // force clean stale value + e->value = neu_rolling_counter_inc(e->rcnt, global_timestamp, 0); + } fprintf(stream, "# HELP %s %s\n# TYPE %s %s\n%s{node=\"%s\"} %" PRIu64 "\n", e->name, e->help, e->name, neu_metric_type_str(e->type), @@ -188,6 +196,11 @@ gen_single_node_metrics(const neu_node_metrics_t *node_metrics, FILE *stream) { HASH_LOOP(hh, g->entries, e) { + if (NEU_METRIC_TYPE_ROLLING_COUNTER == e->type) { + // force clean stale value + e->value = + neu_rolling_counter_inc(e->rcnt, global_timestamp, 0); + } fprintf(stream, "# HELP %s %s\n# TYPE %s %s\n%s{node=\"%s\",group=\"%s\"} " "%" PRIu64 "\n", @@ -195,6 +208,7 @@ gen_single_node_metrics(const neu_node_metrics_t *node_metrics, FILE *stream) e->name, node_metrics->name, g->name, e->value); } } + pthread_mutex_unlock(&node_metrics->lock); } static inline bool has_entry(neu_node_metrics_t *node_metrics, const char *name) @@ -216,7 +230,7 @@ static inline bool has_entry(neu_node_metrics_t *node_metrics, const char *name) return false; } -static void gen_all_node_metrics(const neu_metrics_t *metrics, int type_filter, +static void gen_all_node_metrics(neu_metrics_t *metrics, int type_filter, FILE *stream) { neu_metric_entry_t * e = NULL, *r = NULL; @@ -261,11 +275,18 @@ static void gen_all_node_metrics(const neu_metrics_t *metrics, int type_filter, r->help, r->name, neu_metric_type_str(r->type)); } + pthread_mutex_lock(&n->lock); HASH_FIND_STR(n->entries, r->name, e); if (e) { + if (NEU_METRIC_TYPE_ROLLING_COUNTER == e->type) { + // force clean stale value + e->value = + neu_rolling_counter_inc(e->rcnt, global_timestamp, 0); + } fprintf(stream, "%s{node=\"%s\"} %" PRIu64 "\n", e->name, n->name, e->value); + pthread_mutex_unlock(&n->lock); continue; } @@ -273,11 +294,17 @@ static void gen_all_node_metrics(const neu_metrics_t *metrics, int type_filter, { HASH_FIND_STR(g->entries, r->name, e); if (e) { + if (NEU_METRIC_TYPE_ROLLING_COUNTER == e->type) { + // force clean stale value + e->value = neu_rolling_counter_inc(e->rcnt, + global_timestamp, 0); + } fprintf(stream, "%s{node=\"%s\",group=\"%s\"} %" PRIu64 "\n", e->name, n->name, g->name, e->value); } } + pthread_mutex_unlock(&n->lock); } } } @@ -289,7 +316,7 @@ struct context { const char *node; }; -static void gen_node_metrics(const neu_metrics_t *metrics, struct context *ctx) +static void gen_node_metrics(neu_metrics_t *metrics, struct context *ctx) { if (ctx->node[0]) { neu_node_metrics_t *n = NULL; diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c index 46fec10ef..18489ae44 100644 --- a/src/adapter/adapter.c +++ b/src/adapter/adapter.c @@ -371,22 +371,19 @@ static int adapter_register_metric(neu_adapter_t *adapter, const char *name, uint64_t init) { if (NULL == adapter->metrics) { - adapter->metrics = calloc(1, sizeof(*adapter->metrics)); + adapter->metrics = + neu_node_metrics_new(adapter, adapter->module->type, adapter->name); if (NULL == adapter->metrics) { return -1; } - adapter->metrics->type = adapter->module->type; - adapter->metrics->name = adapter->name; - adapter->metrics->adapter = adapter; neu_metrics_add_node(adapter); } - if (0 > neu_metric_entries_add(&adapter->metrics->entries, name, help, type, - init)) { + if (0 > + neu_node_metrics_add(adapter->metrics, NULL, name, help, type, init)) { return -1; } - neu_metrics_register_entry(name, help, type); return 0; } @@ -394,50 +391,17 @@ static int adapter_update_metric(neu_adapter_t *adapter, const char *metric_name, uint64_t n, const char *group) { - neu_metric_entry_t *entry = NULL; if (NULL == adapter->metrics) { return -1; } - if (NULL == group) { - HASH_FIND_STR(adapter->metrics->entries, metric_name, entry); - } else if (NULL != adapter->metrics->group_metrics) { - neu_group_metrics_t *g = NULL; - HASH_FIND_STR(adapter->metrics->group_metrics, group, g); - if (NULL != g) { - HASH_FIND_STR(g->entries, metric_name, entry); - } - } - - if (NULL == entry) { - return -1; - } - - if (NEU_METRIC_TYPE_COUNTER == entry->type) { - entry->value += n; - } else if (NEU_METRIC_TYPE_ROLLING_COUNTER == entry->type) { - entry->value = - neu_rolling_counter_inc(entry->rcnt, global_timestamp, n); - } else { - entry->value = n; - } - - return 0; + return neu_node_metrics_update(adapter->metrics, group, metric_name, n); } -static void adapter_reset_metrics(neu_adapter_t *adapter) +static inline void adapter_reset_metrics(neu_adapter_t *adapter) { - neu_metric_entry_t *entry = NULL; - if (NULL == adapter->metrics) { - return; - } - - HASH_LOOP(hh, adapter->metrics->entries, entry) { entry->value = 0; } - - neu_group_metrics_t *g = NULL; - HASH_LOOP(hh, adapter->metrics->group_metrics, g) - { - HASH_LOOP(hh, g->entries, entry) { entry->value = 0; } + if (NULL != adapter->metrics) { + neu_node_metrics_reset(adapter->metrics); } } @@ -825,11 +789,13 @@ static int adapter_loop(enum neu_event_io_type type, int fd, void *usr_data) neu_resp_get_node_state_t *resp = (neu_resp_get_node_state_t *) &header[1]; - neu_metric_entry_t *e = NULL; if (NULL != adapter->metrics) { + pthread_mutex_lock(&adapter->metrics->lock); + neu_metric_entry_t *e = NULL; HASH_FIND_STR(adapter->metrics->entries, NEU_METRIC_LAST_RTT_MS, e); + resp->rtt = NULL != e ? e->value : 0; + pthread_mutex_unlock(&adapter->metrics->lock); } - resp->rtt = NULL != e ? e->value : 0; resp->state = neu_adapter_get_state(adapter); header->type = NEU_RESP_GET_NODE_STATE; neu_msg_exchange(header); @@ -1301,11 +1267,6 @@ void neu_adapter_destroy(neu_adapter_t *adapter) if (NULL != adapter->metrics) { neu_metrics_del_node(adapter); - neu_metric_entry_t *e = NULL; - HASH_LOOP(hh, adapter->metrics->entries, e) - { - neu_metrics_unregister_entry(e->name); - } neu_node_metrics_free(adapter->metrics); } @@ -1483,113 +1444,43 @@ int neu_adapter_register_group_metric(neu_adapter_t *adapter, const char *help, neu_metric_type_e type, uint64_t init) { - neu_group_metrics_t *group_metrics = NULL; - if (NULL == adapter->metrics) { return -1; } - if (0 > neu_metrics_register_entry(name, help, type)) { - return -1; - } - - HASH_FIND_STR(adapter->metrics->group_metrics, group_name, group_metrics); - if (NULL == group_metrics) { - group_metrics = calloc(1, sizeof(*group_metrics)); - if (NULL == group_metrics) { - return -1; - } - group_metrics->name = strdup(group_name); - if (NULL == group_metrics->name) { - free(group_metrics); - return -1; - } - HASH_ADD_STR(adapter->metrics->group_metrics, name, group_metrics); - } - - if (0 > neu_metric_entries_add(&group_metrics->entries, name, help, type, - init)) { - return -1; - } - - return 0; + return neu_node_metrics_add(adapter->metrics, group_name, name, help, type, + init); } int neu_adapter_update_group_metric(neu_adapter_t *adapter, const char * group_name, const char *metric_name, uint64_t n) { - neu_metric_entry_t * entry = NULL; - neu_group_metrics_t *group_metrics = NULL; - if (NULL == adapter->metrics) { return -1; } - HASH_FIND_STR(adapter->metrics->group_metrics, group_name, group_metrics); - if (NULL == group_metrics) { - return -1; - } - - HASH_FIND_STR(group_metrics->entries, metric_name, entry); - if (NULL == entry) { - return -1; - } - - if (NEU_METRIC_TYPE_COUNTER == entry->type) { - entry->value += n; - } else if (NEU_METRIC_TYPE_ROLLING_COUNTER == entry->type) { - entry->value = - neu_rolling_counter_inc(entry->rcnt, global_timestamp, n); - } else { - entry->value = n; - } - - return 0; + return neu_node_metrics_update(adapter->metrics, group_name, metric_name, + n); } int neu_adapter_metric_update_group_name(neu_adapter_t *adapter, const char * group_name, const char * new_group_name) { - neu_group_metrics_t *group_metrics = NULL; - if (NULL == adapter->metrics) { return -1; } - HASH_FIND_STR(adapter->metrics->group_metrics, group_name, group_metrics); - if (NULL == group_metrics) { - return -1; - } - - char *name = strdup(new_group_name); - if (NULL == name) { - return -1; - } - - HASH_DEL(adapter->metrics->group_metrics, group_metrics); - free(group_metrics->name); - group_metrics->name = name; - HASH_ADD_STR(adapter->metrics->group_metrics, name, group_metrics); - - return 0; + return neu_node_metrics_update_group(adapter->metrics, group_name, + new_group_name); } void neu_adapter_del_group_metrics(neu_adapter_t *adapter, const char * group_name) { - if (NULL == adapter->metrics) { - return; - } - - neu_group_metrics_t *gm = NULL; - HASH_FIND_STR(adapter->metrics->group_metrics, group_name, gm); - if (NULL != gm) { - HASH_DEL(adapter->metrics->group_metrics, gm); - neu_metric_entry_t *e = NULL; - HASH_LOOP(hh, gm->entries, e) { neu_metrics_unregister_entry(e->name); } - neu_group_metrics_free(gm); + if (NULL != adapter->metrics) { + neu_node_metrics_del_group(adapter->metrics, group_name); } }