diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index c510a7ee871..e91925e386a 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -199,9 +199,6 @@ func (tw *TopologyWatcher) loadTablets() { log.Errorf("cannot get tablet for alias %v: %v", alias, err) return } - if !(tw.tabletFilter == nil || tw.tabletFilter.IsIncluded(tablet.Tablet)) { - return - } tw.mu.Lock() aliasStr := topoproto.TabletAliasString(alias) newTablets[aliasStr] = &tabletInfo{ @@ -217,6 +214,10 @@ func (tw *TopologyWatcher) loadTablets() { tw.mu.Lock() for alias, newVal := range newTablets { + if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(newVal.tablet) { + continue + } + // trust the alias from topo and add it if it doesn't exist if val, ok := tw.tablets[alias]; ok { // check if the host and port have changed. If yes, replace tablet. @@ -236,6 +237,10 @@ func (tw *TopologyWatcher) loadTablets() { } for _, val := range tw.tablets { + if tw.tabletFilter != nil && !tw.tabletFilter.IsIncluded(val.tablet) { + continue + } + if _, ok := newTablets[val.alias]; !ok { tw.tabletRecorder.RemoveTablet(val.tablet) topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1) diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 1ea6bfe2418..57dd24585bc 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -17,12 +17,13 @@ limitations under the License. package discovery import ( + "context" "math/rand" "testing" "time" - "context" - + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/logutil" @@ -44,19 +45,14 @@ func checkOpCounts(t *testing.T, prevCounts, deltas map[string]int64) map[string newVal = 0 } - if newVal != prevVal+delta { - t.Errorf("expected %v to increase by %v, got %v -> %v", key, delta, prevVal, newVal) - } + assert.Equal(t, newVal, prevVal+delta, "expected %v to increase by %v, got %v -> %v", key, delta, prevVal, newVal) } return newCounts } func checkChecksum(t *testing.T, tw *TopologyWatcher, want uint32) { t.Helper() - got := tw.TopoChecksum() - if want != got { - t.Errorf("want checksum %v got %v", want, got) - } + assert.Equal(t, want, tw.TopoChecksum()) } func TestStartAndCloseTopoWatcher(t *testing.T) { @@ -506,3 +502,115 @@ func TestFilterByKeyspace(t *testing.T) { } } } + +// TestFilterByKeypsaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher +// has a FilterByKeyspace TabletFilter configured along with refreshKnownTablets turned off. We want +// to ensure that the TopologyWatcher: +// - does not continuosly call GetTablets for tablets that do not satisfy the filter +// - does not add or remove these filtered out tablets from the its healtcheck +func TestFilterByKeypsaceSkipsIgnoredTablets(t *testing.T) { + ts := memorytopo.NewServer("aa") + fhc := NewFakeHealthCheck(nil) + topologyWatcherOperations.ZeroAll() + counts := topologyWatcherOperations.Counts() + f := NewFilterByKeyspace(testKeyspacesToWatch) + tw := NewCellTabletsWatcher(context.Background(), ts, fhc, f, "aa", 10*time.Minute, false /*refreshKnownTablets*/, 5) + + counts = checkOpCounts(t, counts, map[string]int64{}) + checkChecksum(t, tw, 0) + + // Add a tablet from a tracked keyspace to the topology. + tablet := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 0, + }, + Hostname: "host1", + PortMap: map[string]int32{ + "vt": 123, + }, + Keyspace: "ks1", + Shard: "shard", + } + require.NoError(t, ts.CreateTablet(context.Background(), tablet)) + + tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) + checkChecksum(t, tw, 3238442862) + + // Check tablet is reported by HealthCheck + allTablets := fhc.GetAllTablets() + key := TabletToMapKey(tablet) + assert.Contains(t, allTablets, key) + assert.True(t, proto.Equal(tablet, allTablets[key])) + + // Add a second tablet to the topology that should get filtered out by the keyspace filter + tablet2 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 2, + }, + Hostname: "host2", + PortMap: map[string]int32{ + "vt": 789, + }, + Keyspace: "ks3", + Shard: "shard", + } + require.NoError(t, ts.CreateTablet(context.Background(), tablet2)) + + tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1}) + checkChecksum(t, tw, 2762153755) + + // Check the new tablet is NOT reported by HealthCheck. + allTablets = fhc.GetAllTablets() + assert.Len(t, allTablets, 1) + key = TabletToMapKey(tablet2) + assert.NotContains(t, allTablets, key) + + // Load the tablets again to show that when refreshKnownTablets is disabled, + // only the list is read from the topo and the checksum doesn't change + tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + checkChecksum(t, tw, 2762153755) + + // With refreshKnownTablets set to false, changes to the port map for the same tablet alias + // should not be reflected in the HealtCheck state + _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { + t.PortMap["vt"] = 456 + return nil + }) + require.NoError(t, err) + + tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + checkChecksum(t, tw, 2762153755) + + allTablets = fhc.GetAllTablets() + assert.Len(t, allTablets, 1) + origKey := TabletToMapKey(tablet) + tabletWithNewPort := proto.Clone(tablet).(*topodatapb.Tablet) + tabletWithNewPort.PortMap["vt"] = 456 + keyWithNewPort := TabletToMapKey(tabletWithNewPort) + assert.Contains(t, allTablets, origKey) + assert.NotContains(t, allTablets, keyWithNewPort) + + // Remove the tracked tablet from the topo and check that it is detected as being gone. + require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias)) + + tw.loadTablets() + counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1}) + checkChecksum(t, tw, 789108290) + assert.Empty(t, fhc.GetAllTablets()) + + // Remove ignored tablet and check that we didn't try to remove it from the health check + require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias)) + + tw.loadTablets() + checkOpCounts(t, counts, map[string]int64{"ListTablets": 1}) + checkChecksum(t, tw, 0) + assert.Empty(t, fhc.GetAllTablets()) + + tw.Stop() +}