forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
message_test.go
213 lines (193 loc) · 7.2 KB
/
message_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
package sarama
import (
"runtime"
"strings"
"testing"
"time"
)
// Wire-format fixtures shared by the Message encode/decode tests in this file.
// Each slice is one serialized message; the trailing comments label the fields
// in the order they appear in the byte stream.
var (
// emptyMessage is the expected encoding of a zero-value Message
// (magic byte 0, no compression, no key, no value).
emptyMessage = []byte{
167, 236, 104, 3, // CRC
0x00, // magic version byte
0x00, // attribute flags
0xFF, 0xFF, 0xFF, 0xFF, // key
0xFF, 0xFF, 0xFF, 0xFF} // value
// emptyV1Message is an empty message with magic byte 1, which adds an
// 8-byte timestamp field (all zero here) before the key.
emptyV1Message = []byte{
204, 47, 121, 217, // CRC
0x01, // magic version byte
0x00, // attribute flags
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, // timestamp
0xFF, 0xFF, 0xFF, 0xFF, // key
0xFF, 0xFF, 0xFF, 0xFF} // value
// emptyV2Message carries magic byte 2 and is used to check that decoding
// rejects an unknown magic byte. Its CRC bytes are the same as those of
// emptyMessage.
// NOTE(review): presumably the CRC is never validated for this fixture
// because the magic-byte check fails first — confirm against the decoder.
emptyV2Message = []byte{
167, 236, 104, 3, // CRC
0x02, // magic version byte
0x00, // attribute flags
0xFF, 0xFF, 0xFF, 0xFF, // key
0xFF, 0xFF, 0xFF, 0xFF} // value
// emptyGzipMessage is the expected encoding of a gzip-compressed message
// with an empty (non-nil) value on Go versions other than 1.8/1.9.
emptyGzipMessage = []byte{
97, 79, 149, 90, // CRC
0x00, // magic version byte
0x01, // attribute flags
0xFF, 0xFF, 0xFF, 0xFF, // key
// value
0x00, 0x00, 0x00, 0x17, // len (23 bytes follow)
0x1f, 0x8b,
0x08,
0, 0, 9, 110, 136, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
// emptyGzipMessage18 is the gzip fixture expected when the runtime version
// starts with "go1.8" or "go1.9"; it differs from emptyGzipMessage in the
// CRC and in three bytes of the gzip stream.
emptyGzipMessage18 = []byte{
132, 99, 80, 148, // CRC
0x00, // magic version byte
0x01, // attribute flags
0xFF, 0xFF, 0xFF, 0xFF, // key
// value
0x00, 0x00, 0x00, 0x17, // len (23 bytes follow)
0x1f, 0x8b,
0x08,
0, 0, 0, 0, 0, 0, 255, 1, 0, 0, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0}
// emptyLZ4Message is the expected encoding of a version-1, LZ4-compressed
// message with an empty value and the timestamp used by TestMessageEncoding.
emptyLZ4Message = []byte{
132, 219, 238, 101, // CRC
0x01, // version byte
0x03, // attribute flags: lz4
0, 0, 1, 88, 141, 205, 89, 56, // timestamp
0xFF, 0xFF, 0xFF, 0xFF, // key
0x00, 0x00, 0x00, 0x0f, // len
0x04, 0x22, 0x4D, 0x18, // LZ4 magic number
100, // LZ4 flags: version 01, block independent, content checksum
112, 185, 0, 0, 0, 0, // LZ4 data
5, 93, 204, 2, // LZ4 checksum
}
// emptyBulkSnappyMessage is a snappy-compressed wrapper message; the tests
// expect it to decode into a message set containing two messages.
emptyBulkSnappyMessage = []byte{
180, 47, 53, 209, // CRC
0x00, // magic version byte
0x02, // attribute flags
0xFF, 0xFF, 0xFF, 0xFF, // key
0, 0, 0, 42, // len (42 bytes follow)
130, 83, 78, 65, 80, 80, 89, 0, // SNAPPY magic
0, 0, 0, 1, // min version
0, 0, 0, 1, // default version
0, 0, 0, 22, 52, 0, 0, 25, 1, 16, 14, 227, 138, 104, 118, 25, 15, 13, 1, 8, 1, 0, 0, 62, 26, 0}
// emptyBulkGzipMessage is a gzip-compressed wrapper message; the tests
// expect it to decode into a message set containing two messages.
emptyBulkGzipMessage = []byte{
139, 160, 63, 141, // CRC
0x00, // magic version byte
0x01, // attribute flags
0xFF, 0xFF, 0xFF, 0xFF, // key
0x00, 0x00, 0x00, 0x27, // len
0x1f, 0x8b, // Gzip Magic
0x08, // deflate compressed
0, 0, 0, 0, 0, 0, 0, 99, 96, 128, 3, 190, 202, 112, 143, 7, 12, 12, 255, 129, 0, 33, 200, 192, 136, 41, 3, 0, 199, 226, 155, 70, 52, 0, 0, 0}
// emptyBulkLZ4Message is a version-1, LZ4-compressed wrapper message; the
// tests expect it to decode into a message set containing two messages.
emptyBulkLZ4Message = []byte{
246, 12, 188, 129, // CRC
0x01, // Version
0x03, // attribute flags (LZ4)
255, 255, 249, 209, 212, 181, 73, 201, // timestamp
0xFF, 0xFF, 0xFF, 0xFF, // key
0x00, 0x00, 0x00, 0x47, // len
0x04, 0x22, 0x4D, 0x18, // magic number lz4
100, // lz4 flags 01100100
// version: 01, block indep: 1, block checksum: 0, content size: 0, content checksum: 1, reserved: 00
112, 185, 52, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 14, 121, 87, 72, 224, 0, 0, 255, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0,
71, 129, 23, 111, // LZ4 checksum
}
)
// TestMessageEncoding encodes one Message value three times — as a zero-value
// message, with gzip compression, and as a version-1 LZ4 message — and compares
// each result with the matching byte fixture.
func TestMessageEncoding(t *testing.T) {
	var msg Message
	testEncodable(t, "empty", &msg, emptyMessage)

	// The expected gzip bytes depend on the Go release running the test:
	// go1.8 and go1.9 get their own fixture.
	msg.Value = []byte{}
	msg.Codec = CompressionGZIP
	wantGzip := emptyGzipMessage
	goVersion := runtime.Version()
	if strings.HasPrefix(goVersion, "go1.8") || strings.HasPrefix(goVersion, "go1.9") {
		wantGzip = emptyGzipMessage18
	}
	testEncodable(t, "empty gzip", &msg, wantGzip)

	// Same message again, now as version 1 with LZ4 and a fixed timestamp.
	msg.Value = []byte{}
	msg.Codec = CompressionLZ4
	msg.Timestamp = time.Unix(1479847795, 0)
	msg.Version = 1
	testEncodable(t, "empty lz4", &msg, emptyLZ4Message)
}
// TestMessageDecoding decodes the empty and the empty-gzip fixtures into the
// same Message value and verifies the resulting codec, key, value and set.
func TestMessageDecoding(t *testing.T) {
	var decoded Message

	// Uncompressed empty message: nothing should be populated.
	testDecodable(t, "empty", &decoded, emptyMessage)
	emptyChecks := []struct {
		failed bool
		reason string
	}{
		{decoded.Codec != CompressionNone, "Decoding produced compression codec where there was none."},
		{decoded.Key != nil, "Decoding produced key where there was none."},
		{decoded.Value != nil, "Decoding produced value where there was none."},
		{decoded.Set != nil, "Decoding produced set where there was none."},
	}
	for _, check := range emptyChecks {
		if check.failed {
			t.Error(check.reason)
		}
	}

	// Gzip message with an empty payload: the value must be non-nil and empty.
	testDecodable(t, "empty gzip", &decoded, emptyGzipMessage)
	if decoded.Codec != CompressionGZIP {
		t.Error("Decoding produced incorrect compression codec (was gzip).")
	}
	if decoded.Key != nil {
		t.Error("Decoding produced key where there was none.")
	}
	if valueIsEmptyArray := decoded.Value != nil && len(decoded.Value) == 0; !valueIsEmptyArray {
		t.Error("Decoding produced nil or content-ful value where there was an empty array.")
	}
}
// TestMessageDecodingBulkSnappy decodes the snappy wrapper fixture and checks
// that it reports the snappy codec, has no key, and carries a two-message set.
func TestMessageDecodingBulkSnappy(t *testing.T) {
	var decoded Message
	testDecodable(t, "bulk snappy", &decoded, emptyBulkSnappyMessage)

	if got, want := decoded.Codec, CompressionSnappy; got != want {
		t.Errorf("Decoding produced codec %d, but expected %d.", got, want)
	}
	if key := decoded.Key; key != nil {
		t.Errorf("Decoding produced key %+v, but none was expected.", key)
	}

	switch {
	case decoded.Set == nil:
		t.Error("Decoding produced no set, but one was expected.")
	case len(decoded.Set.Messages) != 2:
		t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(decoded.Set.Messages))
	}
}
// TestMessageDecodingBulkGzip decodes the gzip wrapper fixture and checks that
// it reports the gzip codec, has no key, and carries a two-message set.
func TestMessageDecodingBulkGzip(t *testing.T) {
	var decoded Message
	testDecodable(t, "bulk gzip", &decoded, emptyBulkGzipMessage)

	if got, want := decoded.Codec, CompressionGZIP; got != want {
		t.Errorf("Decoding produced codec %d, but expected %d.", got, want)
	}
	if key := decoded.Key; key != nil {
		t.Errorf("Decoding produced key %+v, but none was expected.", key)
	}

	switch {
	case decoded.Set == nil:
		t.Error("Decoding produced no set, but one was expected.")
	case len(decoded.Set.Messages) != 2:
		t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(decoded.Set.Messages))
	}
}
// TestMessageDecodingBulkLZ4 decodes the LZ4 wrapper fixture and checks that
// it reports the LZ4 codec, has no key, and carries a two-message set.
func TestMessageDecodingBulkLZ4(t *testing.T) {
	var decoded Message
	testDecodable(t, "bulk lz4", &decoded, emptyBulkLZ4Message)

	if got, want := decoded.Codec, CompressionLZ4; got != want {
		t.Errorf("Decoding produced codec %d, but expected %d.", got, want)
	}
	if key := decoded.Key; key != nil {
		t.Errorf("Decoding produced key %+v, but none was expected.", key)
	}

	switch {
	case decoded.Set == nil:
		t.Error("Decoding produced no set, but one was expected.")
	case len(decoded.Set.Messages) != 2:
		t.Errorf("Decoding produced a set with %d messages, but 2 were expected.", len(decoded.Set.Messages))
	}
}
// TestMessageDecodingVersion1 checks that the empty magic-byte-1 fixture
// decodes into a Message whose Version is preset to 1.
func TestMessageDecodingVersion1(t *testing.T) {
	var v1 Message
	v1.Version = 1
	testDecodable(t, "decoding empty v1 message", &v1, emptyV1Message)
}
// TestMessageDecodingUnknownVersions verifies that decoding a message whose
// magic byte is 2 fails, and that the failure carries the expected
// "unknown magic byte" text.
func TestMessageDecodingUnknownVersions(t *testing.T) {
	message := Message{Version: 2}
	err := decode(emptyV2Message, &message)
	if err == nil {
		// Stop here: the check below calls err.Error(), which would panic on
		// a nil error and bury the real failure under a nil dereference.
		t.Fatal("Decoding did not produce an error for an unknown magic byte")
	}
	if err.Error() != "kafka: error decoding packet: unknown magic byte (2)" {
		t.Error("Decoding an unknown magic byte produced an unknown error ", err)
	}
}