Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
118912: roachprod: add Grow command r=DarrylWong,renatolabs a=herkolategan

Previously, after creating a managed (MIG) cluster in GCE it was not possible to add nodes. This change introduces a new `Grow` method on the gcloud Provider, and as part of the Provider interface.

New nodes are distributed among the zones, the groups are currently in, to balance out the nodes.

Epic: CRDB-33832
See: cockroachdb#117802

Release Note: None

119572: restore: add prrof label/trace span per restore span r=dt a=dt

Release note: none.
Epic: none.

119577: storage: fix TestMVCCHistories r=RaduBerinde a=jbowens

In cockroachdb#119219, I accidentally changed the capitalization of "readonly" when parsing MVCC history tests. Revert it to "readonly," and lowercase the incoming string to be accepting of capaitalization mistakes within the test files themselves.

Fix cockroachdb#119568.

Epic: none
Release note: none

Co-authored-by: Herko Lategan <[email protected]>
Co-authored-by: David Taylor <[email protected]>
Co-authored-by: Jackson Owens <[email protected]>
  • Loading branch information
4 people committed Feb 23, 2024
4 parents 2414447 + b19f3a9 + f7ded79 + 9c3dd3f commit 93db971
Show file tree
Hide file tree
Showing 15 changed files with 263 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ go_library(
"//pkg/util/log/logutil",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/pprofutil",
"//pkg/util/protoutil",
"//pkg/util/quotapool",
"//pkg/util/randutil",
Expand Down
15 changes: 14 additions & 1 deletion pkg/ccl/backupccl/restore_data_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
gogotypes "github.com/gogo/protobuf/types"
Expand Down Expand Up @@ -374,6 +376,7 @@ func (rd *restoreDataProcessor) openSSTs(
readAsOfIter := storage.NewReadAsOfIterator(iter, rd.spec.RestoreTime)

cleanup := func() {
log.VInfof(ctx, 1, "finished with and closing %d files in span [%s-%s)", len(entry.Files), entry.Span.Key, entry.Span.EndKey)
readAsOfIter.Close()
rd.qp.Release(iterAllocs...)

Expand All @@ -395,7 +398,7 @@ func (rd *restoreDataProcessor) openSSTs(
return mSST, nil
}

log.VEventf(ctx, 1 /* level */, "ingesting span [%s-%s)", entry.Span.Key, entry.Span.EndKey)
log.VEventf(ctx, 1, "ingesting %d files in span [%s-%s)", len(entry.Files), entry.Span.Key, entry.Span.EndKey)

storeFiles := make([]storageccl.StoreFile, 0, len(entry.Files))
iterAllocs := make([]*quotapool.IntAlloc, 0, len(entry.Files))
Expand Down Expand Up @@ -486,6 +489,10 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
ctx context.Context, entries chan execinfrapb.RestoreSpanEntry,
) error {
return ctxgroup.GroupWorkers(ctx, rd.numWorkers, func(ctx context.Context, worker int) error {
ctx = logtags.AddTag(ctx, "restore-worker", worker)
ctx, undo := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer undo()

kr, err := MakeKeyRewriterFromRekeys(rd.FlowCtx.Codec(), rd.spec.TableRekeys, rd.spec.TenantRekeys,
false /* restoreTenantFromStream */)
if err != nil {
Expand All @@ -501,6 +508,12 @@ func (rd *restoreDataProcessor) runRestoreWorkers(
return done, nil
}

ctx := logtags.AddTag(ctx, "restore-span", entry.ProgressIdx)
ctx, undo := pprofutil.SetProfilerLabelsFromCtxTags(ctx)
defer undo()
ctx, sp := tracing.ChildSpan(ctx, "restore.processRestoreSpanEntry")
defer sp.Finish()

var res *resumeEntry
for {
sstIter, res, err = rd.openSSTs(ctx, entry, res)
Expand Down
23 changes: 23 additions & 0 deletions pkg/cmd/roachprod/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,28 @@ Local Clusters
}),
}

var growCmd = &cobra.Command{
Use: `grow <cluster> <num-nodes>`,
Short: `grow a cluster by adding nodes`,
Long: `grow a cluster by adding the specified number of nodes to it.
The cluster has to be a managed cluster (i.e., a cluster created with the
gce-managed flag). Only Google Cloud clusters currently support adding nodes.
The new nodes will use the instance template that was used to create the cluster
originally (Nodes will be created in the same zone as the existing nodes, or if
the cluster is geographically distributed, the nodes will be fairly distributed
across the zones of the cluster).
`,
Args: cobra.ExactArgs(2),
Run: wrap(func(cmd *cobra.Command, args []string) error {
count, err := strconv.ParseInt(args[1], 10, 8)
if err != nil || count < 1 {
return errors.Wrapf(err, "invalid num-nodes argument")
}
return roachprod.Grow(context.Background(), config.Logger, args[0], int(count))
}),
}

var setupSSHCmd = &cobra.Command{
Use: "setup-ssh <cluster>",
Short: "set up ssh for a cluster",
Expand Down Expand Up @@ -1437,6 +1459,7 @@ func main() {
cobra.EnableCommandSorting = false
rootCmd.AddCommand(
createCmd,
growCmd,
resetCmd,
destroyCmd,
extendCmd,
Expand Down
1 change: 1 addition & 0 deletions pkg/roachprod/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//pkg/roachprod/config",
"//pkg/roachprod/logger",
"//pkg/roachprod/vm",
"//pkg/roachprod/vm/gce",
"//pkg/util/timeutil",
"@com_github_aws_aws_sdk_go_v2_config//:config",
"@com_github_aws_aws_sdk_go_v2_service_ec2//:ec2",
Expand Down
22 changes: 22 additions & 0 deletions pkg/roachprod/cloud/cluster_cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachprod/config"
"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
"github.com/cockroachdb/cockroach/pkg/roachprod/vm/gce"
"github.com/cockroachdb/errors"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -279,6 +280,27 @@ func CreateCluster(
})
}

// GrowCluster adds new nodes to an existing cluster.
func GrowCluster(l *logger.Logger, c *Cluster, NumNodes int) error {
names := make([]string, 0, NumNodes)
offset := len(c.VMs) + 1
for i := offset; i < offset+NumNodes; i++ {
vmName := vm.Name(c.Name, i)
names = append(names, vmName)
}

providers := c.Clouds()
if len(providers) != 1 && providers[0] != gce.ProviderName {
return errors.Errorf("cluster %s is not on gce, growing a cluster is currently only supported on %s",
c.Name, gce.ProviderName)
}

// Only GCE supports expanding a cluster.
return vm.ForProvider(gce.ProviderName, func(p vm.Provider) error {
return p.Grow(l, c.VMs, c.Name, names)
})
}

// DestroyCluster TODO(peter): document
func DestroyCluster(l *logger.Logger, c *Cluster) error {
// DNS entries are destroyed first to ensure that the GC job will not try
Expand Down
21 changes: 21 additions & 0 deletions pkg/roachprod/roachprod.go
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,27 @@ func Create(
return SetupSSH(ctx, l, clusterName)
}

func Grow(ctx context.Context, l *logger.Logger, clusterName string, numNodes int) error {
if numNodes <= 0 || numNodes >= 1000 {
// Upper limit is just for safety.
return fmt.Errorf("number of nodes must be in [1..999]")
}

if err := LoadClusters(); err != nil {
return err
}
c, err := newCluster(l, clusterName)
if err != nil {
return err
}

err = cloud.GrowCluster(l, &c.Cluster, numNodes)
if err != nil {
return err
}
return SetupSSH(ctx, l, clusterName)
}

// GC garbage-collects expired clusters, unused SSH key pairs in AWS, and unused
// DNS records.
func GC(l *logger.Logger, dryrun bool) error {
Expand Down
4 changes: 4 additions & 0 deletions pkg/roachprod/vm/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,10 @@ func (p *Provider) Create(
return p.waitForIPs(l, names, regions, providerOpts)
}

func (p *Provider) Grow(*logger.Logger, vm.List, string, []string) error {
panic("unimplemented")
}

// waitForIPs waits until AWS reports both internal and external IP addresses
// for all newly created VMs. If we did not wait for these IPs then attempts to
// list the new VMs after the creation might find VMs without an external IP.
Expand Down
4 changes: 4 additions & 0 deletions pkg/roachprod/vm/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ func (p *Provider) AttachVolume(*logger.Logger, vm.Volume, *vm.VM) (string, erro
panic("unimplemented")
}

func (p *Provider) Grow(*logger.Logger, vm.List, string, []string) error {
panic("unimplemented")
}

// New constructs a new Provider instance.
func New() *Provider {
p := &Provider{}
Expand Down
5 changes: 5 additions & 0 deletions pkg/roachprod/vm/flagstub/flagstub.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ func (p *provider) Create(
return errors.Newf("%s", p.unimplemented)
}

// Grow implements vm.Provider and returns Unimplemented.
func (p *provider) Grow(l *logger.Logger, vms vm.List, clusterName string, names []string) error {
return errors.Newf("%s", p.unimplemented)
}

// Delete implements vm.Provider and returns Unimplemented.
func (p *provider) Delete(l *logger.Logger, vms vm.List) error {
return errors.Newf("%s", p.unimplemented)
Expand Down
1 change: 1 addition & 0 deletions pkg/roachprod/vm/gce/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ go_test(
}),
deps = [
"//pkg/roachprod/vm",
"//pkg/util/randutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//assert",
Expand Down
86 changes: 83 additions & 3 deletions pkg/roachprod/vm/gce/gcloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -1447,7 +1447,86 @@ func (p *Provider) Create(
return err
}
}
return propagateDiskLabels(l, project, labels, zoneToHostNames, &opts)
return propagateDiskLabels(l, project, labels, zoneToHostNames, opts.SSDOpts.UseLocalSSD)
}

// computeGrowDistribution computes the distribution of new nodes across the
// existing instance groups. Groups must be sorted by size from smallest to
// largest before passing to this function. The distribution is computed
// naively, for simplicity, by growing the instance group with the smallest
// size, and then the next smallest, and so on.
func computeGrowDistribution(groups []jsonManagedInstanceGroup, newNodeCount int) []int {
addCount := make([]int, len(groups))
curIndex := 0
for i := 0; i < newNodeCount; i++ {
nextIndex := (curIndex + 1) % len(groups)
if groups[curIndex].Size+addCount[curIndex] >
groups[nextIndex].Size+addCount[nextIndex] {
curIndex = nextIndex
} else {
curIndex = 0
}
addCount[curIndex]++
}
return addCount
}

func (p *Provider) Grow(l *logger.Logger, vms vm.List, clusterName string, names []string) error {
project := vms[0].Project
groupName := instanceGroupName(clusterName)
groups, err := listManagedInstanceGroups(project, groupName)
if err != nil {
return err
}

newNodeCount := len(names)
sort.Slice(groups, func(i, j int) bool {
return groups[i].Size < groups[j].Size
})
addCounts := computeGrowDistribution(groups, newNodeCount)

zoneToHostNames := make(map[string][]string)
var g errgroup.Group
for idx, group := range groups {
addCount := addCounts[idx]
if addCount == 0 {
continue
}
createArgs := []string{"compute", "instance-groups", "managed", "create-instance", "--zone", group.Zone, groupName}
for i := 0; i < addCount; i++ {
name := names[0]
names = names[1:]
argsWithName := append(createArgs[:len(createArgs):len(createArgs)], []string{"--instance", name}...)
zoneToHostNames[group.Zone] = append(zoneToHostNames[group.Zone], name)
g.Go(func() error {
cmd := exec.Command("gcloud", argsWithName...)
output, err := cmd.CombinedOutput()
if err != nil {
return errors.Wrapf(err, "Command: gcloud %s\nOutput: %s", argsWithName, output)
}
return nil
})
}
}

err = g.Wait()
if err != nil {
return err
}

err = waitForGroupStability(project, groupName, maps.Keys(zoneToHostNames))
if err != nil {
return err
}

var labelsJoined string
for key, value := range vms[0].Labels {
if labelsJoined != "" {
labelsJoined += ","
}
labelsJoined += fmt.Sprintf("%s=%s", key, value)
}
return propagateDiskLabels(l, project, labelsJoined, zoneToHostNames, len(vms[0].LocalDisks) != 0)
}

// Given a machine type, return the allowed number (> 0) of local SSDs, sorted in ascending order.
Expand Down Expand Up @@ -1509,7 +1588,7 @@ func propagateDiskLabels(
project string,
labels string,
zoneToHostNames map[string][]string,
opts *vm.CreateOpts,
useLocalSSD bool,
) error {
var g errgroup.Group

Expand Down Expand Up @@ -1538,7 +1617,7 @@ func propagateDiskLabels(
return nil
})

if !opts.SSDOpts.UseLocalSSD {
if !useLocalSSD {
g.Go(func() error {
persistentDiskArgs := append([]string(nil), argsPrefix...)
persistentDiskArgs = append(persistentDiskArgs, zoneArg...)
Expand Down Expand Up @@ -1579,6 +1658,7 @@ func listInstanceTemplates(project string) ([]jsonInstanceTemplate, error) {
type jsonManagedInstanceGroup struct {
Name string `json:"name"`
Zone string `json:"zone"`
Size int `json:"size"`
}

// listManagedInstanceGroups returns a list of managed instance groups for a
Expand Down
Loading

0 comments on commit 93db971

Please sign in to comment.