diff --git a/cmd/rollout-operator/main.go b/cmd/rollout-operator/main.go
index 050839662..913e12485 100644
--- a/cmd/rollout-operator/main.go
+++ b/cmd/rollout-operator/main.go
@@ -62,8 +62,6 @@ type config struct {
 	useZoneTracker           bool
 	zoneTrackerConfigMapName string
 
-	deletionInterval time.Duration
-	delayBetweenSts  time.Duration
 }
 
 func (cfg *config) register(fs *flag.FlagSet) {
@@ -90,8 +88,6 @@ func (cfg *config) register(fs *flag.FlagSet) {
 	fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones")
 	fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker")
 
-	fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "[Deprecated]time to wait before actually terminating the pod")
-	fs.DurationVar(&cfg.delayBetweenSts, "delay-between-sts", 5*time.Minute, "time to wait between stateful sets")
 }
 
 func (cfg config) validate() error {
@@ -175,7 +171,7 @@ func main() {
 	maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)
 
 	// Init the controller.
-	c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.delayBetweenSts, reg, logger)
+	c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
 	check(errors.Wrap(c.Init(), "failed to init controller"))
 
 	// Listen to sigterm, as well as for restart (like for certificate renewal).
diff --git a/integration/manifests_mock_service_test.go b/integration/manifests_mock_service_test.go
index 7605891c5..09a3735b8 100644
--- a/integration/manifests_mock_service_test.go
+++ b/integration/manifests_mock_service_test.go
@@ -22,7 +22,6 @@ func createMockServiceZone(t *testing.T, ctx context.Context, api *kubernetes.Cl
 		_, err := api.AppsV1().StatefulSets(namespace).Create(ctx, mockServiceStatefulSet(name, "1", true), metav1.CreateOptions{})
 		require.NoError(t, err, "Can't create StatefulSet")
 	}
