Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
127429: roachtest/large-schema-benchmark: add support for invoking console REST APIs r=fqazi a=fqazi

This patch adds support for invoking the console REST APIs as part of the tpcc/large-schema-benchmark variant. To add this functionality, this PR does the following:

1. Extend multidbtpcc to support authenticating and invoking arbitrary end points from a file, and add logic to gather histogram information for these invocations.
2. Update the large-schema-benchmark roachtest to use the new options to invoke the REST API.


Co-authored-by: Faizan Qazi <[email protected]>
  • Loading branch information
craig[bot] and fqazi committed Jul 23, 2024
2 parents fa483c7 + b1f0a60 commit ed53a6d
Show file tree
Hide file tree
Showing 3 changed files with 259 additions and 4 deletions.
30 changes: 28 additions & 2 deletions pkg/cmd/roachtest/tests/large_schema_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ func registerLargeSchemaBenchmark(r registry.Registry, numTables int, isMultiReg
numSchemasForDatabase = min(numSchemasForDatabase, MaxSchemasForDatabase)
databaseIdx += 1
}

// Create all the databases based on our lists of active vs inactive
// ones.
const inactiveDbListType = 1
Expand Down Expand Up @@ -164,6 +163,12 @@ func registerLargeSchemaBenchmark(r registry.Registry, numTables int, isMultiReg
// completes in a reasonable amount of time.
_, err := conn.Exec("SET CLUSTER SETTING jobs.retention_time='1h'")
require.NoError(t, err)
// Create a user that will be used for authentication for the REST
// API calls.
_, err = conn.Exec("CREATE USER roachadmin password 'roacher'")
require.NoError(t, err)
_, err = conn.Exec("GRANT ADMIN to roachadmin")
require.NoError(t, err)
}
}
err := c.PutString(ctx, strings.Join(dbList, "\n"), populateFileName, 0755, c.Node(c.Spec().NodeCount))
Expand All @@ -173,6 +178,18 @@ func registerLargeSchemaBenchmark(r registry.Registry, numTables int, isMultiReg
// Upload a file containing the ORM queries.
require.NoError(t, c.PutString(ctx, LargeSchemaOrmQueries, "ormQueries.sql", 0755, workloadNode))
mon := c.NewMonitor(ctx, c.All())
// Upload a file containing the web API calls we want to benchmark.
require.NoError(t, c.PutString(ctx,
LargeSchemaAPICalls,
"apiCalls",
0755,
workloadNode))
// Get a list of web console URLs.
webConsoleURLs, err := c.ExternalAdminUIAddr(ctx, t.L(), c.Range(1, c.Spec().NodeCount-1))
require.NoError(t, err)
for urlIdx := range webConsoleURLs {
webConsoleURLs[urlIdx] = "http://" + webConsoleURLs[urlIdx]
}
// Next startup the workload for our list of databases from earlier.
for dbListType, dbList := range [][]string{activeDBList, inactiveDBList} {
dbList := dbList
Expand Down Expand Up @@ -207,9 +224,13 @@ func registerLargeSchemaBenchmark(r registry.Registry, numTables int, isMultiReg
DisableHistogram: true, // We setup the flag above.
WorkloadInstances: wlInstance,
Duration: time.Minute * 60,
ExtraRunArgs: fmt.Sprintf("--db-list-file=%s --txn-preamble-file=%s --conns=%d --workers=%d %s %s",
ExtraRunArgs: fmt.Sprintf("--db-list-file=%s --txn-preamble-file=%s --admin-urls=%q "+
"--console-api-file=apiCalls --console-api-username=%q --console-api-password=%q --conns=%d --workers=%d %s %s",
populateFileName,
"ormQueries.sql",
strings.Join(webConsoleURLs, ","),
"roachadmin",
"roacher",
numWorkers,
numWorkers,
waitEnabled,
Expand Down Expand Up @@ -240,3 +261,8 @@ const LargeSchemaOrmQueries = `
ON sp.nspoid = typnamespace
ORDER BY sp.r, pg_type.oid DESC;
`

// LargeSchemaAPICalls lists the console cluster API end points to benchmark,
// one per line. The contents are uploaded to the workload node as the
// "apiCalls" file and handed to the workload via --console-api-file. The
// $targetDb placeholder is replaced by the workload with the database chosen
// for the current transaction.
const LargeSchemaAPICalls = `
api/v2/databases/$targetDb/tables/
`
1 change: 1 addition & 0 deletions pkg/workload/tpcc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ go_library(
"//pkg/util/bufalloc",
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/retry",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
Expand Down
232 changes: 230 additions & 2 deletions pkg/workload/tpcc/tpcc_multi_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,26 @@ package tpcc

import (
"context"
"crypto/tls"
gosql "database/sql"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/errors"
"github.com/jackc/pgx/v5"
)

Expand All @@ -33,6 +43,23 @@ type tpccMultiDB struct {
dbListFile string
dbList []*tree.ObjectNamePrefix

adminUrlStr string
adminUrls []string

consoleAPICommandFile string
consoleAPICommands []string
consoleAPITime *histogram.NamedHistogram
consoleAPIRetries *histogram.NamedHistogram
consoleAPIUsername string
consoleAPIPassword string

mu struct {
// cachedRestAPISessions holds session IDs that have already been
// authenticated, so that a login is not needed for each call.
cachedRestAPISessions []string
syncutil.Mutex
}

// nextDatabase selects the next database in a round robin manner.
nextDatabase atomic.Uint64

Expand All @@ -52,30 +79,202 @@ var tpccMultiDBMeta = workload.Meta{
g.tpcc = tpccMeta.New().(*tpcc)
g.tpcc.workloadName = "tpccmultidb"
g.flags.Meta["txn-preamble-file"] = workload.FlagMeta{RuntimeOnly: true}
g.flags.Meta["admin-urls"] = workload.FlagMeta{RuntimeOnly: true}
g.flags.Meta["console-api-file"] = workload.FlagMeta{RuntimeOnly: true}
g.flags.Meta["console-api-username"] = workload.FlagMeta{RuntimeOnly: true}
g.flags.Meta["console-api-password"] = workload.FlagMeta{RuntimeOnly: true}
// Support accessing multiple databases via the client driver.
g.flags.StringVar(&g.dbListFile, "db-list-file", "", "a file containing a list of databases.")
g.flags.StringVar(&g.adminUrlStr, "admin-urls", "", "a list of admin URLs, seperated by commas")
g.flags.StringVar(&g.consoleAPICommandFile,
"console-api-file",
"",
"a list of commands to run at the start of each txn")
g.flags.StringVar(&g.consoleAPIUsername,
"console-api-username",
"",
"username used to authenticate the console API")
g.flags.StringVar(&g.consoleAPIPassword,
"console-api-password",
"",
"password used to authenticate the console API")
// Because this workload can create a large number of objects, the import
// concurrency may need to be limited.
g.flags.Int(workload.ImportDataLoaderConcurrencyFlag, 32, workload.ImportDataLoaderConcurrencyFlagDescription)
return &g
},
}

// getRestAPISession returns a session ID for the console REST API together
// with a function that hands the session back to the cache once the caller is
// done with it. A cached session is reused when one is available; otherwise
// the login end point under adminUrl is invoked with the configured console
// API username and password to create a new session.
//
// On error the returned session ID is empty and the release function is nil.
func (t *tpccMultiDB) getRestAPISession(
	ctx context.Context, client *http.Client, adminUrl string,
) (string, func(), error) {
	// popCached removes and returns the oldest cached session ID, or "" if
	// the cache is empty.
	popCached := func() string {
		t.mu.Lock()
		defer t.mu.Unlock()
		if len(t.mu.cachedRestAPISessions) == 0 {
			return ""
		}
		token := t.mu.cachedRestAPISessions[0]
		t.mu.cachedRestAPISessions = t.mu.cachedRestAPISessions[1:]
		return token
	}
	// releaseFor builds the function that returns token to the cache so that
	// later calls can skip authentication.
	releaseFor := func(token string) func() {
		return func() {
			t.mu.Lock()
			defer t.mu.Unlock()
			t.mu.cachedRestAPISessions = append(t.mu.cachedRestAPISessions, token)
		}
	}

	// Attempt to get a cached token first.
	if token := popCached(); token != "" {
		return token, releaseFor(token), nil
	}

	// We did not find any cached session ID, so invoke the login end point.
	loginUrl := fmt.Sprintf("%s/api/v2/login/", adminUrl)
	values := url.Values{
		"username": {t.consoleAPIUsername},
		"password": {t.consoleAPIPassword},
	}
	req, err := http.NewRequestWithContext(
		ctx, http.MethodPost, loginUrl, strings.NewReader(values.Encode()),
	)
	if err != nil {
		return "", nil, err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	loginResp, err := client.Do(req)
	if err != nil {
		return "", nil, err
	}
	defer loginResp.Body.Close()
	if loginResp.StatusCode != http.StatusOK {
		return "", nil, errors.AssertionFailedf("unexpected status from end point during auth %s (%d)",
			loginResp.Status,
			loginResp.StatusCode)
	}
	var sessionInfo struct {
		Session string
	}
	if err := json.NewDecoder(loginResp.Body).Decode(&sessionInfo); err != nil {
		return "", nil, err
	}
	// A successful response without a session ID cannot be used to
	// authenticate later calls, so surface it here instead of handing an empty
	// token to the caller.
	if sessionInfo.Session == "" {
		return "", nil, errors.Newf("login end point returned an empty session")
	}
	return sessionInfo.Session, releaseFor(sessionInfo.Session), nil
}

// runWebAPICommands invokes each configured console API command against
// adminUrl, substituting $targetDb in the command with targetDb. It first
// obtains an authenticated session (cached or freshly created), recording the
// number of attempts that took in the consoleAPIRetries histogram, and then
// issues a GET for every command. The summed duration of the successful
// attempts is recorded in the consoleAPITime histogram.
//
// Both the login and each command are attempted up to maxAPIRetries times;
// the first command that still fails after that aborts the call and its error
// is returned.
func (t *tpccMultiDB) runWebAPICommands(
	ctx context.Context, targetDb string, adminUrl string,
) error {
	// NOTE(review): a new transport and client are built on every call, so
	// connections are not shared between transactions; consider keeping one
	// client on the struct.
	transport := &http.Transport{
		TLSClientConfig: &tls.Config{
			// Roachprod clusters may have invalid certificates.
			InsecureSkipVerify: true,
		},
	}
	client := http.Client{Transport: transport}
	defer client.CloseIdleConnections()
	const maxAPIRetries = 10
	var sessionID string
	var releaseFunc func()

	// For resilience, tolerate any internal errors from the server.
	numAttempts := int64(0)
	if err := retry.WithMaxAttempts(ctx, retry.Options{}, maxAPIRetries, func() error {
		numAttempts++
		var err error
		sessionID, releaseFunc, err = t.getRestAPISession(ctx, &client, adminUrl)
		return err
	}); err != nil {
		return err
	}
	// The successful attempt is counted as well, so this is always at least
	// one here.
	t.consoleAPIRetries.RecordValue(numAttempts)

	// A session the server refused must not go back into the cache, otherwise
	// every later caller that picks it up fails in the same way.
	sessionRejected := false
	defer func() {
		if !sessionRejected {
			releaseFunc()
		}
	}()

	invokeApi := func(apiCommand string) error {
		targetUrl := fmt.Sprintf("%s/%s", adminUrl, apiCommand)
		getTablesRequest, err := http.NewRequestWithContext(ctx, http.MethodGet, targetUrl, nil)
		if err != nil {
			return err
		}
		getTablesRequest.Header.Add("X-Cockroach-API-Session", sessionID)
		resp, err := client.Do(getTablesRequest)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
		// Read the body to EOF without buffering it, for error responses too,
		// so the connection can be reused by the retry or the next command.
		_, drainErr := io.Copy(io.Discard, resp.Body)
		if resp.StatusCode != http.StatusOK {
			if resp.StatusCode == http.StatusUnauthorized {
				sessionRejected = true
			}
			return errors.AssertionFailedf("unexpected status from end point (%q) %s (%d)",
				apiCommand,
				resp.Status,
				resp.StatusCode)
		}
		return drainErr
	}

	var totalAPITime time.Duration
	for _, apiCommand := range t.consoleAPICommands {
		// Blank lines in the command file (for example a leading newline) are
		// not end points; without this check they would turn into a request
		// for the console root.
		if strings.TrimSpace(apiCommand) == "" {
			continue
		}
		apiCommandResolved := os.Expand(apiCommand, func(s string) string {
			switch s {
			case "targetDb":
				return targetDb
			default:
				return s
			}
		})
		// Attempt the end point multiple times in case we hit internal errors
		// due to the load on the server.
		if err := retry.WithMaxAttempts(ctx, retry.Options{}, maxAPIRetries, /*max attempts */
			func() error {
				startTime := timeutil.Now()
				if err := invokeApi(apiCommandResolved); err != nil {
					return err
				}
				// Track once the attempt has been successful.
				totalAPITime += timeutil.Since(startTime)
				return nil
			}); err != nil {
			return err
		}
	}
	// Record the time that the invocation took.
	t.consoleAPITime.Record(totalAPITime)
	return nil
}

// runBeforeEachTxn is executed at the start of each transaction inside normal
// tpcc. It advances the round-robin counter once and uses that value both to
// pick the database (and search path) for the transaction and to pick the
// admin URL the console API commands are sent to. When no database list is
// configured, "tpccmultidb" is used as the target database for the API calls
// and the transaction's database is left unchanged.
func (t *tpccMultiDB) runBeforeEachTxn(ctx context.Context, tx pgx.Tx) error {
	// If multiple DBs are specified via list, select one
	// in a roundrobin manner. The counter is advanced exactly once per
	// transaction so the database and the admin URL are derived from the same
	// value.
	nextIdx := t.nextDatabase.Add(1)
	targetDb := "tpccmultidb"
	// Checking the length rather than nil-ness keeps the modulo below safe
	// for an empty list.
	if len(t.dbList) > 0 {
		databaseIdx := int(nextIdx % uint64(len(t.dbList)))
		targetDb = t.dbList[databaseIdx].Catalog()
		if _, err := tx.Exec(ctx, "USE $1", t.dbList[databaseIdx].Catalog()); err != nil {
			return err
		}
		if _, err := tx.Exec(ctx, fmt.Sprintf("SET search_path = %s", t.dbList[databaseIdx].Schema())); err != nil {
			return err
		}
	}

	if len(t.adminUrls) > 0 {
		adminUrl := t.adminUrls[int(nextIdx%uint64(len(t.adminUrls)))]
		if err := t.runWebAPICommands(ctx, targetDb, adminUrl); err != nil {
			return err
		}
	}
	return nil
}

Expand All @@ -86,6 +285,15 @@ func (t *tpccMultiDB) Ops(
if err := t.runInit(); err != nil {
return workload.QueryLoad{}, err
}
// Only track console API times if we are going to benchmark endpoints.
if t.consoleAPICommandFile != "" {
if t.consoleAPITime == nil {
t.consoleAPITime = reg.GetHandle().Get("consoleAPITime")
}
if t.consoleAPIRetries == nil {
t.consoleAPIRetries = reg.GetHandle().Get("consoleAPIRetries")
}
}
return t.tpcc.Ops(ctx, urls, reg)
}

Expand Down Expand Up @@ -136,6 +344,26 @@ func (t *tpccMultiDB) runInit() error {
t.dbList = append(t.dbList, prefix)
}
}
// Validate that both options must be specified together.
if (len(t.adminUrlStr) == 0) !=
(len(t.consoleAPICommandFile) == 0) {
err = errors.Newf("console-api-file must be specified with admin-rls must be speicifed together")
return
}
if t.adminUrlStr != "" {
t.adminUrls = strings.Split(t.adminUrlStr, ",")
}
if t.consoleAPICommandFile != "" {
file, err := os.ReadFile(t.consoleAPICommandFile)
if err != nil {
return
}
strConsoleAPIList := strings.Split(string(file), "\n")
if v := len(strConsoleAPIList); v > 0 && len(strConsoleAPIList[v-1]) == 0 {
strConsoleAPIList = strConsoleAPIList[:v-1]
}
t.consoleAPICommands = strConsoleAPIList
}
// Execute extra logic at the start of each txn.
t.onTxnStartFns = append(t.onTxnStartFns, t.runBeforeEachTxn)

Expand Down

0 comments on commit ed53a6d

Please sign in to comment.