Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Add auto-detecting transport/logproto properties. #372

Open
wants to merge 29 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
421058b
libtest: fixed LogTransportMock to not return bytes if the buffer siz…
bazsi Feb 4, 2024
f2f9ffc
logtransport: add read_ahead() method
bazsi Feb 3, 2024
70dc1e6
transport: establish link between LogTransportStack and constituent L…
bazsi Nov 9, 2024
aaac335
transport: remove explicit stack argument from LogTransportAdapter
bazsi Nov 2, 2024
3a9f8e0
logproto: add LogProtoAutoServer implementation
bazsi Feb 3, 2024
017c45d
syslog: use rfc6587 style auto-detection when transport("auto") is se…
bazsi Apr 25, 2024
2210654
libtest: allow LogProtoServer instance to change during handshake
bazsi Oct 20, 2024
47f5643
logreader: allow replacing LogProtoServer instances during handshake
bazsi Oct 20, 2024
f77bb5c
light: add testcase for framing auto detection
bazsi Oct 20, 2024
963db7e
news: updated news entry
bazsi Feb 3, 2024
fa94fef
tests/copyright: updated policy about new files
bazsi Oct 20, 2024
137ed30
logproto: rename prepare() methods to poll_prepare
bazsi Nov 8, 2024
4d343ee
transport-stack: add assertion to log_transport_stack_get_active()
bazsi Nov 10, 2024
c113e37
transport-stack: indicate failure in log_transport_stack_switch()
bazsi Nov 8, 2024
d7b6bfd
transport-tls: use OpenSSL BIOs to wrap the lower level LogTransport …
bazsi Nov 8, 2024
b627c33
transport-stack: add poll_prepare() methods for LogTransport and LogT…
bazsi Nov 8, 2024
a71e690
transport-stack: add LogTransportStack level read/write/writev methods
bazsi Nov 10, 2024
047cab5
transport-aux-data: add log_transport_aux_data_move()
bazsi Nov 11, 2024
51d011e
transport-stack: make it possible to add aux data from the LogTranspo…
bazsi Nov 10, 2024
d026b75
logproto: call transport-stack level I/O methods
bazsi Nov 8, 2024
71fe874
transport-haproxy: use the normal $SOURCEIP, $DESTIP, $DESTPORT macros
bazsi Nov 2, 2024
859ba65
transport-haproxy: use the LogTransportStack->aux to save src/dst add…
bazsi Nov 10, 2024
241b0fc
transport-haproxy: implement transport switch instead of changing bas…
bazsi Nov 10, 2024
76f5109
transport-haproxy: add more information to the proxy detection failur…
bazsi Nov 10, 2024
b3c46fc
logproto: add support for auto-detecting TLS handshakes
bazsi Nov 10, 2024
ed08dd4
logproto: add support for auto-detecting haproxy handshakes
bazsi Nov 9, 2024
57e8313
light: use larger fonts when executing xterm
bazsi Nov 2, 2024
dff7494
light: add testcase for transport(auto)
bazsi Nov 11, 2024
a2bc880
foo: bar
bazsi Nov 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions lib/logproto/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(LOGPROTO_HEADERS
logproto/logproto-server.h
logproto/logproto-text-client.h
logproto/logproto-text-server.h
logproto/logproto-auto-server.h
PARENT_SCOPE)

