From 89ec7010f639a23109e15c000e2d77628a34da98 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Sun, 26 Nov 2023 22:29:03 +0100 Subject: [PATCH] Address some review comments Signed-off-by: Rohit Nayak --- .../fk_ext_load_generator_test.go | 4 +-- go/test/endtoend/vreplication/fk_ext_test.go | 33 +++++++++++-------- go/test/endtoend/vreplication/helper_test.go | 32 +++++++++--------- 3 files changed, 37 insertions(+), 32 deletions(-) diff --git a/go/test/endtoend/vreplication/fk_ext_load_generator_test.go b/go/test/endtoend/vreplication/fk_ext_load_generator_test.go index c875392f7af..12b5871781f 100644 --- a/go/test/endtoend/vreplication/fk_ext_load_generator_test.go +++ b/go/test/endtoend/vreplication/fk_ext_load_generator_test.go @@ -132,9 +132,7 @@ func (lg *SimpleLoadGenerator) getVtgateConn(ctx context.Context) (*mysql.Conn, func (lg *SimpleLoadGenerator) getNumRows(vtgateConn *mysql.Conn, table string) int { t := lg.vc.t - numRows, err := getRowCount(t, vtgateConn, table) - require.NoError(t, err) - return numRows + return getRowCount(t, vtgateConn, table) } func (lg *SimpleLoadGenerator) WaitForAdditionalRows(count int) error { diff --git a/go/test/endtoend/vreplication/fk_ext_test.go b/go/test/endtoend/vreplication/fk_ext_test.go index 2693e26728d..3890583eb18 100644 --- a/go/test/endtoend/vreplication/fk_ext_test.go +++ b/go/test/endtoend/vreplication/fk_ext_test.go @@ -286,25 +286,30 @@ func doReshard(t *testing.T, keyspace, workflowName, sourceShards, targetShards rs.Complete() } +func areRowCountsEqual(t *testing.T) bool { + vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + parentRowCount := getRowCount(t, vtgateConn, "target2.parent") + childRowCount := getRowCount(t, vtgateConn, "target2.child") + parentCopyRowCount := getRowCount(t, vtgateConn, "target1.parent_copy") + childCopyRowCount := getRowCount(t, vtgateConn, "target1.child_copy") + log.Infof("Post-materialize row counts are parent: %d, child: %d, parent_copy: %d, child_copy: %d", + parentRowCount, childRowCount, parentCopyRowCount, childCopyRowCount) + if parentRowCount != parentCopyRowCount || childRowCount != childCopyRowCount { + return false + } + return true +} + // validateMaterializeRowCounts expects the Load generator to be stopped before calling it. func validateMaterializeRowCounts(t *testing.T) { if lg.State() != LoadGeneratorStateStopped { t.Fatal("Load generator was unexpectedly still running when validateMaterializeRowCounts was called -- this will produce unreliable results.") } - vtgateConn = getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) - defer vtgateConn.Close() - parentRowCount, err := getRowCount(t, vtgateConn, "target2.parent") - require.NoError(t, err) - childRowCount, err := getRowCount(t, vtgateConn, "target2.child") - require.NoError(t, err) - parentCopyRowCount, err := getRowCount(t, vtgateConn, "target1.parent_copy") - require.NoError(t, err) - childCopyRowCount, err := getRowCount(t, vtgateConn, "target1.child_copy") - require.NoError(t, err) - log.Infof("Post-materialize row counts are parent: %d, child: %d, parent_copy: %d, child_copy: %d", - parentRowCount, childRowCount, parentCopyRowCount, childCopyRowCount) - require.Equal(t, parentRowCount, parentCopyRowCount) - require.Equal(t, childRowCount, childCopyRowCount) + areRowCountsEqual2 := func() bool { + return areRowCountsEqual(t) + } + require.NoError(t, waitForCondition("row counts to be equal", areRowCountsEqual2, defaultTimeout)) } const fkExtMaterializeSpec = ` diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index a4e4f4e0cc0..07c12caf194 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -73,20 +73,24 @@ func execMultipleQueries(t *testing.T, conn *mysql.Conn, database string, lines } func execQueryWithRetry(t *testing.T, conn *mysql.Conn, query string, timeout time.Duration) *sqltypes.Result { - timer := time.NewTimer(timeout) - defer timer.Stop() + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + ticker := time.NewTicker(defaultTick) + defer ticker.Stop() + + var qr *sqltypes.Result + var err error for { - qr, err := conn.ExecuteFetch(query, 1000, false) + qr, err = conn.ExecuteFetch(query, 1000, false) if err == nil { return qr } select { - case <-timer.C: + case <-ctx.Done(): require.FailNow(t, fmt.Sprintf("query %q did not succeed before the timeout of %s; last seen result: %v", query, timeout, qr.Rows)) - default: + case <-ticker.C: log.Infof("query %q failed with error %v, retrying in %ds", query, err, defaultTick) - time.Sleep(defaultTick) } } } @@ -745,14 +749,11 @@ func isBinlogRowImageNoBlob(t *testing.T, tablet *cluster.VttabletProcess) bool return mode == "noblob" } -func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) (int, error) { +func getRowCount(t *testing.T, vtgateConn *mysql.Conn, table string) int { query := fmt.Sprintf("select count(*) from %s", table) qr := execVtgateQuery(t, vtgateConn, "", query) - if qr == nil { - return 0, fmt.Errorf("query failed %s", query) - } - numRows, err := qr.Rows[0][0].ToInt() - return numRows, err + numRows, _ := qr.Rows[0][0].ToInt() + return numRows } const ( @@ -893,15 +894,16 @@ func waitForCondition(name string, condition func() bool, timeout time.Duration) ticker := time.NewTicker(tickInterval) defer ticker.Stop() - timeoutCh := time.After(timeout) + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() for { select { case <-ticker.C: if condition() { return nil } - case <-timeoutCh: - return fmt.Errorf("timed out waiting for %s", name) + case <-ctx.Done(): + return fmt.Errorf("%s: waiting for %s", ctx.Err(), name) } } }