Skip to content

Commit

Permalink
output/log: Add flushing infrastructure
Browse files Browse the repository at this point in the history
Issue: 3449

Add flushing functions and infrastructure. This includes:
- Flushing functions for packet loggers
- Log file flushing support
  • Loading branch information
jlucovsky committed Dec 14, 2024
1 parent 5c5ea66 commit dec0d0a
Show file tree
Hide file tree
Showing 11 changed files with 78 additions and 38 deletions.
2 changes: 1 addition & 1 deletion src/output-eve-stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ void EveStreamLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = EveStreamLogger,
.FlushFunc = NULL,
.FlushFunc = OutputJsonLogFlush,
.ConditionFunc = EveStreamLogCondition,
.ThreadInitFunc = EveStreamLogThreadInit,
.ThreadDeinitFunc = EveStreamLogThreadDeinit,
Expand Down
10 changes: 9 additions & 1 deletion src/output-json-alert.c
Original file line number Diff line number Diff line change
Expand Up @@ -823,6 +823,14 @@ static int AlertJsonDecoderEvent(ThreadVars *tv, JsonAlertLogThread *aft, const
return TM_ECODE_OK;
}

static int JsonAlertFlush(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAlertLogThread *aft = thread_data;
SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
OutputJsonFlush(aft->ctx);
return 0;
}

static int JsonAlertLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAlertLogThread *aft = thread_data;
Expand Down Expand Up @@ -1067,7 +1075,7 @@ void JsonAlertLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonAlertLogger,
.FlushFunc = NULL,
.FlushFunc = JsonAlertFlush,
.ConditionFunc = JsonAlertLogCondition,
.ThreadInitFunc = JsonAlertLogThreadInit,
.ThreadDeinitFunc = JsonAlertLogThreadDeinit,
Expand Down
10 changes: 9 additions & 1 deletion src/output-json-anomaly.c
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ static int AnomalyJson(ThreadVars *tv, JsonAnomalyLogThread *aft, const Packet *
return rc;
}

static int JsonAnomalyFlush(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAnomalyLogThread *aft = thread_data;
SCLogDebug("%s flushing %s", tv->name, ((LogFileCtx *)(aft->ctx->file_ctx))->filename);
OutputJsonFlush(aft->ctx);
return 0;
}

static int JsonAnomalyLogger(ThreadVars *tv, void *thread_data, const Packet *p)
{
JsonAnomalyLogThread *aft = thread_data;
Expand Down Expand Up @@ -451,7 +459,7 @@ void JsonAnomalyLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonAnomalyLogger,
.FlushFunc = NULL,
.FlushFunc = JsonAnomalyFlush,
.ConditionFunc = JsonAnomalyLogCondition,
.ThreadInitFunc = JsonAnomalyLogThreadInit,
.ThreadDeinitFunc = JsonAnomalyLogThreadDeinit,
Expand Down
9 changes: 9 additions & 0 deletions src/output-json-common.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,15 @@ static void OutputJsonLogDeInitCtxSub(OutputCtx *output_ctx)
SCFree(output_ctx);
}

int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p)
{
OutputJsonThreadCtx *aft = thread_data;
LogFileCtx *file_ctx = aft->ctx->file_ctx;
SCLogDebug("%s flushing %s", tv->name, file_ctx->filename);
LogFileFlush(file_ctx);
return 0;
}