-
 	{
 		_, err := api.CoreV1().Services(namespace).Create(ctx, mockServiceService(name), metav1.CreateOptions{})
 		require.NoError(t, err, "Can't create Service")
diff --git a/integration/manifests_rollout_operator_test.go b/integration/manifests_rollout_operator_test.go
index f87211e10..9f7a201d2 100644
--- a/integration/manifests_rollout_operator_test.go
+++ b/integration/manifests_rollout_operator_test.go
@@ -59,7 +59,6 @@ func rolloutOperatorDeployment(namespace string, webhook bool) *appsv1.Deploymen
 		fmt.Sprintf("-kubernetes.namespace=%s", namespace),
 		"-reconcile.interval=1s",
 		"-log.level=debug",
-		"-delay-between-sts=0s",
 	}
 	if webhook {
 		args = append(args,
diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go
index 3d63e36a4..bcab67ed8 100644
--- a/pkg/controller/controller.go
+++ b/pkg/controller/controller.go
@@ -66,11 +66,6 @@ type RolloutController struct {
 	// Used to signal when the controller should stop.
 	stopCh chan struct{}
 
-	//deletion interval related
-	delayBetweenSts time.Duration
-	lastUpdatedSts  *v1.StatefulSet
-	lastUpdatedTime time.Time
-
 	// Metrics.
 	groupReconcileTotal  *prometheus.CounterVec
 	groupReconcileFailed *prometheus.CounterVec
@@ -82,7 +77,7 @@ type RolloutController struct {
 	discoveredGroups map[string]struct{}
 }
 
-func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, delayBetweenSts time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
 	namespaceOpt := informers.WithNamespace(namespace)
 
 	// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -114,9 +109,6 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
 		logger:           logger,
 		stopCh:           make(chan struct{}),
 		discoveredGroups: map[string]struct{}{},
-		delayBetweenSts:  delayBetweenSts,
-		lastUpdatedSts:   nil,
-		lastUpdatedTime:  time.Time{},
 		groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
 			Name: "rollout_operator_group_reconciles_total",
 			Help: "Total number of reconciles started for a specific rollout group.",
@@ -217,7 +209,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
 	span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()")
 	defer span.Finish()
 
-	level.Info(c.logger).Log("msg", "reconcile started")
+	level.Info(c.logger).Log("msg", "reconcile started (minReadySeconds version)")
 
 	sets, err := c.listStatefulSetsWithRolloutGroup()
 	if err != nil {
@@ -310,8 +302,6 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
 	if len(notReadySets) == 1 {
 		level.Info(c.logger).Log("msg", "a StatefulSet has some not-Ready pods, reconcile it first", "statefulset", notReadySets[0].Name)
 		sets = util.MoveStatefulSetToFront(sets, notReadySets[0])
-		// sts could become not ready by other activities like UKI, we should consider this as last updated sts
-		c.updateLastUpdatedSts(notReadySets[0])
 	}
 
 	for _, sts := range sets {
@@ -325,7 +315,6 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
 		if ongoing {
 			// Do not continue with other StatefulSets because this StatefulSet
 			// update is still ongoing.
-			c.updateLastUpdatedSts(sts)
 			return nil
 		}
 	}
@@ -499,20 +488,6 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
 	return pods, nil
 }
 
-func (c *RolloutController) canUpdateSts(sts *v1.StatefulSet) bool {
-	if c.lastUpdatedSts == nil || c.lastUpdatedSts.Name == sts.Name {
-		// no need to wait within the same sts updates
-		return true
-	}
-	return time.Since(c.lastUpdatedTime) > c.delayBetweenSts
-}
-
-func (c *RolloutController) updateLastUpdatedSts(sts *v1.StatefulSet) {
-	c.lastUpdatedSts = sts
-	c.lastUpdatedTime = time.Now()
-	level.Debug(c.logger).Log("msg", "updated last updated sts", "sts", sts.Name, "time", c.lastUpdatedTime)
-}
-
 func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
 	level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)
 
@@ -522,26 +497,19 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
 	}
 
 	if len(podsToUpdate) > 0 {
-		if !c.canUpdateSts(sts) {
-			// only check canUpdateSts if the current sts has pods to be updated
-			level.Info(c.logger).Log("msg", "delaying reconcile between StatefulSets updates",
-				"curr", sts.Name, "last", c.lastUpdatedSts.Name,
-				"delay", c.delayBetweenSts, "pods_to_update", len(podsToUpdate))
-			time.Sleep(c.delayBetweenSts)
-			// MUST return here:
-			// 1. pods state could change during the delay period, scan the entire cluster again
-			// 2. since we didn't actually update current sts, the last updated sts should not be changed
-			// 3. throw errors to not proceed with other StatefulSets
-			return false, errors.New("delaying reconcile between StatefulSets updates")
-		}
-
 		maxUnavailable := getMaxUnavailableForStatefulSet(sts, c.logger)
-		numNotReady := int(sts.Status.Replicas - sts.Status.ReadyReplicas)
+		var numNotAvailable int
+		if sts.Spec.MinReadySeconds > 0 {
+			level.Info(c.logger).Log("msg", "StatefulSet has minReadySeconds set, waiting before terminating pods", "statefulset", sts.Name, "min_ready_seconds", sts.Spec.MinReadySeconds)
+			numNotAvailable = int(sts.Status.Replicas - sts.Status.AvailableReplicas)
+		} else {
+			numNotAvailable = int(sts.Status.Replicas - sts.Status.ReadyReplicas)
+		}
 
 		// Compute the number of pods we should update, honoring the configured maxUnavailable.
 		numPods := max(0, min(
-			maxUnavailable-numNotReady, // No more than the configured maxUnavailable (including not-Ready pods).
-			len(podsToUpdate),          // No more than the total number of pods that need to be updated.
+			maxUnavailable-numNotAvailable, // No more than the configured maxUnavailable (including not-Ready pods).
+			len(podsToUpdate),              // No more than the total number of pods that need to be updated.
 		))
 
 		if numPods == 0 {
@@ -551,6 +519,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
 				"pods_to_update", len(podsToUpdate),
 				"replicas", sts.Status.Replicas,
 				"ready_replicas", sts.Status.ReadyReplicas,
+				"available_replicas", sts.Status.AvailableReplicas,
 				"max_unavailable", maxUnavailable)
 
 			return true, nil
@@ -584,6 +553,11 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
 			"statefulset", sts.Name)
 
 		return true, nil
+	} else if sts.Spec.MinReadySeconds > 0 && sts.Status.Replicas != sts.Status.AvailableReplicas {
+		level.Info(c.logger).Log(
+			"msg", "StatefulSet pods are all updated and ready but StatefulSet has some not-Available replicas",
+			"statefulset", sts.Name)
+		return true, nil
 	}
 
 	// At this point there are no pods to update, so we can update the currentRevision in the StatefulSet.
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go
index 50bf351de..199416875 100644
--- a/pkg/controller/controller_test.go
+++ b/pkg/controller/controller_test.go
@@ -165,6 +165,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
 				mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) {
 					sts.Status.Replicas = 3
 					sts.Status.ReadyReplicas = 2
+					sts.Status.AvailableReplicas = 2
 				}),
 			},
 			pods: []runtime.Object{
@@ -183,6 +184,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
 				mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) {
 					sts.Status.Replicas = 3
 					sts.Status.ReadyReplicas = 1
+					sts.Status.AvailableReplicas = 1
 				}),
 			},
 			pods: []runtime.Object{
@@ -540,7 +542,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger())
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -825,7 +827,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)
 
 			// Create the controller and start informers.
 			reg := prometheus.NewPedanticRegistry()
-			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 5*time.Second, 0, reg, log.NewNopLogger())
+			c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
 			require.NoError(t, c.Init())
 			defer c.Stop()
 
@@ -928,7 +930,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG
 
 	// Create the controller and start informers.
 	reg := prometheus.NewPedanticRegistry()
-	c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger())
+	c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger())
 	require.NoError(t, c.Init())
 	defer c.Stop()
 
@@ -1010,10 +1012,11 @@ func mockStatefulSet(name string, overrides ...func(sts *v1.StatefulSet)) *v1.St
 			},
 		},
 		Status: v1.StatefulSetStatus{
-			Replicas:        3,
-			ReadyReplicas:   3,
-			CurrentRevision: testLastRevisionHash,
-			UpdateRevision:  testLastRevisionHash,
+			Replicas:          3,
+			ReadyReplicas:     3,
+			AvailableReplicas: 3,
+			CurrentRevision:   testLastRevisionHash,
+			UpdateRevision:    testLastRevisionHash,
 		},
 	}
 
@@ -1067,6 +1070,7 @@ func withReplicas(totalReplicas, readyReplicas int32) func(sts *v1.StatefulSet)
 		sts.Spec.Replicas = &totalReplicas
 		sts.Status.Replicas = totalReplicas
 		sts.Status.ReadyReplicas = readyReplicas
+		sts.Status.AvailableReplicas = readyReplicas
 	}
 }