Skip to content

Commit

Permalink
chore(redactors): memory consumption improvements (#1332)
Browse files Browse the repository at this point in the history
* Document additional go tool profiling flags

* Add a regex cache to avoid compiling regular expressions all the time

* Reduce max buffer capacity

* Prefer bytes to strings

Strings are immutable and hence we need to create a new one
all the time when operation on them

* Some more changes

* More bytes

* Use writer.Write instead of fmt.FPrintf

* Clear regex cache when resetting redactors

* Logs errors when redactors error since they get swallowed

* Add an improvement comment

* Limit the number of goroutines spawned when redacting

* Minor improvement

* Write byte slices one at a time instead of concatenating them first

* Add a test for writeBytes

* Additional tests
  • Loading branch information
banjoh authored Sep 15, 2023
1 parent 514c86d commit 86279b4
Show file tree
Hide file tree
Showing 10 changed files with 227 additions and 81 deletions.
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ go tool pprof -http=":8000" cpu.prof
go tool pprof -http=":8001" mem.prof
```

**Additional flags for memory profiling**
- `inuse_space`: Amount of memory allocated and not released yet (default).
- `inuse_objects`: Amount of objects allocated and not released yet.
- `alloc_space`: Total amount of memory allocated (regardless of released).
- `alloc_objects`: Total amount of objects allocated (regardless of released).

More on profiling please visit https://go.dev/doc/diagnostics#profiling

## Contribution workflow
Expand Down
14 changes: 12 additions & 2 deletions pkg/collect/redact.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand All @@ -17,18 +16,29 @@ import (
"k8s.io/klog/v2"
)

// Max number of concurrent redactors to run
// Ensure the number is low enough since each of the redactors
// also spawns goroutines to redact files in tar archives and
// other goroutines for each redactor spec.
const MAX_CONCURRENT_REDACTORS = 10

func RedactResult(bundlePath string, input CollectorResult, additionalRedactors []*troubleshootv1beta2.Redact) error {
wg := &sync.WaitGroup{}

// Error channel to capture errors from goroutines
errorCh := make(chan error, len(input))
limitCh := make(chan struct{}, MAX_CONCURRENT_REDACTORS)
defer close(limitCh)

for k, v := range input {
limitCh <- struct{}{}

wg.Add(1)

go func(file string, data []byte) {
defer wg.Done()
defer func() { <-limitCh }() // free up after the function execution has run

var reader io.Reader
if data == nil {

Expand Down Expand Up @@ -84,7 +94,7 @@ func RedactResult(bundlePath string, input CollectorResult, additionalRedactors
// If the file is .tar, .tgz or .tar.gz, it must not be redacted. Instead it is
// decompressed and each file inside the tar redacted and compressed back into the archive.
if filepath.Ext(file) == ".tar" || filepath.Ext(file) == ".tgz" || strings.HasSuffix(file, ".tar.gz") {
tmpDir, err := ioutil.TempDir("", "troubleshoot-subresult-")
tmpDir, err := os.MkdirTemp("", "troubleshoot-subresult-")
if err != nil {
errorCh <- errors.Wrap(err, "failed to create temp dir")
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/collect/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r CollectorResult) SaveResult(bundlePath string, relativePath string, read
return errors.Wrap(err, "failed to stat file")
}

klog.V(2).Infof("Added %q (%d MB) to bundle output", relativePath, fileInfo.Size()/(1024*1024))
klog.V(2).Infof("Added %q (%d KB) to bundle output", relativePath, fileInfo.Size()/(1024))
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ const (
MESSAGE_TEXT_PADDING = 4
MESSAGE_TEXT_LINES_MARGIN_TO_BOTTOM = 4

// Bufio Reader Constants
MAX_BUFFER_CAPACITY = 1024 * 1024
// This is the initial size of the buffer allocated.
// Under the hood, an array of size N is allocated in memory
BUF_INIT_SIZE = 4096 // 4KB

// This is the muximum size the buffer can grow to
// Its not what the buffer will be allocated to initially
SCANNER_MAX_SIZE = 10 * 1024 * 1024 // 10MB
)
53 changes: 32 additions & 21 deletions pkg/redact/literal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ package redact

import (
"bufio"
"bytes"
"fmt"
"io"
"strings"

"github.com/replicatedhq/troubleshoot/pkg/constants"
"k8s.io/klog/v2"
)

type literalRedactor struct {
matchString string
filePath string
redactName string
isDefault bool
match []byte
filePath string
redactName string
isDefault bool
}

func literalString(matchString, path, name string) Redactor {
func literalString(match []byte, path, name string) Redactor {
return literalRedactor{
matchString: matchString,
filePath: path,
redactName: name,
match: match,
filePath: path,
redactName: name,
}
}

Expand All @@ -28,32 +31,37 @@ func (r literalRedactor) Redact(input io.Reader, path string) io.Reader {
go func() {
var err error
defer func() {
if err == io.EOF {
if err == nil || err == io.EOF {
writer.Close()
} else {
if err == bufio.ErrTooLong {
s := fmt.Sprintf("Error redacting %q. A line in the file exceeded %d MB max length", path, constants.SCANNER_MAX_SIZE/1024/1024)
klog.V(2).Info(s)
} else {
klog.V(2).Info(fmt.Sprintf("Error redacting %q: %v", path, err))
}
writer.CloseWithError(err)
}
}()

reader := bufio.NewReader(input)
buf := make([]byte, constants.BUF_INIT_SIZE)
scanner := bufio.NewScanner(input)
scanner.Buffer(buf, constants.SCANNER_MAX_SIZE)

lineNum := 0
for {
for scanner.Scan() {
lineNum++
var line string
line, err = readLine(reader)
if err != nil {
return
}
line := scanner.Bytes()

clean := strings.ReplaceAll(line, r.matchString, MASK_TEXT)
clean := bytes.ReplaceAll(line, r.match, maskTextBytes)

// io.WriteString would be nicer, but scanner strips new lines
fmt.Fprintf(writer, "%s\n", clean)
// Append newline since scanner strips it
err = writeBytes(writer, clean, NEW_LINE)
if err != nil {
return
}

if clean != line {
if !bytes.Equal(clean, line) {
addRedaction(Redaction{
RedactorName: r.redactName,
CharactersRemoved: len(line) - len(clean),
Expand All @@ -63,6 +71,9 @@ func (r literalRedactor) Redact(input io.Reader, path string) io.Reader {
})
}
}
if scanErr := scanner.Err(); scanErr != nil {
err = scanErr
}
}()
return out
}
74 changes: 50 additions & 24 deletions pkg/redact/multi_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package redact

import (
"bufio"
"fmt"
"bytes"
"io"
"regexp"
"strings"
)

type MultiLineRedactor struct {
Expand All @@ -20,19 +19,19 @@ type MultiLineRedactor struct {

func NewMultiLineRedactor(re1 LineRedactor, re2 string, maskText, path, name string, isDefault bool) (*MultiLineRedactor, error) {
var scanCompiled *regexp.Regexp
compiled1, err := regexp.Compile(re1.regex)
compiled1, err := compileRegex(re1.regex)
if err != nil {
return nil, err
}

if re1.scan != "" {
scanCompiled, err = regexp.Compile(re1.scan)
scanCompiled, err = compileRegex(re1.scan)
if err != nil {
return nil, err
}
}

compiled2, err := regexp.Compile(re2)
compiled2, err := compileRegex(re2)
if err != nil {
return nil, err
}
Expand All @@ -48,14 +47,18 @@ func (r *MultiLineRedactor) Redact(input io.Reader, path string) io.Reader {
writer.CloseWithError(err)
}()

substStr := getReplacementPattern(r.re2, r.maskText)
substStr := []byte(getReplacementPattern(r.re2, r.maskText))

reader := bufio.NewReader(input)
line1, line2, err := getNextTwoLines(reader, nil)
if err != nil {
// this will print 2 blank lines for empty input...
fmt.Fprintf(writer, "%s\n", line1)
fmt.Fprintf(writer, "%s\n", line2)
// Append newlines since scanner strips them
err = writeBytes(writer, line1, NEW_LINE, line2, NEW_LINE)
if err != nil {
return
}

return
}

Expand All @@ -66,33 +69,41 @@ func (r *MultiLineRedactor) Redact(input io.Reader, path string) io.Reader {

// is scan is not nil, then check if line1 matches scan by lowercasing it
if r.scan != nil {
lowerLine1 := strings.ToLower(line1)
if !r.scan.MatchString(lowerLine1) {
fmt.Fprintf(writer, "%s\n", line1)
line1, line2, err = getNextTwoLines(reader, &line2)
lowerLine1 := bytes.ToLower(line1)
if !r.scan.Match(lowerLine1) {
// Append newline since scanner strips it
err = writeBytes(writer, line1, NEW_LINE)
if err != nil {
return
}
line1, line2, err = getNextTwoLines(reader, line2)
flushLastLine = true
continue
}
}

// If line1 matches re1, then transform line2 using re2
if !r.re1.MatchString(line1) {
fmt.Fprintf(writer, "%s\n", line1)
line1, line2, err = getNextTwoLines(reader, &line2)
if !r.re1.Match(line1) {
// Append newline since scanner strips it
err = writeBytes(writer, line1, NEW_LINE)
if err != nil {
return
}
line1, line2, err = getNextTwoLines(reader, line2)
flushLastLine = true
continue
}
flushLastLine = false
clean := r.re2.ReplaceAllString(line2, substStr)
clean := r.re2.ReplaceAll(line2, substStr)

// io.WriteString would be nicer, but reader strips new lines
fmt.Fprintf(writer, "%s\n%s\n", line1, clean)
// Append newlines since scanner strips them
err = writeBytes(writer, line1, NEW_LINE, clean, NEW_LINE)
if err != nil {
return
}

// if clean is not equal to line2, a redaction was performed
if clean != line2 {
if !bytes.Equal(clean, line2) {
addRedaction(Redaction{
RedactorName: r.redactName,
CharactersRemoved: len(line2) - len(clean),
Expand All @@ -106,15 +117,18 @@ func (r *MultiLineRedactor) Redact(input io.Reader, path string) io.Reader {
}

if flushLastLine {
fmt.Fprintf(writer, "%s\n", line1)
// Append newline since scanner strip it
err = writeBytes(writer, line1, NEW_LINE)
if err != nil {
return
}
}
}()
return out
}

func getNextTwoLines(reader *bufio.Reader, curLine2 *string) (line1 string, line2 string, err error) {
line1 = ""
line2 = ""
func getNextTwoLines(reader *bufio.Reader, curLine2 []byte) (line1 []byte, line2 []byte, err error) {
line2 = []byte{}

if curLine2 == nil {
line1, err = readLine(reader)
Expand All @@ -126,11 +140,23 @@ func getNextTwoLines(reader *bufio.Reader, curLine2 *string) (line1 string, line
return
}

line1 = *curLine2
line1 = curLine2
line2, err = readLine(reader)
if err != nil {
return
}

return
}

// writeBytes writes all byte slices to the writer
// in the order they are passed in the variadic argument
func writeBytes(w io.Writer, bs ...[]byte) error {
for _, b := range bs {
_, err := w.Write(b)
if err != nil {
return err
}
}
return nil
}
Loading

0 comments on commit 86279b4

Please sign in to comment.