forked from segmentio/analytics-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
message.go
143 lines (119 loc) · 3.5 KB
/
message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package analytics
import (
"encoding/json"
"time"
)
// Callback is the interface used by analytics clients to notify the
// application when a message send succeeded or failed.
//
// Callback methods are called by a client's internal goroutines. There are no
// guarantees on which goroutine will trigger the callbacks; the calls can be
// made sequentially or in parallel, and their order doesn't depend on the
// order in which messages were queued to the client.
//
// Callback methods must return quickly and not cause long blocking operations
// to avoid interfering with the client's internal workflow.
type Callback interface {
	// Success is called for every message that was successfully sent to the
	// API.
	Success(Message)
	// Failure is called for every message that failed to be sent to the API
	// and will be discarded by the client. The error describes the failure.
	Failure(Message, error)
}
// Message is the interface used to represent analytics objects that can be
// sent via a client.
//
// Types like analytics.Track, analytics.Page, etc... implement this interface
// and therefore can be passed to the analytics.Client.Send method.
//
// Both methods are unexported, so a type declared outside this package cannot
// implement Message on its own (only by embedding an existing Message).
type Message interface {
	// tags returns the tags associated with the message.
	tags() []string
	// validate checks the internal structure of the message. It must return
	// nil if the message is valid, or an error describing what went wrong.
	validate() error
}
// makeMessageID selects the message id to use: id itself when it is not the
// empty string, otherwise the fallback value def.
func makeMessageID(id string, def string) string {
	if id != "" {
		return id
	}
	return def
}
// makeTimestamp returns t, unless t is the zero time, in which case the
// default value def is returned instead.
//
// The zero check goes through time.Time.IsZero rather than comparing with ==
// against a zero struct: == also compares the Location pointer, so a zero
// instant that carries a location (for example the result of calling Local on
// a zero time.Time) would otherwise not be recognized as unset.
func makeTimestamp(t Time, def Time) Time {
	if time.Time(t).IsZero() {
		return def
	}
	return t
}
// batch represents the objects sent to the /v1/batch endpoint. The type is not
// exported because it's only meant to be used internally to send groups of
// messages in one API call.
type batch struct {
	// MessageID is encoded under the "messageId" key.
	MessageID string `json:"messageId"`
	// SentAt is encoded under the "sentAt" key.
	SentAt Time `json:"sentAt"`
	// Messages is the group of messages, encoded as the "batch" array.
	Messages []message `json:"batch"`
	// Context is encoded under the "context" key; the tag has no omitempty,
	// so a nil pointer is written as JSON null.
	Context *Context `json:"context"`
}
// serializedMessage pairs a Message with its JSON encoding. makeMessage builds
// values of this type and returns them as a message.
type serializedMessage struct {
	// msg is the original message, handed back by Msg.
	msg Message
	// json is the JSON encoding of msg. makeMessage only sets it when the
	// encoding succeeded and fit within the size limit; otherwise it is nil.
	json []byte
}
// MarshalJSON returns the JSON bytes already stored in the message, as-is.
// Nothing is encoded here, so the returned error is always nil.
func (m *serializedMessage) MarshalJSON() ([]byte, error) {
	encoded := m.json
	return encoded, nil
}
// Msg returns the Message that this serialized message wraps.
func (m *serializedMessage) Msg() Message {
	original := m.msg
	return original
}
// size reports how many bytes the message accounts for inside a batch: the
// length of its stored JSON plus one byte for the comma that sits between
// consecutive items of a JSON array.
func (m *serializedMessage) size() int {
	const separator = 1 // the "," between array items
	return separator + len(m.json)
}
// message is the internal representation of a message held in a messageQueue
// and in a batch: it can produce its JSON encoding, give back the Message it
// was built from, and report its size. *serializedMessage implements it.
type message interface {
	// MarshalJSON returns the JSON encoding of the message.
	MarshalJSON() ([]byte, error)
	// Msg returns the underlying Message.
	Msg() Message
	// size returns the number of bytes the message counts for; messageQueue
	// adds it to its running byte total.
	size() int
}
// makeMessage encodes m as JSON and wraps it into a message.
//
// The returned message is never nil and always carries m (available through
// Msg), even when an error is returned; its JSON payload is only set on
// success.
//
// The error is the one reported by json.Marshal if encoding fails, or
// ErrMessageTooBig if the encoded form is longer than maxBytes.
func makeMessage(m Message, maxBytes int) (message, error) {
	wrapped := &serializedMessage{msg: m}

	encoded, err := json.Marshal(m)
	switch {
	case err != nil:
		return wrapped, err
	case len(encoded) > maxBytes:
		return wrapped, ErrMessageTooBig
	}

	wrapped.json = encoded
	return wrapped, nil
}
// messageQueue accumulates messages into batches, bounded both by a number of
// messages and by a total size in bytes (see push and flush).
type messageQueue struct {
	// pending holds the messages accumulated since the last flush; flush
	// resets it to nil.
	pending []message
	// bytes is the running sum of size() for the messages in pending.
	bytes int
	// maxBatchSize is the number of pending messages at which push flushes;
	// it is also the capacity push allocates for pending.
	maxBatchSize int
	// maxBatchBytes is the byte limit: push flushes the pending messages
	// before adding one that would take bytes above it.
	maxBatchBytes int
}
// push adds m to the queue and returns a batch that is ready to be sent, or
// nil if no limit was reached. At most one batch is returned per call.
//
// If adding m would take the byte total above maxBatchBytes, the messages
// already pending are flushed and returned, and m becomes the first message
// of the next batch. If no batch was produced that way and m brings the
// number of pending messages to maxBatchSize, the whole queue (m included) is
// flushed and returned.
func (q *messageQueue) push(m message) (b []message) {
	size := m.size()

	// Make room first: m must not be added to a batch it would overflow.
	if q.bytes+size > q.maxBatchBytes {
		b = q.flush()
	}

	if q.pending == nil {
		q.pending = make([]message, 0, q.maxBatchSize)
	}
	q.pending = append(q.pending, m)
	q.bytes += size

	// Only one batch can be handed back, so the count limit is checked only
	// when the byte limit did not already produce one.
	if full := len(q.pending) == q.maxBatchSize; full && b == nil {
		b = q.flush()
	}
	return b
}
// flush hands back whatever is currently pending and resets the queue to its
// empty state: pending becomes nil and the byte count goes back to zero.
func (q *messageQueue) flush() (msgs []message) {
	msgs = q.pending
	q.pending = nil
	q.bytes = 0
	return msgs
}
// Size limits. Going by the names these are byte counts.
// NOTE(review): their use is not visible in this file — confirm where they
// are applied.
const (
	// maxBatchBytes is presumably the upper bound on the size of one batch
	// (compare messageQueue.maxBatchBytes) — TODO confirm against the caller.
	maxBatchBytes = 500000
	// maxMessageBytes is presumably the upper bound on the size of a single
	// encoded message (compare the maxBytes argument of makeMessage) — TODO
	// confirm against the caller.
	maxMessageBytes = 15000
)