Commit

Retain the error message in task execution even when next pod succeeds (

burmanm authored Feb 12, 2024
1 parent ff5bc87 commit f309eb4

Showing 6 changed files with 46 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -13,6 +13,7 @@ Changelog for Cass Operator, new PRs should update the `main / unreleased` secti

* [FEATURE] [#601](https://github.com/k8ssandra/cass-operator/pull/601) Add additionalAnnotations field to CR so that all resources created by the operator can be annotated.
* [BUGFIX] [#607](https://github.com/k8ssandra/cass-operator/issues/607) Add missing additional labels and annotations to the superuserSecret.
+* [BUGFIX] [#612](https://github.com/k8ssandra/cass-operator/issues/612) Improve error message handling in task jobs by retaining the message that a previously failed pod generated

## v1.18.2

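The changelog entry above captures the gist of the fix: the per-pod reconcile loop now returns the failed pod's error message alongside the failure/completion counts, so a later pod's success can no longer wipe it out. A minimal, self-contained sketch of that pattern (hypothetical names, not the operator's actual types):

```go
package main

import "fmt"

type podResult struct {
	name string
	err  error
}

// runOnEveryPod mirrors the shape of the fix: errMsg is only ever assigned
// on failure and never reset, so it survives subsequent successful pods.
func runOnEveryPod(results []podResult) (failed, completed int, errMsg string) {
	for _, r := range results {
		if r.err != nil {
			errMsg = r.err.Error() // retained; later successes don't clear it
			failed++
			continue
		}
		completed++
	}
	return failed, completed, errMsg
}

func main() {
	results := []podResult{
		{name: "pod-0", err: fmt.Errorf("task failed: compaction error")},
		{name: "pod-1", err: nil}, // succeeds after the failure
	}
	failed, completed, errMsg := runOnEveryPod(results)
	// errMsg still carries pod-0's failure even though pod-1 completed.
	fmt.Println(failed, completed, errMsg)
}
```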
2 changes: 1 addition & 1 deletion Makefile
@@ -245,7 +245,7 @@ CONTROLLER_TOOLS_VERSION ?= v0.12.0
OPERATOR_SDK_VERSION ?= 1.29.0
HELM_VERSION ?= 3.12.0
OPM_VERSION ?= 1.26.5
-GOLINT_VERSION ?= 1.52.2
+GOLINT_VERSION ?= 1.55.2

.PHONY: cert-manager
cert-manager: ## Install cert-manager to the cluster
2 changes: 1 addition & 1 deletion go.mod
@@ -22,6 +22,7 @@ require (
require (
github.com/Jeffail/gabs/v2 v2.7.0
github.com/onsi/ginkgo/v2 v2.9.4
+github.com/prometheus/client_golang v1.15.1
go.uber.org/zap v1.24.0
golang.org/x/mod v0.12.0
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2
@@ -55,7 +56,6 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
-github.com/prometheus/client_golang v1.15.1 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.9.0 // indirect
61 changes: 32 additions & 29 deletions internal/controllers/control/cassandratask_controller.go
@@ -262,6 +262,7 @@ func (r *CassandraTaskReconciler) Reconcile(ctx context.Context, req ctrl.Reques
taskId := string(cassTask.UID)

var err error
+var errMsg string
var failed, completed int
JobDefinition:
for _, job := range cassTask.Spec.Jobs {
@@ -337,7 +338,7 @@
}
}

-res, failed, completed, err = r.reconcileEveryPodTask(ctx, dc, taskConfig)
+res, failed, completed, errMsg, err = r.reconcileEveryPodTask(ctx, dc, taskConfig)

if err != nil {
return ctrl.Result{}, err
@@ -368,7 +369,6 @@ JobDefinition:
SetCondition(&cassTask, api.JobRunning, metav1.ConditionFalse, "")

if failed > 0 {
-errMsg := ""
if err != nil {
errMsg = err.Error()
}
@@ -579,13 +579,13 @@ func (r *CassandraTaskReconciler) cleanupJobAnnotations(ctx context.Context, dc
}

// reconcileEveryPodTask executes the given task against all the Datacenter pods
-func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc *cassapi.CassandraDatacenter, taskConfig *TaskConfiguration) (ctrl.Result, int, int, error) {
+func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc *cassapi.CassandraDatacenter, taskConfig *TaskConfiguration) (ctrl.Result, int, int, string, error) {
logger := log.FromContext(ctx)

// We sort to ensure we process the dcPods in the same order
dcPods, err := r.getDatacenterPods(ctx, dc)
if err != nil {
-return ctrl.Result{}, 0, 0, err
+return ctrl.Result{}, 0, 0, "", err
}

sort.Slice(dcPods, func(i, j int) bool {
@@ -601,10 +601,11 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc

nodeMgmtClient, err := httphelper.NewMgmtClient(ctx, r.Client, dc)
if err != nil {
-return ctrl.Result{}, 0, 0, err
+return ctrl.Result{}, 0, 0, "", err
}

failed, completed := 0, 0
+errMsg := ""

for idx, pod := range dcPods {
// TODO Do we need post-pod processing functionality also? In case we need to wait for some other event to happen (processed by cass-operator).
@@ -617,7 +618,7 @@

features, err := nodeMgmtClient.FeatureSet(&pod)
if err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

if pod.Annotations == nil {
@@ -626,7 +627,7 @@

jobStatus, err := GetJobStatusFromPodAnnotations(taskConfig.Id, pod.Annotations)
if err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

if jobStatus.Id != "" {
@@ -645,20 +646,22 @@
details, err := nodeMgmtClient.JobDetails(&pod, jobStatus.Id)
if err != nil {
logger.Error(err, "Could not get JobDetails for pod", "Pod", pod)
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

if details.Id == "" {
// This job was not found, pod most likely restarted. Let's retry..
delete(pod.Annotations, getJobAnnotationKey(taskConfig.Id))
err = r.Client.Update(ctx, &pod)
if err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}
-return ctrl.Result{RequeueAfter: 1 * time.Second}, failed, completed, nil
+return ctrl.Result{RequeueAfter: 1 * time.Second}, failed, completed, errMsg, nil
} else if details.Status == podJobError {
// Log the error, move on
-logger.Error(fmt.Errorf("task failed: %s", details.Error), "Job failed to successfully complete the task", "Pod", pod)
+errMsg = details.Error
+err = fmt.Errorf("task failed: %s", errMsg)
+logger.Error(err, "Job failed to successfully complete the task", "Pod", pod)
if taskConfig.RestartPolicy != corev1.RestartPolicyOnFailure || jobStatus.Retries >= 1 {
jobStatus.Status = podJobError
} else {
@@ -669,11 +672,11 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
}

if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

if err = r.Client.Update(ctx, &pod); err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

if jobStatus.Status == podJobError {
@@ -684,32 +687,32 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
// Pod has finished, remove the job_id and let us move to the next pod
jobStatus.Status = podJobCompleted
if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

if err = r.Client.Update(ctx, &pod); err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}
completed++
continue
} else if details.Status == podJobWaiting {
// Job is still running or waiting
-return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}
} else {
if len(jobRunner) > 0 {
// Something is still holding the worker
-return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}

// Nothing is holding the job, this pod has finished
jobStatus.Status = podJobCompleted
if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

if err = r.Client.Update(ctx, &pod); err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}
completed++
continue
Expand All @@ -720,32 +723,32 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
// Pod isn't running anything at the moment, this pod should run next
jobId, err := taskConfig.AsyncFunc(nodeMgmtClient, &pod, taskConfig)
if err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}
jobStatus.Handler = jobHandlerMgmtApi
jobStatus.Id = jobId

if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

err = r.Client.Update(ctx, &pod)
if err != nil {
logger.Error(err, "Failed to patch pod's status to include jobId", "Pod", pod)
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}
} else {
if len(jobRunner) > 0 {
// Something is still holding the worker
-return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}

if taskConfig.SyncFunc == nil {
// This feature is not supported in sync mode, mark everything as done
err := fmt.Errorf("this job isn't supported by the target pod")
logger.Error(err, "unable to execute requested job against pod", "Pod", pod)
failed++
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

jobId := strconv.Itoa(idx)
@@ -755,13 +758,13 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
jobStatus.Id = jobId

if err = JobStatusToPodAnnotations(taskConfig.Id, pod.Annotations, jobStatus); err != nil {
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

err = r.Client.Update(ctx, &pod)
if err != nil {
logger.Error(err, "Failed to patch pod's status to indicate its running a local job", "Pod", pod)
-return ctrl.Result{}, failed, completed, err
+return ctrl.Result{}, failed, completed, errMsg, err
}

pod := pod
@@ -806,13 +809,13 @@ func (r *CassandraTaskReconciler) reconcileEveryPodTask(ctx context.Context, dc
}

// We have a job going on, return back later to check the status
-return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}

if len(jobRunner) > 0 {
// Something is still holding the worker while none of the existing pods are, probably the replace job.
-return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, nil
+return ctrl.Result{RequeueAfter: JobRunningRequeue}, failed, completed, errMsg, nil
}

-return ctrl.Result{}, failed, completed, nil
+return ctrl.Result{}, failed, completed, errMsg, nil
}
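On the caller side, `Reconcile` now receives the retained message as the new fifth return value instead of re-deriving it from `err` inside the `failed > 0` branch. A hedged, self-contained sketch of that flow (stubbed names; the tests in this commit assert the message ends up as the JobFailed condition's Message):

```go
package main

import "fmt"

// reconcileEveryPodTask stands in for the real method: one pod failed earlier
// with a retained message, and a later pod succeeded without clearing it.
func reconcileEveryPodTask() (failed, completed int, errMsg string, err error) {
	return 1, 2, "any error", nil
}

func main() {
	failed, completed, errMsg, err := reconcileEveryPodTask()
	if err != nil {
		// A hard reconcile error still takes precedence, as in the diff above.
		errMsg = err.Error()
	}
	if failed > 0 {
		// In the controller this message is attached to the JobFailed condition.
		fmt.Printf("JobFailed: %q (failed=%d, completed=%d)\n", errMsg, failed, completed)
	}
}
```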
16 changes: 10 additions & 6 deletions internal/controllers/control/cassandratask_controller_test.go
@@ -473,7 +473,9 @@ var _ = Describe("CassandraTask controller tests", func() {
Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount))
Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount))

-Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount))
+Expect(completedTask.Status.Failed).To(BeNumerically("==", nodeCount))
+Expect(completedTask.Status.Conditions[2].Type).To(Equal(string(api.JobFailed)))
+Expect(completedTask.Status.Conditions[2].Message).To(Equal("any error"))
})
It("If retryPolicy is set, we should see a retry", func() {
By("Creating fake mgmt-api server")
@@ -492,12 +494,14 @@

completedTask := waitForTaskCompletion(taskKey)

-// Due to retry, we have double the amount of calls
-Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(nodeCount * 2))
-Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", nodeCount*2))
-Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", nodeCount*2))
+// Due to retry, we try twice and then bail out
+Expect(callDetails.URLCounts["/api/v1/ops/keyspace/cleanup"]).To(Equal(2 * nodeCount))
+Expect(callDetails.URLCounts["/api/v0/ops/executor/job"]).To(BeNumerically(">=", 2*nodeCount))
+Expect(callDetails.URLCounts["/api/v0/metadata/versions/features"]).To(BeNumerically(">", 2*nodeCount))

-Expect(completedTask.Status.Failed).To(BeNumerically(">=", nodeCount))
+Expect(completedTask.Status.Failed).To(BeNumerically("==", nodeCount))
+Expect(completedTask.Status.Conditions[2].Type).To(Equal(string(api.JobFailed)))
+Expect(completedTask.Status.Conditions[2].Message).To(Equal("any error"))
})
It("Replace a node in the datacenter without specifying the pod", func() {
testFailedNamespaceName := fmt.Sprintf("test-task-failed-%d", rand.Int31())
2 changes: 1 addition & 1 deletion pkg/httphelper/server_test_utils.go
@@ -26,7 +26,7 @@ var featuresReply = `{

var jobDetailsCompleted = `{"submit_time":"1638545895255","end_time":"1638545895255","id":"%s","type":"Cleanup","status":"COMPLETED"}`

-var jobDetailsFailed = `{"submit_time":"1638545895255","end_time":"1638545895255","id":"%s","type":"Cleanup","status":"ERROR"}`
+var jobDetailsFailed = `{"submit_time":"1638545895255","end_time":"1638545895255","id":"%s","type":"Cleanup","status":"ERROR","error":"any error"}`

func mgmtApiListener() (net.Listener, error) {
mgmtApiListener, err := net.Listen("tcp", "127.0.0.1:8080")
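The mock reply above gains an `"error"` key, which is what `JobDetails` exposes as `details.Error` in the controller change. A minimal sketch of that mapping (the field set here is assumed; the operator's real `httphelper.JobDetails` may carry more fields):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// JobDetails mirrors the mock reply's JSON keys, including the new "error" key.
type JobDetails struct {
	SubmitTime string `json:"submit_time"`
	EndTime    string `json:"end_time"`
	Id         string `json:"id"`
	Type       string `json:"type"`
	Status     string `json:"status"`
	Error      string `json:"error"`
}

func main() {
	reply := `{"submit_time":"1638545895255","end_time":"1638545895255","id":"123","type":"Cleanup","status":"ERROR","error":"any error"}`
	var details JobDetails
	if err := json.Unmarshal([]byte(reply), &details); err != nil {
		panic(err)
	}
	// details.Error is the message the controller retains in errMsg.
	fmt.Println(details.Status, details.Error) // ERROR any error
}
```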
