-
Notifications
You must be signed in to change notification settings - Fork 18
/
describe_logdirs_response.go
151 lines (127 loc) · 4.25 KB
/
describe_logdirs_response.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package healer
import (
"encoding/binary"
"fmt"
)
// DescribeLogDirsResponse is the decoded form of the response to a
// DescribeLogDirsRequest. It is built by NewDescribeLogDirsResponse.
type DescribeLogDirsResponse struct {
// CoordinatorID is filled in by NewDescribeLogDirsResponse from the bytes that
// precede the throttle time; it is excluded from JSON output.
// NOTE(review): the name suggests a coordinator, but its position in the frame
// looks like the response header's correlation ID — confirm against callers.
CoordinatorID uint32 `json:"-"`
// ThrottleTimeMS is the throttle time carried by the response (milliseconds,
// going by the field name).
ThrottleTimeMS int32 `json:"throttle_time_ms"`
// Results holds one entry per result decoded from the response. It is nil
// when the encoded count is -1 and empty (non-nil) when the count is 0.
Results []DescribeLogDirsResponseResult `json:"results"`
}
// Error scans the results in order and returns the first non-zero error code,
// passed through KafkaError. It returns nil when every result carries error
// code 0, which includes the case of having no results at all.
func (r DescribeLogDirsResponse) Error() error {
	for i := range r.Results {
		if code := r.Results[i].ErrorCode; code != 0 {
			return KafkaError(code)
		}
	}
	return nil
}
// DescribeLogDirsResponseResult is one element of DescribeLogDirsResponse.Results.
type DescribeLogDirsResponseResult struct {
// ErrorCode is the code reported for this entry; any non-zero value makes
// DescribeLogDirsResponse.Error return an error.
ErrorCode int16 `json:"error_code"`
// LogDir is the string decoded right after the error code (the log directory,
// going by the field name).
LogDir string `json:"log_dir"`
// Topics lists the topics decoded for this entry. It is nil when the encoded
// count is -1 and empty (non-nil) when the count is 0.
Topics []DescribeLogDirsResponseTopic `json:"topics"`
}
// DescribeLogDirsResponseTopic lists the partitions reported for a single
// topic inside one DescribeLogDirsResponseResult.
type DescribeLogDirsResponseTopic struct {
	TopicName  string                             `json:"topic"`
	Partitions []DescribeLogDirsResponsePartition `json:"partitions"`
}

// decodeToDescribeLogDirsResponseTopic decodes one topic entry from the front
// of payload.
//
// The bytes are read as: a 2-byte big-endian name length, the name itself, a
// 4-byte big-endian signed partition count, then that many fixed-size
// partition entries. A count of -1 yields nil Partitions and a count of 0
// yields an empty, non-nil slice. version is accepted but not consulted.
//
// It returns the decoded topic, the number of bytes consumed so far, and a
// non-nil error when payload is shorter than what it announces or when the
// partition count is negative (other than -1). Truncated input is reported as
// an error instead of panicking.
func decodeToDescribeLogDirsResponseTopic(payload []byte, version uint16) (r DescribeLogDirsResponseTopic, offset int, err error) {
	// Encoded size of one partition entry: ID + size + offset lag + flag.
	const partitionSize = 4 + 8 + 8 + 1

	if len(payload) < 2 {
		return r, offset, fmt.Errorf("describe_logdirs response topic truncated: %d bytes left, need 2 for the name length", len(payload))
	}
	nameLen := int(binary.BigEndian.Uint16(payload[offset:]))
	offset += 2

	// The name must be followed by the 4-byte partition count.
	if remaining := len(payload) - offset; remaining < nameLen+4 {
		return r, offset, fmt.Errorf("describe_logdirs response topic truncated: %d bytes left, need %d for the name and partition count", remaining, nameLen+4)
	}
	r.TopicName = string(payload[offset : offset+nameLen])
	offset += nameLen

	// The count is a signed 32-bit value. Converting through int32 keeps
	// negative counts negative where int is 64 bits wide; a plain
	// int(uint32) conversion would turn -1 into 4294967295.
	numPartitions := int(int32(binary.BigEndian.Uint32(payload[offset:])))
	offset += 4

	switch {
	case numPartitions == -1:
		// Partitions stays nil.
		return r, offset, nil
	case numPartitions < 0:
		return r, offset, fmt.Errorf("describe_logdirs response numPartitions < 0: %d", numPartitions)
	case numPartitions == 0:
		r.Partitions = []DescribeLogDirsResponsePartition{}
		return r, offset, nil
	}

	// Reject counts the payload cannot possibly hold before allocating; this
	// also guarantees the partition decoder never reads out of bounds.
	if remaining := len(payload) - offset; numPartitions > remaining/partitionSize {
		return r, offset, fmt.Errorf("describe_logdirs response topic truncated: %d partitions announced but only %d bytes left", numPartitions, remaining)
	}

	r.Partitions = make([]DescribeLogDirsResponsePartition, numPartitions)
	for i := range r.Partitions {
		var n int
		r.Partitions[i], n = decodeToDescribeLogDirsResponsePartition(payload[offset:], version)
		offset += n
	}
	return r, offset, nil
}

// DescribeLogDirsResponsePartition is the per-partition entry of a
// DescribeLogDirsResponseTopic.
type DescribeLogDirsResponsePartition struct {
	PartitionID int32 `json:"partition_id"`
	Size        int64 `json:"size"`
	OffsetLag   int64 `json:"offset_lag"`
	IsFutureKey bool  `json:"is_future_key"`
}

