diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel
index 5a90a7f7455b..eca492e27baa 100644
--- a/pkg/ccl/backupccl/BUILD.bazel
+++ b/pkg/ccl/backupccl/BUILD.bazel
@@ -142,6 +142,7 @@ go_library(
         "//pkg/util/log/logutil",
         "//pkg/util/metric",
         "//pkg/util/mon",
+        "//pkg/util/pprofutil",
         "//pkg/util/protoutil",
         "//pkg/util/quotapool",
         "//pkg/util/randutil",
diff --git a/pkg/ccl/backupccl/restore_data_processor.go b/pkg/ccl/backupccl/restore_data_processor.go
index 393524cd5eea..7cd32e45f1cf 100644
--- a/pkg/ccl/backupccl/restore_data_processor.go
+++ b/pkg/ccl/backupccl/restore_data_processor.go
@@ -39,9 +39,11 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
+	"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/quotapool"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
 	"github.com/cockroachdb/errors"
 	"github.com/cockroachdb/logtags"
 	gogotypes "github.com/gogo/protobuf/types"
@@ -374,6 +376,7 @@ func (rd *restoreDataProcessor) openSSTs(
 	readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

 	cleanup := func() {
+		log.VInfof(ctx, 1, "finished with and closing %d files in span [%s-%s)", len(entry.Files), entry.Span.Key, entry.Span.EndKey)
 		readAsOfIter.Close()
 		rd.qp.Release(iterAllocs...)
@@ -395,7 +398,7 @@ func (rd *restoreDataProcessor) openSSTs(
 		return mSST, nil
 	}

-	log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)
+	log.VEventf(ctx, 1, "ingesting %d files in span [%s-%s)", len(entry.Files), entry.Span.Key, entry.Span.EndKey)

 	storeFiles := make([]storageccl.StoreFile, 0, len(entry.Files))
 	iterAllocs := make([]*quotapool.IntAlloc, 0, len(entry.Files))
@@ -486,6 +489,10 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
 	ctx context.Context, entries chan execinfrapb.RestoreSpanEntry,
 ) error {
 	return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, worker int) error {
+		ctx = logtags.AddTag(ctx, "restore-worker", worker)
+		ctx, undo := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
+		defer undo()
+
 		kr, err := MakeKeyRewriterFromRekeys(rd.FlowCtx.Codec(), rd.spec.TableRekeys, rd.spec.TenantRekeys,
 			false /* restoreTenantFromStream */)
 		if err != nil {
@@ -501,6 +508,12 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
 			return done, nil
 		}

+		ctx := logtags.AddTag(ctx, "restore-span", entry.ProgressIdx)
+		ctx, undo := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
+		defer undo()
+		ctx, sp := tracing.ChildSpan(ctx, "restore.processRestoreSpanEntry")
+		defer sp.Finish()
+
 		var res *resumeEntry
 		for {
 			sstIter, res, err = rd.openSSTs(ctx, entry, res)
diff --git a/pkg/cmd/roachprod/main.go b/pkg/cmd/roachprod/main.go
index 312f87048af3..6375d72cd8ca 100644
--- a/pkg/cmd/roachprod/main.go
+++ b/pkg/cmd/roachprod/main.go
@@ -138,6 +138,28 @@ Local Clusters
 	}),
 }

+var growCmd = &cobra.Command{
+	Use:   `grow <cluster> <num-nodes>`,
+	Short: `grow a cluster by adding nodes`,
+	Long: `grow a cluster by adding the specified number of nodes to it.
+
+The cluster has to be a managed cluster (i.e., a cluster created with the
+--gce-managed flag). Only Google Cloud clusters currently support adding nodes.
+
+The new nodes will use the same instance template that the cluster was
+originally created with. New nodes are created in the same zone as the existing
+nodes; if the cluster is spread over multiple zones, the new nodes are
+distributed fairly across those zones.
+`,
+	Args: cobra.ExactArgs(2),
+	Run: wrap(func(cmd *cobra.Command, args []string) error {
+		count, err := strconv.ParseInt(args[1], 10, 8)
+		if err != nil || count < 1 {
+			return errors.Newf("invalid num-nodes argument %q: must be a positive integer", args[1])
+		}
+		return roachprod.Grow(context.Background(), config.Logger, args[0], int(count))
+	}),
+}
+
 var setupSSHCmd = &cobra.Command{
 	Use:   "setup-ssh <cluster>",
 	Short: "set up ssh for a cluster",
@@ -1437,6 +1459,7 @@ func main() {
 	cobra.EnableCommandSorting = false
 	rootCmd.AddCommand(
 		createCmd,
+		growCmd,
 		resetCmd,
 		destroyCmd,
 		extendCmd,
diff --git a/pkg/roachprod/cloud/BUILD.bazel b/pkg/roachprod/cloud/BUILD.bazel
index eba5c64d24d2..06166bc86db9 100644
--- a/pkg/roachprod/cloud/BUILD.bazel
+++ b/pkg/roachprod/cloud/BUILD.bazel
@@ -13,6 +13,7 @@ go_library(
         "//pkg/roachprod/config",
         "//pkg/roachprod/logger",
         "//pkg/roachprod/vm",
+        "//pkg/roachprod/vm/gce",
         "//pkg/util/timeutil",
         "@com_github_aws_aws_sdk_go_v2_config//:config",
         "@com_github_aws_aws_sdk_go_v2_service_ec2//:ec2",
diff --git a/pkg/roachprod/cloud/cluster_cloud.go b/pkg/roachprod/cloud/cluster_cloud.go
index d826bd3a71d1..2ba5d8c743a3 100644
--- a/pkg/roachprod/cloud/cluster_cloud.go
+++ b/pkg/roachprod/cloud/cluster_cloud.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachprod/config"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
+	"github.com/cockroachdb/cockroach/pkg/roachprod/vm/gce"
 	"github.com/cockroachdb/errors"
 	"golang.org/x/sync/errgroup"
 )
@@ -279,6 +280,27 @@ func CreateCluster(
 	})
 }

+// GrowCluster adds new nodes to an existing cluster.
+func GrowCluster(l *logger.Logger, c *Cluster, numNodes int) error {
+	names := make([]string, 0, numNodes)
+	offset := len(c.VMs) + 1
+	for i := offset; i < offset+numNodes; i++ {
+		vmName := vm.Name(c.Name, i)
+		names = append(names, vmName)
+	}
+
+	providers := c.Clouds()
+	if len(providers) != 1 || providers[0] != gce.ProviderName {
+		return errors.Errorf("cluster %s is not on gce, growing a cluster is currently only supported on %s",
+			c.Name, gce.ProviderName)
+	}
+
+	// Only GCE supports expanding a cluster.
+	return vm.ForProvider(gce.ProviderName, func(p vm.Provider) error {
+		return p.Grow(l, c.VMs, c.Name, names)
+	})
+}
+
 // DestroyCluster TODO(peter): document
 func DestroyCluster(l *logger.Logger, c *Cluster) error {
 	// DNS entries are destroyed first to ensure that the GC job will not try
diff --git a/pkg/roachprod/roachprod.go b/pkg/roachprod/roachprod.go
index 29514bf61d3a..a8ed5fa35dcc 100644
--- a/pkg/roachprod/roachprod.go
+++ b/pkg/roachprod/roachprod.go
@@ -1504,6 +1504,27 @@ func Create(
 	return SetupSSH(ctx, l, clusterName)
 }

+// Grow adds numNodes new nodes to the named cluster and then re-runs the SSH
+// setup for the grown cluster.
+func Grow(ctx context.Context, l *logger.Logger, clusterName string, numNodes int) error {
+	if numNodes <= 0 || numNodes >= 1000 {
+		// Upper limit is just for safety.
+		return fmt.Errorf("number of nodes must be in [1..999]")
+	}
+
+	if err := LoadClusters(); err != nil {
+		return err
+	}
+	c, err := newCluster(l, clusterName)
+	if err != nil {
+		return err
+	}
+
+	err = cloud.GrowCluster(l, &c.Cluster, numNodes)
+	if err != nil {
+		return err
+	}
+	return SetupSSH(ctx, l, clusterName)
+}
+
 // GC garbage-collects expired clusters, unused SSH key pairs in AWS, and unused
 // DNS records.
 func GC(l *logger.Logger, dryrun bool) error {
diff --git a/pkg/roachprod/vm/aws/aws.go b/pkg/roachprod/vm/aws/aws.go
index c679e4435c63..86d542c0837d 100644
--- a/pkg/roachprod/vm/aws/aws.go
+++ b/pkg/roachprod/vm/aws/aws.go
@@ -600,6 +600,10 @@ func (p *Provider) Create(
 	return p.waitForIPs(l, names, regions, providerOpts)
 }

+func (p *Provider) Grow(*logger.Logger, vm.List, string, []string) error {
+	panic("unimplemented")
+}
+
 // waitForIPs waits until AWS reports both internal and external IP addresses
 // for all newly created VMs. If we did not wait for these IPs then attempts to
 // list the new VMs after the creation might find VMs without an external IP.
diff --git a/pkg/roachprod/vm/azure/azure.go b/pkg/roachprod/vm/azure/azure.go
index 14be9aab328c..0172e8ae0f1b 100644
--- a/pkg/roachprod/vm/azure/azure.go
+++ b/pkg/roachprod/vm/azure/azure.go
@@ -138,6 +138,10 @@ func (p *Provider) AttachVolume(*logger.Logger, vm.Volume, *vm.VM) (string, erro
 	panic("unimplemented")
 }

+func (p *Provider) Grow(*logger.Logger, vm.List, string, []string) error {
+	panic("unimplemented")
+}
+
 // New constructs a new Provider instance.
 func New() *Provider {
 	p := &Provider{}
diff --git a/pkg/roachprod/vm/flagstub/flagstub.go b/pkg/roachprod/vm/flagstub/flagstub.go
index 5b5df7380501..23883408d4ee 100644
--- a/pkg/roachprod/vm/flagstub/flagstub.go
+++ b/pkg/roachprod/vm/flagstub/flagstub.go
@@ -99,6 +99,11 @@ func (p *provider) Create(
 	return errors.Newf("%s", p.unimplemented)
 }

+// Grow implements vm.Provider and returns Unimplemented.
+func (p *provider) Grow(l *logger.Logger, vms vm.List, clusterName string, names []string) error {
+	return errors.Newf("%s", p.unimplemented)
+}
+
 // Delete implements vm.Provider and returns Unimplemented.
 func (p *provider) Delete(l *logger.Logger, vms vm.List) error {
 	return errors.Newf("%s", p.unimplemented)
diff --git a/pkg/roachprod/vm/gce/BUILD.bazel b/pkg/roachprod/vm/gce/BUILD.bazel
index 39566064b272..1659d981566d 100644
--- a/pkg/roachprod/vm/gce/BUILD.bazel
+++ b/pkg/roachprod/vm/gce/BUILD.bazel
@@ -36,6 +36,7 @@ go_test(
     }),
     deps = [
         "//pkg/roachprod/vm",
+        "//pkg/util/randutil",
         "//pkg/util/timeutil",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_stretchr_testify//assert",
diff --git a/pkg/roachprod/vm/gce/gcloud.go b/pkg/roachprod/vm/gce/gcloud.go
index 28db81a9cb2f..193e8eb38863 100644
--- a/pkg/roachprod/vm/gce/gcloud.go
+++ b/pkg/roachprod/vm/gce/gcloud.go
@@ -1447,7 +1447,86 @@ func (p *Provider) Create(
 			return err
 		}
 	}
-	return propagateDiskLabels(l, project, labels, zoneToHostNames, &opts)
+	return propagateDiskLabels(l, project, labels, zoneToHostNames, opts.SSDOpts.UseLocalSSD)
+}
+
+// computeGrowDistribution computes the distribution of new nodes across the
+// existing instance groups. Groups must be sorted by size from smallest to
+// largest before passing to this function. The distribution is computed
+// naively, for simplicity, by growing the instance group with the smallest
+// size, and then the next smallest, and so on.
+func computeGrowDistribution(groups []jsonManagedInstanceGroup, newNodeCount int) []int {
+	addCount := make([]int, len(groups))
+	curIndex := 0
+	for i := 0; i < newNodeCount; i++ {
+		nextIndex := (curIndex + 1) % len(groups)
+		if groups[curIndex].Size+addCount[curIndex] >
+			groups[nextIndex].Size+addCount[nextIndex] {
+			curIndex = nextIndex
+		} else {
+			curIndex = 0
+		}
+		addCount[curIndex]++
+	}
+	return addCount
+}
+
+// Grow adds new instances to the cluster's existing managed instance groups,
+// spreading them across the groups' zones so that group sizes stay balanced.
+func (p *Provider) Grow(l *logger.Logger, vms vm.List, clusterName string, names []string) error {
+	project := vms[0].Project
+	groupName := instanceGroupName(clusterName)
+	groups, err := listManagedInstanceGroups(project, groupName)
+	if err != nil {
+		return err
+	}
+
+	newNodeCount := len(names)
+	sort.Slice(groups, func(i, j int) bool {
+		return groups[i].Size < groups[j].Size
+	})
+	addCounts := computeGrowDistribution(groups, newNodeCount)
+
+	zoneToHostNames := make(map[string][]string)
+	var g errgroup.Group
+	for idx, group := range groups {
+		addCount := addCounts[idx]
+		if addCount == 0 {
+			continue
+		}
+		createArgs := []string{"compute", "instance-groups", "managed", "create-instance", "--zone", group.Zone, groupName}
+		for i := 0; i < addCount; i++ {
+			name := names[0]
+			names = names[1:]
+			argsWithName := append(createArgs[:len(createArgs):len(createArgs)], []string{"--instance", name}...)
+			zoneToHostNames[group.Zone] = append(zoneToHostNames[group.Zone], name)
+			g.Go(func() error {
+				cmd := exec.Command("gcloud", argsWithName...)
+				output, err := cmd.CombinedOutput()
+				if err != nil {
+					return errors.Wrapf(err, "Command: gcloud %s\nOutput: %s", argsWithName, output)
+				}
+				return nil
+			})
+		}
+	}
+
+	err = g.Wait()
+	if err != nil {
+		return err
+	}
+
+	err = waitForGroupStability(project, groupName, maps.Keys(zoneToHostNames))
+	if err != nil {
+		return err
+	}
+
+	var labelsJoined string
+	for key, value := range vms[0].Labels {
+		if labelsJoined != "" {
+			labelsJoined += ","
+		}
+		labelsJoined += fmt.Sprintf("%s=%s", key, value)
+	}
+	return propagateDiskLabels(l, project, labelsJoined, zoneToHostNames, len(vms[0].LocalDisks) != 0)
 }

 // Given a machine type, return the allowed number (> 0) of local SSDs, sorted in ascending order.
@@ -1509,7 +1588,7 @@ func propagateDiskLabels(
 	project string,
 	labels string,
 	zoneToHostNames map[string][]string,
-	opts *vm.CreateOpts,
+	useLocalSSD bool,
 ) error {
 	var g errgroup.Group

@@ -1538,7 +1617,7 @@ func propagateDiskLabels(
 				return nil
 			})

-			if !opts.SSDOpts.UseLocalSSD {
+			if !useLocalSSD {
 				g.Go(func() error {
 					persistentDiskArgs := append([]string(nil), argsPrefix...)
 					persistentDiskArgs = append(persistentDiskArgs, zoneArg...)
@@ -1579,6 +1658,7 @@ func listInstanceTemplates(project string) ([]jsonInstanceTemplate, error) {
 type jsonManagedInstanceGroup struct {
 	Name string `json:"name"`
 	Zone string `json:"zone"`
+	Size int    `json:"size"`
 }

 // listManagedInstanceGroups returns a list of managed instance groups for a
diff --git a/pkg/roachprod/vm/gce/gcloud_test.go b/pkg/roachprod/vm/gce/gcloud_test.go
index 56a4b9dc38ca..f09b4917508c 100644
--- a/pkg/roachprod/vm/gce/gcloud_test.go
+++ b/pkg/roachprod/vm/gce/gcloud_test.go
@@ -11,12 +11,18 @@ package gce
 import (
+	"math"
+	"math/rand"
+	"reflect"
+	"sort"
 	"strconv"
 	"strings"
 	"testing"
+	"testing/quick"
 	"time"

 	"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
+	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/errors"
 	"github.com/stretchr/testify/assert"
@@ -188,3 +194,74 @@ func Test_buildFilterPreemptionCliArgs(t *testing.T) {
 		})
 	}
 }
+
+func randInstanceGroupSizes(r *rand.Rand) []jsonManagedInstanceGroup {
+	// We do not test empty sets, hence the +1.
+	count := r.Intn(10) + 1
+	groups := make([]jsonManagedInstanceGroup, count)
+	for i := 0; i < count; i++ {
+		groups[i].Size = r.Intn(32)
+	}
+	return groups
+}
+
+func TestComputeGrowDistribution(t *testing.T) {
+	rng, _ := randutil.NewTestRand()
+	c := quick.Config{
+		MaxCount: 128,
+		Rand:     rng,
+		Values: func(values []reflect.Value, r *rand.Rand) {
+			values[0] = reflect.ValueOf(randInstanceGroupSizes(r))
+		},
+	}
+
+	testDistribution := func(groups []jsonManagedInstanceGroup) bool {
+		// Generate a random number of new nodes to add to the groups.
+		newNodeCount := rng.Intn(24) + 1
+
+		// Compute the total number of nodes before the distribution and
+		// the maximum distance between the number of nodes in the groups.
+		totalNodesBefore := 0
+		curMax, curMin := 0.0, math.MaxFloat64
+		for _, g := range groups {
+			totalNodesBefore += g.Size
+			curMax = math.Max(curMax, float64(g.Size))
+			curMin = math.Min(curMin, float64(g.Size))
+		}
+		maxDistanceBefore := curMax - curMin
+
+		// Sort the groups, compute the new distribution and apply it to the
+		// group sizes.
+		sort.Slice(groups, func(i, j int) bool {
+			return groups[i].Size < groups[j].Size
+		})
+		newTargetSize := computeGrowDistribution(groups, newNodeCount)
+		for idx := range newTargetSize {
+			groups[idx].Size += newTargetSize[idx]
+		}
+
+		// Compute the total number of nodes after the distribution and the maximum
+		// distance between the number of nodes in the groups.
+		totalNodesAfter := 0
+		curMax, curMin = 0.0, math.MaxFloat64
+		for _, g := range groups {
+			totalNodesAfter += g.Size
+			curMax = math.Max(curMax, float64(g.Size))
+			curMin = math.Min(curMin, float64(g.Size))
+		}
+		maxDistanceAfter := curMax - curMin
+
+		// The total number of nodes should be the sum of the new node count and the
+		// total number of nodes before the distribution.
+		if totalNodesAfter != totalNodesBefore+newNodeCount {
+			return false
+		}
+		// The maximum distance between the number of nodes in the groups should not
+		// increase by more than 1, otherwise the new distribution was not fair.
+		if maxDistanceAfter > maxDistanceBefore+1.0 {
+			return false
+		}
+		return true
+	}
+	if err := quick.Check(testDistribution, &c); err != nil {
+		t.Error(err)
+	}
+}
diff --git a/pkg/roachprod/vm/local/local.go b/pkg/roachprod/vm/local/local.go
index 414ecda000f6..8f8662306428 100644
--- a/pkg/roachprod/vm/local/local.go
+++ b/pkg/roachprod/vm/local/local.go
@@ -242,6 +242,10 @@ func (p *Provider) Create(
 	return nil
 }

+func (p *Provider) Grow(l *logger.Logger, vms vm.List, clusterName string, names []string) error {
+	return errors.New("unimplemented")
+}
+
 // Delete is part of the vm.Provider interface.
 func (p *Provider) Delete(l *logger.Logger, vms vm.List) error {
 	panic("DeleteCluster should be used")
diff --git a/pkg/roachprod/vm/vm.go b/pkg/roachprod/vm/vm.go
index 14a295e33f4f..9720497ef62f 100644
--- a/pkg/roachprod/vm/vm.go
+++ b/pkg/roachprod/vm/vm.go
@@ -433,6 +433,7 @@ type Provider interface {
 	// zones for the given provider.
 	ConfigSSH(l *logger.Logger, zones []string) error
 	Create(l *logger.Logger, names []string, opts CreateOpts, providerOpts ProviderOpts) error
+	Grow(l *logger.Logger, vms List, clusterName string, names []string) error
 	Reset(l *logger.Logger, vms List) error
 	Delete(l *logger.Logger, vms List) error
 	Extend(l *logger.Logger, vms List, lifetime time.Duration) error
diff --git a/pkg/storage/mvcc_history_test.go b/pkg/storage/mvcc_history_test.go
index 52f4c9d64069..9ec5fe97adb7 100644
--- a/pkg/storage/mvcc_history_test.go
+++ b/pkg/storage/mvcc_history_test.go
@@ -2579,10 +2579,10 @@ func (e *evalCtx) getTxn(opt optArg) *roachpb.Transaction {
 // newReader returns a new (metamorphic) reader for use by a single command. The
 // caller must call Close on the reader when done.
 func (e *evalCtx) newReader() storage.Reader {
-	switch mvccHistoriesReader {
+	switch strings.ToLower(mvccHistoriesReader) {
 	case "engine":
 		return noopCloseReader{e.engine}
-	case "reader", "readOnly":
+	case "reader", "readonly":
 		return e.engine.NewReader(storage.StandardDurability)
 	case "batch":
 		return e.engine.NewBatch()
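
For reference, the node-distribution logic added to pkg/roachprod/vm/gce/gcloud.go can be exercised on its own. The sketch below copies jsonManagedInstanceGroup and computeGrowDistribution from the diff above into a standalone program; the zone names, group sizes, and the choice of adding 5 nodes are made-up inputs chosen to show how the smallest groups are topped up first.

package main

import (
	"fmt"
	"sort"
)

// jsonManagedInstanceGroup mirrors the struct in the diff; only Zone and Size
// matter for this sketch.
type jsonManagedInstanceGroup struct {
	Name string
	Zone string
	Size int
}

// computeGrowDistribution is copied from the diff: groups must be sorted by
// Size ascending, and each new node is assigned to the smallest group first,
// then the next smallest, and so on.
func computeGrowDistribution(groups []jsonManagedInstanceGroup, newNodeCount int) []int {
	addCount := make([]int, len(groups))
	curIndex := 0
	for i := 0; i < newNodeCount; i++ {
		nextIndex := (curIndex + 1) % len(groups)
		if groups[curIndex].Size+addCount[curIndex] >
			groups[nextIndex].Size+addCount[nextIndex] {
			curIndex = nextIndex
		} else {
			curIndex = 0
		}
		addCount[curIndex]++
	}
	return addCount
}

func main() {
	// Hypothetical cluster: three zones with 4, 1 and 3 instances; grow by 5.
	groups := []jsonManagedInstanceGroup{
		{Zone: "us-east1-b", Size: 4},
		{Zone: "us-east1-c", Size: 1},
		{Zone: "us-east1-d", Size: 3},
	}
	sort.Slice(groups, func(i, j int) bool { return groups[i].Size < groups[j].Size })
	add := computeGrowDistribution(groups, 5)
	for i, g := range groups {
		// Prints: us-east1-c: 1 -> 5, us-east1-d: 3 -> 4, us-east1-b: 4 -> 4.
		fmt.Printf("%s: %d -> %d\n", g.Zone, g.Size, g.Size+add[i])
	}
}

With those inputs the smallest group (us-east1-c) absorbs four of the five new nodes and the mid-sized group gets the fifth, ending at sizes 5/4/4. That matches the property TestComputeGrowDistribution checks: the total grows by exactly the requested node count, and the spread between the largest and smallest group widens by at most one.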
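The restore_data_processor.go changes tag each worker's context ("restore-worker", "restore-span") and mirror those tags onto profiler labels via pprofutil.SetProfilerLabelsFromCtxTags, so CPU profiles taken during a restore can be grouped by worker and by span. Below is a rough, stdlib-only approximation of that pattern; runWithProfilerLabels and its arguments are invented for illustration, and only the tag names come from the diff.

package main

import (
	"context"
	"fmt"
	"runtime/pprof"
)

// runWithProfilerLabels is a stand-in for the SetProfilerLabelsFromCtxTags
// pattern in the diff: it attaches key/value labels to the goroutine for the
// duration of fn, so profiler samples taken while fn runs carry those labels.
func runWithProfilerLabels(ctx context.Context, worker, span int, fn func(context.Context)) {
	labels := pprof.Labels(
		"restore-worker", fmt.Sprint(worker),
		"restore-span", fmt.Sprint(span),
	)
	pprof.Do(ctx, labels, fn)
}

func main() {
	runWithProfilerLabels(context.Background(), 3, 17, func(ctx context.Context) {
		// Inside fn the labels are visible on both the context and the
		// goroutine; a CPU profile captured now would attribute samples here.
		w, _ := pprof.Label(ctx, "restore-worker")
		s, _ := pprof.Label(ctx, "restore-span")
		fmt.Printf("running with restore-worker=%s restore-span=%s\n", w, s)
	})
}

The deferred undo() calls in the diff restore the previous labels when a worker or span finishes; pprof.Do gives the same effect here by reinstating the caller's label set when fn returns.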