forked from kubernetes-retired/contrib
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kubelet-parser.go
339 lines (302 loc) · 11.1 KB
/
kubelet-parser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"path"
"regexp"
"sort"
"time"
)
// This parser does not require additional probes for kubelet tracing: it derives
// tracing events from the native kubelet log. It is currently used by node-perf-dash.
// TODO(coufon): we plan to adopt event for tracing in future.
const (
	// TODO(coufon): define it in Kubernetes packages (perftype) and use it.
	tracingVersion = "v1"

	// Timestamp format of test result log (build-log.txt). The log line has no
	// year, so currentYear is prepended before parsing with this layout.
	testLogTimeFormat = "2006 Jan 2 15:04:05.000"
	// Timestamp format of kubelet log (kubelet.log), also parsed with currentYear prepended.
	kubeletLogTimeFormat = "2006 0102 15:04:05.000000"

	// Probe names, i.e. the series names emitted in the tracing data.
	probeFirstseen              = "pod_config_change"
	probeRuntime                = "runtime_manager"
	probeContainerStartPLEG     = "container_start_pleg"
	probeContainerStartPLEGSync = "container_start_pleg_sync"
	probeStatusUpdate           = "pod_status_running"
	// Infra container starts.
	probeInfraContainerPLEG     = "infra_container_start_pleg"
	probeInfraContainerPLEGSync = "infra_container_start_pleg_sync"
	// Test container starts. These intentionally share their values with
	// probeContainerStartPLEG / probeContainerStartPLEGSync.
	probeTestContainerPLEG     = "container_start_pleg"
	probeTestContainerPLEGSync = "container_start_pleg_sync"
	probeTestPodStart          = "pod_running"
)

var (
	// currentYear is prepended to log timestamps because they carry no year.
	// TODO(coufon): using the current year is a simple temporary solution; logs
	// written in a different calendar year than the one they are parsed in get
	// the wrong year.
	currentYear = fmt.Sprintf("%d", time.Now().Year())

	// regexMap maps each probe to the native kubelet log line that signals it;
	// the kubelet parser does not rely on additional tracing probes.
	// In every pattern submatch 1 is the timestamp ("MMDD hh:mm:ss.uuuuuu").
	// Where present, submatch 2 identifies the pod and is used as the key of the
	// per-pod parsing state.
	regexMap = map[string]*regexp.Regexp{
		// TODO(coufon): there is no volume now in our node-e2e performance test. Add probe for volume manager in future.
		probeFirstseen: regexp.MustCompile(`[IW](\d{4} \d{2}:\d{2}:\d{2}\.\d{6}).*kubelet\.go.*SyncLoop \(ADD, \"api\"\): \".+\"`),
		probeRuntime:   regexp.MustCompile(`[IW](\d{4} \d{2}:\d{2}:\d{2}\.\d{6}).*docker_manager\.go.*Need to restart pod infra container for.*because it is not found.*`),
		// 'container starts' log printed by PLEG.
		probeContainerStartPLEG: regexp.MustCompile(`[IW](\d{4} \d{2}:\d{2}:\d{2}\.\d{6}) .* GenericPLEG: ([^\/]*)\/([^:]*): .* -> running`),
		// 'container starts' PLEG event printed by SyncLoop.
		probeContainerStartPLEGSync: regexp.MustCompile(`[IW](\d{4} \d{2}:\d{2}:\d{2}\.\d{6}).*kubelet\.go.*SyncLoop \(PLEG\): \".*\((.*)\)\".*Type:"ContainerStarted", Data:"(.*)".*`),
		probeStatusUpdate:           regexp.MustCompile(`[IW](\d{4} \d{2}:\d{2}:\d{2}\.\d{6}).*status_manager\.go.*Status for pod \".*\((.*)\)\" updated successfully.*Phase:Running.*`),
		probeTestPodStart:           regexp.MustCompile(`[IW](\d{4} \d{2}:\d{2}:\d{2}\.\d{6}) .* server\.go.*Event.*UID:\"([^\"]*)\", .* type: 'Normal' reason: 'Started' Started container.*`),
	}

	// regexMapCadvisorLog matches log lines of the cAdvisor pod, which are not processed.
	regexMapCadvisorLog = regexp.MustCompile(`.*cadvisor.*`)
)
// TestTimeRange pairs a test name with the time at which that test finished.
type TestTimeRange struct {
	TestName string
	EndTime  time.Time
}

