Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delete 'OnDelete' DaemonSet pods #746

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions op/reboot.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,109 @@ func (c rebootDrainStartCommand) Run(ctx context.Context, inf cke.Infrastructure

//

type rebootDeleteDaemonSetPodOp struct {
finished bool

entries []*cke.RebootQueueEntry
config *cke.Reboot
apiserver *cke.Node

mu sync.Mutex
failedNodes []string
}

func RebootDeleteDaemonSetPodOp(apiserver *cke.Node, entries []*cke.RebootQueueEntry, config *cke.Reboot) cke.InfoOperator {
return &rebootDeleteDaemonSetPodOp{
entries: entries,
config: config,
apiserver: apiserver,
}
}

type rebootDeleteDaemonSetPodCommand struct {
entries []*cke.RebootQueueEntry
apiserver *cke.Node

notifyFailedNode func(string)
}

func (o *rebootDeleteDaemonSetPodOp) Name() string {
return "reboot-delete-daemonset-pod"
}

func (o *rebootDeleteDaemonSetPodOp) notifyFailedNode(node string) {
o.mu.Lock()
o.failedNodes = append(o.failedNodes, node)
o.mu.Unlock()
}

func (o *rebootDeleteDaemonSetPodOp) Targets() []string {
ipAddresses := make([]string, len(o.entries))
for i, entry := range o.entries {
ipAddresses[i] = entry.Node
}
return ipAddresses
}

func (o *rebootDeleteDaemonSetPodOp) Info() string {
if len(o.failedNodes) == 0 {
return ""
}
return fmt.Sprintf("failed to delete DaemonSet pods on some nodes: %v", o.failedNodes)
}

func (o *rebootDeleteDaemonSetPodOp) NextCommand() cke.Commander {
if o.finished {
return nil
}
o.finished = true

return rebootDeleteDaemonSetPodCommand{
entries: o.entries,
apiserver: o.apiserver,
notifyFailedNode: o.notifyFailedNode,
}
}

func (c rebootDeleteDaemonSetPodCommand) Command() cke.Command {
ipAddresses := make([]string, len(c.entries))
for i, entry := range c.entries {
ipAddresses[i] = entry.Node
}
return cke.Command{
Name: "rebootDeleteDaemonSetPodCommand",
Target: strings.Join(ipAddresses, ","),
}
}

func (c rebootDeleteDaemonSetPodCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error {
cs, err := inf.K8sClient(ctx, c.apiserver)
if err != nil {
return err
}

// delete DaemonSet pod on each node
for _, entry := range c.entries {
// keep entry.Status as RebootStatusDraining and don't update it here.

log.Info("start deletion of DaemonSet pod", map[string]interface{}{
"name": entry.Node,
})
err := deleteOnDeleteDaemonSetPod(ctx, cs, entry.Node)
if err != nil {
log.Warn("deletion of DaemonSet pod failed", map[string]interface{}{
"name": entry.Node,
log.FnError: err,
})
c.notifyFailedNode(entry.Node)
}
}

return nil
}

//

type rebootRebootOp struct {
finished bool

Expand Down
57 changes: 57 additions & 0 deletions op/reboot_decide.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/cybozu-go/cke"
"github.com/cybozu-go/log"
"github.com/cybozu-go/well"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -61,6 +62,42 @@ func enumeratePods(ctx context.Context, cs *kubernetes.Clientset, node string,
return nil
}

// enumerateOnDeleteDaemonSetPods enumerates Pods on a specified node that are owned by "updateStrategy:OnDelete" DaemonSets.
// It calls podHandler for each target pods.
// If the handler returns error, this function returns the error immediately.
// Note: This function does not distinguish API errors and state evaluation returned from subfunction.
func enumerateOnDeleteDaemonSetPods(ctx context.Context, cs *kubernetes.Clientset, node string,
podHandler func(pod *corev1.Pod) error) error {

daemonSets, err := cs.AppsV1().DaemonSets(corev1.NamespaceAll).List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

for _, ds := range daemonSets.Items {
if ds.Spec.UpdateStrategy.Type == appsv1.OnDeleteDaemonSetStrategyType {
labelSelector := metav1.FormatLabelSelector(ds.Spec.Selector)
pods, err := cs.CoreV1().Pods(ds.Namespace).List(ctx, metav1.ListOptions{
LabelSelector: labelSelector,
})
if err != nil {
return err
}

for _, pod := range pods.Items {
if pod.Spec.NodeName == node {
err = podHandler(&pod)
if err != nil {
return err
}
}
}
}
}

return nil
}

// dryRunEvictOrDeleteNodePod checks eviction or deletion of Pods on the specified Node can proceed.
// It returns an error if a running Pod exists or an eviction of the Pod in protected namespace failed.
func dryRunEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node string, protected map[string]bool) error {
Expand All @@ -73,6 +110,11 @@ func evictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node st
return doEvictOrDeleteNodePod(ctx, cs, node, protected, attempts, interval, false)
}

// deleteOnDeleteDaemonSetPod evicts or delete Pods on the specified Node that are owned by "updateStrategy:OnDelete" DaemonSets.
func deleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string) error {
return doDeleteOnDeleteDaemonSetPod(ctx, cs, node)
}

// doEvictOrDeleteNodePod evicts or delete Pods on the specified Node.
// It first tries eviction.
// If the eviction failed and the Pod's namespace is not protected, it deletes the Pod.
Expand Down Expand Up @@ -163,6 +205,21 @@ func doEvictOrDeleteNodePod(ctx context.Context, cs *kubernetes.Clientset, node
})
}

// doDeleteOnDeleteDaemonSetPod deletes 'OnDelete' DaemonSet pods on the specified Node.
func doDeleteOnDeleteDaemonSetPod(ctx context.Context, cs *kubernetes.Clientset, node string) error {
return enumerateOnDeleteDaemonSetPods(ctx, cs, node, func(pod *corev1.Pod) error {
err := cs.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
if err != nil && !apierrors.IsNotFound(err) {
return err
}
log.Info("deleted daemonset pod", map[string]interface{}{
"namespace": pod.Namespace,
"name": pod.Name,
})
return nil
})
}

// checkPodDeletion checks whether the evicted or deleted Pods are eventually deleted.
// If those pods still exist, this function returns an error.
func checkPodDeletion(ctx context.Context, cs *kubernetes.Clientset, node string) error {
Expand Down
2 changes: 2 additions & 0 deletions server/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,6 +898,8 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp
}

if len(rebootArgs.DrainCompleted) > 0 {
// After eviction of normal pods, evict "OnDelete" daemonset pods.
ops = append(ops, op.RebootDeleteDaemonSetPodOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot))
}
if len(rebootArgs.NewlyDrained) > 0 {
Expand Down
1 change: 1 addition & 0 deletions server/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2637,6 +2637,7 @@ func TestDecideOps(t *testing.T) {
},
}),
ExpectedOps: []opData{
{"reboot-delete-daemonset-pod", 1},
{"reboot-reboot", 1},
},
},
Expand Down