Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add feature to execute user-defined command when repair is successfully finished #753

Merged
merged 10 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ type RepairOperation struct {
RepairSteps []RepairStep `json:"repair_steps"`
HealthCheckCommand []string `json:"health_check_command"`
CommandTimeoutSeconds *int `json:"command_timeout_seconds,omitempty"`
SuccessCommand []string `json:"success_command,omitempty"`
SuccessCommandTimeout *int `json:"success_command_timeout,omitempty"`
}

type RepairStep struct {
Expand All @@ -319,6 +321,7 @@ const DefaultMaxConcurrentRepairs = 1
const DefaultRepairEvictionTimeoutSeconds = 600
const DefaultRepairHealthCheckCommandTimeoutSeconds = 30
const DefaultRepairCommandTimeoutSeconds = 30
const DefaultRepairSuccessCommandTimeoutSeconds = 30

// Options is a set of optional parameters for k8s components.
type Options struct {
Expand Down
16 changes: 8 additions & 8 deletions docs/cluster.md
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,14 @@ The repair configurations control the [repair functionality](repair.md).

#### RepairOperation

| Name | Required | Type | Description |
| ------------------------- | -------- | -------------- | --------------------------------------------------------------- |
| `operation` | true | string | Name of repair operation. |
| `repair_steps` | true | `[]RepairStep` | Sequences of [repair steps](#repairstep). |
| `health_check_command` | true | array | A command to check repaired machine's health. List of strings. |
| `command_timeout_seconds` | false | \*int | Deadline for health retrieval. Zero means infinity. Default: 30 |
| Name | Required | Type | Description |
| ------------------------- | -------- | -------------- | ----------------------------------------------------------------------------- |
| `operation` | true | string | Name of repair operation. |
| `repair_steps` | true | `[]RepairStep` | Sequences of [repair steps](#repairstep). |
| `health_check_command` | true | array | A command to check repaired machine's health. List of strings. |
| `command_timeout_seconds` | false | \*int | Deadline for health retrieval. Zero means infinity. Default: 30 |
| `success_command` | false | array | A command executed when repair succeeded. List of strings. |
| `success_command_timeout` | false | \*int | Deadline for execution of succcess_command. Zero means infinity. Default: 30 |

##### RepairStep

Expand Down Expand Up @@ -296,6 +298,4 @@ It should end with either `.conf` or `.conflist`.
Fields in `config` may have default values. Some fields are overwritten by CKE.
Please see the source code for more details.

[CRI]: https://github.com/kubernetes/kubernetes/blob/242a97307b34076d5d8f5bbeb154fa4d97c9ef1d/docs/devel/container-runtime-interface.md
[log rotation for CRI runtime]: https://github.com/kubernetes/kubernetes/issues/58823
[LabelSelector]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
6 changes: 6 additions & 0 deletions docs/repair.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ CKE decides to execute a repair operations if its `operation` matches `OPERATION
When CKE executes the check command, it appends the IP address of the target machine to the command.
The command should return a string `true` if it evaluates the machine as healthy.

`success_command` and its timeout are used when the machine is evaluated as healthy and the repair operation finishes successfully.
When CKE executes the success command, it appends the IP address of the target machine to the command.
If the repair operation has failed, the command is not executed.
If the `success_command` fails, CKE changes the status of the queue entry to `failed`.
Users can use this command if they want to execute a command as a post-processing of repair operation.

### Repair steps

A repair step is a combination of:
Expand Down
2 changes: 2 additions & 0 deletions mtest/cke-cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ repair:
command_timeout_seconds: 30
need_drain: true
watch_seconds: 30
success_command: ["sh", "-c", "touch /tmp/mtest-repair-success-$1", "success"]
success_command_timeout_seconds: 30
health_check_command: ["sh", "-c", "test -f /tmp/mtest-repair-$1 && echo true", "health_check"]
options:
kube-api:
Expand Down
34 changes: 32 additions & 2 deletions mtest/repair_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ func repairShouldNotProceed() {
}).WithTimeout(time.Second * 60).Should(Succeed())
}

func repairSuccessCommandSuccess(node string) {
cmdSuccess := false
for _, host := range []string{host1, host2} {
_, _, err := execAt(host, "docker", "exec", "cke", "test", "-f", "/tmp/mtest-repair-success-"+node)
if err == nil {
cmdSuccess = true
}
}
Expect(cmdSuccess).To(BeTrue())
}

func testRepairOperations() {
// this will run:
// - RepairDrainStartOp
Expand Down Expand Up @@ -110,15 +121,34 @@ func testRepairOperations() {
repairQueueAdd(node1)
waitRepairSuccess(cluster)
nodesShouldBeSchedulable(node1)
repairSuccessCommandSuccess(node1)

ckecliSafe("repair-queue", "delete-finished")
waitRepairEmpty(cluster)

By("setting erroneous success command")
originalSuccessCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand
cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand = []string{"false"}
_, err := ckecliClusterSet(cluster)
Expect(err).NotTo(HaveOccurred())
time.Sleep(time.Second * 3)

repairQueueAdd(node1)
waitRepairFailure(cluster)

ckecliSafe("repair-queue", "delete-finished")
waitRepairEmpty(cluster)

By("restoring success command")
cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand = originalSuccessCommand
_, err = ckecliClusterSet(cluster)
Expect(err).NotTo(HaveOccurred())
time.Sleep(time.Second * 3)

By("setting erroneous repair command")
originalRepairCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand

cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand = []string{"false"}
_, err := ckecliClusterSet(cluster)
_, err = ckecliClusterSet(cluster)
Expect(err).NotTo(HaveOccurred())
time.Sleep(time.Second * 3)

Expand Down
16 changes: 10 additions & 6 deletions op/repair_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,16 @@ import (
type repairExecuteOp struct {
finished bool

entry *cke.RepairQueueEntry
step *cke.RepairStep
entry *cke.RepairQueueEntry
step *cke.RepairStep
cluster *cke.Cluster
}

func RepairExecuteOp(entry *cke.RepairQueueEntry, step *cke.RepairStep) cke.Operator {
func RepairExecuteOp(entry *cke.RepairQueueEntry, step *cke.RepairStep, cluster *cke.Cluster) cke.Operator {
return &repairExecuteOp{
entry: entry,
step: step,
entry: entry,
step: step,
cluster: cluster,
}
}

Expand All @@ -40,6 +42,7 @@ func (o *repairExecuteOp) NextCommand() cke.Commander {
timeoutSeconds: o.step.CommandTimeoutSeconds,
retries: o.step.CommandRetries,
interval: o.step.CommandInterval,
cluster: o.cluster,
}
}

Expand All @@ -53,6 +56,7 @@ type repairExecuteCommand struct {
timeoutSeconds *int
retries *int
interval *int
cluster *cke.Cluster
}

func (c repairExecuteCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
Expand Down Expand Up @@ -110,7 +114,7 @@ RETRY:
"address": c.entry.Address,
"command": strings.Join(c.command, " "),
})
return repairFinish(ctx, inf, c.entry, false)
return repairFinish(ctx, inf, c.entry, false, c.cluster)
}

func (c repairExecuteCommand) Command() cke.Command {
Expand Down
43 changes: 40 additions & 3 deletions op/repair_finish.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@ import (
"time"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/log"
"github.com/cybozu-go/well"
)

type repairFinishOp struct {
finished bool

entry *cke.RepairQueueEntry
succeeded bool
cluster *cke.Cluster
}

func RepairFinishOp(entry *cke.RepairQueueEntry, succeeded bool) cke.Operator {
func RepairFinishOp(entry *cke.RepairQueueEntry, succeeded bool, cluster *cke.Cluster) cke.Operator {
return &repairFinishOp{
entry: entry,
succeeded: succeeded,
cluster: cluster,
}
}

Expand All @@ -34,6 +38,7 @@ func (o *repairFinishOp) NextCommand() cke.Commander {
return repairFinishCommand{
entry: o.entry,
succeeded: o.succeeded,
cluster: o.cluster,
}
}

Expand All @@ -44,10 +49,11 @@ func (o *repairFinishOp) Targets() []string {
type repairFinishCommand struct {
entry *cke.RepairQueueEntry
succeeded bool
cluster *cke.Cluster
}

func (c repairFinishCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
return repairFinish(ctx, inf, c.entry, c.succeeded)
return repairFinish(ctx, inf, c.entry, c.succeeded, c.cluster)
}

func (c repairFinishCommand) Command() cke.Command {
Expand All @@ -57,9 +63,40 @@ func (c repairFinishCommand) Command() cke.Command {
}
}

func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool) error {
func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool, cluster *cke.Cluster) error {
if succeeded {
entry.Status = cke.RepairStatusSucceeded
//execute Success command
err := func() error {
op, err := entry.GetMatchingRepairOperation(cluster)
if err != nil {
return err
}
if op.SuccessCommand == nil {
return nil
}
ctx := ctx
timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds
if op.SuccessCommandTimeout != nil {
timeout = *op.SuccessCommandTimeout
}
if timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout))
defer cancel()
}
args := append(op.SuccessCommand[1:], entry.Address)
command := well.CommandContext(ctx, op.SuccessCommand[0], args...)
return command.Run()
}()
if err != nil {
entry.Status = cke.RepairStatusFailed
log.Warn("SuccessCommand failed", map[string]interface{}{
log.FnError: err,
"index": entry.Index,
"address": entry.Address,
})
}
} else {
entry.Status = cke.RepairStatusFailed
}
Expand Down
8 changes: 4 additions & 4 deletions server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
continue
}
if rqs.RepairCompleted[entry.Address] {
ops = append(ops, op.RepairFinishOp(entry, true))
ops = append(ops, op.RepairFinishOp(entry, true, c))
continue
}
switch entry.Status {
Expand Down Expand Up @@ -826,7 +826,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
// Though ErrRepairStepOutOfRange may be caused by real misconfiguration,
// e.g., by decreasing "repair_steps" in cluster.yaml, we treat the error
// as the end of the steps for simplicity.
ops = append(ops, op.RepairFinishOp(entry, false))
ops = append(ops, op.RepairFinishOp(entry, false, c))
continue
}

Expand All @@ -838,7 +838,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
continue
}
if !(step.NeedDrain && entry.IsInCluster()) {
ops = append(ops, op.RepairExecuteOp(entry, step))
ops = append(ops, op.RepairExecuteOp(entry, step, c))
continue
}
// DrainBackOffExpire has been confirmed, so start drain now.
Expand All @@ -849,7 +849,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain
continue
}
if rqs.DrainCompleted[entry.Address] {
ops = append(ops, op.RepairExecuteOp(entry, step))
ops = append(ops, op.RepairExecuteOp(entry, step, c))
continue
}
if entry.LastTransitionTime.Before(evictionStartLimit) {
Expand Down