diff --git a/config.yml b/config.yml
index 3cc25714..29eeb190 100644
--- a/config.yml
+++ b/config.yml
@@ -17,7 +17,7 @@ global:
   text-jinja: ""
   worker:
     interval-monitor: 10
-    buffer-size: 4096
+    buffer-size: 8192
   telemetry:
     enabled: false
     web-path: "/metrics"
diff --git a/docs/loggers/logger_file.md b/docs/loggers/logger_file.md
index 28a49119..e45e8955 100644
--- a/docs/loggers/logger_file.md
+++ b/docs/loggers/logger_file.md
@@ -26,6 +26,9 @@ Options:
 * `max-files` (integer)
   > maximum number of files to retain. Set to zero if you want to disable this feature

+* `max-batch-size` (integer)
+  > accumulate data before writing it to the file
+
 * `flush-interval` (integer)
   > flush buffer to log file every X seconds

@@ -62,7 +65,8 @@ logfile:
   file-path: null
   max-size: 100
   max-files: 10
-  flush-interval: 10
+  max-batch-size: 65536
+  flush-interval: 1
   compress: false
   compress-interval: 5
   compress-postcommand: null
diff --git a/pkgconfig/loggers.go b/pkgconfig/loggers.go
index 55adc6a6..a664b56b 100644
--- a/pkgconfig/loggers.go
+++ b/pkgconfig/loggers.go
@@ -72,7 +72,8 @@ type ConfigLoggers struct {
     FilePath            string `yaml:"file-path" default:""`
     MaxSize             int    `yaml:"max-size" default:"100"`
     MaxFiles            int    `yaml:"max-files" default:"10"`
-    FlushInterval       int    `yaml:"flush-interval" default:"10"`
+    MaxBatchSize        int    `yaml:"max-batch-size" default:"65536"`
+    FlushInterval       int    `yaml:"flush-interval" default:"1"`
     Compress            bool   `yaml:"compress" default:"false"`
     CompressInterval    int    `yaml:"compress-interval" default:"60"`
     CompressPostCommand string `yaml:"compress-postcommand" default:""`
diff --git a/workers/logfile.go b/workers/logfile.go
index d9bd1477..f7e4bdb7 100644
--- a/workers/logfile.go
+++ b/workers/logfile.go
@@ -158,8 +158,7 @@ func (w *LogFile) OpenFile() error {

     switch w.GetConfig().Loggers.LogFile.Mode {
     case pkgconfig.ModeText, pkgconfig.ModeJSON, pkgconfig.ModeFlatJSON:
-        bufferSize := 4096
-        w.writerPlain = bufio.NewWriterSize(fd, bufferSize)
+        w.writerPlain = bufio.NewWriterSize(fd, w.config.Loggers.LogFile.MaxBatchSize)

     case pkgconfig.ModePCAP:
         w.writerPcap = pcapgo.NewWriter(fd)
@@ -469,6 +468,11 @@ func (w *LogFile) StartLogging() {
     var data []byte
     var err error

+    // Max size of a batch before forcing a write
+    batch := new(bytes.Buffer)
+    maxBatchSize := w.config.Loggers.LogFile.MaxBatchSize
+    batchSize := 0 // Current batch size
+
     for {
         select {
         case <-w.OnLoggerStopped():
@@ -476,6 +480,11 @@
             flushTimer.Stop()
             w.commpressTimer.Stop()

+            // Force write remaining batch data
+            if batchSize > 0 {
+                w.WriteToPlain(batch.Bytes())
+            }
+
             // flush writer
             w.FlushWriters()

@@ -494,18 +503,15 @@
                 return
             }

-            // write to file
+            // Process the message based on the configured mode
+            var message []byte
             switch w.GetConfig().Loggers.LogFile.Mode {

             // with basic text mode
             case pkgconfig.ModeText:
-                w.WriteToPlain(dm.Bytes(w.textFormat,
-                    w.GetConfig().Global.TextFormatDelimiter,
-                    w.GetConfig().Global.TextFormatBoundary))
-
-                var delimiter bytes.Buffer
-                delimiter.WriteString("\n")
-                w.WriteToPlain(delimiter.Bytes())
+                message = dm.Bytes(w.textFormat, w.GetConfig().Global.TextFormatDelimiter, w.GetConfig().Global.TextFormatBoundary)
+                batch.Write(message)
+                batch.WriteString("\n")

             // with custom text mode
             case pkgconfig.ModeJinja:
@@ -514,7 +520,7 @@
                     w.LogError("jinja template: %s", err)
                     continue
                 }
-                w.WriteToPlain([]byte(textLine))
+                batch.Write([]byte(textLine))

             // with json mode
             case pkgconfig.ModeFlatJSON:
@@ -530,7 +536,7 @@
             // with json mode
             case pkgconfig.ModeJSON:
                 json.NewEncoder(buffer).Encode(dm)
-                w.WriteToPlain(buffer.Bytes())
+                batch.Write(buffer.Bytes())
                 buffer.Reset()

             // with dnstap mode
@@ -554,7 +560,24 @@
                 w.WriteToPcap(dm, pkt)
             }

+            // Update the batch size
+            batchSize += batch.Len()
+
+            // If the batch exceeds the max size, force a write
+            if batchSize >= maxBatchSize {
+                w.WriteToPlain(batch.Bytes())
+                batch.Reset() // Reset batch after write
+                batchSize = 0
+            }
+
         case <-flushTimer.C:
+            // Flush the current batch, then flush the writers
+            if batchSize > 0 {
+                w.WriteToPlain(batch.Bytes())
+                batch.Reset()
+                batchSize = 0
+            }
+
             // flush writer
             w.FlushWriters()
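
The logfile hunks above replace per-message WriteToPlain calls with an in-memory batch: formatted records accumulate in a bytes.Buffer and are written out either when the batch reaches max-batch-size or when the flush-interval timer fires. The following is a minimal, self-contained sketch of that same pattern against a plain file; the batchWriter type, WriteLine helper, and channel-driven loop are illustrative names invented for this example, not part of the logger's actual API.

// Illustrative sketch of the batch-then-flush pattern introduced by the diff.
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"os"
	"time"
)

// batchWriter buffers records and writes them out when the accumulated
// batch reaches maxBatchSize bytes or when Flush is called (e.g. by a timer).
type batchWriter struct {
	out          *bufio.Writer
	batch        bytes.Buffer
	maxBatchSize int
}

func newBatchWriter(f *os.File, maxBatchSize int) *batchWriter {
	return &batchWriter{
		// Size the bufio writer to the batch size, as the diff does for writerPlain.
		out:          bufio.NewWriterSize(f, maxBatchSize),
		maxBatchSize: maxBatchSize,
	}
}

// WriteLine appends one record (plus a newline) to the batch and forces a
// write once the batch grows past maxBatchSize.
func (b *batchWriter) WriteLine(line []byte) error {
	b.batch.Write(line)
	b.batch.WriteByte('\n')
	if b.batch.Len() >= b.maxBatchSize {
		return b.Flush()
	}
	return nil
}

// Flush drains the current batch into the underlying writer and flushes it.
func (b *batchWriter) Flush() error {
	if b.batch.Len() > 0 {
		if _, err := b.out.Write(b.batch.Bytes()); err != nil {
			return err
		}
		b.batch.Reset()
	}
	return b.out.Flush()
}

func main() {
	f, err := os.CreateTemp("", "batch-*.log")
	if err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
	defer os.Remove(f.Name())
	defer f.Close()

	w := newBatchWriter(f, 64*1024) // 64 KiB, matching the max-batch-size default
	flushTimer := time.NewTicker(1 * time.Second)
	defer flushTimer.Stop()

	lines := make(chan []byte)
	go func() {
		for i := 0; i < 5; i++ {
			lines <- []byte(fmt.Sprintf("query %d", i))
		}
		close(lines)
	}()

	for {
		select {
		case line, ok := <-lines:
			if !ok {
				// Input closed: write whatever is left and stop.
				w.Flush()
				fmt.Println("wrote batched log to", f.Name())
				return
			}
			w.WriteLine(line)
		case <-flushTimer.C:
			// Periodic flush so records are not held in memory indefinitely.
			w.Flush()
		}
	}
}

The trade-off mirrors the new defaults in the diff: a larger max-batch-size (65536) means fewer write calls per record, while the shorter flush-interval (1 second) bounds how long a record can sit in memory before reaching disk.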