Skip to content

Commit

Permalink
Add stopping state to reboot-queue
Browse files Browse the repository at this point in the history
Signed-off-by: Daichi Sakaue <[email protected]>
  • Loading branch information
yokaze committed Dec 12, 2023
1 parent 9a55708 commit 0d7f294
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 59 deletions.
2 changes: 1 addition & 1 deletion docs/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST
| node_reboot_status | The reboot status of a node. | Gauge | `node`, `status` |
| operation_phase | 1 if CKE is operating in the phase specified by the `phase` label. | Gauge | `phase` |
| operation_phase_timestamp_seconds | The Unix timestamp when `operation_phase` was last updated. | Gauge | |
| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | |
| reboot_queue_enabled | True (=1) if reboot queue is enabled or stopping. | Gauge | |
| reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | |
| reboot_queue_items | The number reboot queue entries remaining per status. | Gauge | `status` |
| reboot_queue_running | True (=1) if reboot queue is enabled and the queue is not empty. | Gauge | |
Expand Down
2 changes: 1 addition & 1 deletion docs/reboot.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ The command writes reboot queue entry(s) and increments `reboots/write-index` at

The queue is processed by CKE as follows:

1. If `reboots/disabled` is `true`, it doesn't process the queue.
1. If `reboots/state` is not `enabled`, it will not process the queue.
2. Check the reboot queue to find an entry.
- If the number of nodes under processing is less than maximum concurrent reboots and the number of unreachable nodes that are not under this reboot process is not more than `maximum-unreachable-nodes-for-reboot` in the constraints, pick several nodes from front of the queue and start draining them.
1. Cordon the node.
Expand Down
12 changes: 6 additions & 6 deletions metrics/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ type metricGroup struct {
// This abstraction is for mock test.
type storage interface {
IsSabakanDisabled(context.Context) (bool, error)
IsRebootQueueDisabled(ctx context.Context) (bool, error)
GetRebootQueueState(ctx context.Context) (cke.RebootQueueState, error)
GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error)
GetCluster(ctx context.Context) (*cke.Cluster, error)
}
Expand Down Expand Up @@ -143,7 +143,7 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()

