Skip to content

Commit

Permalink
Heartbeat writer can always generate on-demand leased heartbeats, eve…
Browse files Browse the repository at this point in the history
…n if not at all configured (#16014)

Signed-off-by: Shlomi Noach <[email protected]>
  • Loading branch information
shlomi-noach authored Jul 3, 2024
1 parent de39000 commit 79c54e5
Show file tree
Hide file tree
Showing 7 changed files with 507 additions and 196 deletions.
1 change: 0 additions & 1 deletion go/test/endtoend/onlineddl/vrepl/onlineddl_vrepl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ func TestMain(m *testing.M) {

clusterInstance.VtTabletExtraArgs = []string{
"--heartbeat_interval", "250ms",
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--watch_replication_stream",
}
Expand Down
12 changes: 11 additions & 1 deletion go/timer/rate_limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func (r *RateLimiter) DoEmpty() {
_ = r.Do(nil)
}

// AllowOne lets the next Do() call run, even if the rate limiter would
// otherwise skip it. While holding r.mu, it rewinds lastDoValue so that it
// sits exactly one tick behind the ticker's current value.
func (r *RateLimiter) AllowOne() {
	r.mu.Lock()
	defer r.mu.Unlock()

	currentTick := r.tickerValue.Load()
	r.lastDoValue = currentTick - 1
}

// Diff returns the logical clock diff between the ticker and the last Do() call.
func (r *RateLimiter) Diff() int64 {
r.mu.Lock()
Expand All @@ -87,7 +95,9 @@ func (r *RateLimiter) Diff() int64 {

// Stop terminates rate limiter's operation and will not allow any more Do() executions.
func (r *RateLimiter) Stop() {
r.cancel()
if r.cancel != nil {
r.cancel()
}

r.mu.Lock()
defer r.mu.Unlock()
Expand Down
22 changes: 22 additions & 0 deletions go/timer/rate_limiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,23 @@ func TestRateLimiterShort(t *testing.T) {
assert.Less(t, val, 10)
}

// TestRateLimiterAllowOne checks that calling AllowOne() ahead of each Do()
// lets every single Do() execute its callback, although the calls are issued
// every 100ms against a limiter created with a 250ms interval.
func TestRateLimiterAllowOne(t *testing.T) {
	limiter := NewRateLimiter(250 * time.Millisecond)
	require.NotNil(t, limiter)
	defer limiter.Stop()

	count := 0
	bump := func() error {
		count++
		return nil
	}

	iterations := 10
	for range iterations {
		// Pause for less than the limiter's interval before each attempt.
		time.Sleep(100 * time.Millisecond)
		limiter.AllowOne()
		assert.NoError(t, limiter.Do(bump))
	}
	// we expect exactly 10 successful entries.
	assert.Equal(t, iterations, count)
}

func TestRateLimiterStop(t *testing.T) {
r := NewRateLimiter(time.Millisecond * 10)
require.NotNil(t, r)
Expand Down Expand Up @@ -91,3 +108,8 @@ func TestRateLimiterDiff(t *testing.T) {
r.DoEmpty()
assert.LessOrEqual(t, r.Diff(), int64(1))
}

// TestRateLimiterUninitialized calls Stop() on a zero-value RateLimiter that
// was never set up through NewRateLimiter. The test has no assertions: it
// passes as long as the call returns without panicking.
func TestRateLimiterUninitialized(t *testing.T) {
	limiter := new(RateLimiter)
	limiter.Stop()
}
26 changes: 12 additions & 14 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,7 @@ var (

// ReplTracker tracks replication lag.
type ReplTracker struct {
mode string
forceHeartbeat bool
mode string

mu sync.Mutex
isPrimary bool
Expand All @@ -66,11 +65,10 @@ type ReplTracker struct {
// NewReplTracker creates a new ReplTracker.
func NewReplTracker(env tabletenv.Env, alias *topodatapb.TabletAlias) *ReplTracker {
return &ReplTracker{
mode: env.Config().ReplicationTracker.Mode,
forceHeartbeat: env.Config().ReplicationTracker.HeartbeatOnDemand > 0,
hw: newHeartbeatWriter(env, alias),
hr: newHeartbeatReader(env),
poller: &poller{},
mode: env.Config().ReplicationTracker.Mode,
hw: newHeartbeatWriter(env, alias),
hr: newHeartbeatReader(env),
poller: &poller{},
}
}

Expand All @@ -97,9 +95,7 @@ func (rt *ReplTracker) MakePrimary() {
rt.hr.Close()
rt.hw.Open()
}
if rt.forceHeartbeat {
rt.hw.Open()
}
rt.hw.Open()
}

// MakeNonPrimary must be called if the tablet type becomes non-PRIMARY.
Expand All @@ -117,9 +113,7 @@ func (rt *ReplTracker) MakeNonPrimary() {
// Run the status once to pre-initialize values.
rt.poller.Status()
}
if rt.forceHeartbeat {
rt.hw.Close()
}
rt.hw.Close()
}

// Close closes ReplTracker.
Expand Down Expand Up @@ -147,5 +141,9 @@ func (rt *ReplTracker) Status() (time.Duration, error) {
// EnableHeartbeat enables or disables writes of heartbeat. This functionality
// is only used by tests.
func (rt *ReplTracker) EnableHeartbeat(enable bool) {
rt.hw.enableWrites(enable)
if enable {
rt.hw.enableWrites()
} else {
rt.hw.disableWrites()
}
}
131 changes: 86 additions & 45 deletions go/vt/vttablet/tabletserver/repltracker/repltracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/dbconfigs"
Expand Down Expand Up @@ -50,49 +51,89 @@ func TestReplTracker(t *testing.T) {
target := &querypb.Target{}
mysqld := mysqlctl.NewFakeMysqlDaemon(nil)

rt := NewReplTracker(env, alias)
rt.InitDBConfig(target, mysqld)
assert.Equal(t, tabletenv.Heartbeat, rt.mode)
assert.True(t, rt.hw.enabled)
assert.True(t, rt.hr.enabled)

rt.MakePrimary()
assert.True(t, rt.hw.isOpen)
assert.False(t, rt.hr.isOpen)
assert.True(t, rt.isPrimary)

lag, err := rt.Status()
assert.NoError(t, err)
assert.Equal(t, time.Duration(0), lag)

rt.MakeNonPrimary()
assert.False(t, rt.hw.isOpen)
assert.True(t, rt.hr.isOpen)
assert.False(t, rt.isPrimary)

rt.hr.lastKnownLag = 1 * time.Second
lag, err = rt.Status()
assert.NoError(t, err)
assert.Equal(t, 1*time.Second, lag)

rt.Close()
assert.False(t, rt.hw.isOpen)
assert.False(t, rt.hr.isOpen)

cfg.ReplicationTracker.Mode = tabletenv.Polling
rt = NewReplTracker(env, alias)
rt.InitDBConfig(target, mysqld)
assert.Equal(t, tabletenv.Polling, rt.mode)
assert.Equal(t, mysqld, rt.poller.mysqld)
assert.False(t, rt.hw.enabled)
assert.False(t, rt.hr.enabled)

rt.MakeNonPrimary()
assert.False(t, rt.hw.isOpen)
assert.False(t, rt.hr.isOpen)
assert.False(t, rt.isPrimary)

mysqld.ReplicationStatusError = errors.New("err")
_, err = rt.Status()
assert.Equal(t, "err", err.Error())
t.Run("always-on heartbeat", func(t *testing.T) {
rt := NewReplTracker(env, alias)
rt.InitDBConfig(target, mysqld)
assert.Equal(t, tabletenv.Heartbeat, rt.mode)
assert.Equal(t, HeartbeatConfigTypeAlways, rt.hw.configType)
assert.Zero(t, rt.hw.onDemandDuration)
assert.True(t, rt.hr.enabled)

rt.MakePrimary()
assert.True(t, rt.hw.isOpen)
assert.False(t, rt.hr.isOpen)
assert.True(t, rt.isPrimary)

lag, err := rt.Status()
assert.NoError(t, err)
assert.Equal(t, time.Duration(0), lag)

rt.MakeNonPrimary()
assert.False(t, rt.hw.isOpen)
assert.True(t, rt.hr.isOpen)
assert.False(t, rt.isPrimary)

rt.hr.lastKnownLag = 1 * time.Second
lag, err = rt.Status()
assert.NoError(t, err)
assert.Equal(t, 1*time.Second, lag)

rt.Close()
assert.False(t, rt.hw.isOpen)
assert.False(t, rt.hr.isOpen)
})
t.Run("disabled heartbeat", func(t *testing.T) {
cfg.ReplicationTracker.Mode = tabletenv.Polling
rt := NewReplTracker(env, alias)
rt.InitDBConfig(target, mysqld)
assert.Equal(t, tabletenv.Polling, rt.mode)
assert.Equal(t, mysqld, rt.poller.mysqld)
assert.Equal(t, HeartbeatConfigTypeNone, rt.hw.configType)
require.NotZero(t, defaultOnDemandDuration)
assert.Equal(t, defaultOnDemandDuration, rt.hw.onDemandDuration)
assert.False(t, rt.hr.enabled)

rt.MakeNonPrimary()
assert.False(t, rt.hw.isOpen)
assert.False(t, rt.hr.isOpen)
assert.False(t, rt.isPrimary)

mysqld.ReplicationStatusError = errors.New("err")
_, err := rt.Status()
assert.Equal(t, "err", err.Error())
})
t.Run("on-demand heartbeat", func(t *testing.T) {
cfg.ReplicationTracker.Mode = tabletenv.Heartbeat
cfg.ReplicationTracker.HeartbeatOnDemand = time.Second

rt := NewReplTracker(env, alias)
rt.InitDBConfig(target, mysqld)
assert.Equal(t, tabletenv.Heartbeat, rt.mode)
assert.Equal(t, HeartbeatConfigTypeOnDemand, rt.hw.configType)
assert.Equal(t, minimalOnDemandDuration, rt.hw.onDemandDuration)
assert.True(t, rt.hr.enabled)

rt.MakePrimary()
assert.True(t, rt.hw.isOpen)
assert.False(t, rt.hr.isOpen)
assert.True(t, rt.isPrimary)

lag, err := rt.Status()
assert.NoError(t, err)
assert.Equal(t, time.Duration(0), lag)

rt.MakeNonPrimary()
assert.False(t, rt.hw.isOpen)
assert.True(t, rt.hr.isOpen)
assert.False(t, rt.isPrimary)

rt.hr.lastKnownLag = 1 * time.Second
lag, err = rt.Status()
assert.NoError(t, err)
assert.Equal(t, 1*time.Second, lag)

rt.Close()
assert.False(t, rt.hw.isOpen)
assert.False(t, rt.hr.isOpen)
})
}
Loading

0 comments on commit 79c54e5

Please sign in to comment.