Evict 'OnDelete' DaemonSet pods
Signed-off-by: naoki-take <[email protected]>
tkna committed Jun 25, 2024
1 parent 2b16d7c commit 131033c
Showing 4 changed files with 192 additions and 9 deletions.
131 changes: 131 additions & 0 deletions op/reboot.go
@@ -203,6 +203,137 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure

//

type rebootEvictDaemonSetPodOp struct {
finished bool

entries []*cke.RebootQueueEntry
config *cke.Reboot
apiserver *cke.Node

mu sync.Mutex
failedNodes []string
}

func RebootEvictDaemonSetPodOp(apiserver *cke.Node, entries []*cke.RebootQueueEntry, config *cke.Reboot) cke.InfoOperator {
return &rebootEvictDaemonSetPodOp{
entries: entries,
config: config,
apiserver: apiserver,
}
}

type rebootEvictDaemonSetPodCommand struct {
entries []*cke.RebootQueueEntry
protectedNamespaces *metav1.LabelSelector
apiserver *cke.Node
evictAttempts int
evictInterval time.Duration

notifyFailedNode func(string)
}

func (o *rebootEvictDaemonSetPodOp) Name() string {
return "reboot-evict-daemonset-pod"
}

func (o *rebootEvictDaemonSetPodOp) notifyFailedNode(node string) {
o.mu.Lock()
o.failedNodes = append(o.failedNodes, node)
o.mu.Unlock()
}

func (o *rebootEvictDaemonSetPodOp) Targets() []string {
ipAddresses := make([]string, len(o.entries))
for i, entry := range o.entries {
ipAddresses[i] = entry.Node
}
return ipAddresses
}

func (o *rebootEvictDaemonSetPodOp) Info() string {
if len(o.failedNodes) == 0 {
return ""
}
return fmt.Sprintf("failed to evict DaemonSet pods on some nodes: %v", o.failedNodes)
}

func (o *rebootEvictDaemonSetPodOp) NextCommand() cke.Commander {
if o.finished {
return nil
}
o.finished = true

attempts := 1
if o.config.EvictRetries != nil {
attempts = *o.config.EvictRetries + 1
}
interval := 0 * time.Second
if o.config.EvictInterval != nil {
interval = time.Second * time.Duration(*o.config.EvictInterval)
}

return rebootEvictDaemonSetPodCommand{
entries: o.entries,
protectedNamespaces: o.config.ProtectedNamespaces,
apiserver: o.apiserver,
notifyFailedNode: o.notifyFailedNode,
evictAttempts: attempts,
evictInterval: interval,
}
}

func (c rebootEvictDaemonSetPodCommand) Command() cke.Command {
ipAddresses := make([]string, len(c.entries))
for i, entry := range c.entries {
ipAddresses[i] = entry.Node
}
return cke.Command{
Name: "rebootEvictDaemonSetPodCommand",
Target: strings.Join(ipAddresses, ","),
}
}

func (c rebootEvictDaemonSetPodCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
cs, err := inf.K8sClient(ctx, c.apiserver)
if err != nil {
return err
}

protected, err := listProtectedNamespaces(ctx, cs, c.protectedNamespaces)
if err != nil {
return err
}

// evict DaemonSet pod on each node
// cordon is unnecessary for DaemonSet pods, so dry-run eviction is also skipped.
for _, entry := range c.entries {
// keep entry.Status as RebootStatusDraining and don't update it here.

log.Info("start eviction of DaemonSet pod", map[string]interface{}{
"name": entry.Node,
})
err := evictOrDeleteOnDeleteDaemonSetPod(ctx, cs, entry.Node, protected, c.evictAttempts, c.evictInterval)
if err != nil {
log.Warn("eviction of DaemonSet pod failed", map[string]interface{}{
"name": entry.Node,
log.FnError: err,
})
c.notifyFailedNode(entry.Node)
err = drainBackOff(ctx, inf, entry, err)
if err != nil {
return err
}
continue
}
log.Info("eviction of DaemonSet pod succeeded", map[string]interface{}{
"name": entry.Node,
})
}

return nil
}
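
For context, here is a minimal sketch (not part of this commit) of how an operator like the one above is driven: `NextCommand` is called until it returns nil and each returned command is run. The `runOperator` helper name is hypothetical; the interface methods follow the `cke` package as used in this file.

```go
// Hypothetical driver loop, for illustration only: consume commands from the
// operator until NextCommand returns nil, then report eviction failures.
func runOperator(ctx context.Context, inf cke.Infrastructure, op cke.InfoOperator) error {
	for {
		cmd := op.NextCommand()
		if cmd == nil {
			break
		}
		if err := cmd.Run(ctx, inf, ""); err != nil {
			return err
		}
	}
	// Info returns a non-empty message when DaemonSet pod eviction failed on some nodes.
	if msg := op.Info(); msg != "" {
		log.Warn(msg, nil)
	}
	return nil
}
```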

//

type rebootRebootOp struct {
finished bool

67 changes: 58 additions & 9 deletions op/reboot_decide.go
@@ -9,6 +9,7 @@ import (
"github.com/cybozu-go/cke"
"github.com/cybozu-go/log"
"github.com/cybozu-go/well"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -61,33 +62,83 @@ func enumeratePods(ctx context.Context, cs *kubernetes.Clientset, node string,
return nil
}

// enumerateOnDeleteDaemonSetPods enumerates Pods on a specified node that are owned by "updateStrategy:OnDelete" DaemonSets.
// It calls podHandler for each target Pod.
// If the handler returns an error, this function returns the error immediately.
// Note: This function does not distinguish API errors from errors returned by the handler.
func enumerateOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientset, node string,
podHandler func(pod *corev1.Pod) error) error {

daemonSets, err := cs.AppsV1().DaemonSets(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

for _, ds := range daemonSets.Items {
if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType {
labelSelector := metav1.FormatLabelSelector(ds.Spec.Selector)
pods, err := cs.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return err
}

for _, pod := range pods.Items {
if pod.Spec.NodeName == node {
err = podHandler(&pod)
if err != nil {
return err
}
}
}
}
}

return nil
}
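
A minimal usage sketch (not part of this commit): because the enumerator accepts an arbitrary handler, a trivial handler that only prints the matching Pods can be passed, e.g. for debugging. The helper name below is hypothetical; the imports match those already used in this file.

```go
// Hypothetical helper that lists Pods on a node owned by OnDelete DaemonSets.
func printOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientset, node string) error {
	return enumerateOnDeleteDaemonSetPods(ctx, cs, node, func(pod *corev1.Pod) error {
		fmt.Printf("%s/%s is owned by an OnDelete DaemonSet\n", pod.Namespace, pod.Name)
		return nil
	})
}
```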

// dryRunEvictOrDeleteNodePod checks whether eviction or deletion of Pods on the specified Node can proceed.
// It returns an error if a running Job Pod exists or eviction of a Pod in a protected namespace fails.
func dryRunEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error {
return doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, 0, true)
return enumeratePods(ctx, cs, node,
doEvictOrDeleteNodePod(ctx, cs, node, protected, 0, time.Duration(0), true),
func(pod *corev1.Pod) error {
return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
},
)
}

// evictOrDeleteNodePod evicts or deletes Pods on the specified Node.
// If a running Job Pod exists, this function returns an error.
func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
return doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false)
return enumeratePods(ctx, cs, node,
doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false),
func(pod *corev1.Pod) error {
return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
},
)
}

// evictOrDeleteOnDeleteDaemonSetPod evicts or deletes Pods on the specified Node that are owned by "updateStrategy:OnDelete" DaemonSets.
func evictOrDeleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration) error {
return enumerateOnDeleteDaemonSetPods(ctx, cs, node, doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false))
}

// doEvictOrDeleteNodePod evicts or delete Pods on the specified Node.
// doEvictOrDeleteNodePod returns a pod handler that evicts or deletes Pods on the specified Node.
// It first tries eviction.
// If the eviction failed and the Pod's namespace is not protected, it deletes the Pod.
// If the eviction failed and the Pod's namespace is protected, it retries at most `attempts` times, waiting `interval` between attempts.
// If a running Job Pod exists, this function returns an error.
// If `dry` is true, it performs a dry run, and `attempts` and `interval` are ignored.
func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) error {
func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool, attempts int, interval time.Duration, dry bool) func(pod *corev1.Pod) error {
var deleteOptions *metav1.DeleteOptions
if dry {
deleteOptions = &metav1.DeleteOptions{
DryRun: []string{"All"},
}
}

return enumeratePods(ctx, cs, node, func(pod *corev1.Pod) error {
return func(pod *corev1.Pod) error {
if dry && !protected[pod.Namespace] {
// in case of dry-run for a Pod in a non-protected namespace,
// return immediately because its "eviction or deletion" never fails
@@ -158,9 +209,7 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node
return fmt.Errorf("failed to evict pod %s/%s due to PDB: %w", pod.Namespace, pod.Name, err)
}
return nil
}, func(pod *corev1.Pod) error {
return fmt.Errorf("job-managed pod exists: %s/%s, phase=%s", pod.Namespace, pod.Name, pod.Status.Phase)
})
}
}
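
To make the retry behavior described above concrete, the following is a minimal sketch (not part of this commit) of evicting a single Pod and retrying while a PodDisruptionBudget blocks the eviction. The function name is hypothetical and error handling is simplified; client-go's `EvictV1` helper on the typed Pods client is used here purely for illustration.

```go
// Hypothetical illustration of the evict-then-retry pattern: the Eviction API
// returns 429 (TooManyRequests) while a PodDisruptionBudget blocks eviction,
// so the call is retried up to `attempts` times, waiting `interval` in between.
func evictWithRetry(ctx context.Context, cs *kubernetes.Clientset, pod *corev1.Pod, attempts int, interval time.Duration) error {
	var err error
	for i := 0; i < attempts; i++ {
		err = cs.CoreV1().Pods(pod.Namespace).EvictV1(ctx, &policyv1.Eviction{
			ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name},
		})
		if err == nil || !apierrors.IsTooManyRequests(err) {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(interval):
		}
	}
	return err
}
```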

// checkPodDeletion checks whether the evicted or deleted Pods are eventually deleted.
2 changes: 2 additions & 0 deletions server/strategy.go
@@ -898,6 +898,8 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp
}

if len(rebootArgs.DrainCompleted) > 0 {
// After eviction of normal pods, evict "OnDelete" DaemonSet pods.
ops = append(ops, op.RebootEvictDaemonSetPodOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
}
if len(rebootArgs.NewlyDrained) > 0 {
1 change: 1 addition & 0 deletions server/strategy_test.go
@@ -2637,6 +2637,7 @@ func TestDecideOps(t *testing.T) {
},
}),
ExpectedOps: []opData{
{"reboot-evict-daemonset-pod", 1},
{"reboot-reboot", 1},
},
},
