Skip to content

Commit

Permalink
proxy: destroy used buffer on exit (#3888)
Browse files Browse the repository at this point in the history
Signed-off-by: Takahiro Yamashita <[email protected]>
  • Loading branch information
nokute78 authored Dec 12, 2021
1 parent dcb7fda commit 83686a5
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 32 deletions.
5 changes: 1 addition & 4 deletions include/fluent-bit/flb_plugin_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ struct flb_plugin_proxy_def {
/* Proxy context */
struct flb_plugin_proxy {
/* Fields populated once remote flb_cb_register() is called */
int type; /* defined by FLB_PROXY_[INPUT|OUTPUT]_PLUGIN */
int proxy; /* proxy type */
char *name; /* plugin short name */
char *description; /* plugin description */
struct flb_plugin_proxy_def *def;

/* Internal */
struct flb_api *api; /* API context to export functions */
Expand Down
56 changes: 29 additions & 27 deletions src/flb_plugin_proxy.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk,


#ifdef FLB_HAVE_PROXY_GO
if (ctx->proxy->proxy == FLB_PROXY_GOLANG) {
if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
flb_trace("[GO] entering go_flush()");
ret = proxy_go_flush(ctx,
event_chunk->data,
Expand All @@ -69,26 +69,16 @@ static void proxy_cb_flush(struct flb_event_chunk *event_chunk,
FLB_OUTPUT_RETURN(ret);
}


static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy);
static int flb_proxy_cb_exit(void *data, struct flb_config *config)
{
struct flb_output_plugin *instance = data;
struct flb_plugin_proxy *proxy = (instance->proxy);
struct flbgo_output_plugin *plugin;
void *inst;

inst = proxy->data;

plugin = (struct flbgo_output_plugin *) inst;
flb_debug("[GO] running exit callback");

if (plugin->cb_exit_ctx) {
return plugin->cb_exit_ctx(plugin->context->remote_context);
}
else if (plugin->cb_exit) {
return plugin->cb_exit();
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
proxy_go_destroy(proxy->data);
}

flb_plugin_proxy_destroy(proxy);
return 0;
}

Expand All @@ -108,8 +98,8 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
out->type = FLB_OUTPUT_PLUGIN_PROXY;
out->proxy = proxy;
out->flags = def->flags;
out->name = flb_strdup(def->name);
out->description = flb_strdup(def->description);
out->name = def->name;
out->description = def->description;
mk_list_add(&out->_head, &config->out_plugins);

/*
Expand Down Expand Up @@ -140,7 +130,7 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
{
int ret;
int (*cb_register)(struct flb_plugin_proxy_def *);
struct flb_plugin_proxy_def *def;
struct flb_plugin_proxy_def *def = proxy->def;

/* Lookup the registration callback */
cb_register = flb_plugin_proxy_symbol(proxy, "FLBPluginRegister");
Expand All @@ -156,10 +146,6 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
* - plugin name
* - plugin description
*/
def = flb_malloc(sizeof(struct flb_plugin_proxy_def));
if (!def) {
return -1;
}

/* Do the registration */
ret = cb_register(def);
Expand All @@ -184,7 +170,6 @@ int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
* real link to the 'output' interface
*/
if (def->type == FLB_PROXY_OUTPUT_PLUGIN) {
proxy->proxy = def->proxy;
flb_proxy_register_output(proxy, def, config);
}
}
Expand All @@ -202,14 +187,14 @@ int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy,
proxy->instance = o_ins;

/* Based on 'proxy', use the proper handler */
if (proxy->proxy == FLB_PROXY_GOLANG) {
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
#ifdef FLB_HAVE_PROXY_GO
ret = proxy_go_init(proxy);
#endif
}
else {
fprintf(stderr, "[proxy] unrecognized proxy handler %i\n",
proxy->proxy);
proxy->def->proxy);
}

return ret;
Expand Down Expand Up @@ -245,8 +230,17 @@ struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type,
return NULL;
}

proxy->def = flb_malloc(sizeof(struct flb_plugin_proxy_def));
if (!proxy->def) {
flb_errno();
dlclose(handle);
flb_api_destroy(proxy->api);
flb_free(proxy);
return NULL;
}

/* Set fields and add it to the list */
proxy->type = type;
proxy->def->type = type;
proxy->dso_handler = handle;
proxy->data = NULL;
mk_list_add(&proxy->_head, &config->proxies);
Expand All @@ -257,9 +251,17 @@ struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type,
return proxy;
}

void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy)
static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy)
{
/* cleanup */
void (*cb_unregister)(struct flb_plugin_proxy_def *def);

cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
if (cb_unregister != NULL) {
cb_unregister(proxy->def);
}
flb_free(proxy->def);
flb_api_destroy(proxy->api);
dlclose(proxy->dso_handler);
mk_list_del(&proxy->_head);
flb_free(proxy);
Expand Down
19 changes: 19 additions & 0 deletions src/proxy/go/go.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,3 +145,22 @@ int proxy_go_flush(struct flb_plugin_proxy_context *ctx,
flb_free(buf);
return ret;
}

int proxy_go_destroy(void *data)
{
int ret = 0;
struct flbgo_output_plugin *plugin;

plugin = (struct flbgo_output_plugin *) data;
flb_debug("[GO] running exit callback");

if (plugin->cb_exit_ctx) {
ret = plugin->cb_exit_ctx(plugin->context->remote_context);
}
else if (plugin->cb_exit) {
ret = plugin->cb_exit();
}
flb_free(plugin->name);
flb_free(plugin);
return ret;
}
2 changes: 1 addition & 1 deletion src/proxy/go/go.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ int proxy_go_init(struct flb_plugin_proxy *proxy);
int proxy_go_flush(struct flb_plugin_proxy_context *ctx,
const void *data, size_t size,
const char *tag, int tag_len);

int proxy_go_destroy(void *data);
#endif

0 comments on commit 83686a5

Please sign in to comment.