set(LOGPROTO_SOURCES
Expand All @@ -25,6 +26,7 @@ set(LOGPROTO_SOURCES
logproto/logproto-server.c
logproto/logproto-text-client.c
logproto/logproto-text-server.c
logproto/logproto-auto-server.c
PARENT_SCOPE)

add_test_subdirectory(tests)
2 changes: 2 additions & 0 deletions lib/logproto/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ logprotoinclude_HEADERS = \
lib/logproto/logproto-framed-server.h \
lib/logproto/logproto-text-client.h \
lib/logproto/logproto-text-server.h \
lib/logproto/logproto-auto-server.h \
lib/logproto/logproto-multiline-server.h \
lib/logproto/logproto-record-server.h \
lib/logproto/logproto-builtins.h \
Expand All @@ -25,6 +26,7 @@ logproto_sources = \
lib/logproto/logproto-framed-server.c \
lib/logproto/logproto-text-client.c \
lib/logproto/logproto-text-server.c \
lib/logproto/logproto-auto-server.c \
lib/logproto/logproto-multiline-server.c \
lib/logproto/logproto-record-server.c \
lib/logproto/logproto-builtins.c
Expand Down
256 changes: 256 additions & 0 deletions lib/logproto/logproto-auto-server.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
/*
* Copyright (c) 2024 Balázs Scheidler <[email protected]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/
#include "logproto-auto-server.h"
#include "logproto-text-server.h"
#include "logproto-framed-server.h"
#include "messages.h"

#include "transport/transport-haproxy.h"

enum
{
LPAS_FAILURE,
LPAS_NEED_MORE_DATA,
LPAS_SUCCESS,
};

typedef struct _LogProtoAutoServer
{
LogProtoServer super;

gboolean tls_detected;
gboolean haproxy_detected;
} LogProtoAutoServer;

static LogProtoServer *
_construct_detected_proto(LogProtoAutoServer *self, const gchar *detect_buffer, gsize detect_buffer_len)
{
gint fd = self->super.transport_stack.fd;

if (g_ascii_isdigit(detect_buffer[0]))
{
msg_debug("Auto-detected octet-counted-framing on RFC6587 connection, using framed protocol",
evt_tag_int("fd", fd));
return log_proto_framed_server_new(NULL, self->super.options);
}
if (detect_buffer[0] == '<')
{
msg_debug("Auto-detected non-transparent-framing on RFC6587 connection, using simple text protocol",
evt_tag_int("fd", fd));
}
else
{
msg_debug("Unable to detect framing on RFC6587 connection, falling back to simple text transport",
evt_tag_int("fd", fd),
evt_tag_mem("detect_buffer", detect_buffer, detect_buffer_len));
}
return log_proto_text_server_new(NULL, self->super.options);
}

static gint
_is_tls_client_hello(const gchar *buf, gsize buf_len)
{
/*
* The first message on a TLS connection must be the client_hello, which
* is a type of handshake record, and it cannot be compressed or
* encrypted. A plaintext record has this format:
*
* 0 byte record_type // 0x16 = handshake
* 1 byte major // major protocol version
* 2 byte minor // minor protocol version
* 3-4 uint16 length // size of the payload
* 5 byte handshake_type // 0x01 = client_hello
* 6 uint24 length // size of the ClientHello
* 9 byte major // major protocol version
* 10 byte minor // minor protocol version
* 11 uint32 gmt_unix_time
* 15 byte random_bytes[28]
* ...
*/
if (buf_len < 1)
return LPAS_NEED_MORE_DATA;

/* 0x16 indicates a TLS handshake */
if (buf[0] != 0x16)
return LPAS_FAILURE;

if (buf_len < 5)
return LPAS_NEED_MORE_DATA;

guint32 record_len = (buf[3] << 8) + buf[4];

/* client_hello is at least 34 bytes */
if (record_len < 34)
return LPAS_FAILURE;

if (buf_len < 6)
return LPAS_NEED_MORE_DATA;

/* is the handshake_type 0x01 == client_hello */
if (buf[5] != 0x01)
return FALSE;

if (buf_len < 9)
return LPAS_NEED_MORE_DATA;

guint32 payload_size = (buf[6] << 16) + (buf[7] << 8) + buf[8];

/* The message payload can't be bigger than the enclosing record */
if (payload_size + 4 > record_len)
return LPAS_FAILURE;
return LPAS_SUCCESS;
}

