Skip to content

Commit

Permalink
Process repair-queue
Browse files Browse the repository at this point in the history
Signed-off-by: morimoto-cybozu <[email protected]>
  • Loading branch information
morimoto-cybozu committed Dec 26, 2023
1 parent 059c9f7 commit 4ccf1dc
Show file tree
Hide file tree
Showing 15 changed files with 1,493 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
runs-on: ubuntu-22.04
strategy:
matrix:
suite: [functions, robustness, operators, reboot]
suite: [functions, robustness, operators, reboot, repair]
env:
SUITE: ${{ matrix.suite }}
CLUSTER: "cke-cluster.yml"
Expand Down
3 changes: 3 additions & 0 deletions mtest/assets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,8 @@ var rebootSlowEvictionDeploymentYAML []byte
//go:embed reboot-alittleslow-eviction-deployment.yaml
var rebootALittleSlowEvictionDeploymentYAML []byte

//go:embed repair-deployment.yaml
var repairDeploymentYAML []byte

//go:embed webhook-resources.yaml
var webhookYAML []byte
10 changes: 10 additions & 0 deletions mtest/cke-cluster.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ reboot:
boot_check_command: ["bash", "-c", "echo 'true'"]
eviction_timeout_seconds: 30
command_timeout_seconds: 30
repair:
repair_procedures:
- machine_types: ["type1"]
repair_operations:
- operation: "op1"
repair_steps:
- repair_command: ["sh", "-c", "touch /tmp/mtest-repair-$1", "repair"]
need_drain: true
watch_seconds: 30
health_check_command: ["sh", "-c", "test -f /tmp/mtest-repair-$1 && echo true", "health_check"]
options:
kube-api:
extra_binds:
Expand Down
29 changes: 29 additions & 0 deletions mtest/repair-deployment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Test workload for the repair-queue mtest (mtest/repair_test.go).
# Three replicas are spread over the worker nodes so that repairing any of
# those nodes requires evicting one of these pods.
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: repair-test
  name: sample
spec:
  replicas: 3
  selector:
    matchLabels:
      app: sample
  template:
    metadata:
      labels:
        app: sample
    spec:
      containers:
        - name: httpd
          image: ghcr.io/cybozu/testhttpd:0
---
# PodDisruptionBudget with maxUnavailable: 0 forbids eviction of every
# "sample" pod, so a node drain started by the repair queue cannot make
# progress while this PDB is in effect.
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  namespace: repair-test
  name: sample
spec:
  maxUnavailable: 0
  selector:
    matchLabels:
      app: sample
254 changes: 254 additions & 0 deletions mtest/repair_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
package mtest

