Skip to content

Commit

Permalink
limit message length by 64K
Browse files Browse the repository at this point in the history
  • Loading branch information
apetruhin committed Feb 16, 2024
1 parent 0599992 commit 4e0f364
Show file tree
Hide file tree
Showing 4 changed files with 415 additions and 197 deletions.
92 changes: 56 additions & 36 deletions multiline.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

var (
multilineCollectorTimeout = time.Millisecond * 100
multilineCollectorLimit = 64 * 1024
)

type Message struct {
Expand All @@ -19,20 +20,29 @@ type Message struct {
}

type MultilineCollector struct {
Messages chan Message
ts time.Time
level Level
lines []string
Messages chan Message

timeout time.Duration
limit int

ts time.Time
level Level
lines []string
size int

lock sync.Mutex
closed bool
lastReceiveTime time.Time

isFirstLineContainsTimestamp bool
timeout time.Duration
lastReceiveTime time.Time
closed bool
lock sync.Mutex
pythonTraceback bool
pythonTracebackExpected bool
}

func NewMultilineCollector(ctx context.Context, timeout time.Duration) *MultilineCollector {
func NewMultilineCollector(ctx context.Context, timeout time.Duration, limit int) *MultilineCollector {
m := &MultilineCollector{
timeout: timeout,
limit: limit,
Messages: make(chan Message, 1),
}
go m.dispatch(ctx)
Expand Down Expand Up @@ -74,13 +84,19 @@ func (m *MultilineCollector) Add(entry LogEntry) {
}
return
}
if !m.isPrevMsgPart(entry.Content) {
if m.isNextMessage(entry.Content) {
pythonTraceback := m.pythonTraceback
m.flushMessage()
m.pythonTraceback = pythonTraceback
}
m.add(entry)
}

func (m *MultilineCollector) add(entry LogEntry) {
remaining := m.limit - m.size
if remaining <= 0 {
return
}
if len(m.lines) == 0 {
m.ts = entry.Timestamp
m.level = GuessLevel(entry.Content)
Expand All @@ -89,45 +105,46 @@ func (m *MultilineCollector) add(entry LogEntry) {
}
m.isFirstLineContainsTimestamp = containsTimestamp(entry.Content)
}
m.appendLine(entry.Content)
content := entry.Content
if len(content) > remaining {
content = content[:remaining]
}
m.lines = append(m.lines, content)
m.size += len(content) + 1
m.lastReceiveTime = time.Now()
}

func (m *MultilineCollector) appendLine(value string) {
m.lines = append(m.lines, value)
}

func (m *MultilineCollector) isPrevMsgPart(l string) bool {
if len(m.lines) == 0 {
return true
}

if m.isFirstLineContainsTimestamp {
if !containsTimestamp(l) {
return true
}
func (m *MultilineCollector) isNextMessage(l string) bool {
if l == "" || strings.HasPrefix(l, "\t") || strings.HasPrefix(l, " ") {
return false
}

if strings.HasPrefix(l, "\tat ") || strings.HasPrefix(l, "\t... ") {
return true
if m.isFirstLineContainsTimestamp {
return containsTimestamp(l)
}

if strings.HasPrefix(l, "Traceback (") || strings.HasPrefix(l, " File") || strings.HasPrefix(l, " ") {
return true
if strings.HasPrefix(l, "Caused by: ") {
return false
}

if strings.HasPrefix(l, "Caused by: ") {
return true
if strings.HasPrefix(l, "Traceback ") {
m.pythonTraceback = true
if m.pythonTracebackExpected {
m.pythonTracebackExpected = false
return false
}
return len(m.lines) > 0
}
if l == "The above exception was the direct cause of the following exception:" || l == "During handling of the above exception, another exception occurred:" {
return true
m.pythonTracebackExpected = true
return false
}
prevLine := m.lines[len(m.lines)-1]
if strings.HasPrefix(prevLine, " ") || strings.HasPrefix(prevLine, " File") || strings.HasSuffix(prevLine, "with root cause") {
return true
if m.pythonTraceback {
m.pythonTraceback = false
return false
}
return false

return true
}

func (m *MultilineCollector) flushMessage() {
Expand All @@ -149,7 +166,10 @@ func (m *MultilineCollector) flushMessage() {

func (m *MultilineCollector) reset() {
m.ts = time.Time{}
m.level = LevelUnknown
m.lines = m.lines[:0]
m.size = 0
m.isFirstLineContainsTimestamp = false
m.level = LevelUnknown
m.pythonTraceback = false
m.pythonTracebackExpected = false
}
Loading

0 comments on commit 4e0f364

Please sign in to comment.