-
Notifications
You must be signed in to change notification settings - Fork 0
/
pool.go
167 lines (145 loc) · 3.97 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package gophernaut
import (
"bufio"
"context"
"fmt"
"io"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
"strings"
)
// Event is basically just an enum
type Event int
// Events that can be generated by our child processes
//go:generate stringer -type=Event
const (
Start Event = iota
Shutdown
PiningForTheFjords
)
func copyToLog(dst *log.Logger, src io.Reader) {
scanner := bufio.NewScanner(src)
for scanner.Scan() {
dst.Print(scanner.Text())
}
}
func startProcess(control <-chan Event, events chan<- Event, executable string) {
procLog := log.New(os.Stdout, fmt.Sprintf("gopher-worker(%s) ", executable), log.Ldate|log.Ltime)
commandParts := strings.Split(executable, " ")
command := exec.Command(commandParts[0], commandParts[1:]...)
log.Printf("Command: %v\n", command)
stdout, err := command.StdoutPipe()
if err != nil {
procLog.Fatalln("Unable to connect to stdout from command...")
}
stderr, err := command.StderrPipe()
if err != nil {
procLog.Fatalln("Unable to connect to stderr from command...")
}
go copyToLog(procLog, stdout)
go copyToLog(procLog, stderr)
// Actually start the subprocess and wait for it to startup
command.Start()
// Push an event onto the channel for manage processes to track the new process
events <- Start
for {
_, ok := <-control
if !ok {
fmt.Println("Killing worker process after receiving close event.")
command.Process.Kill()
events <- Shutdown
break
}
}
}
// Worker ...
type Worker struct {
Hostname string
requestCount int
pool *Pool
busy bool
}
// StartRequest marks this work as busy
func (w *Worker) StartRequest() {
w.busy = true
w.requestCount++
log.Printf("Worker %s request %d starting...\n", w.Hostname, w.requestCount)
}
// CompleteRequest frees this worker
func (w *Worker) CompleteRequest() {
w.busy = false
w.pool.workerChannel <- w
log.Printf("Worker %s request %d complete!\n", w.Hostname, w.requestCount)
}
// GetRequestCount accessor
func (w *Worker) GetRequestCount() int {
return w.requestCount
}
// Pool manages the pool of Worker processes to which gophernaut dispatches
// requests.
type Pool struct {
Executables []string
Hostnames []string
Size int
Workers []*Worker
workerChannel chan *Worker
requestCount int
stoppedCount int
processCount int
controlChannel chan Event
eventsChannel chan Event
}
// Start up the pool
func (p *Pool) Start() {
p.controlChannel = make(chan Event)
p.eventsChannel = make(chan Event)
p.workerChannel = make(chan *Worker, p.Size)
// Handle signals to try to do a graceful shutdown:
receivedSignals := make(chan os.Signal, 1)
signal.Notify(receivedSignals, os.Interrupt) // , syscall.SIGINT, syscall.SIGTERM)
go func() {
for sig := range receivedSignals {
fmt.Printf("Received signal, %s, shutting down workers...\n", sig)
break
}
close(p.controlChannel)
signal.Stop(receivedSignals)
}()
log.Printf("Starting processes... %d\n", len(p.Executables))
// Actually start some processes
for index, executable := range p.Executables {
log.Printf("creating worker...\n")
w := Worker{Hostname: p.Hostnames[index], pool: p}
log.Printf("Queuing worker...\n")
p.workerChannel <- &w
p.Workers = append(p.Workers, &w)
log.Printf("Starting worker process...\n")
go startProcess(p.controlChannel, p.eventsChannel, executable)
}
}
// GetWorker retrieves a worker from the pool and updates the pools
// state to indicate that a request is in progress
func (p *Pool) GetWorker() *Worker {
p.requestCount++
log.Printf("Pool starting request %d...", p.requestCount)
return <-p.workerChannel
}
// ManageProcesses waits for processes to start waits for graceful shutdown
func (p *Pool) ManageProcesses(s *http.Server) {
for event := range p.eventsChannel {
switch event {
case Shutdown:
p.stoppedCount++
case Start:
p.processCount++
}
if p.processCount == p.stoppedCount {
log.Printf("%d workers stopped, shutting down.\n", p.processCount)
c := context.Background()
s.Shutdown(c)
}
}
}