forked from karrick/goavro
-
Notifications
You must be signed in to change notification settings - Fork 0
/
array.go
217 lines (199 loc) · 7.7 KB
/
array.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
package goavro
import (
"fmt"
"io"
"math"
"reflect"
)
// makeArrayCodec builds the Codec for an Avro "array" schema.
//
// schemaMap must contain an "items" entry describing the element schema; that
// entry is handed to buildCodec together with st and enclosingNamespace (both
// are only forwarded here, never inspected) to obtain the codec used for every
// element. An error is returned when the "items" key is absent or when the
// element codec cannot be built.
//
// The returned Codec converts between native slices (a []interface{}, or any
// other slice accepted by convertArray) and the binary and textual encodings
// of an array. Decoding always yields a []interface{}.
func makeArrayCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) {
	// array type must have items
	itemSchema, ok := schemaMap["items"]
	if !ok {
		return nil, fmt.Errorf("Array ought to have items key")
	}
	itemCodec, err := buildCodec(st, enclosingNamespace, itemSchema)
	if err != nil {
		return nil, fmt.Errorf("Array items ought to be valid Avro type: %s", err)
	}

	return &Codec{
		typeName: &name{"array", nullNamespace},

		// nativeFromBinary decodes a sequence of blocks, each introduced by a
		// block count, until a block count of zero is read. It returns the
		// decoded items and the unconsumed remainder of buf.
		nativeFromBinary: func(buf []byte) (interface{}, []byte, error) {
			blockCount, buf, err := decodeArrayBlockCount(buf)
			if err != nil {
				return nil, nil, err
			}

			// NOTE: Many encoders write all items in a single block, so
			// sizing the slice for the first block usually avoids regrowth.
			// The count comes from the input and is only bounded by
			// MaxBlockCount, so the hint is additionally capped at the number
			// of bytes left: a tiny payload must not be able to request a
			// huge up-front allocation. Items that occupy zero bytes simply
			// fall back to append's normal growth.
			capHint := blockCount
			if avail := int64(len(buf)); capHint > avail {
				capHint = avail
			}
			arrayValues := make([]interface{}, 0, capHint)

			for blockCount != 0 {
				// Decode `blockCount` datum values from buffer
				for i := int64(0); i < blockCount; i++ {
					var value interface{}
					if value, buf, err = itemCodec.nativeFromBinary(buf); err != nil {
						return nil, nil, fmt.Errorf("cannot decode binary array item %d: %s", i+1, err)
					}
					arrayValues = append(arrayValues, value)
				}
				// Decode next blockCount from buffer, because there may be more blocks
				if blockCount, buf, err = decodeArrayBlockCount(buf); err != nil {
					return nil, nil, err
				}
			}
			return arrayValues, buf, nil
		},

		// binaryFromNative appends the items of datum to buf in blocks of at
		// most MaxBlockCount items, followed by a zero block count.
		binaryFromNative: func(buf []byte, datum interface{}) ([]byte, error) {
			arrayValues, err := convertArray(datum)
			if err != nil {
				return nil, fmt.Errorf("cannot encode binary array: %s", err)
			}

			arrayLength := int64(len(arrayValues))
			var alreadyEncoded, remainingInBlock int64

			for i, item := range arrayValues {
				if remainingInBlock == 0 { // start a new block
					remainingInBlock = arrayLength - alreadyEncoded
					if remainingInBlock > MaxBlockCount {
						// limit block count to MaxBlockCount
						remainingInBlock = MaxBlockCount
					}
					// Surface a failure instead of silently continuing with
					// whatever buffer the encoder handed back.
					if buf, err = longBinaryFromNative(buf, remainingInBlock); err != nil {
						return nil, fmt.Errorf("cannot encode binary array block count: %s", err)
					}
				}

				if buf, err = itemCodec.binaryFromNative(buf, item); err != nil {
					return nil, fmt.Errorf("cannot encode binary array item %d: %v: %s", i+1, item, err)
				}

				remainingInBlock--
				alreadyEncoded++
			}

			return longBinaryFromNative(buf, 0) // append trailing 0 block count to signal end of Array
		},

		// nativeFromTextual decodes a bracketed, comma separated list of
		// items and returns them with the unconsumed remainder of buf.
		nativeFromTextual: func(buf []byte) (interface{}, []byte, error) {
			var arrayValues []interface{}
			var value interface{}
			var err error

			if buf, err = advanceAndConsume(buf, '['); err != nil {
				return nil, nil, fmt.Errorf("cannot decode textual array: %s", err)
			}
			if buf, _ = advanceToNonWhitespace(buf); len(buf) == 0 {
				return nil, nil, fmt.Errorf("cannot decode textual array: %s", io.ErrShortBuffer)
			}
			// NOTE: Special case for empty array
			if buf[0] == ']' {
				return arrayValues, buf[1:], nil
			}

			// NOTE: Also terminates when read ']' byte.
			for len(buf) > 0 {
				// decode value
				value, buf, err = itemCodec.nativeFromTextual(buf)
				if err != nil {
					return nil, nil, fmt.Errorf("cannot decode textual array: %s", err)
				}
				arrayValues = append(arrayValues, value)

				// either comma or closing square bracket
				if buf, _ = advanceToNonWhitespace(buf); len(buf) == 0 {
					return nil, nil, fmt.Errorf("cannot decode textual array: %s", io.ErrShortBuffer)
				}
				switch b := buf[0]; b {
				case ']':
					return arrayValues, buf[1:], nil
				case ',':
					// no-op
				default:
					return nil, nil, fmt.Errorf("cannot decode textual array: expected ',' or ']'; received: %q", b)
				}

				// NOTE: consume comma from above
				if buf, _ = advanceToNonWhitespace(buf[1:]); len(buf) == 0 {
					return nil, nil, fmt.Errorf("cannot decode textual array: %s", io.ErrShortBuffer)
				}
			}
			return nil, buf, io.ErrShortBuffer
		},

		// textualFromNative appends the items of datum to buf as a bracketed,
		// comma separated list.
		textualFromNative: func(buf []byte, datum interface{}) ([]byte, error) {
			arrayValues, err := convertArray(datum)
			if err != nil {
				return nil, fmt.Errorf("cannot encode textual array: %s", err)
			}

			buf = append(buf, '[')
			for i, item := range arrayValues {
				// Separator goes before every item but the first, so no
				// trailing comma ever has to be trimmed.
				if i > 0 {
					buf = append(buf, ',')
				}
				// Encode value
				if buf, err = itemCodec.textualFromNative(buf, item); err != nil {
					// item was specified in datum; therefore its value was invalid
					return nil, fmt.Errorf("cannot encode textual array item %d; %v: %s", i+1, item, err)
				}
			}
			return append(buf, ']'), nil
		},
	}, nil
}