// SortedTestTime is a list of TestTimeRange that sorts by ascending EndTime.
type SortedTestTime []TestTimeRange

// Len implements sort.Interface.
func (s SortedTestTime) Len() int { return len(s) }

// Swap implements sort.Interface.
func (s SortedTestTime) Swap(i, j int) { s[j], s[i] = s[i], s[j] }

// Less implements sort.Interface: the entry that ended earlier sorts first.
func (s SortedTestTime) Less(i, j int) bool { return s[j].EndTime.After(s[i].EndTime) }

// SortedTestTimePerNode maps a node name to the SortedTestTime of that node.
type SortedTestTimePerNode map[string]SortedTestTime

// TestTime maps node name -> test name -> end time of that test on that node.
type TestTime map[string]map[string]time.Time
// Add records endTime as the end time of testName on nodeName.
//
// endTime is a build-log timestamp without a year; currentYear is prepended and
// the result is parsed with testLogTimeFormat. Only the first end time seen for a
// given node/test pair is kept: later calls for the same pair return without
// parsing anything. A malformed timestamp is fatal.
func (ete TestTime) Add(testName, nodeName, endTime string) {
	perTest, ok := ete[nodeName]
	if !ok {
		perTest = make(map[string]time.Time)
		ete[nodeName] = perTest
	}
	if _, seen := perTest[testName]; seen {
		return
	}
	parsed, err := time.Parse(testLogTimeFormat, currentYear+" "+endTime)
	if err != nil {
		log.Fatal(err)
	}
	perTest[testName] = parsed
}
// Sort groups the recorded end times by node and returns, for every node, the
// list of its tests ordered by ascending end time. The relative order of tests
// with identical end times is unspecified (sort.Sort is not stable).
func (ete TestTime) Sort() SortedTestTimePerNode {
	perNode := SortedTestTimePerNode{}
	for node, ends := range ete {
		ordered := make(SortedTestTime, 0, len(ends))
		for name, end := range ends {
			ordered = append(ordered, TestTimeRange{TestName: name, EndTime: end})
		}
		sort.Sort(ordered)
		perNode[node] = ordered
	}
	return perNode
}
// TracingData holds the tracing events of one test on one node as time series.
// Labels identify the series, Version is the tracing format version and Data maps
// a probe name to the timestamps of its events.
type TracingData struct {
	Labels  map[string]string   `json:"labels"`
	Version string              `json:"version"`
	Data    map[string]int64arr `json:"op_series"`
}

// int64arr is an int64 slice implementing sort.Interface in ascending order.
type int64arr []int64

