From 31e450866be4486dd41d4423f7d890ada55ead0d Mon Sep 17 00:00:00 2001 From: YZ775 Date: Mon, 8 Jul 2024 02:15:15 +0000 Subject: [PATCH 01/10] Reboot --- cluster.go | 11 ++++++ mtest/cke-cluster.yml | 5 +++ op/kube_node_remove.go | 87 +++++++++++++++++++++++++++++++++++++++--- server/strategy.go | 2 +- 4 files changed, 99 insertions(+), 6 deletions(-) diff --git a/cluster.go b/cluster.go index b5e14a506..e8f84a2f7 100644 --- a/cluster.go +++ b/cluster.go @@ -320,6 +320,16 @@ const DefaultRepairEvictionTimeoutSeconds = 600 const DefaultRepairHealthCheckCommandTimeoutSeconds = 30 const DefaultRepairCommandTimeoutSeconds = 30 +type Retire struct { + ShutdownCommand []string `json:"shutdown_command"` + CheckCommand []string `json:"check_command"` + CommandTimeoutSeconds *int `json:"command_timeout_seconds,omitempty"` + CheckTimeoutSeconds *int `json:"check_timeout_seconds,omitempty"` +} + +const DefaultRetireCommandTimeoutSeconds = 30 +const DefaultRetireCheckTimeoutSeconds = 300 + // Options is a set of optional parameters for k8s components. type Options struct { Etcd EtcdParams `json:"etcd"` @@ -343,6 +353,7 @@ type Cluster struct { DNSService string `json:"dns_service"` Reboot Reboot `json:"reboot"` Repair Repair `json:"repair"` + Retire Retire `json:"retire"` Options Options `json:"options"` } diff --git a/mtest/cke-cluster.yml b/mtest/cke-cluster.yml index c78582077..b70583610 100644 --- a/mtest/cke-cluster.yml +++ b/mtest/cke-cluster.yml @@ -29,6 +29,11 @@ repair: need_drain: true watch_seconds: 30 health_check_command: ["sh", "-c", "test -f /tmp/mtest-repair-$1 && echo true", "health_check"] +retire: + shutdown_command: ["true"] + check_command: ["bash", "-c", "echo 'Off'"] + command_timeout_seconds: 30 + check_timeout_seconds: 300 options: kube-api: extra_binds: diff --git a/op/kube_node_remove.go b/op/kube_node_remove.go index 196f30a5c..7a8798f71 100644 --- a/op/kube_node_remove.go +++ b/op/kube_node_remove.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "strings" + "time" "github.com/cybozu-go/cke" + "github.com/cybozu-go/log" + "github.com/cybozu-go/well" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -16,12 +19,13 @@ import ( type kubeNodeRemove struct { apiserver *cke.Node nodes []*corev1.Node + config *cke.Retire done bool } // KubeNodeRemoveOp removes k8s Node resources. -func KubeNodeRemoveOp(apiserver *cke.Node, nodes []*corev1.Node) cke.Operator { - return &kubeNodeRemove{apiserver: apiserver, nodes: nodes} +func KubeNodeRemoveOp(apiserver *cke.Node, nodes []*corev1.Node, config *cke.Retire) cke.Operator { + return &kubeNodeRemove{apiserver: apiserver, nodes: nodes, config: config} } func (o *kubeNodeRemove) Name() string { @@ -34,7 +38,14 @@ func (o *kubeNodeRemove) NextCommand() cke.Commander { } o.done = true - return nodeRemoveCommand{o.apiserver, o.nodes} + return nodeRemoveCommand{ + o.apiserver, + o.nodes, + o.config.ShutdownCommand, + o.config.CheckCommand, + o.config.CommandTimeoutSeconds, + o.config.CheckTimeoutSeconds, + } } func (o *kubeNodeRemove) Targets() []string { @@ -44,8 +55,12 @@ func (o *kubeNodeRemove) Targets() []string { } type nodeRemoveCommand struct { - apiserver *cke.Node - nodes []*corev1.Node + apiserver *cke.Node + nodes []*corev1.Node + shutdownCommand []string + checkCommand []string + timeoutSeconds *int + checkTimeoutSeconds *int } func (c nodeRemoveCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { @@ -77,6 +92,68 @@ func (c nodeRemoveCommand) Run(ctx context.Context, inf cke.Infrastructure, _ st return fmt.Errorf("failed to patch node %s: %v", n.Name, err) } } + err := func() error { + ctx := ctx + timeout := cke.DefaultRetireCommandTimeoutSeconds + if c.timeoutSeconds != nil { + timeout = *c.timeoutSeconds + } + if timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) + defer cancel() + } + args := append(c.shutdownCommand[1:], n.Name) + command := well.CommandContext(ctx, c.shutdownCommand[0], args...) + return command.Run() + }() + if err != nil { + return fmt.Errorf("failed to shutdown node %s: %v", n.Name, err) + } + + err = func() error { + ctx := ctx + checkTimeout := cke.DefaultRetireCheckTimeoutSeconds + if c.checkTimeoutSeconds != nil { + checkTimeout = *c.checkTimeoutSeconds + } + timeout := time.After(time.Duration(checkTimeout) * time.Second) + ticker := time.NewTicker(10 * time.Second) + for { + select { + case <-timeout: + return fmt.Errorf("timeout") + case <-ticker.C: + args := append(c.checkCommand[1:], n.Name) + command := well.CommandContext(ctx, c.checkCommand[0], args...) + stdout, err := command.Output() + if err != nil { + log.Warn("failed to check shutdown status of node", map[string]interface{}{ + log.FnError: err, + "node": n.Name, + }) + continue + } + if strings.TrimSuffix(string(stdout), "\n") == "Off" { + return nil + } + } + } + }() + if err != nil { + return fmt.Errorf("failed to check shutdown status of node %s: %v", n.Name, err) + } + shutdownTaint := corev1.Taint{ + Key: "node.kubernetes.io/out-of-service", + Value: "nodeshutdown", + Effect: corev1.TaintEffectNoExecute, + } + n.Spec.Taints = append(n.Spec.Taints, shutdownTaint) + _, err = nodesAPI.Update(ctx, n, metav1.UpdateOptions{}) + if err != nil { + return fmt.Errorf("failed to update node %s: %v", n.Name, err) + } + err = nodesAPI.Delete(ctx, n.Name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("failed to delete node %s: %v", n.Name, err) diff --git a/server/strategy.go b/server/strategy.go index a29b3c3d1..8f0df14d9 100644 --- a/server/strategy.go +++ b/server/strategy.go @@ -353,7 +353,7 @@ OUTER_ETCD: } if nodes := nf.NonClusterNodes(); len(nodes) > 0 { - ops = append(ops, op.KubeNodeRemoveOp(apiServer, nodes)) + ops = append(ops, op.KubeNodeRemoveOp(apiServer, nodes, &c.Retire)) } return ops From 421ea1855c7c3ad4709f802e9d8d2fc3320b3481 Mon Sep 17 00:00:00 2001 From: YZ775 Date: Tue, 22 Oct 2024 07:21:51 +0000 Subject: [PATCH 02/10] fix to execute arbitrary command when retirement --- cluster.go | 9 ++--- mtest/cke-cluster.yml | 6 +-- op/kube_node_remove.go | 92 +++++++++++------------------------------- 3 files changed, 28 insertions(+), 79 deletions(-) diff --git a/cluster.go b/cluster.go index e8f84a2f7..3e9fb0dfa 100644 --- a/cluster.go +++ b/cluster.go @@ -321,14 +321,11 @@ const DefaultRepairHealthCheckCommandTimeoutSeconds = 30 const DefaultRepairCommandTimeoutSeconds = 30 type Retire struct { - ShutdownCommand []string `json:"shutdown_command"` - CheckCommand []string `json:"check_command"` - CommandTimeoutSeconds *int `json:"command_timeout_seconds,omitempty"` - CheckTimeoutSeconds *int `json:"check_timeout_seconds,omitempty"` + OptionalCommand []string `json:"optional_command,omitempty"` + OptionalCommandTimeoutSeconds *int `json:"optional_command_timeout_seconds,omitempty"` } -const DefaultRetireCommandTimeoutSeconds = 30 -const DefaultRetireCheckTimeoutSeconds = 300 +const DefaultRetireOptionalCommandTimeoutSeconds = 30 // Options is a set of optional parameters for k8s components. type Options struct { diff --git a/mtest/cke-cluster.yml b/mtest/cke-cluster.yml index b70583610..ddd10b8e8 100644 --- a/mtest/cke-cluster.yml +++ b/mtest/cke-cluster.yml @@ -30,10 +30,8 @@ repair: watch_seconds: 30 health_check_command: ["sh", "-c", "test -f /tmp/mtest-repair-$1 && echo true", "health_check"] retire: - shutdown_command: ["true"] - check_command: ["bash", "-c", "echo 'Off'"] - command_timeout_seconds: 30 - check_timeout_seconds: 300 + optional_command: ["true"] + optional_command_timeout_seconds: 30 options: kube-api: extra_binds: diff --git a/op/kube_node_remove.go b/op/kube_node_remove.go index 7a8798f71..7db7e81e6 100644 --- a/op/kube_node_remove.go +++ b/op/kube_node_remove.go @@ -7,7 +7,6 @@ import ( "time" "github.com/cybozu-go/cke" - "github.com/cybozu-go/log" "github.com/cybozu-go/well" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -41,10 +40,8 @@ func (o *kubeNodeRemove) NextCommand() cke.Commander { return nodeRemoveCommand{ o.apiserver, o.nodes, - o.config.ShutdownCommand, - o.config.CheckCommand, - o.config.CommandTimeoutSeconds, - o.config.CheckTimeoutSeconds, + o.config.OptionalCommand, + o.config.OptionalCommandTimeoutSeconds, } } @@ -55,12 +52,10 @@ func (o *kubeNodeRemove) Targets() []string { } type nodeRemoveCommand struct { - apiserver *cke.Node - nodes []*corev1.Node - shutdownCommand []string - checkCommand []string - timeoutSeconds *int - checkTimeoutSeconds *int + apiserver *cke.Node + nodes []*corev1.Node + optionalCommand []string + optionalCommandTimeoutSeconds *int } func (c nodeRemoveCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { @@ -92,66 +87,25 @@ func (c nodeRemoveCommand) Run(ctx context.Context, inf cke.Infrastructure, _ st return fmt.Errorf("failed to patch node %s: %v", n.Name, err) } } - err := func() error { - ctx := ctx - timeout := cke.DefaultRetireCommandTimeoutSeconds - if c.timeoutSeconds != nil { - timeout = *c.timeoutSeconds - } - if timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) - defer cancel() - } - args := append(c.shutdownCommand[1:], n.Name) - command := well.CommandContext(ctx, c.shutdownCommand[0], args...) - return command.Run() - }() - if err != nil { - return fmt.Errorf("failed to shutdown node %s: %v", n.Name, err) - } - - err = func() error { - ctx := ctx - checkTimeout := cke.DefaultRetireCheckTimeoutSeconds - if c.checkTimeoutSeconds != nil { - checkTimeout = *c.checkTimeoutSeconds - } - timeout := time.After(time.Duration(checkTimeout) * time.Second) - ticker := time.NewTicker(10 * time.Second) - for { - select { - case <-timeout: - return fmt.Errorf("timeout") - case <-ticker.C: - args := append(c.checkCommand[1:], n.Name) - command := well.CommandContext(ctx, c.checkCommand[0], args...) - stdout, err := command.Output() - if err != nil { - log.Warn("failed to check shutdown status of node", map[string]interface{}{ - log.FnError: err, - "node": n.Name, - }) - continue - } - if strings.TrimSuffix(string(stdout), "\n") == "Off" { - return nil - } + if len(c.optionalCommand) != 0 { + err := func() error { + ctx := ctx + timeout := cke.DefaultRetireOptionalCommandTimeoutSeconds + if c.optionalCommandTimeoutSeconds != nil { + timeout = *c.optionalCommandTimeoutSeconds } + if timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) + defer cancel() + } + args := append(c.optionalCommand[1:], n.Name) + command := well.CommandContext(ctx, c.optionalCommand[0], args...) + return command.Run() + }() + if err != nil { + return fmt.Errorf("failed to execute optional command in retirement %s: %v", n.Name, err) } - }() - if err != nil { - return fmt.Errorf("failed to check shutdown status of node %s: %v", n.Name, err) - } - shutdownTaint := corev1.Taint{ - Key: "node.kubernetes.io/out-of-service", - Value: "nodeshutdown", - Effect: corev1.TaintEffectNoExecute, - } - n.Spec.Taints = append(n.Spec.Taints, shutdownTaint) - _, err = nodesAPI.Update(ctx, n, metav1.UpdateOptions{}) - if err != nil { - return fmt.Errorf("failed to update node %s: %v", n.Name, err) } err = nodesAPI.Delete(ctx, n.Name, metav1.DeleteOptions{}) From 8835089879a50b851c8e5fcc82b3e0258ae5de0d Mon Sep 17 00:00:00 2001 From: YZ775 Date: Thu, 24 Oct 2024 04:18:43 +0000 Subject: [PATCH 03/10] add command after repair --- cluster.go | 3 +++ op/status.go | 25 +++++++++++++++++++++++++ 2 files changed, 28 insertions(+) diff --git a/cluster.go b/cluster.go index 3e9fb0dfa..72a4072ba 100644 --- a/cluster.go +++ b/cluster.go @@ -304,6 +304,8 @@ type RepairOperation struct { RepairSteps []RepairStep `json:"repair_steps"` HealthCheckCommand []string `json:"health_check_command"` CommandTimeoutSeconds *int `json:"command_timeout_seconds,omitempty"` + SuccessCommand []string `json:"success_command,omitempty"` + SuccessCommandTimeout *int `json:"success_command_timeout,omitempty"` } type RepairStep struct { @@ -319,6 +321,7 @@ const DefaultMaxConcurrentRepairs = 1 const DefaultRepairEvictionTimeoutSeconds = 600 const DefaultRepairHealthCheckCommandTimeoutSeconds = 30 const DefaultRepairCommandTimeoutSeconds = 30 +const DefaultRepairSuccessCommandTimeoutSeconds = 30 type Retire struct { OptionalCommand []string `json:"optional_command,omitempty"` diff --git a/op/status.go b/op/status.go index 30db44ec5..8f4221ba1 100644 --- a/op/status.go +++ b/op/status.go @@ -613,6 +613,31 @@ func GetRepairQueueStatus(ctx context.Context, inf cke.Infrastructure, n *cke.No } if healthy { rqs.RepairCompleted[entry.Address] = true + //execute Success command + op, err := entry.GetMatchingRepairOperation(cluster) + if err != nil { + return cke.RepairQueueStatus{}, err + } + if op.SuccessCommand != nil { + err := func() error { + ctx := ctx + timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds + if op.SuccessCommandTimeout != nil { + timeout = *op.SuccessCommandTimeout + } + if timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) + defer cancel() + } + args := append(op.SuccessCommand[1:], entry.Address) + command := well.CommandContext(ctx, op.SuccessCommand[0], args...) + return command.Run() + }() + if err != nil { + return cke.RepairQueueStatus{}, err + } + } } } From 5d66e2c3e535021d19b9aa4326c0681232cc274d Mon Sep 17 00:00:00 2001 From: YZ775 Date: Mon, 11 Nov 2024 02:40:10 +0000 Subject: [PATCH 04/10] revert optional command in retire --- cluster.go | 8 -------- mtest/cke-cluster.yml | 3 --- op/kube_node_remove.go | 41 +++++------------------------------------ server/strategy.go | 2 +- 4 files changed, 6 insertions(+), 48 deletions(-) diff --git a/cluster.go b/cluster.go index 72a4072ba..9cab2ab36 100644 --- a/cluster.go +++ b/cluster.go @@ -323,13 +323,6 @@ const DefaultRepairHealthCheckCommandTimeoutSeconds = 30 const DefaultRepairCommandTimeoutSeconds = 30 const DefaultRepairSuccessCommandTimeoutSeconds = 30 -type Retire struct { - OptionalCommand []string `json:"optional_command,omitempty"` - OptionalCommandTimeoutSeconds *int `json:"optional_command_timeout_seconds,omitempty"` -} - -const DefaultRetireOptionalCommandTimeoutSeconds = 30 - // Options is a set of optional parameters for k8s components. type Options struct { Etcd EtcdParams `json:"etcd"` @@ -353,7 +346,6 @@ type Cluster struct { DNSService string `json:"dns_service"` Reboot Reboot `json:"reboot"` Repair Repair `json:"repair"` - Retire Retire `json:"retire"` Options Options `json:"options"` } diff --git a/mtest/cke-cluster.yml b/mtest/cke-cluster.yml index ddd10b8e8..c78582077 100644 --- a/mtest/cke-cluster.yml +++ b/mtest/cke-cluster.yml @@ -29,9 +29,6 @@ repair: need_drain: true watch_seconds: 30 health_check_command: ["sh", "-c", "test -f /tmp/mtest-repair-$1 && echo true", "health_check"] -retire: - optional_command: ["true"] - optional_command_timeout_seconds: 30 options: kube-api: extra_binds: diff --git a/op/kube_node_remove.go b/op/kube_node_remove.go index 7db7e81e6..196f30a5c 100644 --- a/op/kube_node_remove.go +++ b/op/kube_node_remove.go @@ -4,10 +4,8 @@ import ( "context" "fmt" "strings" - "time" "github.com/cybozu-go/cke" - "github.com/cybozu-go/well" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -18,13 +16,12 @@ import ( type kubeNodeRemove struct { apiserver *cke.Node nodes []*corev1.Node - config *cke.Retire done bool } // KubeNodeRemoveOp removes k8s Node resources. -func KubeNodeRemoveOp(apiserver *cke.Node, nodes []*corev1.Node, config *cke.Retire) cke.Operator { - return &kubeNodeRemove{apiserver: apiserver, nodes: nodes, config: config} +func KubeNodeRemoveOp(apiserver *cke.Node, nodes []*corev1.Node) cke.Operator { + return &kubeNodeRemove{apiserver: apiserver, nodes: nodes} } func (o *kubeNodeRemove) Name() string { @@ -37,12 +34,7 @@ func (o *kubeNodeRemove) NextCommand() cke.Commander { } o.done = true - return nodeRemoveCommand{ - o.apiserver, - o.nodes, - o.config.OptionalCommand, - o.config.OptionalCommandTimeoutSeconds, - } + return nodeRemoveCommand{o.apiserver, o.nodes} } func (o *kubeNodeRemove) Targets() []string { @@ -52,10 +44,8 @@ func (o *kubeNodeRemove) Targets() []string { } type nodeRemoveCommand struct { - apiserver *cke.Node - nodes []*corev1.Node - optionalCommand []string - optionalCommandTimeoutSeconds *int + apiserver *cke.Node + nodes []*corev1.Node } func (c nodeRemoveCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { @@ -87,27 +77,6 @@ func (c nodeRemoveCommand) Run(ctx context.Context, inf cke.Infrastructure, _ st return fmt.Errorf("failed to patch node %s: %v", n.Name, err) } } - if len(c.optionalCommand) != 0 { - err := func() error { - ctx := ctx - timeout := cke.DefaultRetireOptionalCommandTimeoutSeconds - if c.optionalCommandTimeoutSeconds != nil { - timeout = *c.optionalCommandTimeoutSeconds - } - if timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) - defer cancel() - } - args := append(c.optionalCommand[1:], n.Name) - command := well.CommandContext(ctx, c.optionalCommand[0], args...) - return command.Run() - }() - if err != nil { - return fmt.Errorf("failed to execute optional command in retirement %s: %v", n.Name, err) - } - } - err = nodesAPI.Delete(ctx, n.Name, metav1.DeleteOptions{}) if err != nil { return fmt.Errorf("failed to delete node %s: %v", n.Name, err) diff --git a/server/strategy.go b/server/strategy.go index 8f0df14d9..a29b3c3d1 100644 --- a/server/strategy.go +++ b/server/strategy.go @@ -353,7 +353,7 @@ OUTER_ETCD: } if nodes := nf.NonClusterNodes(); len(nodes) > 0 { - ops = append(ops, op.KubeNodeRemoveOp(apiServer, nodes, &c.Retire)) + ops = append(ops, op.KubeNodeRemoveOp(apiServer, nodes)) } return ops From e27876103f0be2721d8a6e4c907b4ade72f005c2 Mon Sep 17 00:00:00 2001 From: YZ775 Date: Mon, 11 Nov 2024 08:18:47 +0000 Subject: [PATCH 05/10] mode execution of success command to repairFinish --- op/repair_execute.go | 16 ++++++++------ op/repair_finish.go | 51 +++++++++++++++++++++++++++++++++++++++++--- op/status.go | 25 ---------------------- server/strategy.go | 8 +++---- 4 files changed, 62 insertions(+), 38 deletions(-) diff --git a/op/repair_execute.go b/op/repair_execute.go index d855551a4..4c19382b2 100644 --- a/op/repair_execute.go +++ b/op/repair_execute.go @@ -13,14 +13,16 @@ import ( type repairExecuteOp struct { finished bool - entry *cke.RepairQueueEntry - step *cke.RepairStep + entry *cke.RepairQueueEntry + step *cke.RepairStep + cluster *cke.Cluster } -func RepairExecuteOp(entry *cke.RepairQueueEntry, step *cke.RepairStep) cke.Operator { +func RepairExecuteOp(entry *cke.RepairQueueEntry, step *cke.RepairStep, cluster *cke.Cluster) cke.Operator { return &repairExecuteOp{ - entry: entry, - step: step, + entry: entry, + step: step, + cluster: cluster, } } @@ -40,6 +42,7 @@ func (o *repairExecuteOp) NextCommand() cke.Commander { timeoutSeconds: o.step.CommandTimeoutSeconds, retries: o.step.CommandRetries, interval: o.step.CommandInterval, + cluster: o.cluster, } } @@ -53,6 +56,7 @@ type repairExecuteCommand struct { timeoutSeconds *int retries *int interval *int + cluster *cke.Cluster } func (c repairExecuteCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { @@ -110,7 +114,7 @@ RETRY: "address": c.entry.Address, "command": strings.Join(c.command, " "), }) - return repairFinish(ctx, inf, c.entry, false) + return repairFinish(ctx, inf, c.entry, false, c.cluster) } func (c repairExecuteCommand) Command() cke.Command { diff --git a/op/repair_finish.go b/op/repair_finish.go index fe889dbc8..2848a9382 100644 --- a/op/repair_finish.go +++ b/op/repair_finish.go @@ -5,6 +5,8 @@ import ( "time" "github.com/cybozu-go/cke" + "github.com/cybozu-go/log" + "github.com/cybozu-go/well" ) type repairFinishOp struct { @@ -12,12 +14,14 @@ type repairFinishOp struct { entry *cke.RepairQueueEntry succeeded bool + cluster *cke.Cluster } -func RepairFinishOp(entry *cke.RepairQueueEntry, succeeded bool) cke.Operator { +func RepairFinishOp(entry *cke.RepairQueueEntry, succeeded bool, cluster *cke.Cluster) cke.Operator { return &repairFinishOp{ entry: entry, succeeded: succeeded, + cluster: cluster, } } @@ -34,6 +38,7 @@ func (o *repairFinishOp) NextCommand() cke.Commander { return repairFinishCommand{ entry: o.entry, succeeded: o.succeeded, + cluster: o.cluster, } } @@ -44,10 +49,14 @@ func (o *repairFinishOp) Targets() []string { type repairFinishCommand struct { entry *cke.RepairQueueEntry succeeded bool + cluster *cke.Cluster } func (c repairFinishCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { - return repairFinish(ctx, inf, c.entry, c.succeeded) + if c.succeeded { + + } + return repairFinish(ctx, inf, c.entry, c.succeeded, c.cluster) } func (c repairFinishCommand) Command() cke.Command { @@ -57,8 +66,44 @@ func (c repairFinishCommand) Command() cke.Command { } } -func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool) error { +func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool, cluster *cke.Cluster) error { if succeeded { + //execute Success command + err := func() error { + op, err := entry.GetMatchingRepairOperation(cluster) + if err != nil { + return err + } + if op.SuccessCommand != nil { + err := func() error { + ctx := ctx + timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds + if op.SuccessCommandTimeout != nil { + timeout = *op.SuccessCommandTimeout + } + if timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) + defer cancel() + } + args := append(op.SuccessCommand[1:], entry.Address) + command := well.CommandContext(ctx, op.SuccessCommand[0], args...) + return command.Run() + }() + if err != nil { + return err + } + } + return nil + }() + if err != nil { + entry.Status = cke.RepairStatusFailed + log.Warn("SuccessCommand failed", map[string]interface{}{ + log.FnError: err, + "index": entry.Index, + "address": entry.Address, + }) + } entry.Status = cke.RepairStatusSucceeded } else { entry.Status = cke.RepairStatusFailed diff --git a/op/status.go b/op/status.go index 8f4221ba1..30db44ec5 100644 --- a/op/status.go +++ b/op/status.go @@ -613,31 +613,6 @@ func GetRepairQueueStatus(ctx context.Context, inf cke.Infrastructure, n *cke.No } if healthy { rqs.RepairCompleted[entry.Address] = true - //execute Success command - op, err := entry.GetMatchingRepairOperation(cluster) - if err != nil { - return cke.RepairQueueStatus{}, err - } - if op.SuccessCommand != nil { - err := func() error { - ctx := ctx - timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds - if op.SuccessCommandTimeout != nil { - timeout = *op.SuccessCommandTimeout - } - if timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) - defer cancel() - } - args := append(op.SuccessCommand[1:], entry.Address) - command := well.CommandContext(ctx, op.SuccessCommand[0], args...) - return command.Run() - }() - if err != nil { - return cke.RepairQueueStatus{}, err - } - } } } diff --git a/server/strategy.go b/server/strategy.go index a29b3c3d1..5c849c878 100644 --- a/server/strategy.go +++ b/server/strategy.go @@ -731,7 +731,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain continue } if rqs.RepairCompleted[entry.Address] { - ops = append(ops, op.RepairFinishOp(entry, true)) + ops = append(ops, op.RepairFinishOp(entry, true, c)) continue } switch entry.Status { @@ -826,7 +826,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain // Though ErrRepairStepOutOfRange may be caused by real misconfiguration, // e.g., by decreasing "repair_steps" in cluster.yaml, we treat the error // as the end of the steps for simplicity. - ops = append(ops, op.RepairFinishOp(entry, false)) + ops = append(ops, op.RepairFinishOp(entry, false, c)) continue } @@ -838,7 +838,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain continue } if !(step.NeedDrain && entry.IsInCluster()) { - ops = append(ops, op.RepairExecuteOp(entry, step)) + ops = append(ops, op.RepairExecuteOp(entry, step, c)) continue } // DrainBackOffExpire has been confirmed, so start drain now. @@ -849,7 +849,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain continue } if rqs.DrainCompleted[entry.Address] { - ops = append(ops, op.RepairExecuteOp(entry, step)) + ops = append(ops, op.RepairExecuteOp(entry, step, c)) continue } if entry.LastTransitionTime.Before(evictionStartLimit) { From 52d692d1bd61195edd844fe4802e2eb396b7e65e Mon Sep 17 00:00:00 2001 From: YZ775 Date: Mon, 11 Nov 2024 09:10:17 +0000 Subject: [PATCH 06/10] fix repeair_finish --- op/repair_finish.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/op/repair_finish.go b/op/repair_finish.go index 2848a9382..9a7c0fe7f 100644 --- a/op/repair_finish.go +++ b/op/repair_finish.go @@ -68,6 +68,7 @@ func (c repairFinishCommand) Command() cke.Command { func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool, cluster *cke.Cluster) error { if succeeded { + entry.Status = cke.RepairStatusSucceeded //execute Success command err := func() error { op, err := entry.GetMatchingRepairOperation(cluster) @@ -104,7 +105,6 @@ func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.Repair "address": entry.Address, }) } - entry.Status = cke.RepairStatusSucceeded } else { entry.Status = cke.RepairStatusFailed } From f847d210c3d8d887194df5786ac9376b7e9037c7 Mon Sep 17 00:00:00 2001 From: YZ775 Date: Wed, 20 Nov 2024 01:43:15 +0000 Subject: [PATCH 07/10] add test for successCommand --- mtest/cke-cluster.yml | 2 ++ mtest/repair_test.go | 34 ++++++++++++++++++++++++++++++++-- op/repair_finish.go | 29 ++++++++++++----------------- 3 files changed, 46 insertions(+), 19 deletions(-) diff --git a/mtest/cke-cluster.yml b/mtest/cke-cluster.yml index c78582077..214a24de6 100644 --- a/mtest/cke-cluster.yml +++ b/mtest/cke-cluster.yml @@ -28,6 +28,8 @@ repair: command_timeout_seconds: 30 need_drain: true watch_seconds: 30 + success_command: ["sh", "-c", "touch /tmp/mtest-repair-success-$1", "success"] + success_command_timeout_seconds: 30 health_check_command: ["sh", "-c", "test -f /tmp/mtest-repair-$1 && echo true", "health_check"] options: kube-api: diff --git a/mtest/repair_test.go b/mtest/repair_test.go index 668b6d70a..a29ea421c 100644 --- a/mtest/repair_test.go +++ b/mtest/repair_test.go @@ -62,6 +62,17 @@ func repairShouldNotProceed() { }).WithTimeout(time.Second * 60).Should(Succeed()) } +func repairSuccessCommandSuccess(node string) { + cmdSuccess := false + for _, host := range []string{host1, host2} { + _, _, err := execAt(host, "docker", "exec", "cke", "test", "-f", "/tmp/mtest-repair-success-"+node) + if err == nil { + cmdSuccess = true + } + } + Expect(cmdSuccess).To(BeTrue()) +} + func testRepairOperations() { // this will run: // - RepairDrainStartOp @@ -110,15 +121,34 @@ func testRepairOperations() { repairQueueAdd(node1) waitRepairSuccess(cluster) nodesShouldBeSchedulable(node1) + repairSuccessCommandSuccess(node1) ckecliSafe("repair-queue", "delete-finished") waitRepairEmpty(cluster) + By("setting erroneous success command") + originalSuceessCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand + cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand = []string{"false"} + _, err := ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + + repairQueueAdd(node1) + waitRepairFailure(cluster) + + ckecliSafe("repair-queue", "delete-finished") + waitRepairEmpty(cluster) + + By("restoring success command") + cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand = originalSuceessCommand + _, err = ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + By("setting erroneous repair command") originalRepairCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand - cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand = []string{"false"} - _, err := ckecliClusterSet(cluster) + _, err = ckecliClusterSet(cluster) Expect(err).NotTo(HaveOccurred()) time.Sleep(time.Second * 3) diff --git a/op/repair_finish.go b/op/repair_finish.go index 9a7c0fe7f..8aab9bcab 100644 --- a/op/repair_finish.go +++ b/op/repair_finish.go @@ -76,24 +76,19 @@ func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.Repair return err } if op.SuccessCommand != nil { - err := func() error { - ctx := ctx - timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds - if op.SuccessCommandTimeout != nil { - timeout = *op.SuccessCommandTimeout - } - if timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) - defer cancel() - } - args := append(op.SuccessCommand[1:], entry.Address) - command := well.CommandContext(ctx, op.SuccessCommand[0], args...) - return command.Run() - }() - if err != nil { - return err + ctx := ctx + timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds + if op.SuccessCommandTimeout != nil { + timeout = *op.SuccessCommandTimeout } + if timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) + defer cancel() + } + args := append(op.SuccessCommand[1:], entry.Address) + command := well.CommandContext(ctx, op.SuccessCommand[0], args...) + return command.Run() } return nil }() From bd8d389db4e274a89cdf5ea31383ea976fa93003 Mon Sep 17 00:00:00 2001 From: YZ775 Date: Thu, 21 Nov 2024 08:42:17 +0000 Subject: [PATCH 08/10] update document --- docs/cluster.md | 16 ++++++++-------- docs/repair.md | 6 ++++++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/docs/cluster.md b/docs/cluster.md index 30f0f96a4..6dfc7dc0e 100644 --- a/docs/cluster.md +++ b/docs/cluster.md @@ -124,12 +124,14 @@ The repair configurations control the [repair functionality](repair.md). #### RepairOperation -| Name | Required | Type | Description | -| ------------------------- | -------- | -------------- | --------------------------------------------------------------- | -| `operation` | true | string | Name of repair operation. | -| `repair_steps` | true | `[]RepairStep` | Sequences of [repair steps](#repairstep). | -| `health_check_command` | true | array | A command to check repaired machine's health. List of strings. | -| `command_timeout_seconds` | false | \*int | Deadline for health retrieval. Zero means infinity. Default: 30 | +| Name | Required | Type | Description | +| ------------------------- | -------- | -------------- | ----------------------------------------------------------------------------- | +| `operation` | true | string | Name of repair operation. | +| `repair_steps` | true | `[]RepairStep` | Sequences of [repair steps](#repairstep). | +| `health_check_command` | true | array | A command to check repaired machine's health. List of strings. | +| `command_timeout_seconds` | false | \*int | Deadline for health retrieval. Zero means infinity. Default: 30 | +| `success_command` | false | array | A command executed when repair is succeeded. List of strings. | +| `success_command_timeout` | false | \*int | Deadline for execution of succcess_command. Zero means infinity. Default: 30 | ##### RepairStep @@ -296,6 +298,4 @@ It should end with either `.conf` or `.conflist`. Fields in `config` may have default values. Some fields are overwritten by CKE. Please see the source code for more details. -[CRI]: https://github.com/kubernetes/kubernetes/blob/242a97307b34076d5d8f5bbeb154fa4d97c9ef1d/docs/devel/container-runtime-interface.md -[log rotation for CRI runtime]: https://github.com/kubernetes/kubernetes/issues/58823 [LabelSelector]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors diff --git a/docs/repair.md b/docs/repair.md index 906399b51..2696d4cb4 100644 --- a/docs/repair.md +++ b/docs/repair.md @@ -108,6 +108,12 @@ CKE decides to execute a repair operations if its `operation` matches `OPERATION When CKE executes the check command, it appends the IP address of the target machine to the command. The command should return a string `true` if it evaluates the machine as healthy. +`success_command` and its timeout are used when all of the repair steps are successfully finished. +When CKE executes the success command, it appends the IP address of the target machine to the command. +If the repair is failed, the command is not executed. +If the `success_command` fails, CKE changes the status of the queue entry to `failed`. +Users can use this command if they want to execute a command after the repair steps. + ### Repair steps A repair step is a combination of: From ab0627007e2414f8d594bf8071c1a73e412a4abe Mon Sep 17 00:00:00 2001 From: YZ775 Date: Thu, 21 Nov 2024 08:43:44 +0000 Subject: [PATCH 09/10] remove unused code --- op/repair_finish.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/op/repair_finish.go b/op/repair_finish.go index 8aab9bcab..3cc24e6e4 100644 --- a/op/repair_finish.go +++ b/op/repair_finish.go @@ -53,9 +53,6 @@ type repairFinishCommand struct { } func (c repairFinishCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { - if c.succeeded { - - } return repairFinish(ctx, inf, c.entry, c.succeeded, c.cluster) } From c7b8888029bd5b11d764ad01c3dcf7b0078f0fc0 Mon Sep 17 00:00:00 2001 From: YZ775 Date: Mon, 9 Dec 2024 01:32:56 +0000 Subject: [PATCH 10/10] fix for review --- docs/cluster.md | 2 +- docs/repair.md | 6 +++--- mtest/repair_test.go | 4 ++-- op/repair_finish.go | 30 +++++++++++++++--------------- 4 files changed, 21 insertions(+), 21 deletions(-) diff --git a/docs/cluster.md b/docs/cluster.md index 6dfc7dc0e..79a3ace23 100644 --- a/docs/cluster.md +++ b/docs/cluster.md @@ -130,7 +130,7 @@ The repair configurations control the [repair functionality](repair.md). | `repair_steps` | true | `[]RepairStep` | Sequences of [repair steps](#repairstep). | | `health_check_command` | true | array | A command to check repaired machine's health. List of strings. | | `command_timeout_seconds` | false | \*int | Deadline for health retrieval. Zero means infinity. Default: 30 | -| `success_command` | false | array | A command executed when repair is succeeded. List of strings. | +| `success_command` | false | array | A command executed when repair succeeded. List of strings. | | `success_command_timeout` | false | \*int | Deadline for execution of succcess_command. Zero means infinity. Default: 30 | ##### RepairStep diff --git a/docs/repair.md b/docs/repair.md index 2696d4cb4..092925f3b 100644 --- a/docs/repair.md +++ b/docs/repair.md @@ -108,11 +108,11 @@ CKE decides to execute a repair operations if its `operation` matches `OPERATION When CKE executes the check command, it appends the IP address of the target machine to the command. The command should return a string `true` if it evaluates the machine as healthy. -`success_command` and its timeout are used when all of the repair steps are successfully finished. +`success_command` and its timeout are used when the machine is evaluated as healthy and the repair operation finishes successfully. When CKE executes the success command, it appends the IP address of the target machine to the command. -If the repair is failed, the command is not executed. +If the repair operation has failed, the command is not executed. If the `success_command` fails, CKE changes the status of the queue entry to `failed`. -Users can use this command if they want to execute a command after the repair steps. +Users can use this command if they want to execute a command as a post-processing of repair operation. ### Repair steps diff --git a/mtest/repair_test.go b/mtest/repair_test.go index a29ea421c..9242f2e0e 100644 --- a/mtest/repair_test.go +++ b/mtest/repair_test.go @@ -127,7 +127,7 @@ func testRepairOperations() { waitRepairEmpty(cluster) By("setting erroneous success command") - originalSuceessCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand + originalSuccessCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand = []string{"false"} _, err := ckecliClusterSet(cluster) Expect(err).NotTo(HaveOccurred()) @@ -140,7 +140,7 @@ func testRepairOperations() { waitRepairEmpty(cluster) By("restoring success command") - cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand = originalSuceessCommand + cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand = originalSuccessCommand _, err = ckecliClusterSet(cluster) Expect(err).NotTo(HaveOccurred()) time.Sleep(time.Second * 3) diff --git a/op/repair_finish.go b/op/repair_finish.go index 3cc24e6e4..9a0b205c4 100644 --- a/op/repair_finish.go +++ b/op/repair_finish.go @@ -72,22 +72,22 @@ func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.Repair if err != nil { return err } - if op.SuccessCommand != nil { - ctx := ctx - timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds - if op.SuccessCommandTimeout != nil { - timeout = *op.SuccessCommandTimeout - } - if timeout != 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) - defer cancel() - } - args := append(op.SuccessCommand[1:], entry.Address) - command := well.CommandContext(ctx, op.SuccessCommand[0], args...) - return command.Run() + if op.SuccessCommand == nil { + return nil } - return nil + ctx := ctx + timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds + if op.SuccessCommandTimeout != nil { + timeout = *op.SuccessCommandTimeout + } + if timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) + defer cancel() + } + args := append(op.SuccessCommand[1:], entry.Address) + command := well.CommandContext(ctx, op.SuccessCommand[0], args...) + return command.Run() }() if err != nil { entry.Status = cke.RepairStatusFailed