diff --git a/Gopkg.lock b/Gopkg.lock index 2c097153..d8af445b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -14,13 +14,13 @@ version = "v1.17.0" [[projects]] - name = "github.com/garyburd/redigo" + name = "github.com/gomodule/redigo" packages = [ "internal", "redis" ] - revision = "d1ed5c67e5794de818ea85e6b522fda02623a484" - version = "v1.4.0" + revision = "9c11da706d9b7902c6da69c592f75637793fe121" + version = "v2.0.0" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index 30bb27f6..94bfa49d 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -8,8 +8,8 @@ version = "1.17.0" [[constraint]] - name = "github.com/garyburd/redigo" - version = "1.4.0" + name = "github.com/gomodule/redigo" + version = "2.0.0" [[constraint]] name = "github.com/prometheus/client_golang" diff --git a/exporter/redis.go b/exporter/redis.go index 43aaaa94..126e1139 100644 --- a/exporter/redis.go +++ b/exporter/redis.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/prometheus/client_golang/prometheus" prom_strutil "github.com/prometheus/prometheus/util/strutil" log "github.com/sirupsen/logrus" diff --git a/exporter/redis_test.go b/exporter/redis_test.go index f1c4d82d..1cfe4032 100644 --- a/exporter/redis_test.go +++ b/exporter/redis_test.go @@ -20,7 +20,7 @@ import ( "bytes" "flag" - "github.com/garyburd/redigo/redis" + "github.com/gomodule/redigo/redis" "github.com/prometheus/client_golang/prometheus" dto "github.com/prometheus/client_model/go" ) diff --git a/glide.yaml b/glide.yaml index f8be28ee..5ae5f747 100644 --- a/glide.yaml +++ b/glide.yaml @@ -2,8 +2,8 @@ package: github.com/oliver006/redis_exporter import: - package: github.com/sirupsen/logrus version: v0.11.0 -- package: github.com/garyburd/redigo - version: v1.2.0 +- package: github.com/gomodule/redigo + version: v2.0.0 subpackages: - redis - package: github.com/prometheus/client_golang diff --git a/vendor/github.com/gomodule/redigo/.github/CONTRIBUTING.md b/vendor/github.com/gomodule/redigo/.github/CONTRIBUTING.md new file mode 100644 index 00000000..143443a8 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/.github/CONTRIBUTING.md @@ -0,0 +1,5 @@ +Ask questions at +[StackOverflow](https://stackoverflow.com/questions/ask?tags=go+redis). + +[Open an issue](https://github.com/garyburd/redigo/issues/new) to discuss your +plans before doing any work on Redigo. diff --git a/vendor/github.com/gomodule/redigo/.github/ISSUE_TEMPLATE.md b/vendor/github.com/gomodule/redigo/.github/ISSUE_TEMPLATE.md new file mode 100644 index 00000000..d8a1271d --- /dev/null +++ b/vendor/github.com/gomodule/redigo/.github/ISSUE_TEMPLATE.md @@ -0,0 +1 @@ +Ask questions at https://stackoverflow.com/questions/ask?tags=go+redis diff --git a/vendor/github.com/gomodule/redigo/.travis.yml b/vendor/github.com/gomodule/redigo/.travis.yml new file mode 100644 index 00000000..25d62652 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/.travis.yml @@ -0,0 +1,20 @@ +language: go +sudo: false +services: + - redis-server + +go: + - 1.4 + - 1.5.x + - 1.6.x + - 1.7.x + - 1.8.x + - 1.9.x + - 1.10.x + - tip + +script: + - go get -t -v ./... + - diff -u <(echo -n) <(gofmt -d .) + - go vet $(go list ./... | grep -v /vendor/) + - go test -v -race ./... diff --git a/vendor/github.com/garyburd/redigo/LICENSE b/vendor/github.com/gomodule/redigo/LICENSE similarity index 100% rename from vendor/github.com/garyburd/redigo/LICENSE rename to vendor/github.com/gomodule/redigo/LICENSE diff --git a/vendor/github.com/gomodule/redigo/README.markdown b/vendor/github.com/gomodule/redigo/README.markdown new file mode 100644 index 00000000..114cbc20 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/README.markdown @@ -0,0 +1,51 @@ +Redigo +====== + +[![Build Status](https://travis-ci.org/gomodule/redigo.svg?branch=master)](https://travis-ci.org/gomodule/redigo) +[![GoDoc](https://godoc.org/github.com/gomodule/redigo/redis?status.svg)](https://godoc.org/github.com/gomodule/redigo/redis) + +Redigo is a [Go](http://golang.org/) client for the [Redis](http://redis.io/) database. + +Features +------- + +* A [Print-like](http://godoc.org/github.com/gomodule/redigo/redis#hdr-Executing_Commands) API with support for all Redis commands. +* [Pipelining](http://godoc.org/github.com/gomodule/redigo/redis#hdr-Pipelining), including pipelined transactions. +* [Publish/Subscribe](http://godoc.org/github.com/gomodule/redigo/redis#hdr-Publish_and_Subscribe). +* [Connection pooling](http://godoc.org/github.com/gomodule/redigo/redis#Pool). +* [Script helper type](http://godoc.org/github.com/gomodule/redigo/redis#Script) with optimistic use of EVALSHA. +* [Helper functions](http://godoc.org/github.com/gomodule/redigo/redis#hdr-Reply_Helpers) for working with command replies. + +Documentation +------------- + +- [API Reference](http://godoc.org/github.com/gomodule/redigo/redis) +- [FAQ](https://github.com/gomodule/redigo/wiki/FAQ) +- [Examples](https://godoc.org/github.com/gomodule/redigo/redis#pkg-examples) + +Installation +------------ + +Install Redigo using the "go get" command: + + go get github.com/gomodule/redigo/redis + +The Go distribution is Redigo's only dependency. + +Related Projects +---------------- + +- [rafaeljusto/redigomock](https://godoc.org/github.com/rafaeljusto/redigomock) - A mock library for Redigo. +- [chasex/redis-go-cluster](https://github.com/chasex/redis-go-cluster) - A Redis cluster client implementation. +- [FZambia/go-sentinel](https://github.com/FZambia/go-sentinel) - Redis Sentinel support for Redigo +- [PuerkitoBio/redisc](https://github.com/PuerkitoBio/redisc) - Redis Cluster client built on top of Redigo + +Contributing +------------ + +See [CONTRIBUTING.md](https://github.com/gomodule/redigo/blob/master/.github/CONTRIBUTING.md). + +License +------- + +Redigo is available under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html). diff --git a/vendor/github.com/garyburd/redigo/internal/commandinfo.go b/vendor/github.com/gomodule/redigo/internal/commandinfo.go similarity index 95% rename from vendor/github.com/garyburd/redigo/internal/commandinfo.go rename to vendor/github.com/gomodule/redigo/internal/commandinfo.go index 11e58425..b763efbd 100644 --- a/vendor/github.com/garyburd/redigo/internal/commandinfo.go +++ b/vendor/github.com/gomodule/redigo/internal/commandinfo.go @@ -12,7 +12,7 @@ // License for the specific language governing permissions and limitations // under the License. -package internal // import "github.com/garyburd/redigo/internal" +package internal // import "github.com/gomodule/redigo/internal" import ( "strings" diff --git a/vendor/github.com/gomodule/redigo/internal/commandinfo_test.go b/vendor/github.com/gomodule/redigo/internal/commandinfo_test.go new file mode 100644 index 00000000..118e94b6 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/internal/commandinfo_test.go @@ -0,0 +1,27 @@ +package internal + +import "testing" + +func TestLookupCommandInfo(t *testing.T) { + for _, n := range []string{"watch", "WATCH", "wAtch"} { + if LookupCommandInfo(n) == (CommandInfo{}) { + t.Errorf("LookupCommandInfo(%q) = CommandInfo{}, expected non-zero value", n) + } + } +} + +func benchmarkLookupCommandInfo(b *testing.B, names ...string) { + for i := 0; i < b.N; i++ { + for _, c := range names { + LookupCommandInfo(c) + } + } +} + +func BenchmarkLookupCommandInfoCorrectCase(b *testing.B) { + benchmarkLookupCommandInfo(b, "watch", "WATCH", "monitor", "MONITOR") +} + +func BenchmarkLookupCommandInfoMixedCase(b *testing.B) { + benchmarkLookupCommandInfo(b, "wAtch", "WeTCH", "monItor", "MONiTOR") +} diff --git a/vendor/github.com/gomodule/redigo/internal/redistest/testdb.go b/vendor/github.com/gomodule/redigo/internal/redistest/testdb.go new file mode 100644 index 00000000..8b05662e --- /dev/null +++ b/vendor/github.com/gomodule/redigo/internal/redistest/testdb.go @@ -0,0 +1,68 @@ +// Copyright 2014 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// Package redistest contains utilities for writing Redigo tests. +package redistest + +import ( + "errors" + "time" + + "github.com/gomodule/redigo/redis" +) + +type testConn struct { + redis.Conn +} + +func (t testConn) Close() error { + _, err := t.Conn.Do("SELECT", "9") + if err != nil { + return nil + } + _, err = t.Conn.Do("FLUSHDB") + if err != nil { + return err + } + return t.Conn.Close() +} + +// Dial dials the local Redis server and selects database 9. To prevent +// stomping on real data, DialTestDB fails if database 9 contains data. The +// returned connection flushes database 9 on close. +func Dial() (redis.Conn, error) { + c, err := redis.DialTimeout("tcp", ":6379", 0, 1*time.Second, 1*time.Second) + if err != nil { + return nil, err + } + + _, err = c.Do("SELECT", "9") + if err != nil { + c.Close() + return nil, err + } + + n, err := redis.Int(c.Do("DBSIZE")) + if err != nil { + c.Close() + return nil, err + } + + if n != 0 { + c.Close() + return nil, errors.New("database #9 is not empty, test can not continue") + } + + return testConn{c}, nil +} diff --git a/vendor/github.com/garyburd/redigo/redis/conn.go b/vendor/github.com/gomodule/redigo/redis/conn.go similarity index 100% rename from vendor/github.com/garyburd/redigo/redis/conn.go rename to vendor/github.com/gomodule/redigo/redis/conn.go diff --git a/vendor/github.com/gomodule/redigo/redis/conn_test.go b/vendor/github.com/gomodule/redigo/redis/conn_test.go new file mode 100644 index 00000000..63a4c23d --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/conn_test.go @@ -0,0 +1,867 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "bytes" + "crypto/tls" + "crypto/x509" + "fmt" + "io" + "math" + "net" + "os" + "reflect" + "strings" + "testing" + "time" + + "github.com/gomodule/redigo/redis" +) + +type testConn struct { + io.Reader + io.Writer + readDeadline time.Time + writeDeadline time.Time +} + +func (*testConn) Close() error { return nil } +func (*testConn) LocalAddr() net.Addr { return nil } +func (*testConn) RemoteAddr() net.Addr { return nil } +func (c *testConn) SetDeadline(t time.Time) error { c.readDeadline = t; c.writeDeadline = t; return nil } +func (c *testConn) SetReadDeadline(t time.Time) error { c.readDeadline = t; return nil } +func (c *testConn) SetWriteDeadline(t time.Time) error { c.writeDeadline = t; return nil } + +func dialTestConn(r string, w io.Writer) redis.DialOption { + return redis.DialNetDial(func(network, addr string) (net.Conn, error) { + return &testConn{Reader: strings.NewReader(r), Writer: w}, nil + }) +} + +type tlsTestConn struct { + net.Conn + done chan struct{} +} + +func (c *tlsTestConn) Close() error { + c.Conn.Close() + <-c.done + return nil +} + +func dialTestConnTLS(r string, w io.Writer) redis.DialOption { + return redis.DialNetDial(func(network, addr string) (net.Conn, error) { + client, server := net.Pipe() + tlsServer := tls.Server(server, &serverTLSConfig) + go io.Copy(tlsServer, strings.NewReader(r)) + done := make(chan struct{}) + go func() { + io.Copy(w, tlsServer) + close(done) + }() + return &tlsTestConn{Conn: client, done: done}, nil + }) +} + +type durationArg struct { + time.Duration +} + +func (t durationArg) RedisArg() interface{} { + return t.Seconds() +} + +type recursiveArg int + +func (v recursiveArg) RedisArg() interface{} { return v } + +var writeTests = []struct { + args []interface{} + expected string +}{ + { + []interface{}{"SET", "key", "value"}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", + }, + { + []interface{}{"SET", "key", "value"}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", + }, + { + []interface{}{"SET", "key", byte(100)}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\n100\r\n", + }, + { + []interface{}{"SET", "key", 100}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\n100\r\n", + }, + { + []interface{}{"SET", "key", int64(math.MinInt64)}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$20\r\n-9223372036854775808\r\n", + }, + { + []interface{}{"SET", "key", float64(1349673917.939762)}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$21\r\n1.349673917939762e+09\r\n", + }, + { + []interface{}{"SET", "key", ""}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$0\r\n\r\n", + }, + { + []interface{}{"SET", "key", nil}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$0\r\n\r\n", + }, + { + []interface{}{"SET", "key", durationArg{time.Minute}}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$2\r\n60\r\n", + }, + { + []interface{}{"SET", "key", recursiveArg(123)}, + "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\n123\r\n", + }, + { + []interface{}{"ECHO", true, false}, + "*3\r\n$4\r\nECHO\r\n$1\r\n1\r\n$1\r\n0\r\n", + }, +} + +func TestWrite(t *testing.T) { + for _, tt := range writeTests { + var buf bytes.Buffer + c, _ := redis.Dial("", "", dialTestConn("", &buf)) + err := c.Send(tt.args[0].(string), tt.args[1:]...) + if err != nil { + t.Errorf("Send(%v) returned error %v", tt.args, err) + continue + } + c.Flush() + actual := buf.String() + if actual != tt.expected { + t.Errorf("Send(%v) = %q, want %q", tt.args, actual, tt.expected) + } + } +} + +var errorSentinel = &struct{}{} + +var readTests = []struct { + reply string + expected interface{} +}{ + { + "+OK\r\n", + "OK", + }, + { + "+PONG\r\n", + "PONG", + }, + { + "@OK\r\n", + errorSentinel, + }, + { + "$6\r\nfoobar\r\n", + []byte("foobar"), + }, + { + "$-1\r\n", + nil, + }, + { + ":1\r\n", + int64(1), + }, + { + ":-2\r\n", + int64(-2), + }, + { + "*0\r\n", + []interface{}{}, + }, + { + "*-1\r\n", + nil, + }, + { + "*4\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$5\r\nHello\r\n$5\r\nWorld\r\n", + []interface{}{[]byte("foo"), []byte("bar"), []byte("Hello"), []byte("World")}, + }, + { + "*3\r\n$3\r\nfoo\r\n$-1\r\n$3\r\nbar\r\n", + []interface{}{[]byte("foo"), nil, []byte("bar")}, + }, + + { + // "x" is not a valid length + "$x\r\nfoobar\r\n", + errorSentinel, + }, + { + // -2 is not a valid length + "$-2\r\n", + errorSentinel, + }, + { + // "x" is not a valid integer + ":x\r\n", + errorSentinel, + }, + { + // missing \r\n following value + "$6\r\nfoobar", + errorSentinel, + }, + { + // short value + "$6\r\nxx", + errorSentinel, + }, + { + // long value + "$6\r\nfoobarx\r\n", + errorSentinel, + }, +} + +func TestRead(t *testing.T) { + for _, tt := range readTests { + c, _ := redis.Dial("", "", dialTestConn(tt.reply, nil)) + actual, err := c.Receive() + if tt.expected == errorSentinel { + if err == nil { + t.Errorf("Receive(%q) did not return expected error", tt.reply) + } + } else { + if err != nil { + t.Errorf("Receive(%q) returned error %v", tt.reply, err) + continue + } + if !reflect.DeepEqual(actual, tt.expected) { + t.Errorf("Receive(%q) = %v, want %v", tt.reply, actual, tt.expected) + } + } + } +} + +var testCommands = []struct { + args []interface{} + expected interface{} +}{ + { + []interface{}{"PING"}, + "PONG", + }, + { + []interface{}{"SET", "foo", "bar"}, + "OK", + }, + { + []interface{}{"GET", "foo"}, + []byte("bar"), + }, + { + []interface{}{"GET", "nokey"}, + nil, + }, + { + []interface{}{"MGET", "nokey", "foo"}, + []interface{}{nil, []byte("bar")}, + }, + { + []interface{}{"INCR", "mycounter"}, + int64(1), + }, + { + []interface{}{"LPUSH", "mylist", "foo"}, + int64(1), + }, + { + []interface{}{"LPUSH", "mylist", "bar"}, + int64(2), + }, + { + []interface{}{"LRANGE", "mylist", 0, -1}, + []interface{}{[]byte("bar"), []byte("foo")}, + }, + { + []interface{}{"MULTI"}, + "OK", + }, + { + []interface{}{"LRANGE", "mylist", 0, -1}, + "QUEUED", + }, + { + []interface{}{"PING"}, + "QUEUED", + }, + { + []interface{}{"EXEC"}, + []interface{}{ + []interface{}{[]byte("bar"), []byte("foo")}, + "PONG", + }, + }, +} + +func TestDoCommands(t *testing.T) { + c, err := redis.DialDefaultServer() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + for _, cmd := range testCommands { + actual, err := c.Do(cmd.args[0].(string), cmd.args[1:]...) + if err != nil { + t.Errorf("Do(%v) returned error %v", cmd.args, err) + continue + } + if !reflect.DeepEqual(actual, cmd.expected) { + t.Errorf("Do(%v) = %v, want %v", cmd.args, actual, cmd.expected) + } + } +} + +func TestPipelineCommands(t *testing.T) { + c, err := redis.DialDefaultServer() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + for _, cmd := range testCommands { + if err := c.Send(cmd.args[0].(string), cmd.args[1:]...); err != nil { + t.Fatalf("Send(%v) returned error %v", cmd.args, err) + } + } + if err := c.Flush(); err != nil { + t.Errorf("Flush() returned error %v", err) + } + for _, cmd := range testCommands { + actual, err := c.Receive() + if err != nil { + t.Fatalf("Receive(%v) returned error %v", cmd.args, err) + } + if !reflect.DeepEqual(actual, cmd.expected) { + t.Errorf("Receive(%v) = %v, want %v", cmd.args, actual, cmd.expected) + } + } +} + +func TestBlankCommmand(t *testing.T) { + c, err := redis.DialDefaultServer() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + for _, cmd := range testCommands { + if err := c.Send(cmd.args[0].(string), cmd.args[1:]...); err != nil { + t.Fatalf("Send(%v) returned error %v", cmd.args, err) + } + } + reply, err := redis.Values(c.Do("")) + if err != nil { + t.Fatalf("Do() returned error %v", err) + } + if len(reply) != len(testCommands) { + t.Fatalf("len(reply)=%d, want %d", len(reply), len(testCommands)) + } + for i, cmd := range testCommands { + actual := reply[i] + if !reflect.DeepEqual(actual, cmd.expected) { + t.Errorf("Receive(%v) = %v, want %v", cmd.args, actual, cmd.expected) + } + } +} + +func TestRecvBeforeSend(t *testing.T) { + c, err := redis.DialDefaultServer() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + done := make(chan struct{}) + go func() { + c.Receive() + close(done) + }() + time.Sleep(time.Millisecond) + c.Send("PING") + c.Flush() + <-done + _, err = c.Do("") + if err != nil { + t.Fatalf("error=%v", err) + } +} + +func TestError(t *testing.T) { + c, err := redis.DialDefaultServer() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + c.Do("SET", "key", "val") + _, err = c.Do("HSET", "key", "fld", "val") + if err == nil { + t.Errorf("Expected err for HSET on string key.") + } + if c.Err() != nil { + t.Errorf("Conn has Err()=%v, expect nil", c.Err()) + } + _, err = c.Do("SET", "key", "val") + if err != nil { + t.Errorf("Do(SET, key, val) returned error %v, expected nil.", err) + } +} + +func TestReadTimeout(t *testing.T) { + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("net.Listen returned %v", err) + } + defer l.Close() + + go func() { + for { + c, err := l.Accept() + if err != nil { + return + } + go func() { + time.Sleep(time.Second) + c.Write([]byte("+OK\r\n")) + c.Close() + }() + } + }() + + // Do + + c1, err := redis.Dial(l.Addr().Network(), l.Addr().String(), redis.DialReadTimeout(time.Millisecond)) + if err != nil { + t.Fatalf("redis.Dial returned %v", err) + } + defer c1.Close() + + _, err = c1.Do("PING") + if err == nil { + t.Fatalf("c1.Do() returned nil, expect error") + } + if c1.Err() == nil { + t.Fatalf("c1.Err() = nil, expect error") + } + + // Send/Flush/Receive + + c2, err := redis.Dial(l.Addr().Network(), l.Addr().String(), redis.DialReadTimeout(time.Millisecond)) + if err != nil { + t.Fatalf("redis.Dial returned %v", err) + } + defer c2.Close() + + c2.Send("PING") + c2.Flush() + _, err = c2.Receive() + if err == nil { + t.Fatalf("c2.Receive() returned nil, expect error") + } + if c2.Err() == nil { + t.Fatalf("c2.Err() = nil, expect error") + } +} + +var dialErrors = []struct { + rawurl string + expectedError string +}{ + { + "localhost", + "invalid redis URL scheme", + }, + // The error message for invalid hosts is different in different + // versions of Go, so just check that there is an error message. + { + "redis://weird url", + "", + }, + { + "redis://foo:bar:baz", + "", + }, + { + "http://www.google.com", + "invalid redis URL scheme: http", + }, + { + "redis://localhost:6379/abc123", + "invalid database: abc123", + }, +} + +func TestDialURLErrors(t *testing.T) { + for _, d := range dialErrors { + _, err := redis.DialURL(d.rawurl) + if err == nil || !strings.Contains(err.Error(), d.expectedError) { + t.Errorf("DialURL did not return expected error (expected %v to contain %s)", err, d.expectedError) + } + } +} + +func TestDialURLPort(t *testing.T) { + checkPort := func(network, address string) (net.Conn, error) { + if address != "localhost:6379" { + t.Errorf("DialURL did not set port to 6379 by default (got %v)", address) + } + return nil, nil + } + _, err := redis.DialURL("redis://localhost", redis.DialNetDial(checkPort)) + if err != nil { + t.Error("dial error:", err) + } +} + +func TestDialURLHost(t *testing.T) { + checkHost := func(network, address string) (net.Conn, error) { + if address != "localhost:6379" { + t.Errorf("DialURL did not set host to localhost by default (got %v)", address) + } + return nil, nil + } + _, err := redis.DialURL("redis://:6379", redis.DialNetDial(checkHost)) + if err != nil { + t.Error("dial error:", err) + } +} + +var dialURLTests = []struct { + description string + url string + r string + w string +}{ + {"password", "redis://x:abc123@localhost", "+OK\r\n", "*2\r\n$4\r\nAUTH\r\n$6\r\nabc123\r\n"}, + {"database 3", "redis://localhost/3", "+OK\r\n", "*2\r\n$6\r\nSELECT\r\n$1\r\n3\r\n"}, + {"database 99", "redis://localhost/99", "+OK\r\n", "*2\r\n$6\r\nSELECT\r\n$2\r\n99\r\n"}, + {"no database", "redis://localhost/", "+OK\r\n", ""}, +} + +func TestDialURL(t *testing.T) { + for _, tt := range dialURLTests { + var buf bytes.Buffer + // UseTLS should be ignored in all of these tests. + _, err := redis.DialURL(tt.url, dialTestConn(tt.r, &buf), redis.DialUseTLS(true)) + if err != nil { + t.Errorf("%s dial error: %v", tt.description, err) + continue + } + if w := buf.String(); w != tt.w { + t.Errorf("%s commands = %q, want %q", tt.description, w, tt.w) + } + } +} + +func checkPingPong(t *testing.T, buf *bytes.Buffer, c redis.Conn) { + resp, err := c.Do("PING") + if err != nil { + t.Fatal("ping error:", err) + } + // Close connection to ensure that writes to buf are complete. + c.Close() + expected := "*1\r\n$4\r\nPING\r\n" + actual := buf.String() + if actual != expected { + t.Errorf("commands = %q, want %q", actual, expected) + } + if resp != "PONG" { + t.Errorf("resp = %v, want %v", resp, "PONG") + } +} + +const pingResponse = "+PONG\r\n" + +func TestDialURLTLS(t *testing.T) { + var buf bytes.Buffer + c, err := redis.DialURL("rediss://example.com/", + redis.DialTLSConfig(&clientTLSConfig), + dialTestConnTLS(pingResponse, &buf)) + if err != nil { + t.Fatal("dial error:", err) + } + checkPingPong(t, &buf, c) +} + +func TestDialUseTLS(t *testing.T) { + var buf bytes.Buffer + c, err := redis.Dial("tcp", "example.com:6379", + redis.DialTLSConfig(&clientTLSConfig), + dialTestConnTLS(pingResponse, &buf), + redis.DialUseTLS(true)) + if err != nil { + t.Fatal("dial error:", err) + } + checkPingPong(t, &buf, c) +} + +func TestDialTLSSKipVerify(t *testing.T) { + var buf bytes.Buffer + c, err := redis.Dial("tcp", "example.com:6379", + dialTestConnTLS(pingResponse, &buf), + redis.DialTLSSkipVerify(true), + redis.DialUseTLS(true)) + if err != nil { + t.Fatal("dial error:", err) + } + checkPingPong(t, &buf, c) +} + +// Connect to local instance of Redis running on the default port. +func ExampleDial() { + c, err := redis.Dial("tcp", ":6379") + if err != nil { + // handle error + } + defer c.Close() +} + +// Connect to remote instance of Redis using a URL. +func ExampleDialURL() { + c, err := redis.DialURL(os.Getenv("REDIS_URL")) + if err != nil { + // handle connection error + } + defer c.Close() +} + +// TextExecError tests handling of errors in a transaction. See +// http://redis.io/topics/transactions for information on how Redis handles +// errors in a transaction. +func TestExecError(t *testing.T) { + c, err := redis.DialDefaultServer() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + // Execute commands that fail before EXEC is called. + + c.Do("DEL", "k0") + c.Do("ZADD", "k0", 0, 0) + c.Send("MULTI") + c.Send("NOTACOMMAND", "k0", 0, 0) + c.Send("ZINCRBY", "k0", 0, 0) + v, err := c.Do("EXEC") + if err == nil { + t.Fatalf("EXEC returned values %v, expected error", v) + } + + // Execute commands that fail after EXEC is called. The first command + // returns an error. + + c.Do("DEL", "k1") + c.Do("ZADD", "k1", 0, 0) + c.Send("MULTI") + c.Send("HSET", "k1", 0, 0) + c.Send("ZINCRBY", "k1", 0, 0) + v, err = c.Do("EXEC") + if err != nil { + t.Fatalf("EXEC returned error %v", err) + } + + vs, err := redis.Values(v, nil) + if err != nil { + t.Fatalf("Values(v) returned error %v", err) + } + + if len(vs) != 2 { + t.Fatalf("len(vs) == %d, want 2", len(vs)) + } + + if _, ok := vs[0].(error); !ok { + t.Fatalf("first result is type %T, expected error", vs[0]) + } + + if _, ok := vs[1].([]byte); !ok { + t.Fatalf("second result is type %T, expected []byte", vs[1]) + } + + // Execute commands that fail after EXEC is called. The second command + // returns an error. + + c.Do("ZADD", "k2", 0, 0) + c.Send("MULTI") + c.Send("ZINCRBY", "k2", 0, 0) + c.Send("HSET", "k2", 0, 0) + v, err = c.Do("EXEC") + if err != nil { + t.Fatalf("EXEC returned error %v", err) + } + + vs, err = redis.Values(v, nil) + if err != nil { + t.Fatalf("Values(v) returned error %v", err) + } + + if len(vs) != 2 { + t.Fatalf("len(vs) == %d, want 2", len(vs)) + } + + if _, ok := vs[0].([]byte); !ok { + t.Fatalf("first result is type %T, expected []byte", vs[0]) + } + + if _, ok := vs[1].(error); !ok { + t.Fatalf("second result is type %T, expected error", vs[2]) + } +} + +func BenchmarkDoEmpty(b *testing.B) { + b.StopTimer() + c, err := redis.DialDefaultServer() + if err != nil { + b.Fatal(err) + } + defer c.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + if _, err := c.Do(""); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkDoPing(b *testing.B) { + b.StopTimer() + c, err := redis.DialDefaultServer() + if err != nil { + b.Fatal(err) + } + defer c.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + } +} + +var clientTLSConfig, serverTLSConfig tls.Config + +func init() { + // The certificate and key for testing TLS dial options was created + // using the command + // + // go run GOROOT/src/crypto/tls/generate_cert.go \ + // --rsa-bits 1024 \ + // --host 127.0.0.1,::1,example.com --ca \ + // --start-date "Jan 1 00:00:00 1970" \ + // --duration=1000000h + // + // where GOROOT is the value of GOROOT reported by go env. + localhostCert := []byte(` +-----BEGIN CERTIFICATE----- +MIICFDCCAX2gAwIBAgIRAJfBL4CUxkXcdlFurb3K+iowDQYJKoZIhvcNAQELBQAw +EjEQMA4GA1UEChMHQWNtZSBDbzAgFw03MDAxMDEwMDAwMDBaGA8yMDg0MDEyOTE2 +MDAwMFowEjEQMA4GA1UEChMHQWNtZSBDbzCBnzANBgkqhkiG9w0BAQEFAAOBjQAw +gYkCgYEArizw8WxMUQ3bGHLeuJ4fDrEpy+L2pqrbYRlKk1DasJ/VkB8bImzIpe6+ +LGjiYIxvnDCOJ3f3QplcQuiuMyl6f2irJlJsbFT8Lo/3obnuTKAIaqUdJUqBg6y+ +JaL8Auk97FvunfKFv8U1AIhgiLzAfQ/3Eaq1yi87Ra6pMjGbTtcCAwEAAaNoMGYw +DgYDVR0PAQH/BAQDAgKkMBMGA1UdJQQMMAoGCCsGAQUFBwMBMA8GA1UdEwEB/wQF +MAMBAf8wLgYDVR0RBCcwJYILZXhhbXBsZS5jb22HBH8AAAGHEAAAAAAAAAAAAAAA +AAAAAAEwDQYJKoZIhvcNAQELBQADgYEAdZ8daIVkyhVwflt5I19m0oq1TycbGO1+ +ach7T6cZiBQeNR/SJtxr/wKPEpmvUgbv2BfFrKJ8QoIHYsbNSURTWSEa02pfw4k9 +6RQhij3ZkG79Ituj5OYRORV6Z0HUW32r670BtcuHuAhq7YA6Nxy4FtSt7bAlVdRt +rrKgNsltzMk= +-----END CERTIFICATE-----`) + + localhostKey := []byte(` +-----BEGIN RSA PRIVATE KEY----- +MIICXAIBAAKBgQCuLPDxbExRDdsYct64nh8OsSnL4vamqtthGUqTUNqwn9WQHxsi +bMil7r4saOJgjG+cMI4nd/dCmVxC6K4zKXp/aKsmUmxsVPwuj/ehue5MoAhqpR0l +SoGDrL4lovwC6T3sW+6d8oW/xTUAiGCIvMB9D/cRqrXKLztFrqkyMZtO1wIDAQAB +AoGACrc5G6FOEK6JjDeE/Fa+EmlT6PdNtXNNi+vCas3Opo8u1G8VfEi1D4BgstrB +Eq+RLkrOdB8tVyuYQYWPMhabMqF+hhKJN72j0OwfuPlVvTInwb/cKjo/zbH1IA+Y +HenHNK4ywv7/p/9/MvQPJ3I32cQBCgGUW5chVSH5M1sj5gECQQDabQAI1X0uDqCm +KbX9gXVkAgxkFddrt6LBHt57xujFcqEKFE7nwKhDh7DweVs/VEJ+kpid4z+UnLOw +KjtP9JolAkEAzCNBphQ//IsbH5rNs10wIUw3Ks/Oepicvr6kUFbIv+neRzi1iJHa +m6H7EayK3PWgax6BAsR/t0Jc9XV7r2muSwJAVzN09BHnK+ADGtNEKLTqXMbEk6B0 +pDhn7ZmZUOkUPN+Kky+QYM11X6Bob1jDqQDGmymDbGUxGO+GfSofC8inUQJAGfci +Eo3g1a6b9JksMPRZeuLG4ZstGErxJRH6tH1Va5PDwitka8qhk8o2tTjNMO3NSdLH +diKoXBcE2/Pll5pJoQJBAIMiiMIzXJhnN4mX8may44J/HvMlMf2xuVH2gNMwmZuc +Bjqn3yoLHaoZVvbWOi0C2TCN4FjXjaLNZGifQPbIcaA= +-----END RSA PRIVATE KEY-----`) + + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + if err != nil { + panic(fmt.Sprintf("error creating key pair: %v", err)) + } + serverTLSConfig.Certificates = []tls.Certificate{cert} + + certificate, err := x509.ParseCertificate(serverTLSConfig.Certificates[0].Certificate[0]) + if err != nil { + panic(fmt.Sprintf("error parsing x509 certificate: %v", err)) + } + + clientTLSConfig.RootCAs = x509.NewCertPool() + clientTLSConfig.RootCAs.AddCert(certificate) +} + +func TestWithTimeout(t *testing.T) { + for _, recv := range []bool{true, false} { + for _, defaultTimout := range []time.Duration{0, time.Minute} { + var buf bytes.Buffer + nc := &testConn{Reader: strings.NewReader("+OK\r\n+OK\r\n+OK\r\n+OK\r\n+OK\r\n+OK\r\n+OK\r\n+OK\r\n+OK\r\n+OK\r\n"), Writer: &buf} + c, _ := redis.Dial("", "", redis.DialReadTimeout(defaultTimout), redis.DialNetDial(func(network, addr string) (net.Conn, error) { return nc, nil })) + for i := 0; i < 4; i++ { + var minDeadline, maxDeadline time.Time + + // Alternate between default and specified timeout. + if i%2 == 0 { + if defaultTimout != 0 { + minDeadline = time.Now().Add(defaultTimout) + } + if recv { + c.Receive() + } else { + c.Do("PING") + } + if defaultTimout != 0 { + maxDeadline = time.Now().Add(defaultTimout) + } + } else { + timeout := 10 * time.Minute + minDeadline = time.Now().Add(timeout) + if recv { + redis.ReceiveWithTimeout(c, timeout) + } else { + redis.DoWithTimeout(c, timeout, "PING") + } + maxDeadline = time.Now().Add(timeout) + } + + // Expect set deadline in expected range. + if nc.readDeadline.Before(minDeadline) || nc.readDeadline.After(maxDeadline) { + t.Errorf("recv %v, %d: do deadline error: %v, %v, %v", recv, i, minDeadline, nc.readDeadline, maxDeadline) + } + } + } + } +} diff --git a/vendor/github.com/garyburd/redigo/redis/doc.go b/vendor/github.com/gomodule/redigo/redis/doc.go similarity index 98% rename from vendor/github.com/garyburd/redigo/redis/doc.go rename to vendor/github.com/gomodule/redigo/redis/doc.go index 1d19c166..70ec1ea6 100644 --- a/vendor/github.com/garyburd/redigo/redis/doc.go +++ b/vendor/github.com/gomodule/redigo/redis/doc.go @@ -14,7 +14,7 @@ // Package redis is a client for the Redis database. // -// The Redigo FAQ (https://github.com/garyburd/redigo/wiki/FAQ) contains more +// The Redigo FAQ (https://github.com/gomodule/redigo/wiki/FAQ) contains more // documentation about this package. // // Connections @@ -174,4 +174,4 @@ // non-recoverable error such as a network error or protocol parsing error. If // Err() returns a non-nil value, then the connection is not usable and should // be closed. -package redis // import "github.com/garyburd/redigo/redis" +package redis // import "github.com/gomodule/redigo/redis" diff --git a/vendor/github.com/garyburd/redigo/redis/go16.go b/vendor/github.com/gomodule/redigo/redis/go16.go similarity index 100% rename from vendor/github.com/garyburd/redigo/redis/go16.go rename to vendor/github.com/gomodule/redigo/redis/go16.go diff --git a/vendor/github.com/garyburd/redigo/redis/go17.go b/vendor/github.com/gomodule/redigo/redis/go17.go similarity index 100% rename from vendor/github.com/garyburd/redigo/redis/go17.go rename to vendor/github.com/gomodule/redigo/redis/go17.go diff --git a/vendor/github.com/garyburd/redigo/redis/go18.go b/vendor/github.com/gomodule/redigo/redis/go18.go similarity index 100% rename from vendor/github.com/garyburd/redigo/redis/go18.go rename to vendor/github.com/gomodule/redigo/redis/go18.go diff --git a/vendor/github.com/gomodule/redigo/redis/list_test.go b/vendor/github.com/gomodule/redigo/redis/list_test.go new file mode 100644 index 00000000..9c34bede --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/list_test.go @@ -0,0 +1,85 @@ +// Copyright 2018 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// +build go1.9 + +package redis + +import "testing" + +func TestPoolList(t *testing.T) { + var idle idleList + var a, b, c poolConn + + check := func(pcs ...*poolConn) { + if idle.count != len(pcs) { + t.Fatal("idle.count != len(pcs)") + } + if len(pcs) == 0 { + if idle.front != nil { + t.Fatalf("front not nil") + } + if idle.back != nil { + t.Fatalf("back not nil") + } + return + } + if idle.front != pcs[0] { + t.Fatal("front != pcs[0]") + } + if idle.back != pcs[len(pcs)-1] { + t.Fatal("back != pcs[len(pcs)-1]") + } + if idle.front.prev != nil { + t.Fatal("front.prev != nil") + } + if idle.back.next != nil { + t.Fatal("back.next != nil") + } + for i := 1; i < len(pcs)-1; i++ { + if pcs[i-1].next != pcs[i] { + t.Fatal("pcs[i-1].next != pcs[i]") + } + if pcs[i+1].prev != pcs[i] { + t.Fatal("pcs[i+1].prev != pcs[i]") + } + } + } + + idle.pushFront(&c) + check(&c) + idle.pushFront(&b) + check(&b, &c) + idle.pushFront(&a) + check(&a, &b, &c) + idle.popFront() + check(&b, &c) + idle.popFront() + check(&c) + idle.popFront() + check() + + idle.pushFront(&c) + check(&c) + idle.pushFront(&b) + check(&b, &c) + idle.pushFront(&a) + check(&a, &b, &c) + idle.popBack() + check(&a, &b) + idle.popBack() + check(&a) + idle.popBack() + check() +} diff --git a/vendor/github.com/garyburd/redigo/redis/log.go b/vendor/github.com/gomodule/redigo/redis/log.go similarity index 100% rename from vendor/github.com/garyburd/redigo/redis/log.go rename to vendor/github.com/gomodule/redigo/redis/log.go diff --git a/vendor/github.com/garyburd/redigo/redis/pool.go b/vendor/github.com/gomodule/redigo/redis/pool.go similarity index 54% rename from vendor/github.com/garyburd/redigo/redis/pool.go rename to vendor/github.com/gomodule/redigo/redis/pool.go index 5af97a05..d77da325 100644 --- a/vendor/github.com/garyburd/redigo/redis/pool.go +++ b/vendor/github.com/gomodule/redigo/redis/pool.go @@ -16,21 +16,21 @@ package redis import ( "bytes" - "container/list" "crypto/rand" "crypto/sha1" "errors" "io" "strconv" "sync" + "sync/atomic" "time" - "github.com/garyburd/redigo/internal" + "github.com/gomodule/redigo/internal" ) var ( - _ ConnWithTimeout = (*pooledConnection)(nil) - _ ConnWithTimeout = (*errorConnection)(nil) + _ ConnWithTimeout = (*activeConn)(nil) + _ ConnWithTimeout = (*errorConn)(nil) ) var nowFunc = time.Now // for testing @@ -150,19 +150,17 @@ type Pool struct { // for a connection to be returned to the pool before returning. Wait bool - // mu protects fields defined below. - mu sync.Mutex - cond *sync.Cond - closed bool - active int + // Close connections older than this duration. If the value is zero, then + // the pool does not close connections based on age. + MaxConnLifetime time.Duration - // Stack of idleConn with most recently used at the front. - idle list.List -} + chInitialized uint32 // set to 1 when field ch is initialized -type idleConn struct { - c Conn - t time.Time + mu sync.Mutex // mu protects the following fields + closed bool // set to true when the pool is closed. + active int // the number of open connections in the pool + ch chan struct{} // limits open connections when p.Wait is true + idle idleList // idle connections } // NewPool creates a new pool. @@ -178,16 +176,17 @@ func NewPool(newFn func() (Conn, error), maxIdle int) *Pool { // getting an underlying connection, then the connection Err, Do, Send, Flush // and Receive methods return that error. func (p *Pool) Get() Conn { - c, err := p.get() + pc, err := p.get(nil) if err != nil { - return errorConnection{err} + return errorConn{err} } - return &pooledConnection{p: p, c: c} + return &activeConn{p: p, pc: pc} } // PoolStats contains pool statistics. type PoolStats struct { - // ActiveCount is the number of connections in the pool. The count includes idle connections and connections in use. + // ActiveCount is the number of connections in the pool. The count includes + // idle connections and connections in use. ActiveCount int // IdleCount is the number of idle connections in the pool. IdleCount int @@ -198,14 +197,15 @@ func (p *Pool) Stats() PoolStats { p.mu.Lock() stats := PoolStats{ ActiveCount: p.active, - IdleCount: p.idle.Len(), + IdleCount: p.idle.count, } p.mu.Unlock() return stats } -// ActiveCount returns the number of connections in the pool. The count includes idle connections and connections in use. +// ActiveCount returns the number of connections in the pool. The count +// includes idle connections and connections in use. func (p *Pool) ActiveCount() int { p.mu.Lock() active := p.active @@ -216,7 +216,7 @@ func (p *Pool) ActiveCount() int { // IdleCount returns the number of idle connections in the pool. func (p *Pool) IdleCount() int { p.mu.Lock() - idle := p.idle.Len() + idle := p.idle.count p.mu.Unlock() return idle } @@ -224,137 +224,153 @@ func (p *Pool) IdleCount() int { // Close releases the resources used by the pool. func (p *Pool) Close() error { p.mu.Lock() - idle := p.idle - p.idle.Init() + if p.closed { + p.mu.Unlock() + return nil + } p.closed = true - p.active -= idle.Len() - if p.cond != nil { - p.cond.Broadcast() + p.active -= p.idle.count + pc := p.idle.front + p.idle.count = 0 + p.idle.front, p.idle.back = nil, nil + if p.ch != nil { + close(p.ch) } p.mu.Unlock() - for e := idle.Front(); e != nil; e = e.Next() { - e.Value.(idleConn).c.Close() + for ; pc != nil; pc = pc.next { + pc.c.Close() } return nil } -// release decrements the active count and signals waiters. The caller must -// hold p.mu during the call. -func (p *Pool) release() { - p.active -= 1 - if p.cond != nil { - p.cond.Signal() +func (p *Pool) lazyInit() { + // Fast path. + if atomic.LoadUint32(&p.chInitialized) == 1 { + return + } + // Slow path. + p.mu.Lock() + if p.chInitialized == 0 { + p.ch = make(chan struct{}, p.MaxActive) + if p.closed { + close(p.ch) + } else { + for i := 0; i < p.MaxActive; i++ { + p.ch <- struct{}{} + } + } + atomic.StoreUint32(&p.chInitialized, 1) } + p.mu.Unlock() } // get prunes stale connections and returns a connection from the idle list or // creates a new connection. -func (p *Pool) get() (Conn, error) { - p.mu.Lock() - - // Prune stale connections. - - if timeout := p.IdleTimeout; timeout > 0 { - for i, n := 0, p.idle.Len(); i < n; i++ { - e := p.idle.Back() - if e == nil { - break - } - ic := e.Value.(idleConn) - if ic.t.Add(timeout).After(nowFunc()) { - break +func (p *Pool) get(ctx interface { + Done() <-chan struct{} + Err() error +}) (*poolConn, error) { + + // Handle limit for p.Wait == true. + if p.Wait && p.MaxActive > 0 { + p.lazyInit() + if ctx == nil { + <-p.ch + } else { + select { + case <-p.ch: + case <-ctx.Done(): + return nil, ctx.Err() } - p.idle.Remove(e) - p.release() - p.mu.Unlock() - ic.c.Close() - p.mu.Lock() } } - for { - // Get idle connection. + p.mu.Lock() - for i, n := 0, p.idle.Len(); i < n; i++ { - e := p.idle.Front() - if e == nil { - break - } - ic := e.Value.(idleConn) - p.idle.Remove(e) - test := p.TestOnBorrow + // Prune stale connections at the back of the idle list. + if p.IdleTimeout > 0 { + n := p.idle.count + for i := 0; i < n && p.idle.back != nil && p.idle.back.t.Add(p.IdleTimeout).Before(nowFunc()); i++ { + pc := p.idle.back + p.idle.popBack() p.mu.Unlock() - if test == nil || test(ic.c, ic.t) == nil { - return ic.c, nil - } - ic.c.Close() + pc.c.Close() p.mu.Lock() - p.release() + p.active-- } + } - // Check for pool closed before dialing a new connection. - - if p.closed { - p.mu.Unlock() - return nil, errors.New("redigo: get on closed pool") + // Get idle connection from the front of idle list. + for p.idle.front != nil { + pc := p.idle.front + p.idle.popFront() + p.mu.Unlock() + if (p.TestOnBorrow == nil || p.TestOnBorrow(pc.c, pc.t) == nil) && + (p.MaxConnLifetime == 0 || nowFunc().Sub(pc.created) < p.MaxConnLifetime) { + return pc, nil } + pc.c.Close() + p.mu.Lock() + p.active-- + } - // Dial new connection if under limit. - - if p.MaxActive == 0 || p.active < p.MaxActive { - dial := p.Dial - p.active += 1 - p.mu.Unlock() - c, err := dial() - if err != nil { - p.mu.Lock() - p.release() - p.mu.Unlock() - c = nil - } - return c, err - } + // Check for pool closed before dialing a new connection. + if p.closed { + p.mu.Unlock() + return nil, errors.New("redigo: get on closed pool") + } - if !p.Wait { - p.mu.Unlock() - return nil, ErrPoolExhausted - } + // Handle limit for p.Wait == false. + if !p.Wait && p.MaxActive > 0 && p.active >= p.MaxActive { + p.mu.Unlock() + return nil, ErrPoolExhausted + } - if p.cond == nil { - p.cond = sync.NewCond(&p.mu) + p.active++ + p.mu.Unlock() + c, err := p.Dial() + if err != nil { + c = nil + p.mu.Lock() + p.active-- + if p.ch != nil && !p.closed { + p.ch <- struct{}{} } - p.cond.Wait() + p.mu.Unlock() } + return &poolConn{c: c, created: nowFunc()}, err } -func (p *Pool) put(c Conn, forceClose bool) error { - err := c.Err() +func (p *Pool) put(pc *poolConn, forceClose bool) error { p.mu.Lock() - if !p.closed && err == nil && !forceClose { - p.idle.PushFront(idleConn{t: nowFunc(), c: c}) - if p.idle.Len() > p.MaxIdle { - c = p.idle.Remove(p.idle.Back()).(idleConn).c + if !p.closed && !forceClose { + pc.t = nowFunc() + p.idle.pushFront(pc) + if p.idle.count > p.MaxIdle { + pc = p.idle.back + p.idle.popBack() } else { - c = nil + pc = nil } } - if c == nil { - if p.cond != nil { - p.cond.Signal() - } + if pc != nil { p.mu.Unlock() - return nil + pc.c.Close() + p.mu.Lock() + p.active-- } - p.release() + if p.ch != nil && !p.closed { + p.ch <- struct{}{} + } p.mu.Unlock() - return c.Close() + return nil } -type pooledConnection struct { +type activeConn struct { p *Pool - c Conn + pc *poolConn state int } @@ -375,79 +391,107 @@ func initSentinel() { } } -func (pc *pooledConnection) Close() error { - c := pc.c - if _, ok := c.(errorConnection); ok { +func (ac *activeConn) Close() error { + pc := ac.pc + if pc == nil { return nil } - pc.c = errorConnection{errConnClosed} - - if pc.state&internal.MultiState != 0 { - c.Send("DISCARD") - pc.state &^= (internal.MultiState | internal.WatchState) - } else if pc.state&internal.WatchState != 0 { - c.Send("UNWATCH") - pc.state &^= internal.WatchState + ac.pc = nil + + if ac.state&internal.MultiState != 0 { + pc.c.Send("DISCARD") + ac.state &^= (internal.MultiState | internal.WatchState) + } else if ac.state&internal.WatchState != 0 { + pc.c.Send("UNWATCH") + ac.state &^= internal.WatchState } - if pc.state&internal.SubscribeState != 0 { - c.Send("UNSUBSCRIBE") - c.Send("PUNSUBSCRIBE") + if ac.state&internal.SubscribeState != 0 { + pc.c.Send("UNSUBSCRIBE") + pc.c.Send("PUNSUBSCRIBE") // To detect the end of the message stream, ask the server to echo // a sentinel value and read until we see that value. sentinelOnce.Do(initSentinel) - c.Send("ECHO", sentinel) - c.Flush() + pc.c.Send("ECHO", sentinel) + pc.c.Flush() for { - p, err := c.Receive() + p, err := pc.c.Receive() if err != nil { break } if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { - pc.state &^= internal.SubscribeState + ac.state &^= internal.SubscribeState break } } } - c.Do("") - pc.p.put(c, pc.state != 0) + pc.c.Do("") + ac.p.put(pc, ac.state != 0 || pc.c.Err() != nil) return nil } -func (pc *pooledConnection) Err() error { +func (ac *activeConn) Err() error { + pc := ac.pc + if pc == nil { + return errConnClosed + } return pc.c.Err() } -func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) { +func (ac *activeConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) { + pc := ac.pc + if pc == nil { + return nil, errConnClosed + } ci := internal.LookupCommandInfo(commandName) - pc.state = (pc.state | ci.Set) &^ ci.Clear + ac.state = (ac.state | ci.Set) &^ ci.Clear return pc.c.Do(commandName, args...) } -func (pc *pooledConnection) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) { +func (ac *activeConn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) { + pc := ac.pc + if pc == nil { + return nil, errConnClosed + } cwt, ok := pc.c.(ConnWithTimeout) if !ok { return nil, errTimeoutNotSupported } ci := internal.LookupCommandInfo(commandName) - pc.state = (pc.state | ci.Set) &^ ci.Clear + ac.state = (ac.state | ci.Set) &^ ci.Clear return cwt.DoWithTimeout(timeout, commandName, args...) } -func (pc *pooledConnection) Send(commandName string, args ...interface{}) error { +func (ac *activeConn) Send(commandName string, args ...interface{}) error { + pc := ac.pc + if pc == nil { + return errConnClosed + } ci := internal.LookupCommandInfo(commandName) - pc.state = (pc.state | ci.Set) &^ ci.Clear + ac.state = (ac.state | ci.Set) &^ ci.Clear return pc.c.Send(commandName, args...) } -func (pc *pooledConnection) Flush() error { +func (ac *activeConn) Flush() error { + pc := ac.pc + if pc == nil { + return errConnClosed + } return pc.c.Flush() } -func (pc *pooledConnection) Receive() (reply interface{}, err error) { +func (ac *activeConn) Receive() (reply interface{}, err error) { + pc := ac.pc + if pc == nil { + return nil, errConnClosed + } return pc.c.Receive() } -func (pc *pooledConnection) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) { +func (ac *activeConn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) { + pc := ac.pc + if pc == nil { + return nil, errConnClosed + } cwt, ok := pc.c.(ConnWithTimeout) if !ok { return nil, errTimeoutNotSupported @@ -455,15 +499,64 @@ func (pc *pooledConnection) ReceiveWithTimeout(timeout time.Duration) (reply int return cwt.ReceiveWithTimeout(timeout) } -type errorConnection struct{ err error } +type errorConn struct{ err error } -func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err } -func (ec errorConnection) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) { +func (ec errorConn) Do(string, ...interface{}) (interface{}, error) { return nil, ec.err } +func (ec errorConn) DoWithTimeout(time.Duration, string, ...interface{}) (interface{}, error) { return nil, ec.err } -func (ec errorConnection) Send(string, ...interface{}) error { return ec.err } -func (ec errorConnection) Err() error { return ec.err } -func (ec errorConnection) Close() error { return nil } -func (ec errorConnection) Flush() error { return ec.err } -func (ec errorConnection) Receive() (interface{}, error) { return nil, ec.err } -func (ec errorConnection) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err } +func (ec errorConn) Send(string, ...interface{}) error { return ec.err } +func (ec errorConn) Err() error { return ec.err } +func (ec errorConn) Close() error { return nil } +func (ec errorConn) Flush() error { return ec.err } +func (ec errorConn) Receive() (interface{}, error) { return nil, ec.err } +func (ec errorConn) ReceiveWithTimeout(time.Duration) (interface{}, error) { return nil, ec.err } + +type idleList struct { + count int + front, back *poolConn +} + +type poolConn struct { + c Conn + t time.Time + created time.Time + next, prev *poolConn +} + +func (l *idleList) pushFront(pc *poolConn) { + pc.next = l.front + pc.prev = nil + if l.count == 0 { + l.back = pc + } else { + l.front.prev = pc + } + l.front = pc + l.count++ + return +} + +func (l *idleList) popFront() { + pc := l.front + l.count-- + if l.count == 0 { + l.front, l.back = nil, nil + } else { + pc.next.prev = nil + l.front = pc.next + } + pc.next, pc.prev = nil, nil +} + +func (l *idleList) popBack() { + pc := l.back + l.count-- + if l.count == 0 { + l.front, l.back = nil, nil + } else { + pc.prev.next = nil + l.back = pc.prev + } + pc.next, pc.prev = nil, nil +} diff --git a/vendor/github.com/gomodule/redigo/redis/pool17.go b/vendor/github.com/gomodule/redigo/redis/pool17.go new file mode 100644 index 00000000..c1ea18ee --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/pool17.go @@ -0,0 +1,35 @@ +// Copyright 2018 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// +build go1.7 + +package redis + +import "context" + +// GetContext gets a connection using the provided context. +// +// The provided Context must be non-nil. If the context expires before the +// connection is complete, an error is returned. Any expiration on the context +// will not affect the returned connection. +// +// If the function completes without error, then the application must close the +// returned connection. +func (p *Pool) GetContext(ctx context.Context) (Conn, error) { + pc, err := p.get(ctx) + if err != nil { + return errorConn{err}, err + } + return &activeConn{p: p, pc: pc}, nil +} diff --git a/vendor/github.com/gomodule/redigo/redis/pool17_test.go b/vendor/github.com/gomodule/redigo/redis/pool17_test.go new file mode 100644 index 00000000..d97bf7a6 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/pool17_test.go @@ -0,0 +1,74 @@ +// Copyright 2018 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// +build go1.7 + +package redis_test + +import ( + "context" + "testing" + + "github.com/gomodule/redigo/redis" +) + +func TestWaitPoolGetContext(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + defer p.Close() + c, err := p.GetContext(context.Background()) + if err != nil { + t.Fatalf("GetContext returned %v", err) + } + defer c.Close() +} + +func TestWaitPoolGetAfterClose(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + p.Close() + _, err := p.GetContext(context.Background()) + if err == nil { + t.Fatal("expected error") + } +} + +func TestWaitPoolGetCanceledContext(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + defer p.Close() + ctx, f := context.WithCancel(context.Background()) + f() + c := p.Get() + defer c.Close() + _, err := p.GetContext(ctx) + if err != context.Canceled { + t.Fatalf("got error %v, want %v", err, context.Canceled) + } +} diff --git a/vendor/github.com/gomodule/redigo/redis/pool_test.go b/vendor/github.com/gomodule/redigo/redis/pool_test.go new file mode 100644 index 00000000..3509dd4e --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/pool_test.go @@ -0,0 +1,746 @@ +// Copyright 2011 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "errors" + "io" + "reflect" + "sync" + "testing" + "time" + + "github.com/gomodule/redigo/redis" +) + +type poolTestConn struct { + d *poolDialer + err error + redis.Conn +} + +func (c *poolTestConn) Close() error { + c.d.mu.Lock() + c.d.open -= 1 + c.d.mu.Unlock() + return c.Conn.Close() +} + +func (c *poolTestConn) Err() error { return c.err } + +func (c *poolTestConn) Do(commandName string, args ...interface{}) (interface{}, error) { + if commandName == "ERR" { + c.err = args[0].(error) + commandName = "PING" + } + if commandName != "" { + c.d.commands = append(c.d.commands, commandName) + } + return c.Conn.Do(commandName, args...) +} + +func (c *poolTestConn) Send(commandName string, args ...interface{}) error { + c.d.commands = append(c.d.commands, commandName) + return c.Conn.Send(commandName, args...) +} + +type poolDialer struct { + mu sync.Mutex + t *testing.T + dialed int + open int + commands []string + dialErr error +} + +func (d *poolDialer) dial() (redis.Conn, error) { + d.mu.Lock() + d.dialed += 1 + dialErr := d.dialErr + d.mu.Unlock() + if dialErr != nil { + return nil, d.dialErr + } + c, err := redis.DialDefaultServer() + if err != nil { + return nil, err + } + d.mu.Lock() + d.open += 1 + d.mu.Unlock() + return &poolTestConn{d: d, Conn: c}, nil +} + +func (d *poolDialer) check(message string, p *redis.Pool, dialed, open, inuse int) { + d.mu.Lock() + if d.dialed != dialed { + d.t.Errorf("%s: dialed=%d, want %d", message, d.dialed, dialed) + } + if d.open != open { + d.t.Errorf("%s: open=%d, want %d", message, d.open, open) + } + + stats := p.Stats() + + if stats.ActiveCount != open { + d.t.Errorf("%s: active=%d, want %d", message, stats.ActiveCount, open) + } + if stats.IdleCount != open-inuse { + d.t.Errorf("%s: idle=%d, want %d", message, stats.IdleCount, open-inuse) + } + + d.mu.Unlock() +} + +func TestPoolReuse(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + } + + for i := 0; i < 10; i++ { + c1 := p.Get() + c1.Do("PING") + c2 := p.Get() + c2.Do("PING") + c1.Close() + c2.Close() + } + + d.check("before close", p, 2, 2, 0) + p.Close() + d.check("after close", p, 2, 0, 0) +} + +func TestPoolMaxIdle(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + } + defer p.Close() + + for i := 0; i < 10; i++ { + c1 := p.Get() + c1.Do("PING") + c2 := p.Get() + c2.Do("PING") + c3 := p.Get() + c3.Do("PING") + c1.Close() + c2.Close() + c3.Close() + } + d.check("before close", p, 12, 2, 0) + p.Close() + d.check("after close", p, 12, 0, 0) +} + +func TestPoolError(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + } + defer p.Close() + + c := p.Get() + c.Do("ERR", io.EOF) + if c.Err() == nil { + t.Errorf("expected c.Err() != nil") + } + c.Close() + + c = p.Get() + c.Do("ERR", io.EOF) + c.Close() + + d.check(".", p, 2, 0, 0) +} + +func TestPoolClose(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + } + defer p.Close() + + c1 := p.Get() + c1.Do("PING") + c2 := p.Get() + c2.Do("PING") + c3 := p.Get() + c3.Do("PING") + + c1.Close() + if _, err := c1.Do("PING"); err == nil { + t.Errorf("expected error after connection closed") + } + + c2.Close() + c2.Close() + + p.Close() + + d.check("after pool close", p, 3, 1, 1) + + if _, err := c1.Do("PING"); err == nil { + t.Errorf("expected error after connection and pool closed") + } + + c3.Close() + + d.check("after conn close", p, 3, 0, 0) + + c1 = p.Get() + if _, err := c1.Do("PING"); err == nil { + t.Errorf("expected error after pool closed") + } +} + +func TestPoolClosedConn(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + IdleTimeout: 300 * time.Second, + Dial: d.dial, + } + defer p.Close() + c := p.Get() + if c.Err() != nil { + t.Fatal("get failed") + } + c.Close() + if err := c.Err(); err == nil { + t.Fatal("Err on closed connection did not return error") + } + if _, err := c.Do("PING"); err == nil { + t.Fatal("Do on closed connection did not return error") + } + if err := c.Send("PING"); err == nil { + t.Fatal("Send on closed connection did not return error") + } + if err := c.Flush(); err == nil { + t.Fatal("Flush on closed connection did not return error") + } + if _, err := c.Receive(); err == nil { + t.Fatal("Receive on closed connection did not return error") + } +} + +func TestPoolIdleTimeout(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + IdleTimeout: 300 * time.Second, + Dial: d.dial, + } + defer p.Close() + + now := time.Now() + redis.SetNowFunc(func() time.Time { return now }) + defer redis.SetNowFunc(time.Now) + + c := p.Get() + c.Do("PING") + c.Close() + + d.check("1", p, 1, 1, 0) + + now = now.Add(p.IdleTimeout + 1) + + c = p.Get() + c.Do("PING") + c.Close() + + d.check("2", p, 2, 1, 0) +} + +func TestPoolMaxLifetime(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + MaxConnLifetime: 300 * time.Second, + Dial: d.dial, + } + defer p.Close() + + now := time.Now() + redis.SetNowFunc(func() time.Time { return now }) + defer redis.SetNowFunc(time.Now) + + c := p.Get() + c.Do("PING") + c.Close() + + d.check("1", p, 1, 1, 0) + + now = now.Add(p.MaxConnLifetime + 1) + + c = p.Get() + c.Do("PING") + c.Close() + + d.check("2", p, 2, 1, 0) +} + +func TestPoolConcurrenSendReceive(t *testing.T) { + p := &redis.Pool{ + Dial: redis.DialDefaultServer, + } + defer p.Close() + + c := p.Get() + done := make(chan error, 1) + go func() { + _, err := c.Receive() + done <- err + }() + c.Send("PING") + c.Flush() + err := <-done + if err != nil { + t.Fatalf("Receive() returned error %v", err) + } + _, err = c.Do("") + if err != nil { + t.Fatalf("Do() returned error %v", err) + } + c.Close() +} + +func TestPoolBorrowCheck(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + Dial: d.dial, + TestOnBorrow: func(redis.Conn, time.Time) error { return redis.Error("BLAH") }, + } + defer p.Close() + + for i := 0; i < 10; i++ { + c := p.Get() + c.Do("PING") + c.Close() + } + d.check("1", p, 10, 1, 0) +} + +func TestPoolMaxActive(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + MaxActive: 2, + Dial: d.dial, + } + defer p.Close() + + c1 := p.Get() + c1.Do("PING") + c2 := p.Get() + c2.Do("PING") + + d.check("1", p, 2, 2, 2) + + c3 := p.Get() + if _, err := c3.Do("PING"); err != redis.ErrPoolExhausted { + t.Errorf("expected pool exhausted") + } + + c3.Close() + d.check("2", p, 2, 2, 2) + c2.Close() + d.check("3", p, 2, 2, 1) + + c3 = p.Get() + if _, err := c3.Do("PING"); err != nil { + t.Errorf("expected good channel, err=%v", err) + } + c3.Close() + + d.check("4", p, 2, 2, 1) +} + +func TestPoolMonitorCleanup(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + MaxActive: 2, + Dial: d.dial, + } + defer p.Close() + + c := p.Get() + c.Send("MONITOR") + c.Close() + + d.check("", p, 1, 0, 0) +} + +func TestPoolPubSubCleanup(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + MaxActive: 2, + Dial: d.dial, + } + defer p.Close() + + c := p.Get() + c.Send("SUBSCRIBE", "x") + c.Close() + + want := []string{"SUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", "ECHO"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Send("PSUBSCRIBE", "x*") + c.Close() + + want = []string{"PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", "ECHO"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil +} + +func TestPoolTransactionCleanup(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 2, + MaxActive: 2, + Dial: d.dial, + } + defer p.Close() + + c := p.Get() + c.Do("WATCH", "key") + c.Do("PING") + c.Close() + + want := []string{"WATCH", "PING", "UNWATCH"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Do("WATCH", "key") + c.Do("UNWATCH") + c.Do("PING") + c.Close() + + want = []string{"WATCH", "UNWATCH", "PING"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Do("WATCH", "key") + c.Do("MULTI") + c.Do("PING") + c.Close() + + want = []string{"WATCH", "MULTI", "PING", "DISCARD"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Do("WATCH", "key") + c.Do("MULTI") + c.Do("DISCARD") + c.Do("PING") + c.Close() + + want = []string{"WATCH", "MULTI", "DISCARD", "PING"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil + + c = p.Get() + c.Do("WATCH", "key") + c.Do("MULTI") + c.Do("EXEC") + c.Do("PING") + c.Close() + + want = []string{"WATCH", "MULTI", "EXEC", "PING"} + if !reflect.DeepEqual(d.commands, want) { + t.Errorf("got commands %v, want %v", d.commands, want) + } + d.commands = nil +} + +func startGoroutines(p *redis.Pool, cmd string, args ...interface{}) chan error { + errs := make(chan error, 10) + for i := 0; i < cap(errs); i++ { + go func() { + c := p.Get() + _, err := c.Do(cmd, args...) + c.Close() + errs <- err + }() + } + + return errs +} + +func TestWaitPool(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + defer p.Close() + + c := p.Get() + errs := startGoroutines(p, "PING") + d.check("before close", p, 1, 1, 1) + c.Close() + timeout := time.After(2 * time.Second) + for i := 0; i < cap(errs); i++ { + select { + case err := <-errs: + if err != nil { + t.Fatal(err) + } + case <-timeout: + t.Fatalf("timeout waiting for blocked goroutine %d", i) + } + } + d.check("done", p, 1, 1, 0) +} + +func TestWaitPoolClose(t *testing.T) { + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + defer p.Close() + + c := p.Get() + if _, err := c.Do("PING"); err != nil { + t.Fatal(err) + } + errs := startGoroutines(p, "PING") + d.check("before close", p, 1, 1, 1) + p.Close() + timeout := time.After(2 * time.Second) + for i := 0; i < cap(errs); i++ { + select { + case err := <-errs: + switch err { + case nil: + t.Fatal("blocked goroutine did not get error") + case redis.ErrPoolExhausted: + t.Fatal("blocked goroutine got pool exhausted error") + } + case <-timeout: + t.Fatal("timeout waiting for blocked goroutine") + } + } + c.Close() + d.check("done", p, 1, 0, 0) +} + +func TestWaitPoolCommandError(t *testing.T) { + testErr := errors.New("test") + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + defer p.Close() + + c := p.Get() + errs := startGoroutines(p, "ERR", testErr) + d.check("before close", p, 1, 1, 1) + c.Close() + timeout := time.After(2 * time.Second) + for i := 0; i < cap(errs); i++ { + select { + case err := <-errs: + if err != nil { + t.Fatal(err) + } + case <-timeout: + t.Fatalf("timeout waiting for blocked goroutine %d", i) + } + } + d.check("done", p, cap(errs), 0, 0) +} + +func TestWaitPoolDialError(t *testing.T) { + testErr := errors.New("test") + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: 1, + MaxActive: 1, + Dial: d.dial, + Wait: true, + } + defer p.Close() + + c := p.Get() + errs := startGoroutines(p, "ERR", testErr) + d.check("before close", p, 1, 1, 1) + + d.dialErr = errors.New("dial") + c.Close() + + nilCount := 0 + errCount := 0 + timeout := time.After(2 * time.Second) + for i := 0; i < cap(errs); i++ { + select { + case err := <-errs: + switch err { + case nil: + nilCount++ + case d.dialErr: + errCount++ + default: + t.Fatalf("expected dial error or nil, got %v", err) + } + case <-timeout: + t.Fatalf("timeout waiting for blocked goroutine %d", i) + } + } + if nilCount != 1 { + t.Errorf("expected one nil error, got %d", nilCount) + } + if errCount != cap(errs)-1 { + t.Errorf("expected %d dial errors, got %d", cap(errs)-1, errCount) + } + d.check("done", p, cap(errs), 0, 0) +} + +// Borrowing requires us to iterate over the idle connections, unlock the pool, +// and perform a blocking operation to check the connection still works. If +// TestOnBorrow fails, we must reacquire the lock and continue iteration. This +// test ensures that iteration will work correctly if multiple threads are +// iterating simultaneously. +func TestLocking_TestOnBorrowFails_PoolDoesntCrash(t *testing.T) { + const count = 100 + + // First we'll Create a pool where the pilfering of idle connections fails. + d := poolDialer{t: t} + p := &redis.Pool{ + MaxIdle: count, + MaxActive: count, + Dial: d.dial, + TestOnBorrow: func(c redis.Conn, t time.Time) error { + return errors.New("No way back into the real world.") + }, + } + defer p.Close() + + // Fill the pool with idle connections. + conns := make([]redis.Conn, count) + for i := range conns { + conns[i] = p.Get() + } + for i := range conns { + conns[i].Close() + } + + // Spawn a bunch of goroutines to thrash the pool. + var wg sync.WaitGroup + wg.Add(count) + for i := 0; i < count; i++ { + go func() { + c := p.Get() + if c.Err() != nil { + t.Errorf("pool get failed: %v", c.Err()) + } + c.Close() + wg.Done() + }() + } + wg.Wait() + if d.dialed != count*2 { + t.Errorf("Expected %d dials, got %d", count*2, d.dialed) + } +} + +func BenchmarkPoolGet(b *testing.B) { + b.StopTimer() + p := redis.Pool{Dial: redis.DialDefaultServer, MaxIdle: 2} + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + defer p.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + c = p.Get() + c.Close() + } +} + +func BenchmarkPoolGetErr(b *testing.B) { + b.StopTimer() + p := redis.Pool{Dial: redis.DialDefaultServer, MaxIdle: 2} + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + defer p.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + c = p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + } +} + +func BenchmarkPoolGetPing(b *testing.B) { + b.StopTimer() + p := redis.Pool{Dial: redis.DialDefaultServer, MaxIdle: 2} + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + defer p.Close() + b.StartTimer() + for i := 0; i < b.N; i++ { + c = p.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } +} diff --git a/vendor/github.com/garyburd/redigo/redis/pubsub.go b/vendor/github.com/gomodule/redigo/redis/pubsub.go similarity index 89% rename from vendor/github.com/garyburd/redigo/redis/pubsub.go rename to vendor/github.com/gomodule/redigo/redis/pubsub.go index f0ac8253..2da60211 100644 --- a/vendor/github.com/garyburd/redigo/redis/pubsub.go +++ b/vendor/github.com/gomodule/redigo/redis/pubsub.go @@ -36,18 +36,9 @@ type Message struct { // The originating channel. Channel string - // The message data. - Data []byte -} - -// PMessage represents a pmessage notification. -type PMessage struct { - // The matched pattern. + // The matched pattern, if any Pattern string - // The originating channel. - Channel string - // The message data. Data []byte } @@ -102,9 +93,9 @@ func (c PubSubConn) Ping(data string) error { return c.Conn.Flush() } -// Receive returns a pushed message as a Subscription, Message, PMessage, Pong -// or error. The return value is intended to be used directly in a type switch -// as illustrated in the PubSubConn example. +// Receive returns a pushed message as a Subscription, Message, Pong or error. +// The return value is intended to be used directly in a type switch as +// illustrated in the PubSubConn example. func (c PubSubConn) Receive() interface{} { return c.receiveInternal(c.Conn.Receive()) } @@ -135,11 +126,11 @@ func (c PubSubConn) receiveInternal(replyArg interface{}, errArg error) interfac } return m case "pmessage": - var pm PMessage - if _, err := Scan(reply, &pm.Pattern, &pm.Channel, &pm.Data); err != nil { + var m Message + if _, err := Scan(reply, &m.Pattern, &m.Channel, &m.Data); err != nil { return err } - return pm + return m case "subscribe", "psubscribe", "unsubscribe", "punsubscribe": s := Subscription{Kind: kind} if _, err := Scan(reply, &s.Channel, &s.Count); err != nil { diff --git a/vendor/github.com/gomodule/redigo/redis/pubsub_example_test.go b/vendor/github.com/gomodule/redigo/redis/pubsub_example_test.go new file mode 100644 index 00000000..39d0abf8 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/pubsub_example_test.go @@ -0,0 +1,165 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// +build go1.7 + +package redis_test + +import ( + "context" + "fmt" + "time" + + "github.com/gomodule/redigo/redis" +) + +// listenPubSubChannels listens for messages on Redis pubsub channels. The +// onStart function is called after the channels are subscribed. The onMessage +// function is called for each message. +func listenPubSubChannels(ctx context.Context, redisServerAddr string, + onStart func() error, + onMessage func(channel string, data []byte) error, + channels ...string) error { + // A ping is set to the server with this period to test for the health of + // the connection and server. + const healthCheckPeriod = time.Minute + + c, err := redis.Dial("tcp", redisServerAddr, + // Read timeout on server should be greater than ping period. + redis.DialReadTimeout(healthCheckPeriod+10*time.Second), + redis.DialWriteTimeout(10*time.Second)) + if err != nil { + return err + } + defer c.Close() + + psc := redis.PubSubConn{Conn: c} + + if err := psc.Subscribe(redis.Args{}.AddFlat(channels)...); err != nil { + return err + } + + done := make(chan error, 1) + + // Start a goroutine to receive notifications from the server. + go func() { + for { + switch n := psc.Receive().(type) { + case error: + done <- n + return + case redis.Message: + if err := onMessage(n.Channel, n.Data); err != nil { + done <- err + return + } + case redis.Subscription: + switch n.Count { + case len(channels): + // Notify application when all channels are subscribed. + if err := onStart(); err != nil { + done <- err + return + } + case 0: + // Return from the goroutine when all channels are unsubscribed. + done <- nil + return + } + } + } + }() + + ticker := time.NewTicker(healthCheckPeriod) + defer ticker.Stop() +loop: + for err == nil { + select { + case <-ticker.C: + // Send ping to test health of connection and server. If + // corresponding pong is not received, then receive on the + // connection will timeout and the receive goroutine will exit. + if err = psc.Ping(""); err != nil { + break loop + } + case <-ctx.Done(): + break loop + case err := <-done: + // Return error from the receive goroutine. + return err + } + } + + // Signal the receiving goroutine to exit by unsubscribing from all channels. + psc.Unsubscribe() + + // Wait for goroutine to complete. + return <-done +} + +func publish() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + c.Do("PUBLISH", "c1", "hello") + c.Do("PUBLISH", "c2", "world") + c.Do("PUBLISH", "c1", "goodbye") +} + +// This example shows how receive pubsub notifications with cancelation and +// health checks. +func ExamplePubSubConn() { + redisServerAddr, err := serverAddr() + if err != nil { + fmt.Println(err) + return + } + + ctx, cancel := context.WithCancel(context.Background()) + + err = listenPubSubChannels(ctx, + redisServerAddr, + func() error { + // The start callback is a good place to backfill missed + // notifications. For the purpose of this example, a goroutine is + // started to send notifications. + go publish() + return nil + }, + func(channel string, message []byte) error { + fmt.Printf("channel: %s, message: %s\n", channel, message) + + // For the purpose of this example, cancel the listener's context + // after receiving last message sent by publish(). + if string(message) == "goodbye" { + cancel() + } + return nil + }, + "c1", "c2") + + if err != nil { + fmt.Println(err) + return + } + + // Output: + // channel: c1, message: hello + // channel: c2, message: world + // channel: c1, message: goodbye +} diff --git a/vendor/github.com/gomodule/redigo/redis/pubsub_test.go b/vendor/github.com/gomodule/redigo/redis/pubsub_test.go new file mode 100644 index 00000000..13f3f797 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/pubsub_test.go @@ -0,0 +1,74 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "reflect" + "testing" + "time" + + "github.com/gomodule/redigo/redis" +) + +func expectPushed(t *testing.T, c redis.PubSubConn, message string, expected interface{}) { + actual := c.Receive() + if !reflect.DeepEqual(actual, expected) { + t.Errorf("%s = %v, want %v", message, actual, expected) + } +} + +func TestPushed(t *testing.T) { + pc, err := redis.DialDefaultServer() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer pc.Close() + + sc, err := redis.DialDefaultServer() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer sc.Close() + + c := redis.PubSubConn{Conn: sc} + + c.Subscribe("c1") + expectPushed(t, c, "Subscribe(c1)", redis.Subscription{Kind: "subscribe", Channel: "c1", Count: 1}) + c.Subscribe("c2") + expectPushed(t, c, "Subscribe(c2)", redis.Subscription{Kind: "subscribe", Channel: "c2", Count: 2}) + c.PSubscribe("p1") + expectPushed(t, c, "PSubscribe(p1)", redis.Subscription{Kind: "psubscribe", Channel: "p1", Count: 3}) + c.PSubscribe("p2") + expectPushed(t, c, "PSubscribe(p2)", redis.Subscription{Kind: "psubscribe", Channel: "p2", Count: 4}) + c.PUnsubscribe() + expectPushed(t, c, "Punsubscribe(p1)", redis.Subscription{Kind: "punsubscribe", Channel: "p1", Count: 3}) + expectPushed(t, c, "Punsubscribe()", redis.Subscription{Kind: "punsubscribe", Channel: "p2", Count: 2}) + + pc.Do("PUBLISH", "c1", "hello") + expectPushed(t, c, "PUBLISH c1 hello", redis.Message{Channel: "c1", Data: []byte("hello")}) + + c.Ping("hello") + expectPushed(t, c, `Ping("hello")`, redis.Pong{Data: "hello"}) + + c.Conn.Send("PING") + c.Conn.Flush() + expectPushed(t, c, `Send("PING")`, redis.Pong{}) + + c.Ping("timeout") + got := c.ReceiveWithTimeout(time.Minute) + if want := (redis.Pong{Data: "timeout"}); want != got { + t.Errorf("recv /w timeout got %v, want %v", got, want) + } +} diff --git a/vendor/github.com/garyburd/redigo/redis/redis.go b/vendor/github.com/gomodule/redigo/redis/redis.go similarity index 100% rename from vendor/github.com/garyburd/redigo/redis/redis.go rename to vendor/github.com/gomodule/redigo/redis/redis.go diff --git a/vendor/github.com/gomodule/redigo/redis/redis_test.go b/vendor/github.com/gomodule/redigo/redis/redis_test.go new file mode 100644 index 00000000..5a98f535 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/redis_test.go @@ -0,0 +1,71 @@ +// Copyright 2017 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "testing" + "time" + + "github.com/gomodule/redigo/redis" +) + +type timeoutTestConn int + +func (tc timeoutTestConn) Do(string, ...interface{}) (interface{}, error) { + return time.Duration(-1), nil +} +func (tc timeoutTestConn) DoWithTimeout(timeout time.Duration, cmd string, args ...interface{}) (interface{}, error) { + return timeout, nil +} + +func (tc timeoutTestConn) Receive() (interface{}, error) { + return time.Duration(-1), nil +} +func (tc timeoutTestConn) ReceiveWithTimeout(timeout time.Duration) (interface{}, error) { + return timeout, nil +} + +func (tc timeoutTestConn) Send(string, ...interface{}) error { return nil } +func (tc timeoutTestConn) Err() error { return nil } +func (tc timeoutTestConn) Close() error { return nil } +func (tc timeoutTestConn) Flush() error { return nil } + +func testTimeout(t *testing.T, c redis.Conn) { + r, err := c.Do("PING") + if r != time.Duration(-1) || err != nil { + t.Errorf("Do() = %v, %v, want %v, %v", r, err, time.Duration(-1), nil) + } + r, err = redis.DoWithTimeout(c, time.Minute, "PING") + if r != time.Minute || err != nil { + t.Errorf("DoWithTimeout() = %v, %v, want %v, %v", r, err, time.Minute, nil) + } + r, err = c.Receive() + if r != time.Duration(-1) || err != nil { + t.Errorf("Receive() = %v, %v, want %v, %v", r, err, time.Duration(-1), nil) + } + r, err = redis.ReceiveWithTimeout(c, time.Minute) + if r != time.Minute || err != nil { + t.Errorf("ReceiveWithTimeout() = %v, %v, want %v, %v", r, err, time.Minute, nil) + } +} + +func TestConnTimeout(t *testing.T) { + testTimeout(t, timeoutTestConn(0)) +} + +func TestPoolConnTimeout(t *testing.T) { + p := &redis.Pool{Dial: func() (redis.Conn, error) { return timeoutTestConn(0), nil }} + testTimeout(t, p.Get()) +} diff --git a/vendor/github.com/garyburd/redigo/redis/reply.go b/vendor/github.com/gomodule/redigo/redis/reply.go similarity index 100% rename from vendor/github.com/garyburd/redigo/redis/reply.go rename to vendor/github.com/gomodule/redigo/redis/reply.go diff --git a/vendor/github.com/gomodule/redigo/redis/reply_test.go b/vendor/github.com/gomodule/redigo/redis/reply_test.go new file mode 100644 index 00000000..c7f9b282 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/reply_test.go @@ -0,0 +1,209 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "fmt" + "reflect" + "testing" + + "github.com/gomodule/redigo/redis" +) + +type valueError struct { + v interface{} + err error +} + +func ve(v interface{}, err error) valueError { + return valueError{v, err} +} + +var replyTests = []struct { + name interface{} + actual valueError + expected valueError +}{ + { + "ints([[]byte, []byte])", + ve(redis.Ints([]interface{}{[]byte("4"), []byte("5")}, nil)), + ve([]int{4, 5}, nil), + }, + { + "ints([nt64, int64])", + ve(redis.Ints([]interface{}{int64(4), int64(5)}, nil)), + ve([]int{4, 5}, nil), + }, + { + "ints([[]byte, nil, []byte])", + ve(redis.Ints([]interface{}{[]byte("4"), nil, []byte("5")}, nil)), + ve([]int{4, 0, 5}, nil), + }, + { + "ints(nil)", + ve(redis.Ints(nil, nil)), + ve([]int(nil), redis.ErrNil), + }, + { + "int64s([[]byte, []byte])", + ve(redis.Int64s([]interface{}{[]byte("4"), []byte("5")}, nil)), + ve([]int64{4, 5}, nil), + }, + { + "int64s([int64, int64])", + ve(redis.Int64s([]interface{}{int64(4), int64(5)}, nil)), + ve([]int64{4, 5}, nil), + }, + { + "strings([[]byte, []bytev2])", + ve(redis.Strings([]interface{}{[]byte("v1"), []byte("v2")}, nil)), + ve([]string{"v1", "v2"}, nil), + }, + { + "strings([string, string])", + ve(redis.Strings([]interface{}{"v1", "v2"}, nil)), + ve([]string{"v1", "v2"}, nil), + }, + { + "byteslices([v1, v2])", + ve(redis.ByteSlices([]interface{}{[]byte("v1"), []byte("v2")}, nil)), + ve([][]byte{[]byte("v1"), []byte("v2")}, nil), + }, + { + "float64s([v1, v2])", + ve(redis.Float64s([]interface{}{[]byte("1.234"), []byte("5.678")}, nil)), + ve([]float64{1.234, 5.678}, nil), + }, + { + "values([v1, v2])", + ve(redis.Values([]interface{}{[]byte("v1"), []byte("v2")}, nil)), + ve([]interface{}{[]byte("v1"), []byte("v2")}, nil), + }, + { + "values(nil)", + ve(redis.Values(nil, nil)), + ve([]interface{}(nil), redis.ErrNil), + }, + { + "float64(1.0)", + ve(redis.Float64([]byte("1.0"), nil)), + ve(float64(1.0), nil), + }, + { + "float64(nil)", + ve(redis.Float64(nil, nil)), + ve(float64(0.0), redis.ErrNil), + }, + { + "uint64(1)", + ve(redis.Uint64(int64(1), nil)), + ve(uint64(1), nil), + }, + { + "uint64(-1)", + ve(redis.Uint64(int64(-1), nil)), + ve(uint64(0), redis.ErrNegativeInt), + }, + { + "positions([[1, 2], nil, [3, 4]])", + ve(redis.Positions([]interface{}{[]interface{}{[]byte("1"), []byte("2")}, nil, []interface{}{[]byte("3"), []byte("4")}}, nil)), + ve([]*[2]float64{{1.0, 2.0}, nil, {3.0, 4.0}}, nil), + }, +} + +func TestReply(t *testing.T) { + for _, rt := range replyTests { + if rt.actual.err != rt.expected.err { + t.Errorf("%s returned err %v, want %v", rt.name, rt.actual.err, rt.expected.err) + continue + } + if !reflect.DeepEqual(rt.actual.v, rt.expected.v) { + t.Errorf("%s=%+v, want %+v", rt.name, rt.actual.v, rt.expected.v) + } + } +} + +// dial wraps DialDefaultServer() with a more suitable function name for examples. +func dial() (redis.Conn, error) { + return redis.DialDefaultServer() +} + +// serverAddr wraps DefaultServerAddr() with a more suitable function name for examples. +func serverAddr() (string, error) { + return redis.DefaultServerAddr() +} + +func ExampleBool() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + c.Do("SET", "foo", 1) + exists, _ := redis.Bool(c.Do("EXISTS", "foo")) + fmt.Printf("%#v\n", exists) + // Output: + // true +} + +func ExampleInt() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + c.Do("SET", "k1", 1) + n, _ := redis.Int(c.Do("GET", "k1")) + fmt.Printf("%#v\n", n) + n, _ = redis.Int(c.Do("INCR", "k1")) + fmt.Printf("%#v\n", n) + // Output: + // 1 + // 2 +} + +func ExampleInts() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + c.Do("SADD", "set_with_integers", 4, 5, 6) + ints, _ := redis.Ints(c.Do("SMEMBERS", "set_with_integers")) + fmt.Printf("%#v\n", ints) + // Output: + // []int{4, 5, 6} +} + +func ExampleString() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + c.Do("SET", "hello", "world") + s, err := redis.String(c.Do("GET", "hello")) + fmt.Printf("%#v\n", s) + // Output: + // "world" +} diff --git a/vendor/github.com/garyburd/redigo/redis/scan.go b/vendor/github.com/gomodule/redigo/redis/scan.go similarity index 100% rename from vendor/github.com/garyburd/redigo/redis/scan.go rename to vendor/github.com/gomodule/redigo/redis/scan.go diff --git a/vendor/github.com/gomodule/redigo/redis/scan_test.go b/vendor/github.com/gomodule/redigo/redis/scan_test.go new file mode 100644 index 00000000..6b566728 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/scan_test.go @@ -0,0 +1,494 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "fmt" + "math" + "reflect" + "testing" + "time" + + "github.com/gomodule/redigo/redis" +) + +type durationScan struct { + time.Duration `redis:"sd"` +} + +func (t *durationScan) RedisScan(src interface{}) (err error) { + if t == nil { + return fmt.Errorf("nil pointer") + } + switch src := src.(type) { + case string: + t.Duration, err = time.ParseDuration(src) + case []byte: + t.Duration, err = time.ParseDuration(string(src)) + case int64: + t.Duration = time.Duration(src) + default: + err = fmt.Errorf("cannot convert from %T to %T", src, t) + } + return err +} + +var scanConversionTests = []struct { + src interface{} + dest interface{} +}{ + {[]byte("-inf"), math.Inf(-1)}, + {[]byte("+inf"), math.Inf(1)}, + {[]byte("0"), float64(0)}, + {[]byte("3.14159"), float64(3.14159)}, + {[]byte("3.14"), float32(3.14)}, + {[]byte("-100"), int(-100)}, + {[]byte("101"), int(101)}, + {int64(102), int(102)}, + {[]byte("103"), uint(103)}, + {int64(104), uint(104)}, + {[]byte("105"), int8(105)}, + {int64(106), int8(106)}, + {[]byte("107"), uint8(107)}, + {int64(108), uint8(108)}, + {[]byte("0"), false}, + {int64(0), false}, + {[]byte("f"), false}, + {[]byte("1"), true}, + {int64(1), true}, + {[]byte("t"), true}, + {"hello", "hello"}, + {[]byte("hello"), "hello"}, + {[]byte("world"), []byte("world")}, + {[]interface{}{[]byte("foo")}, []interface{}{[]byte("foo")}}, + {[]interface{}{[]byte("foo")}, []string{"foo"}}, + {[]interface{}{[]byte("hello"), []byte("world")}, []string{"hello", "world"}}, + {[]interface{}{[]byte("bar")}, [][]byte{[]byte("bar")}}, + {[]interface{}{[]byte("1")}, []int{1}}, + {[]interface{}{[]byte("1"), []byte("2")}, []int{1, 2}}, + {[]interface{}{[]byte("1"), []byte("2")}, []float64{1, 2}}, + {[]interface{}{[]byte("1")}, []byte{1}}, + {[]interface{}{[]byte("1")}, []bool{true}}, + {"1m", durationScan{Duration: time.Minute}}, + {[]byte("1m"), durationScan{Duration: time.Minute}}, + {time.Minute.Nanoseconds(), durationScan{Duration: time.Minute}}, + {[]interface{}{[]byte("1m")}, []durationScan{{Duration: time.Minute}}}, + {[]interface{}{[]byte("1m")}, []*durationScan{{Duration: time.Minute}}}, +} + +func TestScanConversion(t *testing.T) { + for _, tt := range scanConversionTests { + values := []interface{}{tt.src} + dest := reflect.New(reflect.TypeOf(tt.dest)) + values, err := redis.Scan(values, dest.Interface()) + if err != nil { + t.Errorf("Scan(%v) returned error %v", tt, err) + continue + } + if !reflect.DeepEqual(tt.dest, dest.Elem().Interface()) { + t.Errorf("Scan(%v) returned %v, want %v", tt, dest.Elem().Interface(), tt.dest) + } + } +} + +var scanConversionErrorTests = []struct { + src interface{} + dest interface{} +}{ + {[]byte("1234"), byte(0)}, + {int64(1234), byte(0)}, + {[]byte("-1"), byte(0)}, + {int64(-1), byte(0)}, + {[]byte("junk"), false}, + {redis.Error("blah"), false}, + {redis.Error("blah"), durationScan{Duration: time.Minute}}, + {"invalid", durationScan{Duration: time.Minute}}, +} + +func TestScanConversionError(t *testing.T) { + for _, tt := range scanConversionErrorTests { + values := []interface{}{tt.src} + dest := reflect.New(reflect.TypeOf(tt.dest)) + values, err := redis.Scan(values, dest.Interface()) + if err == nil { + t.Errorf("Scan(%v) did not return error", tt) + } + } +} + +func ExampleScan() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + c.Send("HMSET", "album:1", "title", "Red", "rating", 5) + c.Send("HMSET", "album:2", "title", "Earthbound", "rating", 1) + c.Send("HMSET", "album:3", "title", "Beat") + c.Send("LPUSH", "albums", "1") + c.Send("LPUSH", "albums", "2") + c.Send("LPUSH", "albums", "3") + values, err := redis.Values(c.Do("SORT", "albums", + "BY", "album:*->rating", + "GET", "album:*->title", + "GET", "album:*->rating")) + if err != nil { + fmt.Println(err) + return + } + + for len(values) > 0 { + var title string + rating := -1 // initialize to illegal value to detect nil. + values, err = redis.Scan(values, &title, &rating) + if err != nil { + fmt.Println(err) + return + } + if rating == -1 { + fmt.Println(title, "not-rated") + } else { + fmt.Println(title, rating) + } + } + // Output: + // Beat not-rated + // Earthbound 1 + // Red 5 +} + +type s0 struct { + X int + Y int `redis:"y"` + Bt bool +} + +type s1 struct { + X int `redis:"-"` + I int `redis:"i"` + U uint `redis:"u"` + S string `redis:"s"` + P []byte `redis:"p"` + B bool `redis:"b"` + Bt bool + Bf bool + s0 + Sd durationScan `redis:"sd"` + Sdp *durationScan `redis:"sdp"` +} + +var scanStructTests = []struct { + title string + reply []string + value interface{} +}{ + {"basic", + []string{ + "i", "-1234", + "u", "5678", + "s", "hello", + "p", "world", + "b", "t", + "Bt", "1", + "Bf", "0", + "X", "123", + "y", "456", + "sd", "1m", + "sdp", "1m", + }, + &s1{ + I: -1234, + U: 5678, + S: "hello", + P: []byte("world"), + B: true, + Bt: true, + Bf: false, + s0: s0{X: 123, Y: 456}, + Sd: durationScan{Duration: time.Minute}, + Sdp: &durationScan{Duration: time.Minute}, + }, + }, +} + +func TestScanStruct(t *testing.T) { + for _, tt := range scanStructTests { + + var reply []interface{} + for _, v := range tt.reply { + reply = append(reply, []byte(v)) + } + + value := reflect.New(reflect.ValueOf(tt.value).Type().Elem()) + + if err := redis.ScanStruct(reply, value.Interface()); err != nil { + t.Fatalf("ScanStruct(%s) returned error %v", tt.title, err) + } + + if !reflect.DeepEqual(value.Interface(), tt.value) { + t.Fatalf("ScanStruct(%s) returned %v, want %v", tt.title, value.Interface(), tt.value) + } + } +} + +func TestBadScanStructArgs(t *testing.T) { + x := []interface{}{"A", "b"} + test := func(v interface{}) { + if err := redis.ScanStruct(x, v); err == nil { + t.Errorf("Expect error for ScanStruct(%T, %T)", x, v) + } + } + + test(nil) + + var v0 *struct{} + test(v0) + + var v1 int + test(&v1) + + x = x[:1] + v2 := struct{ A string }{} + test(&v2) +} + +var scanSliceTests = []struct { + src []interface{} + fieldNames []string + ok bool + dest interface{} +}{ + { + []interface{}{[]byte("1"), nil, []byte("-1")}, + nil, + true, + []int{1, 0, -1}, + }, + { + []interface{}{[]byte("1"), nil, []byte("2")}, + nil, + true, + []uint{1, 0, 2}, + }, + { + []interface{}{[]byte("-1")}, + nil, + false, + []uint{1}, + }, + { + []interface{}{[]byte("hello"), nil, []byte("world")}, + nil, + true, + [][]byte{[]byte("hello"), nil, []byte("world")}, + }, + { + []interface{}{[]byte("hello"), nil, []byte("world")}, + nil, + true, + []string{"hello", "", "world"}, + }, + { + []interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")}, + nil, + true, + []struct{ A, B string }{{"a1", "b1"}, {"a2", "b2"}}, + }, + { + []interface{}{[]byte("a1"), []byte("b1")}, + nil, + false, + []struct{ A, B, C string }{{"a1", "b1", ""}}, + }, + { + []interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")}, + nil, + true, + []*struct{ A, B string }{{A: "a1", B: "b1"}, {A: "a2", B: "b2"}}, + }, + { + []interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")}, + []string{"A", "B"}, + true, + []struct{ A, C, B string }{{"a1", "", "b1"}, {"a2", "", "b2"}}, + }, + { + []interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")}, + nil, + false, + []struct{}{}, + }, +} + +func TestScanSlice(t *testing.T) { + for _, tt := range scanSliceTests { + + typ := reflect.ValueOf(tt.dest).Type() + dest := reflect.New(typ) + + err := redis.ScanSlice(tt.src, dest.Interface(), tt.fieldNames...) + if tt.ok != (err == nil) { + t.Errorf("ScanSlice(%v, []%s, %v) returned error %v", tt.src, typ, tt.fieldNames, err) + continue + } + if tt.ok && !reflect.DeepEqual(dest.Elem().Interface(), tt.dest) { + t.Errorf("ScanSlice(src, []%s) returned %#v, want %#v", typ, dest.Elem().Interface(), tt.dest) + } + } +} + +func ExampleScanSlice() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + c.Send("HMSET", "album:1", "title", "Red", "rating", 5) + c.Send("HMSET", "album:2", "title", "Earthbound", "rating", 1) + c.Send("HMSET", "album:3", "title", "Beat", "rating", 4) + c.Send("LPUSH", "albums", "1") + c.Send("LPUSH", "albums", "2") + c.Send("LPUSH", "albums", "3") + values, err := redis.Values(c.Do("SORT", "albums", + "BY", "album:*->rating", + "GET", "album:*->title", + "GET", "album:*->rating")) + if err != nil { + fmt.Println(err) + return + } + + var albums []struct { + Title string + Rating int + } + if err := redis.ScanSlice(values, &albums); err != nil { + fmt.Println(err) + return + } + fmt.Printf("%v\n", albums) + // Output: + // [{Earthbound 1} {Beat 4} {Red 5}] +} + +var argsTests = []struct { + title string + actual redis.Args + expected redis.Args +}{ + {"struct ptr", + redis.Args{}.AddFlat(&struct { + I int `redis:"i"` + U uint `redis:"u"` + S string `redis:"s"` + P []byte `redis:"p"` + M map[string]string `redis:"m"` + Bt bool + Bf bool + }{ + -1234, 5678, "hello", []byte("world"), map[string]string{"hello": "world"}, true, false, + }), + redis.Args{"i", int(-1234), "u", uint(5678), "s", "hello", "p", []byte("world"), "m", map[string]string{"hello": "world"}, "Bt", true, "Bf", false}, + }, + {"struct", + redis.Args{}.AddFlat(struct{ I int }{123}), + redis.Args{"I", 123}, + }, + {"slice", + redis.Args{}.Add(1).AddFlat([]string{"a", "b", "c"}).Add(2), + redis.Args{1, "a", "b", "c", 2}, + }, + {"struct omitempty", + redis.Args{}.AddFlat(&struct { + I int `redis:"i,omitempty"` + U uint `redis:"u,omitempty"` + S string `redis:"s,omitempty"` + P []byte `redis:"p,omitempty"` + M map[string]string `redis:"m,omitempty"` + Bt bool `redis:"Bt,omitempty"` + Bf bool `redis:"Bf,omitempty"` + }{ + 0, 0, "", []byte{}, map[string]string{}, true, false, + }), + redis.Args{"Bt", true}, + }, +} + +func TestArgs(t *testing.T) { + for _, tt := range argsTests { + if !reflect.DeepEqual(tt.actual, tt.expected) { + t.Fatalf("%s is %v, want %v", tt.title, tt.actual, tt.expected) + } + } +} + +func ExampleArgs() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + var p1, p2 struct { + Title string `redis:"title"` + Author string `redis:"author"` + Body string `redis:"body"` + } + + p1.Title = "Example" + p1.Author = "Gary" + p1.Body = "Hello" + + if _, err := c.Do("HMSET", redis.Args{}.Add("id1").AddFlat(&p1)...); err != nil { + fmt.Println(err) + return + } + + m := map[string]string{ + "title": "Example2", + "author": "Steve", + "body": "Map", + } + + if _, err := c.Do("HMSET", redis.Args{}.Add("id2").AddFlat(m)...); err != nil { + fmt.Println(err) + return + } + + for _, id := range []string{"id1", "id2"} { + + v, err := redis.Values(c.Do("HGETALL", id)) + if err != nil { + fmt.Println(err) + return + } + + if err := redis.ScanStruct(v, &p2); err != nil { + fmt.Println(err) + return + } + + fmt.Printf("%+v\n", p2) + } + + // Output: + // {Title:Example Author:Gary Body:Hello} + // {Title:Example2 Author:Steve Body:Map} +} diff --git a/vendor/github.com/garyburd/redigo/redis/script.go b/vendor/github.com/gomodule/redigo/redis/script.go similarity index 100% rename from vendor/github.com/garyburd/redigo/redis/script.go rename to vendor/github.com/gomodule/redigo/redis/script.go diff --git a/vendor/github.com/gomodule/redigo/redis/script_test.go b/vendor/github.com/gomodule/redigo/redis/script_test.go new file mode 100644 index 00000000..388e167f --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/script_test.go @@ -0,0 +1,100 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "fmt" + "reflect" + "testing" + "time" + + "github.com/gomodule/redigo/redis" +) + +var ( + // These variables are declared at package level to remove distracting + // details from the examples. + c redis.Conn + reply interface{} + err error +) + +func ExampleScript() { + // Initialize a package-level variable with a script. + var getScript = redis.NewScript(1, `return redis.call('get', KEYS[1])`) + + // In a function, use the script Do method to evaluate the script. The Do + // method optimistically uses the EVALSHA command. If the script is not + // loaded, then the Do method falls back to the EVAL command. + reply, err = getScript.Do(c, "foo") +} + +func TestScript(t *testing.T) { + c, err := redis.DialDefaultServer() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + // To test fall back in Do, we make script unique by adding comment with current time. + script := fmt.Sprintf("--%d\nreturn {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}", time.Now().UnixNano()) + s := redis.NewScript(2, script) + reply := []interface{}{[]byte("key1"), []byte("key2"), []byte("arg1"), []byte("arg2")} + + v, err := s.Do(c, "key1", "key2", "arg1", "arg2") + if err != nil { + t.Errorf("s.Do(c, ...) returned %v", err) + } + + if !reflect.DeepEqual(v, reply) { + t.Errorf("s.Do(c, ..); = %v, want %v", v, reply) + } + + err = s.Load(c) + if err != nil { + t.Errorf("s.Load(c) returned %v", err) + } + + err = s.SendHash(c, "key1", "key2", "arg1", "arg2") + if err != nil { + t.Errorf("s.SendHash(c, ...) returned %v", err) + } + + err = c.Flush() + if err != nil { + t.Errorf("c.Flush() returned %v", err) + } + + v, err = c.Receive() + if !reflect.DeepEqual(v, reply) { + t.Errorf("s.SendHash(c, ..); c.Receive() = %v, want %v", v, reply) + } + + err = s.Send(c, "key1", "key2", "arg1", "arg2") + if err != nil { + t.Errorf("s.Send(c, ...) returned %v", err) + } + + err = c.Flush() + if err != nil { + t.Errorf("c.Flush() returned %v", err) + } + + v, err = c.Receive() + if !reflect.DeepEqual(v, reply) { + t.Errorf("s.Send(c, ..); c.Receive() = %v, want %v", v, reply) + } + +} diff --git a/vendor/github.com/gomodule/redigo/redis/test_test.go b/vendor/github.com/gomodule/redigo/redis/test_test.go new file mode 100644 index 00000000..5c758486 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/test_test.go @@ -0,0 +1,183 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis + +import ( + "bufio" + "errors" + "flag" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "strconv" + "strings" + "sync" + "testing" + "time" +) + +func SetNowFunc(f func() time.Time) { + nowFunc = f +} + +var ( + ErrNegativeInt = errNegativeInt + + serverPath = flag.String("redis-server", "redis-server", "Path to redis server binary") + serverAddress = flag.String("redis-address", "127.0.0.1", "The address of the server") + serverBasePort = flag.Int("redis-port", 16379, "Beginning of port range for test servers") + serverLogName = flag.String("redis-log", "", "Write Redis server logs to `filename`") + serverLog = ioutil.Discard + + defaultServerMu sync.Mutex + defaultServer *Server + defaultServerErr error +) + +type Server struct { + name string + cmd *exec.Cmd + done chan struct{} +} + +func NewServer(name string, args ...string) (*Server, error) { + s := &Server{ + name: name, + cmd: exec.Command(*serverPath, args...), + done: make(chan struct{}), + } + + r, err := s.cmd.StdoutPipe() + if err != nil { + return nil, err + } + + err = s.cmd.Start() + if err != nil { + return nil, err + } + + ready := make(chan error, 1) + go s.watch(r, ready) + + select { + case err = <-ready: + case <-time.After(time.Second * 10): + err = errors.New("timeout waiting for server to start") + } + + if err != nil { + s.Stop() + return nil, err + } + + return s, nil +} + +func (s *Server) watch(r io.Reader, ready chan error) { + fmt.Fprintf(serverLog, "%d START %s \n", s.cmd.Process.Pid, s.name) + var listening bool + var text string + scn := bufio.NewScanner(r) + for scn.Scan() { + text = scn.Text() + fmt.Fprintf(serverLog, "%s\n", text) + if !listening { + if strings.Contains(text, " * Ready to accept connections") || + strings.Contains(text, " * The server is now ready to accept connections on port") { + listening = true + ready <- nil + } + } + } + if !listening { + ready <- fmt.Errorf("server exited: %s", text) + } + s.cmd.Wait() + fmt.Fprintf(serverLog, "%d STOP %s \n", s.cmd.Process.Pid, s.name) + close(s.done) +} + +func (s *Server) Stop() { + s.cmd.Process.Signal(os.Interrupt) + <-s.done +} + +// stopDefaultServer stops the server created by DialDefaultServer. +func stopDefaultServer() { + defaultServerMu.Lock() + defer defaultServerMu.Unlock() + if defaultServer != nil { + defaultServer.Stop() + defaultServer = nil + } +} + +// DefaultServerAddr starts the test server if not already started and returns +// the address of that server. +func DefaultServerAddr() (string, error) { + defaultServerMu.Lock() + defer defaultServerMu.Unlock() + addr := fmt.Sprintf("%v:%d", *serverAddress, *serverBasePort) + if defaultServer != nil || defaultServerErr != nil { + return addr, defaultServerErr + } + defaultServer, defaultServerErr = NewServer( + "default", + "--port", strconv.Itoa(*serverBasePort), + "--bind", *serverAddress, + "--save", "", + "--appendonly", "no") + return addr, defaultServerErr +} + +// DialDefaultServer starts the test server if not already started and dials a +// connection to the server. +func DialDefaultServer() (Conn, error) { + addr, err := DefaultServerAddr() + if err != nil { + return nil, err + } + c, err := Dial("tcp", addr, DialReadTimeout(1*time.Second), DialWriteTimeout(1*time.Second)) + if err != nil { + return nil, err + } + c.Do("FLUSHDB") + return c, nil +} + +func TestMain(m *testing.M) { + os.Exit(func() int { + flag.Parse() + + var f *os.File + if *serverLogName != "" { + var err error + f, err = os.OpenFile(*serverLogName, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600) + if err != nil { + fmt.Fprintf(os.Stderr, "Error opening redis-log: %v\n", err) + return 1 + } + defer f.Close() + serverLog = f + } + + defer stopDefaultServer() + + return m.Run() + }()) +} diff --git a/vendor/github.com/gomodule/redigo/redis/zpop_example_test.go b/vendor/github.com/gomodule/redigo/redis/zpop_example_test.go new file mode 100644 index 00000000..f7702e03 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redis/zpop_example_test.go @@ -0,0 +1,114 @@ +// Copyright 2013 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redis_test + +import ( + "fmt" + + "github.com/gomodule/redigo/redis" +) + +// zpop pops a value from the ZSET key using WATCH/MULTI/EXEC commands. +func zpop(c redis.Conn, key string) (result string, err error) { + + defer func() { + // Return connection to normal state on error. + if err != nil { + c.Do("DISCARD") + } + }() + + // Loop until transaction is successful. + for { + if _, err := c.Do("WATCH", key); err != nil { + return "", err + } + + members, err := redis.Strings(c.Do("ZRANGE", key, 0, 0)) + if err != nil { + return "", err + } + if len(members) != 1 { + return "", redis.ErrNil + } + + c.Send("MULTI") + c.Send("ZREM", key, members[0]) + queued, err := c.Do("EXEC") + if err != nil { + return "", err + } + + if queued != nil { + result = members[0] + break + } + } + + return result, nil +} + +// zpopScript pops a value from a ZSET. +var zpopScript = redis.NewScript(1, ` + local r = redis.call('ZRANGE', KEYS[1], 0, 0) + if r ~= nil then + r = r[1] + redis.call('ZREM', KEYS[1], r) + end + return r +`) + +// This example implements ZPOP as described at +// http://redis.io/topics/transactions using WATCH/MULTI/EXEC and scripting. +func Example_zpop() { + c, err := dial() + if err != nil { + fmt.Println(err) + return + } + defer c.Close() + + // Add test data using a pipeline. + + for i, member := range []string{"red", "blue", "green"} { + c.Send("ZADD", "zset", i, member) + } + if _, err := c.Do(""); err != nil { + fmt.Println(err) + return + } + + // Pop using WATCH/MULTI/EXEC + + v, err := zpop(c, "zset") + if err != nil { + fmt.Println(err) + return + } + fmt.Println(v) + + // Pop using a script. + + v, err = redis.String(zpopScript.Do(c, "zset")) + if err != nil { + fmt.Println(err) + return + } + fmt.Println(v) + + // Output: + // red + // blue +} diff --git a/vendor/github.com/gomodule/redigo/redisx/connmux.go b/vendor/github.com/gomodule/redigo/redisx/connmux.go new file mode 100644 index 00000000..1c305c75 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redisx/connmux.go @@ -0,0 +1,152 @@ +// Copyright 2014 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redisx + +import ( + "errors" + "sync" + + "github.com/gomodule/redigo/internal" + "github.com/gomodule/redigo/redis" +) + +// ConnMux multiplexes one or more connections to a single underlying +// connection. The ConnMux connections do not support concurrency, commands +// that associate server side state with the connection or commands that put +// the connection in a special mode. +type ConnMux struct { + c redis.Conn + + sendMu sync.Mutex + sendID uint + + recvMu sync.Mutex + recvID uint + recvWait map[uint]chan struct{} +} + +func NewConnMux(c redis.Conn) *ConnMux { + return &ConnMux{c: c, recvWait: make(map[uint]chan struct{})} +} + +// Get gets a connection. The application must close the returned connection. +func (p *ConnMux) Get() redis.Conn { + c := &muxConn{p: p} + c.ids = c.buf[:0] + return c +} + +// Close closes the underlying connection. +func (p *ConnMux) Close() error { + return p.c.Close() +} + +type muxConn struct { + p *ConnMux + ids []uint + buf [8]uint +} + +func (c *muxConn) send(flush bool, cmd string, args ...interface{}) error { + if internal.LookupCommandInfo(cmd).Set != 0 { + return errors.New("command not supported by mux pool") + } + p := c.p + p.sendMu.Lock() + id := p.sendID + c.ids = append(c.ids, id) + p.sendID++ + err := p.c.Send(cmd, args...) + if flush { + err = p.c.Flush() + } + p.sendMu.Unlock() + return err +} + +func (c *muxConn) Send(cmd string, args ...interface{}) error { + return c.send(false, cmd, args...) +} + +func (c *muxConn) Flush() error { + p := c.p + p.sendMu.Lock() + err := p.c.Flush() + p.sendMu.Unlock() + return err +} + +func (c *muxConn) Receive() (interface{}, error) { + if len(c.ids) == 0 { + return nil, errors.New("mux pool underflow") + } + + id := c.ids[0] + c.ids = c.ids[1:] + if len(c.ids) == 0 { + c.ids = c.buf[:0] + } + + p := c.p + p.recvMu.Lock() + if p.recvID != id { + ch := make(chan struct{}) + p.recvWait[id] = ch + p.recvMu.Unlock() + <-ch + p.recvMu.Lock() + if p.recvID != id { + panic("out of sync") + } + } + + v, err := p.c.Receive() + + id++ + p.recvID = id + ch, ok := p.recvWait[id] + if ok { + delete(p.recvWait, id) + } + p.recvMu.Unlock() + if ok { + ch <- struct{}{} + } + + return v, err +} + +func (c *muxConn) Close() error { + var err error + if len(c.ids) == 0 { + return nil + } + c.Flush() + for _ = range c.ids { + _, err = c.Receive() + } + return err +} + +func (c *muxConn) Do(cmd string, args ...interface{}) (interface{}, error) { + if err := c.send(true, cmd, args...); err != nil { + return nil, err + } + return c.Receive() +} + +func (c *muxConn) Err() error { + return c.p.c.Err() +} diff --git a/vendor/github.com/gomodule/redigo/redisx/connmux_test.go b/vendor/github.com/gomodule/redigo/redisx/connmux_test.go new file mode 100644 index 00000000..86908f64 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redisx/connmux_test.go @@ -0,0 +1,259 @@ +// Copyright 2014 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +package redisx_test + +import ( + "net/textproto" + "sync" + "testing" + + "github.com/gomodule/redigo/internal/redistest" + "github.com/gomodule/redigo/redis" + "github.com/gomodule/redigo/redisx" +) + +func TestConnMux(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + m := redisx.NewConnMux(c) + defer m.Close() + + c1 := m.Get() + c2 := m.Get() + c1.Send("ECHO", "hello") + c2.Send("ECHO", "world") + c1.Flush() + c2.Flush() + s, err := redis.String(c1.Receive()) + if err != nil { + t.Fatal(err) + } + if s != "hello" { + t.Fatalf("echo returned %q, want %q", s, "hello") + } + s, err = redis.String(c2.Receive()) + if err != nil { + t.Fatal(err) + } + if s != "world" { + t.Fatalf("echo returned %q, want %q", s, "world") + } + c1.Close() + c2.Close() +} + +func TestConnMuxClose(t *testing.T) { + c, err := redistest.Dial() + if err != nil { + t.Fatalf("error connection to database, %v", err) + } + m := redisx.NewConnMux(c) + defer m.Close() + + c1 := m.Get() + c2 := m.Get() + + if err := c1.Send("ECHO", "hello"); err != nil { + t.Fatal(err) + } + if err := c1.Close(); err != nil { + t.Fatal(err) + } + + if err := c2.Send("ECHO", "world"); err != nil { + t.Fatal(err) + } + if err := c2.Flush(); err != nil { + t.Fatal(err) + } + + s, err := redis.String(c2.Receive()) + if err != nil { + t.Fatal(err) + } + if s != "world" { + t.Fatalf("echo returned %q, want %q", s, "world") + } + c2.Close() +} + +func BenchmarkConn(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatalf("error connection to database, %v", err) + } + defer c.Close() + b.StartTimer() + + for i := 0; i < b.N; i++ { + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + } +} + +func BenchmarkConnMux(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatalf("error connection to database, %v", err) + } + m := redisx.NewConnMux(c) + defer m.Close() + + b.StartTimer() + + for i := 0; i < b.N; i++ { + c := m.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } +} + +func BenchmarkPool(b *testing.B) { + b.StopTimer() + + p := redis.Pool{Dial: redistest.Dial, MaxIdle: 1} + defer p.Close() + + // Fill the pool. + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + c.Close() + + b.StartTimer() + + for i := 0; i < b.N; i++ { + c := p.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } +} + +const numConcurrent = 10 + +func BenchmarkConnMuxConcurrent(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + m := redisx.NewConnMux(c) + + var wg sync.WaitGroup + wg.Add(numConcurrent) + + b.StartTimer() + + for i := 0; i < numConcurrent; i++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + c := m.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } + }() + } + wg.Wait() +} + +func BenchmarkPoolConcurrent(b *testing.B) { + b.StopTimer() + + p := redis.Pool{Dial: redistest.Dial, MaxIdle: numConcurrent} + defer p.Close() + + // Fill the pool. + conns := make([]redis.Conn, numConcurrent) + for i := range conns { + c := p.Get() + if err := c.Err(); err != nil { + b.Fatal(err) + } + conns[i] = c + } + for _, c := range conns { + c.Close() + } + + var wg sync.WaitGroup + wg.Add(numConcurrent) + + b.StartTimer() + + for i := 0; i < numConcurrent; i++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + c := p.Get() + if _, err := c.Do("PING"); err != nil { + b.Fatal(err) + } + c.Close() + } + }() + } + wg.Wait() +} + +func BenchmarkPipelineConcurrency(b *testing.B) { + b.StopTimer() + c, err := redistest.Dial() + if err != nil { + b.Fatalf("error connection to database, %v", err) + } + defer c.Close() + + var wg sync.WaitGroup + wg.Add(numConcurrent) + + var pipeline textproto.Pipeline + + b.StartTimer() + + for i := 0; i < numConcurrent; i++ { + go func() { + defer wg.Done() + for i := 0; i < b.N; i++ { + id := pipeline.Next() + pipeline.StartRequest(id) + c.Send("PING") + c.Flush() + pipeline.EndRequest(id) + pipeline.StartResponse(id) + _, err := c.Receive() + if err != nil { + b.Fatal(err) + } + pipeline.EndResponse(id) + } + }() + } + wg.Wait() +} diff --git a/vendor/github.com/gomodule/redigo/redisx/doc.go b/vendor/github.com/gomodule/redigo/redisx/doc.go new file mode 100644 index 00000000..04af3116 --- /dev/null +++ b/vendor/github.com/gomodule/redigo/redisx/doc.go @@ -0,0 +1,17 @@ +// Copyright 2012 Gary Burd +// +// Licensed under the Apache License, Version 2.0 (the "License"): you may +// not use this file except in compliance with the License. You may obtain +// a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +// License for the specific language governing permissions and limitations +// under the License. + +// Package redisx contains experimental features for Redigo. Features in this +// package may be modified or deleted at any time. +package redisx // import "github.com/gomodule/redigo/redisx"