func (arr int64arr) Len() int           { return len(arr) }
func (arr int64arr) Swap(i, j int)      { arr[j], arr[i] = arr[i], arr[j] }
func (arr int64arr) Less(i, j int) bool { return arr[j] > arr[i] }
// ParseKubeletLog calls GrabTracingKubelet for every node, using the test end
// times from build-log.txt (testTime) to split each node's kubelet log into tests.
// It returns the parsed tracing data as a string in time series data format.
// Nodes are processed in lexical order of their names so the output is
// deterministic (ranging over the map directly would shuffle the node sections
// between runs).
func ParseKubeletLog(d Downloader, job string, buildNumber int, testTime TestTime) string {
	sortedTestTimePerNode := testTime.Sort()

	nodeNames := make([]string, 0, len(sortedTestTimePerNode))
	for nodeName := range sortedTestTimePerNode {
		nodeNames = append(nodeNames, nodeName)
	}
	sort.Strings(nodeNames)

	result := ""
	for _, nodeName := range nodeNames {
		result += GrabTracingKubelet(d, job, buildNumber,
			nodeName, sortedTestTimePerNode[nodeName])
	}
	return result
}
// PodState is the per-pod bookkeeping kept while scanning kubelet.log, used to
// tell repeated events for the same pod apart.
type PodState struct {
	// ContainerNrPLEG counts container-start events reported by PLEG.
	ContainerNrPLEG int
	// ContainerNrPLEGSync counts ContainerStarted events reported by SyncLoop (PLEG).
	ContainerNrPLEGSync int
	// StatusUpdated records that a Running status update was already traced.
	StatusUpdated bool
}
// GrabTracingKubelet parses the tracing data of one node from its kubelet.log
// artifact and returns it as time series data, one series per test.
//
// sortedTestTime must be ordered by ascending EndTime (as produced by
// TestTime.Sort). Each detected event is attributed to the first test whose end
// time is not before the event's timestamp. An event after the last test's end
// time stops the scan. Within a test, events are only recorded once a
// probeFirstseen event has been seen for that test. An empty string is returned
// when sortedTestTime is empty. Failing to fetch or read the log is fatal.
func GrabTracingKubelet(d Downloader, job string, buildNumber int, nodeName string,
	sortedTestTime SortedTestTime) string {
	// Return empty string if there is no test in list.
	if len(sortedTestTime) == 0 {
		return ""
	}
	file, err := d.GetFile(job, buildNumber, path.Join("artifacts", nodeName, kubeletLogFile))
	if err != nil {
		fmt.Fprintf(os.Stderr, "Error while fetching tracing data event: %v\n", err)
		log.Fatal(err)
	}
	defer file.Close()

	// bufio.Scanner rejects lines longer than bufio.MaxScanTokenSize (64 KiB) by
	// default, which aborts the whole scan with ErrTooLong. Kubelet can log very
	// long lines (e.g. status dumps), so allow much larger lines; the buffer only
	// grows as needed.
	const maxKubeletLogLineSize = 16 * 1024 * 1024
	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, bufio.MaxScanTokenSize), maxKubeletLogLineSize)

	result := ""
	currentTestIndex := 0
	testStarted := false
	// newTracingData returns an empty series labelled with the current test.
	newTracingData := func() TracingData {
		return TracingData{
			Labels: map[string]string{
				"test": sortedTestTime[currentTestIndex].TestName,
				"node": nodeName,
			},
			Version: tracingVersion,
			Data:    map[string]int64arr{},
		}
	}
	// seriesOf renders td as one time series section of the result.
	seriesOf := func(td *TracingData) string {
		td.SortData()
		return timeSeriesTag + td.ToSeriesData() + "\n\n" + timeSeriesEnd + "\n"
	}
	tracingData := newTracingData()
	statePerPod := map[string]*PodState{}

