diff --git a/pkg/cmd/roachtest/tests/BUILD.bazel b/pkg/cmd/roachtest/tests/BUILD.bazel
index 0181dc4a3c79..78ac72718d8a 100644
--- a/pkg/cmd/roachtest/tests/BUILD.bazel
+++ b/pkg/cmd/roachtest/tests/BUILD.bazel
@@ -272,6 +272,7 @@ go_library(
         "//pkg/util/timeutil",
         "//pkg/util/uuid",
         "//pkg/util/version",
+        "//pkg/workload",
         "//pkg/workload/histogram",
         "//pkg/workload/querybench",
         "//pkg/workload/tpcc",
diff --git a/pkg/cmd/roachtest/tests/online_restore.go b/pkg/cmd/roachtest/tests/online_restore.go
index a99d68319d39..3d073f5b8cae 100644
--- a/pkg/cmd/roachtest/tests/online_restore.go
+++ b/pkg/cmd/roachtest/tests/online_restore.go
@@ -14,10 +14,12 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"strings"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/jobs"
@@ -25,6 +27,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	crdbworkload "github.com/cockroachdb/cockroach/pkg/workload"
 	"github.com/cockroachdb/errors"
 	"github.com/montanaflynn/stats"
 	"github.com/stretchr/testify/require"
@@ -46,7 +49,7 @@ var queriesThroughputAgg = clusterstats.AggQuery{
 	Tag: "Queries over Time",
 }
 
-func registerOnlineRestore(r registry.Registry) {
+func registerOnlineRestorePerf(r registry.Registry) {
 	// This driver creates a variety of roachtests to benchmark online restore
 	// performance with the prefix
 	// restore/{online,offline}/). For each
@@ -126,113 +129,25 @@ func registerOnlineRestore(r registry.Registry) {
 				// Takes 10 minutes on OR tests for some reason.
 				SkipPostValidations: registry.PostValidationReplicaDivergence,
 				Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-
-					testStartTime := timeutil.Now()
-
 					rd := makeRestoreDriver(t, c, sp)
 					rd.prepareCluster(ctx)
-					statsCollector, err := createStatCollector(ctx, rd)
-					require.NoError(t, err)
-
-					m := c.NewMonitor(ctx, sp.hardware.getCRDBNodes())
-					var restoreStartTime time.Time
-					m.Go(func(ctx context.Context) error {
-						db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0])
-						if err != nil {
-							return err
-						}
-						defer db.Close()
-
-						if useWorkarounds {
-							// TODO(dt): what's the right value for this? How do we tune this
-							// on the fly automatically during the restore instead of by-hand?
-							// Context: We expect many operations to take longer than usual
-							// when some or all of the data they touch is remote. For now this
-							// is being blanket set to 1h manually, and a user's run-book
-							// would need to do this by hand before an online restore and
-							// reset it manually after, but ideally the queues would be aware
-							// of remote-ness when they pick their own timeouts and pick
-							// accordingly.
-							if _, err := db.Exec("SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget='1h'"); err != nil {
-								return err
-							}
-							// TODO(dt): AC appears periodically reduce the workload to 0 QPS
-							// during the download phase (sudden jumps from 0 to 2k qps to 0).
-							// Disable for now until we figure out how to smooth this out.
- if _, err := db.Exec("SET CLUSTER SETTING admission.disk_bandwidth_tokens.elastic.enabled=false"); err != nil { - return err - } - if _, err := db.Exec("SET CLUSTER SETTING admission.kv.enabled=false"); err != nil { - return err - } - if _, err := db.Exec("SET CLUSTER SETTING admission.sql_kv_response.enabled=false"); err != nil { - return err - } - } - opts := "" - if runOnline { - opts = "WITH EXPERIMENTAL DEFERRED COPY" - } - restoreStartTime = timeutil.Now() - restoreCmd := rd.restoreCmd("DATABASE tpce", opts) - t.L().Printf("Running %s", restoreCmd) - if _, err = db.ExecContext(ctx, restoreCmd); err != nil { - return err - } - return nil - }) - m.Wait() - - workloadCtx, workloadCancel := context.WithCancel(ctx) - mDownload := c.NewMonitor(workloadCtx, sp.hardware.getCRDBNodes()) - - workloadStartTime := timeutil.Now() - - mDownload.Go(func(ctx context.Context) error { - if !runWorkload { - fmt.Printf("roachtest configured to skip running the foreground workload") - return nil - } - err := sp.backup.workload.run(ctx, t, c, sp.hardware) - // We expect the workload to return a context cancelled error because - // the roachtest driver cancels the monitor's context after the download job completes - if err != nil && ctx.Err() == nil { - // Implies the workload context was not cancelled and the workload cmd returned a - // different error. - return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`) - } - rd.t.L().Printf("workload successfully finished") - return nil - }) - var downloadEndTimeLowerBound time.Time - mDownload.Go(func(ctx context.Context) error { - defer workloadCancel() - if runOnline { - downloadEndTimeLowerBound, err = waitForDownloadJob(ctx, c, t.L()) - if err != nil { - return err - } - } - if runWorkload { - // Run the workload for at most 10 minutes. 
-							testRuntime := timeutil.Since(testStartTime)
-							workloadDuration := sp.timeout - (testRuntime + time.Minute)
-							maxWorkloadDuration := time.Minute * 10
-							if workloadDuration > maxWorkloadDuration {
-								workloadDuration = maxWorkloadDuration
-							}
-							t.L().Printf("let workload run for another %.2f minutes", workloadDuration.Minutes())
-							time.Sleep(workloadDuration)
-						}
-						return nil
-					})
-					mDownload.Wait()
+					restoreStats := runRestore(ctx, t, c, sp, rd, runOnline, runWorkload, useWorkarounds)
 					if runOnline {
-						require.NoError(t, postRestoreValidation(ctx, c, t.L(), rd.sp.backup.workload.DatabaseName(), downloadEndTimeLowerBound))
+						require.NoError(t, postRestoreValidation(
+							ctx,
+							c,
+							t.L(),
+							sp.backup.workload.DatabaseName(),
+							restoreStats.downloadEndTimeLowerBound,
+						))
 					}
 					if runWorkload {
-						require.NoError(t, exportStats(ctx, rd, statsCollector, workloadStartTime, restoreStartTime))
+						require.NoError(t, exportStats(
+							ctx,
+							rd,
+							restoreStats,
+						))
 					}
 				},
 			})