static gint
_is_haproxy_header(const gchar *buf, gsize buf_len)
{
const gchar proxy_v1_signature[] = { 0x50, 0x52, 0x4F, 0x58, 0x59, 0x20 };
const gchar proxy_v2_signature[] = { 0x0D, 0x0A, 0x0D, 0x0A, 0x00, 0x0D, 0x0A, 0x51, 0x55, 0x49, 0x54, 0x0A };

if (buf_len < sizeof(proxy_v1_signature))
return LPAS_NEED_MORE_DATA;

if (memcmp(buf, proxy_v1_signature, sizeof(proxy_v1_signature)) == 0)
return LPAS_SUCCESS;

if (buf_len < sizeof(proxy_v2_signature))
return LPAS_NEED_MORE_DATA;

if (memcmp(buf, proxy_v2_signature, sizeof(proxy_v2_signature)) != 0)
return LPAS_FAILURE;

if (buf_len < sizeof(proxy_v2_signature) + 1)
return LPAS_NEED_MORE_DATA;

if ((buf[sizeof(proxy_v2_signature)] & 0xf0) != 0x20)
return LPAS_FAILURE;
return LPAS_SUCCESS;
}

static LogProtoPrepareAction
log_proto_auto_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
{
LogProtoAutoServer *self = (LogProtoAutoServer *) s;

if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond))
return LPPA_FORCE_SCHEDULE_FETCH;

if (*cond == 0)
*cond = G_IO_IN;

return LPPA_POLL_IO;
}

static LogProtoStatus
log_proto_auto_handshake(LogProtoServer *s, gboolean *handshake_finished, LogProtoServer **proto_replacement)
{
LogProtoAutoServer *self = (LogProtoAutoServer *) s;
gchar detect_buffer[16];
gboolean moved_forward;
gint rc;

rc = log_transport_stack_read_ahead(&self->super.transport_stack, detect_buffer, sizeof(detect_buffer), &moved_forward);
if (rc == 0)
return LPS_EOF;
else if (rc < 0)
{
if (errno == EAGAIN)
return LPS_AGAIN;
return LPS_ERROR;
}

if (!self->tls_detected)
{
switch (_is_tls_client_hello(detect_buffer, rc))
{
case LPAS_NEED_MORE_DATA:
if (moved_forward)
return LPS_AGAIN;
break;
case LPAS_SUCCESS:
self->tls_detected = TRUE;
/* this is a TLS handshake! let's switch to TLS */
if (log_transport_stack_switch(&self->super.transport_stack, LOG_TRANSPORT_TLS))
{
msg_debug("TLS handshake detected, switching to TLS");
return LPS_AGAIN;
}
else
{
msg_error("TLS handshake detected, unable to switch to TLS, no tls() options specified");
return LPS_ERROR;
}
break;
default:
break;
}
}
if (!self->haproxy_detected)
{
switch (_is_haproxy_header(detect_buffer, rc))
{
case LPAS_NEED_MORE_DATA:
if (moved_forward)
return LPS_AGAIN;
break;
case LPAS_SUCCESS:
self->haproxy_detected = TRUE;

/* FIXME: make this a factory */
log_transport_stack_add_transport(&self->super.transport_stack, LOG_TRANSPORT_HAPROXY,
log_transport_haproxy_new(self->super.transport_stack.active_transport, self->super.transport_stack.active_transport));

/* this is a haproxy header */
if (log_transport_stack_switch(&self->super.transport_stack, LOG_TRANSPORT_HAPROXY))
{
msg_debug("HAProxy header detected, switching to haproxy");
return LPS_AGAIN;
}
else
{
msg_error("HAProxy header detected, but haproxy transport is not set up");
return LPS_ERROR;
}
break;
default:
break;

}
}
*proto_replacement = _construct_detected_proto(self, detect_buffer, rc);
return LPS_SUCCESS;
}

LogProtoServer *
log_proto_auto_server_new(LogTransport *transport, const LogProtoServerOptions *options)
{
LogProtoAutoServer *self = g_new0(LogProtoAutoServer, 1);

/* we are not using our own transport stack, transport is to be passed to
* the LogProto implementation once we finished with detection */

log_proto_server_init(&self->super, transport, options);
self->super.handshake = log_proto_auto_handshake;
self->super.poll_prepare = log_proto_auto_server_poll_prepare;
return &self->super;
}
30 changes: 30 additions & 0 deletions lib/logproto/logproto-auto-server.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2024 Balázs Scheidler <[email protected]>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*
* As an additional exemption you are allowed to compile & link against the
* OpenSSL libraries as published by the OpenSSL project. See the file
* COPYING for details.
*
*/
#ifndef LOGPROTO_AUTO_SERVER_H_INCLUDED
#define LOGPROTO_AUTO_SERVER_H_INCLUDED

