Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy: added multithreaded worker support for proxy plugins #3863

Closed
wants to merge 10 commits into from
18 changes: 14 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -829,10 +829,16 @@ endif()
# ============================

set(CPACK_PACKAGE_VERSION ${FLB_VERSION_STR})
set(CPACK_PACKAGE_NAME "td-agent-bit")

if(FLB_TD)
set(CPACK_PACKAGE_NAME "td-agent-bit")
else()
set(CPACK_PACKAGE_NAME "fluent-bit")
endif()

set(CPACK_PACKAGE_RELEASE 1)
set(CPACK_PACKAGE_CONTACT "Eduardo Silva <eduardo@treasure-data.com>")
set(CPACK_PACKAGE_VENDOR "Treasure Data")
set(CPACK_PACKAGE_CONTACT "Eduardo Silva <eduardo@calyptia.com>")
set(CPACK_PACKAGE_VENDOR "Calyptia Inc.")
set(CPACK_RESOURCE_FILE_LICENSE "${PROJECT_SOURCE_DIR}/LICENSE")
set(CPACK_PACKAGING_INSTALL_PREFIX "/")

Expand Down Expand Up @@ -925,7 +931,11 @@ set(CPACK_DEBIAN_PACKAGE_SHLIBDEPS ON)
# CPack: Windows System
if(CPACK_GENERATOR MATCHES "NSIS")
set(CPACK_MONOLITHIC_INSTALL 1)
set(CPACK_PACKAGE_INSTALL_DIRECTORY "td-agent-bit")
if(FLB_TD)
set(CPACK_PACKAGE_INSTALL_DIRECTORY "td-agent-bit")
else()
set(CPACK_PACKAGE_INSTALL_DIRECTORY "fluent-bit")
endif()
endif()

include(CPack)
2 changes: 1 addition & 1 deletion plugins/out_s3/s3.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ static int init_seq_index(void *context) {
}

