diff --git a/README.md b/README.md index 0a03141..532a0c5 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ import ( "time" "github.com/sirupsen/logrus" - "github.com/fhaze/kinesis-producer" + "github.com/achunariov/kinesis-producer" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" @@ -72,7 +72,7 @@ package main import ( "time" - "github.com/fhaze/kinesis-producer" + "github.com/achunariov/kinesis-producer" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" @@ -131,7 +131,7 @@ import ( "math/big" "time" - "github.com/fhaze/kinesis-producer" + "github.com/achunariov/kinesis-producer" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/kinesis" @@ -223,8 +223,8 @@ customLogger := &CustomLogger{} ```go import ( "github.com/sirupsen/logrus" - producer "github.com/fhaze/kinesis-producer" - "github.com/fhaze/kinesis-producer/loggers" + producer "github.com/achunariov/kinesis-producer" + "github.com/achunariov/kinesis-producer/loggers" ) log := logrus.New() @@ -246,12 +246,12 @@ kinesis-producer ships with three logger implementations. 
### License MIT -[godoc-url]: https://godoc.org/github.com/fhaze/kinesis-producer +[godoc-url]: https://godoc.org/github.com/achunariov/kinesis-producer [godoc-img]: https://img.shields.io/badge/godoc-reference-blue.svg?style=flat-square [kpl-url]: https://github.com/awslabs/amazon-kinesis-producer [de-aggregation]: http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-kpl-consumer-deaggregation.html [kpl-aggregation]: http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-producer-adv-aggregation.html -[aggregation-format-url]: https://github.com/fhaze/kinesis-producer/blob/master/aggregation-format.md +[aggregation-format-url]: https://github.com/achunariov/kinesis-producer/blob/master/aggregation-format.md [license-image]: https://img.shields.io/badge/license-MIT-blue.svg?style=flat-square [license-url]: LICENSE diff --git a/aggregator.go b/aggregator.go index 00e25ef..6d3fd41 100644 --- a/aggregator.go +++ b/aggregator.go @@ -4,8 +4,8 @@ import ( "crypto/md5" "sync" - k "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/fhaze/kinesis-producer/pb" + "github.com/achunariov/kinesis-producer/pb" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/golang/protobuf/proto" ) @@ -16,13 +16,13 @@ var ( // Contains the AWS Kinesis PutRecordsRequestEntry and UserRecords that are aggregated into // the request. 
UserRecords are provided for more control over failure notifications type AggregatedRecordRequest struct { - Entry *k.PutRecordsRequestEntry + Entry types.PutRecordsRequestEntry UserRecords []UserRecord } func NewAggregatedRecordRequest(data []byte, partitionKey, explicitHashKey *string, userRecords []UserRecord) *AggregatedRecordRequest { return &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ Data: data, PartitionKey: partitionKey, ExplicitHashKey: explicitHashKey, diff --git a/aggregator_test.go b/aggregator_test.go index 334f231..b6a4d20 100644 --- a/aggregator_test.go +++ b/aggregator_test.go @@ -2,11 +2,12 @@ package producer import ( "fmt" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "strconv" "testing" + "github.com/achunariov/kinesis-producer/deaggregation" k "github.com/aws/aws-sdk-go/service/kinesis" - "github.com/fhaze/kinesis-producer/deaggregation" "github.com/stretchr/testify/require" ) @@ -119,7 +120,7 @@ func TestAggregation(t *testing.T) { } } -func extractRecords(entry *k.PutRecordsRequestEntry) (out []*k.PutRecordsRequestEntry) { +func extractRecords(entry types.PutRecordsRequestEntry) (out []*k.PutRecordsRequestEntry) { dest, err := deaggregation.Unmarshal(entry.Data) if err != nil { return diff --git a/config.go b/config.go index 6b55c42..1d459b5 100644 --- a/config.go +++ b/config.go @@ -1,11 +1,13 @@ package producer import ( + "context" "log" "os" "time" - k "github.com/aws/aws-sdk-go/service/kinesis" + k "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" ) // Constants and default configuration take from: @@ -24,7 +26,7 @@ const ( // Putter is the interface that wraps the KinesisAPI.PutRecords method. 
type Putter interface { - PutRecords(*k.PutRecordsInput) (*k.PutRecordsOutput, error) + PutRecords(ctx context.Context, params *k.PutRecordsInput, optFns ...func(*k.Options)) (*k.PutRecordsOutput, error) } // GetShardsFunc is called to populate the shard map on initialization and during refresh @@ -32,9 +34,9 @@ type Putter interface { // initialization, this will be nil. GetShardsFunc should return a shard list, a bool // indicating if the shards should be updated and an error. If false bool or error is // returned, shards will not be updated. -type GetShardsFunc func(old []*k.Shard) ([]*k.Shard, bool, error) +type GetShardsFunc func(old []types.Shard) ([]types.Shard, bool, error) -func defaultGetShardsFunc(old []*k.Shard) ([]*k.Shard, bool, error) { return nil, false, nil } +func defaultGetShardsFunc(old []types.Shard) ([]types.Shard, bool, error) { return nil, false, nil } // Config is the Producer configuration. type Config struct { diff --git a/deaggregation/deaggregation.go b/deaggregation/deaggregation.go index 3f7903d..e642e1c 100644 --- a/deaggregation/deaggregation.go +++ b/deaggregation/deaggregation.go @@ -6,7 +6,7 @@ import ( "bytes" "crypto/md5" - "github.com/fhaze/kinesis-producer/pb" + "github.com/achunariov/kinesis-producer/pb" "google.golang.org/protobuf/proto" ) diff --git a/deaggregation/deaggregation_test.go b/deaggregation/deaggregation_test.go index 2f3b4c5..98f832f 100644 --- a/deaggregation/deaggregation_test.go +++ b/deaggregation/deaggregation_test.go @@ -6,7 +6,7 @@ import ( "reflect" "testing" - "github.com/fhaze/kinesis-producer/pb" + "github.com/achunariov/kinesis-producer/pb" "google.golang.org/protobuf/proto" ) diff --git a/example_test.go b/example_test.go index 3d07156..c004926 100644 --- a/example_test.go +++ b/example_test.go @@ -7,15 +7,14 @@ import ( "os" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go-v2/aws" + 
"github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/google/uuid" ) func ExampleSimple() { logger := &StdLogger{log.New(os.Stdout, "", log.LstdFlags)} - client := kinesis.New(session.New(aws.NewConfig())) + client := kinesis.NewFromConfig(*aws.NewConfig()) pr := New(&Config{ StreamName: "test", BacklogCount: 2000, @@ -49,7 +48,7 @@ func ExampleSimple() { func ExampleShardMap() { logger := &StdLogger{log.New(os.Stdout, "", log.LstdFlags)} - client := kinesis.New(session.New(aws.NewConfig())) + client := kinesis.NewFromConfig(*aws.NewConfig()) pr := New(&Config{ StreamName: "test", BacklogCount: 2000, @@ -114,7 +113,7 @@ func newMyExampleUserRecord(key, val string) (*myExampleUserRecord, error) { func ExampleUserRecord() { logger := &StdLogger{log.New(os.Stdout, "", log.LstdFlags)} - client := kinesis.New(session.New(aws.NewConfig())) + client := kinesis.NewFromConfig(*aws.NewConfig()) pr := New(&Config{ StreamName: "test", BacklogCount: 2000, diff --git a/go.mod b/go.mod index f0ff9fd..8ca6b09 100644 --- a/go.mod +++ b/go.mod @@ -1,18 +1,18 @@ -module github.com/fhaze/kinesis-producer +module github.com/achunariov/kinesis-producer + +go 1.17 require ( - github.com/aws/aws-sdk-go v1.21.10 - github.com/golang/protobuf v1.5.0 // indirect + github.com/aws/aws-sdk-go v1.40.37 + github.com/aws/aws-sdk-go-v2 v1.9.0 + github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0 + github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.1.1 - github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 - github.com/pkg/errors v0.8.1 // indirect + github.com/jpillora/backoff v1.0.0 github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.2.2 go.uber.org/atomic v1.4.0 // indirect go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 - golang.org/x/net v0.0.0-20190724013045-ca1201d0de80 // indirect - google.golang.org/protobuf v1.26.0 + google.golang.org/protobuf v1.27.1 ) - -go 1.13 diff --git a/go.sum b/go.sum index aef3842..01c315c 100644 --- a/go.sum +++ 
b/go.sum @@ -1,22 +1,39 @@ -github.com/aws/aws-sdk-go v1.21.10 h1:lTRdgyxraKbnNhx7kWeoW/Uow1TKnSNDpQGTtEXJQgk= -github.com/aws/aws-sdk-go v1.21.10/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-sdk-go v1.40.37 h1:I+Q6cLctkFyMMrKukcDnj+i2kjrQ37LGiOM6xmsxC48= +github.com/aws/aws-sdk-go v1.40.37/go.mod h1:585smgzpB/KqRA+K3y/NL/oYRqQvpNJYvLm+LY1U59Q= +github.com/aws/aws-sdk-go-v2 v1.9.0 h1:+S+dSqQCN3MSU5vJRu1HqHrq00cJn6heIMU7X9hcsoo= +github.com/aws/aws-sdk-go-v2 v1.9.0/go.mod h1:cK/D0BBs0b/oWPIcX/Z/obahJK1TT7IPVjy53i/mX/4= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0 h1:hb+NupVMUzINGUCfDs2+YqMkWKu47dBIQHpulM0XWh4= +github.com/aws/aws-sdk-go-v2/service/kinesis v1.6.0/go.mod h1:9O7UG2pELnP0hq35+Gd7XDjOLBkg7tmgRQ0y14ZjoJI= +github.com/aws/smithy-go v1.8.0 h1:AEwwwXQZtUwP5Mz506FeXXrKBe0jA8gVM+1gEcSRooc= +github.com/aws/smithy-go v1.8.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/golang/protobuf v1.5.0 h1:LUVKkCeviFUMKqHa4tXIIij/lbhnMbP7Fn5wKdKkRh4= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6 h1:BKbKCqvP6I+rmFHt06ZmyQtvB8xAkWdhFyr0ZUNZcxQ= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod 
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM= -github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= -github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3qxbrBCL4vJPW0MVFSs9CPK1OOJdRME= -github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7/go.mod h1:2iMrUgbbvHEiQClaW2NsSzMyGHqN+rDFqY705q49KG0= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= +github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= +github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= +github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -26,13 +43,25 @@ go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e h1:XpT3nA5TvE525Ne3hInMh6+GETgn27Zfm9dxsThnX2Q= +golang.org/x/net v0.0.0-20210614182718-04defd469f4e/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc= golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da h1:b3NXsE2LusjYGGjL5bxEVZZORm/YEFFrWFjR8eFrw/c= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0 h1:bxAC2xTBsZGibn2RTntX0oH50xLsqy1OxA9tTL3p/lk= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10= +gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/loggers/kplogrus/logrus.go b/loggers/kplogrus/logrus.go index cd188ac..07a4a3c 100644 --- a/loggers/kplogrus/logrus.go +++ b/loggers/kplogrus/logrus.go @@ -1,7 +1,7 @@ package kplogrus import ( - producer "github.com/fhaze/kinesis-producer" + producer "github.com/achunariov/kinesis-producer" "github.com/sirupsen/logrus" ) diff --git a/loggers/kpzap/zap.go b/loggers/kpzap/zap.go index c745123..60e1e10 100644 --- a/loggers/kpzap/zap.go +++ b/loggers/kpzap/zap.go @@ -3,7 +3,7 @@ package kpzap import ( "go.uber.org/zap" - producer "github.com/fhaze/kinesis-producer" + producer "github.com/achunariov/kinesis-producer" ) // Logger implements a zap.Logger logger for kinesis-producer diff --git a/pb/messages.proto b/pb/messages.proto index e30b3ad..432ff51 100644 --- a/pb/messages.proto +++ b/pb/messages.proto @@ -1,7 +1,7 @@ syntax = "proto2"; 
package pb; -option go_package = "github.com/fhaze/kinesis-producer/pb"; +option go_package = "github.com/achunariov/kinesis-producer/pb"; message AggregatedRecord { repeated string partition_key_table = 1; diff --git a/producer_test.go b/producer_test.go index a07492f..91b6eff 100644 --- a/producer_test.go +++ b/producer_test.go @@ -1,16 +1,17 @@ package producer import ( + "context" "errors" "fmt" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "runtime" "sync" "testing" "time" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/service/kinesis" - k "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go-v2/aws" + k "github.com/aws/aws-sdk-go-v2/service/kinesis" "github.com/google/uuid" ) @@ -25,7 +26,7 @@ type clientMock struct { incoming map[int][]string } -func (c *clientMock) PutRecords(input *k.PutRecordsInput) (*k.PutRecordsOutput, error) { +func (c *clientMock) PutRecords(ctx context.Context, input *k.PutRecordsInput, optFns ...func(*k.Options)) (*k.PutRecordsOutput, error) { res := c.responses[c.calls] for _, r := range input.Records { c.incoming[c.calls] = append(c.incoming[c.calls], *r.PartitionKey) @@ -66,7 +67,7 @@ var testCases = []testCase{ { Error: nil, Response: &k.PutRecordsOutput{ - FailedRecordCount: aws.Int64(0), + FailedRecordCount: aws.Int32(0), }, }, }}, @@ -84,13 +85,13 @@ var testCases = []testCase{ { Error: nil, Response: &k.PutRecordsOutput{ - FailedRecordCount: aws.Int64(0), + FailedRecordCount: aws.Int32(0), }, }, { Error: nil, Response: &k.PutRecordsOutput{ - FailedRecordCount: aws.Int64(0), + FailedRecordCount: aws.Int32(0), }, }, }}, @@ -109,8 +110,8 @@ var testCases = []testCase{ { Error: nil, Response: &k.PutRecordsOutput{ - FailedRecordCount: aws.Int64(1), - Records: []*k.PutRecordsResultEntry{ + FailedRecordCount: aws.Int32(1), + Records: []types.PutRecordsResultEntry{ {SequenceNumber: aws.String("3"), ShardId: aws.String("1")}, {ErrorCode: aws.String("400")}, }, @@ -119,7 +120,7 @@ var 
testCases = []testCase{ { Error: nil, Response: &k.PutRecordsOutput{ - FailedRecordCount: aws.Int64(0), + FailedRecordCount: aws.Int32(0), }, }, }}, @@ -138,13 +139,13 @@ var testCases = []testCase{ { Error: nil, Response: &k.PutRecordsOutput{ - FailedRecordCount: aws.Int64(0), + FailedRecordCount: aws.Int32(0), }, }, { Error: nil, Response: &k.PutRecordsOutput{ - FailedRecordCount: aws.Int64(0), + FailedRecordCount: aws.Int32(0), }, }, }}, @@ -248,9 +249,9 @@ func TestNotify(t *testing.T) { } } -func mockGetShards(startingShards, shards []*k.Shard, updated bool, err error) GetShardsFunc { +func mockGetShards(startingShards, shards []types.Shard, updated bool, err error) GetShardsFunc { calls := 0 - return func(_ []*k.Shard) ([]*k.Shard, bool, error) { + return func(_ []types.Shard) ([]types.Shard, bool, error) { calls++ switch calls { case 1: @@ -274,17 +275,17 @@ type mockThrottleClient struct { -func (c *mockThrottleClient) PutRecords(input *k.PutRecordsInput) (*k.PutRecordsOutput, error) { +func (c *mockThrottleClient) PutRecords(ctx context.Context, input *k.PutRecordsInput, optFns ...func(*k.Options)) (*k.PutRecordsOutput, error) { select { case <-c.done: - failed := int64(0) + failed := int32(0) return &k.PutRecordsOutput{ FailedRecordCount: &failed, }, nil default: fmt.Println("put records throttle") - failed := int64(len(input.Records)) + failed := int32(len(input.Records)) code := "errorcode" - var records []*kinesis.PutRecordsResultEntry + var records []types.PutRecordsResultEntry for range input.Records { - records = append(records, &kinesis.PutRecordsResultEntry{ + records = append(records, types.PutRecordsResultEntry{ ErrorCode: &code, }) } @@ -464,8 +465,8 @@ type mockBenchmarkClient struct { b *testing.B } -func (_ *mockBenchmarkClient) PutRecords(_ *k.PutRecordsInput) (*k.PutRecordsOutput, error) { - failed := int64(0) +func (_ *mockBenchmarkClient) PutRecords(ctx context.Context, input *k.PutRecordsInput, optFns ...func(*k.Options)) (*k.PutRecordsOutput, error) { + failed := int32(0) return &k.PutRecordsOutput{ FailedRecordCount: &failed, }, nil diff --git a/shard_map.go b/shard_map.go index 
5db1fae..342e1f6 100644 --- a/shard_map.go +++ b/shard_map.go @@ -1,28 +1,30 @@ package producer import ( + "context" "crypto/md5" "math/big" "sort" "sync" - k "github.com/aws/aws-sdk-go/service/kinesis" + k "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" ) // 2^128 exclusive upper bound // Hash key ranges are 0 indexed, so true max is 2^128 - 1 -const maxHashKeyRange = "340282366920938463463374607431768211455" +var maxHashKeyRange = "340282366920938463463374607431768211455" // ShardLister is the interface that wraps the KinesisAPI.ListShards method. type ShardLister interface { - ListShards(input *k.ListShardsInput) (*k.ListShardsOutput, error) + ListShards(ctx context.Context, params *k.ListShardsInput, optFns ...func(*k.Options)) (*k.ListShardsOutput, error) } // GetKinesisShardsFunc gets the active list of shards from Kinesis.ListShards API func GetKinesisShardsFunc(client ShardLister, streamName string) GetShardsFunc { - return func(old []*k.Shard) ([]*k.Shard, bool, error) { + return func(old []types.Shard) ([]types.Shard, bool, error) { var ( - shards []*k.Shard + shards []types.Shard next *string ) @@ -34,7 +36,7 @@ func GetKinesisShardsFunc(client ShardLister, streamName string) GetShardsFunc { input.StreamName = &streamName } - resp, err := client.ListShards(input) + resp, err := client.ListShards(context.Background(), input) if err != nil { return nil, false, err } @@ -66,7 +68,7 @@ func GetKinesisShardsFunc(client ShardLister, streamName string) GetShardsFunc { // StaticGetShardsFunc returns a GetShardsFunc that when called, will generate a static // list of shards with length count whos HashKeyRanges are evenly distributed func StaticGetShardsFunc(count int) GetShardsFunc { - return func(old []*k.Shard) ([]*k.Shard, bool, error) { + return func(old []types.Shard) ([]types.Shard, bool, error) { if count == 0 { return nil, false, nil } @@ -77,33 +79,32 @@ func StaticGetShardsFunc(count int) 
GetShardsFunc { step = step.Div(step, bCount) b1 := big.NewInt(int64(1)) - shards := make([]*k.Shard, count) + shards := make([]types.Shard, count) key := big.NewInt(int64(0)) for i := 0; i < count; i++ { - shard := new(k.Shard) - hkRange := new(k.HashKeyRange) - bI := big.NewInt(int64(i)) // starting key range (step * i) key = key.Mul(bI, step) - hkRange = hkRange.SetStartingHashKey(key.String()) + startingHashKey := key.String() + // ending key range ((step * (i + 1)) - 1) bINext := big.NewInt(int64(i + 1)) key = key.Mul(bINext, step) key = key.Sub(key, b1) - hkRange = hkRange.SetEndingHashKey(key.String()) + endingHashKey := key.String() - // TODO: Is setting other shard properties necessary? - shard = shard.SetHashKeyRange(hkRange) - shards[i] = shard + shards[i].HashKeyRange = &types.HashKeyRange{ + StartingHashKey: &startingHashKey, + EndingHashKey: &endingHashKey, + } } // Set last shard end range to max to account for small rounding errors - shards[len(shards)-1].HashKeyRange.SetEndingHashKey(maxHashKeyRange) + shards[len(shards)-1].HashKeyRange.EndingHashKey = &maxHashKeyRange return shards, false, nil } } -type ShardSlice []*k.Shard +type ShardSlice []types.Shard func (p ShardSlice) Len() int { return len(p) } func (p ShardSlice) Less(i, j int) bool { @@ -115,7 +116,7 @@ func (p ShardSlice) Less(i, j int) bool { func (p ShardSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } // Checks to see if the shards have the same hash key ranges -func shardsEqual(a, b []*k.Shard) bool { +func shardsEqual(a, b []types.Shard) bool { if len(a) != len(b) { return false } @@ -131,7 +132,7 @@ func shardsEqual(a, b []*k.Shard) bool { type ShardMap struct { sync.RWMutex - shards []*k.Shard + shards []types.Shard aggregators []*Aggregator // aggregateBatchCount determine the maximum number of items to pack into an aggregated record. 
aggregateBatchCount int @@ -144,7 +145,7 @@ type ShardMap struct { // A ShardMap with an empty shards slice will return to unsharded behavior with a single // aggregator. The aggregator will instead use the PartitionKey of the first UserRecord and // no ExplicitHashKey. -func NewShardMap(shards []*k.Shard, aggregateBatchCount int) *ShardMap { +func NewShardMap(shards []types.Shard, aggregateBatchCount int) *ShardMap { return &ShardMap{ shards: shards, aggregators: makeAggregators(shards), @@ -197,7 +198,7 @@ func (m *ShardMap) Drain() ([]*AggregatedRecordRequest, []error) { } // Shards returns the list of shards -func (m *ShardMap) Shards() []*k.Shard { +func (m *ShardMap) Shards() []types.Shard { m.RLock() shards := m.shards m.RUnlock() @@ -214,7 +215,7 @@ func (m *ShardMap) Shards() []*k.Shard { // it out since we retain original partition keys (but not explicit hash keys) // Shard merging should not be an issue since records from both shards should fall // into the merged hash key range. 
-func (m *ShardMap) UpdateShards(shards []*k.Shard, pendingRecords []*AggregatedRecordRequest) ([]*AggregatedRecordRequest, error) { +func (m *ShardMap) UpdateShards(shards []types.Shard, pendingRecords []*AggregatedRecordRequest) ([]*AggregatedRecordRequest, error) { m.Lock() defer m.Unlock() @@ -328,7 +329,7 @@ func hashKey(pk string) *big.Int { return hk } -func makeAggregators(shards []*k.Shard) []*Aggregator { +func makeAggregators(shards []types.Shard) []*Aggregator { count := len(shards) if count == 0 { return []*Aggregator{NewAggregator(nil)} diff --git a/shard_map_test.go b/shard_map_test.go index 035deb4..591c46d 100644 --- a/shard_map_test.go +++ b/shard_map_test.go @@ -1,6 +1,7 @@ package producer import ( + "context" "errors" "math/big" "math/rand" @@ -8,8 +9,9 @@ import ( "sync" "testing" - "github.com/aws/aws-sdk-go/aws" - k "github.com/aws/aws-sdk-go/service/kinesis" + "github.com/aws/aws-sdk-go-v2/aws" + k "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/stretchr/testify/require" ) @@ -100,7 +102,7 @@ func TestShardMapPut(t *testing.T) { }, postDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{}, + Entry: types.PutRecordsRequestEntry{}, UserRecords: []UserRecord{ newTestUserRecord("foo", "", []byte("hello")), newTestUserRecord("foo", "", []byte("hello")), @@ -118,7 +120,7 @@ func TestShardMapPut(t *testing.T) { }, putDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{}, + Entry: types.PutRecordsRequestEntry{}, UserRecords: []UserRecord{ newTestUserRecord("foo", "", []byte("hello")), }, @@ -126,7 +128,7 @@ func TestShardMapPut(t *testing.T) { }, postDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{}, + Entry: types.PutRecordsRequestEntry{}, UserRecords: []UserRecord{ newTestUserRecord("foo", "", []byte("hello")), }, @@ -145,7 +147,7 @@ func 
TestShardMapPut(t *testing.T) { }, putDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{}, + Entry: types.PutRecordsRequestEntry{}, UserRecords: []UserRecord{ newTestUserRecord("foo", "", mockData("hello", (maxRecordSize-100)/3)), newTestUserRecord("foo", "", mockData("hello", (maxRecordSize-100)/3)), @@ -155,7 +157,7 @@ func TestShardMapPut(t *testing.T) { }, postDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{}, + Entry: types.PutRecordsRequestEntry{}, UserRecords: []UserRecord{ newTestUserRecord("foo", "", mockData("hello", (maxRecordSize-100)/3)), }, @@ -175,7 +177,7 @@ func TestShardMapPut(t *testing.T) { }, putDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -187,7 +189,7 @@ func TestShardMapPut(t *testing.T) { }, postDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -196,7 +198,7 @@ func TestShardMapPut(t *testing.T) { }, }, &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("170141183460469231731687303715884105727"), }, @@ -222,7 +224,7 @@ func TestShardMapPut(t *testing.T) { }, putDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -234,7 +236,7 @@ func TestShardMapPut(t *testing.T) { }, postDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard 
ExplicitHashKey: aws.String("0"), }, @@ -243,7 +245,7 @@ func TestShardMapPut(t *testing.T) { }, }, &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("170141183460469231731687303715884105727"), }, @@ -474,7 +476,7 @@ type mockShardLister struct { next *string } -func (m *mockShardLister) ListShards(input *k.ListShardsInput) (*k.ListShardsOutput, error) { +func (m *mockShardLister) ListShards(ctx context.Context, input *k.ListShardsInput, optFns ...func(*k.Options)) (*k.ListShardsOutput, error) { m.callCount++ if m.callCount > len(m.responses) { return nil, errors.New("ListShards error") @@ -558,8 +560,8 @@ func TestGetKinesisShardsFunc(t *testing.T) { t.Run(tc.name, func(t *testing.T) { var ( listShardsResponses []*k.ListShardsOutput - oldShards []*k.Shard - expectedShards []*k.Shard + oldShards []types.Shard + expectedShards []types.Shard ) if tc.listShardsResponses != "" { @@ -568,12 +570,12 @@ func TestGetKinesisShardsFunc(t *testing.T) { } if tc.oldShards != "" { - oldShards = make([]*k.Shard, 0) + oldShards = make([]types.Shard, 0) loadJSONFromFile(t, tc.oldShards, &oldShards) } if tc.expectedShards != "" { - expectedShards = make([]*k.Shard, 0) + expectedShards = make([]types.Shard, 0) loadJSONFromFile(t, tc.expectedShards, &expectedShards) } @@ -623,7 +625,7 @@ func TestShardMapUpdateShards(t *testing.T) { newShards: "testdata/TestShardMapUpdateShards/error_pending_put/newShards.json", pendingRecords: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("170141183460469231731687303715884105728"), }, @@ -635,7 +637,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, updateDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ 
// StartingHashKey of second shard ExplicitHashKey: aws.String("170141183460469231731687303715884105728"), }, @@ -647,7 +649,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, postDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -657,7 +659,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, }, &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("170141183460469231731687303715884105728"), }, @@ -680,7 +682,7 @@ func TestShardMapUpdateShards(t *testing.T) { newShards: "testdata/TestShardMapUpdateShards/error_agg_put/newShards.json", pendingRecords: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("0"), }, @@ -692,7 +694,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, updateDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("0"), }, @@ -704,7 +706,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, postDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -714,7 +716,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, }, &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("170141183460469231731687303715884105728"), }, @@ -738,7 +740,7 @@ func TestShardMapUpdateShards(t *testing.T) { // getShardsUpdated: false, // postDrained: 
[]*AggregatedRecordRequest{ // &AggregatedRecordRequest{ - // Entry: &k.PutRecordsRequestEntry{ + // Entry: types.PutRecordsRequestEntry{ // // StartingHashKey of first shard // ExplicitHashKey: aws.String("0"), // }, @@ -748,7 +750,7 @@ func TestShardMapUpdateShards(t *testing.T) { // }, // }, // &AggregatedRecordRequest{ - // Entry: &k.PutRecordsRequestEntry{ + // Entry: types.PutRecordsRequestEntry{ // // StartingHashKey of second shard // ExplicitHashKey: aws.String("170141183460469231731687303715884105728"), // }, @@ -772,7 +774,7 @@ func TestShardMapUpdateShards(t *testing.T) { newShards: "testdata/TestShardMapUpdateShards/update/newShards.json", pendingRecords: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -782,7 +784,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, }, &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("170141183460469231731687303715884105728"), }, @@ -794,7 +796,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, postDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -806,7 +808,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, }, &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("170141183460469231731687303715884105728"), }, @@ -832,7 +834,7 @@ func TestShardMapUpdateShards(t *testing.T) { newShards: "testdata/TestShardMapUpdateShards/update_drained/newShards.json", pendingRecords: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: 
types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -841,7 +843,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, }, &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of second shard ExplicitHashKey: aws.String("170141183460469231731687303715884105728"), }, @@ -852,7 +854,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, updateDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -862,7 +864,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, }, &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -874,7 +876,7 @@ func TestShardMapUpdateShards(t *testing.T) { }, postDrained: []*AggregatedRecordRequest{ &AggregatedRecordRequest{ - Entry: &k.PutRecordsRequestEntry{ + Entry: types.PutRecordsRequestEntry{ // StartingHashKey of first shard ExplicitHashKey: aws.String("0"), }, @@ -890,10 +892,10 @@ func TestShardMapUpdateShards(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { var ( - startingShards []*k.Shard + startingShards []types.Shard ) if tc.startingShards != "" { - startingShards = make([]*k.Shard, 0) + startingShards = make([]types.Shard, 0) loadJSONFromFile(t, tc.startingShards, &startingShards) } @@ -905,9 +907,9 @@ func TestShardMapUpdateShards(t *testing.T) { require.NoError(t, err) } - var newShards []*k.Shard + var newShards []types.Shard if tc.newShards != "" { - newShards = make([]*k.Shard, 0) + newShards = make([]types.Shard, 0) loadJSONFromFile(t, tc.newShards, &newShards) } diff --git a/worker_pool.go b/worker_pool.go index 0020569..5058620 100644 --- a/worker_pool.go +++ b/worker_pool.go @@ -1,11 +1,13 @@ package 
producer import ( + "context" "fmt" "sync" "time" - k "github.com/aws/aws-sdk-go/service/kinesis" + k "github.com/aws/aws-sdk-go-v2/service/kinesis" + "github.com/aws/aws-sdk-go-v2/service/kinesis/types" "github.com/jpillora/backoff" ) @@ -236,12 +238,12 @@ func (wp *WorkerPool) send(work *Work) *Work { count := len(work.records) wp.Logger.Info("flushing records", LogValue{"reason", work.reason}, LogValue{"records", count}) - kinesisRecords := make([]*k.PutRecordsRequestEntry, count) + kinesisRecords := make([]types.PutRecordsRequestEntry, count) for i := 0; i < count; i++ { kinesisRecords[i] = work.records[i].Entry } - out, err := wp.Client.PutRecords(&k.PutRecordsInput{ + out, err := wp.Client.PutRecords(context.Background(), &k.PutRecordsInput{ StreamName: &wp.StreamName, Records: kinesisRecords, }) @@ -299,8 +301,8 @@ func (wp *WorkerPool) send(work *Work) *Work { // failures returns the failed records as indicated in the response. func failures( records []*AggregatedRecordRequest, - response []*k.PutRecordsResultEntry, - count int64, + response []types.PutRecordsResultEntry, + count int32, ) []*AggregatedRecordRequest { out := make([]*AggregatedRecordRequest, 0, count) for i, record := range response {