// decodeToDescribeLogDirsResponsePartition decodes one fixed-size partition
// entry from the front of payload: a 4-byte partition ID, an 8-byte size and
// an 8-byte offset lag (all big-endian), followed by a 1-byte flag that is
// true when non-zero.
//
// It returns the entry and the number of bytes consumed (always 21). The
// function has no error return, so payload must hold at least 21 bytes;
// shorter input panics. version is accepted but not consulted.
func decodeToDescribeLogDirsResponsePartition(payload []byte, version uint16) (r DescribeLogDirsResponsePartition, offset int) {
	r.PartitionID = int32(binary.BigEndian.Uint32(payload[offset:]))
	offset += 4
	r.Size = int64(binary.BigEndian.Uint64(payload[offset:]))
	offset += 8
	r.OffsetLag = int64(binary.BigEndian.Uint64(payload[offset:]))
	offset += 8
	r.IsFutureKey = payload[offset] != 0
	offset++
	return r, offset
}
// NewDescribeLogDirsResponse creates a DescribeLogDirsResponse from the given payload.
//
// payload is the complete response frame, read in this order (all integers
// big-endian): a 4-byte length that must equal len(payload)-4, 4 bytes stored
// in CoordinatorID, a 4-byte throttle time, a 4-byte signed result count, and
// then for each result a 2-byte error code, a length-prefixed log dir string,
// a 4-byte signed topic count and the topics themselves. A count of -1 leaves
// the corresponding slice nil; a count of 0 produces an empty, non-nil slice.
//
// version is only forwarded to the topic decoder.
//
// A non-nil error is returned when the length prefix does not match, when a
// count is negative (other than -1), or when payload ends before the data it
// announces. The returned response may be partially filled in that case and
// should not be used.
func NewDescribeLogDirsResponse(payload []byte, version uint16) (r DescribeLogDirsResponse, err error) {
	const (
		// length prefix + CoordinatorID + throttle time + result count
		headerSize = 4 + 4 + 4 + 4
		// error code + log dir length + topic count (empty log dir, no topics)
		minResultSize = 2 + 2 + 4
		// name length + partition count (empty name, no partitions)
		minTopicSize = 2 + 4
	)

	if len(payload) < 4 {
		return r, fmt.Errorf("describe_logdirs response truncated: %d bytes, need 4 for the length prefix", len(payload))
	}
	responseLength := int(binary.BigEndian.Uint32(payload))
	if responseLength+4 != len(payload) {
		return r, fmt.Errorf("describe_logdirs response length did not match: %d!=%d", responseLength+4, len(payload))
	}
	if len(payload) < headerSize {
		return r, fmt.Errorf("describe_logdirs response truncated: %d bytes, need %d for the header", len(payload), headerSize)
	}

	offset := 4
	// Read from the current offset; reading from the start of payload would
	// just return the length prefix again.
	r.CoordinatorID = binary.BigEndian.Uint32(payload[offset:])
	offset += 4
	r.ThrottleTimeMS = int32(binary.BigEndian.Uint32(payload[offset:]))
	offset += 4

	// Counts are signed 32-bit values. Converting through int32 keeps
	// negative counts negative where int is 64 bits wide.
	numResults := int(int32(binary.BigEndian.Uint32(payload[offset:])))
	offset += 4

	switch {
	case numResults == -1:
		// Results stays nil.
		return r, nil
	case numResults < 0:
		return r, fmt.Errorf("describe_logdirs response numResults < 0: %d", numResults)
	case numResults == 0:
		r.Results = []DescribeLogDirsResponseResult{}
		return r, nil
	}
	// Refuse counts the payload cannot possibly hold before allocating.
	if remaining := len(payload) - offset; numResults > remaining/minResultSize {
		return r, fmt.Errorf("describe_logdirs response truncated: %d results announced but only %d bytes left", numResults, remaining)
	}
	r.Results = make([]DescribeLogDirsResponseResult, numResults)

	for i := range r.Results {
		result := &r.Results[i]

		if remaining := len(payload) - offset; remaining < 4 {
			return r, fmt.Errorf("describe_logdirs response truncated: %d bytes left, need 4 for the error code and log dir length", remaining)
		}
		result.ErrorCode = int16(binary.BigEndian.Uint16(payload[offset:]))
		offset += 2
		dirLen := int(binary.BigEndian.Uint16(payload[offset:]))
		offset += 2

		// The log dir must be followed by the 4-byte topic count.
		if remaining := len(payload) - offset; remaining < dirLen+4 {
			return r, fmt.Errorf("describe_logdirs response truncated: %d bytes left, need %d for the log dir and topic count", remaining, dirLen+4)
		}
		result.LogDir = string(payload[offset : offset+dirLen])
		offset += dirLen

		numTopics := int(int32(binary.BigEndian.Uint32(payload[offset:])))
		offset += 4

		switch {
		case numTopics == -1:
			// Topics stays nil.
			continue
		case numTopics < 0:
			return r, fmt.Errorf("describe_logdirs response numTopics < 0: %d", numTopics)
		case numTopics == 0:
			result.Topics = []DescribeLogDirsResponseTopic{}
			continue
		}
		if remaining := len(payload) - offset; numTopics > remaining/minTopicSize {
			return r, fmt.Errorf("describe_logdirs response truncated: %d topics announced but only %d bytes left", numTopics, remaining)
		}
		result.Topics = make([]DescribeLogDirsResponseTopic, numTopics)

		for j := range result.Topics {
			var consumed int
			result.Topics[j], consumed, err = decodeToDescribeLogDirsResponseTopic(payload[offset:], version)
			offset += consumed
			if err != nil {
				return r, err
			}
		}
	}
	return r, nil
}