fix: address scale down issues
The scale-down workflow now validates the state of the etcd service on the node before the member is
removed and the machine is deleted. This allows the scale down to be carried out safely over multiple reconciliations.

Signed-off-by: Andrew Rynhard <[email protected]>
andrewrynhard authored and talos-bot committed Oct 16, 2020
1 parent: 7479eb7 · commit: 338ca46
Showing 2 changed files with 79 additions and 38 deletions.
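
Before the diffs, here is a minimal sketch of the etcd check described in the commit message, written as if it were pulled out into a standalone helper. It is not part of the commit: the package name, the leaveEtcdIfRunning helper, and the import paths are assumptions inferred from the identifiers used in the diff (the Talos machinery client and machine API, controller-runtime, and the protobuf empty type).

// Sketch only: mirrors the "validate etcd before leaving" flow introduced in the diff below.
package controllers

import (
	"context"
	"time"

	"github.com/golang/protobuf/ptypes/empty"
	"github.com/talos-systems/talos/pkg/machinery/api/machine"
	talosclient "github.com/talos-systems/talos/pkg/machinery/client"
	ctrl "sigs.k8s.io/controller-runtime"
)

// leaveEtcdIfRunning (hypothetical helper) lists the services on the node and only
// forfeits etcd leadership and leaves the cluster when the etcd service has not
// already reached the "Finished" state. Any failure returns a delayed requeue,
// which is what lets the scale down span multiple reconciliations.
func leaveEtcdIfRunning(ctx context.Context, c *talosclient.Client) (ctrl.Result, error) {
	svcs, err := c.MachineClient.ServiceList(ctx, &empty.Empty{})
	if err != nil {
		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
	}

	for _, svc := range svcs.Messages[0].Services {
		if svc.Id != "etcd" {
			continue
		}

		if svc.State != "Finished" {
			// etcd is still a member on this node: give up leadership, then leave.
			if _, err = c.MachineClient.EtcdForfeitLeadership(ctx, &machine.EtcdForfeitLeadershipRequest{}); err != nil {
				return ctrl.Result{RequeueAfter: 20 * time.Second}, err
			}

			if _, err = c.MachineClient.EtcdLeaveCluster(ctx, &machine.EtcdLeaveClusterRequest{}); err != nil {
				return ctrl.Result{RequeueAfter: 20 * time.Second}, err
			}
		}

		break
	}

	return ctrl.Result{}, nil
}

In the actual change this logic is inlined in scaleDownControlPlane, and the caller in Reconcile treats a returned requeue as a signal to retry on the next reconciliation rather than as a hard failure.
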
Dockerfile: 2 changes (1 addition, 1 deletion)
@@ -1,6 +1,6 @@
 # syntax = docker/dockerfile-upstream:1.1.4-experimental
 
-FROM golang:1.13 AS build
+FROM golang:1.15 AS build
 ENV GO111MODULE on
 ENV GOPROXY https://proxy.golang.org
 ENV CGO_ENABLED 0
controllers/taloscontrolplane_controller.go: 115 changes (78 additions, 37 deletions)
@@ -241,7 +241,12 @@ func (r *TalosControlPlaneReconciler) Reconcile(req ctrl.Request) (res ctrl.Resu
 	// We are scaling down
 	case numMachines > desiredReplicas:
 		logger.Info("Scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
-		return r.scaleDownControlPlane(ctx, util.ObjectKey(cluster), controlPlane.TCP.Name)
+
+		res, err = r.scaleDownControlPlane(ctx, util.ObjectKey(cluster), controlPlane.TCP.Name)
+		if err != nil && (res.Requeue || res.RequeueAfter > 0) {
+			logger.Info("Failed to scale down control plane", "error", err)
+			return res, nil
+		}
 	}
 
 	// Generate Cluster Kubeconfig if needed
@@ -322,23 +327,6 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context,
 
 	r.Log.Info("Found control plane machines", "machines", len(machines))
 
-	oldest := machines[0]
-	for _, machine := range machines {
-		if !machine.ObjectMeta.DeletionTimestamp.IsZero() {
-			r.Log.Info("Machine is in process of deletion", "machine", machine.Name)
-
-			return ctrl.Result{RequeueAfter: 20 * time.Second}, nil
-		}
-
-		if machine.CreationTimestamp.Before(&oldest.CreationTimestamp) {
-			oldest = machine
-		}
-	}
-
-	if oldest.Status.NodeRef == nil {
-		return ctrl.Result{}, fmt.Errorf("%q machine does not have a nodeRef", oldest.Name)
-	}
-
 	kubeconfigSecret := &v1.Secret{}
 
 	err = r.Client.Get(ctx,
@@ -349,24 +337,53 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context,
 		kubeconfigSecret,
 	)
 	if err != nil {
-		return ctrl.Result{Requeue: true}, err
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
 	config, err := clientcmd.RESTConfigFromKubeConfig(kubeconfigSecret.Data["value"])
 	if err != nil {
-		return ctrl.Result{Requeue: true}, err
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
 	clientset, err := kubernetes.NewForConfig(config)
 	if err != nil {
-		return ctrl.Result{Requeue: true}, err
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
+	oldest := machines[0]
+	for _, machine := range machines {
+		if !machine.ObjectMeta.DeletionTimestamp.IsZero() {
+			r.Log.Info("Machine is in process of deletion", "machine", machine.Name)
+
+			node, err := clientset.CoreV1().Nodes().Get(ctx, machine.Status.NodeRef.Name, metav1.GetOptions{})
+			if err != nil {
+				return ctrl.Result{RequeueAfter: 20 * time.Second}, err
+			}
+
+			r.Log.Info("Deleting node", "machine", machine.Name, "node", node.Name)
+
+			err = clientset.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{})
+			if err != nil {
+				return ctrl.Result{RequeueAfter: 20 * time.Second}, err
+			}
+
+			return ctrl.Result{RequeueAfter: 20 * time.Second}, nil
+		}
+
+		if machine.CreationTimestamp.Before(&oldest.CreationTimestamp) {
+			oldest = machine
+		}
+	}
+
+	if oldest.Status.NodeRef == nil {
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, fmt.Errorf("%q machine does not have a nodeRef", oldest.Name)
+	}
+
 	var address string
 
 	node, err := clientset.CoreV1().Nodes().Get(ctx, oldest.Status.NodeRef.Name, metav1.GetOptions{})
 	if err != nil {
-		return ctrl.Result{Requeue: true}, err
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
 	for _, addr := range node.Status.Addresses {
Expand All @@ -377,7 +394,7 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context,
}

if address == "" {
return ctrl.Result{Requeue: true}, fmt.Errorf("no address was found for node %q", node.Name)
return ctrl.Result{RequeueAfter: 20 * time.Second}, fmt.Errorf("no address was found for node %q", node.Name)
}

var (
@@ -387,7 +404,7 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context,
 
 	err = r.Client.List(ctx, &cfgs)
 	if err != nil {
-		return ctrl.Result{Requeue: true}, err
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
 	for _, cfg := range cfgs.Items {
@@ -400,31 +417,55 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context,
 	}
 
 	if found == nil {
-		return ctrl.Result{Requeue: true}, fmt.Errorf("failed to find TalosConfig for %q", oldest.Name)
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, fmt.Errorf("failed to find TalosConfig for %q", oldest.Name)
 	}
 
 	t, err := talosconfig.FromString(found.Status.TalosConfig)
 	if err != nil {
-		return ctrl.Result{Requeue: true}, err
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
 	c, err := talosclient.New(ctx, talosclient.WithEndpoints(address), talosclient.WithConfig(t))
 	if err != nil {
-		return ctrl.Result{Requeue: true}, err
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
-	r.Log.Info("Forfeiting leadership", "machine", oldest.Status.NodeRef.Name)
+	r.Log.Info("Verifying etcd status", "machine", oldest.Name, "node", node.Name, "address", address)
 
-	_, err = c.MachineClient.EtcdForfeitLeadership(ctx, &machine.EtcdForfeitLeadershipRequest{})
+	svcs, err := c.MachineClient.ServiceList(ctx, &empty.Empty{})
 	if err != nil {
-		return ctrl.Result{Requeue: true}, nil
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
-	r.Log.Info("Leaving etcd", "machine", oldest.Name, "node", node.Name, "address", address)
-
-	_, err = c.MachineClient.EtcdLeaveCluster(ctx, &machine.EtcdLeaveClusterRequest{})
-	if err != nil {
-		return ctrl.Result{Requeue: true}, nil
-	}
+	for _, svc := range svcs.Messages[0].Services {
+		if svc.Id != "etcd" {
+			continue
+		}
+
+		if svc.State != "Finished" {
+			r.Log.Info("Forfeiting leadership", "machine", oldest.Status.NodeRef.Name)
+
+			_, err = c.MachineClient.EtcdForfeitLeadership(ctx, &machine.EtcdForfeitLeadershipRequest{})
+			if err != nil {
+				return ctrl.Result{RequeueAfter: 20 * time.Second}, err
+			}
+
+			r.Log.Info("Leaving etcd", "machine", oldest.Name, "node", node.Name, "address", address)
+
+			_, err = c.MachineClient.EtcdLeaveCluster(ctx, &machine.EtcdLeaveClusterRequest{})
+			if err != nil {
+				return ctrl.Result{RequeueAfter: 20 * time.Second}, err
+			}
+		}
+
+		break
+	}
+
+	r.Log.Info("Deleting machine", "machine", oldest.Name, "node", node.Name)
+
+	err = r.Client.Delete(ctx, &oldest)
+	if err != nil {
+		return ctrl.Result{}, err
+	}
 
 	// NB: We shutdown the node here so that a loadbalancer will drop the backend.
@@ -434,14 +475,14 @@ func (r *TalosControlPlaneReconciler) scaleDownControlPlane(ctx context.Context,
 
 	_, err = c.MachineClient.Shutdown(ctx, &empty.Empty{})
 	if err != nil {
-		return ctrl.Result{Requeue: true}, nil
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
-	r.Log.Info("Deleting control plane node", "machine", oldest.Name, "node", node.Name)
+	r.Log.Info("Deleting node", "machine", oldest.Name, "node", node.Name)
 
-	err = r.Client.Delete(ctx, &oldest)
+	err = clientset.CoreV1().Nodes().Delete(ctx, node.Name, metav1.DeleteOptions{})
 	if err != nil {
-		return ctrl.Result{}, err
+		return ctrl.Result{RequeueAfter: 20 * time.Second}, err
 	}
 
 	// Requeue so that we handle any additional scaling.