From 0d7f294a3ad07449f7a5ee149df1eb8840950ba1 Mon Sep 17 00:00:00 2001 From: Daichi Sakaue Date: Fri, 8 Dec 2023 17:40:20 +0900 Subject: [PATCH 1/2] Add stopping state to reboot-queue Signed-off-by: Daichi Sakaue --- docs/metrics.md | 2 +- docs/reboot.md | 2 +- metrics/collector.go | 12 +++--- metrics/updater_test.go | 45 ++++++++++++++--------- pkg/ckecli/cmd/reboot_queue_disable.go | 3 +- pkg/ckecli/cmd/reboot_queue_enable.go | 3 +- pkg/ckecli/cmd/reboot_queue_is_enabled.go | 4 +- reboot.go | 13 +++++++ server/control.go | 14 +++++-- storage.go | 39 ++++++++++++-------- storage_test.go | 20 +++++----- 11 files changed, 98 insertions(+), 59 deletions(-) diff --git a/docs/metrics.md b/docs/metrics.md index 86420df4a..9b89be729 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -9,7 +9,7 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST | node_reboot_status | The reboot status of a node. | Gauge | `node`, `status` | | operation_phase | 1 if CKE is operating in the phase specified by the `phase` label. | Gauge | `phase` | | operation_phase_timestamp_seconds | The Unix timestamp when `operation_phase` was last updated. | Gauge | | -| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | | +| reboot_queue_enabled | True (=1) if reboot queue is enabled or stopping. | Gauge | | | reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | | | reboot_queue_items | The number reboot queue entries remaining per status. | Gauge | `status` | | reboot_queue_running | True (=1) if reboot queue is enabled and the queue is not empty. | Gauge | | diff --git a/docs/reboot.md b/docs/reboot.md index 465eb3b72..584cdb1ac 100644 --- a/docs/reboot.md +++ b/docs/reboot.md @@ -43,7 +43,7 @@ The command writes reboot queue entry(s) and increments `reboots/write-index` at The queue is processed by CKE as follows: -1. If `reboots/disabled` is `true`, it doesn't process the queue. +1. If `reboots/state` is not `enabled`, it will not process the queue. 2. Check the reboot queue to find an entry. - If the number of nodes under processing is less than maximum concurrent reboots and the number of unreachable nodes that are not under this reboot process is not more than `maximum-unreachable-nodes-for-reboot` in the constraints, pick several nodes from front of the queue and start draining them. 1. Cordon the node. diff --git a/metrics/collector.go b/metrics/collector.go index b594022cb..7f3222e77 100644 --- a/metrics/collector.go +++ b/metrics/collector.go @@ -39,7 +39,7 @@ type metricGroup struct { // This abstraction is for mock test. type storage interface { IsSabakanDisabled(context.Context) (bool, error) - IsRebootQueueDisabled(ctx context.Context) (bool, error) + GetRebootQueueState(ctx context.Context) (cke.RebootQueueState, error) GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error) GetCluster(ctx context.Context) (*cke.Cluster, error) } @@ -143,7 +143,7 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - rqDisabled, err := c.storage.IsRebootQueueDisabled(ctx) + rqState, err := c.storage.GetRebootQueueState(ctx) if err != nil { log.Error("failed to get if reboot queue is enabled", map[string]interface{}{ log.FnError: err, @@ -160,11 +160,11 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) { } var rqEnabled, rqRunning float64 - if !rqDisabled { + if rqState == cke.RebootQueueStateEnabled || rqState == cke.RebootQueueStateStopping { rqEnabled = 1 - } - if !rqDisabled && len(rqEntries) > 0 { - rqRunning = 1 + if len(rqEntries) > 0 { + rqRunning = 1 + } } cluster, err := c.storage.GetCluster(ctx) diff --git a/metrics/updater_test.go b/metrics/updater_test.go index 07c8c725e..90b07fac2 100644 --- a/metrics/updater_test.go +++ b/metrics/updater_test.go @@ -43,7 +43,7 @@ type updateOperationPhaseTestCase struct { type updateRebootQueueEntriesTestCase struct { name string - enabled bool + state cke.RebootQueueState input []*cke.RebootQueueEntry expectedEnabled float64 expectedRunning float64 @@ -255,15 +255,15 @@ func testUpdateRebootQueueEntries(t *testing.T) { testCases := []updateRebootQueueEntriesTestCase{ { name: "zero", - enabled: true, + state: cke.RebootQueueStateEnabled, input: nil, expectedEnabled: 1, expectedRunning: 0, expectedEntries: 0, }, { - name: "one", - enabled: true, + name: "one", + state: cke.RebootQueueStateEnabled, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, }, @@ -272,8 +272,8 @@ func testUpdateRebootQueueEntries(t *testing.T) { expectedEntries: 1, }, { - name: "two", - enabled: true, + name: "two", + state: cke.RebootQueueStateEnabled, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, @@ -283,8 +283,19 @@ func testUpdateRebootQueueEntries(t *testing.T) { expectedEntries: 2, }, { - name: "two-disabled", - enabled: false, + name: "two-stopping", + state: cke.RebootQueueStateStopping, + input: []*cke.RebootQueueEntry{ + {Status: cke.RebootStatusQueued}, + {Status: cke.RebootStatusRebooting}, + }, + expectedEnabled: 1, + expectedRunning: 1, + expectedEntries: 2, + }, + { + name: "two-disabled", + state: cke.RebootQueueStateDisabled, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, @@ -300,7 +311,7 @@ func testUpdateRebootQueueEntries(t *testing.T) { defer ctx.Done() collector, storage := newTestCollector() - storage.enableRebootQueue(tt.enabled) + storage.setRebootQueueState(tt.state) storage.setRebootsEntries(tt.input) handler := GetHandler(collector) @@ -669,10 +680,10 @@ func newTestCollector() (prometheus.Collector, *testStorage) { } type testStorage struct { - sabakanEnabled bool - rebootQueueEnabled bool - rebootEntries []*cke.RebootQueueEntry - cluster *cke.Cluster + sabakanEnabled bool + rebootQueueState cke.RebootQueueState + rebootEntries []*cke.RebootQueueEntry + cluster *cke.Cluster } func (s *testStorage) enableSabakan(flag bool) { @@ -683,12 +694,12 @@ func (s *testStorage) IsSabakanDisabled(_ context.Context) (bool, error) { return !s.sabakanEnabled, nil } -func (s *testStorage) IsRebootQueueDisabled(_ context.Context) (bool, error) { - return !s.rebootQueueEnabled, nil +func (s *testStorage) GetRebootQueueState(_ context.Context) (cke.RebootQueueState, error) { + return s.rebootQueueState, nil } -func (s *testStorage) enableRebootQueue(flag bool) { - s.rebootQueueEnabled = flag +func (s *testStorage) setRebootQueueState(state cke.RebootQueueState) { + s.rebootQueueState = state } func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) { diff --git a/pkg/ckecli/cmd/reboot_queue_disable.go b/pkg/ckecli/cmd/reboot_queue_disable.go index 55a6cf9e4..0c695c991 100644 --- a/pkg/ckecli/cmd/reboot_queue_disable.go +++ b/pkg/ckecli/cmd/reboot_queue_disable.go @@ -3,6 +3,7 @@ package cmd import ( "context" + "github.com/cybozu-go/cke" "github.com/cybozu-go/well" "github.com/spf13/cobra" ) @@ -14,7 +15,7 @@ var rebootQueueDisableCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { - return storage.EnableRebootQueue(ctx, false) + return storage.SetRebootQueueState(ctx, cke.RebootQueueStateStopping) }) well.Stop() return well.Wait() diff --git a/pkg/ckecli/cmd/reboot_queue_enable.go b/pkg/ckecli/cmd/reboot_queue_enable.go index a435dc9a7..ede7f50ab 100644 --- a/pkg/ckecli/cmd/reboot_queue_enable.go +++ b/pkg/ckecli/cmd/reboot_queue_enable.go @@ -3,6 +3,7 @@ package cmd import ( "context" + "github.com/cybozu-go/cke" "github.com/cybozu-go/well" "github.com/spf13/cobra" ) @@ -14,7 +15,7 @@ var rebootQueueEnableCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { - return storage.EnableRebootQueue(ctx, true) + return storage.SetRebootQueueState(ctx, cke.RebootQueueStateEnabled) }) well.Stop() return well.Wait() diff --git a/pkg/ckecli/cmd/reboot_queue_is_enabled.go b/pkg/ckecli/cmd/reboot_queue_is_enabled.go index 4b4e2e104..3a97d313b 100644 --- a/pkg/ckecli/cmd/reboot_queue_is_enabled.go +++ b/pkg/ckecli/cmd/reboot_queue_is_enabled.go @@ -15,11 +15,11 @@ var rebootQueueIsEnabledCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { - disabled, err := storage.IsRebootQueueDisabled(ctx) + state, err := storage.GetRebootQueueState(ctx) if err != nil { return err } - fmt.Println(!disabled) + fmt.Println(string(state)) return nil }) well.Stop() diff --git a/reboot.go b/reboot.go index 94de3fffb..df3b32a65 100644 --- a/reboot.go +++ b/reboot.go @@ -4,6 +4,19 @@ import ( "time" ) +// RebootQueueState is state of reboot queue +type RebootQueueState string + +// RebootQueue states +const ( + // Reboot queue is enabled (should be set by ckecli) + RebootQueueStateEnabled = RebootQueueState("enabled") + // Reboot queue is requested to stop (should be set by ckecli) + RebootQueueStateStopping = RebootQueueState("stopping") + // Reboot queue is disabled (should be set by CKE) + RebootQueueStateDisabled = RebootQueueState("disabled") +) + // RebootStatus is status of reboot operation type RebootStatus string diff --git a/server/control.go b/server/control.go index c76e52292..08bb4fe70 100644 --- a/server/control.go +++ b/server/control.go @@ -317,11 +317,17 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t rqEntries = cke.DedupRebootQueueEntries(rqEntries) if len(rqEntries) > 0 { - disabled, err := inf.Storage().IsRebootQueueDisabled(ctx) - if err != nil { + rqState, err := inf.Storage().GetRebootQueueState(ctx) + switch { + case err != nil: return err - } - if disabled { + case rqState == cke.RebootQueueStateStopping: + err := inf.Storage().SetRebootQueueState(ctx, cke.RebootQueueStateDisabled) + if err != nil { + return err + } + rqEntries = nil + case rqState == cke.RebootQueueStateDisabled: rqEntries = nil } } diff --git a/storage.go b/storage.go index 5895b8a85..915f87020 100644 --- a/storage.go +++ b/storage.go @@ -30,7 +30,8 @@ const ( KeyClusterRevision = "cluster-revision" KeyConstraints = "constraints" KeyLeader = "leader/" - KeyRebootsDisabled = "reboots/disabled" + KeyRebootsDisabled = "reboots/disabled" // TODO: remove this key + KeyRebootsState = "reboots/state" KeyRebootsPrefix = "reboots/data/" KeyRebootsWriteIndex = "reboots/write-index" KeyRecords = "records/" @@ -679,29 +680,35 @@ func (s Storage) GetSabakanURL(ctx context.Context) (string, error) { return s.getStringValue(ctx, KeySabakanURL) } -// IsRebootQueueDisabled returns true if reboot queue is disabled. -func (s Storage) IsRebootQueueDisabled(ctx context.Context) (bool, error) { - resp, err := s.Get(ctx, KeyRebootsDisabled) +// GetRebootQueueState returns reboot queue state. +func (s Storage) GetRebootQueueState(ctx context.Context) (RebootQueueState, error) { + resp, err := s.Get(ctx, KeyRebootsState) if err != nil { - return false, err + // defaulted to on + return RebootQueueStateEnabled, err } if resp.Count == 0 { - return false, nil + return RebootQueueStateEnabled, nil } - return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil + return RebootQueueState(resp.Kvs[0].Value), nil } -// EnableRebootQueue enables reboot queue processing when flag is true. -// When flag is false, reboot queue is not processed. -func (s Storage) EnableRebootQueue(ctx context.Context, flag bool) error { - var val string - if flag { - val = "false" - } else { - val = "true" +// SetRebootQueueState sets reboot queue state. +func (s Storage) SetRebootQueueState(ctx context.Context, state RebootQueueState) error { + // TODO: remove this temporary code + resp, err := s.Get(ctx, KeyRebootsDisabled) + if err != nil { + return err } - _, err := s.Put(ctx, KeyRebootsDisabled, val) + if resp.Count > 0 { + _, err := s.Delete(ctx, KeyRebootsDisabled) + if err != nil { + return err + } + } + + _, err = s.Put(ctx, KeyRebootsState, string(state)) return err } diff --git a/storage_test.go b/storage_test.go index 5daac1b0e..2708babdb 100644 --- a/storage_test.go +++ b/storage_test.go @@ -743,37 +743,37 @@ func testStorageReboot(t *testing.T) { } // rq is enabled by default - disabled, err := storage.IsRebootQueueDisabled(ctx) + state, err := storage.GetRebootQueueState(ctx) if err != nil { t.Fatal(err) } - if disabled { - t.Error("reboot queue should not be disabled by default") + if state != RebootQueueStateEnabled { + t.Error("reboot queue should be enabled by default") } // disable rq and get its state - err = storage.EnableRebootQueue(ctx, false) + err = storage.SetRebootQueueState(ctx, RebootQueueStateStopping) if err != nil { t.Fatal(err) } - disabled, err = storage.IsRebootQueueDisabled(ctx) + state, err = storage.GetRebootQueueState(ctx) if err != nil { t.Fatal(err) } - if !disabled { - t.Error("reboot queue could not be disabled") + if state != RebootQueueStateStopping { + t.Error("reboot queue state is not updated correctly") } // re-enable rq and get its state - err = storage.EnableRebootQueue(ctx, true) + err = storage.SetRebootQueueState(ctx, RebootQueueStateEnabled) if err != nil { t.Fatal(err) } - disabled, err = storage.IsRebootQueueDisabled(ctx) + state, err = storage.GetRebootQueueState(ctx) if err != nil { t.Fatal(err) } - if disabled { + if state != RebootQueueStateEnabled { t.Error("reboot queue could not be re-enabled") } } From a7286ef4b2196470578cc4c0a3572c6d6dfc4588 Mon Sep 17 00:00:00 2001 From: Daichi Sakaue Date: Thu, 14 Dec 2023 16:33:06 +0900 Subject: [PATCH 2/2] Reflect comments Signed-off-by: Daichi Sakaue --- docs/metrics.md | 4 +- docs/reboot.md | 2 +- metrics/collector.go | 29 +++++++---- metrics/updater_test.go | 56 +++++++++++++-------- pkg/ckecli/cmd/reboot_queue_disable.go | 3 +- pkg/ckecli/cmd/reboot_queue_enable.go | 3 +- pkg/ckecli/cmd/reboot_queue_is_enabled.go | 4 +- reboot.go | 13 ----- server/control.go | 26 ++++++---- storage.go | 59 +++++++++++++++-------- storage_test.go | 53 ++++++++++++++++---- 11 files changed, 161 insertions(+), 91 deletions(-) diff --git a/docs/metrics.md b/docs/metrics.md index 9b89be729..f1b8a01d1 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -9,10 +9,10 @@ CKE exposes the following metrics with the Prometheus format at `/metrics` REST | node_reboot_status | The reboot status of a node. | Gauge | `node`, `status` | | operation_phase | 1 if CKE is operating in the phase specified by the `phase` label. | Gauge | `phase` | | operation_phase_timestamp_seconds | The Unix timestamp when `operation_phase` was last updated. | Gauge | | -| reboot_queue_enabled | True (=1) if reboot queue is enabled or stopping. | Gauge | | +| reboot_queue_enabled | True (=1) if reboot queue is enabled. | Gauge | | | reboot_queue_entries | The number of reboot queue entries remaining. | Gauge | | | reboot_queue_items | The number reboot queue entries remaining per status. | Gauge | `status` | -| reboot_queue_running | True (=1) if reboot queue is enabled and the queue is not empty. | Gauge | | +| reboot_queue_running | True (=1) if reboot queue is running. | Gauge | | | sabakan_integration_successful | True (=1) if sabakan-integration satisfies constraints. | Gauge | | | sabakan_integration_timestamp_seconds | The Unix timestamp when `sabakan_integration_successful` was last updated. | Gauge | | | sabakan_workers | The number of worker nodes for each role. | Gauge | `role` | diff --git a/docs/reboot.md b/docs/reboot.md index 584cdb1ac..465eb3b72 100644 --- a/docs/reboot.md +++ b/docs/reboot.md @@ -43,7 +43,7 @@ The command writes reboot queue entry(s) and increments `reboots/write-index` at The queue is processed by CKE as follows: -1. If `reboots/state` is not `enabled`, it will not process the queue. +1. If `reboots/disabled` is `true`, it doesn't process the queue. 2. Check the reboot queue to find an entry. - If the number of nodes under processing is less than maximum concurrent reboots and the number of unreachable nodes that are not under this reboot process is not more than `maximum-unreachable-nodes-for-reboot` in the constraints, pick several nodes from front of the queue and start draining them. 1. Cordon the node. diff --git a/metrics/collector.go b/metrics/collector.go index 7f3222e77..6efe25ed7 100644 --- a/metrics/collector.go +++ b/metrics/collector.go @@ -39,7 +39,8 @@ type metricGroup struct { // This abstraction is for mock test. type storage interface { IsSabakanDisabled(context.Context) (bool, error) - GetRebootQueueState(ctx context.Context) (cke.RebootQueueState, error) + IsRebootQueueDisabled(ctx context.Context) (bool, error) + IsRebootQueueRunning(ctx context.Context) (bool, error) GetRebootsEntries(ctx context.Context) ([]*cke.RebootQueueEntry, error) GetCluster(ctx context.Context) (*cke.Cluster, error) } @@ -143,28 +144,36 @@ func (c nodeMetricsCollector) Collect(ch chan<- prometheus.Metric) { ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - rqState, err := c.storage.GetRebootQueueState(ctx) + disabled, err := c.storage.IsRebootQueueDisabled(ctx) if err != nil { log.Error("failed to get if reboot queue is enabled", map[string]interface{}{ log.FnError: err, }) return } + var rqEnabled float64 + if !disabled { + rqEnabled = 1 + } - rqEntries, err := c.storage.GetRebootsEntries(ctx) + running, err := c.storage.IsRebootQueueRunning(ctx) if err != nil { - log.Error("failed to get reboots entries", map[string]interface{}{ + log.Error("failed to get if reboot queue is running", map[string]interface{}{ log.FnError: err, }) return } + var rqRunning float64 + if running { + rqRunning = 1 + } - var rqEnabled, rqRunning float64 - if rqState == cke.RebootQueueStateEnabled || rqState == cke.RebootQueueStateStopping { - rqEnabled = 1 - if len(rqEntries) > 0 { - rqRunning = 1 - } + rqEntries, err := c.storage.GetRebootsEntries(ctx) + if err != nil { + log.Error("failed to get reboots entries", map[string]interface{}{ + log.FnError: err, + }) + return } cluster, err := c.storage.GetCluster(ctx) diff --git a/metrics/updater_test.go b/metrics/updater_test.go index 90b07fac2..d9fdba042 100644 --- a/metrics/updater_test.go +++ b/metrics/updater_test.go @@ -43,7 +43,8 @@ type updateOperationPhaseTestCase struct { type updateRebootQueueEntriesTestCase struct { name string - state cke.RebootQueueState + enabled bool + running bool input []*cke.RebootQueueEntry expectedEnabled float64 expectedRunning float64 @@ -255,15 +256,17 @@ func testUpdateRebootQueueEntries(t *testing.T) { testCases := []updateRebootQueueEntriesTestCase{ { name: "zero", - state: cke.RebootQueueStateEnabled, + enabled: true, + running: false, input: nil, expectedEnabled: 1, expectedRunning: 0, expectedEntries: 0, }, { - name: "one", - state: cke.RebootQueueStateEnabled, + name: "one", + enabled: true, + running: true, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, }, @@ -272,8 +275,9 @@ func testUpdateRebootQueueEntries(t *testing.T) { expectedEntries: 1, }, { - name: "two", - state: cke.RebootQueueStateEnabled, + name: "two", + enabled: true, + running: true, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, @@ -283,19 +287,21 @@ func testUpdateRebootQueueEntries(t *testing.T) { expectedEntries: 2, }, { - name: "two-stopping", - state: cke.RebootQueueStateStopping, + name: "two-stopping", + enabled: false, + running: true, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, }, - expectedEnabled: 1, + expectedEnabled: 0, expectedRunning: 1, expectedEntries: 2, }, { - name: "two-disabled", - state: cke.RebootQueueStateDisabled, + name: "two-disabled", + enabled: false, + running: false, input: []*cke.RebootQueueEntry{ {Status: cke.RebootStatusQueued}, {Status: cke.RebootStatusRebooting}, @@ -311,7 +317,8 @@ func testUpdateRebootQueueEntries(t *testing.T) { defer ctx.Done() collector, storage := newTestCollector() - storage.setRebootQueueState(tt.state) + storage.enableRebootQueue(tt.enabled) + storage.setRebootQueueRunning(tt.running) storage.setRebootsEntries(tt.input) handler := GetHandler(collector) @@ -680,10 +687,11 @@ func newTestCollector() (prometheus.Collector, *testStorage) { } type testStorage struct { - sabakanEnabled bool - rebootQueueState cke.RebootQueueState - rebootEntries []*cke.RebootQueueEntry - cluster *cke.Cluster + sabakanEnabled bool + rebootQueueEnabled bool + rebootQueueRunning bool + rebootEntries []*cke.RebootQueueEntry + cluster *cke.Cluster } func (s *testStorage) enableSabakan(flag bool) { @@ -694,12 +702,20 @@ func (s *testStorage) IsSabakanDisabled(_ context.Context) (bool, error) { return !s.sabakanEnabled, nil } -func (s *testStorage) GetRebootQueueState(_ context.Context) (cke.RebootQueueState, error) { - return s.rebootQueueState, nil +func (s *testStorage) IsRebootQueueDisabled(_ context.Context) (bool, error) { + return !s.rebootQueueEnabled, nil +} + +func (s *testStorage) enableRebootQueue(flag bool) { + s.rebootQueueEnabled = flag +} + +func (s *testStorage) IsRebootQueueRunning(_ context.Context) (bool, error) { + return s.rebootQueueRunning, nil } -func (s *testStorage) setRebootQueueState(state cke.RebootQueueState) { - s.rebootQueueState = state +func (s *testStorage) setRebootQueueRunning(flag bool) { + s.rebootQueueRunning = flag } func (s *testStorage) setRebootsEntries(entries []*cke.RebootQueueEntry) { diff --git a/pkg/ckecli/cmd/reboot_queue_disable.go b/pkg/ckecli/cmd/reboot_queue_disable.go index 0c695c991..55a6cf9e4 100644 --- a/pkg/ckecli/cmd/reboot_queue_disable.go +++ b/pkg/ckecli/cmd/reboot_queue_disable.go @@ -3,7 +3,6 @@ package cmd import ( "context" - "github.com/cybozu-go/cke" "github.com/cybozu-go/well" "github.com/spf13/cobra" ) @@ -15,7 +14,7 @@ var rebootQueueDisableCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { - return storage.SetRebootQueueState(ctx, cke.RebootQueueStateStopping) + return storage.EnableRebootQueue(ctx, false) }) well.Stop() return well.Wait() diff --git a/pkg/ckecli/cmd/reboot_queue_enable.go b/pkg/ckecli/cmd/reboot_queue_enable.go index ede7f50ab..a435dc9a7 100644 --- a/pkg/ckecli/cmd/reboot_queue_enable.go +++ b/pkg/ckecli/cmd/reboot_queue_enable.go @@ -3,7 +3,6 @@ package cmd import ( "context" - "github.com/cybozu-go/cke" "github.com/cybozu-go/well" "github.com/spf13/cobra" ) @@ -15,7 +14,7 @@ var rebootQueueEnableCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { - return storage.SetRebootQueueState(ctx, cke.RebootQueueStateEnabled) + return storage.EnableRebootQueue(ctx, true) }) well.Stop() return well.Wait() diff --git a/pkg/ckecli/cmd/reboot_queue_is_enabled.go b/pkg/ckecli/cmd/reboot_queue_is_enabled.go index 3a97d313b..4b4e2e104 100644 --- a/pkg/ckecli/cmd/reboot_queue_is_enabled.go +++ b/pkg/ckecli/cmd/reboot_queue_is_enabled.go @@ -15,11 +15,11 @@ var rebootQueueIsEnabledCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { - state, err := storage.GetRebootQueueState(ctx) + disabled, err := storage.IsRebootQueueDisabled(ctx) if err != nil { return err } - fmt.Println(string(state)) + fmt.Println(!disabled) return nil }) well.Stop() diff --git a/reboot.go b/reboot.go index df3b32a65..94de3fffb 100644 --- a/reboot.go +++ b/reboot.go @@ -4,19 +4,6 @@ import ( "time" ) -// RebootQueueState is state of reboot queue -type RebootQueueState string - -// RebootQueue states -const ( - // Reboot queue is enabled (should be set by ckecli) - RebootQueueStateEnabled = RebootQueueState("enabled") - // Reboot queue is requested to stop (should be set by ckecli) - RebootQueueStateStopping = RebootQueueState("stopping") - // Reboot queue is disabled (should be set by CKE) - RebootQueueStateDisabled = RebootQueueState("disabled") -) - // RebootStatus is status of reboot operation type RebootStatus string diff --git a/server/control.go b/server/control.go index 08bb4fe70..0125ca1ec 100644 --- a/server/control.go +++ b/server/control.go @@ -317,21 +317,27 @@ func (c Controller) runOnce(ctx context.Context, leaderKey string, tick <-chan t rqEntries = cke.DedupRebootQueueEntries(rqEntries) if len(rqEntries) > 0 { - rqState, err := inf.Storage().GetRebootQueueState(ctx) - switch { - case err != nil: + disabled, err := inf.Storage().IsRebootQueueDisabled(ctx) + if err != nil { return err - case rqState == cke.RebootQueueStateStopping: - err := inf.Storage().SetRebootQueueState(ctx, cke.RebootQueueStateDisabled) - if err != nil { - return err - } - rqEntries = nil - case rqState == cke.RebootQueueStateDisabled: + } + if disabled { rqEntries = nil } } + running, err := inf.Storage().IsRebootQueueRunning(ctx) + if err != nil { + return err + } + runningNext := len(rqEntries) > 0 + if running != runningNext { + err := inf.Storage().SetRebootQueueRunning(ctx, runningNext) + if err != nil { + return err + } + } + nf := NewNodeFilter(cluster, status) apiServers := map[string]bool{} for _, node := range nf.ControlPlane() { diff --git a/storage.go b/storage.go index 915f87020..65f714b0b 100644 --- a/storage.go +++ b/storage.go @@ -30,8 +30,8 @@ const ( KeyClusterRevision = "cluster-revision" KeyConstraints = "constraints" KeyLeader = "leader/" - KeyRebootsDisabled = "reboots/disabled" // TODO: remove this key - KeyRebootsState = "reboots/state" + KeyRebootsDisabled = "reboots/disabled" + KeyRebootsRunning = "reboots/running" KeyRebootsPrefix = "reboots/data/" KeyRebootsWriteIndex = "reboots/write-index" KeyRecords = "records/" @@ -680,35 +680,54 @@ func (s Storage) GetSabakanURL(ctx context.Context) (string, error) { return s.getStringValue(ctx, KeySabakanURL) } -// GetRebootQueueState returns reboot queue state. -func (s Storage) GetRebootQueueState(ctx context.Context) (RebootQueueState, error) { - resp, err := s.Get(ctx, KeyRebootsState) +// IsRebootQueueDisabled returns true if reboot queue is disabled. +func (s Storage) IsRebootQueueDisabled(ctx context.Context) (bool, error) { + resp, err := s.Get(ctx, KeyRebootsDisabled) if err != nil { - // defaulted to on - return RebootQueueStateEnabled, err + return false, err } if resp.Count == 0 { - return RebootQueueStateEnabled, nil + return false, nil } - return RebootQueueState(resp.Kvs[0].Value), nil + return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil } -// SetRebootQueueState sets reboot queue state. -func (s Storage) SetRebootQueueState(ctx context.Context, state RebootQueueState) error { - // TODO: remove this temporary code - resp, err := s.Get(ctx, KeyRebootsDisabled) +// EnableRebootQueue enables reboot queue processing when flag is true. +// When flag is false, reboot queue is not processed. +func (s Storage) EnableRebootQueue(ctx context.Context, flag bool) error { + var val string + if flag { + val = "false" + } else { + val = "true" + } + _, err := s.Put(ctx, KeyRebootsDisabled, val) + return err +} + +// IsRebootQueueRunning returns true if CKE is processing reboot queue. +func (s Storage) IsRebootQueueRunning(ctx context.Context) (bool, error) { + resp, err := s.Get(ctx, KeyRebootsRunning) if err != nil { - return err + return false, err } - if resp.Count > 0 { - _, err := s.Delete(ctx, KeyRebootsDisabled) - if err != nil { - return err - } + if resp.Count == 0 { + return false, nil } - _, err = s.Put(ctx, KeyRebootsState, string(state)) + return bytes.Equal([]byte("true"), resp.Kvs[0].Value), nil +} + +// SetRebootQueueRunning is used to report if CKE is processing reboot queue. +func (s Storage) SetRebootQueueRunning(ctx context.Context, flag bool) error { + var val string + if flag { + val = "true" + } else { + val = "false" + } + _, err := s.Put(ctx, KeyRebootsRunning, val) return err } diff --git a/storage_test.go b/storage_test.go index 2708babdb..8ab7a5f5c 100644 --- a/storage_test.go +++ b/storage_test.go @@ -743,39 +743,74 @@ func testStorageReboot(t *testing.T) { } // rq is enabled by default - state, err := storage.GetRebootQueueState(ctx) + disabled, err := storage.IsRebootQueueDisabled(ctx) if err != nil { t.Fatal(err) } - if state != RebootQueueStateEnabled { + if disabled { t.Error("reboot queue should be enabled by default") } // disable rq and get its state - err = storage.SetRebootQueueState(ctx, RebootQueueStateStopping) + err = storage.EnableRebootQueue(ctx, false) if err != nil { t.Fatal(err) } - state, err = storage.GetRebootQueueState(ctx) + disabled, err = storage.IsRebootQueueDisabled(ctx) if err != nil { t.Fatal(err) } - if state != RebootQueueStateStopping { - t.Error("reboot queue state is not updated correctly") + if !disabled { + t.Error("reboot queue could not be disabled") } // re-enable rq and get its state - err = storage.SetRebootQueueState(ctx, RebootQueueStateEnabled) + err = storage.EnableRebootQueue(ctx, true) if err != nil { t.Fatal(err) } - state, err = storage.GetRebootQueueState(ctx) + disabled, err = storage.IsRebootQueueDisabled(ctx) if err != nil { t.Fatal(err) } - if state != RebootQueueStateEnabled { + if disabled { t.Error("reboot queue could not be re-enabled") } + + // rq is not running by default + running, err := storage.IsRebootQueueRunning(ctx) + if err != nil { + t.Fatal(err) + } + if running { + t.Error("reboot queue should not be running by default") + } + + // report running rq and get its state + err = storage.SetRebootQueueRunning(ctx, true) + if err != nil { + t.Fatal(err) + } + running, err = storage.IsRebootQueueRunning(ctx) + if err != nil { + t.Fatal(err) + } + if !running { + t.Error("reboot queue could not be reported running") + } + + // report not running rq and get its state + err = storage.SetRebootQueueRunning(ctx, false) + if err != nil { + t.Fatal(err) + } + running, err = storage.IsRebootQueueRunning(ctx) + if err != nil { + t.Fatal(err) + } + if running { + t.Error("reboot queue could not be reported not running") + } } func testStatus(t *testing.T) {