Merge pull request #1 from satyrius/map-reduce

Map reduce pattern for asynchronous file reading

satyrius committed Nov 20, 2013
2 parents 2d2ad0f + 39967f1 commit b434570
Showing 19 changed files with 546 additions and 167 deletions.
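For orientation before the file-by-file diff, here is a minimal sketch of how the gonx reader that this commit moves onto the map-reduce pattern might be used. Names are taken from the files below; the assumption that Read returns an Entry together with an error, and that non-EOF errors can simply be skipped, is inferred from benchmarks/gonx_read_entry.go and entry.go rather than stated anywhere in the diff, so treat it as a sketch, not the definitive API.

package main

import (
    "fmt"
    "github.com/satyrius/gonx"
    "io"
    "os"
)

func main() {
    // Log format in nginx log_format syntax, as used by the benchmarks below
    format := `$remote_addr [$time_local] "$request"`
    reader := gonx.NewReader(os.Stdin, format)
    for {
        entry, err := reader.Read()
        if err == io.EOF {
            break
        } else if err != nil {
            // Assumed behavior: skip lines that fail to parse
            continue
        }
        // Entry.Get returns a field value by name, or an error if it is missing
        if request, err := entry.Get("request"); err == nil {
            fmt.Println(request)
        }
    }
}

The heavy lifting happens in the new mapreduce.go below, where lines are parsed by a pool of mapper goroutines and fed to a reducer.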
2 changes: 2 additions & 0 deletions .gitignore
@@ -1,3 +1,5 @@
.env

# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
7 changes: 2 additions & 5 deletions .travis.yml
@@ -2,8 +2,5 @@ language: go
go:
- 1.1
- tip
before_install:
- go get github.com/mattn/gom
script:
- $HOME/gopath/bin/gom install
- $HOME/gopath/bin/gom test
install: make deps
script: go test -v -bench .
1 change: 0 additions & 1 deletion Gomfile

This file was deleted.

12 changes: 12 additions & 0 deletions Makefile
@@ -0,0 +1,12 @@
test:
go test -v .

bench:
go test -bench .

deps:
go get github.com/stretchr/testify

dev-deps:
go get github.com/nsf/gocode
go get code.google.com/p/rog-go/exp/cmd/godef
23 changes: 23 additions & 0 deletions benchmarks/bufio_read.go
@@ -0,0 +1,23 @@
// Example program that reads a big nginx log file from stdin line by line
// and measures the reading time. The file should be big enough, at least 500K lines.
package main

import (
"bufio"
"fmt"
"os"
"time"
)

func main() {
scanner := bufio.NewScanner(os.Stdin)
var count int
start := time.Now()
for scanner.Scan() {
// A dummy action, just read line by line
scanner.Text()
count++
}
duration := time.Since(start)
fmt.Printf("%v lines read in %v\n", count, duration)
}
29 changes: 29 additions & 0 deletions benchmarks/bufio_read_entry.go
@@ -0,0 +1,29 @@
// Example program that reads a big nginx log file from stdin line by line
// and measures the reading time. The file should be big enough, at least 500K lines.
package main

import (
gonx ".."
"bufio"
"fmt"
"os"
"time"
)

func main() {
scanner := bufio.NewScanner(os.Stdin)
var count int
format := `$remote_addr - $remote_user [$time_local] "$request" $status ` +
`$body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" ` +
`"$cookie_uid" "$cookie_userid" "$request_time" "$http_host" "$is_ajax" ` +
`"$uid_got/$uid_set" "$msec" "$geoip_country_code"`
parser := gonx.NewParser(format)
start := time.Now()
for scanner.Scan() {
// A dummy action, just read line by line
parser.ParseString(scanner.Text())
count++
}
duration := time.Since(start)
fmt.Printf("%v lines read in %v\n", count, duration)
}
36 changes: 36 additions & 0 deletions benchmarks/gonx_read_entry.go
@@ -0,0 +1,36 @@
// Example program that reads a big nginx log file from stdin line by line
// and measures the reading time. The file should be big enough, at least 500K lines.
package main

import (
gonx ".."
"fmt"
"io"
"os"
"runtime"
"time"
)

func init() {
numcpu := runtime.NumCPU()
runtime.GOMAXPROCS(numcpu + 1)
}

func main() {
var count int
format := `$remote_addr - $remote_user [$time_local] "$request" $status ` +
`$body_bytes_sent "$http_referer" "$http_user_agent" "$http_x_forwarded_for" ` +
`"$cookie_uid" "$cookie_userid" "$request_time" "$http_host" "$is_ajax" ` +
`"$uid_got/$uid_set" "$msec" "$geoip_country_code"`
reader := gonx.NewReader(os.Stdin, format)
start := time.Now()
for {
_, err := reader.Read()
if err == io.EOF {
break
}
count++
}
duration := time.Since(start)
fmt.Printf("%v lines read in %v\n", count, duration)
}
19 changes: 19 additions & 0 deletions entry.go
@@ -0,0 +1,19 @@
package gonx

import (
"fmt"
)

// Parsed log record. Use the Get method to retrieve a value by name instead of
// treating this as a map, because the inner representation is still being designed and may change.
type Entry map[string]string

// Get returns the entry field value by name, or an empty string and an error
// if the field does not exist.
func (entry *Entry) Get(name string) (value string, err error) {
value, ok := (*entry)[name]
if !ok {
err = fmt.Errorf("Field '%v' not found in record %+v", name, *entry)
}
return
}
20 changes: 20 additions & 0 deletions entry_test.go
@@ -0,0 +1,20 @@
package gonx

import (
"github.com/stretchr/testify/assert"
"testing"
)

func TestEntry(t *testing.T) {
entry := Entry{"foo": "1"}

// Get an existing field
val, err := entry.Get("foo")
assert.NoError(t, err)
assert.Equal(t, val, "1")

// Get field that does not exist
val, err = entry.Get("bar")
assert.Error(t, err)
assert.Equal(t, val, "")
}
2 changes: 1 addition & 1 deletion example/common/common.go
@@ -1,9 +1,9 @@
package main

import (
gonx "../.."
"flag"
"fmt"
"github.com/satyrius/gonx"
"io"
"os"
"strings"
40 changes: 33 additions & 7 deletions example/nginx/nginx.go
@@ -1,33 +1,59 @@
package main

import (
gonx "../.."
"flag"
"fmt"
"github.com/satyrius/gonx"
"io"
"os"
"strings"
)

var conf string
var format string
var logFile string

func init() {
flag.StringVar(&conf, "conf", "/etc/nginx/nginx.conf", "Nginx config file")
flag.StringVar(&conf, "conf", "dummy", "Nginx config file (e.g. /etc/nginx/nginx.conf)")
flag.StringVar(&format, "format", "main", "Nginx log_format name")
flag.StringVar(&logFile, "log", "dummy", "Log file name to read. Read from STDIN if file name is '-'")
}

func main() {
flag.Parse()

// Read given file or from STDIN
var file io.Reader
if logFile == "dummy" {
file = strings.NewReader(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /api/foo/bar HTTP/1.1"`)
} else if logFile == "-" {
file = os.Stdin
} else {
file, err := os.Open(logFile)
if err != nil {
panic(err)
}
defer file.Close()
}

// Use nginx config file to extract format by the name
nginxConfig, err := os.Open(conf)
if err != nil {
panic(err)
var nginxConfig io.Reader
if conf == "dummy" {
nginxConfig = strings.NewReader(`
http {
log_format main '$remote_addr [$time_local] "$request"';
}
`)
} else {
nginxConfig, err := os.Open(conf)
if err != nil {
panic(err)
}
defer nginxConfig.Close()
}
defer nginxConfig.Close()

// Read from STDIN and use log_format to parse log records
reader, err := gonx.NewNginxReader(os.Stdin, nginxConfig, format)
reader, err := gonx.NewNginxReader(file, nginxConfig, format)
if err != nil {
panic(err)
}
88 changes: 88 additions & 0 deletions mapreduce.go
@@ -0,0 +1,88 @@
package gonx

import (
"bufio"
"io"
"sync"
)

func handleError(err error) {
//fmt.Fprintln(os.Stderr, err)
}

// MapReduce iterates over the given file, maps each of its lines into an Entry
// record using the parser, and applies the reducer to the Entries channel.
// Execution terminates once the result is read from the reducer's output
// channel, but the mappers keep filling the input Entries channel until all
// lines have been read from the given file.
func MapReduce(file io.Reader, parser *Parser, reducer Reducer) interface{} {
// Input file lines. This channel is unbuffered so that the next line to
// handle is published only after the previous one has been taken by a mapper.
var lines = make(chan string)

// Host goroutine to spawn new mappers
var entries = make(chan Entry, 10)
go func(topLoad int) {
// Create a semaphore channel with capacity equal to the output channel's
// capacity. Use it to control how many mapper goroutines are spawned.
var sem = make(chan bool, topLoad)
for i := 0; i < topLoad; i++ {
// Ready to go!
sem <- true
}

var wg sync.WaitGroup
for {
// Wait until the semaphore becomes available, then run a mapper
if !<-sem {
// Stop the host loop if false is received from the semaphore
break
}
wg.Add(1)
go func() {
defer wg.Done()
// Take the next file line to map and check whether the channel is closed.
line, ok := <-lines
// Return immediately if the lines channel is closed
if !ok {
// Send false to the semaphore channel to indicate that the job is done
sem <- false
return
}
entry, err := parser.ParseString(line)
if err == nil {
// Write the resulting Entry to the output channel. This will
// block the goroutine until the channel is ready to
// accept a new item.
entries <- entry
} else {
handleError(err)
}
// Increment semaphore to allow new mapper workers to spawn
sem <- true
}()
}
// Wait for all mappers to complete, then send a quit signal
wg.Wait()
close(entries)
}(cap(entries))

// Run reducer routine.
var output = make(chan interface{})
go reducer.Reduce(entries, output)

go func() {
scanner := bufio.NewScanner(file)
for scanner.Scan() {
// Read the next line from the file and feed the mapper goroutines.
lines <- scanner.Text()
}
close(lines)

if err := scanner.Err(); err != nil {
handleError(err)
}
}()

return <-output
}
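Since the diff does not include the Reducer interface itself, here is a hedged sketch of how MapReduce might be driven end to end. It assumes, from the reducer.Reduce(entries, output) call above, that Reducer exposes a single Reduce(input chan Entry, output chan interface{}) method, and that NewParser returns the *Parser that MapReduce expects; the countReducer type is hypothetical and not part of this commit.

package main

import (
    "fmt"
    "github.com/satyrius/gonx"
    "strings"
)

// countReducer is a hypothetical Reducer that simply counts parsed entries.
type countReducer struct{}

func (r *countReducer) Reduce(input chan gonx.Entry, output chan interface{}) {
    count := 0
    for _ = range input {
        count++
    }
    // MapReduce returns the first value read from the output channel.
    output <- count
}

func main() {
    file := strings.NewReader(`89.234.89.123 [08/Nov/2013:13:39:18 +0000] "GET /foo HTTP/1.1"`)
    parser := gonx.NewParser(`$remote_addr [$time_local] "$request"`)
    count := gonx.MapReduce(file, parser, &countReducer{})
    // Prints the number of lines that were parsed successfully
    fmt.Println(count)
}

Under these assumptions, note that the reducer sees entries in no guaranteed order, since multiple mapper goroutines parse lines concurrently.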