diff --git a/Gopkg.lock b/Gopkg.lock deleted file mode 100644 index 6378c60..0000000 --- a/Gopkg.lock +++ /dev/null @@ -1,178 +0,0 @@ -# This file is autogenerated, do not edit; changes may be undone by the next 'dep ensure'. - - -[[projects]] - digest = "1:932aa48a82654523f58abe261c890a6170ee19afd16edf80ab894826ac4fe650" - name = "github.com/avast/retry-go" - packages = ["."] - pruneopts = "UT" - revision = "48dfe711bb728e1c3c6a82ae93b07c3ab0750d4a" - version = "v2.1.0" - -[[projects]] - digest = "1:9874425f935d8483b5fc9b7b2c6d7d57305521565899b9b498f7bdf413abc329" - name = "github.com/aws/aws-sdk-go" - packages = [ - "aws", - "aws/awserr", - "aws/awsutil", - "aws/client", - "aws/client/metadata", - "aws/corehandlers", - "aws/credentials", - "aws/credentials/ec2rolecreds", - "aws/credentials/endpointcreds", - "aws/credentials/processcreds", - "aws/credentials/stscreds", - "aws/csm", - "aws/defaults", - "aws/ec2metadata", - "aws/endpoints", - "aws/request", - "aws/session", - "aws/signer/v4", - "internal/ini", - "internal/s3err", - "internal/sdkio", - "internal/sdkrand", - "internal/sdkuri", - "internal/shareddefaults", - "private/protocol", - "private/protocol/eventstream", - "private/protocol/eventstream/eventstreamapi", - "private/protocol/query", - "private/protocol/query/queryutil", - "private/protocol/rest", - "private/protocol/restxml", - "private/protocol/xml/xmlutil", - "service/s3", - "service/s3/s3iface", - "service/s3/s3manager", - "service/sts", - ] - pruneopts = "UT" - revision = "f778816ebd8f4a83264c207c77e6bcd0a5a76603" - version = "v1.19.1" - -[[projects]] - digest = "1:553f73a4171c265045ae4f5d3122429ecf2c9c0c232c91f336127fe45480104a" - name = "github.com/cenkalti/backoff" - packages = ["."] - pruneopts = "UT" - revision = "62661b46c4093e2c1f38d943e663db1a29873e80" - version = "v2.1.0" - -[[projects]] - digest = "1:bb81097a5b62634f3e9fec1014657855610c82d19b9a40c17612e32651e35dca" - name = "github.com/jmespath/go-jmespath" - packages = ["."] - pruneopts = "UT" - revision = "c2b33e84" - -[[projects]] - branch = "master" - digest = "1:d38f81081a389f1466ec98192cf9115a82158854d6f01e1c23e2e7554b97db71" - name = "github.com/rcrowley/go-metrics" - packages = ["."] - pruneopts = "UT" - revision = "3113b8401b8a98917cde58f8bbd42a1b1c03b1fd" - -[[projects]] - branch = "master" - digest = "1:ec22607298efbafb122c4e4b2ca8ceb6c4ec1ba72e74705b84f9f2ce8ee77984" - name = "github.com/segmentio/backo-go" - packages = ["."] - pruneopts = "UT" - revision = "204274ad699c0983a70203a566887f17a717fef4" - -[[projects]] - digest = "1:99200fae8f08f08cb5021232dddf70adb35d860c10206c553401344fe2908a9c" - name = "github.com/segmentio/conf" - packages = ["."] - pruneopts = "UT" - revision = "a2a7fb2aaa3c99893d06150d50195c0be197f5d5" - version = "1.0.0" - -[[projects]] - digest = "1:338c877d3e0136603b78a7c852cd120422e5787c6c2c5f338230ba2d9771af8f" - name = "github.com/segmentio/go-snakecase" - packages = ["."] - pruneopts = "UT" - revision = "3ecf343f213326f182b8477c337a2be66c9205b0" - version = "v1.0.0" - -[[projects]] - digest = "1:329ee641534f22c5bb5ddb9e775ba2352268cec78f0eb810197047f85dc31bb1" - name = "github.com/segmentio/objconv" - packages = [ - ".", - "adapters", - "adapters/net", - "adapters/net/mail", - "adapters/net/url", - "json", - "objutil", - "yaml", - ] - pruneopts = "UT" - revision = "7a1d7b8e6f3551b30751e6b2ea6bae500883870e" - version = "v1.0.1" - -[[projects]] - branch = "master" - digest = "1:2f9538da7381bb981fb8451b2b0fb304c2bc7f1724228dc336a8f002e6fec704" - name = 
"github.com/xtgo/uuid" - packages = ["."] - pruneopts = "UT" - revision = "a0b114877d4caeffbd7f87e3757c17fce570fea7" - -[[projects]] - digest = "1:aa7ab4fd27851221e4153921472ef7e3c1b048eaacca8e87c73fb7a094250dd7" - name = "github.com/zorkian/go-datadog-api" - packages = ["."] - pruneopts = "UT" - revision = "f3f6d2f4859047aae0cac1ce3d16689608480fd9" - version = "v2.18.0" - -[[projects]] - digest = "1:dad6cd59e8bb8d209194efbd3850b5d054384a63061671bfc2a6b2cc0929c731" - name = "gopkg.in/go-playground/mold.v2" - packages = [ - ".", - "modifiers", - ] - pruneopts = "UT" - revision = "6bc20ce40733e8e04c2fda4523a957dd8e198948" - version = "v2.2.0" - -[[projects]] - branch = "v2" - digest = "1:b6539350da50de0d3c9b83ae587c06b89be9cb5750443bdd887f1c4077f57776" - name = "gopkg.in/validator.v2" - packages = ["."] - pruneopts = "UT" - revision = "135c24b11c19e52befcae2ec3fca5d9b78c4e98e" - -[[projects]] - digest = "1:342378ac4dcb378a5448dd723f0784ae519383532f5e70ade24132c4c8693202" - name = "gopkg.in/yaml.v2" - packages = ["."] - pruneopts = "UT" - revision = "5420a8b6744d3b0345ab293f6fcba19c978f1183" - version = "v2.2.1" - -[solve-meta] - analyzer-name = "dep" - analyzer-version = 1 - input-imports = [ - "github.com/avast/retry-go", - "github.com/aws/aws-sdk-go/aws/session", - "github.com/aws/aws-sdk-go/service/s3/s3manager", - "github.com/rcrowley/go-metrics", - "github.com/segmentio/backo-go", - "github.com/segmentio/conf", - "github.com/xtgo/uuid", - "github.com/zorkian/go-datadog-api", - ] - solver-name = "gps-cdcl" - solver-version = 1 diff --git a/Gopkg.toml b/Gopkg.toml deleted file mode 100644 index cf27efb..0000000 --- a/Gopkg.toml +++ /dev/null @@ -1,31 +0,0 @@ -[[constraint]] - branch = "master" - name = "github.com/segmentio/backo-go" - -[[constraint]] - name = "github.com/segmentio/conf" - version = "1.0.0" - -[[constraint]] - branch = "master" - name = "github.com/xtgo/uuid" - -[prune] - go-tests = true - unused-packages = true - -[[constraint]] - branch = "master" - name = "github.com/rcrowley/go-metrics" - -[[constraint]] - name = "github.com/zorkian/go-datadog-api" - version = "2.18.0" - -[[constraint]] - name = "github.com/avast/retry-go" - version = "2.1.0" - -[[constraint]] - name = "github.com/aws/aws-sdk-go" - version = "1.19.1" diff --git a/analytics.go b/analytics.go index ce23ab1..3d63777 100644 --- a/analytics.go +++ b/analytics.go @@ -7,6 +7,7 @@ import ( "io" "io/ioutil" "net/http" + "strconv" "sync" "time" @@ -183,6 +184,11 @@ func (c *client) Enqueue(msg Message) (err error) { m.MessageId = makeMessageID(m.MessageId, id) m.Timestamp = makeTimestamp(m.Timestamp, ts) msg = m + case TrackObj: + m.Type = "track" + m.MessageId = makeMessageID(m.MessageId, id) + m.Timestamp = makeTimestamp(m.Timestamp, ts) + msg = m } defer func() { @@ -285,7 +291,7 @@ func (c *client) upload(b []byte) error { req.Header.Add("User-Agent", "analytics-go (version: "+Version+")") req.Header.Add("Content-Type", "application/json") - req.Header.Add("Content-Length", string(len(b))) + req.Header.Add("Content-Length", strconv.Itoa(len(b))) req.Header.Add("x-api-key", c.key) res, err := c.http.Do(req) @@ -429,3 +435,17 @@ func (c *client) notifyFailure(msgs []message, err error) { } } } + +func (c *client) notifyFailureMsg(m Message, err error, count int64) { + c.failureCounters(m.tags()...).Inc(count) + if c.Callback != nil { + c.Callback.Failure(m, err) + } +} + +func (c *client) notifySuccessMsg(m Message, count int64) { + c.successCounters(m.tags()...).Inc(count) + if c.Callback != nil { + 
c.Callback.Success(m) + } +} diff --git a/analytics_test.go b/analytics_test.go index 64c6303..d0db92d 100644 --- a/analytics_test.go +++ b/analytics_test.go @@ -177,6 +177,70 @@ func mockServer() (chan []byte, *httptest.Server) { return done, server } +func ExampleTrackObj() { + body, server := mockServer() + defer server.Close() + + client, _ := NewWithConfig("h97jamjwbh", Config{ + Endpoint: server.URL, + BatchSize: 1, + now: mockTime, + uid: mockId, + }) + defer client.Close() + + type msg struct { + Application string `json:"application"` + Version string `json:"version"` + Platform string `json:"platform"` + } + + client.Enqueue(TrackObj{ + Track: Track{ + Event: "Download", + UserId: "123456", + }, + Properties: &msg{ + Application: "Segment Desktop", + Version: "1.1.0", + Platform: "osx", + }, + }) + + s := strings.Replace(string(<-body), + fmt.Sprintf(`"version": "%s"`, Version), + `"version": "3.4.0"`, + -1, + ) + + fmt.Printf("%s\n", s) + // Output: + // { + // "batch": [ + // { + // "event": "Download", + // "messageId": "I'm unique", + // "properties": { + // "application": "Segment Desktop", + // "platform": "osx", + // "version": "1.1.0" + // }, + // "timestamp": 1257894000000, + // "type": "track", + // "userId": "123456" + // } + // ], + // "context": { + // "library": { + // "name": "analytics-go", + // "version": "3.4.0" + // } + // }, + // "messageId": "I'm unique", + // "sentAt": 1257894000000 + // } +} + func ExampleTrack() { body, server := mockServer() defer server.Close() diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..2ece438 --- /dev/null +++ b/go.mod @@ -0,0 +1,23 @@ +module github.com/FindHotel/analytics-go + +go 1.14 + +require ( + github.com/avast/retry-go v2.1.0+incompatible + github.com/aws/aws-sdk-go v1.19.1 + github.com/cenkalti/backoff v2.1.0+incompatible // indirect + github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect + github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a + github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c + github.com/segmentio/conf v1.0.0 + github.com/segmentio/go-snakecase v1.0.0 // indirect + github.com/segmentio/objconv v1.0.1 // indirect + github.com/stretchr/objx v0.3.0 // indirect + github.com/stretchr/testify v1.6.1 + github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c + github.com/zorkian/go-datadog-api v2.18.0+incompatible + gopkg.in/go-playground/mold.v2 v2.2.0 // indirect + gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 // indirect + gopkg.in/yaml.v2 v2.2.1 // indirect + gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..ea1adab --- /dev/null +++ b/go.sum @@ -0,0 +1,46 @@ +github.com/avast/retry-go v2.1.0+incompatible h1:NDQfwOYTuYSbKEwFu+dx5YiU3jANx9n4NW2ZCzYL3AI= +github.com/avast/retry-go v2.1.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= +github.com/aws/aws-sdk-go v1.19.1 h1:8kOP0/XGJwXIFlYoD1DAtA39cAjc15Iv/QiDMKitD9U= +github.com/aws/aws-sdk-go v1.19.1/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/cenkalti/backoff v2.1.0+incompatible h1:FIRvWBZrzS4YC7NT5cOuZjexzFvIr+Dbi6aD1cZaNBk= +github.com/cenkalti/backoff v2.1.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= +github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c h1:rsRTAcCR5CeNLkvgBVSjQoDGRRt6kggsE6XYBqCv2KQ= +github.com/segmentio/backo-go v0.0.0-20160424052352-204274ad699c/go.mod h1:kJ9mm9YmoWSkk+oQ+5Cj8DEoRCX2JT6As4kEtIIOp1M= +github.com/segmentio/conf v1.0.0 h1:oRF4BtoJbI/+I7fUngYMnMcKFbjqVUFi8hv4Pp0l88w= +github.com/segmentio/conf v1.0.0/go.mod h1:y0VyxYAlU2slxCjm7XX7tGKFlN39bwHCZrbOpCcLsr8= +github.com/segmentio/go-snakecase v1.0.0 h1:FSeHpP0sBL3O+MCpxvQZrS5a51WAki6gposZuwVE9L4= +github.com/segmentio/go-snakecase v1.0.0/go.mod h1:jk1miR5MS7Na32PZUykG89Arm+1BUSYhuGR6b7+hJto= +github.com/segmentio/objconv v1.0.1 h1:QjfLzwriJj40JibCV3MGSEiAoXixbp4ybhwfTB8RXOM= +github.com/segmentio/objconv v1.0.1/go.mod h1:auayaH5k3137Cl4SoXTgrzQcuQDmvuVtZgS0fb1Ahys= +github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.3.0 h1:NGXK3lHquSN08v5vWalVI/L8XU9hdzE/G6xsrze47As= +github.com/stretchr/objx v0.3.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c h1:3lbZUMbMiGUW/LMkfsEABsc5zNT9+b1CvsJx47JzJ8g= +github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c/go.mod h1:UrdRz5enIKZ63MEE3IF9l2/ebyx59GyGgPi+tICQdmM= +github.com/zorkian/go-datadog-api v2.18.0+incompatible h1:7JZOVDO8qDaXDKPAzTgiJahU3IoDyzxbLDwoT0U9n0w= +github.com/zorkian/go-datadog-api v2.18.0+incompatible/go.mod h1:PkXwHX9CUQa/FpB9ZwAD45N1uhCW4MT/Wj7m36PbKss= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/go-playground/mold.v2 v2.2.0 h1:Y4IYB4/HYQfuq43zaKh6vs9cVelLE9qbqe2fkyfCTWQ= +gopkg.in/go-playground/mold.v2 v2.2.0/go.mod h1:XMyyRsGtakkDPbxXbrA5VODo6bUXyvoDjLd5l3T0XoA= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19 h1:WB265cn5OpO+hK3pikC9hpP1zI/KTwmyMFKloW9eOVc= +gopkg.in/validator.v2 v2.0.0-20180514200540-135c24b11c19/go.mod h1:o4V0GXN9/CAmCsvJ0oXYZvrZOe7syiDZSN1GWGZTGzc= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 
h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= +gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/s3client.go b/s3client.go index 8e3d366..cb9324b 100644 --- a/s3client.go +++ b/s3client.go @@ -1,11 +1,15 @@ package analytics import ( + "bufio" "bytes" "compress/gzip" "encoding/json" - "fmt" "io" + "io/ioutil" + "log" + "os" + "path/filepath" "sync" "time" @@ -14,11 +18,17 @@ import ( "github.com/aws/aws-sdk-go/service/s3/s3manager" ) +type uploader interface { + Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) +} + type s3Client struct { *client config S3ClientConfig apiContext *apiContext - uploader *s3manager.Uploader + uploader uploader + // s3Client works with only one type of message, so one set of tags suffices + tagsOnlyMsg tagsOnlyMsg } // S3 is a configuration for s3Client. @@ -30,8 +40,12 @@ type S3 struct { // Examples: tuna, salmon, haring, etc. Each system receives its own stream. Stream string + // MaxBatchBytes represents the buffer or file size at which buffered events are flushed MaxBatchBytes int + // BufferFilePath, if specified, is used as a temp file to store the buffered data + BufferFilePath string + KeyConstructor func(now func() Time, uid func() string) string UploaderOptions []func(*s3manager.Uploader) @@ -56,6 +70,8 @@ func NewS3ClientWithConfig(config S3ClientConfig) (Client, error) { return nil, err } + client.msgs = make(chan Message, 1024) // overwrite the buffer + sess := session.Must(session.NewSession()) uploader := s3manager.NewUploader(sess, cfg.S3.UploaderOptions...) @@ -88,52 +104,55 @@ func (c *s3Client) loop() { ex := newExecutor(c.maxConcurrentRequests) defer ex.close() - mq := messageQueue{ + bw := bufferedEncoder{ maxBatchSize: c.BatchSize, - maxBatchBytes: c.config.S3.MaxBatchBytes, + maxBatchBytes: int64(c.config.S3.MaxBatchBytes), + newBufFunc: func() encodedBuffer { + return c.newBuffer(c.config.S3.BufferFilePath, c.config.S3.MaxBatchBytes) + }, } + bw.init() for { select { case msg := <-c.msgs: - c.push(&mq, msg, wg, ex) + c.push(&bw, msg, wg, ex) case <-tick.C: - c.flush(&mq, wg, ex) + c.flush(&bw, wg, ex) case <-c.quit: + log.Println("exit requested – draining messages") c.debugf("exit requested – draining messages") // Drain the msg channel, we have to close it first so no more // messages can be pushed and otherwise the loop would never end. 
close(c.msgs) for msg := range c.msgs { - c.push(&mq, msg, wg, ex) + c.push(&bw, msg, wg, ex) } - c.flush(&mq, wg, ex) + c.flush(&bw, wg, ex) + defer bw.buf.Close() c.debugf("exit") return } } } -func (c *s3Client) push(q *messageQueue, m Message, wg *sync.WaitGroup, ex *executor) { - var msg message - var err error - - if msg, err = makeTargetMessage(m, maxMessageBytes, c.apiContext, c.now); err != nil { - c.errorf("%s - %v", err, m) - c.notifyFailure([]message{msg}, err) - return +func (c *s3Client) newBuffer(path string, size int) encodedBuffer { + if path == "" { + return newMemBuffer(size) } - c.debugf("buffer (%d/%d) %v", len(q.pending), c.BatchSize, m) - - if msgs := q.push(msg); msgs != nil { - c.debugf("exceeded messages batch limit with batch of %d messages – flushing", len(msgs)) - c.sendAsync(msgs, wg, ex) + buf, err := newFileBuffer(path) + if err != nil { + c.errorf("invalid file name - %s", err) + // fall back to a small memory buffer + return newMemBuffer(1024) } + + return buf } type apiContext struct { @@ -149,12 +168,10 @@ type targetMessage struct { Event Message `json:"event"` SentAt Time `json:"sentAt"` ReceivedAt Time `json:"receivedAt"` - - json []byte } func (m *targetMessage) MarshalJSON() ([]byte, error) { - return m.json, nil + type alias targetMessage // avoid recursing back into MarshalJSON + return json.Marshal((*alias)(m)) } func (m *targetMessage) Msg() Message { @@ -162,32 +179,72 @@ } func (m *targetMessage) size() int { - return len(m.json) + return -1 +} + +// dummy message that only carries tags +type tagsOnlyMsg struct { + t []string +} + +func (m *tagsOnlyMsg) tags() []string { + return m.t +} + +func (m *tagsOnlyMsg) validate() error { + return nil +} + +func (c *s3Client) push(encoder *bufferedEncoder, m Message, wg *sync.WaitGroup, ex *executor) { + c.setTagsIfExist(m) + + ready, err := encodeMessage(encoder, m, c.apiContext, c.now) + if err != nil { + c.errorf("can't encode message - %s", err) + c.notifyFailureMsg(m, err, 1) + return + } + c.debugf("buffer (%d/%d) %v", encoder.messages, c.BatchSize, m) + + if ready { + c.debugf("exceeded messages batch limit with batch of %d messages – flushing", encoder.messages) + c.sendAsync(encoder, wg, ex) + } } -// makeTargetMessage constructs targetMessage instance. -func makeTargetMessage(m Message, maxBytes int, apiContext *apiContext, now func() Time) (message, error) { +// we need this function so metrics can be reported with the message tags +func (c *s3Client) setTagsIfExist(m Message) { + if len(c.tagsOnlyMsg.t) == 0 { + c.tagsOnlyMsg.t = m.tags() + } +} + +func encodeMessage(bw *bufferedEncoder, m Message, ctx *apiContext, now func() Time) (ready bool, err error) { ts := now() - result := targetMessage{ - APIContext: apiContext, + msg := targetMessage{ + APIContext: ctx, Event: m, SentAt: ts, ReceivedAt: ts, } - type alias targetMessage - b, err := json.Marshal(alias(result)) - if err != nil { - return &result, err - } - if len(b) > maxBytes { - return &result, ErrMessageTooBig - } - result.json = b - return &result, nil + type alias targetMessage // we won't use the json.Marshaler implementation + + return bw.Push(alias(msg)) } // Asynchronously send a batched request. 
-func (c *s3Client) sendAsync(msgs []message, wg *sync.WaitGroup, ex *executor) { +func (c *s3Client) sendAsync(bw *bufferedEncoder, wg *sync.WaitGroup, ex *executor) { + if bw.BytesLen() == 0 { + c.errorf("empty buffer, send is not possible") + return + } + + msgs := bw.TotalMsgs() + buf, err := bw.CommitBuffer() + if err != nil { + c.errorf("can't flush gzip, send is not possible - %s", err) + return + } + wg.Add(1) if !ex.do(func() { @@ -200,81 +257,50 @@ func (c *s3Client) sendAsync(msgs []message, wg *sync.WaitGroup, ex *executor) { c.errorf("panic - %s", err) } }() - c.send(msgs) + c.send(buf, msgs) }) { wg.Done() c.errorf("sending messages failed - %s", ErrTooManyRequests) - c.notifyFailure(msgs, ErrTooManyRequests) + buf.Close() + c.notifyFailureMsg(&c.tagsOnlyMsg, ErrTooManyRequests, int64(msgs)) } } -func (c *s3Client) flush(q *messageQueue, wg *sync.WaitGroup, ex *executor) { - if msgs := q.flush(); msgs != nil { - c.debugf("flushing %d messages", len(msgs)) - c.sendAsync(msgs, wg, ex) +func (c *s3Client) flush(bw *bufferedEncoder, wg *sync.WaitGroup, ex *executor) { + msgs := bw.TotalMsgs() + if msgs > 0 { + c.debugf("flushing %d messages", msgs) + c.sendAsync(bw, wg, ex) } } // Send batch request. -func (c *s3Client) send(msgs []message) { +func (c *s3Client) send(buf encodedBuffer, msgs int) { const attempts = 10 - var err error - - buf := &bytes.Buffer{} - wr := gzip.NewWriter(buf) - encoder := json.NewEncoder(wr) + defer buf.Close() - marshalledMessages := []message{} - failedMessages := []message{} - var lastError error - - for _, m := range msgs { - err = encoder.Encode(m) + for i := 0; i != attempts; i++ { + reader, err := buf.Reader() if err != nil { - failedMessages = append(failedMessages, m) - lastError = err - } else { - marshalledMessages = append(marshalledMessages, m) + c.errorf("can't get reader - %s", err) } - } - if len(failedMessages) > 0 { - c.errorf("marshalling message - %s", lastError) - c.notifyFailure(failedMessages, lastError) - } - if err = wr.Close(); err != nil { - c.errorf("flushing writer failed: %s", err) - return - } - - if buf.Len() == 0 || len(marshalledMessages) == 0 { - c.errorf("empty buffer, send is not possible") - return - } - - for i := 0; i != attempts; i++ { - if err = c.upload(buf); err == nil { - c.notifySuccess(marshalledMessages) + if err = c.upload(reader); err == nil { + c.notifySuccessMsg(&c.tagsOnlyMsg, int64(msgs)) return } // Wait for either a retry timeout or the client to be closed. select { case <-time.After(c.RetryAfter(i)): - err = fmt.Errorf("%d messages dropped because of error: %s", len(msgs), err) - c.errorf(err.Error()) - c.notifyFailure(marshalledMessages, err) + c.errorf("%d messages dropped because of error: %s", msgs, err) return case <-c.quit: - err = fmt.Errorf("%d messages dropped because they failed to be sent and the client was closed, upload error: %s", len(msgs), err) - c.errorf(err.Error()) - c.notifyFailure(marshalledMessages, err) + c.errorf("%d messages dropped because they failed to be sent and the client was closed, upload error: %s", msgs, err) return } } - c.errorf("%d messages dropped because they failed to be sent after %d attempts", len(msgs), attempts) - c.notifyFailure(marshalledMessages, err) + c.errorf("%d messages dropped because they failed to be sent after %d attempts", msgs, attempts) } // Upload batch to S3. 
@@ -292,4 +318,166 @@ return err } -func stringPtr(s string) *string { return &s } +type bufferedEncoder struct { + maxBatchSize int + maxBatchBytes int64 + + newBufFunc func() encodedBuffer + + buf encodedBuffer + encoder *json.Encoder + gzipper *gzip.Writer + messages int +} + +func (q *bufferedEncoder) BytesLen() int64 { + return q.buf.Size() +} + +func (q *bufferedEncoder) TotalMsgs() int { + return q.messages +} + +func (q *bufferedEncoder) CommitBuffer() (encodedBuffer, error) { + err := q.gzipper.Close() + if err != nil { + return nil, err + } + + oldBuf := q.buf + q.init() + + return oldBuf, nil +} + +func (q *bufferedEncoder) Push(v interface{}) (bool, error) { + if err := q.encoder.Encode(v); err != nil { + return false, err + } + q.messages++ + + if q.buf.Size() >= q.maxBatchBytes { + return true, nil + } + + if q.messages >= q.maxBatchSize { + return true, nil + } + + return false, nil +} + +func (q *bufferedEncoder) init() { + q.buf = q.newBufFunc() + q.gzipper = gzip.NewWriter(q.buf) + q.encoder = json.NewEncoder(q.gzipper) + q.messages = 0 +} + +type encodedBuffer interface { + io.WriteCloser + Size() int64 + Reader() (io.Reader, error) + Reset() error +} + +type memBuffer struct { + buf *bytes.Buffer +} + +func newMemBuffer(size int) *memBuffer { + var buf bytes.Buffer + if size > 0 { + buf.Grow(size) + } + + return &memBuffer{ + buf: &buf, + } +} + +func (m *memBuffer) Write(p []byte) (n int, err error) { + return m.buf.Write(p) +} + +func (m *memBuffer) Reader() (io.Reader, error) { + return bytes.NewReader(m.buf.Bytes()), nil +} + +func (m *memBuffer) Reset() error { + m.buf.Reset() + return nil +} + +func (m *memBuffer) Size() int64 { + return int64(m.buf.Len()) +} + +func (m *memBuffer) Close() error { + return nil +} + +type fileBuffer struct { + fd *os.File + writer *bufio.Writer + reader *bufio.Reader + size int64 +} + +func newFileBuffer(path string) (*fileBuffer, error) { + dir, file := filepath.Split(path) + + fd, err := ioutil.TempFile(dir, file) + if err != nil { + return nil, err + } + return &fileBuffer{ + fd: fd, + writer: bufio.NewWriter(fd), + reader: bufio.NewReader(fd), + }, nil +} + +func (m *fileBuffer) Write(p []byte) (n int, err error) { + n, err = m.writer.Write(p) + if err != nil { + return n, err + } + + m.size += int64(n) + return n, nil +} + +func (m *fileBuffer) Reader() (io.Reader, error) { + if err := m.writer.Flush(); err != nil { + return nil, err + } + + if _, err := m.fd.Seek(0, io.SeekStart); err != nil { + return nil, err + } + + m.reader.Reset(m.fd) // drop any bytes buffered by a previous read attempt + return io.LimitReader(m.reader, m.size), nil +} + +func (m *fileBuffer) Reset() error { + m.size = 0 + if _, err := m.fd.Seek(0, io.SeekStart); err != nil { + return err + } + m.writer.Reset(m.fd) + m.reader.Reset(m.fd) + + return nil +} + +func (m *fileBuffer) Size() int64 { + return m.size +} + +func (m *fileBuffer) Close() error { + fileName := m.fd.Name() + m.fd.Close() + + return os.Remove(fileName) +} diff --git a/s3client_test.go b/s3client_test.go index 0a2bef2..f487c3f 100644 --- a/s3client_test.go +++ b/s3client_test.go @@ -1,36 +1,123 @@ package analytics import ( + "bufio" + "bytes" + "compress/gzip" "encoding/json" + "io" + "io/ioutil" + "log" + "path/filepath" "testing" + "time" + + "github.com/aws/aws-sdk-go/service/s3/s3manager" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestTargetMessageMarshalling(t *testing.T) { - m := Track{ - Event: "FooBared", - UserId: "tuna", - Properties: map[string]interface{}{ - 
"index": 1, - "qwer": 3424, + type msg struct { + Index int `json:"index"` + Qwer int `json:"qwer"` + } + m := TrackObj{ + Track: Track{ + Event: "FooBared", + UserId: "tuna", + }, + Properties: msg{ + Index: 1, + Qwer: 3424, }, } - tm, err := makeTargetMessage(m, 10000, nil, func() Time { return Time{} }) - if err != nil { - t.Error(err) + + encoder := &bufferedEncoder{ + maxBatchSize: 100, + maxBatchBytes: 10, + newBufFunc: func() encodedBuffer { + return newMemBuffer(1024) + }, } - b, err := json.Marshal(tm) - if err != nil { - t.Error(err) + encoder.init() + + _, err := encodeMessage(encoder, m, nil, func() Time { return Time{} }) + require.NoError(t, err) + + buf, err := encoder.CommitBuffer() + require.NoError(t, err) + reader, err := buf.Reader() + require.NoError(t, err) + + result := readAndUngzip(t, reader) + + expected := `{"event":{"userId":"tuna","event":"FooBared","timestamp":0,"properties":{"index":1,"qwer":3424}},"sentAt":0,"receivedAt":0}` + "\n" + + require.Equal(t, expected, string(result)) +} + +func readAndUngzip(t *testing.T, r io.Reader) []byte { + q, err := gzip.NewReader(r) + require.NoError(t, err) + defer q.Close() + + d, err := ioutil.ReadAll(q) + require.NoError(t, err) + return d +} + +func Test_encodedBuffers(t *testing.T) { + fileBuf, err := newFileBuffer(filePath) + require.NoError(t, err) + defer fileBuf.Close() + + tests := map[string]struct { + buf encodedBuffer + }{ + "file buffer": { + buf: fileBuf, + }, + "memory buffer": { + buf: &memBuffer{ + buf: bytes.NewBuffer(make([]byte, 0, 1024)), + }, + }, } - t.Logf("json: %s", string(b)) + for name, tt := range tests { + t.Run(name, func(t *testing.T) { + buf := tt.buf + defer buf.Close() + + writeAndReadBuffer(t, buf, "hello there, 42") + buf.Reset() - expected := `{"event":{"userId":"tuna","event":"FooBared","timestamp":0,"properties":{"index":1,"qwer":3424}},"sentAt":0,"receivedAt":0}` + writeAndReadBuffer(t, buf, "hello there, i'm longer") + buf.Reset() - if string(b) != expected { - t.Errorf("Expected: %s, Actual: %s", expected, string(b)) + writeAndReadBuffer(t, buf, "then shorter") + buf.Reset() + }) } } +func writeAndReadBuffer(t *testing.T, buf encodedBuffer, expected string) { + _, err := buf.Write([]byte(expected)) + require.NoError(t, err) + + reader, err := buf.Reader() + require.NoError(t, err) + + result, err := ioutil.ReadAll(reader) + require.NoError(t, err) + + require.Equal(t, expected, string(result)) +} + +const ( + filePath = "/tmp/buffer_events.tmp" +) + func ManualTestS3Client(t *testing.T) { c, err := NewS3ClientWithConfig( S3ClientConfig{ @@ -38,14 +125,13 @@ func ManualTestS3Client(t *testing.T) { Verbose: true, }, S3: S3{ - Stream: "tuna", - Stage: "pavel", + Stream: "tuna", + Stage: "dev", + BufferFilePath: filePath, }, }, ) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for i := 0; i < 10; i++ { m := Track{ @@ -56,13 +142,182 @@ func ManualTestS3Client(t *testing.T) { "qwer": 3424, }, } - if err := c.Enqueue(m); err != nil { - t.Error(err) + require.NoError(t, c.Enqueue(m)) + } + require.NoError(t, c.Close()) + + t.FailNow() +} + +func Test_TriggerByTime(t *testing.T) { + defer checkNoFilesLeft(t, filePath) + um := uploadMock{ + resultChan: make(chan []byte, 1), + } + c := newPatchedClient(t, S3ClientConfig{ + Config: Config{ + Verbose: true, + BatchSize: 10, + Interval: 500 * time.Millisecond, + }, + S3: S3{ + Stream: "tuna", + Stage: "dev", + BufferFilePath: filePath, + }, + }, &um) + + writeEvents(t, c, 1, 0) + + select { + case msgs := <-um.resultChan: + 
msgs1 := readAllMessages(t, msgs) + assert.Equal(t, 1, len(msgs1)) + + case <-time.After(1 * time.Second): + t.Errorf("no message by timeout") + } + + require.NoError(t, c.Close()) + require.Empty(t, um.resultChan, "no messages") +} + +func Test_MemoryLimit(t *testing.T) { + defer checkNoFilesLeft(t, filePath) + const bytesLimit = 5 * 1024 * 1024 // 5 MiB + + um := uploadMock{ + resultChan: make(chan []byte, 4), + } + c := newPatchedClient(t, S3ClientConfig{ + Config: Config{ + Interval: 5 * time.Minute, + BatchSize: 200_000, + }, + S3: S3{ + Stream: "tuna", + Stage: "dev", + MaxBatchBytes: bytesLimit, + BufferFilePath: filePath, + }, + }, &um) + + writeEvents(t, c, 400_000, 0) + + readOneEvent(t, um.resultChan, bytesLimit) + readOneEvent(t, um.resultChan, bytesLimit) + + require.NoError(t, c.Close()) +} + +func checkNoFilesLeft(t *testing.T, path string) { + t.Helper() + dir, fn := filepath.Split(path) + files, err := ioutil.ReadDir(dir) + require.NoError(t, err) + + for _, file := range files { + assert.NotContains(t, file.Name(), fn) + } +} + +func readOneEvent(t *testing.T, resultChan <-chan []byte, bytesLimit int) { + eventsData := <-resultChan + log.Println("event size: ", float64(len(eventsData))/1024/1024, " MiB") + readAllMessages(t, eventsData) + require.GreaterOrEqual(t, len(eventsData), bytesLimit) +} + +func Test_MessagesLimit(t *testing.T) { + const msgsLimit = 10 + + um := uploadMock{ + resultChan: make(chan []byte, 3), + } + c := newPatchedClient(t, S3ClientConfig{ + Config: Config{ + Verbose: true, + BatchSize: msgsLimit, + maxConcurrentRequests: 1, + }, + S3: S3{ + Stream: "tuna", + Stage: "dev", + }, + }, &um) + + writeEvents(t, c, msgsLimit*2, 0) + + msgs1 := readAllMessages(t, <-um.resultChan) + require.Equal(t, msgsLimit, len(msgs1)) + msgs2 := readAllMessages(t, <-um.resultChan) + require.Equal(t, msgsLimit, len(msgs2)) + + err := c.Close() + require.NoError(t, err) + + require.Empty(t, um.resultChan, "no messages") +} + +func newPatchedClient(t *testing.T, cfg S3ClientConfig, um *uploadMock) Client { + c, err := NewS3ClientWithConfig(cfg) + require.NoError(t, err) + + c.(*s3Client).uploader = um + return c +} + +func readAllMessages(t *testing.T, input []byte) []Properties { + gunzip, err := gzip.NewReader(bytes.NewReader(input)) + require.NoError(t, err) + defer gunzip.Close() + + scan := bufio.NewReader(gunzip) + + var totalMsgs []Properties + for { + row, _, err := scan.ReadLine() + if err == io.EOF { + break + } + require.NoError(t, err) + + v := Properties{} + err = json.Unmarshal(row, &v) + require.NoError(t, err) + + totalMsgs = append(totalMsgs, v) + } + + return totalMsgs +} + +func writeEvents(t *testing.T, c Client, messages int, delay time.Duration) { + t.Helper() + for i := 0; i < messages; i++ { + m := Track{ + Event: "FooBared", + UserId: "tuna", + Properties: map[string]interface{}{ + "index": i, + "qwer": 3424, + }, } + require.NoError(t, c.Enqueue(m)) + if delay > 0 { + time.Sleep(delay) + } } - if err := c.Close(); err != nil { - t.Error(err) +} + +type uploadMock struct { + resultChan chan []byte +} + +func (u *uploadMock) Upload(input *s3manager.UploadInput, options ...func(*s3manager.Uploader)) (*s3manager.UploadOutput, error) { + data, err := ioutil.ReadAll(input.Body) + if err != nil { + return nil, err + } - t.FailNow() + u.resultChan <- data + + return nil, nil } diff --git a/s3clientconfig.go b/s3clientconfig.go index 3780515..cc719bd 100644 --- a/s3clientconfig.go +++ b/s3clientconfig.go @@ -21,7 +21,7 @@ func makeS3ClientConfig(c S3ClientConfig) (S3ClientConfig, error) { } 
if c.S3.MaxBatchBytes == 0 { - c.S3.MaxBatchBytes = 128 * MB + c.S3.MaxBatchBytes = 20 * MB } if c.S3.Bucket == "" { diff --git a/track.go b/track.go index b79607a..eebf2c5 100644 --- a/track.go +++ b/track.go @@ -40,3 +40,12 @@ func (msg Track) validate() error { return nil } + +// TrackObj represents an object sent in a track call. It behaves like Track, +// but instead of map[string]interface{} it accepts any struct that is serialized to JSON. +type TrackObj struct { + // This field is exported for serialization purposes and shouldn't be set by + // the application; its value is always overwritten by the library. + Track + Properties interface{} `json:"properties,omitempty"` +} diff --git a/vendor/github.com/segmentio/backo-go b/vendor/github.com/segmentio/backo-go deleted file mode 160000 index ed3adc5..0000000 --- a/vendor/github.com/segmentio/backo-go +++ /dev/null @@ -1 +0,0 @@ -Subproject commit ed3adc599ae5cd72e97bd085ff1835c9d3276594 diff --git a/vendor/github.com/xtgo/uuid b/vendor/github.com/xtgo/uuid deleted file mode 160000 index a0b1148..0000000 --- a/vendor/github.com/xtgo/uuid +++ /dev/null @@ -1 +0,0 @@ -Subproject commit a0b114877d4caeffbd7f87e3757c17fce570fea7
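
Taken together, the changes above add a typed track call (TrackObj) and a disk-backed batching mode for the S3 client. A minimal usage sketch under stated assumptions: the bucket name, stream/stage values, and the property struct below are placeholders for illustration, not values taken from this repository.

package main

import (
	"log"

	analytics "github.com/FindHotel/analytics-go"
)

func main() {
	// Placeholder configuration; only field names come from the diff above.
	client, err := analytics.NewS3ClientWithConfig(analytics.S3ClientConfig{
		Config: analytics.Config{
			BatchSize: 1000,
		},
		S3: analytics.S3{
			Bucket:         "my-events-bucket", // hypothetical bucket name
			Stream:         "tuna",
			Stage:          "dev",
			MaxBatchBytes:  5 * 1024 * 1024,          // flush after ~5 MiB of encoded data
			BufferFilePath: "/tmp/buffer_events.tmp", // buffer batches in a temp file instead of memory
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// TrackObj is a Track whose properties come from a typed struct
	// instead of map[string]interface{}.
	type downloadProps struct {
		Application string `json:"application"`
		Version     string `json:"version"`
	}
	err = client.Enqueue(analytics.TrackObj{
		Track:      analytics.Track{Event: "Download", UserId: "123456"},
		Properties: downloadProps{Application: "Segment Desktop", Version: "1.1.0"},
	})
	if err != nil {
		log.Fatal(err)
	}
}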
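
The fileBuffer type introduced in s3client.go is what makes BufferFilePath work: batches are written through a bufio.Writer into a single temp file, and each upload rewinds the file and reads back only the bytes recorded since the last Reset. A standalone sketch of that pattern, using only the standard library; all names here are local to the sketch, not part of the package API.

package main

import (
	"bufio"
	"fmt"
	"io"
	"io/ioutil"
	"os"
)

func main() {
	// Create the temp file that backs the buffer, as newFileBuffer does.
	fd, err := ioutil.TempFile("", "buffer_events")
	if err != nil {
		panic(err)
	}
	defer os.Remove(fd.Name()) // fileBuffer.Close removes its file the same way
	defer fd.Close()

	w := bufio.NewWriter(fd)
	var size int64

	// Writes are counted so a later read can be capped at exactly this length.
	n, err := w.Write([]byte("hello there, 42"))
	if err != nil {
		panic(err)
	}
	size += int64(n)

	// The Reader() equivalent: flush buffered bytes, rewind the file, and
	// limit the read to the recorded size so stale bytes from a longer
	// previous batch are never re-read.
	if err := w.Flush(); err != nil {
		panic(err)
	}
	if _, err := fd.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	data, err := ioutil.ReadAll(io.LimitReader(fd, size))
	if err != nil {
		panic(err)
	}
	fmt.Printf("%s\n", data) // prints: hello there, 42
}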