import (
"bytes"
"encoding/json"
"fmt"
"strconv"
"time"

"github.com/cybozu-go/cke"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// getRepairEntries lists the current repair queue via
// "ckecli repair-queue list" and decodes its JSON output.
func getRepairEntries() ([]*cke.RepairQueueEntry, error) {
	stdout, stderr, err := ckecli("repair-queue", "list")
	if err != nil {
		return nil, fmt.Errorf("%w, stdout: %s, stderr: %s", err, stdout, stderr)
	}
	var entries []*cke.RepairQueueEntry
	if err := json.Unmarshal(stdout, &entries); err != nil {
		return nil, err
	}
	return entries, nil
}

// waitRepairCompletion blocks until every repair queue entry has one of the
// given statuses and the cluster converges (checkCluster succeeds).
// Passing nil/empty statuses effectively waits for an empty queue, since any
// remaining entry would fail the BeElementOf check.
// The offset of 2 attributes failures to the caller of the wait* wrappers.
func waitRepairCompletion(cluster *cke.Cluster, statuses []cke.RepairStatus) {
	start := time.Now()
	EventuallyWithOffset(2, func(g Gomega) {
		entries, err := getRepairEntries()
		g.Expect(err).NotTo(HaveOccurred())
		for _, e := range entries {
			g.Expect(e.Status).To(BeElementOf(statuses))
		}
		g.Expect(checkCluster(cluster, start)).NotTo(HaveOccurred())
	}).Should(Succeed())
}

// waitRepairSuccess waits until every repair queue entry has succeeded.
func waitRepairSuccess(cluster *cke.Cluster) {
	want := []cke.RepairStatus{cke.RepairStatusSucceeded}
	waitRepairCompletion(cluster, want)
}

// waitRepairFailure waits until every repair queue entry has failed.
func waitRepairFailure(cluster *cke.Cluster) {
	want := []cke.RepairStatus{cke.RepairStatusFailed}
	waitRepairCompletion(cluster, want)
}

// waitRepairEmpty waits until the repair queue holds no entries at all;
// a nil status list makes any remaining entry fail the membership check.
func waitRepairEmpty(cluster *cke.Cluster) {
	waitRepairCompletion(cluster, nil)
}

func repairShouldNotProceed() {
ConsistentlyWithOffset(1, func(g Gomega) {
entries, err := getRepairEntries()
g.Expect(err).NotTo(HaveOccurred())
for _, entry := range entries {
g.Expect(entry.Status).NotTo(BeElementOf(cke.RepairStatusSucceeded, cke.RepairStatusFailed))
}
}).WithTimeout(time.Second * 60).Should(Succeed())
}

// testRepairOperations registers the mtest spec for the repair queue.
// It drives the whole repair lifecycle through ckecli and verifies the
// resulting CKE operations against the running cluster.
func testRepairOperations() {
	// this will run:
	// - RepairDrainStartOp
	// - RepairExecuteOp
	// - RepairDrainTimeoutOp
	// - RepairFinishOp
	// - RepairDequeueOp

	// This test examines status gathering and CLI commands as well as operations.
	// It is not necessary to test the behaviors examined in "server/strategy_test.go".

	// This test uses "touch" and "test -f" for repair_command and health_check_command.
	// "true" and "echo true" are insufficient for repair queue test because
	// CKE first checks health and never calls "RepairDrainStartOp" for healthy machines.
	It("should execute repair commands", func() {
		cluster := getCluster()
		// the first three nodes act as control planes in this mtest topology
		for i := 0; i < 3; i++ {
			cluster.Nodes[i].ControlPlane = true
		}

		// currentWriteIndex counts enqueued entries; the most recent entry's
		// queue index is currentWriteIndex-1 (used later for "repair-queue delete").
		currentWriteIndex := 0
		repairQueueAdd := func(address string) {
			// clear sentinel files created by earlier repair commands so the
			// health check ("test -f ...") starts from the unhealthy state
			execSafeAt(host1, "docker", "exec", "cke", "find", "/tmp", "-maxdepth", "1", "-name", "mtest-repair-*", "-delete")
			execSafeAt(host2, "docker", "exec", "cke", "find", "/tmp", "-maxdepth", "1", "-name", "mtest-repair-*", "-delete")
			_, stderr, err := ckecli("repair-queue", "add", "op1", "type1", address)
			ExpectWithOffset(1, err).NotTo(HaveOccurred(), "stderr: %s", stderr)
			currentWriteIndex++
		}

		By("disabling repair queue")
		ckecliSafe("repair-queue", "disable")
		stdout := ckecliSafe("repair-queue", "is-enabled")
		Expect(bytes.TrimSpace(stdout)).To(Equal([]byte("false")))

		// entries added while the queue is disabled must not be processed
		repairQueueAdd(node1)
		repairShouldNotProceed()

		ckecliSafe("repair-queue", "delete-unfinished")
		waitRepairEmpty(cluster)

		By("enabling repair queue")
		ckecliSafe("repair-queue", "enable")
		stdout = ckecliSafe("repair-queue", "is-enabled")
		Expect(bytes.TrimSpace(stdout)).To(Equal([]byte("true")))

		repairQueueAdd(node1)
		waitRepairSuccess(cluster)
		nodesShouldBeSchedulable(node1)

		ckecliSafe("repair-queue", "delete-finished")
		waitRepairEmpty(cluster)

		By("setting erroneous repair command")
		originalRepairCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand

		// "false" exits non-zero, so the repair step itself fails
		cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand = []string{"false"}
		_, err := ckecliClusterSet(cluster)
		Expect(err).NotTo(HaveOccurred())
		// give CKE a moment to pick up the updated cluster configuration
		// (3s appears to be an empirical delay — TODO confirm)
		time.Sleep(time.Second * 3)

		repairQueueAdd(node1)
		waitRepairFailure(cluster)

		ckecliSafe("repair-queue", "delete-finished")
		waitRepairEmpty(cluster)

		By("setting noop repair command")
		// "true" succeeds but never creates the sentinel file, so the health
		// check keeps failing and the entry fails once the watch period expires
		cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand = []string{"true"}
		_, err = ckecliClusterSet(cluster)
		Expect(err).NotTo(HaveOccurred())
		time.Sleep(time.Second * 3)

		repairQueueAdd(node1)
		waitRepairFailure(cluster)

		ckecliSafe("repair-queue", "delete-finished")
		waitRepairEmpty(cluster)

		By("setting noop repair command and long watch duration")
		originalWatchSeconds := cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds

		// with a 600-second watch, the noop repair stays in the processing
		// state long enough for repairShouldNotProceed to observe it
		longWatch := 600
		cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds = &longWatch
		_, err = ckecliClusterSet(cluster)
		Expect(err).NotTo(HaveOccurred())
		time.Sleep(time.Second * 3)

		repairQueueAdd(node1)
		repairShouldNotProceed()

		// remove the stuck entry by its queue index
		ckecliSafe("repair-queue", "delete", strconv.Itoa(currentWriteIndex-1))
		waitRepairEmpty(cluster)

		By("restoring repair command and watch duration")
		cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand = originalRepairCommand
		cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].WatchSeconds = originalWatchSeconds
		_, err = ckecliClusterSet(cluster)
		Expect(err).NotTo(HaveOccurred())
		time.Sleep(time.Second * 3)

		By("deploying drain-blocking workload")
		_, stderr, err := kubectl("create", "namespace", "repair-test")
		Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr)
		_, stderr, err = kubectl("label", "namespace", "repair-test", "protected=true")
		Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr)
		// repair-deployment.yaml includes a PDB with maxUnavailable: 0,
		// which blocks eviction of the sample pods during drain
		_, stderr, err = kubectlWithInput(repairDeploymentYAML, "apply", "-f", "-")
		Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr)
		nodeNames := make([]string, 3)
		Eventually(func(g Gomega) {
			stdout, stderr, err := kubectl("get", "-n=repair-test", "deployment", "sample", "-o=json")
			g.Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr)
			var deploy appsv1.Deployment
			err = json.Unmarshal(stdout, &deploy)
			g.Expect(err).NotTo(HaveOccurred())
			g.Expect(deploy.Status.ReadyReplicas).To(Equal(int32(3)))

			// record which node each sample pod landed on; repairing those
			// nodes will require draining the pods
			stdout, stderr, err = kubectl("get", "-n=repair-test", "pod", "-l=app=sample", "-o=json")
			g.Expect(err).NotTo(HaveOccurred(), "stderr: %s", stderr)
			var pods corev1.PodList
			err = json.Unmarshal(stdout, &pods)
			g.Expect(err).NotTo(HaveOccurred())
			g.Expect(pods.Items).To(HaveLen(3))
			for i, pod := range pods.Items {
				nodeNames[i] = pod.Spec.NodeName
				g.Expect(nodeNames[i]).NotTo(BeEmpty())
			}
		}).Should(Succeed())

		// the PDB blocks drain, so the entry stays processing/waiting with a
		// non-zero drain backoff
		repairQueueAdd(nodeNames[0])
		repairShouldNotProceed()

		entries, err := getRepairEntries()
		Expect(err).NotTo(HaveOccurred())
		Expect(entries).To(HaveLen(1))
		Expect(entries[0].Status).To(Equal(cke.RepairStatusProcessing))
		Expect(entries[0].StepStatus).To(Equal(cke.RepairStepStatusWaiting))
		Expect(entries[0].DrainBackOffExpire).NotTo(Equal(time.Time{}))
		Expect(entries[0].DrainBackOffCount).NotTo(BeZero())

		// reset-backoff clears the drain backoff bookkeeping of the entry
		ckecliSafe("repair-queue", "reset-backoff")
		entries, err = getRepairEntries()
		Expect(err).NotTo(HaveOccurred())
		Expect(entries).To(HaveLen(1))
		Expect(entries[0].DrainBackOffExpire).To(Equal(time.Time{}))
		Expect(entries[0].DrainBackOffCount).To(BeZero())

		ckecliSafe("repair-queue", "delete-unfinished")
		waitRepairEmpty(cluster)

		By("setting protected_namespace to include workload")
		cluster.Repair.ProtectedNamespaces = &metav1.LabelSelector{
			MatchLabels: map[string]string{"protected": "true"},
		}
		_, err = ckecliClusterSet(cluster)
		Expect(err).NotTo(HaveOccurred())
		time.Sleep(time.Second * 3)

		// "repair-test" carries the protected=true label, so repair is still blocked
		repairQueueAdd(nodeNames[0])
		repairShouldNotProceed()

		ckecliSafe("repair-queue", "delete-unfinished")
		waitRepairEmpty(cluster)

		By("setting protected_namespace not to include workload")
		cluster.Repair.ProtectedNamespaces = &metav1.LabelSelector{
			MatchLabels: map[string]string{"foo": "bar"},
		}
		_, err = ckecliClusterSet(cluster)
		Expect(err).NotTo(HaveOccurred())
		time.Sleep(time.Second * 3)

		// pods outside protected namespaces no longer block the repair —
		// presumably CKE removes them despite the PDB; verify against CKE docs
		repairQueueAdd(nodeNames[0])
		waitRepairSuccess(cluster)
		nodesShouldBeSchedulable(nodeNames[0])

		ckecliSafe("repair-queue", "delete-finished")
		waitRepairEmpty(cluster)

		By("restoring protected_namespace and disabling need_drain")
		cluster.Repair.ProtectedNamespaces = nil
		cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].NeedDrain = false
		_, err = ckecliClusterSet(cluster)
		Expect(err).NotTo(HaveOccurred())
		time.Sleep(time.Second * 3)

		// without need_drain, the repair proceeds regardless of the PDB
		repairQueueAdd(nodeNames[1])
		waitRepairSuccess(cluster)
		nodesShouldBeSchedulable(nodeNames[1])
	})
}
4 changes: 4 additions & 0 deletions mtest/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ var _ = Describe("Test CKE", func() {
Context("reboot", func() {
testRebootOperations()
})
case "repair":
Context("repair", func() {
testRepairOperations()
})
case "upgrade":
Context("upgrade", Ordered, func() {
testUpgrade()
Expand Down
53 changes: 53 additions & 0 deletions op/repair_dequeue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package op

import (
"context"

"github.com/cybozu-go/cke"
)

// repairDequeueOp is an Operator that removes a single entry from the
// repair queue.
type repairDequeueOp struct {
	// finished becomes true once the sole dequeue command has been issued.
	finished bool

	// entry is the repair queue entry to be removed.
	entry *cke.RepairQueueEntry
}

// RepairDequeueOp creates an Operator that removes the given entry from
// the repair queue.
func RepairDequeueOp(entry *cke.RepairQueueEntry) cke.Operator {
	op := &repairDequeueOp{entry: entry}
	return op
}

// Name implements cke.Operator.
func (o *repairDequeueOp) Name() string {
	const opName = "repair-dequeue"
	return opName
}

// NextCommand implements cke.Operator. It yields the dequeue command
// exactly once and nil on every subsequent call.
func (o *repairDequeueOp) NextCommand() cke.Commander {
	if !o.finished {
		o.finished = true
		return repairDequeueCommand{entry: o.entry}
	}
	return nil
}

// Targets implements cke.Operator; the sole target is the machine whose
// entry is being dequeued.
func (o *repairDequeueOp) Targets() []string {
	addrs := []string{o.entry.Address}
	return addrs
}

// repairDequeueCommand deletes a repair queue entry from storage.
type repairDequeueCommand struct {
	// entry identifies the repair queue entry to delete (by its Index).
	entry *cke.RepairQueueEntry
}

// Run implements cke.Commander; it removes the entry from the repair
// queue storage using the entry's index.
func (c repairDequeueCommand) Run(ctx context.Context, inf cke.Infrastructure, leaderKey string) error {
	st := inf.Storage()
	return st.DeleteRepairsEntry(ctx, leaderKey, c.entry.Index)
}

// Command implements cke.Commander; it reports this command for logging.
func (c repairDequeueCommand) Command() cke.Command {
	cmd := cke.Command{
		Name:   "repairDequeueCommand",
		Target: c.entry.Address,
	}
	return cmd
}
Loading

0 comments on commit 4ccf1dc

Please sign in to comment.