Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
122648: workload: patch tpcc nondeterministic behavior with using set seed r=msbutler a=kev-cao

The tpcc workload would have non-deterministic behavior when using a set
seed as parts of the workload would seed the random generators using
unix time instead of the provided seed. Additionally, queries were time
based, so the time the workload would run would affect query output. To
fix this, added the option to provide a fake time that would
artificially increase.

Epic: none

Release note: none

Co-authored-by: Kevin Cao <[email protected]>
  • Loading branch information
craig[bot] and kev-cao committed Apr 22, 2024
2 parents 5eb8b8d + 0285423 commit 0885d0e
Show file tree
Hide file tree
Showing 9 changed files with 108 additions and 32 deletions.
1 change: 1 addition & 0 deletions pkg/workload/tpcc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ go_library(
"random.go",
"result.go",
"stock_level.go",
"timeutil.go",
"tpcc.go",
"worker.go",
],
Expand Down
30 changes: 21 additions & 9 deletions pkg/workload/tpcc/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import (
"context"
gosql "database/sql"
"fmt"
"sort"
"strings"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -78,13 +78,13 @@ func createDelivery(
return del, nil
}

func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {
func (del *delivery) run(
ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand,
) (interface{}, error) {
del.config.auditor.deliveryTransactions.Add(1)

rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))

oCarrierID := rng.Intn(10) + 1
olDeliveryD := timeutil.Now()
olDeliveryD := tpccTime.Now()

err := del.config.executeTx(
ctx, del.mcp.Get(),
Expand Down Expand Up @@ -193,16 +193,28 @@ func (del *delivery) run(ctx context.Context, wID int) (interface{}, error) {

func makeInTuples(pairs map[int]int) string {
tupleStrs := make([]string, 0, len(pairs))
for k, v := range pairs {
tupleStrs = append(tupleStrs, fmt.Sprintf("(%d, %d)", k, v))
keys := make([]int, 0, len(pairs))
for k := range pairs {
keys = append(keys, k)
}
// Sorted to make the output deterministic.
sort.Ints(keys)
for _, k := range keys {
tupleStrs = append(tupleStrs, fmt.Sprintf("(%d, %d)", k, pairs[k]))
}
return strings.Join(tupleStrs, ", ")
}

func makeWhereCases(cases map[int]float64) string {
casesStrs := make([]string, 0, len(cases))
for k, v := range cases {
casesStrs = append(casesStrs, fmt.Sprintf("WHEN %d THEN %f", k, v))
keys := make([]int, 0, len(cases))
for k := range cases {
keys = append(keys, k)
}
// Sorted to make the output deterministic.
sort.Ints(keys)
for _, k := range keys {
casesStrs = append(casesStrs, fmt.Sprintf("WHEN %d THEN %f", k, cases[k]))
}
return strings.Join(casesStrs, " ")
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/workload/tpcc/new_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -128,11 +127,11 @@ func createNewOrder(
return n, nil
}

func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
func (n *newOrder) run(
ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand,
) (interface{}, error) {
n.config.auditor.newOrderTransactions.Add(1)

rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))

d := newOrderData{
wID: wID,
dID: int(randInt(rng, 1, 10)),
Expand Down Expand Up @@ -208,7 +207,7 @@ func (n *newOrder) run(ctx context.Context, wID int) (interface{}, error) {
return d.items[i].olIID < d.items[j].olIID
})

d.oEntryD = timeutil.Now()
d.oEntryD = tpccTime.Now()

err := n.config.executeTx(
ctx, n.mcp.Get(),
Expand Down
7 changes: 3 additions & 4 deletions pkg/workload/tpcc/order_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
"github.com/jackc/pgtype"
Expand Down Expand Up @@ -114,11 +113,11 @@ func createOrderStatus(
return o, nil
}

func (o *orderStatus) run(ctx context.Context, wID int) (interface{}, error) {
func (o *orderStatus) run(
ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand,
) (interface{}, error) {
o.config.auditor.orderStatusTransactions.Add(1)

rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))

d := orderStatusData{
dID: rng.Intn(10) + 1,
}
Expand Down
9 changes: 4 additions & 5 deletions pkg/workload/tpcc/payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/bufalloc"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -147,16 +146,16 @@ func createPayment(ctx context.Context, config *tpcc, mcp *workload.MultiConnPoo
return p, nil
}

func (p *payment) run(ctx context.Context, wID int) (interface{}, error) {
func (p *payment) run(
ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand,
) (interface{}, error) {
p.config.auditor.paymentTransactions.Add(1)

rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))

d := paymentData{
dID: rng.Intn(10) + 1,
// hAmount is randomly selected within [1.00..5000.00]
hAmount: float64(randInt(rng, 100, 500000)) / float64(100.0),
hDate: timeutil.Now(),
hDate: tpccTime.Now(),
}

// 2.5.1.2: 85% chance of paying through home warehouse, otherwise
Expand Down
7 changes: 3 additions & 4 deletions pkg/workload/tpcc/stock_level.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ package tpcc
import (
"context"

"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v5"
Expand Down Expand Up @@ -86,9 +85,9 @@ func createStockLevel(
return s, nil
}

func (s *stockLevel) run(ctx context.Context, wID int) (interface{}, error) {
rng := rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano())))

func (s *stockLevel) run(
ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand,
) (interface{}, error) {
// 2.8.1.2: The threshold of minimum quantity in stock is selected at random
// within [10..20].
d := stockLevelData{
Expand Down
50 changes: 50 additions & 0 deletions pkg/workload/tpcc/timeutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package tpcc

import (
"time"

"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"golang.org/x/exp/rand"
)

type tpccTime struct {
syncutil.Mutex
fakeTime time.Time
stepMax time.Duration
rng *rand.Rand
}

func (t *tpccTime) Now() time.Time {
if t.fakeTime.IsZero() {
return timeutil.Now()
}

t.Lock()
defer t.Unlock()
t.fakeTime = t.fakeTime.Add(
time.Duration(t.rng.Float64()*t.stepMax.Seconds()) * time.Second,
)
return t.fakeTime
}

func newTpccTime(fakeStartTime time.Time, stepMax time.Duration, seed uint64) *tpccTime {
rng := rand.New(new(rand.LockedSource))
rng.Seed(seed)

return &tpccTime{
fakeTime: fakeStartTime,
stepMax: stepMax,
rng: rng,
}
}
7 changes: 6 additions & 1 deletion pkg/workload/tpcc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ type tpcc struct {

queryTraceFile string

// If set, the generator will use randomly generated times instead of actual current time
fakeTime uint32

randomCIDsCache struct {
syncutil.Mutex
values [][]int
Expand Down Expand Up @@ -264,6 +267,7 @@ var tpccMeta = workload.Meta{
g.flags.BoolVar(&g.replicateStaticColumns, `replicate-static-columns`, false, "Create duplicate indexes for all static columns in district, items and warehouse tables, such that each zone or rack has them locally.")
g.flags.BoolVar(&g.localWarehouses, `local-warehouses`, false, `Force transactions to use a local warehouse in all cases (in violation of the TPC-C specification)`)
g.flags.StringVar(&g.queryTraceFile, `query-trace-file`, ``, `File to write the query traces to. Defaults to no output`)
g.flags.Uint32Var(&g.fakeTime, `fake-time`, 0, `If set, randomly generate times starting from this unix epoch for queries instead of using the current time`)
RandomSeed.AddFlag(&g.flags)
g.connFlags = workload.NewConnFlags(&g.flags)

Expand Down Expand Up @@ -418,6 +422,7 @@ func (w *tpcc) Hooks() workload.Hooks {
return errors.Wrap(err, "error creating multi-region partitioner")
}
}

return initializeMix(w)
},
PreCreate: func(db *gosql.DB) error {
Expand Down Expand Up @@ -586,7 +591,7 @@ func (w *tpcc) Tables() []workload.Table {
New: func() interface{} {
return &generateLocals{
rng: tpccRand{
Rand: rand.New(rand.NewSource(uint64(timeutil.Now().UnixNano()))),
Rand: rand.New(rand.NewSource(seed)),
// Intentionally wait until here to initialize the precomputed rands
// so a caller of Tables that only wants schema doesn't compute
// them.
Expand Down
20 changes: 16 additions & 4 deletions pkg/workload/tpcc/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ const (
// tpccTX is an interface for running a TPCC transaction.
type tpccTx interface {
// run executes the TPCC transaction against the given warehouse ID.
run(ctx context.Context, wID int) (interface{}, error)
run(ctx context.Context, wID int, tpccTime *tpccTime, rng *rand.Rand) (interface{}, error)
}

type createTxFn func(ctx context.Context, config *tpcc, mcp *workload.MultiConnPool) (tpccTx, error)
Expand Down Expand Up @@ -200,12 +200,24 @@ func newWorker(
}

func (w *worker) run(ctx context.Context) error {
rng := rand.New(new(rand.LockedSource))
rng.Seed(RandomSeed.Seed())
// Default tpccTime uses actual time
tpccTime := &tpccTime{}
if w.config.fakeTime != 0 {
tpccTime = newTpccTime(
timeutil.Unix(int64(w.config.fakeTime), 0),
10*time.Second,
RandomSeed.Seed(),
)
}

// 5.2.4.2: the required mix is achieved by selecting each new transaction
// uniformly at random from a deck whose content guarantees the required
// transaction mix. Each pass through a deck must be made in a different
// uniformly random order.
if w.permIdx == len(w.deckPerm) {
rand.Shuffle(len(w.deckPerm), func(i, j int) {
rng.Shuffle(len(w.deckPerm), func(i, j int) {
w.deckPerm[i], w.deckPerm[j] = w.deckPerm[j], w.deckPerm[i]
})
w.permIdx = 0
Expand All @@ -228,7 +240,7 @@ func (w *worker) run(ctx context.Context) error {
// cancel them when the context expires. Instead, let them finish normally
// but don't account for them in the histogram.
start := timeutil.Now()
if _, err := tx.run(context.Background(), warehouseID); err != nil {
if _, err := tx.run(context.Background(), warehouseID, tpccTime, rng); err != nil {
w.counters[txInfo.name].error.Inc()
return errors.Wrapf(err, "error in %s", txInfo.name)
}
Expand All @@ -245,7 +257,7 @@ func (w *worker) run(ctx context.Context) error {
// distribution. Think time = -log(r) * u, where r is a uniform random number
// between 0 and 1 and u is the mean think time per operation.
// Each distribution is truncated at 10 times its mean value.
thinkTime := -math.Log(rand.Float64()) * txInfo.thinkTime
thinkTime := -math.Log(rng.Float64()) * txInfo.thinkTime
if thinkTime > (txInfo.thinkTime * 10) {
thinkTime = txInfo.thinkTime * 10
}
Expand Down

0 comments on commit 0885d0e

Please sign in to comment.