Skip to content

Commit

Permalink
Address some review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Rohit Nayak <[email protected]>
  • Loading branch information
rohit-nayak-ps committed Nov 26, 2023
1 parent 5f3aed2 commit 89ec701
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 32 deletions.
4 changes: 1 addition & 3 deletions go/test/endtoend/vreplication/fk_ext_load_generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,7 @@ func (lg *SimpleLoadGenerator) getVtgateConn(ctx context.Context) (*mysql.Conn,

func (lg *SimpleLoadGenerator) getNumRows(vtgateConn *mysql.Conn, table string) int {
t := lg.vc.t
numRows, err := getRowCount(t, vtgateConn, table)
require.NoError(t, err)
return numRows
return getRowCount(t, vtgateConn, table)
}

func (lg *SimpleLoadGenerator) WaitForAdditionalRows(count int) error {
Expand Down
33 changes: 19 additions & 14 deletions go/test/endtoend/vreplication/fk_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,25 +286,30 @@ func doReshard(t *testing.T, keyspace, workflowName, sourceShards, targetShards
rs.Complete()
}

func areRowCountsEqual(t *testing.T) bool {
vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
parentRowCount := getRowCount(t, vtgateConn, "target2.parent")
childRowCount := getRowCount(t, vtgateConn, "target2.child")
parentCopyRowCount := getRowCount(t, vtgateConn, "target1.parent_copy")
childCopyRowCount := getRowCount(t, vtgateConn, "target1.child_copy")
log.Infof("Post-materialize row counts are parent: %d, child: %d, parent_copy: %d, child_copy: %d",
parentRowCount, childRowCount, parentCopyRowCount, childCopyRowCount)
if parentRowCount != parentCopyRowCount || childRowCount != childCopyRowCount {
return false
}
return true
}

// validateMaterializeRowCounts expects the Load generator to be stopped before calling it.
func validateMaterializeRowCounts(t *testing.T) {
if lg.State() != LoadGeneratorStateStopped {
t.Fatal("Load generator was unexpectedly still running when validateMaterializeRowCounts was called -- this will produce unreliable results.")
}
vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
defer vtgateConn.Close()
parentRowCount, err := getRowCount(t, vtgateConn, "target2.parent")
require.NoError(t, err)
childRowCount, err := getRowCount(t, vtgateConn, "target2.child")
require.NoError(t, err)
parentCopyRowCount, err := getRowCount(t, vtgateConn, "target1.parent_copy")
require.NoError(t, err)
childCopyRowCount, err := getRowCount(t, vtgateConn, "target1.child_copy")
require.NoError(t, err)
log.Infof("Post-materialize row counts are parent: %d, child: %d, parent_copy: %d, child_copy: %d",
parentRowCount, childRowCount, parentCopyRowCount, childCopyRowCount)
require.Equal(t, parentRowCount, parentCopyRowCount)
require.Equal(t, childRowCount, childCopyRowCount)
areRowCountsEqual2 := func() bool {
return areRowCountsEqual(t)
}
require.NoError(t, waitForCondition("row counts to be equal", areRowCountsEqual2, defaultTimeout))
}

const fkExtMaterializeSpec = `
Expand Down
32 changes: 17 additions & 15 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,20 +73,24 @@ func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines
}

func execQueryWithRetry(t *testing.T, conn *mysql.Conn, query string, timeout time.Duration) *sqltypes.Result {
timer := time.NewTimer(timeout)
defer timer.Stop()
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
ticker := time.NewTicker(defaultTick)
defer ticker.Stop()

var qr *sqltypes.Result
var err error
for {
qr, err := conn.ExecuteFetch(query, 1000, false)
qr, err = conn.ExecuteFetch(query, 1000, false)
if err == nil {
return qr
}
select {
case <-timer.C:
case <-ctx.Done():
require.FailNow(t, fmt.Sprintf("query %q did not succeed before the timeout of %s; last seen result: %v",
query, timeout, qr.Rows))
default:
case <-ticker.C:
log.Infof("query %q failed with error %v, retrying in %ds", query, err, defaultTick)
time.Sleep(defaultTick)
}
}
}
Expand Down Expand Up @@ -745,14 +749,11 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool
return mode == "noblob"
}

func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) (int, error) {
func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int {
query := fmt.Sprintf("select count(*) from %s", table)
qr := execVtgateQuery(t, vtgateConn, "", query)
if qr == nil {
return 0, fmt.Errorf("query failed %s", query)
}
numRows, err := qr.Rows[0][0].ToInt()
return numRows, err
numRows, _ := qr.Rows[0][0].ToInt()
return numRows
}

const (
Expand Down Expand Up @@ -893,15 +894,16 @@ func waitForCondition(name string, condition func() bool, timeout time.Duration)

ticker := time.NewTicker(tickInterval)
defer ticker.Stop()
timeoutCh := time.After(timeout)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
for {
select {
case <-ticker.C:
if condition() {
return nil
}
case <-timeoutCh:
return fmt.Errorf("timed out waiting for %s", name)
case <-ctx.Done():
return fmt.Errorf("%s: waiting for %s", ctx.Err(), name)
}
}
}

0 comments on commit 89ec701

Please sign in to comment.