/* Create directory path if it doesn't exist */
ret = mkdir(ctx->metadata_dir, 0600);
ret = mkdir(ctx->metadata_dir, 0700);
if (ret < 0 && errno != EEXIST) {
flb_plg_error(ctx->ins, "Failed to create metadata directory");
return -1;
Expand Down
6 changes: 6 additions & 0 deletions plugins/out_s3/s3_multipart.c
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,12 @@ int complete_multipart_upload(struct flb_s3 *ctx,
struct flb_http_client *c = NULL;
struct flb_aws_client *s3_client;

if (!m_upload->upload_id) {
flb_plg_error(ctx->ins, "Cannot complete multipart upload for key %s: "
"upload ID is unset ", m_upload->s3_key);
return -1;
}

uri = flb_sds_create_size(flb_sds_len(m_upload->s3_key) + 11 +
flb_sds_len(m_upload->upload_id));
if (!uri) {
Expand Down
13 changes: 11 additions & 2 deletions src/flb_network.c
Original file line number Diff line number Diff line change
Expand Up @@ -602,12 +602,12 @@ static void flb_net_getaddrinfo_timeout_handler(struct flb_config *config, void

*(lookup_context->udp_timeout_detected) = 1;

ares_cancel(lookup_context->ares_channel);

if (lookup_context->ares_socket_created) {
mk_event_del(lookup_context->event_loop, &lookup_context->response_event);
}

ares_cancel(lookup_context->ares_channel);

flb_coro_resume(lookup_context->coroutine);

flb_net_dns_lookup_context_destroy(lookup_context);
Expand Down Expand Up @@ -771,6 +771,15 @@ int flb_net_getaddrinfo(const char *node, const char *service, struct addrinfo *
*/
timeout *= 1000;

/* We need to ensure that our timer won't overlap with the upstream timeout handler.
*/
if (timeout > 500) {
timeout -= 500;
}
else {
timeout /= 2;
}

ares_hints.ai_flags = hints->ai_flags;
ares_hints.ai_family = hints->ai_family;
ares_hints.ai_socktype = hints->ai_socktype;
Expand Down
41 changes: 30 additions & 11 deletions src/flb_output.c
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,21 @@ void flb_output_net_default(const char *host, const int port,
}
}

/* Add thread pool for output plugin if configured with workers */
int flb_output_enable_multi_threading(struct flb_output_instance *ins, struct flb_config *config)
{
/* Multi-threading enabled ? (through 'workers' property) */
if (ins->tp_workers > 0) {
if(flb_output_thread_pool_create(config, ins) != 0) {
flb_output_instance_destroy(ins);
return -1;
}
flb_output_thread_pool_start(ins);
}

return 0;
}

/* Return an instance name or alias */
const char *flb_output_name(struct flb_output_instance *ins)
{
Expand Down Expand Up @@ -856,6 +871,15 @@ int flb_output_init_all(struct flb_config *config)
flb_output_instance_destroy(ins);
return -1;
}

/* Multi-threading enabled if configured */
ret = flb_output_enable_multi_threading(ins, config);
gautampunhani marked this conversation as resolved.
Show resolved Hide resolved
if (ret == -1) {
flb_error("[output] could not start thread pool for '%s' plugin",
p->name);
return -1;
}

continue;
}
#endif
Expand Down Expand Up @@ -958,17 +982,12 @@ int flb_output_init_all(struct flb_config *config)
return -1;
}

/* Multi-threading enabled ? (through 'workers' property) */
if (ins->tp_workers > 0) {
ret = flb_output_thread_pool_create(config, ins);
if (ret == -1) {
flb_error("[output] could not start thread pool for '%s' plugin",
p->name);
flb_output_instance_destroy(ins);
return -1;
}

flb_output_thread_pool_start(ins);
/* Multi-threading enabled if configured */
ret = flb_output_enable_multi_threading(ins, config);
if (ret == -1) {
flb_error("[output] could not start thread pool for '%s' plugin",
p->name);
return -1;
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/flb_pack_gelf.c
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,8 @@ flb_sds_t flb_msgpack_to_gelf(flb_sds_t *s, msgpack_object *o,
}
*s = tmp;

tmp = flb_sds_printf(s, "%" PRIu32".%lu",
/* gelf supports milliseconds */
tmp = flb_sds_printf(s, "%" PRIu32".%03lu",
tm->tm.tv_sec, tm->tm.tv_nsec / 1000000);
if (tmp == NULL) {
return NULL;
Expand Down
1 change: 1 addition & 0 deletions src/flb_regex.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ ssize_t flb_regex_do(struct flb_regex *r, const char *str, size_t slen,

region = onig_region_new();
if (!region) {
flb_errno();
result->region = NULL;
return -1;
}
Expand Down
5 changes: 2 additions & 3 deletions src/multiline/flb_ml.c
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,6 @@ static int ml_append_try_parser(struct flb_ml_parser_ins *parser,
/* Parse incoming content */
ret = flb_parser_do(parser->ml_parser->parser, (char *) buf, size,
&out_buf, &out_size, &out_time);

if (flb_time_to_double(&out_time) == 0.0) {
flb_time_copy(&out_time, tm);
}
Expand Down Expand Up @@ -623,7 +622,8 @@ int flb_ml_append(struct flb_ml *ml, uint64_t stream_id,

mk_list_foreach(head_group, &group->parsers) {
parser_i = mk_list_entry(head_group, struct flb_ml_parser_ins, _head);
if (lru_parser && parser_i == lru_parser) {
if (lru_parser && lru_parser == parser_i &&
lru_parser->last_stream_id == stream_id) {
continue;
}

Expand All @@ -639,7 +639,6 @@ int flb_ml_append(struct flb_ml *ml, uint64_t stream_id,
else {
parser_i = NULL;
}

}

if (!processed) {
Expand Down
47 changes: 46 additions & 1 deletion tests/internal/gelf.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,52 @@ void test_gelf_pack()
msgpack_sbuffer_destroy(&mp_sbuf);
}

#define EXPECTED_OUT_MSEC \
"{\"version\":\"1.1\", \"short_message\":\"true, 2019, str\", \"_t2\":\"false\", \"timestamp\":337647600.012}"

/* https://github.com/fluent/fluent-bit/issues/3727 */
void test_gelf_pack_msec()
{
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;
struct flb_time ts;
struct flb_gelf_fields fields = {0};
flb_sds_t out;

/* Pack sample msgpack */
msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);


ts.tm.tv_sec = 337647600;
ts.tm.tv_nsec = 12341111; /* 12.34msec */

msgpack_pack_map(&mp_pck, 2);
msgpack_pack_str(&mp_pck, 2);
msgpack_pack_str_body(&mp_pck, "t1", 2);
msgpack_pack_array(&mp_pck, 3);
msgpack_pack_true(&mp_pck);
msgpack_pack_uint64(&mp_pck, 2019);
msgpack_pack_str(&mp_pck, 3);
msgpack_pack_str_body(&mp_pck, "str", 3);
msgpack_pack_str(&mp_pck, 2);
msgpack_pack_str_body(&mp_pck, "t2", 2);
msgpack_pack_false(&mp_pck);

fields.short_message_key = flb_sds_create("t1");
out = flb_msgpack_raw_to_gelf(mp_sbuf.data, mp_sbuf.size, &ts, &fields);
TEST_CHECK(out != NULL);

if(!TEST_CHECK(strcmp(out, EXPECTED_OUT_MSEC) == 0)) {
TEST_MSG("out=%s", out);
}
flb_sds_destroy(out);
flb_sds_destroy(fields.short_message_key);
msgpack_sbuffer_destroy(&mp_sbuf);
}

TEST_LIST = {
{"gelf_pack", test_gelf_pack},
{"gelf_pack", test_gelf_pack},
{"gelf_pack_msec", test_gelf_pack_msec},
{ 0 }
};