Skip to content

Commit

Permalink
Get repair-queue status
Browse files Browse the repository at this point in the history
Signed-off-by: morimoto-cybozu <[email protected]>
  • Loading branch information
morimoto-cybozu committed Dec 25, 2023
1 parent 8c49fb7 commit 059c9f7
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 2 deletions.
101 changes: 101 additions & 0 deletions op/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,12 @@ import (
"net/url"
"strconv"
"strings"
"time"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/cke/static"
"github.com/cybozu-go/log"
"github.com/cybozu-go/well"
"go.etcd.io/etcd/api/v3/etcdserverpb"
clientv3 "go.etcd.io/etcd/client/v3"
k8serr "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -563,3 +565,102 @@ func containCommandOption(slice []string, optionName string) bool {
}
return false
}

func GetRepairQueueStatus(ctx context.Context, inf cke.Infrastructure, n *cke.Node, cluster *cke.Cluster) (cke.RepairQueueStatus, error) {
rqs := cke.RepairQueueStatus{}

clientset, err := inf.K8sClient(ctx, n)
if err != nil {
return cke.RepairQueueStatus{}, err
}

disabled, err := inf.Storage().IsRepairQueueDisabled(ctx)
if err != nil {
return cke.RepairQueueStatus{}, err
}
rqs.Enabled = !disabled

entries, err := inf.Storage().GetRepairsEntries(ctx)
if err != nil {
return cke.RepairQueueStatus{}, err
}
rqs.Entries = entries

for _, entry := range entries {
// Update Nodename every time.
// Though the nodename of a machine in a Kubernetes cluster will not change,
// the machine can join/leave the cluster dynamically.
entry.FillNodename(cluster)
}

rqs.RepairCompleted = make(map[string]bool)
for _, entry := range entries {
if !(entry.Status == cke.RepairStatusQueued || entry.Status == cke.RepairStatusProcessing) {
// not "!(RepairStatusProcessing && repairStepStatusWatching)"
// Though the repair completion will happen a little later after the execution of
// the repair command in most cases, it is useful to check the health for all
// unfinished entries.
continue
}
healthy, err := isRepairTargetHealthy(ctx, entry, cluster)
if err != nil {
log.Warn("health check failed", map[string]interface{}{
log.FnError: err,
"index": entry.Index,
"address": entry.Address,
})
continue
}
if healthy {
rqs.RepairCompleted[entry.Address] = true
}
}

rqs.DrainCompleted = make(map[string]bool)
for _, entry := range entries {
if !(entry.Status == cke.RepairStatusProcessing && entry.StepStatus == cke.RepairStepStatusDraining) {
continue
}
if !entry.IsInCluster() {
// The target machine has been removed from the Kubernetes cluster while being drained.
// The drain operation should be treated as succeeded.
// Unlike the reboot queue, the repair queue continues to manage the out-of-cluster machine.
rqs.DrainCompleted[entry.Address] = true
continue
}
err := checkPodDeletion(ctx, clientset, entry.Nodename)
if err == nil {
rqs.DrainCompleted[entry.Address] = true
}
}

return rqs, nil
}

func isRepairTargetHealthy(ctx context.Context, entry *cke.RepairQueueEntry, cluster *cke.Cluster) (bool, error) {
op, err := entry.GetMatchingRepairOperation(cluster)
if err != nil {
return false, err
}
if len(op.HealthCheckCommand) == 0 {
return false, errors.New("health check command not defined")
}

timeout := cke.DefaultRepairHealthCheckCommandTimeoutSeconds
if op.CommandTimeoutSeconds != nil {
timeout = *op.CommandTimeoutSeconds
}
if timeout != 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout))
defer cancel()
}

args := append(op.HealthCheckCommand[1:], entry.Address)
command := well.CommandContext(ctx, op.HealthCheckCommand[0], args...)
stdout, err := command.Output()
if err != nil {
return false, err
}
return strings.TrimSpace(string(stdout)) == "true", nil
}
6 changes: 6 additions & 0 deletions server/get_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,11 @@ func (c Controller) GetClusterStatus(ctx context.Context, cluster *cke.Cluster,
}
cs.Kubernetes = kcs

repairQueueStatus, err := op.GetRepairQueueStatus(ctx, inf, livingMaster, cluster)
if err != nil {
return nil, err
}
cs.RepairQueue = repairQueueStatus

return cs, nil
}
13 changes: 11 additions & 2 deletions status.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,9 @@ type ClusterStatus struct {
Name string
NodeStatuses map[string]*NodeStatus // keys are IP address strings.

Etcd EtcdClusterStatus
Kubernetes KubernetesClusterStatus
Etcd EtcdClusterStatus
Kubernetes KubernetesClusterStatus
RepairQueue RepairQueueStatus
}

// NodeStatus status of a node.
Expand Down Expand Up @@ -148,3 +149,11 @@ type ProxyStatus struct {
IsHealthy bool
Config *proxyv1alpha1.KubeProxyConfiguration
}

// RepairQueueStatus represents repair queue status
type RepairQueueStatus struct {
Enabled bool
Entries []*RepairQueueEntry
RepairCompleted map[string]bool
DrainCompleted map[string]bool
}

0 comments on commit 059c9f7

Please sign in to comment.