OutputInitResult OutputJsonLogInitSub(ConfNode *conf, OutputCtx *parent_ctx)
{
OutputInitResult result = { NULL, false };
Expand Down
2 changes: 1 addition & 1 deletion src/output-json-drop.c
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ void JsonDropLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonDropLogger,
.FlushFunc = NULL,
.FlushFunc = OutputJsonLogFlush,
.ConditionFunc = JsonDropLogCondition,
.ThreadInitFunc = JsonDropLogThreadInit,
.ThreadDeinitFunc = JsonDropLogThreadDeinit,
Expand Down
2 changes: 1 addition & 1 deletion src/output-json-frame.c
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ void JsonFrameLogRegister(void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonFrameLogger,
.FlushFunc = NULL,
.FlushFunc = OutputJsonLogFlush,
.ConditionFunc = JsonFrameLogCondition,
.ThreadInitFunc = JsonFrameLogThreadInit,
.ThreadDeinitFunc = JsonFrameLogThreadDeinit,
Expand Down
2 changes: 1 addition & 1 deletion src/output-json-metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ void JsonMetadataLogRegister (void)
{
OutputPacketLoggerFunctions output_logger_functions = {
.LogFunc = JsonMetadataLogger,
.FlushFunc = NULL,
.FlushFunc = OutputJsonLogFlush,
.ConditionFunc = JsonMetadataLogCondition,
.ThreadInitFunc = JsonLogThreadInit,
.ThreadDeinitFunc = JsonLogThreadDeinit,
Expand Down
6 changes: 6 additions & 0 deletions src/output-json.c
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,12 @@ int OutputJSONBuffer(json_t *js, LogFileCtx *file_ctx, MemBuffer **buffer)
return 0;
}

void OutputJsonFlush(OutputJsonThreadCtx *ctx)
{
LogFileCtx *file_ctx = ctx->file_ctx;
LogFileFlush(file_ctx);
}

void OutputJsonBuilderBuffer(
ThreadVars *tv, const Packet *p, Flow *f, JsonBuilder *js, OutputJsonThreadCtx *ctx)
{
Expand Down
2 changes: 2 additions & 0 deletions src/output-json.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,14 @@ TmEcode JsonLogThreadDeinit(ThreadVars *t, void *data);

void EveAddCommonOptions(const OutputJsonCommonSettings *cfg, const Packet *p, const Flow *f,
JsonBuilder *js, enum OutputJsonLogDirection dir);
int OutputJsonLogFlush(ThreadVars *tv, void *thread_data, const Packet *p);
void EveAddMetadata(const Packet *p, const Flow *f, JsonBuilder *js);

int OutputJSONMemBufferCallback(const char *str, size_t size, void *data);

OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx);
void FreeEveThreadCtx(OutputJsonThreadCtx *ctx);
void JSONFormatAndAddMACAddr(JsonBuilder *js, const char *key, const uint8_t *val, bool is_array);
void OutputJsonFlush(OutputJsonThreadCtx *ctx);

#endif /* SURICATA_OUTPUT_JSON_H */
66 changes: 34 additions & 32 deletions src/util-logopenfile.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ static int SCLogUnixSocketReconnect(LogFileCtx *log_ctx)
log_ctx->fp = SCLogOpenUnixSocketFp(log_ctx->filename, log_ctx->sock_type, 0);
if (log_ctx->fp) {
/* Connected at last (or reconnected) */
SCLogNotice("Reconnected socket \"%s\"", log_ctx->filename);
SCLogDebug("Reconnected socket \"%s\"", log_ctx->filename);
} else if (disconnected) {
SCLogWarning("Reconnect failed: %s (will keep trying)", strerror(errno));
}
Expand Down Expand Up @@ -189,6 +189,22 @@ static inline void OutputWriteLock(pthread_mutex_t *m)

}

/**
* \brief Flush a log file.
*/
static void SCLogFileFlushNoLock(LogFileCtx *log_ctx)
{
log_ctx->bytes_since_last_flush = 0;
SCFflushUnlocked(log_ctx->fp);
}

static void SCLogFileFlush(LogFileCtx *log_ctx)
{
OutputWriteLock(&log_ctx->fp_mutex);
SCLogFileFlushNoLock(log_ctx);
SCMutexUnlock(&log_ctx->fp_mutex);
}

/**
* \brief Write buffer to log file.
* \retval 0 on failure; otherwise, the return value of fwrite_unlocked (number of
Expand Down Expand Up @@ -224,8 +240,15 @@ static int SCLogFileWriteNoLock(const char *buffer, int buffer_len, LogFileCtx *
log_ctx->filename);
}
log_ctx->output_errors++;
} else if (log_ctx->buffer_size) {
SCFflushUnlocked(log_ctx->fp);
return ret;
}

log_ctx->bytes_since_last_flush += buffer_len;

if (log_ctx->buffer_size && log_ctx->bytes_since_last_flush >= log_ctx->buffer_size) {
SCLogDebug("%s: flushing %" PRIu64 " during write", log_ctx->filename,
log_ctx->bytes_since_last_flush);
SCLogFileFlushNoLock(log_ctx);
}
}

Expand All @@ -248,35 +271,7 @@ static int SCLogFileWrite(const char *buffer, int buffer_len, LogFileCtx *log_ct
} else
#endif
{

/* Check for rotation. */
if (log_ctx->rotation_flag) {
log_ctx->rotation_flag = 0;
SCConfLogReopen(log_ctx);
}

if (log_ctx->flags & LOGFILE_ROTATE_INTERVAL) {
time_t now = time(NULL);
if (now >= log_ctx->rotate_time) {
SCConfLogReopen(log_ctx);
log_ctx->rotate_time = now + log_ctx->rotate_interval;
}
}

if (log_ctx->fp) {
clearerr(log_ctx->fp);
if (1 != fwrite(buffer, buffer_len, 1, log_ctx->fp)) {
/* Only the first error is logged */
if (!log_ctx->output_errors) {
SCLogError("%s error while writing to %s",
ferror(log_ctx->fp) ? strerror(errno) : "unknown error",
log_ctx->filename);
}
log_ctx->output_errors++;
} else {
fflush(log_ctx->fp);
}
}
ret = SCLogFileWriteNoLock(buffer, buffer_len, log_ctx);
}

SCMutexUnlock(&log_ctx->fp_mutex);
Expand Down Expand Up @@ -706,6 +701,7 @@ LogFileCtx *LogFileNewCtx(void)

lf_ctx->Write = SCLogFileWrite;
lf_ctx->Close = SCLogFileClose;
lf_ctx->Flush = SCLogFileFlush;

return lf_ctx;
}
Expand Down Expand Up @@ -970,6 +966,12 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
SCReturnInt(1);
}

void LogFileFlush(LogFileCtx *file_ctx)
{
SCLogDebug("%s: bytes-to-flush %ld", file_ctx->filename, file_ctx->bytes_since_last_flush);
file_ctx->Flush(file_ctx);
}

int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer)
{
if (file_ctx->type == LOGFILE_TYPE_FILE || file_ctx->type == LOGFILE_TYPE_UNIX_DGRAM ||
Expand Down
5 changes: 5 additions & 0 deletions src/util-logopenfile.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ typedef struct LogFileCtx_ {

int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp);
void (*Close)(struct LogFileCtx_ *fp);
void (*Flush)(struct LogFileCtx_ *fp);

LogFileTypeCtx filetype;

Expand Down Expand Up @@ -159,6 +160,9 @@ typedef struct LogFileCtx_ {
uint64_t dropped;

uint64_t output_errors;

/* Track buffered content */
uint64_t bytes_since_last_flush;
} LogFileCtx;

/* Min time (msecs) before trying to reconnect a Unix domain socket */
Expand All @@ -173,6 +177,7 @@ typedef struct LogFileCtx_ {
LogFileCtx *LogFileNewCtx(void);
int LogFileFreeCtx(LogFileCtx *);
int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
void LogFileFlush(LogFileCtx *file_ctx);

LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *lf_ctx);
int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);
Expand Down

0 comments on commit dec0d0a

Please sign in to comment.