From ea7efdf7bfb948e7bc1e4fa0cc9871cb65c94caa Mon Sep 17 00:00:00 2001 From: Janos Guljas Date: Thu, 2 May 2019 23:34:35 +0200 Subject: [PATCH] Initial commit --- .travis.yml | 13 +++ LICENSE | 27 ++++++ README.md | 12 +++ go.mod | 5 + go.sum | 2 + singleflight.go | 102 ++++++++++++++++++++ singleflight_test.go | 215 +++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 376 insertions(+) create mode 100644 .travis.yml create mode 100644 LICENSE create mode 100644 README.md create mode 100644 go.mod create mode 100644 go.sum create mode 100644 singleflight.go create mode 100644 singleflight_test.go diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..5ecb172 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,13 @@ +language: go + +go: + - 1.12.4 + +install: + - go get -v resenje.org/singleflight/... + +before_script: + - go vet -v resenje.org/singleflight/... + +script: + - go test -v -race resenje.org/singleflight/... \ No newline at end of file diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..9e291b5 --- /dev/null +++ b/LICENSE @@ -0,0 +1,27 @@ +Copyright (c) 2019, Janoš Guljaš +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of this project nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..e4dee4e --- /dev/null +++ b/README.md @@ -0,0 +1,12 @@ +# Singleflight + +[![GoDoc](https://godoc.org/resenje.org/singleflight?status.svg)](https://godoc.org/resenje.org/singleflight) +[![Build Status](https://travis-ci.org/janos/singleflight.svg?branch=master)](https://travis-ci.org/janos/singleflight) + +Package singleflight provides a duplicate function call suppression +mechanism similar to golang.org/x/sync/singleflight but with support +for context cancelation. + +## Installation + +Run `go get resenje.org/singleflight` from command line. \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..d8c43e3 --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module resenje.org/singleflight + +go 1.12 + +require golang.org/x/sync v0.0.0-20190423024810-112230192c58 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6eae930 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEhagSSnXWGV915qUMm9mrU= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= diff --git a/singleflight.go b/singleflight.go new file mode 100644 index 0000000..0e7d31c --- /dev/null +++ b/singleflight.go @@ -0,0 +1,102 @@ +// Copyright (c) 2019, Janoš Guljaš +// All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism similar to golang.org/x/sync/singleflight with support +// for context cancelation. +package singleflight + +import ( + "context" + "sync" +) + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + calls map[string]*call // lazily initialized + mu sync.Mutex // protects calls +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// Passed context terminates the execution of Do function, not the passed +// function fn. If there are multiple callers, context passed to one caller +// does not effect the execution and returned values of others. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(ctx context.Context, key string, fn func() (interface{}, error)) (v interface{}, shared bool, err error) { + g.mu.Lock() + if g.calls == nil { + g.calls = make(map[string]*call) + } + + if c, ok := g.calls[key]; ok { + c.shared = true + g.mu.Unlock() + + return g.wait(ctx, key, c) + } + + c := &call{ + done: make(chan struct{}), + } + g.calls[key] = c + g.mu.Unlock() + + go func() { + c.val, c.err = fn() + close(c.done) + }() + + return g.wait(ctx, key, c) +} + +// wait for function passed to Do to finish or context to be done. +func (g *Group) wait(ctx context.Context, key string, c *call) (v interface{}, shared bool, err error) { + select { + case <-c.done: + v = c.val + err = c.err + case <-ctx.Done(): + err = ctx.Err() + } + g.mu.Lock() + if !c.forgotten { + delete(g.calls, key) + } + g.mu.Unlock() + return v, c.shared, err +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + if c, ok := g.calls[key]; ok { + c.forgotten = true + } + delete(g.calls, key) + g.mu.Unlock() +} + +// call stores information about as single function call passed to Do function. +type call struct { + // val and err hold the state about results of the function call. + val interface{} + err error + + // done channel signals that the function call is done. + done chan struct{} + + // forgotten indicates whether Forget was called with this call's key + // while the call was still in flight. + forgotten bool + + // shared indicates if results val and err are passed to multiple callers. + shared bool +} diff --git a/singleflight_test.go b/singleflight_test.go new file mode 100644 index 0000000..0eea72c --- /dev/null +++ b/singleflight_test.go @@ -0,0 +1,215 @@ +// Copyright (c) 2019, Janoš Guljaš +// All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package singleflight_test + +import ( + "context" + "errors" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "resenje.org/singleflight" +) + +func TestDo(t *testing.T) { + var g singleflight.Group + + want := "val" + got, shared, err := g.Do(context.Background(), "key", func() (interface{}, error) { + return want, nil + }) + if err != nil { + t.Fatal(err) + } + if shared { + t.Error("the value should not be shared") + } + if got != want { + t.Errorf("got value %v, want %v", got, want) + } +} + +func TestDo_error(t *testing.T) { + var g singleflight.Group + wantErr := errors.New("test error") + got, _, err := g.Do(context.Background(), "key", func() (interface{}, error) { + return nil, wantErr + }) + if err != wantErr { + t.Errorf("got error %v, want %v", err, wantErr) + } + if got != nil { + t.Errorf("unexpected value %#v", got) + } +} + +func TestDo_multipleCalls(t *testing.T) { + var g singleflight.Group + + want := "val" + var counter int32 + + n := 10 + got := make([]interface{}, n) + shared := make([]bool, n) + err := make([]error, n) + var wg sync.WaitGroup + wg.Add(n) + for i := 0; i < n; i++ { + go func(i int) { + defer wg.Done() + got[i], shared[i], err[i] = g.Do(context.Background(), "key", func() (interface{}, error) { + atomic.AddInt32(&counter, 1) + time.Sleep(100 * time.Millisecond) + return want, nil + }) + }(i) + } + wg.Wait() + + if got := atomic.LoadInt32(&counter); got != 1 { + t.Errorf("function called %v times, should only once", got) + } + + for i := 0; i < n; i++ { + if err[i] != nil { + t.Errorf("call %v: unexpected error: %v", i, err[i]) + } + if !shared[i] { + t.Errorf("call %v: the value should be shared", i) + } + if got[i] != want { + t.Errorf("call %v: got value %v, want %v", i, got[i], want) + } + } +} + +func TestDo_callRemoval(t *testing.T) { + var g singleflight.Group + + wantPrefix := "val" + counter := 0 + fn := func() (interface{}, error) { + counter++ + return wantPrefix + strconv.Itoa(counter), nil + } + + got, shared, err := g.Do(context.Background(), "key", fn) + if err != nil { + t.Fatal(err) + } + if shared { + t.Error("the value should not be shared") + } + if want := wantPrefix + "1"; got != want { + t.Errorf("got value %v, want %v", got, want) + } + + got, shared, err = g.Do(context.Background(), "key", fn) + if err != nil { + t.Fatal(err) + } + if shared { + t.Error("the value should not be shared") + } + if want := wantPrefix + "2"; got != want { + t.Errorf("got value %v, want %v", got, want) + } +} + +func TestDo_cancelContext(t *testing.T) { + var g singleflight.Group + + want := "val" + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + start := time.Now() + got, shared, err := g.Do(ctx, "key", func() (interface{}, error) { + time.Sleep(time.Second) + return want, nil + }) + if d := time.Since(start); d < 100*time.Microsecond || d > time.Second { + t.Errorf("unexpected Do call duration %s", d) + } + if want := context.Canceled; err != want { + t.Errorf("got error %v, want %v", err, want) + } + if shared { + t.Error("the value should not be shared") + } + if got != nil { + t.Errorf("unexpected value %#v", got) + } +} + +func TestDo_cancelContextSecond(t *testing.T) { + var g singleflight.Group + + want := "val" + fn := func() (interface{}, error) { + time.Sleep(time.Second) + return want, nil + } + go g.Do(context.Background(), "key", fn) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + time.Sleep(100 * time.Millisecond) + cancel() + }() + start := time.Now() + got, shared, err := g.Do(ctx, "key", fn) + if d := time.Since(start); d < 100*time.Microsecond || d > time.Second { + t.Errorf("unexpected Do call duration %s", d) + } + if want := context.Canceled; err != want { + t.Errorf("got error %v, want %v", err, want) + } + if !shared { + t.Error("the value should be shared") + } + if got != nil { + t.Errorf("unexpected value %#v", got) + } +} + +func TestForget(t *testing.T) { + var g singleflight.Group + + wantPrefix := "val" + var counter uint64 + firstCall := make(chan struct{}) + fn := func() (interface{}, error) { + c := atomic.AddUint64(&counter, 1) + if c == 1 { + close(firstCall) + time.Sleep(time.Second) + } + return wantPrefix + strconv.FormatUint(c, 10), nil + } + + go g.Do(context.Background(), "key", fn) + + <-firstCall + g.Forget("key") + + got, shared, err := g.Do(context.Background(), "key", fn) + if err != nil { + t.Fatal(err) + } + if shared { + t.Error("the value should not be shared") + } + if want := wantPrefix + "2"; got != want { + t.Errorf("got value %v, want %v", got, want) + } +}