Skip to content

Commit

Permalink
neonvm-controller: replace failing reconciliation with per-VM failure…
Browse files Browse the repository at this point in the history
… interval (#949)

The old method had frequent false positives, because there might be a lot
of intermittent failures, but overall the system does progress, and
every particular VM is getting reconciled.
  • Loading branch information
Omrigan authored Jun 7, 2024
1 parent c78e4bc commit 3ac5841
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 38 deletions.
2 changes: 2 additions & 0 deletions neonvm/config/controller/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ spec:
# * cache.direct=on - use O_DIRECT (don't abuse host's page cache!)
# * cache.no-flush=on - ignores disk flush operations (not needed; our disks are ephemeral)
- "--qemu-disk-cache-settings=cache.writeback=on,cache.direct=on,cache.no-flush=on"
- "--failure-pending-period=1m"
- "--failing-refresh-interval=15s"
env:
- name: NAD_IPAM_NAME
value: $(NAD_IPAM_NAME)
Expand Down
10 changes: 10 additions & 0 deletions neonvm/controllers/config.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package controllers

import "time"

// ReconcilerConfig stores shared configuration for VirtualMachineReconciler and
// VirtualMachineMigrationReconciler.
type ReconcilerConfig struct {
Expand All @@ -22,6 +24,14 @@ type ReconcilerConfig struct {
// This field is passed to neonvm-runner as the `-qemu-disk-cache-settings` arg, and is directly
// used in setting up the VM disks via QEMU's `-drive` flag.
QEMUDiskCacheSettings string

// FailurePendingPeriod is the period for the propagation of
// reconciliation failures to the observability instruments
FailurePendingPeriod time.Duration

// FailingRefreshInterval is the interval between consecutive
// updates of metrics and logs, related to failing reconciliations
FailingRefreshInterval time.Duration
}

func (c *ReconcilerConfig) criEndpointSocketPath() string {
Expand Down
108 changes: 108 additions & 0 deletions neonvm/controllers/failurelag/tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package failurelag

import (
"sync"
"time"
)

// Tracker accumulates failure events for a given key and determines if
// the key is degraded. The key becomes degraded if it receives only failures
// over a configurable pending period. Once the success event is received, the key
// is no longer considered degraded, and the pending period is reset.
type Tracker[T comparable] struct {
period time.Duration

pendingSince map[T]time.Time
degraded map[T]struct{}
degradeAt []degradeAt[T]

lock sync.Mutex
Now func() time.Time
}

type degradeAt[T comparable] struct {
ts time.Time
key T
}

func NewTracker[T comparable](period time.Duration) *Tracker[T] {
return &Tracker[T]{
period: period,
pendingSince: make(map[T]time.Time),
degraded: make(map[T]struct{}),
degradeAt: []degradeAt[T]{},
lock: sync.Mutex{},
Now: time.Now,
}
}

// forward processes all the fireAt events that are now in the past.
func (t *Tracker[T]) forward(now time.Time) {
i := 0
for ; i < len(t.degradeAt); i++ {
event := t.degradeAt[i]
if event.ts.After(now) {
break
}
pendingSince, ok := t.pendingSince[event.key]
if !ok {
// There was a success event in between
continue
}

if event.ts.Sub(pendingSince) < t.period {
// There was a success, and another failure in between
// We will have another fireAt event for this key in the future
continue
}
t.degraded[event.key] = struct{}{}
}
t.degradeAt = t.degradeAt[i:]
}

func (t *Tracker[T]) RecordSuccess(key T) {
t.lock.Lock()
defer t.lock.Unlock()

delete(t.degraded, key)
delete(t.pendingSince, key)
t.forward(t.Now())
}

func (t *Tracker[T]) RecordFailure(key T) {
t.lock.Lock()
defer t.lock.Unlock()

now := t.Now()

if _, ok := t.pendingSince[key]; !ok {
t.pendingSince[key] = now
}

t.degradeAt = append(t.degradeAt, degradeAt[T]{
ts: now.Add(t.period),
key: key,
})

t.forward(now)
}

func (t *Tracker[T]) DegradedCount() int {
t.lock.Lock()
defer t.lock.Unlock()

t.forward(t.Now())
return len(t.degraded)
}

func (t *Tracker[T]) Degraded() []T {
t.lock.Lock()
defer t.lock.Unlock()

t.forward(t.Now())
keys := make([]T, 0, len(t.degraded))
for k := range t.degraded {
keys = append(keys, k)
}
return keys
}
110 changes: 110 additions & 0 deletions neonvm/controllers/failurelag/tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package failurelag_test

import (
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/neondatabase/autoscaling/neonvm/controllers/failurelag"
)

type nowMock struct {
ts time.Time
}

func (n *nowMock) Now() time.Time {
return n.ts
}

func (n *nowMock) Add(d time.Duration) {
n.ts = n.ts.Add(d)
}

func newNowMock() *nowMock {
ts, _ := time.Parse("2006-01-02", "2024-01-01")
return &nowMock{ts: ts}
}

// TestTracker checks the base lifecycle: a key degrades once the pending
// period elapses with only failures, and recovers immediately on success.
func TestTracker(t *testing.T) {
	now := newNowMock()
	tracker := failurelag.NewTracker[string](10 * time.Minute)
	tracker.Now = now.Now

	// Alert fires after 15 minutes.
	tracker.RecordFailure("key1")
	// assert.Equal takes (t, expected, actual); the previous order was
	// swapped, which produces misleading failure messages.
	assert.Equal(t, 0, tracker.DegradedCount())
	now.Add(15 * time.Minute)
	assert.Equal(t, 1, tracker.DegradedCount())

	// Alert no longer fires after a success.
	tracker.RecordSuccess("key1")
	assert.Equal(t, 0, tracker.DegradedCount())
}

// TestFailureSuccess checks that a success inside the pending period
// prevents the key from degrading.
func TestFailureSuccess(t *testing.T) {
	now := newNowMock()
	tracker := failurelag.NewTracker[string](10 * time.Minute)
	tracker.Now = now.Now

	// Alert doesn't fire if there was a success in the interval.
	tracker.RecordFailure("key1")

	now.Add(5 * time.Minute)
	tracker.RecordSuccess("key1")

	now.Add(10 * time.Minute)
	// (expected, actual) — testify's documented argument order.
	assert.Equal(t, 0, tracker.DegradedCount())
}

// TestFailureSuccessFailure checks that a success followed by a new failure
// restarts the pending period: degradation happens one full period after
// the new failure, not after the original one.
func TestFailureSuccessFailure(t *testing.T) {
	now := newNowMock()
	tracker := failurelag.NewTracker[string](10 * time.Minute)
	tracker.Now = now.Now

	// Alert doesn't fire if there was success + failure in the interval.
	tracker.RecordFailure("key1")

	now.Add(5 * time.Minute)
	tracker.RecordSuccess("key1")

	now.Add(1 * time.Minute)
	tracker.RecordFailure("key1")

	now.Add(5 * time.Minute)
	// assert.Equal takes (t, expected, actual); previously swapped.
	assert.Equal(t, 0, tracker.DegradedCount())

	// But after 7 more minutes it does.
	now.Add(7 * time.Minute)
	assert.Equal(t, 1, tracker.DegradedCount())
}

// TestMultipleKeys tracks two keys at once — a combination of the
// TestFailureSuccess and TestFailureSuccessFailure scenarios — and checks
// that each key degrades independently.
func TestMultipleKeys(t *testing.T) {
	now := newNowMock()
	tracker := failurelag.NewTracker[string](10 * time.Minute)
	tracker.Now = now.Now

	tracker.RecordFailure("key1")
	tracker.RecordFailure("key2")

	now.Add(5 * time.Minute)
	tracker.RecordSuccess("key1")
	tracker.RecordSuccess("key2")

	now.Add(1 * time.Minute)
	tracker.RecordFailure("key1")

	now.Add(5 * time.Minute)
	// assert.Equal takes (t, expected, actual); previously swapped.
	assert.Equal(t, 0, tracker.DegradedCount())

	now.Add(7 * time.Minute)
	assert.Equal(t, 1, tracker.DegradedCount())
	assert.Equal(t, []string{"key1"}, tracker.Degraded())

	tracker.RecordFailure("key2")
	now.Add(15 * time.Minute)
	assert.Equal(t, 2, tracker.DegradedCount())
	assert.Contains(t, tracker.Degraded(), "key1")
	assert.Contains(t, tracker.Degraded(), "key2")
}
Loading

0 comments on commit 3ac5841

Please sign in to comment.