diff --git a/lib/logproto/CMakeLists.txt b/lib/logproto/CMakeLists.txt index 7348b63824..7c9de7dcd4 100644 --- a/lib/logproto/CMakeLists.txt +++ b/lib/logproto/CMakeLists.txt @@ -11,6 +11,7 @@ set(LOGPROTO_HEADERS logproto/logproto-server.h logproto/logproto-text-client.h logproto/logproto-text-server.h + logproto/logproto-auto-server.h PARENT_SCOPE) set(LOGPROTO_SOURCES @@ -25,6 +26,7 @@ set(LOGPROTO_SOURCES logproto/logproto-server.c logproto/logproto-text-client.c logproto/logproto-text-server.c + logproto/logproto-auto-server.c PARENT_SCOPE) add_test_subdirectory(tests) diff --git a/lib/logproto/Makefile.am b/lib/logproto/Makefile.am index da02d8f723..0fa2b1839c 100644 --- a/lib/logproto/Makefile.am +++ b/lib/logproto/Makefile.am @@ -11,6 +11,7 @@ logprotoinclude_HEADERS = \ lib/logproto/logproto-framed-server.h \ lib/logproto/logproto-text-client.h \ lib/logproto/logproto-text-server.h \ + lib/logproto/logproto-auto-server.h \ lib/logproto/logproto-multiline-server.h \ lib/logproto/logproto-record-server.h \ lib/logproto/logproto-builtins.h \ @@ -25,6 +26,7 @@ logproto_sources = \ lib/logproto/logproto-framed-server.c \ lib/logproto/logproto-text-client.c \ lib/logproto/logproto-text-server.c \ + lib/logproto/logproto-auto-server.c \ lib/logproto/logproto-multiline-server.c \ lib/logproto/logproto-record-server.c \ lib/logproto/logproto-builtins.c diff --git a/lib/logproto/logproto-auto-server.c b/lib/logproto/logproto-auto-server.c new file mode 100644 index 0000000000..3abd79953d --- /dev/null +++ b/lib/logproto/logproto-auto-server.c @@ -0,0 +1,256 @@ +/* + * Copyright (c) 2024 Balázs Scheidler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ +#include "logproto-auto-server.h" +#include "logproto-text-server.h" +#include "logproto-framed-server.h" +#include "messages.h" + +#include "transport/transport-haproxy.h" + +enum +{ + LPAS_FAILURE, + LPAS_NEED_MORE_DATA, + LPAS_SUCCESS, +}; + +typedef struct _LogProtoAutoServer +{ + LogProtoServer super; + + gboolean tls_detected; + gboolean haproxy_detected; +} LogProtoAutoServer; + +static LogProtoServer * +_construct_detected_proto(LogProtoAutoServer *self, const gchar *detect_buffer, gsize detect_buffer_len) +{ + gint fd = self->super.transport_stack.fd; + + if (g_ascii_isdigit(detect_buffer[0])) + { + msg_debug("Auto-detected octet-counted-framing on RFC6587 connection, using framed protocol", + evt_tag_int("fd", fd)); + return log_proto_framed_server_new(NULL, self->super.options); + } + if (detect_buffer[0] == '<') + { + msg_debug("Auto-detected non-transparent-framing on RFC6587 connection, using simple text protocol", + evt_tag_int("fd", fd)); + } + else + { + msg_debug("Unable to detect framing on RFC6587 connection, falling back to simple text transport", + evt_tag_int("fd", fd), + evt_tag_mem("detect_buffer", detect_buffer, detect_buffer_len)); + } + return log_proto_text_server_new(NULL, self->super.options); +} + +static gint +_is_tls_client_hello(const gchar *buf, gsize buf_len) +{ + /* + * The first message on a TLS connection must be the client_hello, which + * is a type of handshake record, and it cannot be compressed or + * encrypted. A plaintext record has this format: + * + * 0 byte record_type // 0x16 = handshake + * 1 byte major // major protocol version + * 2 byte minor // minor protocol version + * 3-4 uint16 length // size of the payload + * 5 byte handshake_type // 0x01 = client_hello + * 6 uint24 length // size of the ClientHello + * 9 byte major // major protocol version + * 10 byte minor // minor protocol version + * 11 uint32 gmt_unix_time + * 15 byte random_bytes[28] + * ... + */ + if (buf_len < 1) + return LPAS_NEED_MORE_DATA; + + /* 0x16 indicates a TLS handshake */ + if (buf[0] != 0x16) + return LPAS_FAILURE; + + if (buf_len < 5) + return LPAS_NEED_MORE_DATA; + + guint32 record_len = (buf[3] << 8) + buf[4]; + + /* client_hello is at least 34 bytes */ + if (record_len < 34) + return LPAS_FAILURE; + + if (buf_len < 6) + return LPAS_NEED_MORE_DATA; + + /* is the handshake_type 0x01 == client_hello */ + if (buf[5] != 0x01) + return FALSE; + + if (buf_len < 9) + return LPAS_NEED_MORE_DATA; + + guint32 payload_size = (buf[6] << 16) + (buf[7] << 8) + buf[8]; + + /* The message payload can't be bigger than the enclosing record */ + if (payload_size + 4 > record_len) + return LPAS_FAILURE; + return LPAS_SUCCESS; +} + +static gint +_is_haproxy_header(const gchar *buf, gsize buf_len) +{ + const gchar proxy_v1_signature[] = { 0x50, 0x52, 0x4F, 0x58, 0x59, 0x20 }; + const gchar proxy_v2_signature[] = { 0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A }; + + if (buf_len < sizeof(proxy_v1_signature)) + return LPAS_NEED_MORE_DATA; + + if (memcmp(buf, proxy_v1_signature, sizeof(proxy_v1_signature)) == 0) + return LPAS_SUCCESS; + + if (buf_len < sizeof(proxy_v2_signature)) + return LPAS_NEED_MORE_DATA; + + if (memcmp(buf, proxy_v2_signature, sizeof(proxy_v2_signature)) != 0) + return LPAS_FAILURE; + + if (buf_len < sizeof(proxy_v2_signature) + 1) + return LPAS_NEED_MORE_DATA; + + if ((buf[sizeof(proxy_v2_signature)] & 0xf0) != 0x20) + return LPAS_FAILURE; + return LPAS_SUCCESS; +} + +static LogProtoPrepareAction +log_proto_auto_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED) +{ + LogProtoAutoServer *self = (LogProtoAutoServer *) s; + + if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond)) + return LPPA_FORCE_SCHEDULE_FETCH; + + if (*cond == 0) + *cond = G_IO_IN; + + return LPPA_POLL_IO; +} + +static LogProtoStatus +log_proto_auto_handshake(LogProtoServer *s, gboolean *handshake_finished, LogProtoServer **proto_replacement) +{ + LogProtoAutoServer *self = (LogProtoAutoServer *) s; + gchar detect_buffer[16]; + gboolean moved_forward; + gint rc; + + rc = log_transport_stack_read_ahead(&self->super.transport_stack, detect_buffer, sizeof(detect_buffer), &moved_forward); + if (rc == 0) + return LPS_EOF; + else if (rc < 0) + { + if (errno == EAGAIN) + return LPS_AGAIN; + return LPS_ERROR; + } + + if (!self->tls_detected) + { + switch (_is_tls_client_hello(detect_buffer, rc)) + { + case LPAS_NEED_MORE_DATA: + if (moved_forward) + return LPS_AGAIN; + break; + case LPAS_SUCCESS: + self->tls_detected = TRUE; + /* this is a TLS handshake! let's switch to TLS */ + if (log_transport_stack_switch(&self->super.transport_stack, LOG_TRANSPORT_TLS)) + { + msg_debug("TLS handshake detected, switching to TLS"); + return LPS_AGAIN; + } + else + { + msg_error("TLS handshake detected, unable to switch to TLS, no tls() options specified"); + return LPS_ERROR; + } + break; + default: + break; + } + } + if (!self->haproxy_detected) + { + switch (_is_haproxy_header(detect_buffer, rc)) + { + case LPAS_NEED_MORE_DATA: + if (moved_forward) + return LPS_AGAIN; + break; + case LPAS_SUCCESS: + self->haproxy_detected = TRUE; + + /* FIXME: make this a factory */ + log_transport_stack_add_transport(&self->super.transport_stack, LOG_TRANSPORT_HAPROXY, + log_transport_haproxy_new(self->super.transport_stack.active_transport, self->super.transport_stack.active_transport)); + + /* this is a haproxy header */ + if (log_transport_stack_switch(&self->super.transport_stack, LOG_TRANSPORT_HAPROXY)) + { + msg_debug("HAProxy header detected, switching to haproxy"); + return LPS_AGAIN; + } + else + { + msg_error("HAProxy header detected, but haproxy transport is not set up"); + return LPS_ERROR; + } + break; + default: + break; + + } + } + *proto_replacement = _construct_detected_proto(self, detect_buffer, rc); + return LPS_SUCCESS; +} + +LogProtoServer * +log_proto_auto_server_new(LogTransport *transport, const LogProtoServerOptions *options) +{ + LogProtoAutoServer *self = g_new0(LogProtoAutoServer, 1); + + /* we are not using our own transport stack, transport is to be passed to + * the LogProto implementation once we finished with detection */ + + log_proto_server_init(&self->super, transport, options); + self->super.handshake = log_proto_auto_handshake; + self->super.poll_prepare = log_proto_auto_server_poll_prepare; + return &self->super; +} diff --git a/lib/logproto/logproto-auto-server.h b/lib/logproto/logproto-auto-server.h new file mode 100644 index 0000000000..0872cd66e8 --- /dev/null +++ b/lib/logproto/logproto-auto-server.h @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2024 Balázs Scheidler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ +#ifndef LOGPROTO_AUTO_SERVER_H_INCLUDED +#define LOGPROTO_AUTO_SERVER_H_INCLUDED + +#include "logproto-server.h" + +LogProtoServer *log_proto_auto_server_new(LogTransport *transport, const LogProtoServerOptions *options); + +#endif diff --git a/lib/logproto/logproto-buffered-server.c b/lib/logproto/logproto-buffered-server.c index ac61b108c0..621cc0cc68 100644 --- a/lib/logproto/logproto-buffered-server.c +++ b/lib/logproto/logproto-buffered-server.c @@ -179,10 +179,9 @@ log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntry struct stat st; gint64 ofs = 0; LogProtoBufferedServerState *state; - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); gint fd; - fd = transport->fd; + fd = self->super.transport_stack.fd; self->persist_handle = handle; if (fstat(fd, &st) < 0) @@ -255,7 +254,7 @@ log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntry raw_buffer = g_alloca(state->raw_buffer_size); } - rc = log_transport_read(transport, raw_buffer, state->raw_buffer_size, NULL); + rc = log_transport_stack_read(&self->super.transport_stack, raw_buffer, state->raw_buffer_size, NULL); if (rc != state->raw_buffer_size) { msg_notice("Error re-reading buffer contents of the file to be continued, restarting from the beginning", @@ -584,12 +583,12 @@ log_proto_buffered_server_restart_with_state(LogProtoServer *s, PersistState *pe } LogProtoPrepareAction -log_proto_buffered_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED) +log_proto_buffered_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED) { LogProtoBufferedServer *self = (LogProtoBufferedServer *) s; - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); - *cond = transport->cond; + if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond)) + return LPPA_FORCE_SCHEDULE_FETCH; /* if there's no pending I/O in the transport layer, then we want to do a read */ if (*cond == 0) @@ -602,9 +601,7 @@ static gint log_proto_buffered_server_read_data_method(LogProtoBufferedServer *self, guchar *buf, gsize len, LogTransportAuxData *aux) { - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); - - return log_transport_read(transport, buf, len, aux); + return log_transport_stack_read(&self->super.transport_stack, buf, len, aux); } static void @@ -1081,7 +1078,7 @@ log_proto_buffered_server_init(LogProtoBufferedServer *self, LogTransport *trans const LogProtoServerOptions *options) { log_proto_server_init(&self->super, transport, options); - self->super.prepare = log_proto_buffered_server_prepare; + self->super.poll_prepare = log_proto_buffered_server_poll_prepare; self->super.fetch = log_proto_buffered_server_fetch; self->super.free_fn = log_proto_buffered_server_free_method; self->super.restart_with_state = log_proto_buffered_server_restart_with_state; diff --git a/lib/logproto/logproto-buffered-server.h b/lib/logproto/logproto-buffered-server.h index 55a92f9690..fbd5f52702 100644 --- a/lib/logproto/logproto-buffered-server.h +++ b/lib/logproto/logproto-buffered-server.h @@ -114,8 +114,8 @@ log_proto_buffered_server_cue_flush(LogProtoBufferedServer *self) self->flush_partial_message = TRUE; } -LogProtoPrepareAction log_proto_buffered_server_prepare(LogProtoServer *s, GIOCondition *cond, - gint *timeout G_GNUC_UNUSED); +LogProtoPrepareAction log_proto_buffered_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, + gint *timeout G_GNUC_UNUSED); LogProtoBufferedServerState *log_proto_buffered_server_get_state(LogProtoBufferedServer *self); void log_proto_buffered_server_put_state(LogProtoBufferedServer *self); diff --git a/lib/logproto/logproto-builtins.c b/lib/logproto/logproto-builtins.c index d21eb1d1d1..e76b23769e 100644 --- a/lib/logproto/logproto-builtins.c +++ b/lib/logproto/logproto-builtins.c @@ -26,6 +26,7 @@ #include "logproto-text-server.h" #include "logproto-framed-client.h" #include "logproto-framed-server.h" +#include "logproto-auto-server.h" #include "plugin.h" #include "plugin-types.h" @@ -39,6 +40,7 @@ DEFINE_LOG_PROTO_SERVER(log_proto_text); DEFINE_LOG_PROTO_SERVER(log_proto_text_with_nuls); DEFINE_LOG_PROTO_CLIENT(log_proto_framed); DEFINE_LOG_PROTO_SERVER(log_proto_framed); +DEFINE_LOG_PROTO_SERVER(log_proto_auto); static Plugin framed_server_plugins[] = { @@ -50,6 +52,7 @@ static Plugin framed_server_plugins[] = LOG_PROTO_SERVER_PLUGIN(log_proto_text_with_nuls, "text-with-nuls"), LOG_PROTO_CLIENT_PLUGIN(log_proto_framed, "framed"), LOG_PROTO_SERVER_PLUGIN(log_proto_framed, "framed"), + LOG_PROTO_SERVER_PLUGIN(log_proto_auto, "auto"), }; void diff --git a/lib/logproto/logproto-client.h b/lib/logproto/logproto-client.h index f571b26f82..6d1b9c7d4c 100644 --- a/lib/logproto/logproto-client.h +++ b/lib/logproto/logproto-client.h @@ -67,8 +67,7 @@ struct _LogProtoClient LogProtoStatus status; const LogProtoClientOptions *options; LogTransportStack transport_stack; - /* FIXME: rename to something else */ - gboolean (*prepare)(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout); + gboolean (*poll_prepare)(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout); LogProtoStatus (*post)(LogProtoClient *s, LogMessage *logmsg, guchar *msg, gsize msg_len, gboolean *consumed); LogProtoStatus (*process_in)(LogProtoClient *s); LogProtoStatus (*flush)(LogProtoClient *s); @@ -124,9 +123,9 @@ log_proto_client_handshake(LogProtoClient *s, gboolean *handshake_finished) } static inline gboolean -log_proto_client_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout) +log_proto_client_poll_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout) { - gboolean result = s->prepare(s, fd, cond, timeout); + gboolean result = s->poll_prepare(s, fd, cond, timeout); if (!result && *timeout < 0) *timeout = s->options->idle_timeout; diff --git a/lib/logproto/logproto-framed-server.c b/lib/logproto/logproto-framed-server.c index 369dba2a7a..3da9c13537 100644 --- a/lib/logproto/logproto-framed-server.c +++ b/lib/logproto/logproto-framed-server.c @@ -66,12 +66,12 @@ typedef struct _LogProtoFramedServer } LogProtoFramedServer; static LogProtoPrepareAction -log_proto_framed_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED) +log_proto_framed_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED) { LogProtoFramedServer *self = (LogProtoFramedServer *) s; - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); - *cond = transport->cond; + if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond)) + return LPPA_FORCE_SCHEDULE_FETCH; /* there is a half message in our buffer so try to wait */ if (!self->half_message_in_buffer) @@ -97,7 +97,6 @@ static gboolean log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_read, LogProtoStatus *status) { - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); gint rc; *status = LPS_SUCCESS; @@ -111,15 +110,16 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea return FALSE; log_transport_aux_data_reinit(&self->buffer_aux); - rc = log_transport_read(transport, &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end, - &self->buffer_aux); + rc = log_transport_stack_read(&self->super.transport_stack, + &self->buffer[self->buffer_end], self->buffer_size - self->buffer_end, + &self->buffer_aux); if (rc < 0) { if (errno != EAGAIN) { msg_error("Error reading RFC6587 style framed data", - evt_tag_int("fd", transport->fd), + evt_tag_int("fd", self->super.transport_stack.fd), evt_tag_error("error")); *status = LPS_ERROR; } @@ -134,7 +134,7 @@ log_proto_framed_server_fetch_data(LogProtoFramedServer *self, gboolean *may_rea if (rc == 0) { msg_trace("EOF occurred while reading", - evt_tag_int(EVT_TAG_FD, transport->fd)); + evt_tag_int(EVT_TAG_FD, self->super.transport_stack.fd)); *status = LPS_EOF; return FALSE; } @@ -430,7 +430,7 @@ log_proto_framed_server_new(LogTransport *transport, const LogProtoServerOptions LogProtoFramedServer *self = g_new0(LogProtoFramedServer, 1); log_proto_server_init(&self->super, transport, options); - self->super.prepare = log_proto_framed_server_prepare; + self->super.poll_prepare = log_proto_framed_server_poll_prepare; self->super.fetch = log_proto_framed_server_fetch; self->super.free_fn = log_proto_framed_server_free; self->half_message_in_buffer = FALSE; diff --git a/lib/logproto/logproto-record-server.c b/lib/logproto/logproto-record-server.c index ed72351b8f..c6f1f0ed17 100644 --- a/lib/logproto/logproto-record-server.c +++ b/lib/logproto/logproto-record-server.c @@ -54,17 +54,16 @@ static gint log_proto_record_server_read_data(LogProtoBufferedServer *s, guchar *buf, gsize len, LogTransportAuxData *aux) { LogProtoRecordServer *self = (LogProtoRecordServer *) s; - LogTransport *transport = log_transport_stack_get_active(&self->super.super.transport_stack); gint rc; /* assert that we have enough space in the buffer to read record_size bytes */ g_assert(len >= self->record_size); len = self->record_size; - rc = log_transport_read(transport, buf, len, aux); + rc = log_transport_stack_read(&s->super.transport_stack, buf, len, aux); if (rc > 0 && rc != self->record_size) { msg_error("Record size was set, and couldn't read enough bytes", - evt_tag_int(EVT_TAG_FD, transport->fd), + evt_tag_int(EVT_TAG_FD, s->super.transport_stack.fd), evt_tag_int("record_size", self->record_size), evt_tag_int("read", rc)); errno = EIO; diff --git a/lib/logproto/logproto-server.h b/lib/logproto/logproto-server.h index 76904143cc..cf0fbfeabd 100644 --- a/lib/logproto/logproto-server.h +++ b/lib/logproto/logproto-server.h @@ -83,13 +83,12 @@ struct _LogProtoServer AckTracker *ack_tracker; LogProtoServerWakeupCallback wakeup_callback; - /* FIXME: rename to something else */ - LogProtoPrepareAction (*prepare)(LogProtoServer *s, GIOCondition *cond, gint *timeout); + LogProtoPrepareAction (*poll_prepare)(LogProtoServer *s, GIOCondition *cond, gint *timeout); gboolean (*restart_with_state)(LogProtoServer *s, PersistState *state, const gchar *persist_name); LogProtoStatus (*fetch)(LogProtoServer *s, const guchar **msg, gsize *msg_len, gboolean *may_read, LogTransportAuxData *aux, Bookmark *bookmark); gboolean (*validate_options)(LogProtoServer *s); - LogProtoStatus (*handshake)(LogProtoServer *s, gboolean *handshake_finished); + LogProtoStatus (*handshake)(LogProtoServer *s, gboolean *handshake_finished, LogProtoServer **proto_replacement); void (*free_fn)(LogProtoServer *s); }; @@ -100,11 +99,19 @@ log_proto_server_validate_options(LogProtoServer *self) } static inline LogProtoStatus -log_proto_server_handshake(LogProtoServer *s, gboolean *handshake_finished) +log_proto_server_handshake(LogProtoServer *s, gboolean *handshake_finished, LogProtoServer **proto_replacement) { if (s->handshake) { - return s->handshake(s, handshake_finished); + LogProtoStatus status; + + g_assert(*proto_replacement == NULL); + status = s->handshake(s, handshake_finished, proto_replacement); + if (*proto_replacement) + { + g_assert(status == LPS_SUCCESS || status == LPS_AGAIN); + } + return status; } *handshake_finished = TRUE; return LPS_SUCCESS; @@ -117,9 +124,9 @@ log_proto_server_set_options(LogProtoServer *self, const LogProtoServerOptions * } static inline LogProtoPrepareAction -log_proto_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout) +log_proto_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout) { - LogProtoPrepareAction result = s->prepare(s, cond, timeout); + LogProtoPrepareAction result = s->poll_prepare(s, cond, timeout); if (result == LPPA_POLL_IO && *timeout < 0) *timeout = s->options->idle_timeout; diff --git a/lib/logproto/logproto-text-client.c b/lib/logproto/logproto-text-client.c index 7b10e090b2..0b3c3560e3 100644 --- a/lib/logproto/logproto-text-client.c +++ b/lib/logproto/logproto-text-client.c @@ -27,13 +27,14 @@ #include static gboolean -log_proto_text_client_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout) +log_proto_text_client_poll_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout) { LogProtoTextClient *self = (LogProtoTextClient *) s; - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); - *fd = transport->fd; - *cond = transport->cond; + if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond)) + return TRUE; + + *fd = self->super.transport_stack.fd; /* if there's no pending I/O in the transport layer, then we want to do a write */ if (*cond == 0) @@ -46,24 +47,26 @@ static LogProtoStatus log_proto_text_client_drop_input(LogProtoClient *s) { LogProtoTextClient *self = (LogProtoTextClient *) s; - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); guchar buf[1024]; gint rc = -1; do { - rc = log_transport_read(transport, buf, sizeof(buf), NULL); + rc = log_transport_stack_read(&self->super.transport_stack, buf, sizeof(buf), NULL); } while (rc > 0); if (rc == -1 && errno != EAGAIN) { - msg_error("Error reading data", evt_tag_int("fd", transport->fd), evt_tag_error("error")); + msg_error("Error reading data", + evt_tag_int("fd", self->super.transport_stack.fd), + evt_tag_error("error")); return LPS_ERROR; } else if (rc == 0) { - msg_error("EOF occurred while idle", evt_tag_int("fd", transport->fd)); + msg_error("EOF occurred while idle", + evt_tag_int("fd", self->super.transport_stack.fd)); return LPS_ERROR; } @@ -74,7 +77,6 @@ static LogProtoStatus log_proto_text_client_flush(LogProtoClient *s) { LogProtoTextClient *self = (LogProtoTextClient *) s; - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); gint rc; if (!self->partial) @@ -85,13 +87,13 @@ log_proto_text_client_flush(LogProtoClient *s) /* attempt to flush previously buffered data */ gint len = self->partial_len - self->partial_pos; - rc = log_transport_write(transport, &self->partial[self->partial_pos], len); + rc = log_transport_stack_write(&self->super.transport_stack, &self->partial[self->partial_pos], len); if (rc < 0) { if (errno != EAGAIN && errno != EINTR) { msg_error("I/O error occurred while writing", - evt_tag_int("fd", transport->fd), + evt_tag_int("fd", self->super.transport_stack.fd), evt_tag_error(EVT_TAG_OSERROR)); return LPS_ERROR; } @@ -194,7 +196,7 @@ void log_proto_text_client_init(LogProtoTextClient *self, LogTransport *transport, const LogProtoClientOptions *options) { log_proto_client_init(&self->super, transport, options); - self->super.prepare = log_proto_text_client_prepare; + self->super.poll_prepare = log_proto_text_client_poll_prepare; self->super.flush = log_proto_text_client_flush; if (options->drop_input) self->super.process_in = log_proto_text_client_drop_input; diff --git a/lib/logproto/logproto-text-server.c b/lib/logproto/logproto-text-server.c index 674fb686d4..438324fa80 100644 --- a/lib/logproto/logproto-text-server.c +++ b/lib/logproto/logproto-text-server.c @@ -27,12 +27,12 @@ #include LogProtoPrepareAction -log_proto_text_server_prepare_method(LogProtoServer *s, GIOCondition *cond, gint *timeout) +log_proto_text_server_poll_prepare_method(LogProtoServer *s, GIOCondition *cond, gint *timeout) { LogProtoTextServer *self = (LogProtoTextServer *) s; gboolean avail; - LogProtoPrepareAction action = log_proto_buffered_server_prepare(s, cond, timeout); + LogProtoPrepareAction action = log_proto_buffered_server_poll_prepare(s, cond, timeout); if (action != LPPA_POLL_IO) return action; @@ -304,7 +304,7 @@ void log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, const LogProtoServerOptions *options) { log_proto_buffered_server_init(&self->super, transport, options); - self->super.super.prepare = log_proto_text_server_prepare_method; + self->super.super.poll_prepare = log_proto_text_server_poll_prepare_method; self->super.super.free_fn = log_proto_text_server_free; self->super.fetch_from_buffer = log_proto_text_server_fetch_from_buffer; self->super.flush = log_proto_text_server_flush; diff --git a/lib/logproto/logproto-text-server.h b/lib/logproto/logproto-text-server.h index fd5b36308c..b3b7754475 100644 --- a/lib/logproto/logproto-text-server.h +++ b/lib/logproto/logproto-text-server.h @@ -50,7 +50,7 @@ LogProtoServer *log_proto_text_with_nuls_server_new(LogTransport *transport, con void log_proto_text_server_free(LogProtoServer *self); void log_proto_text_server_init(LogProtoTextServer *self, LogTransport *transport, const LogProtoServerOptions *options); -LogProtoPrepareAction log_proto_text_server_prepare_method(LogProtoServer *s, GIOCondition *cond, gint *timeout); +LogProtoPrepareAction log_proto_text_server_poll_prepare_method(LogProtoServer *s, GIOCondition *cond, gint *timeout); static inline gboolean log_proto_text_server_validate_options_method(LogProtoServer *s) diff --git a/lib/logproto/tests/CMakeLists.txt b/lib/logproto/tests/CMakeLists.txt index c94913854a..018cd41be6 100644 --- a/lib/logproto/tests/CMakeLists.txt +++ b/lib/logproto/tests/CMakeLists.txt @@ -5,6 +5,7 @@ set(TEST_LOGPROTO_SOURCES test-text-server.c test-dgram-server.c test-framed-server.c + test-auto-server.c test-indented-multiline-server.c test-regexp-multiline-server.c) diff --git a/lib/logproto/tests/Makefile.am b/lib/logproto/tests/Makefile.am index 8dcb10a91d..b334a7094c 100644 --- a/lib/logproto/tests/Makefile.am +++ b/lib/logproto/tests/Makefile.am @@ -17,6 +17,7 @@ lib_logproto_tests_test_logproto_SOURCES = \ lib/logproto/tests/test-text-server.c \ lib/logproto/tests/test-dgram-server.c \ lib/logproto/tests/test-framed-server.c \ + lib/logproto/tests/test-auto-server.c \ lib/logproto/tests/test-indented-multiline-server.c \ lib/logproto/tests/test-regexp-multiline-server.c diff --git a/lib/logproto/tests/test-auto-server.c b/lib/logproto/tests/test-auto-server.c new file mode 100644 index 0000000000..c697d5495f --- /dev/null +++ b/lib/logproto/tests/test-auto-server.c @@ -0,0 +1,139 @@ +/* + * Copyright (c) 2012-2019 Balabit + * Copyright (c) 2012-2013 Balázs Scheidler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include +#include "libtest/mock-transport.h" +#include "libtest/proto_lib.h" +#include "libtest/msg_parse_lib.h" + +#include "logproto/logproto-auto-server.h" + +#include + +Test(log_proto, test_log_proto_initial_framing_too_long) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + "100000000 too long\n", -1, + LTM_EOF), + get_inited_proto_server_options()); + + assert_proto_server_handshake_failure(&proto, LPS_SUCCESS); + assert_proto_server_fetch_failure(proto, LPS_ERROR, NULL); + log_proto_server_free(proto); +} + +Test(log_proto, test_log_proto_error_in_initial_frame) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + LTM_INJECT_ERROR(EIO)), + get_inited_proto_server_options()); + + assert_proto_server_handshake_failure(&proto, LPS_ERROR); + log_proto_server_free(proto); +} + +Test(log_proto, test_log_proto_auto_server_no_framing) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + "abcdefghijklmnopqstuvwxyz\n", -1, + "01234567\n", -1, + "01234567\0", 9, + "abcdef", -1, + LTM_EOF), + get_inited_proto_server_options()); + + assert_proto_server_handshake(&proto); + assert_proto_server_fetch(proto, "abcdefghijklmnopqstuvwxyz", -1); + assert_proto_server_fetch(proto, "01234567", -1); + assert_proto_server_fetch(proto, "01234567", 8); + assert_proto_server_fetch(proto, "abcdef", -1); + assert_proto_server_fetch_failure(proto, LPS_EOF, NULL); + log_proto_server_free(proto); +} + +Test(log_proto, test_log_proto_auto_server_opening_bracket) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + "<55> abcdefghijklmnopqstuvwxyz\n", -1, + "01234567\n", -1, + "01234567\0", 9, + "abcdef", -1, + LTM_EOF), + get_inited_proto_server_options()); + + assert_proto_server_handshake(&proto); + assert_proto_server_fetch(proto, "<55> abcdefghijklmnopqstuvwxyz", -1); + assert_proto_server_fetch(proto, "01234567", -1); + assert_proto_server_fetch(proto, "01234567", 8); + assert_proto_server_fetch(proto, "abcdef", -1); + assert_proto_server_fetch_failure(proto, LPS_EOF, NULL); + log_proto_server_free(proto); +} + +Test(log_proto, test_log_proto_auto_server_with_framing) +{ + LogProtoServer *proto; + + proto = log_proto_auto_server_new( + log_transport_mock_stream_new( + "32 0123456789ABCDEF0123456789ABCDEF", -1, + "10 01234567\n\n", -1, + "10 01234567\0\0", 13, + /* utf8 */ + "30 árvíztűrőtükörfúrógép", -1, + /* iso-8859-2 */ + "21 \xe1\x72\x76\xed\x7a\x74\xfb\x72\xf5\x74\xfc\x6b\xf6\x72\x66\xfa" /* |árvíztűrőtükörfú| */ + "\x72\xf3\x67\xe9\x70", -1, /* |rógép| */ + /* ucs4 */ + "32 \x00\x00\x00\xe1\x00\x00\x00\x72\x00\x00\x00\x76\x00\x00\x00\xed" /* |...á...r...v...í| */ + "\x00\x00\x00\x7a\x00\x00\x00\x74\x00\x00\x01\x71\x00\x00\x00\x72", 35, /* |...z...t...ű...r| */ + LTM_EOF), + get_inited_proto_server_options()); + + assert_proto_server_handshake(&proto); + assert_proto_server_fetch(proto, "0123456789ABCDEF0123456789ABCDEF", -1); + assert_proto_server_fetch(proto, "01234567\n\n", -1); + assert_proto_server_fetch(proto, "01234567\0\0", 10); + assert_proto_server_fetch(proto, "árvíztűrőtükörfúrógép", -1); + assert_proto_server_fetch(proto, + "\xe1\x72\x76\xed\x7a\x74\xfb\x72\xf5\x74\xfc\x6b\xf6\x72\x66\xfa" /* |.rv.zt.r.t.k.rf.| */ + "\x72\xf3\x67\xe9\x70", -1); /* |r.g.p| */ + assert_proto_server_fetch(proto, + "\x00\x00\x00\xe1\x00\x00\x00\x72\x00\x00\x00\x76\x00\x00\x00\xed" /* |...á...r...v...í| */ + "\x00\x00\x00\x7a\x00\x00\x00\x74\x00\x00\x01\x71\x00\x00\x00\x72", 32); /* |...z...t...q...r| */ + assert_proto_server_fetch_failure(proto, LPS_EOF, NULL); + log_proto_server_free(proto); +} diff --git a/lib/logreader.c b/lib/logreader.c index 35fd7de285..eb5a4d69df 100644 --- a/lib/logreader.c +++ b/lib/logreader.c @@ -314,7 +314,7 @@ log_reader_update_watches(LogReader *self) return; } - LogProtoPrepareAction prepare_action = log_proto_server_prepare(self->proto, &cond, &idle_timeout); + LogProtoPrepareAction prepare_action = log_proto_server_poll_prepare(self->proto, &cond, &idle_timeout); if (idle_timeout > 0) { @@ -426,7 +426,15 @@ static inline gint log_reader_process_handshake(LogReader *self) { gboolean handshake_finished = FALSE; - LogProtoStatus status = log_proto_server_handshake(self->proto, &handshake_finished); + LogProtoServer *proto_replacement = NULL; + LogProtoStatus status = log_proto_server_handshake(self->proto, &handshake_finished, &proto_replacement); + + if (proto_replacement) + { + log_transport_stack_move(&proto_replacement->transport_stack, &self->proto->transport_stack); + log_proto_server_free(self->proto); + self->proto = proto_replacement; + } switch (status) { diff --git a/lib/logwriter.c b/lib/logwriter.c index 236b046837..d73c5aa6c1 100644 --- a/lib/logwriter.c +++ b/lib/logwriter.c @@ -478,7 +478,7 @@ log_writer_update_watches(LogWriter *self) /* NOTE: we either start the suspend_timer or enable the fd_watch. The two MUST not happen at the same time. */ - if (log_proto_client_prepare(self->proto, &fd, &cond, &idle_timeout) || + if (log_proto_client_poll_prepare(self->proto, &fd, &cond, &idle_timeout) || self->waiting_for_throttle || log_queue_check_items(self->queue, &timeout_msec, (LogQueuePushNotifyFunc) log_writer_schedule_update_watches, self, NULL)) @@ -540,7 +540,7 @@ log_writer_start_watches(LogWriter *self) if (self->watches_running) return; - log_proto_client_prepare(self->proto, &fd, &cond, &idle_timeout); + log_proto_client_poll_prepare(self->proto, &fd, &cond, &idle_timeout); self->fd_watch.fd = fd; diff --git a/lib/transport/logtransport.c b/lib/transport/logtransport.c index 61fd5f11b4..6761214312 100644 --- a/lib/transport/logtransport.c +++ b/lib/transport/logtransport.c @@ -27,6 +27,99 @@ #include +gssize +_log_transport_combined_read_with_read_ahead(LogTransport *self, + gpointer buf, gsize count, + LogTransportAuxData *aux) +{ + gsize ra_left = self->ra.buf_len - self->ra.pos; + gsize ra_count = count <= ra_left ? count : ra_left; + + if (ra_count > 0) + { + /* prepend data from read ahead buffer */ + memcpy(buf, &self->ra.buf[self->ra.pos], ra_count); + self->ra.pos += ra_count; + if (self->ra.pos < self->ra.buf_len) + { + return ra_count; + } + } + else + { + self->ra.buf_len = self->ra.pos = 0; + errno = EAGAIN; + return -1; + } + + buf = ((gchar *) buf) + ra_count; + count -= ra_count; + + if (count > 0) + { + /* need to read more */ + gssize rc = self->read(self, buf, count, aux); + if (rc < 0) + { + if (errno == EAGAIN) + return ra_count; + /* error, we put the bytes back to our read_ahead.buf */ + self->ra.pos -= ra_count; + return rc; + } + else + return rc + ra_count; + } + else + return ra_count; +} + + +/* NOTE: this would repeat the entire read operation if you invoke it + * multiple times. The maximum size of read_ahead is limited by the size of + * self->ra.buf[] + */ +gssize +log_transport_read_ahead(LogTransport *self, gpointer buf, gsize buflen, gboolean *moved_forward) +{ + gsize buffer_space = MIN(buflen, sizeof(self->ra.buf)); + gsize count = buffer_space > self->ra.buf_len ? buffer_space - self->ra.buf_len : 0; + gint rc = 0; + + g_assert(buflen <= sizeof(self->ra.buf)); + + /* read at the end of the read_ahead buffer */ + if (count > 0) + { + rc = self->read(self, + &self->ra.buf[self->ra.buf_len], + count, + NULL); + + if (rc < 0) + { + if (moved_forward) + *moved_forward = FALSE; + return rc; + } + } + + if (moved_forward) + *moved_forward = rc > 0; + + self->ra.buf_len += rc; + + if (self->ra.buf_len > 0) + { + rc = MIN(self->ra.buf_len, buflen); + memcpy(buf, self->ra.buf, rc); + return rc; + } + + return 0; +} + + void log_transport_free_method(LogTransport *s) { diff --git a/lib/transport/logtransport.h b/lib/transport/logtransport.h index c6d7e00f32..d73f081634 100644 --- a/lib/transport/logtransport.h +++ b/lib/transport/logtransport.h @@ -28,19 +28,61 @@ #include "syslog-ng.h" #include "transport/transport-aux-data.h" -typedef struct _LogTransport LogTransport; +/* + * LogTransport: + * + * This is an interface that a LogProto implementation can use to do I/O. + * There might be multiple LogTransport implementations alive for a specific + * connection: for instance we might do both plain text and SSL encrypted + * communication on the same socket, when the haproxy proxy protocol is in + * use and SSL is enabled. It might also make sense to instantiate a + * transport doing gzip compression transparently. + * + * The combination of interoperating LogTransport instances is called the + * LogTransportStack (see transport-stack.h) + * + * There's a circular, borrowed reference between the stack and the + * constituent LogTransport instances. + */ +typedef struct _LogTransport LogTransport; +typedef struct _LogTransportStack LogTransportStack; struct _LogTransport { gint fd; GIOCondition cond; - const gchar *name; + gssize (*read)(LogTransport *self, gpointer buf, gsize count, LogTransportAuxData *aux); gssize (*write)(LogTransport *self, const gpointer buf, gsize count); gssize (*writev)(LogTransport *self, struct iovec *iov, gint iov_count); void (*free_fn)(LogTransport *self); + + /* read ahead */ + struct + { + gchar buf[16]; + gint buf_len; + gint pos; + } ra; + LogTransportStack *stack; + const gchar *name; }; +static inline gboolean +log_transport_poll_prepare(LogTransport *self, GIOCondition *cond) +{ + if (self->ra.buf_len != self->ra.pos) + return TRUE; + *cond = self->cond; + return FALSE; +} + +static inline void +log_transport_assign_to_stack(LogTransport *self, LogTransportStack *stack) +{ + self->stack = stack; +} + static inline gssize log_transport_write(LogTransport *self, const gpointer buf, gsize count) { @@ -53,12 +95,21 @@ log_transport_writev(LogTransport *self, struct iovec *iov, gint iov_count) return self->writev(self, iov, iov_count); } +gssize _log_transport_combined_read_with_read_ahead(LogTransport *self, + gpointer buf, gsize count, + LogTransportAuxData *aux); + static inline gssize log_transport_read(LogTransport *self, gpointer buf, gsize count, LogTransportAuxData *aux) { - return self->read(self, buf, count, aux); + if (G_LIKELY(self->ra.buf_len == 0)) + return self->read(self, buf, count, aux); + + return _log_transport_combined_read_with_read_ahead(self, buf, count, aux); } +gssize log_transport_read_ahead(LogTransport *self, gpointer buf, gsize count, gboolean *moved_forward); + void log_transport_init_instance(LogTransport *s, const gchar *name, gint fd); void log_transport_free_method(LogTransport *s); void log_transport_free(LogTransport *s); diff --git a/lib/transport/tests/Makefile.am b/lib/transport/tests/Makefile.am index 09d88a914d..545c8712cf 100644 --- a/lib/transport/tests/Makefile.am +++ b/lib/transport/tests/Makefile.am @@ -1,5 +1,6 @@ lib_transport_tests_TESTS = \ lib/transport/tests/test_aux_data \ + lib/transport/tests/test_transport \ lib/transport/tests/test_transport_stack \ lib/transport/tests/test_transport_haproxy @@ -13,6 +14,12 @@ lib_transport_tests_test_aux_data_LDADD = $(TEST_LDADD) lib_transport_tests_test_aux_data_SOURCES = \ lib/transport/tests/test_aux_data.c +lib_transport_tests_test_transport_CFLAGS = $(TEST_CFLAGS) \ + -I${top_srcdir}/lib/transport/tests +lib_transport_tests_test_transport_LDADD = $(TEST_LDADD) +lib_transport_tests_test_transport_SOURCES = \ + lib/transport/tests/test_transport.c + lib_transport_tests_test_transport_stack_CFLAGS = $(TEST_CFLAGS) \ -I${top_srcdir}/lib/transport/tests lib_transport_tests_test_transport_stack_LDADD = $(TEST_LDADD) diff --git a/lib/transport/tests/test_transport.c b/lib/transport/tests/test_transport.c new file mode 100644 index 0000000000..2ac0053dc0 --- /dev/null +++ b/lib/transport/tests/test_transport.c @@ -0,0 +1,160 @@ +/* + * Copyright (c) 2024 Balazs Scheidler + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; either + * version 2.1 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * As an additional exemption you are allowed to compile & link against the + * OpenSSL libraries as published by the OpenSSL project. See the file + * COPYING for details. + * + */ + +#include +#include "libtest/mock-transport.h" + +#include "transport/logtransport.h" +#include "apphook.h" + +#include + +Test(transport, test_read_ahead_invokes_only_one_read_operation) +{ + /* this will result in single-byte reads */ + LogTransport *t = log_transport_mock_stream_new("readahead", -1, LTM_EOF); + + gchar buf[12] = {0}; + gboolean moved_forward; + gint rc; + + for (gint i = 1; i <= 9; i++) + { + memset(buf, 0, sizeof(buf)); + rc = log_transport_read_ahead(t, buf, 9, &moved_forward); + cr_assert(moved_forward == TRUE); + cr_assert(rc == i, "unexpected rc = %d", rc); + cr_assert(strncmp(buf, "readahead", i) == 0); + } + rc = log_transport_read_ahead(t, buf, 10, &moved_forward); + cr_assert(rc == 9, "unexpected rc = %d", rc); + cr_assert(moved_forward == FALSE); + + /* the read() returns the bytes that were read in advance */ + + memset(buf, 0, sizeof(buf)); + rc = log_transport_read(t, buf, 9, NULL); + cr_assert(rc == 9, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "readahead"); + +} + +Test(transport, test_read_ahead_bytes_get_shifted_into_the_actual_read) +{ + LogTransport *t = log_transport_mock_records_new("readahead", -1, LTM_EOF); + + gchar buf[12] = {0}; + gboolean moved_forward; + memset(buf, 0, sizeof(buf)); + gint rc = log_transport_read_ahead(t, buf, 4, &moved_forward); + cr_assert(rc == 4, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "read"); + + /* the read() returns the bytes that were read in advance */ + + memset(buf, 0, sizeof(buf)); + rc = log_transport_read(t, buf, 4, NULL); + cr_assert(rc == 4, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "read"); + +} + +Test(transport, test_read_ahead_bytes_and_new_read_is_combined) +{ + LogTransport *t = log_transport_mock_records_new("readahead", -1, LTM_EOF); + + gboolean moved_forward; + gchar buf[12] = {0}; + memset(buf, 0, sizeof(buf)); + gint rc = log_transport_read_ahead(t, buf, 4, &moved_forward); + cr_assert(rc == 4, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "read"); + + /* NOTE: the mock will return only a single byte for every read to + * exercise retry mechanisms, so only read a single character here */ + + memset(buf, 0, sizeof(buf)); + rc = log_transport_read(t, buf, 5, NULL); + cr_assert(rc == 5, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "reada"); + +} + +Test(transport, test_read_ahead_returns_the_same_buffer_any_number_of_times) +{ + LogTransport *t = log_transport_mock_records_new("readahead", -1, LTM_EOF); + gboolean moved_forward; + + gchar buf[12]; + + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 1, &moved_forward) == 1); + cr_assert_str_eq(buf, "r"); + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 2, &moved_forward) == 2); + cr_assert_str_eq(buf, "re"); + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 8, &moved_forward) == 8); + cr_assert_str_eq(buf, "readahea"); + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 4, &moved_forward) == 4); + cr_assert_str_eq(buf, "read"); + + /* the read() returns the bytes that were read in advance */ + cr_assert(log_transport_read(t, buf, 9, NULL) == 9); + cr_assert_str_eq(buf, "readahead"); + +} + +Test(transport, test_read_ahead_more_than_the_internal_buffer, .signal = SIGABRT) +{ + LogTransport *t = log_transport_mock_records_new("12345678901234567890", -1, LTM_EOF); + gboolean moved_forward; + + /* 20 bytes, the internal look ahead buffer in LogTransport is 16 bytes which we are overflowing here */ + cr_assert(sizeof(t->ra.buf) == 16); + + gchar buf[32]; + + memset(buf, 0, sizeof(buf)); + cr_assert(log_transport_read_ahead(t, buf, 20, &moved_forward) == 20); +} + +Test(transport, test_read_ahead_with_packets_split_in_half) +{ + LogTransport *t = log_transport_mock_records_new("1234", -1, "5678", -1, LTM_EOF); + gboolean moved_forward; + + gchar buf[32]; + memset(buf, 0, sizeof(buf)); + gint rc = log_transport_read_ahead(t, buf, 8, &moved_forward); + cr_assert(rc == 4, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "1234"); + + memset(buf, 0, sizeof(buf)); + rc = log_transport_read_ahead(t, buf, 8, &moved_forward); + cr_assert(rc == 8, "unexpected rc = %d", rc); + cr_assert_str_eq(buf, "12345678"); +} + +TestSuite(transport, .init = app_startup, .fini = app_shutdown); diff --git a/lib/transport/tests/test_transport_haproxy.c b/lib/transport/tests/test_transport_haproxy.c index 7ec90e3fa2..cca26b1201 100644 --- a/lib/transport/tests/test_transport_haproxy.c +++ b/lib/transport/tests/test_transport_haproxy.c @@ -45,11 +45,20 @@ teardown(void) app_shutdown(); } -static void -concat_nv(const gchar *name, const gchar *value, gsize value_len, gpointer user_data) +void +_format_addresses(GString *value, LogTransportAuxData *aux) { - GString *aux_nv_concated = user_data; - g_string_append_printf(aux_nv_concated, "%s=%s ", name, value); + gchar buf[1024]; + if (aux->peer_addr) + { + g_sockaddr_format(aux->peer_addr, buf, sizeof(buf), GSA_FULL); + g_string_append_printf(value, "source=%s ", buf); + } + if (aux->local_addr) + { + g_sockaddr_format(aux->local_addr, buf, sizeof(buf), GSA_FULL); + g_string_append_printf(value, "destination=%s", buf); + } } TestSuite(log_transport_proxy, .init = setup, .fini = teardown); @@ -58,7 +67,7 @@ typedef struct { const gchar *proxy_header; gboolean valid; - const gchar *aux_values; + const gchar *addresses; gint proxy_header_len; } ProtocolHeaderTestParams; @@ -71,15 +80,11 @@ ParameterizedTestParameters(log_transport_proxy, test_proxy_protocol_parse_heade { "PROXY UNKNOWN extra ignored parameters\r\n", TRUE, "" }, { "PROXY TCP4 1.1.1.1 2.2.2.2 3333 4444\r\n", TRUE, - .aux_values = "PROXIED_SRCIP=1.1.1.1 PROXIED_DSTIP=2.2.2.2 " - "PROXIED_SRCPORT=3333 PROXIED_DSTPORT=4444 " - "PROXIED_IP_VERSION=4 " + .addresses = "source=AF_INET(1.1.1.1:3333) destination=AF_INET(2.2.2.2:4444)" }, { "PROXY TCP6 ::1 ::2 3333 4444\r\n", TRUE, - .aux_values = "PROXIED_SRCIP=::1 PROXIED_DSTIP=::2 " - "PROXIED_SRCPORT=3333 PROXIED_DSTPORT=4444 " - "PROXIED_IP_VERSION=6 " + .addresses = "source=AF_INET6([::1]:3333) destination=AF_INET6([::2]:4444)" }, /* INVALID PROTO */ @@ -128,7 +133,7 @@ ParameterizedTestParameters(log_transport_proxy, test_proxy_protocol_parse_heade /* proxy v2 */ { "\r\n\r\n\0\r\nQUIT\n!\21\0\f\1\1\1\1\2\2\2\2\2025\255\234", TRUE, - .aux_values = "PROXIED_SRCIP=1.1.1.1 PROXIED_DSTIP=2.2.2.2 PROXIED_SRCPORT=33333 PROXIED_DSTPORT=44444 PROXIED_IP_VERSION=4 ", + .addresses = "source=AF_INET(1.1.1.1:33333) destination=AF_INET(2.2.2.2:44444)", .proxy_header_len = 28 }, }; @@ -146,12 +151,14 @@ ParameterizedTest(ProtocolHeaderTestParams *params, log_transport_proxy, test_pr gssize rc; log_transport_stack_init(&stack, mock); - LogTransport *transport = log_transport_haproxy_new(&stack, LOG_TRANSPORT_INITIAL, LOG_TRANSPORT_NONE); - + log_transport_stack_add_transport(&stack, + LOG_TRANSPORT_HAPROXY, log_transport_haproxy_new(LOG_TRANSPORT_INITIAL, LOG_TRANSPORT_INITIAL)); + log_transport_stack_switch(&stack, LOG_TRANSPORT_HAPROXY); do { log_transport_aux_data_init(&aux); - rc = log_transport_read(transport, buf, sizeof(buf), &aux); + rc = log_transport_stack_read(&stack, buf, sizeof(buf), &aux); + log_transport_aux_data_destroy(&aux); } while (rc == -1 && errno == EAGAIN); @@ -159,15 +166,14 @@ ParameterizedTest(ProtocolHeaderTestParams *params, log_transport_proxy, test_pr "This should be %s: \n>>%.*s<<\n (rc=%d, errno=%d)", params->valid ? "valid" : "invalid", proxy_header_len, params->proxy_header, (gint) rc, errno); - if (rc == 0 && params->aux_values) + if (rc == 0 && params->addresses) { - GString *aux_nv_concated = g_string_new(NULL); - log_transport_aux_data_foreach(&aux, concat_nv, aux_nv_concated); - cr_assert_str_eq(aux_nv_concated->str, params->aux_values); - g_string_free(aux_nv_concated, TRUE); - } + GString *addresses = g_string_new(NULL); + _format_addresses(addresses, &aux); + cr_assert_str_eq(addresses->str, params->addresses); + g_string_free(addresses, TRUE); + } - log_transport_free(transport); log_transport_stack_deinit(&stack); } diff --git a/lib/transport/transport-adapter.c b/lib/transport/transport-adapter.c index 74f3039ac1..7b1e472e39 100644 --- a/lib/transport/transport-adapter.c +++ b/lib/transport/transport-adapter.c @@ -27,7 +27,7 @@ gssize log_transport_adapter_read_method(LogTransport *s, gpointer buf, gsize buflen, LogTransportAuxData *aux) { LogTransportAdapter *self = (LogTransportAdapter *) s; - LogTransport *transport = log_transport_stack_get_transport(self->stack, self->base_index); + LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index); return log_transport_read(transport, buf, buflen, aux); } @@ -36,7 +36,7 @@ gssize log_transport_adapter_write_method(LogTransport *s, const gpointer buf, gsize count) { LogTransportAdapter *self = (LogTransportAdapter *) s; - LogTransport *transport = log_transport_stack_get_transport(self->stack, self->base_index); + LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index); return log_transport_write(transport, buf, count); } @@ -45,20 +45,19 @@ gssize log_transport_adapter_writev_method(LogTransport *s, struct iovec *iov, gint iov_count) { LogTransportAdapter *self = (LogTransportAdapter *) s; - LogTransport *transport = log_transport_stack_get_transport(self->stack, self->base_index); + LogTransport *transport = log_transport_stack_get_transport(s->stack, self->base_index); return log_transport_writev(transport, iov, iov_count); } void log_transport_adapter_init_instance(LogTransportAdapter *self, const gchar *name, - LogTransportStack *stack, LogTransportIndex base_index) + LogTransportIndex base_index) { - log_transport_init_instance(&self->super, name, stack->fd); + log_transport_init_instance(&self->super, name, -1); self->super.read = log_transport_adapter_read_method; self->super.write = log_transport_adapter_write_method; self->super.writev = log_transport_adapter_writev_method; - self->stack = stack; self->base_index = base_index; } diff --git a/lib/transport/transport-adapter.h b/lib/transport/transport-adapter.h index fbc3122100..6a1e2f0d02 100644 --- a/lib/transport/transport-adapter.h +++ b/lib/transport/transport-adapter.h @@ -30,7 +30,6 @@ typedef struct _LogTransportAdapter LogTransportAdapter; struct _LogTransportAdapter { LogTransport super; - LogTransportStack *stack; LogTransportIndex base_index; }; @@ -39,6 +38,6 @@ gssize log_transport_adapter_write_method(LogTransport *s, const gpointer buf, g gssize log_transport_adapter_writev_method(LogTransport *s, struct iovec *iov, gint iov_count); void log_transport_adapter_init_instance(LogTransportAdapter *self, const gchar *name, - LogTransportStack *stack, LogTransportIndex base); + LogTransportIndex base); #endif diff --git a/lib/transport/transport-aux-data.h b/lib/transport/transport-aux-data.h index c74a88f628..0f2ffc15de 100644 --- a/lib/transport/transport-aux-data.h +++ b/lib/transport/transport-aux-data.h @@ -81,6 +81,16 @@ log_transport_aux_data_copy(LogTransportAuxData *dst, LogTransportAuxData *src) } } +static inline void +log_transport_aux_data_move(LogTransportAuxData *dst, LogTransportAuxData *src) +{ + gsize data_to_copy = sizeof(*src) - sizeof(src->data) + src->end_ptr; + + if (dst) + memcpy(dst, src, data_to_copy); + log_transport_aux_data_init(src); +} + static inline void log_transport_aux_data_set_peer_addr_ref(LogTransportAuxData *self, GSockAddr *peer_addr) { diff --git a/lib/transport/transport-factory-tls.c b/lib/transport/transport-factory-tls.c index 3a137c9114..fc74f69677 100644 --- a/lib/transport/transport-factory-tls.c +++ b/lib/transport/transport-factory-tls.c @@ -39,7 +39,7 @@ _construct_transport(const LogTransportFactory *s, LogTransportStack *stack) tls_session_set_verifier(tls_session, self->tls_verifier); - return log_transport_tls_new(tls_session, stack->fd); + return log_transport_tls_new(tls_session, log_transport_stack_get_transport(stack, LOG_TRANSPORT_SOCKET)); } void diff --git a/lib/transport/transport-haproxy.c b/lib/transport/transport-haproxy.c index 95421c9855..ce996f5fb8 100644 --- a/lib/transport/transport-haproxy.c +++ b/lib/transport/transport-haproxy.c @@ -483,7 +483,7 @@ _fetch_into_proxy_buffer(LogTransportHAProxy *self) } else { - msg_error("Unable to determine PROXY protocol version"); + msg_error("Unable to determine PROXY protocol version", evt_tag_mem("proxy_header", self->proxy_header_buff, PROXY_PROTO_HDR_MAGIC_LEN)); return STATUS_ERROR; } g_assert_not_reached(); @@ -507,6 +507,27 @@ _fetch_into_proxy_buffer(LogTransportHAProxy *self) } } +static void +_save_addresses(LogTransportHAProxy *self) +{ + LogTransportStack *stack = self->super.super.stack; + if (self->info.unknown) + return; + + if (self->info.ip_version == 4) + { + log_transport_aux_data_set_peer_addr_ref(&stack->aux_data, g_sockaddr_inet_new(self->info.src_ip, self->info.src_port)); + log_transport_aux_data_set_local_addr_ref(&stack->aux_data, g_sockaddr_inet_new(self->info.dst_ip, self->info.dst_port)); + } + else if (self->info.ip_version == 6) + { + log_transport_aux_data_set_peer_addr_ref(&stack->aux_data, g_sockaddr_inet6_new(self->info.src_ip, self->info.src_port)); + log_transport_aux_data_set_local_addr_ref(&stack->aux_data, g_sockaddr_inet6_new(self->info.dst_ip, self->info.dst_port)); + } + else + g_assert_not_reached(); +} + static Status _proccess_proxy_header(LogTransportHAProxy *self) { @@ -527,6 +548,7 @@ _proccess_proxy_header(LogTransportHAProxy *self) if (parsable) { msg_trace("PROXY protocol header parsed successfully"); + _save_addresses(self); return STATUS_SUCCESS; } @@ -537,29 +559,6 @@ _proccess_proxy_header(LogTransportHAProxy *self) } } -static void -_augment_aux_data(LogTransportHAProxy *self, LogTransportAuxData *aux) -{ - gchar buf1[8]; - gchar buf2[8]; - gchar buf3[8]; - - if (self->info.unknown) - return; - - snprintf(buf1, 8, "%i", self->info.src_port); - snprintf(buf2, 8, "%i", self->info.dst_port); - snprintf(buf3, 8, "%i", self->info.ip_version); - - log_transport_aux_data_add_nv_pair(aux, "PROXIED_SRCIP", self->info.src_ip); - log_transport_aux_data_add_nv_pair(aux, "PROXIED_DSTIP", self->info.dst_ip); - log_transport_aux_data_add_nv_pair(aux, "PROXIED_SRCPORT", buf1); - log_transport_aux_data_add_nv_pair(aux, "PROXIED_DSTPORT", buf2); - log_transport_aux_data_add_nv_pair(aux, "PROXIED_IP_VERSION", buf3); - - return; -} - static gssize _haproxy_read(LogTransport *s, gpointer buf, gsize buflen, LogTransportAuxData *aux) { @@ -576,20 +575,22 @@ _haproxy_read(LogTransport *s, gpointer buf, gsize buflen, LogTransportAuxData * errno = EAGAIN; return -1; } - if (self->switch_to != LOG_TRANSPORT_NONE) - self->super.base_index = self->switch_to; - } - _augment_aux_data(self, aux); - return log_transport_adapter_read_method(s, buf, buflen, aux); + if (!log_transport_stack_switch(self->super.super.stack, self->switch_to)) + g_assert_not_reached(); + + errno = EAGAIN; + return -1; + } + g_assert_not_reached(); } LogTransport * -log_transport_haproxy_new(LogTransportStack *stack, LogTransportIndex base, LogTransportIndex switch_to) +log_transport_haproxy_new(LogTransportIndex base, LogTransportIndex switch_to) { LogTransportHAProxy *self = g_new0(LogTransportHAProxy, 1); - log_transport_adapter_init_instance(&self->super, "haproxy", stack, base); + log_transport_adapter_init_instance(&self->super, "haproxy", base); self->super.super.read = _haproxy_read; self->switch_to = switch_to; diff --git a/lib/transport/transport-haproxy.h b/lib/transport/transport-haproxy.h index 6830df8f80..c689934b7a 100644 --- a/lib/transport/transport-haproxy.h +++ b/lib/transport/transport-haproxy.h @@ -26,6 +26,6 @@ #include "transport-adapter.h" -LogTransport *log_transport_haproxy_new(LogTransportStack *stack, LogTransportIndex base, LogTransportIndex flip); +LogTransport *log_transport_haproxy_new(LogTransportIndex base, LogTransportIndex flip); #endif diff --git a/lib/transport/transport-stack.c b/lib/transport/transport-stack.c index 6b3ab04683..a00df6e2c9 100644 --- a/lib/transport/transport-stack.c +++ b/lib/transport/transport-stack.c @@ -46,20 +46,23 @@ void log_transport_stack_add_transport(LogTransportStack *self, gint index, LogTransport *transport) { g_assert(self->transports[index] == NULL); + log_transport_assign_to_stack(transport, self); self->transports[index] = transport; if (self->fd == -1) self->fd = transport->fd; - else + else if (transport->fd != -1) g_assert(self->fd == transport->fd); } gboolean log_transport_stack_switch(LogTransportStack *self, gint index) { + g_assert(index < LOG_TRANSPORT__MAX); LogTransport *active_transport = log_transport_stack_get_active(self); LogTransport *requested_transport = log_transport_stack_get_transport(self, index); - g_assert(requested_transport != NULL); + if (!requested_transport) + return FALSE; msg_debug("Transport switch requested", evt_tag_str("active-transport", active_transport ? active_transport->name : "none"), @@ -77,6 +80,39 @@ log_transport_stack_switch(LogTransportStack *self, gint index) return TRUE; } +/* + * Move the transport stack state to another LogTransportStack instance. + * Normally LogTransportStack instances are embedded in LogProto instances, + * so in case the LogProto instance is replaced, the transport stack may + * need to be moved. + */ +void +log_transport_stack_move(LogTransportStack *self, LogTransportStack *other) +{ + self->fd = other->fd; + self->active_transport = other->active_transport; + other->fd = -1; + + for (gint i = 0; i < LOG_TRANSPORT__MAX; i++) + { + g_assert(self->transports[i] == NULL); + g_assert(self->transport_factories[i] == NULL); + + if (other->transports[i]) + { + self->transports[i] = other->transports[i]; + log_transport_assign_to_stack(self->transports[i], self); + other->transports[i] = NULL; + } + if (other->transport_factories[i]) + { + self->transport_factories[i] = other->transport_factories[i]; + other->transport_factories[i] = NULL; + } + } + log_transport_aux_data_move(&self->aux_data, &other->aux_data); +} + void log_transport_stack_init(LogTransportStack *self, LogTransport *initial_transport) { @@ -84,11 +120,13 @@ log_transport_stack_init(LogTransportStack *self, LogTransport *initial_transpor self->fd = -1; if (initial_transport) log_transport_stack_add_transport(self, LOG_TRANSPORT_INITIAL, initial_transport); + log_transport_aux_data_init(&self->aux_data); } void log_transport_stack_deinit(LogTransportStack *self) { + log_transport_aux_data_destroy(&self->aux_data); if (self->fd != -1) { msg_trace("Closing log transport fd", diff --git a/lib/transport/transport-stack.h b/lib/transport/transport-stack.h index a53fca18ee..8c7414184c 100644 --- a/lib/transport/transport-stack.h +++ b/lib/transport/transport-stack.h @@ -27,7 +27,6 @@ #include "transport/logtransport.h" -typedef struct _LogTransportStack LogTransportStack; typedef struct _LogTransportFactory LogTransportFactory; typedef enum @@ -106,17 +105,21 @@ struct _LogTransportStack gint fd; LogTransport *transports[LOG_TRANSPORT__MAX]; LogTransportFactory *transport_factories[LOG_TRANSPORT__MAX]; + LogTransportAuxData aux_data; }; static inline LogTransport * log_transport_stack_get_transport(LogTransportStack *self, gint index) { + g_assert(index < LOG_TRANSPORT__MAX); + if (self->transports[index]) return self->transports[index]; if (self->transport_factories[index]) { self->transports[index] = log_transport_factory_construct_transport(self->transport_factories[index], self); + log_transport_assign_to_stack(self->transports[index], self); return self->transports[index]; } return NULL; @@ -128,9 +131,46 @@ log_transport_stack_get_active(LogTransportStack *self) return log_transport_stack_get_transport(self, self->active_transport); } +static inline gboolean +log_transport_stack_poll_prepare(LogTransportStack *self, GIOCondition *cond) +{ + LogTransport *transport = log_transport_stack_get_active(self); + return log_transport_poll_prepare(transport, cond); +} + +static inline gssize +log_transport_stack_write(LogTransportStack *self, const gpointer buf, gsize count) +{ + LogTransport *transport = log_transport_stack_get_active(self); + return log_transport_write(transport, buf, count); +} + +static inline gssize +log_transport_stack_writev(LogTransportStack *self, struct iovec *iov, gint iov_count) +{ + LogTransport *transport = log_transport_stack_get_active(self); + return log_transport_writev(transport, iov, iov_count); +} + +static inline gssize +log_transport_stack_read(LogTransportStack *self, gpointer buf, gsize count, LogTransportAuxData *aux) +{ + LogTransport *transport = log_transport_stack_get_active(self); + log_transport_aux_data_copy(aux, &self->aux_data); + return log_transport_read(transport, buf, count, aux); +} + +static inline gssize +log_transport_stack_read_ahead(LogTransportStack *self, gpointer buf, gsize count, gboolean *moved_forward) +{ + LogTransport *transport = log_transport_stack_get_active(self); + return log_transport_read_ahead(transport, buf, count, moved_forward); +} + void log_transport_stack_add_factory(LogTransportStack *self, LogTransportFactory *); void log_transport_stack_add_transport(LogTransportStack *self, gint index, LogTransport *); gboolean log_transport_stack_switch(LogTransportStack *self, gint index); +void log_transport_stack_move(LogTransportStack *self, LogTransportStack *other); void log_transport_stack_init(LogTransportStack *self, LogTransport *initial_transport); void log_transport_stack_deinit(LogTransportStack *self); diff --git a/lib/transport/transport-tls.c b/lib/transport/transport-tls.c index 53bebe8753..4d64068524 100644 --- a/lib/transport/transport-tls.c +++ b/lib/transport/transport-tls.c @@ -30,6 +30,101 @@ #include #include +static int +_BIO_transport_write(BIO *bio, const char *buf, size_t buflen, size_t *written_bytes) +{ + LogTransport *transport = BIO_get_data(bio); + gssize ret; + + ret = log_transport_write(transport, (gpointer) buf, buflen); + BIO_clear_retry_flags(bio); + + if (ret < 0) + { + *written_bytes = 0; + if (errno == EAGAIN) + BIO_set_retry_write(bio); + return -1; + } + *written_bytes = ret; + return 1; +} + +int +_BIO_transport_read(BIO *bio, char *buf, gsize buflen, gsize *read_bytes) +{ + LogTransport *transport = BIO_get_data(bio); + gssize ret; + + ret = log_transport_read(transport, buf, buflen, NULL); + if (ret < 0) + { + *read_bytes = 0; + if (errno == EAGAIN) + BIO_set_retry_read(bio); + return -1; + } + *read_bytes = ret; + return 1; +} + +long +_BIO_transport_ctrl(BIO *bio, int cmd, long num, void *ptr) +{ + long ret = 1; + + switch (cmd) + { + case BIO_CTRL_GET_CLOSE: + ret = BIO_get_shutdown(bio); + break; + case BIO_CTRL_SET_CLOSE: + BIO_set_shutdown(bio, (int)num); + break; + case BIO_CTRL_DUP: + case BIO_CTRL_FLUSH: + ret = 1; + break; + case BIO_CTRL_RESET: + case BIO_C_FILE_SEEK: + case BIO_C_FILE_TELL: + case BIO_CTRL_INFO: + case BIO_C_SET_FD: + case BIO_C_GET_FD: + case BIO_CTRL_PENDING: + case BIO_CTRL_WPENDING: + default: + ret = 0; + break; + } + + return ret; +} + +BIO_METHOD * +BIO_s_transport(void) +{ + static BIO_METHOD *meth = NULL; + + if (meth) + return meth; + + meth = BIO_meth_new(BIO_TYPE_NONE, "LogTransportBIO"); + BIO_meth_set_write_ex(meth, _BIO_transport_write); + BIO_meth_set_read_ex(meth, _BIO_transport_read); + BIO_meth_set_ctrl(meth, _BIO_transport_ctrl); + + return meth; +} + +BIO * +BIO_transport_new(LogTransport *transport) +{ + BIO *bio = BIO_new(BIO_s_transport()); + BIO_set_data(bio, transport); + return bio; +} + typedef struct _LogTransportTLS { LogTransportSocket super; @@ -241,11 +336,11 @@ log_transport_tls_write_method(LogTransport *s, const gpointer buf, gsize buflen static void log_transport_tls_free_method(LogTransport *s); LogTransport * -log_transport_tls_new(TLSSession *tls_session, gint fd) +log_transport_tls_new(TLSSession *tls_session, LogTransport *transport) { LogTransportTLS *self = g_new0(LogTransportTLS, 1); - log_transport_stream_socket_init_instance(&self->super, fd); + log_transport_stream_socket_init_instance(&self->super, -1); self->super.super.name = "tls"; self->super.super.cond = 0; self->super.super.read = log_transport_tls_read_method; @@ -253,7 +348,8 @@ log_transport_tls_new(TLSSession *tls_session, gint fd) self->super.super.free_fn = log_transport_tls_free_method; self->tls_session = tls_session; - SSL_set_fd(self->tls_session->ssl, fd); + BIO *bio = BIO_transport_new(transport); + SSL_set_bio(self->tls_session->ssl, bio, bio); return &self->super.super; } diff --git a/lib/transport/transport-tls.h b/lib/transport/transport-tls.h index 682667e1ab..4497df890e 100644 --- a/lib/transport/transport-tls.h +++ b/lib/transport/transport-tls.h @@ -27,6 +27,6 @@ #include "transport/logtransport.h" #include "transport/tls-context.h" -LogTransport *log_transport_tls_new(TLSSession *tls_session, gint fd); +LogTransport *log_transport_tls_new(TLSSession *tls_session, LogTransport *transport); #endif diff --git a/libtest/mock-transport.c b/libtest/mock-transport.c index 8ad2929145..f283a4893c 100644 --- a/libtest/mock-transport.c +++ b/libtest/mock-transport.c @@ -211,7 +211,7 @@ log_transport_mock_read_method(LogTransport *s, gpointer buf, gsize count, LogTr switch (g_array_index(self->value, data_t, self->current_value_ndx).type) { case DATA_STRING: - if (self->input_is_a_stream) + if (self->input_is_a_stream && count > 0) count = 1; current_iov = &g_array_index(self->value, data_t, self->current_value_ndx).iov; diff --git a/libtest/proto_lib.c b/libtest/proto_lib.c index ecbdf22fc8..3e796123c8 100644 --- a/libtest/proto_lib.c +++ b/libtest/proto_lib.c @@ -37,6 +37,31 @@ assert_proto_server_status(LogProtoServer *proto, LogProtoStatus status, LogProt cr_assert_eq(status, expected_status, "LogProtoServer expected status mismatch"); } +LogProtoStatus +proto_server_handshake(LogProtoServer **proto) +{ + gboolean handshake_finished = FALSE; + LogProtoStatus status; + + start_grabbing_messages(); + do + { + LogProtoServer *proto_replacement = NULL; + status = log_proto_server_handshake(*proto, &handshake_finished, &proto_replacement); + if (status == LPS_AGAIN) + status = LPS_SUCCESS; + if (proto_replacement) + { + log_transport_stack_move(&proto_replacement->transport_stack, &(*proto)->transport_stack); + log_proto_server_free(*proto); + *proto = proto_replacement; + } + } + while (status == LPS_SUCCESS && handshake_finished == FALSE); + stop_grabbing_messages(); + return status; +} + LogProtoStatus proto_server_fetch(LogProtoServer *proto, const guchar **msg, gsize *msg_len) { @@ -80,6 +105,16 @@ construct_server_proto_plugin(const gchar *name, LogTransport *transport) return log_proto_server_factory_construct(proto_factory, transport, &proto_server_options); } +void +assert_proto_server_handshake(LogProtoServer **proto) +{ + LogProtoStatus status; + + status = proto_server_handshake(proto); + + assert_proto_server_status(*proto, status, LPS_SUCCESS); +} + void assert_proto_server_fetch(LogProtoServer *proto, const gchar *expected_msg, gssize expected_msg_len) { @@ -146,6 +181,16 @@ assert_proto_server_fetch_failure(LogProtoServer *proto, LogProtoStatus expected assert_grabbed_log_contains(error_message); } +void +assert_proto_server_handshake_failure(LogProtoServer **proto, LogProtoStatus expected_status) +{ + LogProtoStatus status; + + status = proto_server_handshake(proto); + + assert_proto_server_status(*proto, status, expected_status); +} + void assert_proto_server_fetch_ignored_eof(LogProtoServer *proto) { diff --git a/libtest/proto_lib.h b/libtest/proto_lib.h index 357a7d2fe8..5b75874e38 100644 --- a/libtest/proto_lib.h +++ b/libtest/proto_lib.h @@ -29,6 +29,9 @@ extern LogProtoServerOptions proto_server_options; + +void assert_proto_server_handshake(LogProtoServer **proto); +void assert_proto_server_handshake_failure(LogProtoServer **proto, LogProtoStatus expected_status); void assert_proto_server_status(LogProtoServer *proto, LogProtoStatus status, LogProtoStatus expected_status); void assert_proto_server_fetch(LogProtoServer *proto, const gchar *expected_msg, gssize expected_msg_len); void assert_proto_server_fetch_single_read(LogProtoServer *proto, const gchar *expected_msg, gssize expected_msg_len); diff --git a/modules/affile/logproto-file-writer.c b/modules/affile/logproto-file-writer.c index 06a90c8ef4..f463e8ae9f 100644 --- a/modules/affile/logproto-file-writer.c +++ b/modules/affile/logproto-file-writer.c @@ -48,11 +48,10 @@ typedef struct _LogProtoFileWriter static inline gboolean _flush_partial(LogProtoFileWriter *self, LogProtoStatus *status) { - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); /* there is still some data from the previous file writing process */ gint len = self->partial_len - self->partial_pos; - gssize rc = log_transport_write(transport, self->partial + self->partial_pos, len); + gssize rc = log_transport_stack_write(&self->super.transport_stack, self->partial + self->partial_pos, len); if (rc > 0 && self->fsync) fsync(self->fd); @@ -67,7 +66,7 @@ _flush_partial(LogProtoFileWriter *self, LogProtoStatus *status) log_proto_client_msg_rewind(&self->super); msg_error("I/O error occurred while writing", - evt_tag_int("fd", transport->fd), + evt_tag_int("fd", self->super.transport_stack.fd), evt_tag_error(EVT_TAG_OSERROR)); *status = LPS_ERROR; @@ -139,7 +138,6 @@ static LogProtoStatus log_proto_file_writer_flush(LogProtoClient *s) { LogProtoFileWriter *self = (LogProtoFileWriter *)s; - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); if (self->partial) { @@ -152,10 +150,10 @@ log_proto_file_writer_flush(LogProtoClient *s) if (self->buf_count == 0) return LPS_SUCCESS; - gssize rc = log_transport_writev(transport, self->buffer, self->buf_count); + gssize rc = log_transport_stack_writev(&self->super.transport_stack, self->buffer, self->buf_count); if (rc > 0 && self->fsync) - fsync(transport->fd); + fsync(self->super.transport_stack.fd); if (rc < 0) { @@ -164,7 +162,7 @@ log_proto_file_writer_flush(LogProtoClient *s) log_proto_client_msg_rewind(&self->super); msg_error("I/O error occurred while writing", - evt_tag_int("fd", transport->fd), + evt_tag_int("fd", self->super.transport_stack.fd), evt_tag_error(EVT_TAG_OSERROR)); return LPS_ERROR; } @@ -232,13 +230,14 @@ log_proto_file_writer_post(LogProtoClient *s, LogMessage *logmsg, guchar *msg, g } static gboolean -log_proto_file_writer_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout) +log_proto_file_writer_poll_prepare(LogProtoClient *s, gint *fd, GIOCondition *cond, gint *timeout) { LogProtoFileWriter *self = (LogProtoFileWriter *) s; - LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack); - *fd = transport->fd; - *cond = transport->cond; + if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond)) + return TRUE; + + *fd = self->super.transport_stack.fd; /* if there's no pending I/O in the transport layer, then we want to do a write */ if (*cond == 0) @@ -265,7 +264,7 @@ log_proto_file_writer_new(LogTransport *transport, const LogProtoClientOptions * log_proto_client_init(&self->super, transport, options); self->buf_size = flush_lines; self->fsync = fsync_; - self->super.prepare = log_proto_file_writer_prepare; + self->super.poll_prepare = log_proto_file_writer_poll_prepare; self->super.post = log_proto_file_writer_post; self->super.flush = log_proto_file_writer_flush; return &self->super; diff --git a/modules/afsocket/afsocket-grammar.ym b/modules/afsocket/afsocket-grammar.ym index d26ea8bd73..fefe3d845a 100644 --- a/modules/afsocket/afsocket-grammar.ym +++ b/modules/afsocket/afsocket-grammar.ym @@ -772,6 +772,7 @@ afsocket_transport : KW_TRANSPORT '(' KW_TCP ')' { transport_mapper_set_transport(last_transport_mapper, "tcp"); } | KW_TRANSPORT '(' KW_UDP ')' { transport_mapper_set_transport(last_transport_mapper, "udp"); } | KW_TRANSPORT '(' KW_TLS ')' { transport_mapper_set_transport(last_transport_mapper, "tls"); } + | KW_TRANSPORT '(' KW_AUTO ')' { transport_mapper_set_transport(last_transport_mapper, "auto"); } | KW_IP_PROTOCOL '(' inet_ip_protocol_option ')' { transport_mapper_set_address_family(last_transport_mapper, $3); } ; diff --git a/modules/afsocket/transport-mapper-inet.c b/modules/afsocket/transport-mapper-inet.c index 4552e66084..d9e00d81df 100644 --- a/modules/afsocket/transport-mapper-inet.c +++ b/modules/afsocket/transport-mapper-inet.c @@ -108,7 +108,7 @@ _setup_haproxy_transport(TransportMapperInet *self, LogTransportStack *stack, LogTransportIndex base_index, LogTransportIndex switch_to) { log_transport_stack_add_transport(stack, LOG_TRANSPORT_HAPROXY, - log_transport_haproxy_new(stack, base_index, switch_to)); + log_transport_haproxy_new(base_index, switch_to)); return TRUE; } @@ -134,16 +134,17 @@ transport_mapper_inet_setup_stack(TransportMapper *s, LogTransportStack *stack) { LogTransportIndex switch_to; - if (self->tls_context && !_is_tls_required(self)) + if (self->tls_context) switch_to = LOG_TRANSPORT_TLS; else - switch_to = LOG_TRANSPORT_NONE; + switch_to = LOG_TRANSPORT_SOCKET; if (!_setup_haproxy_transport(self, stack, initial_transport_index, switch_to)) return FALSE; initial_transport_index = LOG_TRANSPORT_HAPROXY; } - log_transport_stack_switch(stack, initial_transport_index); + if (!log_transport_stack_switch(stack, initial_transport_index)) + g_assert_not_reached(); return TRUE; } diff --git a/modules/afsocket/transport-mapper-unix.c b/modules/afsocket/transport-mapper-unix.c index 13c1ee86a9..16fb86409a 100644 --- a/modules/afsocket/transport-mapper-unix.c +++ b/modules/afsocket/transport-mapper-unix.c @@ -43,7 +43,8 @@ _setup_stack(TransportMapper *s, LogTransportStack *stack) else transport = log_transport_unix_stream_socket_new(stack->fd); log_transport_stack_add_transport(stack, LOG_TRANSPORT_SOCKET, transport); - log_transport_stack_switch(stack, LOG_TRANSPORT_SOCKET); + if (!log_transport_stack_switch(stack, LOG_TRANSPORT_SOCKET)) + g_assert_not_reached(); return TRUE; } diff --git a/news/feature-4814.md b/news/feature-4814.md new file mode 100644 index 0000000000..d8b48cba9c --- /dev/null +++ b/news/feature-4814.md @@ -0,0 +1,4 @@ +`syslog()` source driver: add support for RFC6587 style auto-detection of +octet-count based framing to avoid confusion that stems from the sender +using a different protocol to the server. This behaviour can be enabled +by using `transport(auto)` option for the `syslog()` source. diff --git a/tests/copyright/policy b/tests/copyright/policy index 83a9e1ba7b..1306366ba0 100644 --- a/tests/copyright/policy +++ b/tests/copyright/policy @@ -135,6 +135,8 @@ modules/python/python-confgen\.[ch] lib/tests/test_logscheduler\.c lib/filterx/.*\.[ch] lib/filterx/filterx-grammar\.ym +lib/logproto/logproto-auto-server\.[ch] +lib/transport/tests/test_transport\.c ########################################################################### # These tests are GPLd even though they reside under lib/ and are excluded @@ -269,6 +271,7 @@ tests/light/functional_tests/filterx/test_filterx\.py tests/light/functional_tests/filterx/test_filterx_scope\.py tests/light/functional_tests/filterx/test_filterx_update_metric\.py tests/light/functional_tests/parsers/metrics-probe/test_metrics_probe\.py +tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto\.py tests/light/src/syslog_ng_ctl/prometheus_stats_handler.py tests/light/src/syslog_ng_config/statements/template/template\.py tests/light/src/syslog_ng_config/statements/__init__\.py diff --git a/tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto.py b/tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto.py new file mode 100644 index 0000000000..6c8e514db7 --- /dev/null +++ b/tests/light/functional_tests/source_drivers/syslog_source/auto/test_auto_proto.py @@ -0,0 +1,131 @@ +#!/usr/bin/env python +############################################################################# +# Copyright (c) 2024 Balazs Scheidler +# Copyright (c) 2024 Axoflow +# +# This program is free software; you can redistribute it and/or modify it +# under the terms of the GNU General Public License version 2 as published +# by the Free Software Foundation, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +# +# As an additional exemption you are allowed to compile & link against the +# OpenSSL libraries as published by the OpenSSL project. See the file +# COPYING for details. +# +############################################################################# +from pathlib import Path + +from src.common.blocking import wait_until_true +from src.common.file import File +from src.common.file import copy_shared_file +from src.common.random_id import get_unique_id + + +def _write_auto_config(config, syslog_ng, port_allocator, transport, testcase_parameters): + server_key_path = copy_shared_file(testcase_parameters, "server.key") + server_cert_path = copy_shared_file(testcase_parameters, "server.crt") + + output_file = "output.log" + + syslog_source = config.create_syslog_source( + ip="localhost", + port=port_allocator(), + keep_hostname="yes", + transport=transport, + tls={ + "key-file": server_key_path, + "cert-file": server_cert_path, + "peer-verify": '"optional-untrusted"', + }, + ) + file_destination = config.create_file_destination(file_name=output_file) + config.create_logpath(statements=[syslog_source, file_destination]) + + syslog_ng.start(config) + return (syslog_source, file_destination) + +def _test_auto_detect(syslog_source, file_destination, loggen, input_messages, number_of_messages, expected_messages, use_ssl=False, proxied=False, proxied_tls_passthrough=False): + + loggen_input_file_path = Path("loggen_input_{}.txt".format(get_unique_id())) + loggen_input_file = File(loggen_input_file_path) + loggen_input_file.write_content_and_close(input_messages) + loggen.start( + syslog_source.options["ip"], syslog_source.options["port"], + number=number_of_messages+(1 if proxied else 0), + dont_parse=True, + read_file=str(loggen_input_file_path), + syslog_proto=True, + inet=None if use_ssl else True, + use_ssl=use_ssl, + proxied=1 if proxied else None, + proxied_tls_passthrough=proxied_tls_passthrough, + proxy_src_ip="1.1.1.1", proxy_dst_ip="2.2.2.2", proxy_src_port="3333", proxy_dst_port="4444", + ) + + wait_until_true(lambda: loggen.get_sent_message_count() == number_of_messages) + + assert file_destination.read_log() == expected_messages + + loggen.stop() + + +def test_auto_framing(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): + NUMBER_OF_MESSAGES = 10 + INPUT_MESSAGES = "52 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES + EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + (syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters) + + _test_auto_detect(syslog_source, file_destination, loggen, + INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES) + +def test_auto_framing_tls(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): + NUMBER_OF_MESSAGES = 10 + INPUT_MESSAGES = "52 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES + EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + (syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters) + + _test_auto_detect(syslog_source, file_destination, loggen, + INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES, + use_ssl=True) + +def test_auto_framing_tls_proxied(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): + NUMBER_OF_MESSAGES = 10 + INPUT_MESSAGES = "52 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES + EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + (syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters) + + _test_auto_detect(syslog_source, file_destination, loggen, + INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES, + use_ssl=True, proxied=True) + +def test_auto_framing_tls_proxied_passthrough(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): + NUMBER_OF_MESSAGES = 10 + INPUT_MESSAGES = "52 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES + EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + (syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters) + + _test_auto_detect(syslog_source, file_destination, loggen, + INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES, + use_ssl=True, proxied=True, proxied_tls_passthrough=True) + + +def test_auto_no_framing(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): + NUMBER_OF_MESSAGES = 10 + INPUT_MESSAGES = "<2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES + EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + + (syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters) + _test_auto_detect(syslog_source, file_destination, loggen, + INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES) + + _test_auto_detect(syslog_source, file_destination, loggen, + INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES, + use_ssl=True) diff --git a/tests/light/functional_tests/source_drivers/syslog_source/proxyprotocol/test_pp_syslog.py b/tests/light/functional_tests/source_drivers/syslog_source/proxyprotocol/test_pp_syslog.py index c8cffb71e1..107b7ed218 100644 --- a/tests/light/functional_tests/source_drivers/syslog_source/proxyprotocol/test_pp_syslog.py +++ b/tests/light/functional_tests/source_drivers/syslog_source/proxyprotocol/test_pp_syslog.py @@ -92,32 +92,32 @@ def _test_pp(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_ def test_pp_syslog_tcp(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): - TEMPLATE = r'"${PROXIED_SRCIP} ${PROXIED_DSTIP} ${PROXIED_SRCPORT} ${PROXIED_DSTPORT} ${PROXIED_IP_VERSION} ${MESSAGE}\n"' + TEMPLATE = r'"${SOURCEIP} ${DESTIP} ${DESTPORT} ${MESSAGE}\n"' INPUT_MESSAGES = "53 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\r\n" - EXPECTED_MESSAGES = "1.1.1.1 2.2.2.2 3333 4444 4 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + EXPECTED_MESSAGES = "1.1.1.1 2.2.2.2 4444 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" NUMBER_OF_MESSAGES = 2 _test_pp(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, '"proxied-tcp"', INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES, TEMPLATE) def test_pp_syslog_tls(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): - TEMPLATE = r'"${PROXIED_SRCIP} ${PROXIED_DSTIP} ${PROXIED_SRCPORT} ${PROXIED_DSTPORT} ${PROXIED_IP_VERSION} ${MESSAGE}\n"' + TEMPLATE = r'"${SOURCEIP} ${DESTIP} ${DESTPORT} ${MESSAGE}\n"' INPUT_MESSAGES = "53 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\r\n" - EXPECTED_MESSAGES = "1.1.1.1 2.2.2.2 3333 4444 4 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + EXPECTED_MESSAGES = "1.1.1.1 2.2.2.2 4444 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" NUMBER_OF_MESSAGES = 2 _test_pp(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, '"proxied-tls"', INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES, TEMPLATE) def test_pp_syslog_tls_with_passphrase(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): - TEMPLATE = r'"${PROXIED_SRCIP} ${PROXIED_DSTIP} ${PROXIED_SRCPORT} ${PROXIED_DSTPORT} ${PROXIED_IP_VERSION} ${MESSAGE}\n"' + TEMPLATE = r'"${SOURCEIP} ${DESTIP} ${DESTPORT} ${MESSAGE}\n"' INPUT_MESSAGES = "53 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\r\n" - EXPECTED_MESSAGES = "1.1.1.1 2.2.2.2 3333 4444 4 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + EXPECTED_MESSAGES = "1.1.1.1 2.2.2.2 4444 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" NUMBER_OF_MESSAGES = 2 _test_pp(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, '"proxied-tls"', INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES, TEMPLATE, password="asdfg") def test_pp_syslog_tls_passthrough(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters): - TEMPLATE = r'"${PROXIED_SRCIP} ${PROXIED_DSTIP} ${PROXIED_SRCPORT} ${PROXIED_DSTPORT} ${PROXIED_IP_VERSION} ${MESSAGE}\n"' + TEMPLATE = r'"${SOURCEIP} ${DESTIP} ${DESTPORT} ${MESSAGE}\n"' INPUT_MESSAGES = "53 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\r\n" - EXPECTED_MESSAGES = "1.1.1.1 2.2.2.2 3333 4444 4 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" + EXPECTED_MESSAGES = "1.1.1.1 2.2.2.2 4444 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" NUMBER_OF_MESSAGES = 2 _test_pp(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, '"proxied-tls-passthrough"', INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES, TEMPLATE) diff --git a/tests/light/src/syslog_ng/syslog_ng_executor.py b/tests/light/src/syslog_ng/syslog_ng_executor.py index 12a966d936..962b11f9d6 100644 --- a/tests/light/src/syslog_ng/syslog_ng_executor.py +++ b/tests/light/src/syslog_ng/syslog_ng_executor.py @@ -80,7 +80,7 @@ def run_process_with_gdb(self, stderr, debug, trace, verbose, startup_debug, no_ syslog_ng_bin, ] return self.__process_executor.start( - command=["xterm", "-e", shlex.join(gdb_command_args)], + command=["xterm", "-fa", "Monospace", "-fs", "18", "-e", shlex.join(gdb_command_args)], stdout_path="/dev/null", stderr_path="/dev/null", ) diff --git a/tests/light/src/syslog_ng_config/statements/sources/syslog_source.py b/tests/light/src/syslog_ng_config/statements/sources/syslog_source.py index 20a43d9e1e..be6faf33d5 100644 --- a/tests/light/src/syslog_ng_config/statements/sources/syslog_source.py +++ b/tests/light/src/syslog_ng_config/statements/sources/syslog_source.py @@ -26,6 +26,7 @@ def map_transport(transport): mapping = { + "auto": NetworkIO.Transport.TCP, "tcp": NetworkIO.Transport.TCP, "udp": NetworkIO.Transport.UDP, "tls": NetworkIO.Transport.TLS,