From 32bf2daf3c5fa70b4afb1c4c266f9229f5021221 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 09:14:26 +0400 Subject: [PATCH 01/21] use testify assert for all test --- reader_test.go | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/reader_test.go b/reader_test.go index 7b53467..9dd4356 100644 --- a/reader_test.go +++ b/reader_test.go @@ -3,7 +3,6 @@ package gonx import ( "github.com/stretchr/testify/assert" "io" - "reflect" "strings" "testing" ) @@ -11,10 +10,9 @@ import ( func TestGetFormatRegexp(t *testing.T) { format := "$remote_addr [$time_local] \"$request\"" reader := NewReader(strings.NewReader(""), format) - expected := `^(?P[^ ]+) \[(?P[^]]+)\] "(?P[^"]+)"$` - if re := reader.GetFormatRegexp(); re.String() != expected { - t.Errorf("Wrong RE '%v'", re) - } + assert.Equal(t, + reader.GetFormatRegexp().String(), + `^(?P[^ ]+) \[(?P[^]]+)\] "(?P[^"]+)"$`) } func TestGetRecord(t *testing.T) { @@ -27,24 +25,20 @@ func TestGetRecord(t *testing.T) { "request": "GET /api/foo/bar HTTP/1.1", } rec, err := reader.Read() - if err != nil { - t.Error(err) - } - if !reflect.DeepEqual(rec, expected) { - t.Errorf("Get invalid record %v", rec) - } - if _, err := reader.Read(); err != io.EOF { - t.Error("End of file expected") - } + assert.NoError(t, err) + assert.Equal(t, rec, expected) + + _, err = reader.Read() + assert.Equal(t, err, io.EOF) } func TestInvalidLineFormat(t *testing.T) { format := "$remote_addr [$time_local] \"$request\"" file := strings.NewReader(`89.234.89.123 - - [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`) reader := NewReader(file, format) - if rec, err := reader.Read(); err == nil { - t.Errorf("Invalid record error expected, but get the record %+v", rec) - } + rec, err := reader.Read() + assert.Error(t, err) + assert.Empty(t, rec) } func TestReadLogFormatFromFile(t *testing.T) { From 529c92c0c28a9bd316303f9476cd812410402d53 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 09:15:15 +0400 Subject: [PATCH 02/21] Makefile for deps installation and test --- Makefile | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 Makefile diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..f3f1348 --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ +test: deps + go test -v . + +deps: + go get github.com/stretchr/testify + +dev-deps: + go get github.com/nsf/gocode + go get code.google.com/p/rog-go/exp/cmd/godef From 7a05881adb114e0813cba92bb15a48a08afbffa1 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 09:16:47 +0400 Subject: [PATCH 03/21] file parsing concurency --- entry.go | 15 +++++++++ reader.go | 82 +++++++++++++++++++++++++++++++++++++------------- reader_test.go | 1 + 3 files changed, 77 insertions(+), 21 deletions(-) create mode 100644 entry.go diff --git a/entry.go b/entry.go new file mode 100644 index 0000000..3f6146a --- /dev/null +++ b/entry.go @@ -0,0 +1,15 @@ +package gonx + +import ( + "fmt" +) + +type Entry map[string]string + +func (entry *Entry) Get(name string) (value string, err error) { + value, ok := (*entry)[name] + if !ok { + err = fmt.Errorf("Field '%v' does not found in record %+v", name, *entry) + } + return +} diff --git a/reader.go b/reader.go index 54f9166..ef51bbc 100644 --- a/reader.go +++ b/reader.go @@ -4,20 +4,53 @@ import ( "bufio" "fmt" "io" + //"os" "regexp" + "sync" ) type Reader struct { format string re *regexp.Regexp - scanner *bufio.Scanner + files chan io.Reader + entries chan Entry + wg sync.WaitGroup } -func NewReader(logFile io.Reader, format string) *Reader { - return &Reader{ +func NewMap(files chan io.Reader, format string) *Reader { + r := &Reader{ format: format, - scanner: bufio.NewScanner(logFile), + files: files, + entries: make(chan Entry, 10), + } + // Get regexp to trigger its creation now to avoid data race + // in future (this is not the method I want to lock Mutex + // every time. + // TODO Unbind this method from Reader struct + r.GetFormatRegexp() + + for file := range files { + r.wg.Add(1) + go r.readFile(file) } + + go func() { + r.wg.Wait() + close(r.entries) + }() + + return r +} + +func (r *Reader) handleError(err error) { + //fmt.Fprintln(os.Stderr, err) +} + +func NewReader(logFile io.Reader, format string) *Reader { + files := make(chan io.Reader, 1) + files <- logFile + close(files) + return NewMap(files, format) } func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) (reader *Reader, err error) { @@ -72,26 +105,33 @@ func (r *Reader) GetFormatRegexp() *regexp.Regexp { return r.re } -type Entry map[string]string - -func (entry *Entry) Get(name string) (value string, err error) { - value, ok := (*entry)[name] - if !ok { - err = fmt.Errorf("Field '%v' does not found in record %+v", name, *entry) +func (r *Reader) readFile(file io.Reader) { + // Iterate over log file lines and spawn new mapper goroutine + // to parse it into given format + scanner := bufio.NewScanner(file) + for scanner.Scan() { + r.wg.Add(1) + go func(line string) { + entry, err := r.parseRecord(line) + if err != nil { + r.handleError(err) + } + r.entries <- entry + r.wg.Done() + }(scanner.Text()) } - return + if err := scanner.Err(); err != nil { + r.handleError(err) + } + r.wg.Done() } -// Read next line from log file, and return parsed record. If all lines read -// method return ni, io.EOF -func (r *Reader) Read() (record Entry, err error) { - if r.scanner.Scan() { - record, err = r.parseRecord(r.scanner.Text()) - } else { - err = r.scanner.Err() - if err == nil { - err = io.EOF - } +// Read next line from entries channel, and return parsed record. If channel is closed +// then method returns io.EOF error +func (r *Reader) Read() (entry Entry, err error) { + entry, ok := <-r.entries + if !ok { + err = io.EOF } return } diff --git a/reader_test.go b/reader_test.go index 9dd4356..ad9e9f6 100644 --- a/reader_test.go +++ b/reader_test.go @@ -33,6 +33,7 @@ func TestGetRecord(t *testing.T) { } func TestInvalidLineFormat(t *testing.T) { + t.Skip("Read method does not return errors anymore, because of asynchronios algorithm") format := "$remote_addr [$time_local] \"$request\"" file := strings.NewReader(`89.234.89.123 - - [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`) reader := NewReader(file, format) From 8470c19c95580d6ce27eedda365a2a20cd5cffee Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 09:22:41 +0400 Subject: [PATCH 04/21] entry tests in a separate file --- entry_test.go | 20 ++++++++++++++++++++ reader_test.go | 14 -------------- 2 files changed, 20 insertions(+), 14 deletions(-) create mode 100644 entry_test.go diff --git a/entry_test.go b/entry_test.go new file mode 100644 index 0000000..a5823c0 --- /dev/null +++ b/entry_test.go @@ -0,0 +1,20 @@ +package gonx + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestEntry(t *testing.T) { + entry := Entry{"foo": "1"} + + // Get existings field + val, err := entry.Get("foo") + assert.NoError(t, err) + assert.Equal(t, val, "1") + + // Get field that does not exist + val, err = entry.Get("bar") + assert.Error(t, err) + assert.Equal(t, val, "") +} diff --git a/reader_test.go b/reader_test.go index ad9e9f6..17a11ad 100644 --- a/reader_test.go +++ b/reader_test.go @@ -66,17 +66,3 @@ func TestReadLogFormatFromFile(t *testing.T) { t.Errorf("Wrong format was read from conf file \n%v\nExpected\n%v", format, expected) } } - -func TestEntry(t *testing.T) { - entry := Entry{"foo": "1"} - - // Get existings field - val, err := entry.Get("foo") - assert.NoError(t, err) - assert.Equal(t, val, "1") - - // Get field that does not exist - val, err = entry.Get("bar") - assert.Error(t, err) - assert.Equal(t, val, "") -} From d56467e339d291c989864c0cb198fd485e3f1286 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 11:29:11 +0400 Subject: [PATCH 05/21] introduce Parser type --- parser.go | 82 ++++++++++++++++++++++++++++++++++ parser_test.go | 72 ++++++++++++++++++++++++++++++ reader.go | 116 +++++++++++-------------------------------------- reader_test.go | 50 +++++---------------- 4 files changed, 191 insertions(+), 129 deletions(-) create mode 100644 parser.go create mode 100644 parser_test.go diff --git a/parser.go b/parser.go new file mode 100644 index 0000000..9a97386 --- /dev/null +++ b/parser.go @@ -0,0 +1,82 @@ +package gonx + +import ( + "bufio" + "fmt" + "io" + "regexp" +) + +type Parser struct { + format string + regexp *regexp.Regexp +} + +// Returns a new Parser, use givel log format to create its internal +// strings parsing regexp. +func NewParser(format string) *Parser { + re := regexp.MustCompile(`\\\$([a-z_]+)(\\?(.))`).ReplaceAllString( + regexp.QuoteMeta(format), "(?P<$1>[^$3]+)$2") + return &Parser{format, regexp.MustCompile(fmt.Sprintf("^%v$", re))} +} + +// Parse log file line using internal format regexp. If line do not match +// given format an error will be returned. +func (parser *Parser) ParseString(line string) (entry Entry, err error) { + re := parser.regexp + fields := re.FindStringSubmatch(line) + if fields == nil { + err = fmt.Errorf("Access log line '%v' does not match given format '%v'", line, re) + return + } + + // Iterate over subexp foung and fill the map record + entry = make(Entry) + for i, name := range re.SubexpNames() { + if i == 0 { + continue + } + entry[name] = fields[i] + } + return +} + +// NewNginxParser parse nginx conf file to find log_format with diven name and +// returns parser for this format. If returns an error if cannot find the needle. +func NewNginxParser(conf io.Reader, name string) (parser *Parser, err error) { + scanner := bufio.NewScanner(conf) + re := regexp.MustCompile(fmt.Sprintf(`^.*log_format\s+%v\s+(.+)\s*$`, name)) + found := false + var format string + for scanner.Scan() { + var line string + if !found { + // Find a log_format definition + line = scanner.Text() + formatDef := re.FindStringSubmatch(line) + if formatDef == nil { + continue + } + found = true + line = formatDef[1] + } else { + line = scanner.Text() + } + // Look for a definition end + re = regexp.MustCompile(`^\s*(.*?)\s*(;|$)`) + lineSplit := re.FindStringSubmatch(line) + if l := len(lineSplit[1]); l > 2 { + format += lineSplit[1][1 : l-1] + } + if lineSplit[2] == ";" { + break + } + } + if !found { + err = fmt.Errorf("`log_format %v` not found in given config", name) + } else { + err = scanner.Err() + } + parser = NewParser(format) + return +} diff --git a/parser_test.go b/parser_test.go new file mode 100644 index 0000000..b44051a --- /dev/null +++ b/parser_test.go @@ -0,0 +1,72 @@ +package gonx + +import ( + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/suite" + "strings" + "testing" +) + +type ParserTestSuite struct { + suite.Suite + format string + parser *Parser +} + +func (suite *ParserTestSuite) SetupTest() { + suite.format = "$remote_addr [$time_local] \"$request\"" + suite.parser = NewParser(suite.format) +} + +func TestParserTestSuite(t *testing.T) { + suite.Run(t, new(ParserTestSuite)) +} + +func (suite *ParserTestSuite) TestFormatSaved() { + assert.Equal(suite.T(), suite.parser.format, suite.format) +} + +func (suite *ParserTestSuite) TestRegexp() { + assert.Equal(suite.T(), + suite.parser.regexp.String(), + `^(?P[^ ]+) \[(?P[^]]+)\] "(?P[^"]+)"$`) +} + +func (suite *ParserTestSuite) TestParseString() { + line := `89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"` + expected := Entry{ + "remote_addr": "89.234.89.123", + "time_local": "08/Nov/2013:13:39:18 +0000", + "request": "GET /api/foo/bar HTTP/1.1", + } + entry, err := suite.parser.ParseString(line) + assert.NoError(suite.T(), err) + assert.Equal(suite.T(), entry, expected) +} + +func (suite *ParserTestSuite) TestParseInvalidString() { + line := `GET /api/foo/bar HTTP/1.1` + _, err := suite.parser.ParseString(line) + assert.Error(suite.T(), err) + // TODO test empty value +} + +func TestNginxParser(t *testing.T) { + expected := "$remote_addr - $remote_user [$time_local] \"$request\" $status \"$http_referer\" \"$http_user_agent\"" + conf := strings.NewReader(` + http { + include conf/mime.types; + log_format minimal '$remote_addr [$time_local] "$request"'; + log_format main '$remote_addr - $remote_user [$time_local] ' + '"$request" $status ' + '"$http_referer" "$http_user_agent"'; + log_format download '$remote_addr - $remote_user [$time_local] ' + '"$request" $status $bytes_sent ' + '"$http_referer" "$http_user_agent" ' + '"$http_range" "$sent_http_content_range"'; + } + `) + parser, err := NewNginxParser(conf, "main") + assert.NoError(t, err) + assert.Equal(t, parser.format, expected) +} diff --git a/reader.go b/reader.go index ef51bbc..33730ba 100644 --- a/reader.go +++ b/reader.go @@ -2,109 +2,62 @@ package gonx import ( "bufio" - "fmt" "io" //"os" - "regexp" "sync" ) type Reader struct { - format string - re *regexp.Regexp + parser *Parser files chan io.Reader entries chan Entry wg sync.WaitGroup } -func NewMap(files chan io.Reader, format string) *Reader { - r := &Reader{ - format: format, +func newMap(files chan io.Reader, parser *Parser) *Reader { + reader := &Reader{ + parser: parser, files: files, entries: make(chan Entry, 10), } - // Get regexp to trigger its creation now to avoid data race - // in future (this is not the method I want to lock Mutex - // every time. - // TODO Unbind this method from Reader struct - r.GetFormatRegexp() for file := range files { - r.wg.Add(1) - go r.readFile(file) + reader.wg.Add(1) + go reader.readFile(file) } go func() { - r.wg.Wait() - close(r.entries) + reader.wg.Wait() + close(reader.entries) }() - return r + return reader } func (r *Reader) handleError(err error) { //fmt.Fprintln(os.Stderr, err) } +func oneFileChannel(file io.Reader) chan io.Reader { + ch := make(chan io.Reader, 1) + ch <- file + close(ch) + return ch +} + func NewReader(logFile io.Reader, format string) *Reader { - files := make(chan io.Reader, 1) - files <- logFile - close(files) - return NewMap(files, format) + return newMap(oneFileChannel(logFile), NewParser(format)) } func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) (reader *Reader, err error) { - scanner := bufio.NewScanner(nginxConf) - re := regexp.MustCompile(fmt.Sprintf(`^.*log_format\s+%v\s+(.+)\s*$`, formatName)) - found := false - var format string - for scanner.Scan() { - var line string - if !found { - // Find a log_format definition - line = scanner.Text() - formatDef := re.FindStringSubmatch(line) - if formatDef == nil { - continue - } - found = true - line = formatDef[1] - } else { - line = scanner.Text() - } - // Look for a definition end - re = regexp.MustCompile(`^\s*(.*?)\s*(;|$)`) - lineSplit := re.FindStringSubmatch(line) - if l := len(lineSplit[1]); l > 2 { - format += lineSplit[1][1 : l-1] - } - if lineSplit[2] == ";" { - break - } + parser, err := NewNginxParser(nginxConf, formatName) + if err != nil { + return nil, err } - if !found { - err = fmt.Errorf("`log_format %v` not found in given config", formatName) - } else { - err = scanner.Err() - } - reader = NewReader(logFile, format) + reader = newMap(oneFileChannel(logFile), parser) return } -func (r *Reader) GetFormat() string { - return r.format -} - -func (r *Reader) GetFormatRegexp() *regexp.Regexp { - if r.re != nil { - return r.re - } - format := regexp.MustCompile(`\\\$([a-z_]+)(\\?(.))`).ReplaceAllString( - regexp.QuoteMeta(r.format), "(?P<$1>[^$3]+)$2") - r.re = regexp.MustCompile(fmt.Sprintf("^%v$", format)) - return r.re -} - func (r *Reader) readFile(file io.Reader) { // Iterate over log file lines and spawn new mapper goroutine // to parse it into given format @@ -112,11 +65,12 @@ func (r *Reader) readFile(file io.Reader) { for scanner.Scan() { r.wg.Add(1) go func(line string) { - entry, err := r.parseRecord(line) - if err != nil { + entry, err := r.parser.ParseString(line) + if err == nil { + r.entries <- entry + } else { r.handleError(err) } - r.entries <- entry r.wg.Done() }(scanner.Text()) } @@ -135,23 +89,3 @@ func (r *Reader) Read() (entry Entry, err error) { } return } - -func (r *Reader) parseRecord(line string) (record Entry, err error) { - // Parse line to fill map record. Return error if a line does not match given format - re := r.GetFormatRegexp() - fields := re.FindStringSubmatch(line) - if fields == nil { - err = fmt.Errorf("Access log line '%v' does not match given format '%v'", line, re) - return - } - - // Iterate over subexp foung and fill the map record - record = make(Entry) - for i, name := range re.SubexpNames() { - if i == 0 { - continue - } - record[name] = fields[i] - } - return -} diff --git a/reader_test.go b/reader_test.go index 17a11ad..63764ca 100644 --- a/reader_test.go +++ b/reader_test.go @@ -7,27 +7,23 @@ import ( "testing" ) -func TestGetFormatRegexp(t *testing.T) { - format := "$remote_addr [$time_local] \"$request\"" - reader := NewReader(strings.NewReader(""), format) - assert.Equal(t, - reader.GetFormatRegexp().String(), - `^(?P[^ ]+) \[(?P[^]]+)\] "(?P[^"]+)"$`) -} - -func TestGetRecord(t *testing.T) { +func TestRead(t *testing.T) { format := "$remote_addr [$time_local] \"$request\"" file := strings.NewReader(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`) reader := NewReader(file, format) + expected := Entry{ "remote_addr": "89.234.89.123", "time_local": "08/Nov/2013:13:39:18 +0000", "request": "GET /api/foo/bar HTTP/1.1", } - rec, err := reader.Read() + + // Read entry from incoming channel + entry, err := reader.Read() assert.NoError(t, err) - assert.Equal(t, rec, expected) + assert.Equal(t, entry, expected) + // It was only one line, nothing to read _, err = reader.Read() assert.Equal(t, err, io.EOF) } @@ -37,32 +33,10 @@ func TestInvalidLineFormat(t *testing.T) { format := "$remote_addr [$time_local] \"$request\"" file := strings.NewReader(`89.234.89.123 - - [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`) reader := NewReader(file, format) - rec, err := reader.Read() - assert.Error(t, err) - assert.Empty(t, rec) -} -func TestReadLogFormatFromFile(t *testing.T) { - expected := "$remote_addr - $remote_user [$time_local] \"$request\" $status \"$http_referer\" \"$http_user_agent\"" - conf := strings.NewReader(` - http { - include conf/mime.types; - log_format minimal '$remote_addr [$time_local] "$request"'; - log_format main '$remote_addr - $remote_user [$time_local] ' - '"$request" $status ' - '"$http_referer" "$http_user_agent"'; - log_format download '$remote_addr - $remote_user [$time_local] ' - '"$request" $status $bytes_sent ' - '"$http_referer" "$http_user_agent" ' - '"$http_range" "$sent_http_content_range"'; - } - `) - file := strings.NewReader("") - reader, err := NewNginxReader(file, conf, "main") - if err != nil { - t.Error(err) - } - if format := reader.GetFormat(); format != expected { - t.Errorf("Wrong format was read from conf file \n%v\nExpected\n%v", format, expected) - } + // Invalid entries do not go to the entries channel, so nothing to read + _, err := reader.Read() + assert.Equal(t, err, io.EOF) + + // TODO test Reader internal error handling } From fa9198c17b4a488f61cd9d69e54cf9355fc3fefa Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 11:35:51 +0400 Subject: [PATCH 06/21] nginx config example with fixtures --- example/nginx/nginx.go | 38 ++++++++++++++++++++++++++++++++------ 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/example/nginx/nginx.go b/example/nginx/nginx.go index eeecd59..3e6f246 100644 --- a/example/nginx/nginx.go +++ b/example/nginx/nginx.go @@ -6,28 +6,54 @@ import ( "github.com/satyrius/gonx" "io" "os" + "strings" ) var conf string var format string +var logFile string func init() { - flag.StringVar(&conf, "conf", "/etc/nginx/nginx.conf", "Nginx config file") + flag.StringVar(&conf, "conf", "dummy", "Nginx config file (e.g. /etc/nginx/nginx.conf)") flag.StringVar(&format, "format", "main", "Nginx log_format name") + flag.StringVar(&logFile, "log", "dummy", "Log file name to read. Read from STDIN if file name is '-'") } func main() { flag.Parse() + // Read given file or from STDIN + var file io.Reader + if logFile == "dummy" { + file = strings.NewReader(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`) + } else if logFile == "-" { + file = os.Stdin + } else { + file, err := os.Open(logFile) + if err != nil { + panic(err) + } + defer file.Close() + } + // Use nginx config file to extract format by the name - nginxConfig, err := os.Open(conf) - if err != nil { - panic(err) + var nginxConfig io.Reader + if conf == "dummy" { + nginxConfig = strings.NewReader(` + http { + log_format main '$remote_addr [$time_local] "$request"'; + } + `) + } else { + nginxConfig, err := os.Open(conf) + if err != nil { + panic(err) + } + defer nginxConfig.Close() } - defer nginxConfig.Close() // Read from STDIN and use log_format to parse log records - reader, err := gonx.NewNginxReader(os.Stdin, nginxConfig, format) + reader, err := gonx.NewNginxReader(file, nginxConfig, format) if err != nil { panic(err) } From 698c53e24167265ee3cb4200acfb32b21efdc521 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 14:57:09 +0400 Subject: [PATCH 07/21] parsing benchmarks --- Makefile | 3 +++ parser_bench_test.go | 39 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 42 insertions(+) create mode 100644 parser_bench_test.go diff --git a/Makefile b/Makefile index f3f1348..be4cbef 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,9 @@ test: deps go test -v . +bench: deps + go test -bench . + deps: go get github.com/stretchr/testify diff --git a/parser_bench_test.go b/parser_bench_test.go new file mode 100644 index 0000000..4e3097d --- /dev/null +++ b/parser_bench_test.go @@ -0,0 +1,39 @@ +package gonx + +import ( + "testing" +) + +func benchLogParsing(b *testing.B, format string, line string) { + parser := NewParser(format) + + // Ensure the string is in valid format + _, err := parser.ParseString(line) + if err != nil { + b.Error(err) + b.Fail() + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + parser.ParseString(line) + } +} + +func BenchmarkParseSimpleLogRecord(b *testing.B) { + format := "$remote_addr [$time_local] \"$request\"" + line := `89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"` + benchLogParsing(b, format, line) +} + +func BenchmarkParseLogRecord(b *testing.B) { + format := `$remote_addr - $remote_user [$time_local] "$request" $status ` + + `$body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" ` + + `"$cookie_uid" "$cookie_userid" "$request_time" "$http_host" "$is_ajax" ` + + `"$uid_got/$uid_set" "$msec" "$geoip_country_code"` + line := `**.***.**.*** - - [08/Nov/2013:13:39:18 +0000] ` + + `"GET /api/internal/v2/item/1?lang=en HTTP/1.1" 200 142 "http://example.com" ` + + `"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/30.0.1599.101 Safari/537.36" ` + + `"-" "-" "-" "0.084" "example.com" "ajax" "-/-" "1383917958.587" "-"` + benchLogParsing(b, format, line) +} From 42bc21f6362d6534b1131623ccc6ce008640d2cf Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 16:13:02 +0400 Subject: [PATCH 08/21] separate map reduce methods --- mapreduce.go | 59 ++++++++++++++++++++++++++++++++++++++++++++++++++++ reader.go | 55 ------------------------------------------------ 2 files changed, 59 insertions(+), 55 deletions(-) create mode 100644 mapreduce.go diff --git a/mapreduce.go b/mapreduce.go new file mode 100644 index 0000000..ea03983 --- /dev/null +++ b/mapreduce.go @@ -0,0 +1,59 @@ +package gonx + +import ( + "bufio" + "io" +) + +func (r *Reader) handleError(err error) { + //fmt.Fprintln(os.Stderr, err) +} + +func newMap(files chan io.Reader, parser *Parser) *Reader { + reader := &Reader{ + parser: parser, + files: files, + entries: make(chan Entry, 10), + } + + for file := range files { + reader.wg.Add(1) + go reader.readFile(file) + } + + go func() { + reader.wg.Wait() + close(reader.entries) + }() + + return reader +} + +func oneFileChannel(file io.Reader) chan io.Reader { + ch := make(chan io.Reader, 1) + ch <- file + close(ch) + return ch +} + +func (r *Reader) readFile(file io.Reader) { + // Iterate over log file lines and spawn new mapper goroutine + // to parse it into given format + scanner := bufio.NewScanner(file) + for scanner.Scan() { + r.wg.Add(1) + go func(line string) { + entry, err := r.parser.ParseString(line) + if err == nil { + r.entries <- entry + } else { + r.handleError(err) + } + r.wg.Done() + }(scanner.Text()) + } + if err := scanner.Err(); err != nil { + r.handleError(err) + } + r.wg.Done() +} diff --git a/reader.go b/reader.go index 33730ba..b5cbb24 100644 --- a/reader.go +++ b/reader.go @@ -1,9 +1,7 @@ package gonx import ( - "bufio" "io" - //"os" "sync" ) @@ -14,37 +12,6 @@ type Reader struct { wg sync.WaitGroup } -func newMap(files chan io.Reader, parser *Parser) *Reader { - reader := &Reader{ - parser: parser, - files: files, - entries: make(chan Entry, 10), - } - - for file := range files { - reader.wg.Add(1) - go reader.readFile(file) - } - - go func() { - reader.wg.Wait() - close(reader.entries) - }() - - return reader -} - -func (r *Reader) handleError(err error) { - //fmt.Fprintln(os.Stderr, err) -} - -func oneFileChannel(file io.Reader) chan io.Reader { - ch := make(chan io.Reader, 1) - ch <- file - close(ch) - return ch -} - func NewReader(logFile io.Reader, format string) *Reader { return newMap(oneFileChannel(logFile), NewParser(format)) } @@ -58,28 +25,6 @@ func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) ( return } -func (r *Reader) readFile(file io.Reader) { - // Iterate over log file lines and spawn new mapper goroutine - // to parse it into given format - scanner := bufio.NewScanner(file) - for scanner.Scan() { - r.wg.Add(1) - go func(line string) { - entry, err := r.parser.ParseString(line) - if err == nil { - r.entries <- entry - } else { - r.handleError(err) - } - r.wg.Done() - }(scanner.Text()) - } - if err := scanner.Err(); err != nil { - r.handleError(err) - } - r.wg.Done() -} - // Read next line from entries channel, and return parsed record. If channel is closed // then method returns io.EOF error func (r *Reader) Read() (entry Entry, err error) { From 87e50a87171e6121e8f07725d1c7899a8806e6a2 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 16:26:30 +0400 Subject: [PATCH 09/21] update makefile and travis config, no Gomfile --- .travis.yml | 8 +++----- Gomfile | 1 - Makefile | 4 ++-- 3 files changed, 5 insertions(+), 8 deletions(-) delete mode 100644 Gomfile diff --git a/.travis.yml b/.travis.yml index 6247ff3..f31a939 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,8 +2,6 @@ language: go go: - 1.1 - tip -before_install: - - go get github.com/mattn/gom -script: - - $HOME/gopath/bin/gom install - - $HOME/gopath/bin/gom test +install: make deps +script: make test +after_success: make bench diff --git a/Gomfile b/Gomfile deleted file mode 100644 index d173b89..0000000 --- a/Gomfile +++ /dev/null @@ -1 +0,0 @@ -gom 'github.com/stretchr/testify' diff --git a/Makefile b/Makefile index be4cbef..f57a138 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ -test: deps +test: go test -v . -bench: deps +bench: go test -bench . deps: From dace4daff1b081ab83ec79dfa8db37dcbc41edcd Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 16:31:01 +0400 Subject: [PATCH 10/21] travis script running tests with benchmarks --- .travis.yml | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index f31a939..3d6cdb0 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,5 +3,4 @@ go: - 1.1 - tip install: make deps -script: make test -after_success: make bench +script: go test -v -bench . From c0d1f3a7431437db2676df41d6998bf3f460ab75 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 20:43:14 +0400 Subject: [PATCH 11/21] relative paths to gone from examples --- example/common/common.go | 2 +- example/nginx/nginx.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/example/common/common.go b/example/common/common.go index 58d028d..0ba930b 100644 --- a/example/common/common.go +++ b/example/common/common.go @@ -1,9 +1,9 @@ package main import ( + gonx "../.." "flag" "fmt" - "github.com/satyrius/gonx" "io" "os" "strings" diff --git a/example/nginx/nginx.go b/example/nginx/nginx.go index 3e6f246..f606826 100644 --- a/example/nginx/nginx.go +++ b/example/nginx/nginx.go @@ -1,9 +1,9 @@ package main import ( + gonx "../.." "flag" "fmt" - "github.com/satyrius/gonx" "io" "os" "strings" From 09e9d11d02bca3a943fbe0ae38cd2ababe28f3de Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Thu, 14 Nov 2013 20:49:55 +0400 Subject: [PATCH 12/21] scripts for different strategy use benchmark --- benchmarks/bufio_read.go | 23 ++++++++++++++++++++++ benchmarks/bufio_read_entry.go | 29 +++++++++++++++++++++++++++ benchmarks/gonx_read_entry.go | 36 ++++++++++++++++++++++++++++++++++ 3 files changed, 88 insertions(+) create mode 100644 benchmarks/bufio_read.go create mode 100644 benchmarks/bufio_read_entry.go create mode 100644 benchmarks/gonx_read_entry.go diff --git a/benchmarks/bufio_read.go b/benchmarks/bufio_read.go new file mode 100644 index 0000000..e37a66d --- /dev/null +++ b/benchmarks/bufio_read.go @@ -0,0 +1,23 @@ +// Example program that reads big nginx file from stdin line by line +// and measure reading time. The file should be big enough, at least 500K lines +package main + +import ( + "bufio" + "fmt" + "os" + "time" +) + +func main() { + scanner := bufio.NewScanner(os.Stdin) + var count int + start := time.Now() + for scanner.Scan() { + // A dummy action, jest read line by line + scanner.Text() + count++ + } + duration := time.Since(start) + fmt.Printf("%v lines readed, it takes %v\n", count, duration) +} diff --git a/benchmarks/bufio_read_entry.go b/benchmarks/bufio_read_entry.go new file mode 100644 index 0000000..5fa64d3 --- /dev/null +++ b/benchmarks/bufio_read_entry.go @@ -0,0 +1,29 @@ +// Example program that reads big nginx file from stdin line by line +// and measure reading time. The file should be big enough, at least 500K lines +package main + +import ( + gonx ".." + "bufio" + "fmt" + "os" + "time" +) + +func main() { + scanner := bufio.NewScanner(os.Stdin) + var count int + format := `$remote_addr - $remote_user [$time_local] "$request" $status ` + + `$body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" ` + + `"$cookie_uid" "$cookie_userid" "$request_time" "$http_host" "$is_ajax" ` + + `"$uid_got/$uid_set" "$msec" "$geoip_country_code"` + parser := gonx.NewParser(format) + start := time.Now() + for scanner.Scan() { + // A dummy action, jest read line by line + parser.ParseString(scanner.Text()) + count++ + } + duration := time.Since(start) + fmt.Printf("%v lines readed, it takes %v\n", count, duration) +} diff --git a/benchmarks/gonx_read_entry.go b/benchmarks/gonx_read_entry.go new file mode 100644 index 0000000..e4b6fbf --- /dev/null +++ b/benchmarks/gonx_read_entry.go @@ -0,0 +1,36 @@ +// Example program that reads big nginx file from stdin line by line +// and measure reading time. The file should be big enough, at least 500K lines +package main + +import ( + gonx ".." + "fmt" + "io" + "os" + "runtime" + "time" +) + +func init() { + numcpu := runtime.NumCPU() + runtime.GOMAXPROCS(numcpu + 1) +} + +func main() { + var count int + format := `$remote_addr - $remote_user [$time_local] "$request" $status ` + + `$body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" ` + + `"$cookie_uid" "$cookie_userid" "$request_time" "$http_host" "$is_ajax" ` + + `"$uid_got/$uid_set" "$msec" "$geoip_country_code"` + reader := gonx.NewReader(os.Stdin, format) + start := time.Now() + for { + _, err := reader.Read() + if err == io.EOF { + break + } + count++ + } + duration := time.Since(start) + fmt.Printf("%v lines readed, it takes %v\n", count, duration) +} From 812cb6b244a44e7b41e98cff1387d884df553476 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Sat, 16 Nov 2013 22:40:48 +0400 Subject: [PATCH 13/21] ignore my own .env --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 323546a..03f0492 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ +.env + # Compiled Object files, Static and Dynamic libs (Shared Objects) *.o *.a From 574ba2d863429be895343708c14e0d26dbdfffe5 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Mon, 18 Nov 2013 00:40:53 +0400 Subject: [PATCH 14/21] move channels and sync to Map struct --- mapreduce.go | 64 ++++++++++++++++++++++++++++++++-------------------- reader.go | 27 +++++++++++----------- 2 files changed, 53 insertions(+), 38 deletions(-) diff --git a/mapreduce.go b/mapreduce.go index ea03983..7ba0700 100644 --- a/mapreduce.go +++ b/mapreduce.go @@ -3,57 +3,73 @@ package gonx import ( "bufio" "io" + "sync" ) -func (r *Reader) handleError(err error) { +func (m *Map) handleError(err error) { //fmt.Fprintln(os.Stderr, err) } -func newMap(files chan io.Reader, parser *Parser) *Reader { - reader := &Reader{ +// Log Entry map +type Map struct { + parser *Parser + entries chan Entry + wg sync.WaitGroup +} + +func oneFileChannel(file io.Reader) chan io.Reader { + ch := make(chan io.Reader, 1) + ch <- file + close(ch) + return ch +} + +func NewMap(files chan io.Reader, parser *Parser) *Map { + m := &Map{ parser: parser, - files: files, entries: make(chan Entry, 10), } for file := range files { - reader.wg.Add(1) - go reader.readFile(file) + m.wg.Add(1) + go m.readFile(file) } go func() { - reader.wg.Wait() - close(reader.entries) + m.wg.Wait() + close(m.entries) }() - return reader + return m } -func oneFileChannel(file io.Reader) chan io.Reader { - ch := make(chan io.Reader, 1) - ch <- file - close(ch) - return ch -} - -func (r *Reader) readFile(file io.Reader) { +func (m *Map) readFile(file io.Reader) { // Iterate over log file lines and spawn new mapper goroutine // to parse it into given format scanner := bufio.NewScanner(file) for scanner.Scan() { - r.wg.Add(1) + m.wg.Add(1) go func(line string) { - entry, err := r.parser.ParseString(line) + entry, err := m.parser.ParseString(line) if err == nil { - r.entries <- entry + m.entries <- entry } else { - r.handleError(err) + m.handleError(err) } - r.wg.Done() + m.wg.Done() }(scanner.Text()) } if err := scanner.Err(); err != nil { - r.handleError(err) + m.handleError(err) + } + m.wg.Done() +} + +// Read next Entry from Entries channel. Return nil if channel is closed +func (m *Map) GetEntry() *Entry { + entry, ok := <-m.entries + if !ok { + return nil } - r.wg.Done() + return &entry } diff --git a/reader.go b/reader.go index b5cbb24..2dc47f9 100644 --- a/reader.go +++ b/reader.go @@ -2,18 +2,15 @@ package gonx import ( "io" - "sync" ) type Reader struct { - parser *Parser - files chan io.Reader - entries chan Entry - wg sync.WaitGroup + entryMap *Map } func NewReader(logFile io.Reader, format string) *Reader { - return newMap(oneFileChannel(logFile), NewParser(format)) + m := NewMap(oneFileChannel(logFile), NewParser(format)) + return &Reader{entryMap: m} } func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) (reader *Reader, err error) { @@ -21,16 +18,18 @@ func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) ( if err != nil { return nil, err } - reader = newMap(oneFileChannel(logFile), parser) + m := NewMap(oneFileChannel(logFile), parser) + reader = &Reader{entryMap: m} return } -// Read next line from entries channel, and return parsed record. If channel is closed -// then method returns io.EOF error -func (r *Reader) Read() (entry Entry, err error) { - entry, ok := <-r.entries - if !ok { - err = io.EOF +// Read next the map. Return EOF if there is no Entries to read +func (r *Reader) Read() (Entry, error) { + // TODO return Entry reference instead of instance + entry := r.entryMap.GetEntry() + if entry == nil { + // Have to return emtry entry for backward capability + return Entry{}, io.EOF } - return + return *entry, nil } From a1d60a65ab53825e0cb2a70a0fb53e3b2d8494fd Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Mon, 18 Nov 2013 01:07:46 +0400 Subject: [PATCH 15/21] move WaitGroup manipulation inside the mapFile method --- mapreduce.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/mapreduce.go b/mapreduce.go index 7ba0700..f1b1539 100644 --- a/mapreduce.go +++ b/mapreduce.go @@ -31,8 +31,7 @@ func NewMap(files chan io.Reader, parser *Parser) *Map { } for file := range files { - m.wg.Add(1) - go m.readFile(file) + go m.mapFile(file) } go func() { @@ -43,26 +42,29 @@ func NewMap(files chan io.Reader, parser *Parser) *Map { return m } -func (m *Map) readFile(file io.Reader) { +func (m *Map) mapFile(file io.Reader) { + // Whole file should be read + m.wg.Add(1) + defer m.wg.Done() + // Iterate over log file lines and spawn new mapper goroutine // to parse it into given format scanner := bufio.NewScanner(file) for scanner.Scan() { m.wg.Add(1) go func(line string) { + defer m.wg.Done() entry, err := m.parser.ParseString(line) if err == nil { m.entries <- entry } else { m.handleError(err) } - m.wg.Done() }(scanner.Text()) } if err := scanner.Err(); err != nil { m.handleError(err) } - m.wg.Done() } // Read next Entry from Entries channel. Return nil if channel is closed From 2e2becf1d20e4bd6c14c693663f516fd04eff2cd Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Mon, 18 Nov 2013 02:30:00 +0400 Subject: [PATCH 16/21] less code and easier interface with EntryMap func --- mapreduce.go | 66 +++++++++++----------------------------------------- reader.go | 35 +++++++++++++++++++--------- 2 files changed, 38 insertions(+), 63 deletions(-) diff --git a/mapreduce.go b/mapreduce.go index f1b1539..3c1e90d 100644 --- a/mapreduce.go +++ b/mapreduce.go @@ -6,72 +6,34 @@ import ( "sync" ) -func (m *Map) handleError(err error) { +func handleError(err error) { //fmt.Fprintln(os.Stderr, err) } -// Log Entry map -type Map struct { - parser *Parser - entries chan Entry - wg sync.WaitGroup -} - -func oneFileChannel(file io.Reader) chan io.Reader { - ch := make(chan io.Reader, 1) - ch <- file - close(ch) - return ch -} - -func NewMap(files chan io.Reader, parser *Parser) *Map { - m := &Map{ - parser: parser, - entries: make(chan Entry, 10), - } - - for file := range files { - go m.mapFile(file) - } - - go func() { - m.wg.Wait() - close(m.entries) - }() - - return m -} - -func (m *Map) mapFile(file io.Reader) { - // Whole file should be read - m.wg.Add(1) - defer m.wg.Done() +// Iterate over given file and map each it's line into Entry record using parser. +// Results will be written into output Entries channel. +func EntryMap(file io.Reader, parser *Parser, output chan Entry) { + var wg sync.WaitGroup // Iterate over log file lines and spawn new mapper goroutine // to parse it into given format scanner := bufio.NewScanner(file) for scanner.Scan() { - m.wg.Add(1) + wg.Add(1) go func(line string) { - defer m.wg.Done() - entry, err := m.parser.ParseString(line) + defer wg.Done() + entry, err := parser.ParseString(line) if err == nil { - m.entries <- entry + output <- entry } else { - m.handleError(err) + handleError(err) } }(scanner.Text()) } if err := scanner.Err(); err != nil { - m.handleError(err) - } -} - -// Read next Entry from Entries channel. Return nil if channel is closed -func (m *Map) GetEntry() *Entry { - entry, ok := <-m.entries - if !ok { - return nil + handleError(err) } - return &entry + // Wait until all files will be read and all lines will be + // parsed and wrote to the Entries channel + wg.Wait() } diff --git a/reader.go b/reader.go index 2dc47f9..1069635 100644 --- a/reader.go +++ b/reader.go @@ -5,12 +5,20 @@ import ( ) type Reader struct { - entryMap *Map + entries chan Entry + file io.Reader + parser *Parser +} + +func NewEntryReader(logFile io.Reader, parser *Parser) *Reader { + return &Reader{ + file: logFile, + parser: parser, + } } func NewReader(logFile io.Reader, format string) *Reader { - m := NewMap(oneFileChannel(logFile), NewParser(format)) - return &Reader{entryMap: m} + return NewEntryReader(logFile, NewParser(format)) } func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) (reader *Reader, err error) { @@ -18,18 +26,23 @@ func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) ( if err != nil { return nil, err } - m := NewMap(oneFileChannel(logFile), parser) - reader = &Reader{entryMap: m} + reader = NewEntryReader(logFile, parser) return } // Read next the map. Return EOF if there is no Entries to read -func (r *Reader) Read() (Entry, error) { +func (r *Reader) Read() (entry Entry, err error) { + if r.entries == nil { + r.entries = make(chan Entry, 10) + go func() { + EntryMap(r.file, r.parser, r.entries) + close(r.entries) + }() + } // TODO return Entry reference instead of instance - entry := r.entryMap.GetEntry() - if entry == nil { - // Have to return emtry entry for backward capability - return Entry{}, io.EOF + entry, ok := <-r.entries + if !ok { + err = io.EOF } - return *entry, nil + return } From 11223c45f6093eb6c75c1eca48cebc70fb88efd1 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Tue, 19 Nov 2013 02:20:43 +0400 Subject: [PATCH 17/21] reader entries channel is nil after init --- reader_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/reader_test.go b/reader_test.go index 63764ca..fca5cef 100644 --- a/reader_test.go +++ b/reader_test.go @@ -11,6 +11,7 @@ func TestRead(t *testing.T) { format := "$remote_addr [$time_local] \"$request\"" file := strings.NewReader(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`) reader := NewReader(file, format) + assert.Nil(t, reader.entries) expected := Entry{ "remote_addr": "89.234.89.123", From a994375568f86716c18fa987e507a13df62dd9fc Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Tue, 19 Nov 2013 02:22:29 +0400 Subject: [PATCH 18/21] control mappers spawn with semaphore, fix memory leak --- mapreduce.go | 72 ++++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 56 insertions(+), 16 deletions(-) diff --git a/mapreduce.go b/mapreduce.go index 3c1e90d..71f9b08 100644 --- a/mapreduce.go +++ b/mapreduce.go @@ -13,27 +13,67 @@ func handleError(err error) { // Iterate over given file and map each it's line into Entry record using parser. // Results will be written into output Entries channel. func EntryMap(file io.Reader, parser *Parser, output chan Entry) { - var wg sync.WaitGroup + // Input file lines. This channel is unbuffered to publish + // next line to handle only when previous is taken by mapper. + var lines = make(chan string) + + // Host thread to spawn new mappers + var quit = make(chan int) + go func(topLoad int) { + // Create semafore channel with capacity equal to the output channel + // capacity. Use it to control mapper goroutines spawn. + var sem = make(chan bool, topLoad) + for i := 0; i < topLoad; i++ { + // Ready to go! + sem <- true + } + + var wg sync.WaitGroup + for { + // Wait until semaphore becomes available and run a mapper + if !<-sem { + // Stop the host loop if false received from semaphore + break + } + wg.Add(1) + go func() { + defer wg.Done() + // Take next file line to map. Check is channel closed. + line, ok := <-lines + // Return immediately if lines channel is closed + if !ok { + // Send false to semaphore channel to indicate that job's done + sem <- false + return + } + entry, err := parser.ParseString(line) + if err == nil { + // Write result Entry to the output channel. This will + // block goroutine runtime until channel is free to + // accept new item. + output <- entry + } else { + handleError(err) + } + // Increment semaphore to allow new mapper workers to spawn + sem <- true + }() + } + // Wait for all mappers to complete, then send a quit signal + wg.Wait() + quit <- 1 + }(cap(output)) - // Iterate over log file lines and spawn new mapper goroutine - // to parse it into given format scanner := bufio.NewScanner(file) for scanner.Scan() { - wg.Add(1) - go func(line string) { - defer wg.Done() - entry, err := parser.ParseString(line) - if err == nil { - output <- entry - } else { - handleError(err) - } - }(scanner.Text()) + // Read next line from the file and feed mapper routines. + lines <- scanner.Text() } + close(lines) + if err := scanner.Err(); err != nil { handleError(err) } - // Wait until all files will be read and all lines will be - // parsed and wrote to the Entries channel - wg.Wait() + + <-quit } From 72908a2290d670a90fa9fb1267bd3178fa80d134 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Wed, 20 Nov 2013 09:55:21 +0400 Subject: [PATCH 19/21] remove NewEntryReader constructor, it is redundant --- reader.go | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/reader.go b/reader.go index 1069635..a53ab2c 100644 --- a/reader.go +++ b/reader.go @@ -5,28 +5,27 @@ import ( ) type Reader struct { - entries chan Entry file io.Reader parser *Parser + entries chan Entry } -func NewEntryReader(logFile io.Reader, parser *Parser) *Reader { +func NewReader(logFile io.Reader, format string) *Reader { return &Reader{ file: logFile, - parser: parser, + parser: NewParser(format), } } -func NewReader(logFile io.Reader, format string) *Reader { - return NewEntryReader(logFile, NewParser(format)) -} - func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) (reader *Reader, err error) { parser, err := NewNginxParser(nginxConf, formatName) if err != nil { return nil, err } - reader = NewEntryReader(logFile, parser) + reader = &Reader{ + file: logFile, + parser: parser, + } return } From fb56ba9f14114e6efd1d2aa1983bb56f4d533de2 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Wed, 20 Nov 2013 09:55:49 +0400 Subject: [PATCH 20/21] add some docs --- entry.go | 4 ++++ parser.go | 7 ++++--- reader.go | 6 +++++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/entry.go b/entry.go index 3f6146a..7e43fe8 100644 --- a/entry.go +++ b/entry.go @@ -4,8 +4,12 @@ import ( "fmt" ) +// Parsed log record. Use Get method to retrieve a value by name instead of +// threating this as a map, because inner representation is in design. type Entry map[string]string +// Return entry field value by name or empty string and error if it +// does not exist. func (entry *Entry) Get(name string) (value string, err error) { value, ok := (*entry)[name] if !ok { diff --git a/parser.go b/parser.go index 9a97386..0c16e4e 100644 --- a/parser.go +++ b/parser.go @@ -7,12 +7,13 @@ import ( "regexp" ) +// Log record parser. Use specific constructors to initialize it. type Parser struct { format string regexp *regexp.Regexp } -// Returns a new Parser, use givel log format to create its internal +// Returns a new Parser, use given log format to create its internal // strings parsing regexp. func NewParser(format string) *Parser { re := regexp.MustCompile(`\\\$([a-z_]+)(\\?(.))`).ReplaceAllString( @@ -41,8 +42,8 @@ func (parser *Parser) ParseString(line string) (entry Entry, err error) { return } -// NewNginxParser parse nginx conf file to find log_format with diven name and -// returns parser for this format. If returns an error if cannot find the needle. +// NewNginxParser parse nginx conf file to find log_format with given name and +// returns parser for this format. It returns an error if cannot find the needle. func NewNginxParser(conf io.Reader, name string) (parser *Parser, err error) { scanner := bufio.NewScanner(conf) re := regexp.MustCompile(fmt.Sprintf(`^.*log_format\s+%v\s+(.+)\s*$`, name)) diff --git a/reader.go b/reader.go index a53ab2c..cd8c165 100644 --- a/reader.go +++ b/reader.go @@ -4,12 +4,14 @@ import ( "io" ) +// Log file reader. Use specific constructors to create it. type Reader struct { file io.Reader parser *Parser entries chan Entry } +// Creates reader for custom log format. func NewReader(logFile io.Reader, format string) *Reader { return &Reader{ file: logFile, @@ -17,6 +19,8 @@ func NewReader(logFile io.Reader, format string) *Reader { } } +// Creates reader for nginx log format. Nginx config parser will be used +// to get particular format from the conf file. func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) (reader *Reader, err error) { parser, err := NewNginxParser(nginxConf, formatName) if err != nil { @@ -29,7 +33,7 @@ func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) ( return } -// Read next the map. Return EOF if there is no Entries to read +// Get next parsed Entry from the log file. Return EOF if there is no Entries to read. func (r *Reader) Read() (entry Entry, err error) { if r.entries == nil { r.entries = make(chan Entry, 10) From 39967f18274fabfdaf2112cc45b61b4218688273 Mon Sep 17 00:00:00 2001 From: Anton Egorov Date: Wed, 20 Nov 2013 12:19:06 +0400 Subject: [PATCH 21/21] Reducer interface and implementation to read input channel --- mapreduce.go | 43 ++++++++++++++++++++++++++----------------- reader.go | 6 +----- reducer.go | 24 ++++++++++++++++++++++++ reducer_test.go | 24 ++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 22 deletions(-) create mode 100644 reducer.go create mode 100644 reducer_test.go diff --git a/mapreduce.go b/mapreduce.go index 71f9b08..b88c845 100644 --- a/mapreduce.go +++ b/mapreduce.go @@ -10,15 +10,18 @@ func handleError(err error) { //fmt.Fprintln(os.Stderr, err) } -// Iterate over given file and map each it's line into Entry record using parser. -// Results will be written into output Entries channel. -func EntryMap(file io.Reader, parser *Parser, output chan Entry) { +// Iterate over given file and map each it's line into Entry record using +// parser and apply reducer to the Entries channel. Execution terminates +// when result will be readed from reducer's output channel, but the mapper +// works and fills input Entries channel until all lines will be read from +// the fiven file. +func MapReduce(file io.Reader, parser *Parser, reducer Reducer) interface{} { // Input file lines. This channel is unbuffered to publish // next line to handle only when previous is taken by mapper. var lines = make(chan string) // Host thread to spawn new mappers - var quit = make(chan int) + var entries = make(chan Entry, 10) go func(topLoad int) { // Create semafore channel with capacity equal to the output channel // capacity. Use it to control mapper goroutines spawn. @@ -51,7 +54,7 @@ func EntryMap(file io.Reader, parser *Parser, output chan Entry) { // Write result Entry to the output channel. This will // block goroutine runtime until channel is free to // accept new item. - output <- entry + entries <- entry } else { handleError(err) } @@ -61,19 +64,25 @@ func EntryMap(file io.Reader, parser *Parser, output chan Entry) { } // Wait for all mappers to complete, then send a quit signal wg.Wait() - quit <- 1 - }(cap(output)) + close(entries) + }(cap(entries)) - scanner := bufio.NewScanner(file) - for scanner.Scan() { - // Read next line from the file and feed mapper routines. - lines <- scanner.Text() - } - close(lines) + // Run reducer routine. + var output = make(chan interface{}) + go reducer.Reduce(entries, output) - if err := scanner.Err(); err != nil { - handleError(err) - } + go func() { + scanner := bufio.NewScanner(file) + for scanner.Scan() { + // Read next line from the file and feed mapper routines. + lines <- scanner.Text() + } + close(lines) + + if err := scanner.Err(); err != nil { + handleError(err) + } + }() - <-quit + return <-output } diff --git a/reader.go b/reader.go index cd8c165..293482d 100644 --- a/reader.go +++ b/reader.go @@ -36,11 +36,7 @@ func NewNginxReader(logFile io.Reader, nginxConf io.Reader, formatName string) ( // Get next parsed Entry from the log file. Return EOF if there is no Entries to read. func (r *Reader) Read() (entry Entry, err error) { if r.entries == nil { - r.entries = make(chan Entry, 10) - go func() { - EntryMap(r.file, r.parser, r.entries) - close(r.entries) - }() + r.entries = MapReduce(r.file, r.parser, new(ReadAll)).(chan Entry) } // TODO return Entry reference instead of instance entry, ok := <-r.entries diff --git a/reducer.go b/reducer.go new file mode 100644 index 0000000..04e5689 --- /dev/null +++ b/reducer.go @@ -0,0 +1,24 @@ +package gonx + +// Reducer interface for Entries channel redure. +// +// Each Reduce method should accept input channel of Entries, do it's job and +// the result should be written to the output channel. +// +// It does not return values because usually it runs in a separate +// goroutine and it is handy to use channel for reduced data retrieval. +type Reducer interface { + Reduce(input chan Entry, output chan interface{}) +} + +// Implements Reducer interface for simple input entries redirection to +// the output channel. +type ReadAll struct { +} + +// Redirect input Entries channel directly to the output without any +// modifications. It is useful when you want jast to read file fast +// using asynchronous with mapper routines. +func (r *ReadAll) Reduce(input chan Entry, output chan interface{}) { + output <- input +} diff --git a/reducer_test.go b/reducer_test.go new file mode 100644 index 0000000..8abd33a --- /dev/null +++ b/reducer_test.go @@ -0,0 +1,24 @@ +package gonx + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestReadAllReducer(t *testing.T) { + reducer := new(ReadAll) + assert.Implements(t, (*Reducer)(nil), reducer) + + // Prepare import chanel + input := make(chan Entry, 1) + input <- Entry{} + + output := make(chan interface{}, 1) // Make it buffered to avoid deadlock + reducer.Reduce(input, output) + + // ReadAll reducer writes input channel to the output + result, opened := <-output + assert.True(t, opened) + _, ok := result.(chan Entry) + assert.True(t, ok) +}