#include "logproto-server.h"

LogProtoServer *log_proto_auto_server_new(LogTransport *transport, const LogProtoServerOptions *options);

#endif
17 changes: 7 additions & 10 deletions lib/logproto/logproto-buffered-server.c
Original file line number Diff line number Diff line change
Expand Up @@ -179,10 +179,9 @@ log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntry
struct stat st;
gint64 ofs = 0;
LogProtoBufferedServerState *state;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);
gint fd;

fd = transport->fd;
fd = self->super.transport_stack.fd;
self->persist_handle = handle;

if (fstat(fd, &st) < 0)
Expand Down Expand Up @@ -255,7 +254,7 @@ log_proto_buffered_server_apply_state(LogProtoBufferedServer *self, PersistEntry
raw_buffer = g_alloca(state->raw_buffer_size);
}

rc = log_transport_read(transport, raw_buffer, state->raw_buffer_size, NULL);
rc = log_transport_stack_read(&self->super.transport_stack, raw_buffer, state->raw_buffer_size, NULL);
if (rc != state->raw_buffer_size)
{
msg_notice("Error re-reading buffer contents of the file to be continued, restarting from the beginning",
Expand Down Expand Up @@ -584,12 +583,12 @@ log_proto_buffered_server_restart_with_state(LogProtoServer *s, PersistState *pe
}

LogProtoPrepareAction
log_proto_buffered_server_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
log_proto_buffered_server_poll_prepare(LogProtoServer *s, GIOCondition *cond, gint *timeout G_GNUC_UNUSED)
{
LogProtoBufferedServer *self = (LogProtoBufferedServer *) s;
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

*cond = transport->cond;
if (log_transport_stack_poll_prepare(&self->super.transport_stack, cond))
return LPPA_FORCE_SCHEDULE_FETCH;

/* if there's no pending I/O in the transport layer, then we want to do a read */
if (*cond == 0)
Expand All @@ -602,9 +601,7 @@ static gint
log_proto_buffered_server_read_data_method(LogProtoBufferedServer *self, guchar *buf, gsize len,
LogTransportAuxData *aux)
{
LogTransport *transport = log_transport_stack_get_active(&self->super.transport_stack);

return log_transport_read(transport, buf, len, aux);
return log_transport_stack_read(&self->super.transport_stack, buf, len, aux);
}

static void
Expand Down Expand Up @@ -1081,7 +1078,7 @@ log_proto_buffered_server_init(LogProtoBufferedServer *self, LogTransport *trans
const LogProtoServerOptions *options)
{
log_proto_server_init(&self->super, transport, options);
self->super.prepare = log_proto_buffered_server_prepare;
self->super.poll_prepare = log_proto_buffered_server_poll_prepare;
self->super.fetch = log_proto_buffered_server_fetch;
self->super.free_fn = log_proto_buffered_server_free_method;
self->super.restart_with_state = log_proto_buffered_server_restart_with_state;
Expand Down
4 changes: 2 additions & 2 deletions lib/logproto/logproto-buffered-server.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ log_proto_buffered_server_cue_flush(LogProtoBufferedServer *self)
self->flush_partial_message = TRUE;
}

LogProtoPrepareAction log_proto_buffered_server_prepare(LogProtoServer *s, GIOCondition *cond,
gint *timeout G_GNUC_UNUSED);
LogProtoPrepareAction log_proto_buffered_server_poll_prepare(LogProtoServer *s, GIOCondition *cond,
gint *timeout G_GNUC_UNUSED);
LogProtoBufferedServerState *log_proto_buffered_server_get_state(LogProtoBufferedServer *self);
void log_proto_buffered_server_put_state(LogProtoBufferedServer *self);

Expand Down
Loading
Loading