util: add taskgroup #922

Merged · 25 commits · Jun 17, 2024
Changes from 16 commits
4 changes: 2 additions & 2 deletions go.mod
@@ -64,10 +64,10 @@ require (
github.com/stretchr/testify v1.8.1
github.com/tychoish/fun v0.8.5
github.com/vishvananda/netlink v1.1.1-0.20220125195016-0639e7e787ba
go.uber.org/multierr v1.9.0
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.21.0
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53
golang.org/x/sync v0.1.0
golang.org/x/term v0.18.0
gopkg.in/yaml.v3 v3.0.1
k8s.io/api v0.25.16
@@ -187,10 +187,10 @@ require (
go.opentelemetry.io/otel/trace v0.20.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/oauth2 v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
46 changes: 12 additions & 34 deletions neonvm/runner/main.go
@@ -38,12 +38,12 @@ import (
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/vishvananda/netlink"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"

"k8s.io/apimachinery/pkg/api/resource"

vmv1 "github.com/neondatabase/autoscaling/neonvm/apis/neonvm/v1"
"github.com/neondatabase/autoscaling/pkg/api"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

const (
@@ -699,26 +699,13 @@ func run(logger *zap.Logger) error {
}
}

// Helper function to both log an error and return the result of wrapping it with the message.
//
// This is used below because (*errgroup.Group).Wait() returns at most 1 error, which means
// imprecise usage can hide errors. So, we log the errors before they would be returned, to make
// sure they're always visible.
logWrap := func(msg string, err error) error {
logger.Error(msg, zap.Error(err))
return fmt.Errorf("%s: %w", msg, err)
}

eg := &errgroup.Group{}
eg.Go(func() error {
if err := runInitScript(logger, vmSpec.InitScript); err != nil {
return logWrap("failed to run init script", err)
}
return nil
tg := taskgroup.NewGroup(logger)
tg.Go("init-script", func(logger *zap.Logger) error {
return runInitScript(logger, vmSpec.InitScript)
})

eg.Go(func() error {
if err := createISO9660runtime(
tg.Go("iso9660-runtime", func(logger *zap.Logger) error {
return createISO9660runtime(
runtimeDiskPath,
vmSpec.Guest.Command,
vmSpec.Guest.Args,
@@ -728,31 +715,22 @@
enableSSH,
swapInfo,
shmSize,
); err != nil {
return logWrap("failed to create iso9660 disk", err)
}
return nil
)
})

eg.Go(func() error {
tg.Go("rootDisk", func(logger *zap.Logger) error {
// resize rootDisk image of size specified and new size more than current
if err := resizeRootDisk(logger, vmSpec); err != nil {
return logWrap("failed to resize rootDisk", err)
}
return nil
return resizeRootDisk(logger, vmSpec)
})
var qemuCmd []string

eg.Go(func() error {
tg.Go("qemu-cmd", func(logger *zap.Logger) error {
var err error
qemuCmd, err = buildQEMUCmd(cfg, logger, vmSpec, &vmStatus, cpus, memory, enableSSH, swapInfo)
if err != nil {
return logWrap("failed to build QEMU command", err)
}
return nil
return err
})

if err := eg.Wait(); err != nil {
if err := tg.Wait(); err != nil {
return err
}

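The taskgroup package itself is not part of this diff view — only its call sites are shown — so the following is a rough, hypothetical sketch of the surface the runner code relies on (NewGroup, a WithParentContext option, Ctx, a named Go, and Wait), reconstructed from usage. The multierr aggregation is only inferred from the go.mod change above, cancelling sibling tasks on failure is an assumption, and the option type and all internals here are invented for the sketch; the actual pkg/util/taskgroup implementation may differ.

```go
// Hypothetical sketch of pkg/util/taskgroup, reconstructed from its call
// sites in this PR; not the actual implementation.
package taskgroup

import (
	"context"
	"fmt"
	"sync"

	"go.uber.org/multierr"
	"go.uber.org/zap"
)

// Opt configures a Group at construction time.
type Opt func(*Group)

// WithParentContext makes the group's context a child of ctx.
func WithParentContext(ctx context.Context) Opt {
	return func(g *Group) { g.parent = ctx }
}

// Group runs named tasks, giving each one a named sub-logger.
type Group struct {
	logger *zap.Logger
	parent context.Context
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	mu  sync.Mutex
	err error
}

// NewGroup creates an empty group. Without WithParentContext, the group's
// context is derived from context.Background().
func NewGroup(logger *zap.Logger, opts ...Opt) *Group {
	g := &Group{logger: logger, parent: context.Background()}
	for _, o := range opts {
		o(g)
	}
	g.ctx, g.cancel = context.WithCancel(g.parent)
	return g
}

// Ctx returns the context shared by all tasks in the group.
func (g *Group) Ctx() context.Context { return g.ctx }

// Go starts f in its own goroutine. Errors are logged immediately under the
// task's name (so they stay visible even if callers only check Wait()) and
// folded into the group's combined error.
func (g *Group) Go(name string, f func(logger *zap.Logger) error) {
	g.wg.Add(1)
	go func() {
		defer g.wg.Done()
		logger := g.logger.Named(name)
		if err := f(logger); err != nil {
			logger.Error("task failed", zap.Error(err))
			g.mu.Lock()
			g.err = multierr.Append(g.err, fmt.Errorf("task %s failed: %w", name, err))
			g.mu.Unlock()
			g.cancel() // let sibling tasks observe the failure via Ctx() (assumed)
		}
	}()
}

// Wait blocks until every task started with Go has returned, then reports
// the combined error (nil if all tasks succeeded).
func (g *Group) Wait() error {
	g.wg.Wait()
	g.cancel()
	g.mu.Lock()
	defer g.mu.Unlock()
	return g.err
}
```

Under this shape, each task gets a logger named after it, and a task's error is both logged at the failure site and returned (combined) from Wait() — which is what makes the logWrap helper above unnecessary.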
46 changes: 23 additions & 23 deletions pkg/agent/billing/billing.go
@@ -86,19 +86,24 @@ type vmMetricsSeconds struct {
activeTime time.Duration
}

func StartBillingMetricsCollector(
type MetricsCollector struct {
conf *Config
clients []clientInfo
}

func NewMetricsCollector(
ctx context.Context,
parentLogger *zap.Logger,
conf *Config,
store VMStoreForNode,
metrics PromMetrics,
) error {
) (*MetricsCollector, error) {
logger := parentLogger.Named("billing")

var clients []clientInfo
mc := &MetricsCollector{
conf: conf,
clients: make([]clientInfo, 0),
}

if c := conf.Clients.HTTP; c != nil {
clients = append(clients, clientInfo{
mc.clients = append(mc.clients, clientInfo{
client: billing.NewHTTPClient(c.URL, http.DefaultClient),
name: "http",
config: c.BaseClientConfig,
@@ -107,35 +112,31 @@ func StartBillingMetricsCollector(
if c := conf.Clients.S3; c != nil {
client, err := billing.NewS3Client(ctx, c.S3ClientConfig)
if err != nil {
return fmt.Errorf("failed to create S3 client: %w", err)
return nil, fmt.Errorf("failed to create S3 client: %w", err)
}
logger.Info("Created S3 client", client.LogFields())
clients = append(clients, clientInfo{
mc.clients = append(mc.clients, clientInfo{
client: client,
name: "s3",
config: c.BaseClientConfig,
})
}

// TODO: catch panics here, bubble those into a clean-ish shutdown.
go runBillingMetricsCollector(ctx, logger, conf, store, metrics, clients)
return nil
return mc, nil
}

func runBillingMetricsCollector(
func (mc *MetricsCollector) Run(
ctx context.Context,
logger *zap.Logger,
conf *Config,
store VMStoreForNode,
metrics PromMetrics,
clients []clientInfo,
) {
) error {

collectTicker := time.NewTicker(time.Second * time.Duration(conf.CollectEverySeconds))
collectTicker := time.NewTicker(time.Second * time.Duration(mc.conf.CollectEverySeconds))
defer collectTicker.Stop()
// Offset by half a second, so it's a bit more deterministic.
time.Sleep(500 * time.Millisecond)
accumulateTicker := time.NewTicker(time.Second * time.Duration(conf.AccumulateEverySeconds))
accumulateTicker := time.NewTicker(time.Second * time.Duration(mc.conf.AccumulateEverySeconds))
defer accumulateTicker.Stop()

state := metricsState{
@@ -147,7 +148,7 @@

var queueWriters []eventQueuePusher[*billing.IncrementalEvent]

for _, c := range clients {
for _, c := range mc.clients {
qw, queueReader := newEventQueue[*billing.IncrementalEvent](metrics.queueSizeCurrent.WithLabelValues(c.name))
queueWriters = append(queueWriters, qw)

@@ -174,15 +175,14 @@
case <-collectTicker.C:
logger.Info("Collecting billing state")
if store.Stopped() && ctx.Err() == nil {
err := errors.New("VM store stopped but background context is still live")
logger.Panic("Validation check failed", zap.Error(err))
return errors.New("VM store stopped but background context is still live")
}
state.collect(logger, store, metrics)
case <-accumulateTicker.C:
logger.Info("Creating billing batch")
state.drainEnqueue(logger, conf, billing.GetHostname(), queueWriters)
state.drainEnqueue(logger, mc.conf, billing.GetHostname(), queueWriters)
case <-ctx.Done():
return
return nil
}
}
}
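The net effect of this refactor is that billing collection is no longer fire-and-forget: client construction (which can fail) is separated from the long-running loop, and the loop reports fatal conditions — such as the VM store stopping while the context is still live — by returning an error rather than calling logger.Panic from a detached goroutine. A caller is expected to wire it up roughly as in the hypothetical sketch below, which mirrors the entrypoint.go change that follows; store and metrics stand in for the caller's VMStoreForNode and PromMetrics values.

```go
package example

import (
	"context"
	"fmt"

	"go.uber.org/zap"

	"github.com/neondatabase/autoscaling/pkg/agent/billing"
	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

// startBilling sketches the intended calling convention after the refactor:
// construct the collector (failing fast on bad config), then supervise its
// loop with a task group instead of a bare `go` statement.
func startBilling(
	ctx context.Context,
	logger *zap.Logger,
	conf *billing.Config,
	store billing.VMStoreForNode,
	metrics billing.PromMetrics,
) error {
	mc, err := billing.NewMetricsCollector(ctx, logger, conf)
	if err != nil {
		return fmt.Errorf("error creating billing metrics collector: %w", err)
	}

	tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
	tg.Go("collect-metrics", func(logger *zap.Logger) error {
		return mc.Run(tg.Ctx(), logger, store, metrics)
	})
	return tg.Wait()
}
```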
43 changes: 26 additions & 17 deletions pkg/agent/entrypoint.go
@@ -13,6 +13,7 @@ import (
"github.com/neondatabase/autoscaling/pkg/agent/billing"
"github.com/neondatabase/autoscaling/pkg/agent/schedwatch"
"github.com/neondatabase/autoscaling/pkg/util"
"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
"github.com/neondatabase/autoscaling/pkg/util/watch"
)

@@ -59,11 +60,6 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
metrics := billing.NewPromMetrics()
metrics.MustRegister(globalPromReg)

err = billing.StartBillingMetricsCollector(ctx, logger, &r.Config.Billing, storeForNode, metrics)
if err != nil {
return fmt.Errorf("error starting billing metrics collector: %w", err)
}

promLogger := logger.Named("prometheus")
if err := util.StartPrometheusMetricsServer(ctx, promLogger.Named("global"), 9100, globalPromReg); err != nil {
return fmt.Errorf("Error starting prometheus metrics server: %w", err)
@@ -79,18 +75,31 @@
}
}

logger.Info("Entering main loop")
for {
event, err := vmEventQueue.Wait(ctx)
if err != nil {
if ctx.Err() != nil {
// treat context canceled as a "normal" exit (because it is)
return nil
}
mc, err := billing.NewMetricsCollector(ctx, logger, &r.Config.Billing)
if err != nil {
return fmt.Errorf("error creating billing metrics collector: %w", err)
}

logger.Error("vmEventQueue returned error", zap.Error(err))
return err
tg := taskgroup.NewGroup(logger, taskgroup.WithParentContext(ctx))
tg.Go("collect-metrics", func(logger *zap.Logger) error {
return mc.Run(tg.Ctx(), logger, storeForNode, metrics)
})
tg.Go("main-loop", func(logger *zap.Logger) error {
logger.Info("Entering main loop")
for {
event, err := vmEventQueue.Wait(ctx)
if err != nil {
if ctx.Err() != nil {
// treat context canceled as a "normal" exit (because it is)
return nil
}

logger.Error("vmEventQueue returned error", zap.Error(err))
return err
}
globalState.handleEvent(tg.Ctx(), logger, event)
}
globalState.handleEvent(ctx, logger, event)
}
})

return tg.Wait()
}
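For illustration, here is a small, self-contained example of the failure semantics the entrypoint now leans on, under the same assumptions as the sketch earlier: a failing task's error is logged under its own name, sibling tasks can notice the failure through the group context, and Wait() hands the combined error back to the caller.

```go
package example

import (
	"errors"

	"go.uber.org/zap"

	"github.com/neondatabase/autoscaling/pkg/util/taskgroup"
)

// runExample demonstrates the assumed behavior: "failing" returns an error,
// the group context ends (assumed), so "worker" unblocks and exits cleanly,
// and Wait() reports the error.
func runExample(logger *zap.Logger) error {
	tg := taskgroup.NewGroup(logger)

	tg.Go("worker", func(logger *zap.Logger) error {
		<-tg.Ctx().Done() // unblocks once a sibling fails (assumed) or the parent context ends
		return nil
	})
	tg.Go("failing", func(logger *zap.Logger) error {
		return errors.New("boom")
	})

	return tg.Wait()
}
```

The apparent intent in the agent is the same: a fatal error in either the billing collector or the main event loop now surfaces through tg.Wait() and shuts down the other, instead of being lost in a detached goroutine.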
18 changes: 18 additions & 0 deletions pkg/util/stack/LICENSE
@@ -0,0 +1,18 @@
Originally taken from https://github.com/sharnoff/chord

Copyright @sharnoff

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
associated documentation files (the “Software”), to deal in the Software without restriction,
including without limitation the rights to use, copy, modify, merge, publish, distribute,
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial
portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT
NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES
OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.