diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 62fd7e9a4af..3bb56c8cb51 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -70,9 +70,6 @@ func OpenTabletDiscovery() <-chan time.Time { if _, err := db.ExecVTOrc("delete from vitess_tablet"); err != nil { log.Error(err) } -<<<<<<< HEAD - return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker -======= // We refresh all information from the topo once before we start the ticks to do // it on a timer. ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) @@ -80,8 +77,7 @@ func OpenTabletDiscovery() <-chan time.Time { if err := refreshAllInformation(ctx); err != nil { log.Errorf("failed to initialize topo information: %+v", err) } - return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker ->>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129)) + return time.Tick(time.Second * time.Duration(config.Config.TopoInformationRefreshSeconds)) //nolint SA1015: using time.Tick leaks the underlying ticker } // refreshAllTablets reloads the tablets from topo and discovers the ones which haven't been refreshed in a while @@ -91,14 +87,10 @@ func refreshAllTablets(ctx context.Context) error { }, false /* forceRefresh */) } -<<<<<<< HEAD -func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) { +func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error { if !IsLeaderOrActive() { - return + return nil } -======= -func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), forceRefresh bool) error { ->>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129)) if len(clustersToWatch) == 0 { // all known clusters ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 65158b957a4..4115de3c7b3 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -17,12 +17,9 @@ package logic import ( -<<<<<<< HEAD + "context" "os" "os/signal" -======= - "context" ->>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129)) "sync" "sync/atomic" "syscall" @@ -420,33 +417,7 @@ func ContinuousDiscovery() { } }() case <-tabletTopoTick: -<<<<<<< HEAD - // Create a wait group - var wg sync.WaitGroup - - // Refresh all keyspace information. - wg.Add(1) - go func() { - defer wg.Done() - RefreshAllKeyspacesAndShards() - }() - - // Refresh all tablets. - wg.Add(1) - go func() { - defer wg.Done() - refreshAllTablets() - }() - - // Wait for both the refreshes to complete - wg.Wait() - // We have completed one discovery cycle in the entirety of it. We should update the process health. - process.FirstDiscoveryCycleComplete.Store(true) - } - } -} -======= - ctx, cancel := context.WithTimeout(context.Background(), config.GetTopoInformationRefreshDuration()) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(config.Config.TopoInformationRefreshSeconds)) if err := refreshAllInformation(ctx); err != nil { log.Errorf("failed to refresh topo information: %+v", err) } @@ -477,4 +448,3 @@ func refreshAllInformation(ctx context.Context) error { } return err } ->>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129)) diff --git a/go/vt/vtorc/logic/vtorc_test.go b/go/vt/vtorc/logic/vtorc_test.go index edd8141e8b7..77e268fdeaa 100644 --- a/go/vt/vtorc/logic/vtorc_test.go +++ b/go/vt/vtorc/logic/vtorc_test.go @@ -62,6 +62,8 @@ func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration { } func TestRefreshAllInformation(t *testing.T) { + defer process.ResetLastHealthCheckCache() + // Store the old flags and restore on test completion oldTs := ts defer func() { @@ -74,12 +76,12 @@ func TestRefreshAllInformation(t *testing.T) { }() // Verify in the beginning, we have the first DiscoveredOnce field false. - _, discoveredOnce := process.HealthTest() - require.False(t, discoveredOnce) + _, err := process.HealthTest() + require.NoError(t, err) // Create a memory topo-server and create the keyspace and shard records ts = memorytopo.NewServer(context.Background(), cell1) - _, err := ts.GetOrCreateShard(context.Background(), keyspace, shard) + _, err = ts.GetOrCreateShard(context.Background(), keyspace, shard) require.NoError(t, err) // Test error @@ -87,14 +89,18 @@ func TestRefreshAllInformation(t *testing.T) { cancel() // cancel context to simulate timeout require.Error(t, refreshAllInformation(ctx)) require.False(t, process.FirstDiscoveryCycleComplete.Load()) - _, discoveredOnce = process.HealthTest() - require.False(t, discoveredOnce) + health, err := process.HealthTest() + require.NoError(t, err) + require.False(t, health.DiscoveredOnce) + process.ResetLastHealthCheckCache() // Test success ctx2, cancel2 := context.WithCancel(context.Background()) defer cancel2() require.NoError(t, refreshAllInformation(ctx2)) require.True(t, process.FirstDiscoveryCycleComplete.Load()) - _, discoveredOnce = process.HealthTest() - require.True(t, discoveredOnce) + health, err = process.HealthTest() + require.NoError(t, err) + require.True(t, health.DiscoveredOnce) + process.ResetLastHealthCheckCache() } diff --git a/go/vt/vtorc/process/health.go b/go/vt/vtorc/process/health.go index 7d247675e8b..7f8ab83b39b 100644 --- a/go/vt/vtorc/process/health.go +++ b/go/vt/vtorc/process/health.go @@ -36,6 +36,8 @@ var FirstDiscoveryCycleComplete atomic.Bool var lastHealthCheckCache = cache.New(config.HealthPollSeconds*time.Second, time.Second) +func ResetLastHealthCheckCache() { lastHealthCheckCache.Flush() } + type NodeHealth struct { Hostname string Token string @@ -105,18 +107,11 @@ func RegisterNode(nodeHealth *NodeHealth) (healthy bool, err error) { } // HealthTest attempts to write to the backend database and get a result -<<<<<<< HEAD func HealthTest() (health *HealthStatus, err error) { cacheKey := util.ProcessToken.Hash if healthStatus, found := lastHealthCheckCache.Get(cacheKey); found { return healthStatus.(*HealthStatus), nil } -======= -func HealthTest() (health *NodeHealth, discoveredOnce bool) { - ThisNodeHealth.LastReported = time.Now() - discoveredOnce = FirstDiscoveryCycleComplete.Load() - ThisNodeHealth.Healthy = discoveredOnce && writeHealthToDatabase() ->>>>>>> 91811154ac (`vtorc`: require topo for `Healthy: true` in `/debug/health` (#17129)) health = &HealthStatus{Healthy: false, Hostname: ThisHostname, Token: util.ProcessToken.Hash} defer lastHealthCheckCache.Set(cacheKey, health, cache.DefaultExpiration) @@ -127,8 +122,8 @@ func HealthTest() (health *NodeHealth, discoveredOnce bool) { log.Error(err) return health, err } - health.Healthy = healthy health.DiscoveredOnce = FirstDiscoveryCycleComplete.Load() + health.Healthy = healthy && health.DiscoveredOnce if health.ActiveNode, health.IsActiveNode, err = ElectedNode(); err != nil { health.Error = err