Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix reboot_queue_running to report internal-state more precisely #685

Merged
merged 2 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST
| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | |
| reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | |
| reboot_queue_items | The number reboot queue entries remaining per status. | Gauge | `status` |
| reboot_queue_running | True (=1) if reboot queue is enabled and the queue is not empty. | Gauge | |
| reboot_queue_running | True (=1) if reboot queue is running. | Gauge | |
| sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | |
| sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | |
| sabakan_workers | The number of worker nodes for each role. | Gauge | `role` |
Expand Down
27 changes: 18 additions & 9 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type metricGroup struct {
type storage interface {
IsSabakanDisabled(context.Context) (bool, error)
IsRebootQueueDisabled(ctx context.Context) (bool, error)
IsRebootQueueRunning(ctx context.Context) (bool, error)
GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error)
GetCluster(ctx context.Context) (*cke.Cluster, error)
}
Expand Down Expand Up @@ -143,30 +144,38 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

rqDisabled, err := c.storage.IsRebootQueueDisabled(ctx)
disabled, err := c.storage.IsRebootQueueDisabled(ctx)
if err != nil {
log.Error("failed to get if reboot queue is enabled", map[string]interface{}{
log.FnError: err,
})
return
}
var rqEnabled float64
if !disabled {
rqEnabled = 1
}

rqEntries, err := c.storage.GetRebootsEntries(ctx)
running, err := c.storage.IsRebootQueueRunning(ctx)
if err != nil {
log.Error("failed to get reboots entries", map[string]interface{}{
log.Error("failed to get if reboot queue is running", map[string]interface{}{
log.FnError: err,
})
return
}

var rqEnabled, rqRunning float64
if !rqDisabled {
rqEnabled = 1
}
if !rqDisabled && len(rqEntries) > 0 {
var rqRunning float64
if running {
rqRunning = 1
}

rqEntries, err := c.storage.GetRebootsEntries(ctx)
if err != nil {
log.Error("failed to get reboots entries", map[string]interface{}{
log.FnError: err,
})
return
}

cluster, err := c.storage.GetCluster(ctx)
if err != nil {
log.Error("failed to get cluster", map[string]interface{}{
Expand Down
27 changes: 27 additions & 0 deletions metrics/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type updateOperationPhaseTestCase struct {
type updateRebootQueueEntriesTestCase struct {
name string
enabled bool
running bool
input []*cke.RebootQueueEntry
expectedEnabled float64
expectedRunning float64
Expand Down Expand Up @@ -256,6 +257,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {
{
name: "zero",
enabled: true,
running: false,
input: nil,
expectedEnabled: 1,
expectedRunning: 0,
Expand All @@ -264,6 +266,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {
{
name: "one",
enabled: true,
running: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
},
Expand All @@ -274,6 +277,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {
{
name: "two",
enabled: true,
running: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
Expand All @@ -282,9 +286,22 @@ func testUpdateRebootQueueEntries(t *testing.T) {
expectedRunning: 1,
expectedEntries: 2,
},
{
name: "two-stopping",
enabled: false,
running: true,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
},
expectedEnabled: 0,
expectedRunning: 1,
expectedEntries: 2,
},
{
name: "two-disabled",
enabled: false,
running: false,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
Expand All @@ -301,6 +318,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {

collector, storage := newTestCollector()
storage.enableRebootQueue(tt.enabled)
storage.setRebootQueueRunning(tt.running)
storage.setRebootsEntries(tt.input)
handler := GetHandler(collector)

Expand Down Expand Up @@ -671,6 +689,7 @@ func newTestCollector() (prometheus.Collector, *testStorage) {
type testStorage struct {
sabakanEnabled bool
rebootQueueEnabled bool
rebootQueueRunning bool
rebootEntries []*cke.RebootQueueEntry
cluster *cke.Cluster
}
Expand All @@ -691,6 +710,14 @@ func (s *testStorage) enableRebootQueue(flag bool) {
s.rebootQueueEnabled = flag
}

func (s *testStorage) IsRebootQueueRunning(_ context.Context) (bool, error) {
return s.rebootQueueRunning, nil
}

func (s *testStorage) setRebootQueueRunning(flag bool) {
s.rebootQueueRunning = flag
}

func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) {
s.rebootEntries = entries
}
Expand Down
12 changes: 12 additions & 0 deletions server/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,18 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
}
}

running, err := inf.Storage().IsRebootQueueRunning(ctx)
if err != nil {
return err
}
runningNext := len(rqEntries) > 0
if running != runningNext {
err := inf.Storage().SetRebootQueueRunning(ctx, runningNext)
if err != nil {
return err
}
}

nf := NewNodeFilter(cluster, status)
apiServers := map[string]bool{}
for _, node := range nf.ControlPlane() {
Expand Down
26 changes: 26 additions & 0 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
KeyConstraints = "constraints"
KeyLeader = "leader/"
KeyRebootsDisabled = "reboots/disabled"
KeyRebootsRunning = "reboots/running"
KeyRebootsPrefix = "reboots/data/"
KeyRebootsWriteIndex = "reboots/write-index"
KeyRecords = "records/"
Expand Down Expand Up @@ -705,6 +706,31 @@ func (s Storage) EnableRebootQueue(ctx context.Context, flag bool) error {
return err
}

// IsRebootQueueRunning returns true if CKE is processing reboot queue.
func (s Storage) IsRebootQueueRunning(ctx context.Context) (bool, error) {
resp, err := s.Get(ctx, KeyRebootsRunning)
if err != nil {
return false, err
}
if resp.Count == 0 {
return false, nil
}

return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil
}

// SetRebootQueueRunning is used to report if CKE is processing reboot queue.
func (s Storage) SetRebootQueueRunning(ctx context.Context, flag bool) error {
var val string
if flag {
val = "true"
} else {
val = "false"
}
_, err := s.Put(ctx, KeyRebootsRunning, val)
return err
}

func rebootsEntryKey(index int64) string {
return fmt.Sprintf("%s%016x", KeyRebootsPrefix, index)
}
Expand Down
37 changes: 36 additions & 1 deletion storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ func testStorageReboot(t *testing.T) {
t.Fatal(err)
}
if disabled {
t.Error("reboot queue should not be disabled by default")
t.Error("reboot queue should be enabled by default")
}

// disable rq and get its state
Expand Down Expand Up @@ -776,6 +776,41 @@ func testStorageReboot(t *testing.T) {
if disabled {
t.Error("reboot queue could not be re-enabled")
}

// rq is not running by default
running, err := storage.IsRebootQueueRunning(ctx)
if err != nil {
t.Fatal(err)
}
if running {
t.Error("reboot queue should not be running by default")
}

// report running rq and get its state
err = storage.SetRebootQueueRunning(ctx, true)
if err != nil {
t.Fatal(err)
}
running, err = storage.IsRebootQueueRunning(ctx)
if err != nil {
t.Fatal(err)
}
if !running {
t.Error("reboot queue could not be reported running")
}

// report not running rq and get its state
err = storage.SetRebootQueueRunning(ctx, false)
if err != nil {
t.Fatal(err)
}
running, err = storage.IsRebootQueueRunning(ctx)
if err != nil {
t.Fatal(err)
}
if running {
t.Error("reboot queue could not be reported not running")
}
}

func testStatus(t *testing.T) {
Expand Down