// decodeArrayBlockCount reads the block count that introduces an array block
// and returns it as a non-negative number together with the remaining bytes.
//
// A negative block count implies there is a long encoded block size following
// it. This decoder has no use for the block size, so it is read and discarded
// and the count is converted to its positive equivalent. An error is returned
// when a value cannot be decoded, when the count is math.MinInt64 (which has
// no positive equivalent), or when the count exceeds MaxBlockCount.
func decodeArrayBlockCount(buf []byte) (int64, []byte, error) {
	value, buf, err := longNativeFromBinary(buf)
	if err != nil {
		return 0, nil, fmt.Errorf("cannot decode binary array block count: %s", err)
	}
	blockCount := value.(int64)

	if blockCount < 0 {
		if blockCount == math.MinInt64 {
			// The minimum number for any signed numerical type can never be
			// made positive. The variable is formatted rather than the
			// untyped constant, which would default to int and overflow it
			// where int is 32 bits wide.
			return 0, nil, fmt.Errorf("cannot decode binary array with block count: %d", blockCount)
		}
		blockCount = -blockCount // convert to its positive equivalent
		if _, buf, err = longNativeFromBinary(buf); err != nil {
			return 0, nil, fmt.Errorf("cannot decode binary array block size: %s", err)
		}
	}

	// Ensure block count does not exceed some sane value.
	if blockCount > MaxBlockCount {
		return 0, nil, fmt.Errorf("cannot decode binary array when block count exceeds MaxBlockCount: %d > %d", blockCount, MaxBlockCount)
	}
	return blockCount, buf, nil
}
// convertArray returns datum as a []interface{}.
//
// A datum that already is a []interface{} is handed back unchanged, without
// copying. A slice of any other element type is copied element by element
// into a newly allocated []interface{} of the same length, as a convenience
// to the client. Anything that is not a slice yields an error.
func convertArray(datum interface{}) ([]interface{}, error) {
	if items, isGeneric := datum.([]interface{}); isGeneric {
		return items, nil
	}

	rv := reflect.ValueOf(datum)
	if rv.Kind() != reflect.Slice {
		return nil, fmt.Errorf("cannot create []interface{}: expected slice; received: %T", datum)
	}

	// NOTE: Go offers no direct conversion from []T to []interface{}, so
	// every element is boxed individually through reflection.
	items := make([]interface{}, rv.Len())
	for i := range items {
		items[i] = rv.Index(i).Interface()
	}
	return items, nil
}