Skip to content

Commit

Permalink
Merge pull request #1849 from eeff/eeff/dev
Browse files Browse the repository at this point in the history
fix(restful): force clean stale rolling metrics
  • Loading branch information
fengzeroz authored Feb 19, 2024
2 parents e943fe0 + 14e7f6f commit f233851
Show file tree
Hide file tree
Showing 4 changed files with 238 additions and 143 deletions.
184 changes: 176 additions & 8 deletions include/neuron/metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ extern "C" {

#include <stdint.h>

#include <pthread.h>

#include "define.h"
#include "type.h"
#include "utils/rolling_counter.h"
#include "utils/utextend.h"
#include "utils/uthash.h"

extern int64_t global_timestamp;

typedef enum {
NEU_METRIC_TYPE_COUNTER,
NEU_METRIC_TYPE_GAUAGE,
Expand Down Expand Up @@ -249,6 +254,7 @@ typedef struct {

// node metrics
typedef struct {
pthread_mutex_t lock; // spin lock
neu_node_type_e type; // node type
char * name; // node name
neu_metric_entry_t * entries; // node metric entries
Expand Down Expand Up @@ -285,6 +291,16 @@ typedef struct {
neu_metric_entry_t *registered_metrics;
} neu_metrics_t;

void neu_metrics_init();
void neu_metrics_add_node(const neu_adapter_t *adapter);
void neu_metrics_del_node(const neu_adapter_t *adapter);
int neu_metrics_register_entry(const char *name, const char *help,
neu_metric_type_e type);
void neu_metrics_unregister_entry(const char *name);

typedef void (*neu_metrics_cb_t)(const neu_metrics_t *metrics, void *data);
void neu_metrics_visist(neu_metrics_cb_t cb, void *data);

static inline const char *neu_metric_type_str(neu_metric_type_e type)
{
if (NEU_METRIC_TYPE_COUNTER == type) {
Expand Down Expand Up @@ -316,22 +332,41 @@ static inline void neu_group_metrics_free(neu_group_metrics_t *group_metrics)
HASH_ITER(hh, group_metrics->entries, e, tmp)
{
HASH_DEL(group_metrics->entries, e);
neu_metrics_unregister_entry(e->name);
neu_metric_entry_free(e);
}
free(group_metrics->name);
free(group_metrics);
}

static inline neu_node_metrics_t *
neu_node_metrics_new(neu_adapter_t *adapter, neu_node_type_e type, char *name)
{
neu_node_metrics_t *node_metrics =
(neu_node_metrics_t *) calloc(1, sizeof(*node_metrics));
if (NULL != node_metrics) {
pthread_mutex_init(&node_metrics->lock, NULL);
node_metrics->type = type;
node_metrics->name = name;
node_metrics->adapter = adapter;
// neu_metrics_add_node(adapter);
}
return node_metrics;
}

static inline void neu_node_metrics_free(neu_node_metrics_t *node_metrics)
{
if (NULL == node_metrics) {
return;
}

pthread_mutex_destroy(&node_metrics->lock);

neu_metric_entry_t *e = NULL, *etmp = NULL;
HASH_ITER(hh, node_metrics->entries, e, etmp)
{
HASH_DEL(node_metrics->entries, e);
neu_metrics_unregister_entry(e->name);
neu_metric_entry_free(e);
}

Expand All @@ -345,15 +380,148 @@ static inline void neu_node_metrics_free(neu_node_metrics_t *node_metrics)
free(node_metrics);
}

void neu_metrics_init();
void neu_metrics_add_node(const neu_adapter_t *adapter);
void neu_metrics_del_node(const neu_adapter_t *adapter);
int neu_metrics_register_entry(const char *name, const char *help,
neu_metric_type_e type);
void neu_metrics_unregister_entry(const char *name);
static inline int neu_node_metrics_add(neu_node_metrics_t *node_metrics,
const char *group_name, const char *name,
const char *help, neu_metric_type_e type,
uint64_t init)
{
int rv = 0;

typedef void (*neu_metrics_cb_t)(const neu_metrics_t *metrics, void *data);
void neu_metrics_visist(neu_metrics_cb_t cb, void *data);
if (0 > neu_metrics_register_entry(name, help, type)) {
return -1;
}

pthread_mutex_lock(&node_metrics->lock);
if (NULL == group_name) {
rv = neu_metric_entries_add(&node_metrics->entries, name, help, type,
init);
} else {
neu_group_metrics_t *group_metrics = NULL;
HASH_FIND_STR(node_metrics->group_metrics, group_name, group_metrics);
if (NULL != group_metrics) {
rv = neu_metric_entries_add(&group_metrics->entries, name, help,
type, init);
} else {
group_metrics =
(neu_group_metrics_t *) calloc(1, sizeof(*group_metrics));
if (NULL != group_metrics &&
NULL != (group_metrics->name = strdup(group_name))) {
HASH_ADD_STR(node_metrics->group_metrics, name, group_metrics);
rv = neu_metric_entries_add(&group_metrics->entries, name, help,
type, init);
} else {
free(group_metrics);
rv = -1;
}
}
}
pthread_mutex_unlock(&node_metrics->lock);

if (0 != rv) {
neu_metrics_unregister_entry(name);
}
return rv;
}

static inline int neu_node_metrics_update(neu_node_metrics_t *node_metrics,
const char * group,
const char *metric_name, uint64_t n)
{
neu_metric_entry_t *entry = NULL;

pthread_mutex_lock(&node_metrics->lock);
if (NULL == group) {
HASH_FIND_STR(node_metrics->entries, metric_name, entry);
} else if (NULL != node_metrics->group_metrics) {
neu_group_metrics_t *g = NULL;
HASH_FIND_STR(node_metrics->group_metrics, group, g);
if (NULL != g) {
HASH_FIND_STR(g->entries, metric_name, entry);
}
}

if (NULL == entry) {
pthread_mutex_unlock(&node_metrics->lock);
return -1;
}

if (NEU_METRIC_TYPE_COUNTER == entry->type) {
entry->value += n;
} else if (NEU_METRIC_TYPE_ROLLING_COUNTER == entry->type) {
entry->value =
neu_rolling_counter_inc(entry->rcnt, global_timestamp, n);
} else {
entry->value = n;
}
pthread_mutex_unlock(&node_metrics->lock);

return 0;
}

static inline void neu_node_metrics_reset(neu_node_metrics_t *node_metrics)
{
neu_metric_entry_t *entry = NULL;

pthread_mutex_lock(&node_metrics->lock);
HASH_LOOP(hh, node_metrics->entries, entry)
{
entry->value = 0;
if (NEU_METRIC_TYPE_ROLLING_COUNTER == entry->type) {
neu_rolling_counter_reset(entry->rcnt);
}
}

neu_group_metrics_t *g = NULL;
HASH_LOOP(hh, node_metrics->group_metrics, g)
{
HASH_LOOP(hh, g->entries, entry)
{
entry->value = 0;
if (NEU_METRIC_TYPE_ROLLING_COUNTER == entry->type) {
neu_rolling_counter_reset(entry->rcnt);
}
}
}
pthread_mutex_unlock(&node_metrics->lock);
}

static inline int
neu_node_metrics_update_group(neu_node_metrics_t *node_metrics,
const char * group_name,
const char * new_group_name)
{
int rv = -1;
neu_group_metrics_t *group_metrics = NULL;

pthread_mutex_lock(&node_metrics->lock);
HASH_FIND_STR(node_metrics->group_metrics, group_name, group_metrics);
if (NULL != group_metrics) {
char *name = strdup(new_group_name);
if (NULL != name) {
HASH_DEL(node_metrics->group_metrics, group_metrics);
free(group_metrics->name);
group_metrics->name = name;
HASH_ADD_STR(node_metrics->group_metrics, name, group_metrics);
rv = 0;
}
}
pthread_mutex_unlock(&node_metrics->lock);

return rv;
}

static inline void neu_node_metrics_del_group(neu_node_metrics_t *node_metrics,
const char * group_name)
{
neu_group_metrics_t *gm = NULL;
pthread_mutex_lock(&node_metrics->lock);
HASH_FIND_STR(node_metrics->group_metrics, group_name, gm);
if (NULL != gm) {
HASH_DEL(node_metrics->group_metrics, gm);
neu_group_metrics_free(gm);
}
pthread_mutex_unlock(&node_metrics->lock);
}

#ifdef __cplusplus
}
Expand Down
13 changes: 11 additions & 2 deletions include/neuron/utils/rolling_counter.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ extern "C" {
typedef struct {
uint64_t val; // accumulator
uint64_t ts; // head time stamp in milliseconds
uint32_t res : 22; // time resolution in milliseconds
uint32_t res : 21; // time resolution in milliseconds
uint32_t hd : 5; // head position
uint32_t n : 5; // number of counters
uint32_t n : 6; // number of counters
uint32_t counts[]; // bins of counters
} neu_rolling_counter_t;

Expand Down Expand Up @@ -90,6 +90,15 @@ static inline uint64_t neu_rolling_counter_inc(neu_rolling_counter_t *counter,
return counter->val;
}

/** Reset the counter.
*/
static inline void neu_rolling_counter_reset(neu_rolling_counter_t *counter)
{
counter->val = 0;
counter->hd = 0;
memset(counter->counts, 0, counter->n * sizeof(counter->counts[0]));
}

/** Return the counter value.
*
* NOTE: may return stale value if the counter is not updated frequent enough.
Expand Down
35 changes: 31 additions & 4 deletions plugins/restful/metric_handle.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

#include <stdio.h>

#include <pthread.h>

#include "define.h"
#include "metrics.h"
#include "plugin.h"
#include "utils/asprintf.h"
#include "utils/http.h"
#include "utils/http_handler.h"
Expand Down Expand Up @@ -164,8 +167,8 @@ static inline void gen_global_metrics(const neu_metrics_t *metrics,
metrics->south_running_nodes, metrics->south_disconnected_nodes);
}

static inline void
gen_single_node_metrics(const neu_node_metrics_t *node_metrics, FILE *stream)
static inline void gen_single_node_metrics(neu_node_metrics_t *node_metrics,
FILE * stream)
{
fprintf(stream,
"# HELP node_type Driver(1) or APP(2)\n"
Expand All @@ -175,8 +178,13 @@ gen_single_node_metrics(const neu_node_metrics_t *node_metrics, FILE *stream)

neu_metric_entry_t *e = NULL;

pthread_mutex_lock(&node_metrics->lock);
HASH_LOOP(hh, node_metrics->entries, e)
{
if (NEU_METRIC_TYPE_ROLLING_COUNTER == e->type) {
// force clean stale value
e->value = neu_rolling_counter_inc(e->rcnt, global_timestamp, 0);
}
fprintf(stream,
"# HELP %s %s\n# TYPE %s %s\n%s{node=\"%s\"} %" PRIu64 "\n",
e->name, e->help, e->name, neu_metric_type_str(e->type),
Expand All @@ -188,13 +196,19 @@ gen_single_node_metrics(const neu_node_metrics_t *node_metrics, FILE *stream)
{
HASH_LOOP(hh, g->entries, e)
{
if (NEU_METRIC_TYPE_ROLLING_COUNTER == e->type) {
// force clean stale value
e->value =
neu_rolling_counter_inc(e->rcnt, global_timestamp, 0);
}
fprintf(stream,
"# HELP %s %s\n# TYPE %s %s\n%s{node=\"%s\",group=\"%s\"} "
"%" PRIu64 "\n",
e->name, e->help, e->name, neu_metric_type_str(e->type),
e->name, node_metrics->name, g->name, e->value);
}
}
pthread_mutex_unlock(&node_metrics->lock);
}

static inline bool has_entry(neu_node_metrics_t *node_metrics, const char *name)
Expand All @@ -216,7 +230,7 @@ static inline bool has_entry(neu_node_metrics_t *node_metrics, const char *name)
return false;
}

static void gen_all_node_metrics(const neu_metrics_t *metrics, int type_filter,
static void gen_all_node_metrics(neu_metrics_t *metrics, int type_filter,
FILE *stream)
{
neu_metric_entry_t * e = NULL, *r = NULL;
Expand Down Expand Up @@ -261,23 +275,36 @@ static void gen_all_node_metrics(const neu_metrics_t *metrics, int type_filter,
r->help, r->name, neu_metric_type_str(r->type));
}

pthread_mutex_lock(&n->lock);
HASH_FIND_STR(n->entries, r->name, e);
if (e) {
if (NEU_METRIC_TYPE_ROLLING_COUNTER == e->type) {
// force clean stale value
e->value =
neu_rolling_counter_inc(e->rcnt, global_timestamp, 0);
}
fprintf(stream, "%s{node=\"%s\"} %" PRIu64 "\n", e->name,
n->name, e->value);

pthread_mutex_unlock(&n->lock);
continue;
}

HASH_LOOP(hh, n->group_metrics, g)
{
HASH_FIND_STR(g->entries, r->name, e);
if (e) {
if (NEU_METRIC_TYPE_ROLLING_COUNTER == e->type) {
// force clean stale value
e->value = neu_rolling_counter_inc(e->rcnt,
global_timestamp, 0);
}
fprintf(stream,
"%s{node=\"%s\",group=\"%s\"} %" PRIu64 "\n",
e->name, n->name, g->name, e->value);
}
}
pthread_mutex_unlock(&n->lock);
}
}
}
Expand All @@ -289,7 +316,7 @@ struct context {
const char *node;
};

static void gen_node_metrics(const neu_metrics_t *metrics, struct context *ctx)
static void gen_node_metrics(neu_metrics_t *metrics, struct context *ctx)
{
if (ctx->node[0]) {
neu_node_metrics_t *n = NULL;
Expand Down
Loading

0 comments on commit f233851

Please sign in to comment.