From 414ac12ee63415eede46cb3084d755a6da6fba23 Mon Sep 17 00:00:00 2001 From: Andreas Dangel Date: Sat, 4 Nov 2023 19:51:37 +0100 Subject: [PATCH 1/2] systemd: re-seek to tail after wait Wait before every read. After invalidate, reposition at the last entry again, so that the following Next() call will position to the next new entry. --- logsource_systemd.go | 46 ++++++++++++++++++++++++++++++--------- logsource_systemd_test.go | 35 +++++++++++++++++++---------- postfix_exporter.go | 7 ++++-- 3 files changed, 65 insertions(+), 23 deletions(-) diff --git a/logsource_systemd.go b/logsource_systemd.go index 60f5cb6..89e299b 100644 --- a/logsource_systemd.go +++ b/logsource_systemd.go @@ -1,9 +1,11 @@ +//go:build !nosystemd && linux // +build !nosystemd,linux package main import ( "context" + "errors" "fmt" "io" "log" @@ -30,10 +32,13 @@ type SystemdJournal interface { AddMatch(match string) error GetEntry() (*sdjournal.JournalEntry, error) Next() (uint64, error) - SeekRealtimeUsec(usec uint64) error + SeekTail() error + PreviousSkip(skip uint64) (uint64, error) Wait(timeout time.Duration) int } +var SystemdNoMoreEntries = errors.New("No more journal entries") + // NewSystemdLogSource returns a log source for reading Systemd // journal entries. `unit` and `slice` provide filtering if non-empty // (with `slice` taking precedence). 
@@ -52,12 +57,7 @@ func NewSystemdLogSource(j SystemdJournal, path, unit, slice string) (*SystemdLo } // Start at end of journal - if err := logSrc.journal.SeekRealtimeUsec(uint64(timeNow().UnixNano() / 1000)); err != nil { - logSrc.journal.Close() - return nil, err - } - - if r := logSrc.journal.Wait(1 * time.Second); r < 0 { + if err := logSrc.journal.SeekTail(); err != nil { logSrc.journal.Close() return nil, err } @@ -74,12 +74,36 @@ func (s *SystemdLogSource) Path() string { } func (s *SystemdLogSource) Read(ctx context.Context) (string, error) { + // wait for any changes in any journal file + r := s.journal.Wait(10 * time.Second) // max wait 10 seconds + if r < 0 { + s.journal.Close() + return "", fmt.Errorf("sd_journal.wait returned %d", r) + } + if r == sdjournal.SD_JOURNAL_INVALIDATE { + // the first wait call seems to initialize the watch and results always in INVALIDATE + // seek again to the end of the journal + if err := s.journal.SeekTail(); err != nil { + return "", err + } + // go back to the last entry, so that next() will advance the pointer to the new entry + _, err := s.journal.PreviousSkip(1) + if err != nil { + return "", err + } + } else if r == sdjournal.SD_JOURNAL_NOP { + // wait timed out without any changes in the journal + return "", SystemdNoMoreEntries + } + c, err := s.journal.Next() if err != nil { return "", err } if c == 0 { - return "", io.EOF + // we might get triggered by journal changes, which are unrelated to our matches (unit) + // in that case, we are still at the end of the journal, but no new entry has been added for us + return "", SystemdNoMoreEntries } e, err := s.journal.GetEntry() @@ -88,14 +112,16 @@ func (s *SystemdLogSource) Read(ctx context.Context) (string, error) { } ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond)) - return fmt.Sprintf( + entry := fmt.Sprintf( "%s %s %s[%s]: %s", ts.Format(time.Stamp), e.Fields["_HOSTNAME"], e.Fields["SYSLOG_IDENTIFIER"], e.Fields["_PID"], 
e.Fields["MESSAGE"], - ), nil + ) + //log.Printf("Found entry: %s\n", entry) + return entry, nil } // A systemdLogSourceFactory is a factory that can create diff --git a/logsource_systemd_test.go b/logsource_systemd_test.go index a1f9c4a..3b0d68a 100644 --- a/logsource_systemd_test.go +++ b/logsource_systemd_test.go @@ -1,10 +1,10 @@ +//go:build !nosystemd && linux // +build !nosystemd,linux package main import ( "context" - "io" "os" "testing" "time" @@ -21,8 +21,8 @@ func TestNewSystemdLogSource(t *testing.T) { } assert.Equal(t, []string{"_SYSTEMD_SLICE=aslice"}, j.addMatchCalls, "A match should be added for slice.") - assert.Equal(t, []uint64{1234567890000000}, j.seekRealtimeUsecCalls, "A call to SeekRealtimeUsec should be made.") - assert.Equal(t, []time.Duration{1 * time.Second}, j.waitCalls, "A call to Wait should be made.") + assert.Equal(t, 1, j.seekTailCalls, "A call to SeekTail should be made.") + assert.Equal(t, 0, len(j.waitCalls), "No call to Wait should be made yet.") if err := src.Close(); err != nil { t.Fatalf("Close failed: %v", err) @@ -69,6 +69,9 @@ func TestSystemdLogSource_Read(t *testing.T) { if err != nil { t.Fatalf("Read failed: %v", err) } + assert.Equal(t, []time.Duration{10 * time.Second}, j.waitCalls, "A Wait call should be made") + assert.Equal(t, 2, j.seekTailCalls, "Two seekTail calls expected") + assert.Equal(t, []uint64{1}, j.previousSkipCalls, "One previousSkipCall expected.") assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.") } @@ -85,7 +88,7 @@ func TestSystemdLogSource_ReadEOF(t *testing.T) { defer src.Close() _, err = src.Read(ctx) - assert.Equal(t, io.EOF, err, "Should interpret Next 0 as EOF.") + assert.Equal(t, SystemdNoMoreEntries, err, "Should interpret Next 0 as no more entries.") } func TestMain(m *testing.M) { @@ -105,10 +108,11 @@ type fakeSystemdJournal struct { nextValues []uint64 nextError error - addMatchCalls []string - closeCalls int - 
seekRealtimeUsecCalls []uint64 - waitCalls []time.Duration + addMatchCalls []string + closeCalls int + seekTailCalls int + previousSkipCalls []uint64 + waitCalls []time.Duration } func (j *fakeSystemdJournal) AddMatch(match string) error { @@ -139,12 +143,21 @@ func (j *fakeSystemdJournal) Next() (uint64, error) { return v, nil } -func (j *fakeSystemdJournal) SeekRealtimeUsec(usec uint64) error { - j.seekRealtimeUsecCalls = append(j.seekRealtimeUsecCalls, usec) +func (j *fakeSystemdJournal) SeekTail() error { + j.seekTailCalls++ return nil } +func (j *fakeSystemdJournal) PreviousSkip(skip uint64) (uint64, error) { + j.previousSkipCalls = append(j.previousSkipCalls, skip) + return skip, nil +} + func (j *fakeSystemdJournal) Wait(timeout time.Duration) int { j.waitCalls = append(j.waitCalls, timeout) - return 0 + if len(j.waitCalls) == 1 { + // first wait call + return sdjournal.SD_JOURNAL_INVALIDATE + } + return sdjournal.SD_JOURNAL_APPEND } diff --git a/postfix_exporter.go b/postfix_exporter.go index 56691cc..f20d99c 100644 --- a/postfix_exporter.go +++ b/postfix_exporter.go @@ -311,6 +311,9 @@ var ( // CollectFromLogline collects metrict from a Postfix log line. func (e *PostfixExporter) CollectFromLogLine(line string) { + if line == "" { + return + } // Strip off timestamp, hostname, etc. logMatches := logLine.FindStringSubmatch(line) @@ -683,10 +686,10 @@ func (e *PostfixExporter) StartMetricCollection(ctx context.Context) { for { line, err := e.logSrc.Read(ctx) if err != nil { - if err != io.EOF { + if err != SystemdNoMoreEntries { log.Printf("Couldn't read journal: %v", err) + return } - return } e.CollectFromLogLine(line) gauge.Set(1) From 5e575066bd50727f6ec7e581315c4f51bab217c6 Mon Sep 17 00:00:00 2001 From: Andreas Dangel Date: Sat, 4 Nov 2023 19:54:07 +0100 Subject: [PATCH 2/2] Support service names postfix/submission/smtpd patterns This is used in Debian. Note that --systemd.unit=postfix@-.service must also be used on Debian.
--- postfix_exporter.go | 2 +- postfix_exporter_test.go | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/postfix_exporter.go b/postfix_exporter.go index f20d99c..2a3f65d 100644 --- a/postfix_exporter.go +++ b/postfix_exporter.go @@ -292,7 +292,7 @@ func CollectShowqFromSocket(path string, ch chan<- prometheus.Metric) error { // Patterns for parsing log messages. var ( - logLine = regexp.MustCompile(` ?(postfix|opendkim)(/(\w+))?\[\d+\]: ((?:(warning|error|fatal|panic): )?.*)`) + logLine = regexp.MustCompile(` ?(postfix|opendkim)(/(\w+)){0,2}\[\d+\]: ((?:(warning|error|fatal|panic): )?.*)`) lmtpPipeSMTPLine = regexp.MustCompile(`, relay=(\S+), .*, delays=([0-9\.]+)/([0-9\.]+)/([0-9\.]+)/([0-9\.]+), `) qmgrInsertLine = regexp.MustCompile(`:.*, size=(\d+), nrcpt=(\d+) `) qmgrExpiredLine = regexp.MustCompile(`:.*, status=(expired|force-expired), returned to sender`) diff --git a/postfix_exporter_test.go b/postfix_exporter_test.go index 8f057a8..6c65461 100644 --- a/postfix_exporter_test.go +++ b/postfix_exporter_test.go @@ -133,8 +133,9 @@ func TestPostfixExporter_CollectFromLogline(t *testing.T) { "Apr 26 10:55:19 tcc1 postfix/smtpd[21126]: warning: SASL authentication failure: cannot connect to saslauthd server: Permission denied", "Apr 26 10:55:19 tcc1 postfix/smtpd[21126]: warning: SASL authentication failure: Password verification failed", "Apr 26 10:55:19 tcc1 postfix/smtpd[21126]: warning: laptop.local[192.168.1.2]: SASL PLAIN authentication failed: generic failure", + "Nov 4 19:30:11 tcc1 postfix/submissions/smtpd[284714]: warning: unknown[1.2.3.4]: SASL LOGIN authentication failed: authentication failure", }, - saslFailedCount: 1, + saslFailedCount: 2, removedCount: 0, }, fields: fields{