rqDisabled, err := c.storage.IsRebootQueueDisabled(ctx)
rqState, err := c.storage.GetRebootQueueState(ctx)
if err != nil {
log.Error("failed to get if reboot queue is enabled", map[string]interface{}{
log.FnError: err,
Expand All @@ -160,11 +160,11 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) {
}

var rqEnabled, rqRunning float64
if !rqDisabled {
if rqState == cke.RebootQueueStateEnabled || rqState == cke.RebootQueueStateStopping {
rqEnabled = 1
}
if !rqDisabled && len(rqEntries) > 0 {
rqRunning = 1
if len(rqEntries) > 0 {
rqRunning = 1
}
}

cluster, err := c.storage.GetCluster(ctx)
Expand Down
45 changes: 28 additions & 17 deletions metrics/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type updateOperationPhaseTestCase struct {

type updateRebootQueueEntriesTestCase struct {
name string
enabled bool
state cke.RebootQueueState
input []*cke.RebootQueueEntry
expectedEnabled float64
expectedRunning float64
Expand Down Expand Up @@ -255,15 +255,15 @@ func testUpdateRebootQueueEntries(t *testing.T) {
testCases := []updateRebootQueueEntriesTestCase{
{
name: "zero",
enabled: true,
state: cke.RebootQueueStateEnabled,
input: nil,
expectedEnabled: 1,
expectedRunning: 0,
expectedEntries: 0,
},
{
name: "one",
enabled: true,
name: "one",
state: cke.RebootQueueStateEnabled,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
},
Expand All @@ -272,8 +272,8 @@ func testUpdateRebootQueueEntries(t *testing.T) {
expectedEntries: 1,
},
{
name: "two",
enabled: true,
name: "two",
state: cke.RebootQueueStateEnabled,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
Expand All @@ -283,8 +283,19 @@ func testUpdateRebootQueueEntries(t *testing.T) {
expectedEntries: 2,
},
{
name: "two-disabled",
enabled: false,
name: "two-stopping",
state: cke.RebootQueueStateStopping,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
},
expectedEnabled: 1,
expectedRunning: 1,
expectedEntries: 2,
},
{
name: "two-disabled",
state: cke.RebootQueueStateDisabled,
input: []*cke.RebootQueueEntry{
{Status: cke.RebootStatusQueued},
{Status: cke.RebootStatusRebooting},
Expand All @@ -300,7 +311,7 @@ func testUpdateRebootQueueEntries(t *testing.T) {
defer ctx.Done()

collector, storage := newTestCollector()
storage.enableRebootQueue(tt.enabled)
storage.setRebootQueueState(tt.state)
storage.setRebootsEntries(tt.input)
handler := GetHandler(collector)

Expand Down Expand Up @@ -669,10 +680,10 @@ func newTestCollector() (prometheus.Collector, *testStorage) {
}

type testStorage struct {
sabakanEnabled bool
rebootQueueEnabled bool
rebootEntries []*cke.RebootQueueEntry
cluster *cke.Cluster
sabakanEnabled bool
rebootQueueState cke.RebootQueueState
rebootEntries []*cke.RebootQueueEntry
cluster *cke.Cluster
}

func (s *testStorage) enableSabakan(flag bool) {
Expand All @@ -683,12 +694,12 @@ func (s *testStorage) IsSabakanDisabled(_ context.Context) (bool, error) {
return !s.sabakanEnabled, nil
}

func (s *testStorage) IsRebootQueueDisabled(_ context.Context) (bool, error) {
return !s.rebootQueueEnabled, nil
func (s *testStorage) GetRebootQueueState(_ context.Context) (cke.RebootQueueState, error) {
return s.rebootQueueState, nil
}

func (s *testStorage) enableRebootQueue(flag bool) {
s.rebootQueueEnabled = flag
func (s *testStorage) setRebootQueueState(state cke.RebootQueueState) {
s.rebootQueueState = state
}

func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/ckecli/cmd/reboot_queue_disable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"context"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/well"
"github.com/spf13/cobra"
)
Expand All @@ -14,7 +15,7 @@ var rebootQueueDisableCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
well.Go(func(ctx context.Context) error {
return storage.EnableRebootQueue(ctx, false)
return storage.SetRebootQueueState(ctx, cke.RebootQueueStateStopping)
})
well.Stop()
return well.Wait()
Expand Down
3 changes: 2 additions & 1 deletion pkg/ckecli/cmd/reboot_queue_enable.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"context"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/well"
"github.com/spf13/cobra"
)
Expand All @@ -14,7 +15,7 @@ var rebootQueueEnableCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
well.Go(func(ctx context.Context) error {
return storage.EnableRebootQueue(ctx, true)
return storage.SetRebootQueueState(ctx, cke.RebootQueueStateEnabled)
})
well.Stop()
return well.Wait()
Expand Down
4 changes: 2 additions & 2 deletions pkg/ckecli/cmd/reboot_queue_is_enabled.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ var rebootQueueIsEnabledCmd = &cobra.Command{

RunE: func(cmd *cobra.Command, args []string) error {
well.Go(func(ctx context.Context) error {
disabled, err := storage.IsRebootQueueDisabled(ctx)
state, err := storage.GetRebootQueueState(ctx)
if err != nil {
return err
}
fmt.Println(!disabled)
fmt.Println(string(state))
return nil
})
well.Stop()
Expand Down
13 changes: 13 additions & 0 deletions reboot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ import (
"time"
)

// RebootQueueState is state of reboot queue
type RebootQueueState string

// RebootQueue states
const (
// Reboot queue is enabled (should be set by ckecli)
RebootQueueStateEnabled = RebootQueueState("enabled")
// Reboot queue is requested to stop (should be set by ckecli)
RebootQueueStateStopping = RebootQueueState("stopping")
// Reboot queue is disabled (should be set by CKE)
RebootQueueStateDisabled = RebootQueueState("disabled")
)

// RebootStatus is status of reboot operation
type RebootStatus string

Expand Down
14 changes: 10 additions & 4 deletions server/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,17 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t
rqEntries = cke.DedupRebootQueueEntries(rqEntries)

if len(rqEntries) > 0 {
disabled, err := inf.Storage().IsRebootQueueDisabled(ctx)
if err != nil {
rqState, err := inf.Storage().GetRebootQueueState(ctx)
switch {
case err != nil:
return err
}
if disabled {
case rqState == cke.RebootQueueStateStopping:
err := inf.Storage().SetRebootQueueState(ctx, cke.RebootQueueStateDisabled)
if err != nil {
return err
}
rqEntries = nil
case rqState == cke.RebootQueueStateDisabled:
rqEntries = nil
}
}
Expand Down
39 changes: 23 additions & 16 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ const (
KeyClusterRevision = "cluster-revision"
KeyConstraints = "constraints"
KeyLeader = "leader/"
KeyRebootsDisabled = "reboots/disabled"
KeyRebootsDisabled = "reboots/disabled" // TODO: remove this key
KeyRebootsState = "reboots/state"
KeyRebootsPrefix = "reboots/data/"
KeyRebootsWriteIndex = "reboots/write-index"
KeyRecords = "records/"
Expand Down Expand Up @@ -679,29 +680,35 @@ func (s Storage) GetSabakanURL(ctx context.Context) (string, error) {
return s.getStringValue(ctx, KeySabakanURL)
}

// IsRebootQueueDisabled returns true if reboot queue is disabled.
func (s Storage) IsRebootQueueDisabled(ctx context.Context) (bool, error) {
resp, err := s.Get(ctx, KeyRebootsDisabled)
// GetRebootQueueState returns reboot queue state.
func (s Storage) GetRebootQueueState(ctx context.Context) (RebootQueueState, error) {
resp, err := s.Get(ctx, KeyRebootsState)
if err != nil {
return false, err
// defaulted to on
return RebootQueueStateEnabled, err
}
if resp.Count == 0 {
return false, nil
return RebootQueueStateEnabled, nil
}

return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil
return RebootQueueState(resp.Kvs[0].Value), nil
}

// EnableRebootQueue enables reboot queue processing when flag is true.
// When flag is false, reboot queue is not processed.
func (s Storage) EnableRebootQueue(ctx context.Context, flag bool) error {
var val string
if flag {
val = "false"
} else {
val = "true"
// SetRebootQueueState sets reboot queue state.
func (s Storage) SetRebootQueueState(ctx context.Context, state RebootQueueState) error {
// TODO: remove this temporary code
resp, err := s.Get(ctx, KeyRebootsDisabled)
if err != nil {
return err
}
_, err := s.Put(ctx, KeyRebootsDisabled, val)
if resp.Count > 0 {
_, err := s.Delete(ctx, KeyRebootsDisabled)
if err != nil {
return err
}
}

_, err = s.Put(ctx, KeyRebootsState, string(state))
return err
}

Expand Down
20 changes: 10 additions & 10 deletions storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,37 +743,37 @@ func testStorageReboot(t *testing.T) {
}

// rq is enabled by default
disabled, err := storage.IsRebootQueueDisabled(ctx)
state, err := storage.GetRebootQueueState(ctx)
if err != nil {
t.Fatal(err)
}
if disabled {
t.Error("reboot queue should not be disabled by default")
if state != RebootQueueStateEnabled {
t.Error("reboot queue should be enabled by default")
}

// disable rq and get its state
err = storage.EnableRebootQueue(ctx, false)
err = storage.SetRebootQueueState(ctx, RebootQueueStateStopping)
if err != nil {
t.Fatal(err)
}
disabled, err = storage.IsRebootQueueDisabled(ctx)
state, err = storage.GetRebootQueueState(ctx)
if err != nil {
t.Fatal(err)
}
if !disabled {
t.Error("reboot queue could not be disabled")
if state != RebootQueueStateStopping {
t.Error("reboot queue state is not updated correctly")
}

// re-enable rq and get its state
err = storage.EnableRebootQueue(ctx, true)
err = storage.SetRebootQueueState(ctx, RebootQueueStateEnabled)
if err != nil {
t.Fatal(err)
}
disabled, err = storage.IsRebootQueueDisabled(ctx)
state, err = storage.GetRebootQueueState(ctx)
if err != nil {
t.Fatal(err)
}
if disabled {
if state != RebootQueueStateEnabled {
t.Error("reboot queue could not be re-enabled")
}
}
Expand Down

0 comments on commit 0d7f294

Please sign in to comment.