Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix systemd integration #112

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 36 additions & 10 deletions logsource_systemd.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
//go:build !nosystemd && linux
// +build !nosystemd,linux

package main

import (
"context"
"errors"
"fmt"
"io"
"log"
Expand All @@ -30,10 +32,13 @@ type SystemdJournal interface {
AddMatch(match string) error
GetEntry() (*sdjournal.JournalEntry, error)
Next() (uint64, error)
SeekRealtimeUsec(usec uint64) error
SeekTail() error
PreviousSkip(skip uint64) (uint64, error)
Wait(timeout time.Duration) int
}

var SystemdNoMoreEntries = errors.New("No more journal entries")

// NewSystemdLogSource returns a log source for reading Systemd
// journal entries. `unit` and `slice` provide filtering if non-empty
// (with `slice` taking precedence).
Expand All @@ -52,12 +57,7 @@ func NewSystemdLogSource(j SystemdJournal, path, unit, slice string) (*SystemdLo
}

// Start at end of journal
if err := logSrc.journal.SeekRealtimeUsec(uint64(timeNow().UnixNano() / 1000)); err != nil {
logSrc.journal.Close()
return nil, err
}

if r := logSrc.journal.Wait(1 * time.Second); r < 0 {
if err := logSrc.journal.SeekTail(); err != nil {
logSrc.journal.Close()
return nil, err
}
Expand All @@ -74,12 +74,36 @@ func (s *SystemdLogSource) Path() string {
}

func (s *SystemdLogSource) Read(ctx context.Context) (string, error) {
// wait for any changes in any journal file
r := s.journal.Wait(10 * time.Second) // max wait 10 seconds
if r < 0 {
s.journal.Close()
return "", fmt.Errorf("sd_journal.wait returned %d", r)
}
if r == sdjournal.SD_JOURNAL_INVALIDATE {
// the first wait call seems to initialize the watch and results always in INVALIDATE
// seek again to the end of the journal
if err := s.journal.SeekTail(); err != nil {
return "", err
}
// go back to the last entry, so that next() will advance the pointer to the new entry
_, err := s.journal.PreviousSkip(1)
if err != nil {
return "", err
}
} else if r == sdjournal.SD_JOURNAL_NOP {
// wait timed out without any changes in the journal
return "", SystemdNoMoreEntries
}

c, err := s.journal.Next()
if err != nil {
return "", err
}
if c == 0 {
return "", io.EOF
// we might get triggered by journal changes, which are unrelated to our matches (unit)
// in that case, we are still at the end of the journal, but no new entry has been added for us
return "", SystemdNoMoreEntries
}

e, err := s.journal.GetEntry()
Expand All @@ -88,14 +112,16 @@ func (s *SystemdLogSource) Read(ctx context.Context) (string, error) {
}
ts := time.Unix(0, int64(e.RealtimeTimestamp)*int64(time.Microsecond))

return fmt.Sprintf(
entry := fmt.Sprintf(
"%s %s %s[%s]: %s",
ts.Format(time.Stamp),
e.Fields["_HOSTNAME"],
e.Fields["SYSLOG_IDENTIFIER"],
e.Fields["_PID"],
e.Fields["MESSAGE"],
), nil
)
//log.Printf("Found entry: %s\n", entry)
return entry, nil
}

// A systemdLogSourceFactory is a factory that can create
Expand Down
35 changes: 24 additions & 11 deletions logsource_systemd_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//go:build !nosystemd && linux
// +build !nosystemd,linux

package main

import (
"context"
"io"
"os"
"testing"
"time"
Expand All @@ -21,8 +21,8 @@ func TestNewSystemdLogSource(t *testing.T) {
}

assert.Equal(t, []string{"_SYSTEMD_SLICE=aslice"}, j.addMatchCalls, "A match should be added for slice.")
assert.Equal(t, []uint64{1234567890000000}, j.seekRealtimeUsecCalls, "A call to SeekRealtimeUsec should be made.")
assert.Equal(t, []time.Duration{1 * time.Second}, j.waitCalls, "A call to Wait should be made.")
assert.Equal(t, 1, j.seekTailCalls, "A call to SeekTail should be made.")
assert.Equal(t, 0, len(j.waitCalls), "No call to Wait should be made yet.")

if err := src.Close(); err != nil {
t.Fatalf("Close failed: %v", err)
Expand Down Expand Up @@ -69,6 +69,9 @@ func TestSystemdLogSource_Read(t *testing.T) {
if err != nil {
t.Fatalf("Read failed: %v", err)
}
assert.Equal(t, []time.Duration{10 * time.Second}, j.waitCalls, "A Wait call should be made")
assert.Equal(t, 2, j.seekTailCalls, "Two seekTail calls expected")
assert.Equal(t, []uint64{1}, j.previousSkipCalls, "One previousSkipCall expected.")
assert.Equal(t, "Feb 13 23:31:30 ahost anid[123]: aline", s, "Read should get data from the journal entry.")
}

Expand All @@ -85,7 +88,7 @@ func TestSystemdLogSource_ReadEOF(t *testing.T) {
defer src.Close()

_, err = src.Read(ctx)
assert.Equal(t, io.EOF, err, "Should interpret Next 0 as EOF.")
assert.Equal(t, SystemdNoMoreEntries, err, "Should interpret Next 0 as no more entries.")
}

func TestMain(m *testing.M) {
Expand All @@ -105,10 +108,11 @@ type fakeSystemdJournal struct {
nextValues []uint64
nextError error

addMatchCalls []string
closeCalls int
seekRealtimeUsecCalls []uint64
waitCalls []time.Duration
addMatchCalls []string
closeCalls int
seekTailCalls int
previousSkipCalls []uint64
waitCalls []time.Duration
}

func (j *fakeSystemdJournal) AddMatch(match string) error {
Expand Down Expand Up @@ -139,12 +143,21 @@ func (j *fakeSystemdJournal) Next() (uint64, error) {
return v, nil
}

func (j *fakeSystemdJournal) SeekRealtimeUsec(usec uint64) error {
j.seekRealtimeUsecCalls = append(j.seekRealtimeUsecCalls, usec)
func (j *fakeSystemdJournal) SeekTail() error {
j.seekTailCalls++
return nil
}

func (j *fakeSystemdJournal) PreviousSkip(skip uint64) (uint64, error) {
j.previousSkipCalls = append(j.previousSkipCalls, skip)
return skip, nil
}

func (j *fakeSystemdJournal) Wait(timeout time.Duration) int {
j.waitCalls = append(j.waitCalls, timeout)
return 0
if len(j.waitCalls) == 1 {
// first wait call
return sdjournal.SD_JOURNAL_INVALIDATE
}
return sdjournal.SD_JOURNAL_APPEND
}
9 changes: 6 additions & 3 deletions postfix_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ func CollectShowqFromSocket(path string, ch chan<- prometheus.Metric) error {

// Patterns for parsing log messages.
var (
logLine = regexp.MustCompile(` ?(postfix|opendkim)(/(\w+))?\[\d+\]: ((?:(warning|error|fatal|panic): )?.*)`)
logLine = regexp.MustCompile(` ?(postfix|opendkim)(/(\w+)){0,2}\[\d+\]: ((?:(warning|error|fatal|panic): )?.*)`)
lmtpPipeSMTPLine = regexp.MustCompile(`, relay=(\S+), .*, delays=([0-9\.]+)/([0-9\.]+)/([0-9\.]+)/([0-9\.]+), `)
qmgrInsertLine = regexp.MustCompile(`:.*, size=(\d+), nrcpt=(\d+) `)
qmgrExpiredLine = regexp.MustCompile(`:.*, status=(expired|force-expired), returned to sender`)
Expand All @@ -311,6 +311,9 @@ var (

// CollectFromLogline collects metrict from a Postfix log line.
func (e *PostfixExporter) CollectFromLogLine(line string) {
if line == "" {
return
}
// Strip off timestamp, hostname, etc.
logMatches := logLine.FindStringSubmatch(line)

Expand Down Expand Up @@ -683,10 +686,10 @@ func (e *PostfixExporter) StartMetricCollection(ctx context.Context) {
for {
line, err := e.logSrc.Read(ctx)
if err != nil {
if err != io.EOF {
if err != SystemdNoMoreEntries {
log.Printf("Couldn't read journal: %v", err)
return
}
return
}
e.CollectFromLogLine(line)
gauge.Set(1)
Expand Down
3 changes: 2 additions & 1 deletion postfix_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ func TestPostfixExporter_CollectFromLogline(t *testing.T) {
"Apr 26 10:55:19 tcc1 postfix/smtpd[21126]: warning: SASL authentication failure: cannot connect to saslauthd server: Permission denied",
"Apr 26 10:55:19 tcc1 postfix/smtpd[21126]: warning: SASL authentication failure: Password verification failed",
"Apr 26 10:55:19 tcc1 postfix/smtpd[21126]: warning: laptop.local[192.168.1.2]: SASL PLAIN authentication failed: generic failure",
"Nov 4 19:30:11 tcc1 postfix/submissions/smtpd[284714]: warning: unknown[1.2.3.4]: SASL LOGIN authentication failed: authentication failure",
},
saslFailedCount: 1,
saslFailedCount: 2,
removedCount: 0,
},
fields: fields{
Expand Down