From d32831f42f38780b718ad80deaae4177166e6bcf Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Fri, 26 Apr 2024 19:06:17 +0300 Subject: [PATCH 01/21] import multierrgroup as-is Signed-off-by: Oleg Vasilev --- pkg/util/multierrgroup/LICENSE | 21 +++++++ pkg/util/multierrgroup/multierrgroup.go | 59 ++++++++++++++++++++ pkg/util/multierrgroup/multierrgroup_test.go | 58 +++++++++++++++++++ 3 files changed, 138 insertions(+) create mode 100644 pkg/util/multierrgroup/LICENSE create mode 100644 pkg/util/multierrgroup/multierrgroup.go create mode 100644 pkg/util/multierrgroup/multierrgroup_test.go diff --git a/pkg/util/multierrgroup/LICENSE b/pkg/util/multierrgroup/LICENSE new file mode 100644 index 000000000..d8839129b --- /dev/null +++ b/pkg/util/multierrgroup/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Peter Kristensen + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/pkg/util/multierrgroup/multierrgroup.go b/pkg/util/multierrgroup/multierrgroup.go new file mode 100644 index 000000000..d20a4aced --- /dev/null +++ b/pkg/util/multierrgroup/multierrgroup.go @@ -0,0 +1,59 @@ +// Package multierrgroup provides a mix of multierr and errgroup +// See documentation for https://pkg.go.dev/go.uber.org/multierr and https://pkg.go.dev/golang.org/x/sync/errgroup +package multierrgroup + +import ( + "context" + "sync" + + "go.uber.org/multierr" +) + +// Group manages goroutines and collect all the errors. +// See https://pkg.go.dev/golang.org/x/sync/errgroup#Group for more information +type Group struct { + cancel context.CancelFunc + + wg sync.WaitGroup + + errMutex sync.Mutex + err error +} + +// WithContext returns a new Group with a associated Context. +// The context will be canceled if any goroutine returns an error. +// See https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancel(ctx) + return &Group{cancel: cancel}, ctx +} + +// Wait blocks until all goroutines have completed. +// +// All errors returned from the goroutines will be combined into one using multierr and returned from this method. +func (g *Group) Wait() error { + g.wg.Wait() + if g.cancel != nil { + g.cancel() + } + return g.err +} + +// Go calls the function in a new goroutine. +// If a non-nil errors is returned, the context is canceled and +// the error is collected using multierr and will be returned by Wait. +func (g *Group) Go(f func() error) { + g.wg.Add(1) + + go func() { + defer g.wg.Done() + if err := f(); err != nil { + g.errMutex.Lock() + g.err = multierr.Append(g.err, err) + g.errMutex.Unlock() + if g.cancel != nil { + g.cancel() + } + } + }() +} diff --git a/pkg/util/multierrgroup/multierrgroup_test.go b/pkg/util/multierrgroup/multierrgroup_test.go new file mode 100644 index 000000000..23f19ec28 --- /dev/null +++ b/pkg/util/multierrgroup/multierrgroup_test.go @@ -0,0 +1,58 @@ +package multierrgroup_test + +import ( + "context" + "errors" + "fmt" + "testing" + + "go.ptx.dk/multierrgroup" + "go.uber.org/multierr" +) + +func ExampleGroup() { + g := multierrgroup.Group{} + g.Go(func() error { + return errors.New("error 1") + }) + g.Go(func() error { + return errors.New("error 2") + }) + err := g.Wait() + // Using golang.org/x/sync/errgroup would return a return error depending on which goroutine was scheduled first + + errs := multierr.Errors(err) + fmt.Println("Got", len(errs), "errors") + // Output: Got 2 errors +} + +func TestWithContext(t *testing.T) { + err1 := errors.New("error 1") + err2 := errors.New("error 2") + + g, ctx := multierrgroup.WithContext(context.Background()) + g.Go(func() error { + return err1 + }) + g.Go(func() error { + return err2 + }) + err := g.Wait() + if !errors.Is(err, err1) { + t.Errorf("error: %s should be: %s", err, err1) + } + if !errors.Is(err, err2) { + t.Errorf("error: %s should be: %s", err, err2) + } + canceled := false + select { + case <-ctx.Done(): + canceled = true + default: + + } + if !canceled { + t.Errorf("context should have been canceled!") + } + +} From 7b6e5c1fe356343ba10d39e224ba7e2dbda27be1 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Fri, 26 Apr 2024 19:08:16 +0300 Subject: [PATCH 02/21] rename Signed-off-by: Oleg Vasilev --- pkg/util/{multierrgroup => taskgroup}/LICENSE | 0 .../multierrgroup.go => taskgroup/taskgroup.go} | 2 +- .../multierrgroup_test.go => taskgroup/taskgroup_test.go} | 8 ++++---- 3 files changed, 5 insertions(+), 5 deletions(-) rename pkg/util/{multierrgroup => taskgroup}/LICENSE (100%) rename pkg/util/{multierrgroup/multierrgroup.go => taskgroup/taskgroup.go} (98%) rename pkg/util/{multierrgroup/multierrgroup_test.go => taskgroup/taskgroup_test.go} (85%) diff --git a/pkg/util/multierrgroup/LICENSE b/pkg/util/taskgroup/LICENSE similarity index 100% rename from pkg/util/multierrgroup/LICENSE rename to pkg/util/taskgroup/LICENSE diff --git a/pkg/util/multierrgroup/multierrgroup.go b/pkg/util/taskgroup/taskgroup.go similarity index 98% rename from pkg/util/multierrgroup/multierrgroup.go rename to pkg/util/taskgroup/taskgroup.go index d20a4aced..d1fca1804 100644 --- a/pkg/util/multierrgroup/multierrgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -1,6 +1,6 @@ // Package multierrgroup provides a mix of multierr and errgroup // See documentation for https://pkg.go.dev/go.uber.org/multierr and https://pkg.go.dev/golang.org/x/sync/errgroup -package multierrgroup +package taskgroup import ( "context" diff --git a/pkg/util/multierrgroup/multierrgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go similarity index 85% rename from pkg/util/multierrgroup/multierrgroup_test.go rename to pkg/util/taskgroup/taskgroup_test.go index 23f19ec28..7c2bc4e78 100644 --- a/pkg/util/multierrgroup/multierrgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -1,4 +1,4 @@ -package multierrgroup_test +package taskgroup_test import ( "context" @@ -6,12 +6,12 @@ import ( "fmt" "testing" - "go.ptx.dk/multierrgroup" + "github.com/neondatabase/autoscaling/pkg/util/taskgroup" "go.uber.org/multierr" ) func ExampleGroup() { - g := multierrgroup.Group{} + g := taskgroup.Group{} g.Go(func() error { return errors.New("error 1") }) @@ -30,7 +30,7 @@ func TestWithContext(t *testing.T) { err1 := errors.New("error 1") err2 := errors.New("error 2") - g, ctx := multierrgroup.WithContext(context.Background()) + g, ctx := taskgroup.WithContext(context.Background()) g.Go(func() error { return err1 }) From 4b3d8e1d55f171656f69822237b3720a973cae32 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Fri, 26 Apr 2024 19:23:25 +0300 Subject: [PATCH 03/21] change interface Signed-off-by: Oleg Vasilev --- pkg/util/taskgroup/taskgroup.go | 21 ++++++++++++++++----- pkg/util/taskgroup/taskgroup_test.go | 16 +++++++++------- 2 files changed, 25 insertions(+), 12 deletions(-) diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index d1fca1804..681ba56ea 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -4,15 +4,18 @@ package taskgroup import ( "context" + "fmt" "sync" "go.uber.org/multierr" + "go.uber.org/zap" ) // Group manages goroutines and collect all the errors. // See https://pkg.go.dev/golang.org/x/sync/errgroup#Group for more information type Group struct { cancel context.CancelFunc + logger *zap.Logger wg sync.WaitGroup @@ -20,12 +23,17 @@ type Group struct { err error } +// NewGroup returns a new Group. +func NewGroup(logger *zap.Logger) *Group { + return &Group{logger: logger} +} + // WithContext returns a new Group with a associated Context. // The context will be canceled if any goroutine returns an error. // See https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext -func WithContext(ctx context.Context) (*Group, context.Context) { - ctx, cancel := context.WithCancel(ctx) - return &Group{cancel: cancel}, ctx +func (g *Group) WithContext(ctx context.Context) context.Context { + ctx, g.cancel = context.WithCancel(ctx) + return ctx } // Wait blocks until all goroutines have completed. @@ -42,15 +50,18 @@ func (g *Group) Wait() error { // Go calls the function in a new goroutine. // If a non-nil errors is returned, the context is canceled and // the error is collected using multierr and will be returned by Wait. -func (g *Group) Go(f func() error) { +func (g *Group) Go(name string, f func(logger *zap.Logger) error) { g.wg.Add(1) go func() { defer g.wg.Done() - if err := f(); err != nil { + logger := g.logger.Named(name) + if err := f(logger); err != nil { + err = fmt.Errorf("task %s failed: %w", name, err) g.errMutex.Lock() g.err = multierr.Append(g.err, err) g.errMutex.Unlock() + logger.Error(err.Error()) if g.cancel != nil { g.cancel() } diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index 7c2bc4e78..29dc707cb 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -8,14 +8,15 @@ import ( "github.com/neondatabase/autoscaling/pkg/util/taskgroup" "go.uber.org/multierr" + "go.uber.org/zap" ) func ExampleGroup() { - g := taskgroup.Group{} - g.Go(func() error { + g := taskgroup.NewGroup(zap.NewNop()) + g.Go("task1", func(_ *zap.Logger) error { return errors.New("error 1") }) - g.Go(func() error { + g.Go("task2", func(_ *zap.Logger) error { return errors.New("error 2") }) err := g.Wait() @@ -29,12 +30,14 @@ func ExampleGroup() { func TestWithContext(t *testing.T) { err1 := errors.New("error 1") err2 := errors.New("error 2") + log := zap.NewNop() - g, ctx := taskgroup.WithContext(context.Background()) - g.Go(func() error { + g := taskgroup.NewGroup(log) + ctx := g.WithContext(context.Background()) + g.Go("task1", func(_ *zap.Logger) error { return err1 }) - g.Go(func() error { + g.Go("task2", func(_ *zap.Logger) error { return err2 }) err := g.Wait() @@ -54,5 +57,4 @@ func TestWithContext(t *testing.T) { if !canceled { t.Errorf("context should have been canceled!") } - } From 5e848d87e26190b156e040dd87343d82c6a855e8 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Fri, 26 Apr 2024 19:25:42 +0300 Subject: [PATCH 04/21] hide default ctor Signed-off-by: Oleg Vasilev --- pkg/util/taskgroup/taskgroup.go | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index 681ba56ea..b2a49ec20 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -12,8 +12,14 @@ import ( ) // Group manages goroutines and collect all the errors. -// See https://pkg.go.dev/golang.org/x/sync/errgroup#Group for more information -type Group struct { +// See https://pkg.go.dev/golang.org/x/sync/errgroup#group for more information +type Group interface { + WithContext(ctx context.Context) context.Context + Wait() error + Go(name string, f func(logger *zap.Logger) error) +} + +type group struct { cancel context.CancelFunc logger *zap.Logger @@ -24,14 +30,14 @@ type Group struct { } // NewGroup returns a new Group. -func NewGroup(logger *zap.Logger) *Group { - return &Group{logger: logger} +func NewGroup(logger *zap.Logger) Group { + return &group{logger: logger} } // WithContext returns a new Group with a associated Context. // The context will be canceled if any goroutine returns an error. // See https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext -func (g *Group) WithContext(ctx context.Context) context.Context { +func (g *group) WithContext(ctx context.Context) context.Context { ctx, g.cancel = context.WithCancel(ctx) return ctx } @@ -39,7 +45,7 @@ func (g *Group) WithContext(ctx context.Context) context.Context { // Wait blocks until all goroutines have completed. // // All errors returned from the goroutines will be combined into one using multierr and returned from this method. -func (g *Group) Wait() error { +func (g *group) Wait() error { g.wg.Wait() if g.cancel != nil { g.cancel() @@ -50,7 +56,7 @@ func (g *Group) Wait() error { // Go calls the function in a new goroutine. // If a non-nil errors is returned, the context is canceled and // the error is collected using multierr and will be returned by Wait. -func (g *Group) Go(name string, f func(logger *zap.Logger) error) { +func (g *group) Go(name string, f func(logger *zap.Logger) error) { g.wg.Add(1) go func() { From 0dc2112c6eb0909a01818173cca68859aeb169d8 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Fri, 26 Apr 2024 19:42:29 +0300 Subject: [PATCH 05/21] change runner into taskgroup Signed-off-by: Oleg Vasilev --- neonvm/runner/main.go | 46 +++++++++++-------------------------------- 1 file changed, 12 insertions(+), 34 deletions(-) diff --git a/neonvm/runner/main.go b/neonvm/runner/main.go index 49764d31b..a50964902 100644 --- a/neonvm/runner/main.go +++ b/neonvm/runner/main.go @@ -38,12 +38,12 @@ import ( "github.com/opencontainers/runtime-spec/specs-go" "github.com/vishvananda/netlink" "go.uber.org/zap" - "golang.org/x/sync/errgroup" "k8s.io/apimachinery/pkg/api/resource" vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1" "github.com/neondatabase/autoscaling/pkg/api" + "github.com/neondatabase/autoscaling/pkg/util/taskgroup" ) const ( @@ -696,26 +696,13 @@ func run(logger *zap.Logger) error { } } - // Helper function to both log an error and return the result of wrapping it with the message. - // - // This is used below because (*errgroup.Group).Wait() returns at most 1 error, which means - // imprecise usage can hide errors. So, we log the errors before they would be returned, to make - // sure they're always visible. - logWrap := func(msg string, err error) error { - logger.Error(msg, zap.Error(err)) - return fmt.Errorf("%s: %w", msg, err) - } - - eg := &errgroup.Group{} - eg.Go(func() error { - if err := runInitScript(logger, vmSpec.InitScript); err != nil { - return logWrap("failed to run init script", err) - } - return nil + tg := taskgroup.NewGroup(logger) + tg.Go("init-script", func(logger *zap.Logger) error { + return runInitScript(logger, vmSpec.InitScript) }) - eg.Go(func() error { - if err := createISO9660runtime( + tg.Go("iso9660-runtime", func(logger *zap.Logger) error { + return createISO9660runtime( runtimeDiskPath, vmSpec.Guest.Command, vmSpec.Guest.Args, @@ -725,31 +712,22 @@ func run(logger *zap.Logger) error { enableSSH, swapInfo, shmSize, - ); err != nil { - return logWrap("failed to create iso9660 disk", err) - } - return nil + ) }) - eg.Go(func() error { + tg.Go("rootDisk", func(logger *zap.Logger) error { // resize rootDisk image of size specified and new size more than current - if err := resizeRootDisk(logger, vmSpec); err != nil { - return logWrap("failed to resize rootDisk", err) - } - return nil + return resizeRootDisk(logger, vmSpec) }) var qemuCmd []string - eg.Go(func() error { + tg.Go("qemu-cmd", func(logger *zap.Logger) error { var err error qemuCmd, err = buildQEMUCmd(cfg, logger, vmSpec, &vmStatus, cpus, memory, enableSSH, swapInfo) - if err != nil { - return logWrap("failed to build QEMU command", err) - } - return nil + return err }) - if err := eg.Wait(); err != nil { + if err := tg.Wait(); err != nil { return err } From 9bcda717b2a01bb4f038d6bc05f4874f54d2e17e Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Fri, 26 Apr 2024 19:42:43 +0300 Subject: [PATCH 06/21] fix linter Signed-off-by: Oleg Vasilev --- pkg/util/taskgroup/taskgroup.go | 9 ++++++++- pkg/util/taskgroup/taskgroup_test.go | 3 ++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index b2a49ec20..a282e9dd4 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -31,7 +31,14 @@ type group struct { // NewGroup returns a new Group. func NewGroup(logger *zap.Logger) Group { - return &group{logger: logger} + return &group{ + cancel: nil, // Set separately by WithContext + logger: logger, + wg: sync.WaitGroup{}, + + errMutex: sync.Mutex{}, + err: nil, + } } // WithContext returns a new Group with a associated Context. diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index 29dc707cb..67c371552 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -6,9 +6,10 @@ import ( "fmt" "testing" - "github.com/neondatabase/autoscaling/pkg/util/taskgroup" "go.uber.org/multierr" "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/util/taskgroup" ) func ExampleGroup() { From 38694c23cd284efab5c895c558e789c6473ddc76 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Fri, 26 Apr 2024 19:46:30 +0300 Subject: [PATCH 07/21] use taskgroup in the agent Signed-off-by: Oleg Vasilev --- pkg/agent/billing/billing.go | 7 +++---- pkg/agent/entrypoint.go | 37 +++++++++++++++++++++--------------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/pkg/agent/billing/billing.go b/pkg/agent/billing/billing.go index ee3393c5b..93d4d41da 100644 --- a/pkg/agent/billing/billing.go +++ b/pkg/agent/billing/billing.go @@ -86,7 +86,7 @@ func RunBillingMetricsCollector( conf *Config, store VMStoreForNode, metrics PromMetrics, -) { +) error { var clients []clientInfo if c := conf.Clients.HTTP; c != nil { @@ -142,15 +142,14 @@ func RunBillingMetricsCollector( case <-collectTicker.C: logger.Info("Collecting billing state") if store.Stopped() && backgroundCtx.Err() == nil { - err := errors.New("VM store stopped but background context is still live") - logger.Panic("Validation check failed", zap.Error(err)) + return errors.New("VM store stopped but background context is still live") } state.collect(logger, store, metrics) case <-accumulateTicker.C: logger.Info("Creating billing batch") state.drainEnqueue(logger, conf, billing.GetHostname(), queueWriters) case <-backgroundCtx.Done(): - return + return nil } } } diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index 39a4302b3..3db226334 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -13,6 +13,7 @@ import ( "github.com/neondatabase/autoscaling/pkg/agent/billing" "github.com/neondatabase/autoscaling/pkg/agent/schedwatch" "github.com/neondatabase/autoscaling/pkg/util" + "github.com/neondatabase/autoscaling/pkg/util/taskgroup" "github.com/neondatabase/autoscaling/pkg/util/watch" ) @@ -59,9 +60,6 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { metrics := billing.NewPromMetrics() metrics.MustRegister(globalPromReg) - // TODO: catch panics here, bubble those into a clean-ish shutdown. - go billing.RunBillingMetricsCollector(ctx, logger, &r.Config.Billing, storeForNode, metrics) - promLogger := logger.Named("prometheus") if err := util.StartPrometheusMetricsServer(ctx, promLogger.Named("global"), 9100, globalPromReg); err != nil { return fmt.Errorf("Error starting prometheus metrics server: %w", err) @@ -77,18 +75,27 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { } } - logger.Info("Entering main loop") - for { - event, err := vmEventQueue.Wait(ctx) - if err != nil { - if ctx.Err() != nil { - // treat context canceled as a "normal" exit (because it is) - return nil + tg := taskgroup.NewGroup(logger) + ctx = tg.WithContext(ctx) + tg.Go("watch-metrics", func(logger *zap.Logger) error { + return billing.RunBillingMetricsCollector(ctx, logger, &r.Config.Billing, storeForNode, metrics) + }) + tg.Go("main-loop", func(logger *zap.Logger) error { + logger.Info("Entering main loop") + for { + event, err := vmEventQueue.Wait(ctx) + if err != nil { + if ctx.Err() != nil { + // treat context canceled as a "normal" exit (because it is) + return nil + } + + logger.Error("vmEventQueue returned error", zap.Error(err)) + return err } - - logger.Error("vmEventQueue returned error", zap.Error(err)) - return err + globalState.handleEvent(ctx, logger, event) } - globalState.handleEvent(ctx, logger, event) - } + }) + + return tg.Wait() } From 1602695c69b47baf422f86ee6156dd7197640197 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Sun, 5 May 2024 21:17:27 +0300 Subject: [PATCH 08/21] go mod tidy Signed-off-by: Oleg Vasilev --- go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go.mod b/go.mod index ba947d616..86c4faccb 100644 --- a/go.mod +++ b/go.mod @@ -62,10 +62,10 @@ require ( github.com/stretchr/testify v1.8.1 github.com/tychoish/fun v0.8.5 github.com/vishvananda/netlink v1.1.1-0.20220125195016-0639e7e787ba + go.uber.org/multierr v1.9.0 go.uber.org/zap v1.24.0 golang.org/x/crypto v0.21.0 golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 - golang.org/x/sync v0.1.0 golang.org/x/term v0.18.0 gopkg.in/yaml.v3 v3.0.1 k8s.io/api v0.25.16 @@ -169,10 +169,10 @@ require ( go.opentelemetry.io/otel/trace v0.20.0 // indirect go.opentelemetry.io/proto/otlp v0.7.0 // indirect go.uber.org/atomic v1.10.0 // indirect - go.uber.org/multierr v1.9.0 // indirect golang.org/x/mod v0.8.0 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/oauth2 v0.7.0 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect From 2c9ee716b2e8843a8dc561212fdc04d652cb1b8a Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Tue, 21 May 2024 17:25:12 +0300 Subject: [PATCH 09/21] add panic handler Signed-off-by: Oleg Vasilev --- pkg/util/taskgroup/LICENSE | 2 ++ pkg/util/taskgroup/taskgroup.go | 35 ++++++++++++++++++++++++---- pkg/util/taskgroup/taskgroup_test.go | 12 ++++++++++ 3 files changed, 44 insertions(+), 5 deletions(-) diff --git a/pkg/util/taskgroup/LICENSE b/pkg/util/taskgroup/LICENSE index d8839129b..05608b298 100644 --- a/pkg/util/taskgroup/LICENSE +++ b/pkg/util/taskgroup/LICENSE @@ -1,3 +1,5 @@ +Originally taken from https://github.com/ptxmac/multierrgroup + MIT License Copyright (c) 2021 Peter Kristensen diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index a282e9dd4..0723ae4d8 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -1,4 +1,6 @@ -// Package multierrgroup provides a mix of multierr and errgroup +// Originally taken from https://github.com/ptxmac/multierrgroup + +// Package taskgroup provides a mix of multierr and errgroup // See documentation for https://pkg.go.dev/go.uber.org/multierr and https://pkg.go.dev/golang.org/x/sync/errgroup package taskgroup @@ -15,13 +17,15 @@ import ( // See https://pkg.go.dev/golang.org/x/sync/errgroup#group for more information type Group interface { WithContext(ctx context.Context) context.Context + WithPanicHandler(f func(any)) Wait() error Go(name string, f func(logger *zap.Logger) error) } type group struct { - cancel context.CancelFunc - logger *zap.Logger + cancel context.CancelFunc + logger *zap.Logger + panicHandler func(any) wg sync.WaitGroup @@ -41,7 +45,7 @@ func NewGroup(logger *zap.Logger) Group { } } -// WithContext returns a new Group with a associated Context. +// WithContext updates the current Group with a associated Context. // The context will be canceled if any goroutine returns an error. // See https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext func (g *group) WithContext(ctx context.Context) context.Context { @@ -49,6 +53,11 @@ func (g *group) WithContext(ctx context.Context) context.Context { return ctx } +// WithPanicHandler sets a panic handler for the group. +func (g *group) WithPanicHandler(f func(any)) { + g.panicHandler = f +} + // Wait blocks until all goroutines have completed. // // All errors returned from the goroutines will be combined into one using multierr and returned from this method. @@ -60,6 +69,19 @@ func (g *group) Wait() error { return g.err } +func (g *group) call(f func() error) (err error) { + defer func() { + if r := recover(); r != nil { + if g.panicHandler != nil { + g.panicHandler(r) + } + err = fmt.Errorf("panic: %v", r) + } + }() + err = f() + return err +} + // Go calls the function in a new goroutine. // If a non-nil errors is returned, the context is canceled and // the error is collected using multierr and will be returned by Wait. @@ -69,7 +91,10 @@ func (g *group) Go(name string, f func(logger *zap.Logger) error) { go func() { defer g.wg.Done() logger := g.logger.Named(name) - if err := f(logger); err != nil { + cb := func() error { + return f(logger) + } + if err := g.call(cb); err != nil { err = fmt.Errorf("task %s failed: %w", name, err) g.errMutex.Lock() g.err = multierr.Append(g.err, err) diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index 67c371552..2e25c604d 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -10,6 +10,7 @@ import ( "go.uber.org/zap" "github.com/neondatabase/autoscaling/pkg/util/taskgroup" + "github.com/stretchr/testify/assert" ) func ExampleGroup() { @@ -59,3 +60,14 @@ func TestWithContext(t *testing.T) { t.Errorf("context should have been canceled!") } } + +func TestPanic(t *testing.T) { + log := zap.NewNop() + g := taskgroup.NewGroup(log) + g.Go("task1", func(_ *zap.Logger) error { + panic("panic message") + }) + err := g.Wait() + assert.NotNil(t, err) + assert.Equal(t, err.Error(), "task task1 failed: panic: panic message") +} From bf7a5af2a6866e9105f415fe7ac40d185faef019 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Tue, 21 May 2024 17:42:48 +0300 Subject: [PATCH 10/21] fix lints Signed-off-by: Oleg Vasilev --- pkg/util/taskgroup/taskgroup.go | 7 ++++--- pkg/util/taskgroup/taskgroup_test.go | 2 +- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index 0723ae4d8..d4c50155f 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -36,9 +36,10 @@ type group struct { // NewGroup returns a new Group. func NewGroup(logger *zap.Logger) Group { return &group{ - cancel: nil, // Set separately by WithContext - logger: logger, - wg: sync.WaitGroup{}, + cancel: nil, // Set separately by WithContext + panicHandler: nil, // Set separately by WithPanicHandler + logger: logger, + wg: sync.WaitGroup{}, errMutex: sync.Mutex{}, err: nil, diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index 2e25c604d..adec4aa25 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -6,11 +6,11 @@ import ( "fmt" "testing" + "github.com/stretchr/testify/assert" "go.uber.org/multierr" "go.uber.org/zap" "github.com/neondatabase/autoscaling/pkg/util/taskgroup" - "github.com/stretchr/testify/assert" ) func ExampleGroup() { From 836b1b0ae3c02f5e400be3707542fdd3d94d4817 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Tue, 28 May 2024 16:31:45 +0300 Subject: [PATCH 11/21] update the API to Ctx() Signed-off-by: Oleg Vasilev --- pkg/util/taskgroup/taskgroup.go | 38 ++++++++++++++++++++-------- pkg/util/taskgroup/taskgroup_test.go | 31 +++++++++++++---------- 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index d4c50155f..b4dc1b883 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -16,7 +16,7 @@ import ( // Group manages goroutines and collect all the errors. // See https://pkg.go.dev/golang.org/x/sync/errgroup#group for more information type Group interface { - WithContext(ctx context.Context) context.Context + Ctx() context.Context WithPanicHandler(f func(any)) Wait() error Go(name string, f func(logger *zap.Logger) error) @@ -24,6 +24,7 @@ type Group interface { type group struct { cancel context.CancelFunc + ctx context.Context logger *zap.Logger panicHandler func(any) @@ -33,10 +34,20 @@ type group struct { err error } +type GroupOption func(*group) + +// WithParentContext sets the parent context for the group. +func WithParentContext(ctx context.Context) GroupOption { + return func(g *group) { + g.ctx, g.cancel = context.WithCancel(ctx) + } +} + // NewGroup returns a new Group. -func NewGroup(logger *zap.Logger) Group { - return &group{ - cancel: nil, // Set separately by WithContext +func NewGroup(logger *zap.Logger, opts ...GroupOption) Group { + g := &group{ + cancel: nil, // Set separately by Ctx + ctx: nil, // Set separately by Ctx panicHandler: nil, // Set separately by WithPanicHandler logger: logger, wg: sync.WaitGroup{}, @@ -44,14 +55,21 @@ func NewGroup(logger *zap.Logger) Group { errMutex: sync.Mutex{}, err: nil, } + + for _, opt := range opts { + opt(g) + } + if g.ctx == nil { + // If parent context is not set, use background context + WithParentContext(context.Background())(g) + } + + return g } -// WithContext updates the current Group with a associated Context. -// The context will be canceled if any goroutine returns an error. -// See https://pkg.go.dev/golang.org/x/sync/errgroup#WithContext -func (g *group) WithContext(ctx context.Context) context.Context { - ctx, g.cancel = context.WithCancel(ctx) - return ctx +// Ctx returns a context that will be canceled when the group is Waited. +func (g *group) Ctx() context.Context { + return g.ctx } // WithPanicHandler sets a panic handler for the group. diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index adec4aa25..c23a766ce 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -35,7 +35,6 @@ func TestWithContext(t *testing.T) { log := zap.NewNop() g := taskgroup.NewGroup(log) - ctx := g.WithContext(context.Background()) g.Go("task1", func(_ *zap.Logger) error { return err1 }) @@ -43,21 +42,27 @@ func TestWithContext(t *testing.T) { return err2 }) err := g.Wait() - if !errors.Is(err, err1) { - t.Errorf("error: %s should be: %s", err, err1) - } - if !errors.Is(err, err2) { - t.Errorf("error: %s should be: %s", err, err2) - } - canceled := false + assert.ErrorIs(t, err, err1) + assert.ErrorIs(t, err, err2) + select { - case <-ctx.Done(): - canceled = true + case <-g.Ctx().Done(): + break default: - + t.Fatal("context should be done") } - if !canceled { - t.Errorf("context should have been canceled!") +} + +func TestParentContext(t *testing.T) { + parentCtx, cancel := context.WithCancel(context.Background()) + g := taskgroup.NewGroup(zap.NewNop(), taskgroup.WithParentContext(parentCtx)) + cancel() + + select { + case <-g.Ctx().Done(): + break + default: + t.Fatal("context should be done") } } From f7d0581f3171326ae54663711b02d902ec568f3a Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Tue, 28 May 2024 16:31:53 +0300 Subject: [PATCH 12/21] change callsites Signed-off-by: Oleg Vasilev --- pkg/agent/entrypoint.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index 67ec19f71..f0e19575d 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -77,13 +77,12 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { mc, err := billing.NewMetricsCollector(ctx, logger, &r.Config.Billing) if err != nil { - return fmt.Errorf("error starting billing metrics collector: %w", err) + return fmt.Errorf("error creating billing metrics collector: %w", err) } - tg := taskgroup.NewGroup(logger) - ctx = tg.WithContext(ctx) - tg.Go("watch-metrics", func(logger *zap.Logger) error { - return mc.Run(ctx, logger, storeForNode, metrics) + tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) + tg.Go("collect-metrics", func(logger *zap.Logger) error { + return mc.Run(tg.Ctx(), logger, storeForNode, metrics) }) tg.Go("main-loop", func(logger *zap.Logger) error { logger.Info("Entering main loop") @@ -98,7 +97,7 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { logger.Error("vmEventQueue returned error", zap.Error(err)) return err } - globalState.handleEvent(ctx, logger, event) + globalState.handleEvent(tg.Ctx(), logger, event) } }) From 9eff2d34d27ae4e802fac27edaa8e16a1d3b746d Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Tue, 28 May 2024 16:55:54 +0300 Subject: [PATCH 13/21] add stack trace Signed-off-by: Oleg Vasilev --- pkg/util/stack/LICENSE | 18 ++ pkg/util/stack/stack.go | 160 +++++++++++++ pkg/util/stack/stack_test.go | 331 +++++++++++++++++++++++++++ pkg/util/taskgroup/taskgroup.go | 4 + pkg/util/taskgroup/taskgroup_test.go | 3 +- 5 files changed, 515 insertions(+), 1 deletion(-) create mode 100644 pkg/util/stack/LICENSE create mode 100644 pkg/util/stack/stack.go create mode 100644 pkg/util/stack/stack_test.go diff --git a/pkg/util/stack/LICENSE b/pkg/util/stack/LICENSE new file mode 100644 index 000000000..eaf017fa4 --- /dev/null +++ b/pkg/util/stack/LICENSE @@ -0,0 +1,18 @@ +Originally taken from https://github.com/sharnoff/chord + +Copyright @sharnoff + +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and +associated documentation files (the “Software”), to deal in the Software without restriction, +including without limitation the rights to use, copy, modify, merge, publish, distribute, +sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial +portions of the Software. + +THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT +NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES +OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/pkg/util/stack/stack.go b/pkg/util/stack/stack.go new file mode 100644 index 000000000..decc67181 --- /dev/null +++ b/pkg/util/stack/stack.go @@ -0,0 +1,160 @@ +package stack + +// Originally taken from https://github.com/sharnoff/chord + +// TODO - want to have some kind of "N skipped" when (a) there's lots of frames and (b) many of +// those frames are duplicates + +import ( + "runtime" + "strconv" + "sync" +) + +// StackTrace represents a collected stack trace, possibly with a parent (i.e caller) +// +// StackTraces are designed to make it easy to track callers across goroutines. They are typically +// produced by [GetStackTrace]; refer to that function for more information. +type StackTrace struct { + // Frames provides the frames of this stack trace. Each frame's caller is at the index following + // it; the first frame is the direct caller. + Frames []StackFrame + // Parent, if not nil, provides the "parent" stack trace - typically the stack trace at the + // point this goroutine was spawned. + Parent *StackTrace +} + +// Individual stack frame, contained in a [StackTrace], produced by [GetStackTrace]. +type StackFrame struct { + // Function provides the name of the function being called, or the empty string if unknown. + Function string + // File gives the name of the file, or an empty string if the file is unknown. + File string + // Line gives the line number (starting from 1), or zero if the line number is unknown. + Line int +} + +// GetStackTrace produces a StackTrace, optionally with a parent's stack trace to append. +// +// skip sets the number of initial calling stack frames to exclude. Setting skip to zero will +// produce a StackTrace where the first [StackFrame] represents the location where GetStackTrace was +// called. +func GetStackTrace(parent *StackTrace, skip uint) StackTrace { + frames := getFrames(skip + 1) // skip the additional frame introduced by GetStackTrace + return StackTrace{Frames: frames, Parent: parent} +} + +// String produces a string representation of the stack trace, roughly similar to the default panic +// handler's. +// +// For some examples of formatting, refer to the StackTrace tests. +func (st StackTrace) String() string { + var buf []byte + + for { + if len(st.Frames) == 0 { + buf = append(buf, "\n"...) + } else { + for _, f := range st.Frames { + var function, functionTail, file, fileLineSep, line string + + if f.Function == "" { + function = "" + } else { + function = f.Function + functionTail = "(...)" + } + + if f.File == "" { + file = "" + } else { + file = f.File + if f.Line != 0 { + fileLineSep = ":" + line = strconv.Itoa(f.Line) + } + } + + buf = append(buf, function...) + buf = append(buf, functionTail...) + buf = append(buf, "\n\t"...) + buf = append(buf, file...) + buf = append(buf, fileLineSep...) + buf = append(buf, line...) + buf = append(buf, byte('\n')) + } + } + + if st.Parent == nil { + break + } + + st = *st.Parent + buf = append(buf, "called by "...) + continue + } + + return string(buf) +} + +var pcBufPool = sync.Pool{ + New: func() any { + buf := make([]uintptr, 128) + return &buf + }, +} + +func putPCBuffer(buf *[]uintptr) { + if len(*buf) < 1024 { + pcBufPool.Put(buf) + } +} + +func getFrames(skip uint) []StackFrame { + skip += 2 // skip the frame introduced by this function and runtime.Callers + + pcBuf := pcBufPool.Get().(*[]uintptr) + defer putPCBuffer(pcBuf) + if len(*pcBuf) == 0 { + panic("internal error: len(*pcBuf) == 0") + } + + // read program counters into the buffer, repeating until buffer is big enough. + // + // This is O(n log n), where n is the true number of program counters. + var pc []uintptr + for { + n := runtime.Callers(0, *pcBuf) + if n == 0 { + panic("runtime.Callers(0, ...) returned zero") + } + + if n < len(*pcBuf) { + pc = (*pcBuf)[:n] + break + } else { + *pcBuf = make([]uintptr, 2*len(*pcBuf)) + } + } + + framesIter := runtime.CallersFrames(pc) + var frames []StackFrame + more := true + for more { + var frame runtime.Frame + frame, more = framesIter.Next() + + if skip > 0 { + skip -= 1 + continue + } + + frames = append(frames, StackFrame{ + Function: frame.Function, + File: frame.File, + Line: frame.Line, + }) + } + + return frames +} diff --git a/pkg/util/stack/stack_test.go b/pkg/util/stack/stack_test.go new file mode 100644 index 000000000..5f8645097 --- /dev/null +++ b/pkg/util/stack/stack_test.go @@ -0,0 +1,331 @@ +//nolint:exhaustruct // This is taken as-is from other repo +package stack_test + +import ( + "fmt" + "regexp" + "strings" + "testing" + + "github.com/neondatabase/autoscaling/pkg/util/stack" +) + +//noinspection + +func concatLines(lines ...string) string { + return strings.Join(lines, "\n") +} + +func TestStackFormatVarieties(t *testing.T) { + t.Parallel() + + expected := concatLines( + "packagename.foo(...)", + "\t/path/to/package/foo.go:37", + "packagename.bar(...)", + "\t/path/to/package/bar.go", + "packagename.baz(...)", + "\t", + "packagename.qux(...)", + "\t", + "", + "\t/unknown/function/path.go:45", + "", + "\t", + "", + ) + + st := stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: "packagename.foo", File: "/path/to/package/foo.go", Line: 37}, + {Function: "packagename.bar", File: "/path/to/package/bar.go"}, + {Function: "packagename.baz"}, + {Function: "packagename.qux", Line: 29}, // Line should have no effect if File is missing. + {File: "/unknown/function/path.go", Line: 45}, + {}, + }, + } + + got := st.String() + + if got != expected { + t.Fail() + t.Log( + "--- BEGIN expected formatting ---\n", + fmt.Sprintf("%q", expected), + "\n--- END expected formatting. BEGIN actual formatting ---\n", + fmt.Sprintf("%q", got), + ) + } +} + +func TestStackParentsFormat(t *testing.T) { + t.Parallel() + + expected := concatLines( + "packagename.Foo(...)", + "\t/path/to/package/foo.go:37", + "packagename.Bar(...)", + "\t/path/to/package/bar.go:45", + "called by packagename2.Baz(...)", + "\t/path/to/package2/baz.go:52", + "packagename2.Qux(...)", + "\t/path/to/package2/qux.go:59", + "called by packagename3.Abc(...)", + "\t/path/to/package3/abc.go:66", + "packagename3.Xyz(...)", + "\t/path/to/package3/xyz.go:71", + "", + ) + + st := stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: "packagename.Foo", File: "/path/to/package/foo.go", Line: 37}, + {Function: "packagename.Bar", File: "/path/to/package/bar.go", Line: 45}, + }, + Parent: &stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: "packagename2.Baz", File: "/path/to/package2/baz.go", Line: 52}, + {Function: "packagename2.Qux", File: "/path/to/package2/qux.go", Line: 59}, + }, + Parent: &stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: "packagename3.Abc", File: "/path/to/package3/abc.go", Line: 66}, + {Function: "packagename3.Xyz", File: "/path/to/package3/xyz.go", Line: 71}, + }, + }, + }, + } + + got := st.String() + + if got != expected { + t.Fail() + t.Log( + "--- BEGIN expected formatting ---\n", + fmt.Sprintf("%q", expected), + "\n--- END expected formatting. BEGIN actual formatting ---\n", + fmt.Sprintf("%q", got), + ) + } +} + +func validateStackTrace(t *testing.T, expected, got stack.StackTrace) { + for depth := 0; ; depth += 1 { + if (expected.Parent == nil) != (got.Parent == nil) { + t.Fatalf( + "mismatched at depth %d, whether has parent: expected %v, got %v", + depth, expected.Parent != nil, got.Parent != nil, + ) + } + + if len(expected.Frames) > len(got.Frames) || expected.Parent != nil && len(expected.Frames) != len(got.Frames) { + t.Fatalf( + "mismatched at depth %d, number of frames: expected %d, got %d", + depth, len(expected.Frames), len(got.Frames), + ) + } + + for i := range expected.Frames { + e := expected.Frames[i] + g := got.Frames[i] + + // check .File + if matched, err := regexp.Match(fmt.Sprint("^", e.File, "$"), []byte(g.File)); !matched || err != nil { + if err != nil { + panic(fmt.Errorf("bad regex for expected at depth %d, Frames[%d].Function: %w", depth, i, err)) + } + + t.Fatalf("mismatched at depth %d, Frames[%d].File: expected match for %q, got %q", depth, i, e.File, g.File) + } + + // check .Function + if matched, err := regexp.Match(fmt.Sprint("^", e.Function, "$"), []byte(g.Function)); !matched || err != nil { + if err != nil { + panic(fmt.Errorf("bad regex for expected at depth %d, Frames[%d].Function: %w", depth, i, err)) + } + + t.Fatalf("mismatched at depth %d, Frames[%d].Function: expected match for %q, got %q", depth, i, e.Function, g.Function) + } + + // check .Line + if (e.Line == 0) != (g.Line == 0) { + expectedKind := "!= 0" + if e.Line == 0 { + expectedKind = "== 0" + } + t.Fatalf("mismatched at depth %d, Frames[%d].Line: expected %s, got %d", depth, i, expectedKind, g.Line) + } + } + + if expected.Parent == nil { + return + } + + expected = *expected.Parent + got = *got.Parent + } +} + +func TestStackBasicCreation(t *testing.T) { + t.Parallel() + + expected := stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: `.*/stack_test.TestStackBasicCreation.func1`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackBasicCreation.func2`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackBasicCreation.func3`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackBasicCreation`, File: `.*/stack_test\.go`, Line: 1}, + }, + } + + func1 := func() stack.StackTrace { + return stack.GetStackTrace(nil, 0) + } + func2 := func() stack.StackTrace { + return func1() + } + func3 := func() stack.StackTrace { + return func2() + } + + got := func3() + + validateStackTrace(t, expected, got) +} + +func TestStackPartialSkip(t *testing.T) { + t.Parallel() + + expected := stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: `.*/stack_test.TestStackPartialSkip.func3`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackPartialSkip.func4`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackPartialSkip`, File: `.*/stack_test\.go`, Line: 1}, + }, + } + + func1 := func() stack.StackTrace { + return stack.GetStackTrace(nil, 2) + } + func2 := func() stack.StackTrace { + return func1() + } + func3 := func() stack.StackTrace { + return func2() + } + func4 := func() stack.StackTrace { + return func3() + } + + got := func4() + + validateStackTrace(t, expected, got) +} + +func TestStackSkipTooManyIsEmpty(t *testing.T) { + t.Parallel() + + st := stack.GetStackTrace(nil, 100000) // pick a big number to skip all frames + if len(st.Frames) != 0 { + t.Fatal("expected no frames, got", len(st.Frames)) + } +} + +func TestStackMultiCreation(t *testing.T) { + t.Parallel() + + expected := stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: `.*/stack_test.TestStackMultiCreation.func3`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackMultiCreation.func1`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `runtime\.goexit`, File: `.*`, Line: 1}, // TODO: this may be fragile + }, + Parent: &stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: `.*/stack_test.TestStackMultiCreation.func4`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackMultiCreation.func5`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackMultiCreation.func1`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `runtime\.goexit`, File: `.*`, Line: 1}, + }, + Parent: &stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: `.*/stack_test.TestStackMultiCreation.func6`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackMultiCreation.func7`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*/stack_test.TestStackMultiCreation`, File: `.*/stack_test\.go`, Line: 1}, + }, + }, + }, + } + + send := func(ch chan stack.StackTrace, parent stack.StackTrace, f func(stack.StackTrace) stack.StackTrace) { + ch <- f(parent) + } + + spawnWithStack := func(p *stack.StackTrace, f func(stack.StackTrace) stack.StackTrace) stack.StackTrace { + parent := stack.GetStackTrace(p, 1) // skip this function and the inner go func + + ch := make(chan stack.StackTrace) + go send(ch, parent, f) + return <-ch + } + + func3 := func(parent stack.StackTrace) stack.StackTrace { + return stack.GetStackTrace(&parent, 0) + } + func4 := func(parent stack.StackTrace) stack.StackTrace { + return spawnWithStack(&parent, func3) + } + func5 := func(parent stack.StackTrace) stack.StackTrace { + return func4(parent) + } + func6 := func() stack.StackTrace { + return spawnWithStack(nil, func5) + } + func7 := func() stack.StackTrace { + return func6() + } + + got := func7() + + validateStackTrace(t, expected, got) +} + +func TestStackCreateAfterRecover(t *testing.T) { + t.Parallel() + + expected := stack.StackTrace{ + Frames: []stack.StackFrame{ + {Function: `.*stack_test.TestStackCreateAfterRecover.func1`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*stack_test.TestStackCreateAfterRecover.func2`, File: `.*/stack_test\.go`, Line: 1}, + {Function: `.*stack_test.TestStackCreateAfterRecover.func3`, File: `.*/stack_test\.go`, Line: 1}, + }, + } + + func1 := func() { + panic("") + } + + func2 := func() { + func1() + } + + var func4 func() + + func3 := func() { + defer func4() + func2() + } + + var st stack.StackTrace + func4 = func() { + if recover() != nil { + st = stack.GetStackTrace(nil, 2) + } + } + + func3() + got := st + + validateStackTrace(t, expected, got) +} diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index b4dc1b883..64164d354 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -11,6 +11,8 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" + + "github.com/neondatabase/autoscaling/pkg/util/stack" ) // Group manages goroutines and collect all the errors. @@ -94,6 +96,8 @@ func (g *group) call(f func() error) (err error) { if g.panicHandler != nil { g.panicHandler(r) } + st := stack.GetStackTrace(nil, 1).String() + g.logger.Error("panic", zap.Any("panic", r), zap.String("stack", st)) err = fmt.Errorf("panic: %v", r) } }() diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index c23a766ce..4083322b8 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -67,7 +67,7 @@ func TestParentContext(t *testing.T) { } func TestPanic(t *testing.T) { - log := zap.NewNop() + log := zap.NewExample() g := taskgroup.NewGroup(log) g.Go("task1", func(_ *zap.Logger) error { panic("panic message") @@ -75,4 +75,5 @@ func TestPanic(t *testing.T) { err := g.Wait() assert.NotNil(t, err) assert.Equal(t, err.Error(), "task task1 failed: panic: panic message") + t.Fail() } From 103e1843c885e79a60ed5cedc212f3b5f79d2b69 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Tue, 28 May 2024 17:00:12 +0300 Subject: [PATCH 14/21] delete fail Signed-off-by: Oleg Vasilev --- pkg/util/taskgroup/taskgroup_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index 4083322b8..d36ed5d56 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -75,5 +75,4 @@ func TestPanic(t *testing.T) { err := g.Wait() assert.NotNil(t, err) assert.Equal(t, err.Error(), "task task1 failed: panic: panic message") - t.Fail() } From b62bd056185b9f8f53ed698fd7aad2e9ef76e972 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Thu, 30 May 2024 18:23:15 +0500 Subject: [PATCH 15/21] Update pkg/util/taskgroup/taskgroup.go Co-authored-by: Em Sharnoff --- pkg/util/taskgroup/taskgroup.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index 64164d354..d702ca6f8 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -97,7 +97,7 @@ func (g *group) call(f func() error) (err error) { g.panicHandler(r) } st := stack.GetStackTrace(nil, 1).String() - g.logger.Error("panic", zap.Any("panic", r), zap.String("stack", st)) + g.logger.Error("Task panicked", zap.Any("payload", r), zap.String("stack", st)) err = fmt.Errorf("panic: %v", r) } }() From 27467b47c496382c360a61f79931d339479f2aad Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Fri, 31 May 2024 14:35:48 +0400 Subject: [PATCH 16/21] add tests for the stacktrace Signed-off-by: Oleg Vasilev --- pkg/util/taskgroup/taskgroup.go | 1 + pkg/util/taskgroup/taskgroup_test.go | 25 +++++++++++++++++++++++-- 2 files changed, 24 insertions(+), 2 deletions(-) diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index d702ca6f8..cd8ee2606 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -96,6 +96,7 @@ func (g *group) call(f func() error) (err error) { if g.panicHandler != nil { g.panicHandler(r) } + // Omit 1 frame - the f() call below st := stack.GetStackTrace(nil, 1).String() g.logger.Error("Task panicked", zap.Any("payload", r), zap.String("stack", st)) err = fmt.Errorf("panic: %v", r) diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index d36ed5d56..8348bcba8 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -4,11 +4,13 @@ import ( "context" "errors" "fmt" + "strings" "testing" "github.com/stretchr/testify/assert" "go.uber.org/multierr" "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" "github.com/neondatabase/autoscaling/pkg/util/taskgroup" ) @@ -66,13 +68,32 @@ func TestParentContext(t *testing.T) { } } +func setupLogsCapture() (*zap.Logger, *observer.ObservedLogs) { + core, logs := observer.New(zap.InfoLevel) + return zap.New(core), logs +} + func TestPanic(t *testing.T) { - log := zap.NewExample() - g := taskgroup.NewGroup(log) + logger, logs := setupLogsCapture() + g := taskgroup.NewGroup(logger) g.Go("task1", func(_ *zap.Logger) error { panic("panic message") }) err := g.Wait() assert.NotNil(t, err) assert.Equal(t, err.Error(), "task task1 failed: panic: panic message") + + assert.Equal(t, 2, logs.Len()) + msg0 := logs.All()[0] + assert.Equal(t, "Task panicked", msg0.Message) + assert.Len(t, msg0.Context, 2) + assert.Equal(t, "payload", msg0.Context[0].Key) + assert.Equal(t, "panic message", msg0.Context[0].String) + assert.Equal(t, "stack", msg0.Context[1].Key) + stackTrace := msg0.Context[1].String + assert.True(t, strings.HasPrefix(stackTrace, "runtime.gopanic(...)\n")) + msg1 := logs.All()[1] + assert.Equal(t, "task task1 failed: panic: panic message", msg1.Message) + assert.Len(t, msg1.Context, 0) + } From 38c551214b2f81458de67c06fc36b6ec7c6bdecb Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Fri, 31 May 2024 14:46:38 +0400 Subject: [PATCH 17/21] bring back the panic Signed-off-by: Oleg Vasilev --- pkg/agent/billing/billing.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/agent/billing/billing.go b/pkg/agent/billing/billing.go index e0bb5e454..d3b49f416 100644 --- a/pkg/agent/billing/billing.go +++ b/pkg/agent/billing/billing.go @@ -175,7 +175,9 @@ func (mc *MetricsCollector) Run( case <-collectTicker.C: logger.Info("Collecting billing state") if store.Stopped() && ctx.Err() == nil { - return errors.New("VM store stopped but background context is still live") + err := errors.New("VM store stopped but background context is still live") + logger.Panic("Validation check failed", zap.Error(err)) + return err } state.collect(logger, store, metrics) case <-accumulateTicker.C: From 75dc6de0b9f020195f0e8570d6ce3a8f21f18c2d Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Mon, 10 Jun 2024 13:41:39 +0400 Subject: [PATCH 18/21] move WithPanic, and expand comments Signed-off-by: Oleg Vasilev --- pkg/util/taskgroup/taskgroup.go | 13 +++++++------ pkg/util/taskgroup/taskgroup_test.go | 16 +++++++++++++--- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/util/taskgroup/taskgroup.go b/pkg/util/taskgroup/taskgroup.go index cd8ee2606..9be990dc5 100644 --- a/pkg/util/taskgroup/taskgroup.go +++ b/pkg/util/taskgroup/taskgroup.go @@ -19,7 +19,6 @@ import ( // See https://pkg.go.dev/golang.org/x/sync/errgroup#group for more information type Group interface { Ctx() context.Context - WithPanicHandler(f func(any)) Wait() error Go(name string, f func(logger *zap.Logger) error) } @@ -45,6 +44,13 @@ func WithParentContext(ctx context.Context) GroupOption { } } +// WithPanicHandler sets a panic handler for the group. +func WithPanicHandler(f func(any)) GroupOption { + return func(g *group) { + g.panicHandler = f + } +} + // NewGroup returns a new Group. func NewGroup(logger *zap.Logger, opts ...GroupOption) Group { g := &group{ @@ -74,11 +80,6 @@ func (g *group) Ctx() context.Context { return g.ctx } -// WithPanicHandler sets a panic handler for the group. -func (g *group) WithPanicHandler(f func(any)) { - g.panicHandler = f -} - // Wait blocks until all goroutines have completed. // // All errors returned from the goroutines will be combined into one using multierr and returned from this method. diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index 8348bcba8..d619f242f 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -74,16 +74,25 @@ func setupLogsCapture() (*zap.Logger, *observer.ObservedLogs) { } func TestPanic(t *testing.T) { + var panicCnt int + handler := func(_ any) { + panicCnt++ + } + logger, logs := setupLogsCapture() - g := taskgroup.NewGroup(logger) + g := taskgroup.NewGroup(logger, taskgroup.WithPanicHandler(handler)) g.Go("task1", func(_ *zap.Logger) error { panic("panic message") }) err := g.Wait() assert.NotNil(t, err) - assert.Equal(t, err.Error(), "task task1 failed: panic: panic message") + assert.Equal(t, "task task1 failed: panic: panic message", err.Error()) + assert.Equal(t, 1, panicCnt) + // We have two log lines: one specific for the panic, with additional + // context, and one for any task failure. assert.Equal(t, 2, logs.Len()) + msg0 := logs.All()[0] assert.Equal(t, "Task panicked", msg0.Message) assert.Len(t, msg0.Context, 2) @@ -92,8 +101,9 @@ func TestPanic(t *testing.T) { assert.Equal(t, "stack", msg0.Context[1].Key) stackTrace := msg0.Context[1].String assert.True(t, strings.HasPrefix(stackTrace, "runtime.gopanic(...)\n")) + msg1 := logs.All()[1] + // msg := task {name} failed: {error}; error := panic: {panicMessage} assert.Equal(t, "task task1 failed: panic: panic message", msg1.Message) assert.Len(t, msg1.Context, 0) - } From 954e2c64ed9ab3545f3327f702e2dff428bfb065 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Mon, 10 Jun 2024 13:46:17 +0400 Subject: [PATCH 19/21] changed task name to billing-metrics Signed-off-by: Oleg Vasilev --- pkg/agent/entrypoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index f0e19575d..435a4e8ac 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -81,7 +81,7 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { } tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) - tg.Go("collect-metrics", func(logger *zap.Logger) error { + tg.Go("billing-metrics", func(logger *zap.Logger) error { return mc.Run(tg.Ctx(), logger, storeForNode, metrics) }) tg.Go("main-loop", func(logger *zap.Logger) error { From e68f93998142b5b8993d5994452d07de13dd4272 Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Mon, 17 Jun 2024 15:19:04 +0400 Subject: [PATCH 20/21] Update pkg/util/taskgroup/taskgroup_test.go Co-authored-by: Em Sharnoff --- pkg/util/taskgroup/taskgroup_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/util/taskgroup/taskgroup_test.go b/pkg/util/taskgroup/taskgroup_test.go index d619f242f..491d58d81 100644 --- a/pkg/util/taskgroup/taskgroup_test.go +++ b/pkg/util/taskgroup/taskgroup_test.go @@ -100,6 +100,8 @@ func TestPanic(t *testing.T) { assert.Equal(t, "panic message", msg0.Context[0].String) assert.Equal(t, "stack", msg0.Context[1].Key) stackTrace := msg0.Context[1].String + // test that the stack trace begins with gopanic(...) so we always start + // the backtrace at the same place assert.True(t, strings.HasPrefix(stackTrace, "runtime.gopanic(...)\n")) msg1 := logs.All()[1] From 80f6fa2c842efcc9dd323de606a017ff60926cfd Mon Sep 17 00:00:00 2001 From: Oleg Vasilev Date: Mon, 17 Jun 2024 15:20:08 +0400 Subject: [PATCH 21/21] Update pkg/agent/entrypoint.go --- pkg/agent/entrypoint.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/agent/entrypoint.go b/pkg/agent/entrypoint.go index 435a4e8ac..176f854f7 100644 --- a/pkg/agent/entrypoint.go +++ b/pkg/agent/entrypoint.go @@ -81,7 +81,7 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error { } tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx)) - tg.Go("billing-metrics", func(logger *zap.Logger) error { + tg.Go("billing", func(logger *zap.Logger) error { return mc.Run(tg.Ctx(), logger, storeForNode, metrics) }) tg.Go("main-loop", func(logger *zap.Logger) error {