Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(redactors): memory consumption improvements #1332

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ go tool pprof -http=":8000" cpu.prof
go tool pprof -http=":8001" mem.prof
```

**Additional flags for memory profiling**
- `inuse_space`: Amount of memory allocated and not yet released (default).
- `inuse_objects`: Number of objects allocated and not yet released.
- `alloc_space`: Total amount of memory allocated (regardless of whether it has since been released).
- `alloc_objects`: Total number of objects allocated (regardless of whether they have since been released).

For more on profiling, please visit https://go.dev/doc/diagnostics#profiling

## Contribution workflow
Expand Down
14 changes: 12 additions & 2 deletions pkg/collect/redact.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"os"
"path/filepath"
"strings"
Expand All @@ -17,18 +16,29 @@ import (
"k8s.io/klog/v2"
)

// Max number of concurrent redactors to run
// Ensure the number is low enough since each of the redactors
// also spawns goroutines to redact files in tar archives and
// other goroutines for each redactor spec.
const MAX_CONCURRENT_REDACTORS = 10

func RedactResult(bundlePath string, input CollectorResult, additionalRedactors []*troubleshootv1beta2.Redact) error {
wg := &sync.WaitGroup{}

// Error channel to capture errors from goroutines
errorCh := make(chan error, len(input))
limitCh := make(chan struct{}, MAX_CONCURRENT_REDACTORS)
banjoh marked this conversation as resolved.
Show resolved Hide resolved
defer close(limitCh)

for k, v := range input {
limitCh <- struct{}{}

wg.Add(1)

go func(file string, data []byte) {
defer wg.Done()
defer func() { <-limitCh }() // free up after the function execution has run

var reader io.Reader
if data == nil {

Expand Down Expand Up @@ -84,7 +94,7 @@ func RedactResult(bundlePath string, input CollectorResult, additionalRedactors
// If the file is .tar, .tgz or .tar.gz, it must not be redacted. Instead it is
// decompressed and each file inside the tar redacted and compressed back into the archive.
if filepath.Ext(file) == ".tar" || filepath.Ext(file) == ".tgz" || strings.HasSuffix(file, ".tar.gz") {
tmpDir, err := ioutil.TempDir("", "troubleshoot-subresult-")
tmpDir, err := os.MkdirTemp("", "troubleshoot-subresult-")
if err != nil {
errorCh <- errors.Wrap(err, "failed to create temp dir")
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/collect/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (r CollectorResult) SaveResult(bundlePath string, relativePath string, read
return errors.Wrap(err, "failed to stat file")
}

klog.V(2).Infof("Added %q (%d MB) to bundle output", relativePath, fileInfo.Size()/(1024*1024))
klog.V(2).Infof("Added %q (%d KB) to bundle output", relativePath, fileInfo.Size()/(1024))
return nil
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ const (
MESSAGE_TEXT_PADDING = 4
MESSAGE_TEXT_LINES_MARGIN_TO_BOTTOM = 4

// Bufio Reader Constants
MAX_BUFFER_CAPACITY = 1024 * 1024
// This is the initial size of the buffer allocated.
// Under the hood, an array of size N is allocated in memory
BUF_INIT_SIZE = 4096 // 4KB

// This is the maximum size the buffer can grow to.
// It's not the size the buffer is allocated to initially.
SCANNER_MAX_SIZE = 10 * 1024 * 1024 // 10MB
)
53 changes: 32 additions & 21 deletions pkg/redact/literal.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,26 @@ package redact

import (
"bufio"
"bytes"
"fmt"
"io"
"strings"

"github.com/replicatedhq/troubleshoot/pkg/constants"
"k8s.io/klog/v2"
)

type literalRedactor struct {
matchString string
filePath string
redactName string
isDefault bool
match []byte
filePath string
redactName string
isDefault bool
}

func literalString(matchString, path, name string) Redactor {
func literalString(match []byte, path, name string) Redactor {
return literalRedactor{
matchString: matchString,
filePath: path,
redactName: name,
match: match,
filePath: path,
redactName: name,
}
}

Expand All @@ -28,32 +31,37 @@ func (r literalRedactor) Redact(input io.Reader, path string) io.Reader {
go func() {
var err error
defer func() {
if err == io.EOF {
if err == nil || err == io.EOF {
writer.Close()
} else {
if err == bufio.ErrTooLong {
s := fmt.Sprintf("Error redacting %q. A line in the file exceeded %d MB max length", path, constants.SCANNER_MAX_SIZE/1024/1024)
klog.V(2).Info(s)
} else {
klog.V(2).Info(fmt.Sprintf("Error redacting %q: %v", path, err))
}
writer.CloseWithError(err)
}
}()

reader := bufio.NewReader(input)
buf := make([]byte, constants.BUF_INIT_SIZE)
scanner := bufio.NewScanner(input)
scanner.Buffer(buf, constants.SCANNER_MAX_SIZE)

lineNum := 0
for {
for scanner.Scan() {
lineNum++
var line string
line, err = readLine(reader)
if err != nil {
return
}
line := scanner.Bytes()

clean := strings.ReplaceAll(line, r.matchString, MASK_TEXT)
clean := bytes.ReplaceAll(line, r.match, maskTextBytes)

// io.WriteString would be nicer, but scanner strips new lines
fmt.Fprintf(writer, "%s\n", clean)
// Append newline since scanner strips it
err = writeBytes(writer, clean, NEW_LINE)
if err != nil {
return
}

if clean != line {
if !bytes.Equal(clean, line) {
addRedaction(Redaction{
RedactorName: r.redactName,
CharactersRemoved: len(line) - len(clean),
Expand All @@ -63,6 +71,9 @@ func (r literalRedactor) Redact(input io.Reader, path string) io.Reader {
})
}
}
if scanErr := scanner.Err(); scanErr != nil {
err = scanErr
}
}()
return out
}
74 changes: 50 additions & 24 deletions pkg/redact/multi_line.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@ package redact

import (
"bufio"
"fmt"
"bytes"
"io"
"regexp"
"strings"
)

type MultiLineRedactor struct {
Expand All @@ -20,19 +19,19 @@ type MultiLineRedactor struct {

func NewMultiLineRedactor(re1 LineRedactor, re2 string, maskText, path, name string, isDefault bool) (*MultiLineRedactor, error) {
var scanCompiled *regexp.Regexp
compiled1, err := regexp.Compile(re1.regex)
compiled1, err := compileRegex(re1.regex)
if err != nil {
return nil, err
}

if re1.scan != "" {
scanCompiled, err = regexp.Compile(re1.scan)
scanCompiled, err = compileRegex(re1.scan)
if err != nil {
return nil, err
}
}

compiled2, err := regexp.Compile(re2)
compiled2, err := compileRegex(re2)
if err != nil {
return nil, err
}
Expand All @@ -48,14 +47,18 @@ func (r *MultiLineRedactor) Redact(input io.Reader, path string) io.Reader {
writer.CloseWithError(err)
}()

substStr := getReplacementPattern(r.re2, r.maskText)
substStr := []byte(getReplacementPattern(r.re2, r.maskText))

reader := bufio.NewReader(input)
line1, line2, err := getNextTwoLines(reader, nil)
if err != nil {
// this will print 2 blank lines for empty input...
fmt.Fprintf(writer, "%s\n", line1)
fmt.Fprintf(writer, "%s\n", line2)
// Append newlines since scanner strips them
err = writeBytes(writer, line1, NEW_LINE, line2, NEW_LINE)
if err != nil {
return
}

return
}

Expand All @@ -66,33 +69,41 @@ func (r *MultiLineRedactor) Redact(input io.Reader, path string) io.Reader {

// if scan is not nil, then check if line1 matches scan by lowercasing it
if r.scan != nil {
lowerLine1 := strings.ToLower(line1)
if !r.scan.MatchString(lowerLine1) {
fmt.Fprintf(writer, "%s\n", line1)
line1, line2, err = getNextTwoLines(reader, &line2)
lowerLine1 := bytes.ToLower(line1)
if !r.scan.Match(lowerLine1) {
// Append newline since scanner strips it
err = writeBytes(writer, line1, NEW_LINE)
if err != nil {
return
}
line1, line2, err = getNextTwoLines(reader, line2)
flushLastLine = true
continue
}
}

// If line1 matches re1, then transform line2 using re2
if !r.re1.MatchString(line1) {
fmt.Fprintf(writer, "%s\n", line1)
line1, line2, err = getNextTwoLines(reader, &line2)
if !r.re1.Match(line1) {
// Append newline since scanner strips it
err = writeBytes(writer, line1, NEW_LINE)
if err != nil {
return
}
line1, line2, err = getNextTwoLines(reader, line2)
flushLastLine = true
continue
}
flushLastLine = false
clean := r.re2.ReplaceAllString(line2, substStr)
clean := r.re2.ReplaceAll(line2, substStr)

// io.WriteString would be nicer, but reader strips new lines
fmt.Fprintf(writer, "%s\n%s\n", line1, clean)
// Append newlines since scanner strips them
err = writeBytes(writer, line1, NEW_LINE, clean, NEW_LINE)
if err != nil {
return
}

// if clean is not equal to line2, a redaction was performed
if clean != line2 {
if !bytes.Equal(clean, line2) {
addRedaction(Redaction{
RedactorName: r.redactName,
CharactersRemoved: len(line2) - len(clean),
Expand All @@ -106,15 +117,18 @@ func (r *MultiLineRedactor) Redact(input io.Reader, path string) io.Reader {
}

if flushLastLine {
fmt.Fprintf(writer, "%s\n", line1)
// Append newline since scanner strips it
err = writeBytes(writer, line1, NEW_LINE)
if err != nil {
return
}
}
}()
return out
}

func getNextTwoLines(reader *bufio.Reader, curLine2 *string) (line1 string, line2 string, err error) {
line1 = ""
line2 = ""
func getNextTwoLines(reader *bufio.Reader, curLine2 []byte) (line1 []byte, line2 []byte, err error) {
line2 = []byte{}

if curLine2 == nil {
line1, err = readLine(reader)
Expand All @@ -126,11 +140,23 @@ func getNextTwoLines(reader *bufio.Reader, curLine2 *string) (line1 string, line
return
}

line1 = *curLine2
line1 = curLine2
line2, err = readLine(reader)
if err != nil {
return
}

return
}

// writeBytes emits each of the given byte slices to w, one after another,
// preserving the order of the variadic arguments. It stops at the first
// failed write and returns that error unchanged; nil is returned once every
// slice has been handed to the writer (including when no slices are given).
func writeBytes(w io.Writer, bs ...[]byte) error {
	for i := 0; i < len(bs); i++ {
		if _, writeErr := w.Write(bs[i]); writeErr != nil {
			return writeErr
		}
	}
	return nil
}
Loading
Loading