Skip to content

Commit

Permalink
util: add taskgroup (#922)
Browse files Browse the repository at this point in the history
taskgroup.Group is the better version of errgroup.Group with two
changes:
1. Support for multerrors
2. Logging the errors, when they are returned

Part of #921

---------

Signed-off-by: Oleg Vasilev <[email protected]>
Co-authored-by: Em Sharnoff <[email protected]>
  • Loading branch information
Omrigan and sharnoff authored Jun 17, 2024
1 parent 1ab666f commit 3a61010
Show file tree
Hide file tree
Showing 11 changed files with 842 additions and 77 deletions.
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ require (
github.com/stretchr/testify v1.8.1
github.com/tychoish/fun v0.8.5
github.com/vishvananda/netlink v1.1.1-0.20220125195016-0639e7e787ba
go.uber.org/multierr v1.9.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.21.0
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
golang.org/x/sync v0.3.0
golang.org/x/term v0.18.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.26.15
Expand Down Expand Up @@ -184,10 +184,10 @@ require (
go.opentelemetry.io/otel/trace v1.10.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.3.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,8 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210805182204-aaa1db679c0d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
Expand Down
46 changes: 12 additions & 34 deletions neonvm/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,12 @@ import (
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/vishvananda/netlink"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"k8s.io/apimachinery/pkg/api/resource"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

const (
Expand Down Expand Up @@ -699,26 +699,13 @@ func run(logger *zap.Logger) error {
}
}

// Helper function to both log an error and return the result of wrapping it with the message.
//
// This is used below because (*errgroup.Group).Wait() returns at most 1 error, which means
// imprecise usage can hide errors. So, we log the errors before they would be returned, to make
// sure they're always visible.
logWrap := func(msg string, err error) error {
logger.Error(msg, zap.Error(err))
return fmt.Errorf("%s: %w", msg, err)
}

eg := &errgroup.Group{}
eg.Go(func() error {
if err := runInitScript(logger, vmSpec.InitScript); err != nil {
return logWrap("failed to run init script", err)
}
return nil
tg := taskgroup.NewGroup(logger)
tg.Go("init-script", func(logger *zap.Logger) error {
return runInitScript(logger, vmSpec.InitScript)
})

eg.Go(func() error {
if err := createISO9660runtime(
tg.Go("iso9660-runtime", func(logger *zap.Logger) error {
return createISO9660runtime(
runtimeDiskPath,
vmSpec.Guest.Command,
vmSpec.Guest.Args,
Expand All @@ -728,31 +715,22 @@ func run(logger *zap.Logger) error {
enableSSH,
swapInfo,
shmSize,
); err != nil {
return logWrap("failed to create iso9660 disk", err)
}
return nil
)
})

eg.Go(func() error {
tg.Go("rootDisk", func(logger *zap.Logger) error {
// resize rootDisk image of size specified and new size more than current
if err := resizeRootDisk(logger, vmSpec); err != nil {
return logWrap("failed to resize rootDisk", err)
}
return nil
return resizeRootDisk(logger, vmSpec)
})
var qemuCmd []string

eg.Go(func() error {
tg.Go("qemu-cmd", func(logger *zap.Logger) error {
var err error
qemuCmd, err = buildQEMUCmd(cfg, logger, vmSpec, &vmStatus, cpus, memory, enableSSH, swapInfo)
if err != nil {
return logWrap("failed to build QEMU command", err)
}
return nil
return err
})

if err := eg.Wait(); err != nil {
if err := tg.Wait(); err != nil {
return err
}

Expand Down
44 changes: 23 additions & 21 deletions pkg/agent/billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,24 @@ type vmMetricsSeconds struct {
activeTime time.Duration
}

func StartBillingMetricsCollector(
type MetricsCollector struct {
conf *Config
clients []clientInfo
}

func NewMetricsCollector(
ctx context.Context,
parentLogger *zap.Logger,
conf *Config,
store VMStoreForNode,
metrics PromMetrics,
) error {
) (*MetricsCollector, error) {
logger := parentLogger.Named("billing")

var clients []clientInfo
mc := &MetricsCollector{
conf: conf,
clients: make([]clientInfo, 0),
}

if c := conf.Clients.HTTP; c != nil {
clients = append(clients, clientInfo{
mc.clients = append(mc.clients, clientInfo{
client: billing.NewHTTPClient(c.URL, http.DefaultClient),
name: "http",
config: c.BaseClientConfig,
Expand All @@ -107,35 +112,31 @@ func StartBillingMetricsCollector(
if c := conf.Clients.S3; c != nil {
client, err := billing.NewS3Client(ctx, c.S3ClientConfig)
if err != nil {
return fmt.Errorf("failed to create S3 client: %w", err)
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}
logger.Info("Created S3 client", client.LogFields())
clients = append(clients, clientInfo{
mc.clients = append(mc.clients, clientInfo{
client: client,
name: "s3",
config: c.BaseClientConfig,
})
}

// TODO: catch panics here, bubble those into a clean-ish shutdown.
go runBillingMetricsCollector(ctx, logger, conf, store, metrics, clients)
return nil
return mc, nil
}

func runBillingMetricsCollector(
func (mc *MetricsCollector) Run(
ctx context.Context,
logger *zap.Logger,
conf *Config,
store VMStoreForNode,
metrics PromMetrics,
clients []clientInfo,
) {
) error {

collectTicker := time.NewTicker(time.Second * time.Duration(conf.CollectEverySeconds))
collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds))
defer collectTicker.Stop()
// Offset by half a second, so it's a bit more deterministic.
time.Sleep(500 * time.Millisecond)
accumulateTicker := time.NewTicker(time.Second * time.Duration(conf.AccumulateEverySeconds))
accumulateTicker := time.NewTicker(time.Second * time.Duration(mc.conf.AccumulateEverySeconds))
defer accumulateTicker.Stop()

state := metricsState{
Expand All @@ -147,7 +148,7 @@ func runBillingMetricsCollector(

var queueWriters []eventQueuePusher[*billing.IncrementalEvent]

for _, c := range clients {
for _, c := range mc.clients {
qw, queueReader := newEventQueue[*billing.IncrementalEvent](metrics.queueSizeCurrent.WithLabelValues(c.name))
queueWriters = append(queueWriters, qw)

Expand Down Expand Up @@ -176,13 +177,14 @@ func runBillingMetricsCollector(
if store.Stopped() && ctx.Err() == nil {
err := errors.New("VM store stopped but background context is still live")
logger.Panic("Validation check failed", zap.Error(err))
return err
}
state.collect(logger, store, metrics)
case <-accumulateTicker.C:
logger.Info("Creating billing batch")
state.drainEnqueue(logger, conf, billing.GetHostname(), queueWriters)
state.drainEnqueue(logger, mc.conf, billing.GetHostname(), queueWriters)
case <-ctx.Done():
return
return nil
}
}
}
Expand Down
43 changes: 26 additions & 17 deletions pkg/agent/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/neondatabase/autoscaling/pkg/agent/billing"
"github.com/neondatabase/autoscaling/pkg/agent/schedwatch"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
"github.com/neondatabase/autoscaling/pkg/util/watch"
)

Expand Down Expand Up @@ -59,11 +60,6 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
metrics := billing.NewPromMetrics()
metrics.MustRegister(globalPromReg)

err = billing.StartBillingMetricsCollector(ctx, logger, &r.Config.Billing, storeForNode, metrics)
if err != nil {
return fmt.Errorf("error starting billing metrics collector: %w", err)
}

promLogger := logger.Named("prometheus")
if err := util.StartPrometheusMetricsServer(ctx, promLogger.Named("global"), 9100, globalPromReg); err != nil {
return fmt.Errorf("Error starting prometheus metrics server: %w", err)
Expand All @@ -79,18 +75,31 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
}
}

logger.Info("Entering main loop")
for {
event, err := vmEventQueue.Wait(ctx)
if err != nil {
if ctx.Err() != nil {
// treat context canceled as a "normal" exit (because it is)
return nil
}
mc, err := billing.NewMetricsCollector(ctx, logger, &r.Config.Billing)
if err != nil {
return fmt.Errorf("error creating billing metrics collector: %w", err)
}

logger.Error("vmEventQueue returned error", zap.Error(err))
return err
tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
tg.Go("billing", func(logger *zap.Logger) error {
return mc.Run(tg.Ctx(), logger, storeForNode, metrics)
})
tg.Go("main-loop", func(logger *zap.Logger) error {
logger.Info("Entering main loop")
for {
event, err := vmEventQueue.Wait(ctx)
if err != nil {
if ctx.Err() != nil {
// treat context canceled as a "normal" exit (because it is)
return nil
}

logger.Error("vmEventQueue returned error", zap.Error(err))
return err
}
globalState.handleEvent(tg.Ctx(), logger, event)
}
globalState.handleEvent(ctx, logger, event)
}
})

return tg.Wait()
}
18 changes: 18 additions & 0 deletions pkg/util/stack/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
Originally taken from https://github.com/sharnoff/chord

Copyright @sharnoff

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
associated documentation files (the “Software”), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge, publish, distribute,
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial
portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES
OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Loading

0 comments on commit 3a61010

Please sign in to comment.