From 0285423f905a5732a5b46f9ae4c9e91930a8edc8 Mon Sep 17 00:00:00 2001 From: Kevin Cao Date: Thu, 18 Apr 2024 15:35:04 -0400 Subject: [PATCH] workload: patch tpcc nondeterministic behavior when using a set seed The tpcc workload behaved non-deterministically even when a seed was set, because parts of the workload seeded their random generators from the current unix time instead of the provided seed. Additionally, queries were time-based, so the wall-clock time at which the workload ran affected query output. To fix this, add an option to provide a fake start time that is advanced artificially instead of reading the real clock. Epic: none Release note: none --- pkg/workload/tpcc/BUILD.bazel | 1 + pkg/workload/tpcc/delivery.go | 30 +++++++++++++------ pkg/workload/tpcc/new_order.go | 9 +++--- pkg/workload/tpcc/order_status.go | 7 ++--- pkg/workload/tpcc/payment.go | 9 +++--- pkg/workload/tpcc/stock_level.go | 7 ++--- pkg/workload/tpcc/timeutil.go | 50 +++++++++++++++++++++++++++++++ pkg/workload/tpcc/tpcc.go | 7 ++++- pkg/workload/tpcc/worker.go | 20 ++++++++++--- 9 files changed, 108 insertions(+), 32 deletions(-) create mode 100644 pkg/workload/tpcc/timeutil.go diff --git a/pkg/workload/tpcc/BUILD.bazel b/pkg/workload/tpcc/BUILD.bazel index 5d2e28a64771..9c2348ae988a 100644 --- a/pkg/workload/tpcc/BUILD.bazel +++ b/pkg/workload/tpcc/BUILD.bazel @@ -15,6 +15,7 @@ go_library( "random.go", "result.go", "stock_level.go", + "timeutil.go", "tpcc.go", "worker.go", ], diff --git a/pkg/workload/tpcc/delivery.go b/pkg/workload/tpcc/delivery.go index 6e1725c5184f..154eaebb6db9 100644 --- a/pkg/workload/tpcc/delivery.go +++ b/pkg/workload/tpcc/delivery.go @@ -14,9 +14,9 @@ import ( "context" gosql "database/sql" "fmt" + "sort" "strings" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" "github.com/jackc/pgx/v5" @@ -78,13 +78,13 @@ func createDelivery( return del, nil } -func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) { 
+func (del *delivery) run( + ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand, +) (interface{}, error) { del.config.auditor.deliveryTransactions.Add(1) - rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano()))) - oCarrierID := rng.Intn(10) + 1 - olDeliveryD := timeutil.Now() + olDeliveryD := tpccTime.Now() err := del.config.executeTx( ctx, del.mcp.Get(), @@ -193,16 +193,28 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) { func makeInTuples(pairs map[int]int) string { tupleStrs := make([]string, 0, len(pairs)) - for k, v := range pairs { - tupleStrs = append(tupleStrs, fmt.Sprintf("(%d, %d)", k, v)) + keys := make([]int, 0, len(pairs)) + for k := range pairs { + keys = append(keys, k) + } + // Sorted to make the output deterministic. + sort.Ints(keys) + for _, k := range keys { + tupleStrs = append(tupleStrs, fmt.Sprintf("(%d, %d)", k, pairs[k])) } return strings.Join(tupleStrs, ", ") } func makeWhereCases(cases map[int]float64) string { casesStrs := make([]string, 0, len(cases)) - for k, v := range cases { - casesStrs = append(casesStrs, fmt.Sprintf("WHEN %d THEN %f", k, v)) + keys := make([]int, 0, len(cases)) + for k := range cases { + keys = append(keys, k) + } + // Sorted to make the output deterministic. 
+ sort.Ints(keys) + for _, k := range keys { + casesStrs = append(casesStrs, fmt.Sprintf("WHEN %d THEN %f", k, cases[k])) } return strings.Join(casesStrs, " ") } diff --git a/pkg/workload/tpcc/new_order.go b/pkg/workload/tpcc/new_order.go index 158d5090afa5..2f4a0d38c083 100644 --- a/pkg/workload/tpcc/new_order.go +++ b/pkg/workload/tpcc/new_order.go @@ -17,7 +17,6 @@ import ( "strings" "time" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" "github.com/jackc/pgx/v5" @@ -128,11 +127,11 @@ func createNewOrder( return n, nil } -func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) { +func (n *newOrder) run( + ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand, +) (interface{}, error) { n.config.auditor.newOrderTransactions.Add(1) - rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano()))) - d := newOrderData{ wID: wID, dID: int(randInt(rng, 1, 10)), @@ -208,7 +207,7 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) { return d.items[i].olIID < d.items[j].olIID }) - d.oEntryD = timeutil.Now() + d.oEntryD = tpccTime.Now() err := n.config.executeTx( ctx, n.mcp.Get(), diff --git a/pkg/workload/tpcc/order_status.go b/pkg/workload/tpcc/order_status.go index a4fb8ef9f51f..50dc568b4a7d 100644 --- a/pkg/workload/tpcc/order_status.go +++ b/pkg/workload/tpcc/order_status.go @@ -15,7 +15,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" "github.com/jackc/pgtype" @@ -114,11 +113,11 @@ func createOrderStatus( return o, nil } -func (o *orderStatus) run(ctx context.Context, wID int) (interface{}, error) { +func (o *orderStatus) run( + ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand, +) (interface{}, error) { o.config.auditor.orderStatusTransactions.Add(1) - 
rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano()))) - d := orderStatusData{ dID: rng.Intn(10) + 1, } diff --git a/pkg/workload/tpcc/payment.go b/pkg/workload/tpcc/payment.go index e6e862bf2df7..3ac5b17bb4f2 100644 --- a/pkg/workload/tpcc/payment.go +++ b/pkg/workload/tpcc/payment.go @@ -16,7 +16,6 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/util/bufalloc" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" "github.com/jackc/pgx/v5" @@ -147,16 +146,16 @@ func createPayment(ctx context.Context, config *tpcc, mcp *workload.MultiConnPoo return p, nil } -func (p *payment) run(ctx context.Context, wID int) (interface{}, error) { +func (p *payment) run( + ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand, +) (interface{}, error) { p.config.auditor.paymentTransactions.Add(1) - rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano()))) - d := paymentData{ dID: rng.Intn(10) + 1, // hAmount is randomly selected within [1.00..5000.00] hAmount: float64(randInt(rng, 100, 500000)) / float64(100.0), - hDate: timeutil.Now(), + hDate: tpccTime.Now(), } // 2.5.1.2: 85% chance of paying through home warehouse, otherwise diff --git a/pkg/workload/tpcc/stock_level.go b/pkg/workload/tpcc/stock_level.go index 7bd11471588e..9be8ab5de50c 100644 --- a/pkg/workload/tpcc/stock_level.go +++ b/pkg/workload/tpcc/stock_level.go @@ -13,7 +13,6 @@ package tpcc import ( "context" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/workload" "github.com/cockroachdb/errors" "github.com/jackc/pgx/v5" @@ -86,9 +85,9 @@ func createStockLevel( return s, nil } -func (s *stockLevel) run(ctx context.Context, wID int) (interface{}, error) { - rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano()))) - +func (s *stockLevel) run( + ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand, +) (interface{}, error) { // 
2.8.1.2: The threshold of minimum quantity in stock is selected at random // within [10..20]. d := stockLevelData{ diff --git a/pkg/workload/tpcc/timeutil.go b/pkg/workload/tpcc/timeutil.go new file mode 100644 index 000000000000..e835ad35f7e8 --- /dev/null +++ b/pkg/workload/tpcc/timeutil.go @@ -0,0 +1,50 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tpcc + +import ( + "time" + + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "golang.org/x/exp/rand" +) + +type tpccTime struct { + syncutil.Mutex + fakeTime time.Time + stepMax time.Duration + rng *rand.Rand +} + +func (t *tpccTime) Now() time.Time { + if t.fakeTime.IsZero() { + return timeutil.Now() + } + + t.Lock() + defer t.Unlock() + t.fakeTime = t.fakeTime.Add( + time.Duration(t.rng.Float64()*t.stepMax.Seconds()) * time.Second, + ) + return t.fakeTime +} + +func newTpccTime(fakeStartTime time.Time, stepMax time.Duration, seed uint64) *tpccTime { + rng := rand.New(new(rand.LockedSource)) + rng.Seed(seed) + + return &tpccTime{ + fakeTime: fakeStartTime, + stepMax: stepMax, + rng: rng, + } +} diff --git a/pkg/workload/tpcc/tpcc.go b/pkg/workload/tpcc/tpcc.go index ceec7c3b1d02..2e3cac817748 100644 --- a/pkg/workload/tpcc/tpcc.go +++ b/pkg/workload/tpcc/tpcc.go @@ -95,6 +95,9 @@ type tpcc struct { queryTraceFile string + // If set, the generator will use randomly generated times instead of actual current time + fakeTime uint32 + randomCIDsCache struct { syncutil.Mutex values [][]int @@ -264,6 +267,7 @@ var tpccMeta = workload.Meta{ g.flags.BoolVar(&g.replicateStaticColumns, `replicate-static-columns`, false, 
"Create duplicate indexes for all static columns in district, items and warehouse tables, such that each zone or rack has them locally.") g.flags.BoolVar(&g.localWarehouses, `local-warehouses`, false, `Force transactions to use a local warehouse in all cases (in violation of the TPC-C specification)`) g.flags.StringVar(&g.queryTraceFile, `query-trace-file`, ``, `File to write the query traces to. Defaults to no output`) + g.flags.Uint32Var(&g.fakeTime, `fake-time`, 0, `If set, randomly generate times starting from this unix epoch for queries instead of using the current time`) RandomSeed.AddFlag(&g.flags) g.connFlags = workload.NewConnFlags(&g.flags) @@ -418,6 +422,7 @@ func (w *tpcc) Hooks() workload.Hooks { return errors.Wrap(err, "error creating multi-region partitioner") } } + return initializeMix(w) }, PreCreate: func(db *gosql.DB) error { @@ -586,7 +591,7 @@ func (w *tpcc) Tables() []workload.Table { New: func() interface{} { return &generateLocals{ rng: tpccRand{ - Rand: rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano()))), + Rand: rand.New(rand.NewSource(seed)), // Intentionally wait until here to initialize the precomputed rands // so a caller of Tables that only wants schema doesn't compute // them. diff --git a/pkg/workload/tpcc/worker.go b/pkg/workload/tpcc/worker.go index b60d5f87ff48..330f1c9a3858 100644 --- a/pkg/workload/tpcc/worker.go +++ b/pkg/workload/tpcc/worker.go @@ -36,7 +36,7 @@ const ( // tpccTX is an interface for running a TPCC transaction. type tpccTx interface { // run executes the TPCC transaction against the given warehouse ID. 
- run(ctx context.Context, wID int) (interface{}, error) + run(ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand) (interface{}, error) } type createTxFn func(ctx context.Context, config *tpcc, mcp *workload.MultiConnPool) (tpccTx, error) @@ -200,12 +200,24 @@ func newWorker( } func (w *worker) run(ctx context.Context) error { + rng := rand.New(new(rand.LockedSource)) + rng.Seed(RandomSeed.Seed()) + // Default tpccTime uses actual time + tpccTime := &tpccTime{} + if w.config.fakeTime != 0 { + tpccTime = newTpccTime( + timeutil.Unix(int64(w.config.fakeTime), 0), + 10*time.Second, + RandomSeed.Seed(), + ) + } + // 5.2.4.2: the required mix is achieved by selecting each new transaction // uniformly at random from a deck whose content guarantees the required // transaction mix. Each pass through a deck must be made in a different // uniformly random order. if w.permIdx == len(w.deckPerm) { - rand.Shuffle(len(w.deckPerm), func(i, j int) { + rng.Shuffle(len(w.deckPerm), func(i, j int) { w.deckPerm[i], w.deckPerm[j] = w.deckPerm[j], w.deckPerm[i] }) w.permIdx = 0 @@ -228,7 +240,7 @@ func (w *worker) run(ctx context.Context) error { // cancel them when the context expires. Instead, let them finish normally // but don't account for them in the histogram. start := timeutil.Now() - if _, err := tx.run(context.Background(), warehouseID); err != nil { + if _, err := tx.run(context.Background(), warehouseID, tpccTime, rng); err != nil { w.counters[txInfo.name].error.Inc() return errors.Wrapf(err, "error in %s", txInfo.name) } @@ -245,7 +257,7 @@ func (w *worker) run(ctx context.Context) error { // distribution. Think time = -log(r) * u, where r is a uniform random number // between 0 and 1 and u is the mean think time per operation. // Each distribution is truncated at 10 times its mean value. 
- thinkTime := -math.Log(rand.Float64()) * txInfo.thinkTime + thinkTime := -math.Log(rng.Float64()) * txInfo.thinkTime if thinkTime > (txInfo.thinkTime * 10) { thinkTime = txInfo.thinkTime * 10 }