@@ -242,6 +157,90 @@ func registerOnlineRestore(r registry.Registry) {
 	}
 }
 
+func registerOnlineRestoreCorrectness(r registry.Registry) {
+	sp := restoreSpecs{
+		hardware: makeHardwareSpecs(hardwareSpecs{workloadNode: true}),
+		backup: makeRestoringBackupSpecs(backupSpecs{
+			nonRevisionHistory: true,
+			version: fixtureFromMasterVersion,
+			workload: tpccRestore{opts: tpccRestoreOptions{warehouses: 10, workers: 1, waitFraction: 0, maxOps: 1000}}}),
+		timeout: 15 * time.Minute,
+		suites: registry.Suites(registry.Nightly),
+		restoreUptoIncremental: 1,
+		namePrefix: "online/correctness",
+	}
+	sp.initTestName()
+	r.Add(
+		registry.TestSpec{
+			Name: sp.testName,
+			Owner: registry.OwnerDisasterRecovery,
+			Cluster: sp.hardware.makeClusterSpecs(r, sp.backup.cloud),
+			Timeout: sp.timeout,
+			CompatibleClouds: sp.backup.CompatibleClouds(),
+			Suites: sp.suites,
+			SkipPostValidations: registry.PostValidationReplicaDivergence,
+			Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
+				defaultSeed := crdbworkload.NewUint64RandomSeed().Seed()
+				var defaultFakeTime uint32 = 1713818229 // Set to a fixed value for reproducibility
+				regRestoreSpecs, regWorkload := initCorrectnessRestoreSpecs(
+					t, sp, defaultSeed, defaultFakeTime, "-reg.trace",
+				)
+				onlineRestoreSpecs, onlineWorkload := initCorrectnessRestoreSpecs(
+					t, sp, defaultSeed, defaultFakeTime, "-online.trace",
+				)
+
+				rd := makeRestoreDriver(t, c, sp)
+				rd.prepareCluster(ctx)
+
+				runRestore(
+					ctx, t, c, regRestoreSpecs, rd,
+					false /* runOnline */, true /* runWorkload */, false, /* useWorkarounds */
+				)
+				details, err := c.RunWithDetails(
+					ctx,
+					t.L(),
+					option.WithNodes([]int{regRestoreSpecs.hardware.getWorkloadNode()}),
+					fmt.Sprintf("cat %s", regWorkload.opts.queryTraceFile),
+				)
+				require.NoError(t, err, "failed to retrieve query trace for regular restore")
+				regQueryTrace := details[0].Stdout
+
+				c.Wipe(ctx)
+				rd.prepareCluster(ctx)
+
+				runRestore(
+					ctx, t, c, onlineRestoreSpecs, rd,
+					true /* runOnline */, true /* runWorkload */, false, /* useWorkarounds */
+				)
+				details, err = c.RunWithDetails(
+					ctx,
+					t.L(),
+					option.WithNodes([]int{onlineRestoreSpecs.hardware.getWorkloadNode()}),
+					fmt.Sprintf("cat %s", onlineWorkload.opts.queryTraceFile),
+				)
+				require.NoError(t, err, "failed to retrieve query trace for online restore")
+				onlineQueryTrace := details[0].Stdout
+
+				// Compare the query traces. The two runs may record a different
+				// number of queries, so the shorter trace only needs to be a
+				// prefix of the longer one.
+				var longerOutput, shorterOutput string
+				if len(regQueryTrace) > len(onlineQueryTrace) {
+					longerOutput = regQueryTrace
+					shorterOutput = onlineQueryTrace
+				} else {
+					longerOutput = onlineQueryTrace
+					shorterOutput = regQueryTrace
+				}
+
+				require.True(
+					t,
+					strings.HasPrefix(longerOutput, shorterOutput),
+					"query trace of online restore does not match regular restore",
+				)
+			},
+		},
+	)
+}
+
 func postRestoreValidation(
 	ctx context.Context,
 	c cluster.Cluster,
@@ -306,16 +305,12 @@ func createStatCollector(
 // execution time and the timestamp we reach and maintain an "acceptable" p99
 // latency. We define an acceptable latency as within 1.25x of the latency
 // observed 1 minute after the download job completed.
-func exportStats(
-	ctx context.Context,
-	rd restoreDriver,
-	statsCollector clusterstats.StatCollector,
-	workloadStartTime, restoreStartTime time.Time,
-) error {
+func exportStats(ctx context.Context, rd restoreDriver, restoreStats restoreStats) error {
 	endTime := timeutil.Now()
 	latencyQueryKey := sqlServiceLatency.Query
+	statsCollector := restoreStats.collector
 	exportingStats, err := statsCollector.Exporter().Export(ctx, rd.c, rd.t, true, /* dryRun */
-		workloadStartTime,
+		restoreStats.workloadStartTime,
 		endTime,
 		[]clusterstats.AggQuery{sqlServiceLatencyP95Agg, queriesThroughputAgg},
 		func(stats map[string]clusterstats.StatSummary) (string, float64) {
@@ -341,8 +336,8 @@ func exportStats(
 	if timeToHealth.IsZero() {
 		timeToHealth = endTime
 	}
-	rto := timeToHealth.Sub(restoreStartTime).Minutes()
-	fullRestoreTime := endTime.Sub(restoreStartTime).Minutes()
+	rto := timeToHealth.Sub(restoreStats.restoreStartTime).Minutes()
+	fullRestoreTime := endTime.Sub(restoreStats.restoreStartTime).Minutes()
 	description := "Time to within 1.25x of regular p95 latency (mins)"
 	rd.t.L().Printf("%s: %.2f minutes, compared to link + download phase time %.2f", description, rto, fullRestoreTime)
 	rd.t.L().Printf("Latency at Recovery Time %.0f ms; at end of test %.0f ms", latestHealthyValue, healthyLatency)
@@ -415,3 +410,153 @@ func waitForDownloadJob(
 		}
 	}
 }
+
+// initCorrectnessRestoreSpecs initializes the restoreSpecs for correctness testing based on the
+// base restore spec by setting the workload seed, fake time, and trace file name.
+func initCorrectnessRestoreSpecs(
+	t test.Test, baseSp restoreSpecs, seed uint64, fakeTime uint32, traceSuffix string,
+) (restoreSpecs, tpccRestore) {
+	t.Helper()
+	tpccWorkload, ok := baseSp.backup.workload.(tpccRestore)
+	if !ok {
+		require.Fail(t, "only tpcc workloads are supported for correctness testing")
+	}
+	baseSp.timeout = time.Duration(baseSp.timeout.Seconds()*0.45) * time.Second
+	tpccWorkload.opts.queryTraceFile = strings.ReplaceAll(baseSp.testName, "/", "-") + traceSuffix
+	if tpccWorkload.opts.seed == 0 {
+		tpccWorkload.opts.seed = seed
+	}
+	if tpccWorkload.opts.fakeTime == 0 {
+		tpccWorkload.opts.fakeTime = fakeTime
+	}
+	baseSp.backup.workload = tpccWorkload
+	return baseSp, tpccWorkload
+}
+
+type restoreStats struct {
+	collector clusterstats.StatCollector
+	restoreStartTime time.Time
+	restoreEndTime time.Time
+	downloadEndTimeLowerBound time.Time
+	workloadStartTime time.Time
+	workloadEndTime time.Time
+}
+
+func runRestore(
+	ctx context.Context,
+	t test.Test,
+	c cluster.Cluster,
+	sp restoreSpecs,
+	rd restoreDriver,
+	runOnline, runWorkload, useWorkarounds bool,
+) restoreStats {
+	testStartTime := timeutil.Now()
+
+	statsCollector, err := createStatCollector(ctx, rd)
+	require.NoError(t, err)
+
+	m := c.NewMonitor(ctx, sp.hardware.getCRDBNodes())
+	var restoreStartTime, restoreEndTime time.Time
+	m.Go(func(ctx context.Context) error {
+		db, err := rd.c.ConnE(ctx, rd.t.L(), rd.c.Node(1)[0])
+		if err != nil {
+			return err
+		}
+		defer db.Close()
+		if useWorkarounds {
+			// TODO(dt): what's the right value for this? How do we tune this
+			// on the fly automatically during the restore instead of by-hand?
+			// Context: We expect many operations to take longer than usual
+			// when some or all of the data they touch is remote. For now this
+			// is being blanket set to 1h manually, and a user's run-book
+			// would need to do this by hand before an online restore and
+			// reset it manually after, but ideally the queues would be aware
+			// of remote-ness when they pick their own timeouts and pick
+			// accordingly.
+			if _, err := db.Exec("SET CLUSTER SETTING kv.queue.process.guaranteed_time_budget='1h'"); err != nil {
+				return err
+			}
+			// TODO(dt): AC appears to periodically reduce the workload to 0 QPS
+			// during the download phase (sudden jumps from 0 to 2k qps to 0).
+			// Disable for now until we figure out how to smooth this out.
+			if _, err := db.Exec("SET CLUSTER SETTING admission.disk_bandwidth_tokens.elastic.enabled=false"); err != nil {
+				return err
+			}
+			if _, err := db.Exec("SET CLUSTER SETTING admission.kv.enabled=false"); err != nil {
+				return err
+			}
+			if _, err := db.Exec("SET CLUSTER SETTING admission.sql_kv_response.enabled=false"); err != nil {
+				return err
+			}
+		}
+		opts := ""
+		if runOnline {
+			opts = "WITH EXPERIMENTAL DEFERRED COPY"
+		}
+		restoreStartTime = timeutil.Now()
+		restoreCmd := rd.restoreCmd(fmt.Sprintf("DATABASE %s", sp.backup.workload.DatabaseName()), opts)
+		t.L().Printf("Running %s", restoreCmd)
+		if _, err = db.ExecContext(ctx, restoreCmd); err != nil {
+			return err
+		}
+		return nil
+	})
+	m.Wait()
+	restoreEndTime = timeutil.Now()
+
+	workloadCtx, workloadCancel := context.WithCancel(ctx)
+	mDownload := c.NewMonitor(workloadCtx, sp.hardware.getCRDBNodes())
+
+	var workloadStartTime, workloadEndTime time.Time
+	mDownload.Go(func(ctx context.Context) error {
+		if !runWorkload {
+			fmt.Printf("roachtest configured to skip running the foreground workload")
+			return nil
+		}
+		workloadStartTime = timeutil.Now()
+		err := sp.backup.workload.run(ctx, t, c, sp.hardware)
+		// We expect the workload to return a context cancelled error because
+		// the roachtest driver cancels the monitor's context after the download job completes.
+		if err != nil && ctx.Err() == nil {
+			// Implies the workload context was not cancelled and the workload cmd returned a
+			// different error.
+			return errors.Wrapf(err, `Workload context was not cancelled. Error returned by workload cmd`)
+		}
+		rd.t.L().Printf("workload successfully finished")
+		return nil
+	})
+	var downloadEndTimeLowerBound time.Time
+	mDownload.Go(func(ctx context.Context) error {
+		defer workloadCancel()
+		if runOnline {
+			downloadEndTimeLowerBound, err = waitForDownloadJob(ctx, c, t.L())
+			if err != nil {
+				return err
+			}
+		}
+		if runWorkload {
+			// Run the workload for at most 10 minutes.
+			testRuntime := timeutil.Since(testStartTime)
+			workloadDuration := sp.timeout - (testRuntime + time.Minute)
+			maxWorkloadDuration := time.Minute * 10
+			if workloadDuration > maxWorkloadDuration {
+				workloadDuration = maxWorkloadDuration
+			}
+			t.L().Printf("let workload run for another %.2f minutes", workloadDuration.Minutes())
+			time.Sleep(workloadDuration)
+		}
+		return nil
+	})
+	mDownload.Wait()
+	if runWorkload {
+		workloadEndTime = timeutil.Now()
+	}
+	return restoreStats{
+		collector: statsCollector,
+		restoreStartTime: restoreStartTime,
+		restoreEndTime: restoreEndTime,
+		workloadStartTime: workloadStartTime,
+		workloadEndTime: workloadEndTime,
+		downloadEndTimeLowerBound: downloadEndTimeLowerBound,
+	}
+}
diff --git a/pkg/cmd/roachtest/tests/registry.go b/pkg/cmd/roachtest/tests/registry.go
index f785131507b3..b6b8078f1bbd 100644
--- a/pkg/cmd/roachtest/tests/registry.go
+++ b/pkg/cmd/roachtest/tests/registry.go
@@ -119,7 +119,8 @@ func RegisterTests(r registry.Registry) {
 	registerRestart(r)
 	registerRestore(r)
 	registerRestoreNodeShutdown(r)
-	registerOnlineRestore(r)
+	registerOnlineRestorePerf(r)
+	registerOnlineRestoreCorrectness(r)
 	registerRoachmart(r)
 	registerRoachtest(r)
 	registerRubyPG(r)
diff --git a/pkg/cmd/roachtest/tests/restore.go b/pkg/cmd/roachtest/tests/restore.go
index a5ed4db9b864..5fa00434f1eb 100644
--- a/pkg/cmd/roachtest/tests/restore.go
+++ b/pkg/cmd/roachtest/tests/restore.go
@@ -816,8 +816,11 @@ func (tpce tpceRestore) DatabaseName() string {
 type tpccRestoreOptions struct {
 	warehouses int
 	workers int
+	maxOps int
 	waitFraction float64
 	queryTraceFile string
+	seed uint64
+	fakeTime uint32
 }
 
 type tpccRestore struct {
@@ -830,8 +833,8 @@ func (tpcc tpccRestore) init(
 	crdbNodes := sp.getCRDBNodes()
 	cmd := roachtestutil.NewCommand(`./cockroach workload init tpcc`).
 		MaybeFlag(tpcc.opts.warehouses > 0, "warehouses", tpcc.opts.warehouses).
-		MaybeFlag(tpcc.opts.workers > 0, "workers", tpcc.opts.workers).
-		MaybeFlag(tpcc.opts.waitFraction != 1, "wait", tpcc.opts.waitFraction).
+		MaybeFlag(tpcc.opts.seed != 0, "seed", tpcc.opts.seed).
+		MaybeFlag(tpcc.opts.fakeTime != 0, "fake-time", tpcc.opts.fakeTime).
 		Arg(fmt.Sprintf("{pgurl:%d-%d}", crdbNodes[0], crdbNodes[len(crdbNodes)-1]))
 	c.Run(ctx, option.WithNodes([]int{sp.getWorkloadNode()}), cmd.String())
 }
@@ -841,9 +844,11 @@ func (tpcc tpccRestore) run(
 ) error {
 	crdbNodes := sp.getCRDBNodes()
 	cmd := roachtestutil.NewCommand(`./cockroach workload run tpcc`).
-		MaybeFlag(tpcc.opts.warehouses > 0, "warehouses", tpcc.opts.warehouses).
 		MaybeFlag(tpcc.opts.workers > 0, "workers", tpcc.opts.workers).
 		MaybeFlag(tpcc.opts.waitFraction != 1, "wait", tpcc.opts.waitFraction).
+		MaybeFlag(tpcc.opts.maxOps != 0, "max-ops", tpcc.opts.maxOps).
+		MaybeFlag(tpcc.opts.seed != 0, "seed", tpcc.opts.seed).
+		MaybeFlag(tpcc.opts.fakeTime != 0, "fake-time", tpcc.opts.fakeTime).
 		MaybeFlag(tpcc.opts.queryTraceFile != "", "query-trace-file", tpcc.opts.queryTraceFile).
 		Arg(fmt.Sprintf("{pgurl:%d-%d}", crdbNodes[0], crdbNodes[len(crdbNodes)-1]))
 	return c.RunE(ctx, option.WithNodes([]int{sp.getWorkloadNode()}), cmd.String())
@@ -854,7 +859,21 @@ func (tpcc tpccRestore) fixtureDir() string {
 }
 
 func (tpcc tpccRestore) String() string {
-	return fmt.Sprintf("tpc-c/%d", tpcc.opts.warehouses)
+	var builder strings.Builder
+	builder.WriteString("tpcc/")
+	switch tpcc.opts.warehouses {
+	case 10:
+		builder.WriteString("150MB")
+	case 500:
+		builder.WriteString("8GB")
+	case 7000:
+		builder.WriteString("115GB")
+	case 25000:
+		builder.WriteString("400GB")
+	default:
+		panic("tpcc warehouse count not recognized")
+	}
+	return builder.String()
 }
 
 func (tpcc tpccRestore) DatabaseName() string {