scan:
	for scanner.Scan() {
		// scanner.Bytes avoids a string round trip per line; parseLogEntry copies
		// everything it keeps, so reusing the scanner's buffer is safe.
		line := scanner.Bytes()
		if regexMapCadvisorLog.Match(line) {
			continue
		}
		// Found a tracing event in kubelet log?
		detectedEntry := parseLogEntry(line, statePerPod)
		if detectedEntry == nil {
			continue
		}
		// Advance past every test that ended before this event. Several tests may
		// end between two consecutive events; advancing only once would attribute
		// the event to a test whose time range it lies outside of.
		for sortedTestTime[currentTestIndex].EndTime.Before(detectedEntry.Timestamp) {
			if currentTestIndex+1 >= len(sortedTestTime) {
				// Event is after the last test; the last test is flushed below.
				break scan
			}
			result += seriesOf(&tracingData)
			// Move on to the next test.
			currentTestIndex++
			tracingData = newTracingData()
			statePerPod = map[string]*PodState{}
			testStarted = false
		}
		if detectedEntry.Probe == probeFirstseen {
			testStarted = true
		}
		if !testStarted {
			continue
		}
		tracingData.AppendData(detectedEntry.Probe, detectedEntry.Timestamp.UnixNano())
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	return result + seriesOf(&tracingData)
}
// DetectedEntry is one tracing event found in kubelet.log: the probe it belongs
// to and the time at which it was logged.
type DetectedEntry struct {
	Probe     string
	Timestamp time.Time
}
// parseLogEntry matches one kubelet.log line against the probes in regexMap and
// returns the detected event, or nil when the line matches no probe or is a
// repeated event that is not traced.
//
// statePerPod is updated as a side effect:
//   - container starts (from PLEG, and separately from SyncLoop) are counted per
//     pod; the first is reported as the infra container, the second as the test
//     container, and any later one is dropped;
//   - only the first Running status update of a pod is reported.
//
// If a line matched several probes, the one reported would depend on map
// iteration order. An unparsable timestamp is fatal.
func parseLogEntry(line []byte, statePerPod map[string]*PodState) *DetectedEntry {
	// stateFor returns the state of pod, registering a zero state on first sight.
	stateFor := func(pod string) *PodState {
		state, ok := statePerPod[pod]
		if !ok {
			state = &PodState{}
			statePerPod[pod] = state
		}
		return state
	}
	// nthContainer maps the ordinal of a started container to its probe: in our
	// test the pod contains an infra container and a test container.
	nthContainer := func(n int, infraProbe, testProbe string) (string, bool) {
		switch n {
		case 1:
			return infraProbe, true
		case 2:
			return testProbe, true
		default:
			return "", false
		}
	}

	for probe, regex := range regexMap {
		if !regex.Match(line) {
			continue
		}
		groups := regex.FindSubmatch(line)
		if groups == nil {
			continue
		}
		ts, err := time.Parse(kubeletLogTimeFormat, currentYear+" "+string(groups[1]))
		if err != nil {
			log.Fatal("Error: can not parse log timestamp in kubelet.log")
		}

		reported := probe
		switch probe {
		case probeContainerStartPLEG:
			// 'container starts' reported by PLEG event.
			state := stateFor(string(groups[2]))
			state.ContainerNrPLEG++
			name, ok := nthContainer(state.ContainerNrPLEG, probeInfraContainerPLEG, probeTestContainerPLEG)
			if !ok {
				return nil
			}
			reported = name
		case probeContainerStartPLEGSync:
			// 'container starts' detected by PLEG reported in Kubelet SyncLoop.
			state := stateFor(string(groups[2]))
			state.ContainerNrPLEGSync++
			name, ok := nthContainer(state.ContainerNrPLEGSync, probeInfraContainerPLEGSync, probeTestContainerPLEGSync)
			if !ok {
				return nil
			}
			reported = name
		case probeStatusUpdate:
			// 'pod running' reported by Kubelet status manager; trace only the first one.
			state := stateFor(string(groups[2]))
			if state.StatusUpdated {
				return nil
			}
			state.StatusUpdated = true
		}
		return &DetectedEntry{Probe: reported, Timestamp: ts}
	}
	return nil
}
// AppendData records one event of probe at timestamp (nanoseconds since the Unix
// epoch, as passed by GrabTracingKubelet).
func (td *TracingData) AppendData(probe string, timestamp int64) {
	series := td.Data[probe]
	td.Data[probe] = append(series, timestamp)
}

// SortData orders the timestamps of every series ascending, in place.
func (td *TracingData) SortData() {
	for probe := range td.Data {
		sort.Sort(td.Data[probe])
	}
}

// ToSeriesData returns td encoded as JSON. An encoding failure is fatal.
func (td *TracingData) ToSeriesData() string {
	encoded, err := json.Marshal(td)
	if err != nil {
		log.Fatal(err)
	}
	return string(encoded)
}