From 9d26adeee38242475b025450bd9f240753a33356 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 18 Oct 2024 11:17:09 +0200 Subject: [PATCH 01/25] fix: add support for delayed downscale port in the URL (#176) * fix: add support for delayed downscale port in the URL * update changelog * Update CHANGELOG.md Co-authored-by: Marco Pracucci --------- Co-authored-by: Marco Pracucci --- CHANGELOG.md | 2 + pkg/controller/delay.go | 6 ++- pkg/controller/delay_test.go | 83 ++++++++++++++++++++++++++++++++++++ 3 files changed, 90 insertions(+), 1 deletion(-) create mode 100644 pkg/controller/delay_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f5e58718..994ef73ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## main / unreleased +* [BUGFIX] Improved handling of URL ports in `createPrepareDownscaleEndpoints` function. The function now correctly preserves the port when replacing the host in the URL. #176 + ## v0.20.0 * [ENHANCEMENT] Updated dependencies, including: #174 diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go index da671a0f0..0762dded6 100644 --- a/pkg/controller/delay.go +++ b/pkg/controller/delay.go @@ -147,7 +147,11 @@ func createPrepareDownscaleEndpoints(namespace, serviceName string, from, to int } ep.url = *url - ep.url.Host = fmt.Sprintf("%s.%v.%v.svc.cluster.local.", ep.podName, serviceName, ep.namespace) + newHost := fmt.Sprintf("%s.%v.%v.svc.cluster.local.", ep.podName, serviceName, ep.namespace) + if url.Port() != "" { + newHost = fmt.Sprintf("%s:%s", newHost, url.Port()) + } + ep.url.Host = newHost eps = append(eps, ep) } diff --git a/pkg/controller/delay_test.go b/pkg/controller/delay_test.go new file mode 100644 index 000000000..771d29d35 --- /dev/null +++ b/pkg/controller/delay_test.go @@ -0,0 +1,83 @@ +package controller + +import ( + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCreatePrepareDownscaleEndpoints(t *testing.T) { + testCases := []struct { + name string + namespace string + serviceName string + from int + to int + inputURL string + expected []endpoint + }{ + { + name: "URL without port", + namespace: "test-namespace", + serviceName: "test-service", + from: 0, + to: 2, + inputURL: "http://example.com/api/prepare", + expected: []endpoint{ + { + namespace: "test-namespace", + podName: "test-service-0", + url: mustParseURL(t, "http://test-service-0.test-service.test-namespace.svc.cluster.local./api/prepare"), + replica: 0, + }, + { + namespace: "test-namespace", + podName: "test-service-1", + url: mustParseURL(t, "http://test-service-1.test-service.test-namespace.svc.cluster.local./api/prepare"), + replica: 1, + }, + }, + }, + { + name: "URL with port", + namespace: "prod-namespace", + serviceName: "prod-service", + from: 1, + to: 3, + inputURL: "http://example.com:8080/api/prepare", + expected: []endpoint{ + { + namespace: "prod-namespace", + podName: "prod-service-1", + url: mustParseURL(t, "http://prod-service-1.prod-service.prod-namespace.svc.cluster.local.:8080/api/prepare"), + replica: 1, + }, + { + namespace: "prod-namespace", + podName: "prod-service-2", + url: mustParseURL(t, "http://prod-service-2.prod-service.prod-namespace.svc.cluster.local.:8080/api/prepare"), + replica: 2, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + inputURL, err := url.Parse(tc.inputURL) + require.NoError(t, err) + + result := createPrepareDownscaleEndpoints(tc.namespace, tc.serviceName, tc.from, tc.to, inputURL) + + 
assert.Equal(t, tc.expected, result) + }) + } +} + +func mustParseURL(t *testing.T, urlString string) url.URL { + u, err := url.Parse(urlString) + require.NoError(t, err) + return *u +} From e74c10fade60ae17d522dbaf7d152f5c894d1849 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 18 Oct 2024 11:37:29 +0200 Subject: [PATCH 02/25] Cut v0.20.1 (#177) --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 994ef73ed..6645bcb4d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## main / unreleased +## v0.20.1 + * [BUGFIX] Improved handling of URL ports in `createPrepareDownscaleEndpoints` function. The function now correctly preserves the port when replacing the host in the URL. #176 ## v0.20.0 From efbafdc2bdfc317a15ada1d98b2387d579d4c01f Mon Sep 17 00:00:00 2001 From: Christopher Li Date: Mon, 1 Apr 2024 23:51:13 -0700 Subject: [PATCH 03/25] add timeout --- cmd/rollout-operator/main.go | 4 +++- pkg/controller/controller.go | 7 ++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/cmd/rollout-operator/main.go b/cmd/rollout-operator/main.go index 913e12485..4fa48f4fc 100644 --- a/cmd/rollout-operator/main.go +++ b/cmd/rollout-operator/main.go @@ -62,6 +62,7 @@ type config struct { useZoneTracker bool zoneTrackerConfigMapName string + deletionInterval time.Duration } func (cfg *config) register(fs *flag.FlagSet) { @@ -88,6 +89,7 @@ func (cfg *config) register(fs *flag.FlagSet) { fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones") fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker") + fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "time to wait before actually terminating the pod") } func (cfg config) validate() error { @@ -171,7 +173,7 @@ func main() { maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics) // Init the controller. - c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger) + c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.deletionInterval, reg, logger) check(errors.Wrap(c.Init(), "failed to init controller")) // Listen to sigterm, as well as for restart (like for certificate renewal). diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index aa633697a..27a35f27b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -75,9 +75,11 @@ type RolloutController struct { // Keep track of discovered rollout groups. We use this information to delete metrics // related to rollout groups that have been decommissioned. 
discoveredGroups map[string]struct{} + + deletionInterval time.Duration } -func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController { +func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, deletionInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController { namespaceOpt := informers.WithNamespace(namespace) // Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones @@ -109,6 +111,7 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM logger: logger, stopCh: make(chan struct{}), discoveredGroups: map[string]struct{}{}, + deletionInterval: deletionInterval, groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "rollout_operator_group_reconciles_total", Help: "Total number of reconciles started for a specific rollout group.", @@ -527,6 +530,8 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S continue } + level.Info(c.logger).Log("msg", fmt.Sprintf("wait %s until terminating pod %s", c.deletionInterval.String(), pod.Name)) + time.Sleep(c.deletionInterval) level.Info(c.logger).Log("msg", fmt.Sprintf("terminating pod %s", pod.Name)) if err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { return false, errors.Wrapf(err, "failed to delete pod %s", pod.Name) From 411f950e2eaad3f8f9b2d85f4d887af6c720a321 Mon Sep 17 00:00:00 2001 From: Christopher Li Date: Wed, 3 Apr 2024 18:16:35 -0700 Subject: [PATCH 04/25] add deletion option --- pkg/controller/controller.go | 44 ++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 27a35f27b..54e6821c2 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -66,6 +66,10 @@ type RolloutController struct { // Used to signal when the controller should stop. stopCh chan struct{} + //deletion interval related + deletionInterval time.Duration + deletionReadyTime time.Time + // Metrics. groupReconcileTotal *prometheus.CounterVec groupReconcileFailed *prometheus.CounterVec @@ -75,8 +79,6 @@ type RolloutController struct { // Keep track of discovered rollout groups. We use this information to delete metrics // related to rollout groups that have been decommissioned. 
discoveredGroups map[string]struct{} - - deletionInterval time.Duration } func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, deletionInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController { @@ -291,6 +293,17 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou } } + //if the cluster is stable, and deletion ready time is not set, then set the deletion ready time + if len(notReadySets) == 0 && c.deletionReadyTime.IsZero() { + c.deletionReadyTime = time.Now().Add(c.deletionInterval) + level.Info(c.logger).Log("msg", "sts group healthy, setting deletion ready time if empty ", "deletionReadyTime", c.deletionReadyTime) + + //reset deletionReadyTime since the cluster is not ready anymore + } else if len(notReadySets) != 0 && !c.deletionReadyTime.IsZero() { + c.deletionReadyTime = time.Time{} + level.Info(c.logger).Log("msg", "sts group unhealthy, reset non-empty deletion ready time ", "deletionReadyTime", c.deletionReadyTime) + } + // Ensure there are not 2+ StatefulSets with not-Ready pods. If there are, we shouldn't proceed // rolling out pods and we should wait until these pods are Ready. The reason is that if there are // unavailable pods in multiple StatefulSets, this could lead to an outage, so we want pods to @@ -521,21 +534,44 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S return true, nil } + deletionHappened := false for _, pod := range podsToUpdate[:numPods] { // Skip if the pod is terminating. Since "Terminating" is not a pod Phase, we can infer it by checking // if the pod is in the Running phase but the deletionTimestamp has been set (kubectl does something // similar too). 
+ if pod.Status.Phase == corev1.PodRunning && pod.DeletionTimestamp != nil { level.Debug(c.logger).Log("msg", fmt.Sprintf("waiting for pod %s to be terminated", pod.Name)) continue } - level.Info(c.logger).Log("msg", fmt.Sprintf("wait %s until terminating pod %s", c.deletionInterval.String(), pod.Name)) - time.Sleep(c.deletionInterval) + now := time.Now() + // Check if deletionReadyTime is set + if c.deletionReadyTime.IsZero() { + // If not set, schedule deletion by setting deletionReadyTime to now + deletionInterval + c.deletionReadyTime = now.Add(c.deletionInterval) + level.Info(c.logger).Log("msg", "Scheduled future deletion for pod", "pod", pod.Name, "deletionReadyTime", c.deletionReadyTime) + break // Skip this loop iteration; wait for the deletionReadyTime + // Check if it's time to delete the pod + } else if now.Before(c.deletionReadyTime) { + level.Info(c.logger).Log("msg", "Waiting deletion ready before deleting pod", "pod", pod.Name, "deletionReadyTime", c.deletionReadyTime) + break // Not yet time to delete; skip this loop iteration + } + level.Info(c.logger).Log("msg", fmt.Sprintf("terminating pod %s", pod.Name)) if err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { return false, errors.Wrapf(err, "failed to delete pod %s", pod.Name) + } else { + deletionHappened = true + level.Info(c.logger).Log("msg", fmt.Sprintf("pod %s successfully terminated", pod.Name), "deletionReadyTime", c.deletionReadyTime) } + + } + + //make sure no other pods can be deleted + if deletionHappened { + c.deletionReadyTime = time.Time{} + level.Info(c.logger).Log("msg", fmt.Sprintf("reset deletion ready time since deletion just happened")) } return true, nil From 0b73cbab05773774c4e503c1473d02cb851b5f0a Mon Sep 17 00:00:00 2001 From: Christopher Li Date: Wed, 3 Apr 2024 18:32:51 -0700 Subject: [PATCH 05/25] fix test --- .../manifests_rollout_operator_test.go | 1 + pkg/controller/controller.go | 32 ++++++++++--------- pkg/controller/controller_test.go | 6 ++-- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/integration/manifests_rollout_operator_test.go b/integration/manifests_rollout_operator_test.go index 9f7a201d2..fa926888f 100644 --- a/integration/manifests_rollout_operator_test.go +++ b/integration/manifests_rollout_operator_test.go @@ -59,6 +59,7 @@ func rolloutOperatorDeployment(namespace string, webhook bool) *appsv1.Deploymen fmt.Sprintf("-kubernetes.namespace=%s", namespace), "-reconcile.interval=1s", "-log.level=debug", + "-deletion-interval=0s", } if webhook { args = append(args, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 54e6821c2..6033edb07 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -67,7 +67,8 @@ type RolloutController struct { stopCh chan struct{} //deletion interval related - deletionInterval time.Duration + deletionInterval time.Duration + //TODO: convert this to a map of rollout group to deletion ready name deletionReadyTime time.Time // Metrics. 
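The hunk below lifts the deadline bookkeeping out of the per-pod loop: a single deletionReadyTime now gates the whole StatefulSet, scheduled on first use and reset after a permitted deletion. A minimal standalone sketch of that gate, assuming the same scheduling and reset semantics as the diff (the deletionGate type and the main function are illustrative, not operator code):

package main

import (
	"fmt"
	"time"
)

type deletionGate struct {
	interval  time.Duration
	readyTime time.Time // zero value means "not scheduled yet"
}

// allow reports whether a deletion may proceed now. The first call schedules
// the deadline and falls through to the check (so a zero interval allows
// deletion immediately); once the deadline passes, it permits one deletion
// and resets, so the next deletion starts a fresh wait.
func (g *deletionGate) allow(now time.Time) bool {
	if g.readyTime.IsZero() {
		g.readyTime = now.Add(g.interval)
	}
	if now.Before(g.readyTime) {
		return false // deadline not reached yet
	}
	g.readyTime = time.Time{} // reset after a permitted deletion
	return true
}

func main() {
	g := &deletionGate{interval: 5 * time.Minute}
	now := time.Now()
	fmt.Println(g.allow(now))                      // false: schedules the deadline
	fmt.Println(g.allow(now.Add(6 * time.Minute))) // true: deadline passed, gate resets
}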
@@ -535,6 +536,20 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S } deletionHappened := false + + now := time.Now() + // Check if deletionReadyTime is set + if c.deletionReadyTime.IsZero() { + // If not set, schedule deletion by setting deletionReadyTime to now + deletionInterval + c.deletionReadyTime = now.Add(c.deletionInterval) + level.Info(c.logger).Log("msg", "Scheduled future deletion for sts pods", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime) + } + + if now.Before(c.deletionReadyTime) { + level.Info(c.logger).Log("msg", "Waiting deletion ready before deleting pod", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime) + return true, nil // Not yet time to delete; skip this loop iteration + } + for _, pod := range podsToUpdate[:numPods] { // Skip if the pod is terminating. Since "Terminating" is not a pod Phase, we can infer it by checking // if the pod is in the Running phase but the deletionTimestamp has been set (kubectl does something @@ -545,19 +560,6 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S continue } - now := time.Now() - // Check if deletionReadyTime is set - if c.deletionReadyTime.IsZero() { - // If not set, schedule deletion by setting deletionReadyTime to now + deletionInterval - c.deletionReadyTime = now.Add(c.deletionInterval) - level.Info(c.logger).Log("msg", "Scheduled future deletion for pod", "pod", pod.Name, "deletionReadyTime", c.deletionReadyTime) - break // Skip this loop iteration; wait for the deletionReadyTime - // Check if it's time to delete the pod - } else if now.Before(c.deletionReadyTime) { - level.Info(c.logger).Log("msg", "Waiting deletion ready before deleting pod", "pod", pod.Name, "deletionReadyTime", c.deletionReadyTime) - break // Not yet time to delete; skip this loop iteration - } - level.Info(c.logger).Log("msg", fmt.Sprintf("terminating pod %s", pod.Name)) if err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { return false, errors.Wrapf(err, "failed to delete pod %s", pod.Name) @@ -571,7 +573,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S //make sure no other pods can be deleted if deletionHappened { c.deletionReadyTime = time.Time{} - level.Info(c.logger).Log("msg", fmt.Sprintf("reset deletion ready time since deletion just happened")) + level.Info(c.logger).Log("msg", "reset deletion ready time since deletion just happened") } return true, nil diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index c5a678d1c..2dfd5dc9f 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -616,7 +616,7 @@ func TestRolloutController_Reconcile(t *testing.T) { // Create the controller and start informers. reg := prometheus.NewPedanticRegistry() - c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger()) + c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger()) require.NoError(t, c.Init()) defer c.Stop() @@ -925,7 +925,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T) // Create the controller and start informers. 
reg := prometheus.NewPedanticRegistry() - c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 5*time.Second, reg, log.NewNopLogger()) + c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 5*time.Second, 0, reg, log.NewNopLogger()) require.NoError(t, c.Init()) defer c.Stop() @@ -1028,7 +1028,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG // Create the controller and start informers. reg := prometheus.NewPedanticRegistry() - c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger()) + c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger()) require.NoError(t, c.Init()) defer c.Stop() From 1a5099fa84583be9d015aedc3245c2e8263d3d86 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Fri, 26 Apr 2024 12:56:15 -0700 Subject: [PATCH 06/25] [PLAT-106952] only delay with longer time between sts update Signed-off-by: Yi Jin --- cmd/rollout-operator/main.go | 6 +- .../manifests_rollout_operator_test.go | 2 +- pkg/controller/controller.go | 71 ++++++++----------- 3 files changed, 34 insertions(+), 45 deletions(-) diff --git a/cmd/rollout-operator/main.go b/cmd/rollout-operator/main.go index 4fa48f4fc..050839662 100644 --- a/cmd/rollout-operator/main.go +++ b/cmd/rollout-operator/main.go @@ -63,6 +63,7 @@ type config struct { useZoneTracker bool zoneTrackerConfigMapName string deletionInterval time.Duration + delayBetweenSts time.Duration } func (cfg *config) register(fs *flag.FlagSet) { @@ -89,7 +90,8 @@ func (cfg *config) register(fs *flag.FlagSet) { fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones") fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker") - fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "time to wait before actually terminating the pod") + fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "[Deprecated]time to wait before actually terminating the pod") + fs.DurationVar(&cfg.delayBetweenSts, "delay-between-sts", 5*time.Minute, "time to wait between stateful sets") } func (cfg config) validate() error { @@ -173,7 +175,7 @@ func main() { maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics) // Init the controller. - c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.deletionInterval, reg, logger) + c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.delayBetweenSts, reg, logger) check(errors.Wrap(c.Init(), "failed to init controller")) // Listen to sigterm, as well as for restart (like for certificate renewal). 
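Both options are ordinary Go duration flags, so operators can pass values such as 0s, 30s, or 5m on the command line. A small self-contained sketch of how they parse (the flag names match the diff above; the FlagSet wiring is an assumption for illustration):

package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	fs := flag.NewFlagSet("rollout-operator", flag.ExitOnError)
	var deletionInterval, delayBetweenSts time.Duration
	fs.DurationVar(&deletionInterval, "deletion-interval", 5*time.Minute, "[Deprecated]time to wait before actually terminating the pod")
	fs.DurationVar(&delayBetweenSts, "delay-between-sts", 5*time.Minute, "time to wait between stateful sets")

	// Duration flags accept Go duration syntax; unset flags keep their defaults.
	_ = fs.Parse([]string{"-delay-between-sts=30s"})
	fmt.Println(deletionInterval, delayBetweenSts) // 5m0s 30s
}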
diff --git a/integration/manifests_rollout_operator_test.go b/integration/manifests_rollout_operator_test.go index fa926888f..f87211e10 100644 --- a/integration/manifests_rollout_operator_test.go +++ b/integration/manifests_rollout_operator_test.go @@ -59,7 +59,7 @@ func rolloutOperatorDeployment(namespace string, webhook bool) *appsv1.Deploymen fmt.Sprintf("-kubernetes.namespace=%s", namespace), "-reconcile.interval=1s", "-log.level=debug", - "-deletion-interval=0s", + "-delay-between-sts=0s", } if webhook { args = append(args, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 6033edb07..9ee75756f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -67,9 +67,9 @@ type RolloutController struct { stopCh chan struct{} //deletion interval related - deletionInterval time.Duration - //TODO: convert this to a map of rollout group to deletion ready name - deletionReadyTime time.Time + delayBetweenSts time.Duration + lastUpdatedSts string + lastUpdatedTime time.Time // Metrics. groupReconcileTotal *prometheus.CounterVec @@ -82,7 +82,7 @@ type RolloutController struct { discoveredGroups map[string]struct{} } -func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, deletionInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController { +func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, delayBetweenSts time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController { namespaceOpt := informers.WithNamespace(namespace) // Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones @@ -114,7 +114,9 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM logger: logger, stopCh: make(chan struct{}), discoveredGroups: map[string]struct{}{}, - deletionInterval: deletionInterval, + delayBetweenSts: delayBetweenSts, + lastUpdatedSts: "", + lastUpdatedTime: time.Time{}, groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "rollout_operator_group_reconciles_total", Help: "Total number of reconciles started for a specific rollout group.", @@ -294,17 +296,6 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou } } - //if the cluster is stable, and deletion ready time is not set, then set the deletion ready time - if len(notReadySets) == 0 && c.deletionReadyTime.IsZero() { - c.deletionReadyTime = time.Now().Add(c.deletionInterval) - level.Info(c.logger).Log("msg", "sts group healthy, setting deletion ready time if empty ", "deletionReadyTime", c.deletionReadyTime) - - //reset deletionReadyTime since the cluster is not ready anymore - } else if len(notReadySets) != 0 && !c.deletionReadyTime.IsZero() { - c.deletionReadyTime = time.Time{} - level.Info(c.logger).Log("msg", "sts group unhealthy, reset non-empty deletion ready time ", "deletionReadyTime", c.deletionReadyTime) - } - // Ensure there are not 2+ StatefulSets with not-Ready pods. If there are, we shouldn't proceed // rolling out pods and we should wait until these pods are Ready. 
The reason is that if there are // unavailable pods in multiple StatefulSets, this could lead to an outage, so we want pods to @@ -319,19 +310,27 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou if len(notReadySets) == 1 { level.Info(c.logger).Log("msg", "a StatefulSet has some not-Ready pods, reconcile it first", "statefulset", notReadySets[0].Name) sets = util.MoveStatefulSetToFront(sets, notReadySets[0]) + // sts could become not ready by other activities like UKI, we should consider this as last updated sts + c.updateLastUpdatedSts(notReadySets[0].Name) } for _, sts := range sets { + if !c.canUpdateSts(sts.Name) { + time.Sleep(c.delayBetweenSts) + return nil + } ongoing, err := c.updateStatefulSetPods(ctx, sts) if err != nil { // Do not continue with other StatefulSets because this StatefulSet // is expected to be successfully updated before proceeding. + c.updateLastUpdatedSts(sts.Name) return errors.Wrapf(err, "failed to update StatefulSet %s", sts.Name) } if ongoing { // Do not continue with other StatefulSets because this StatefulSet // update is still ongoing. + c.updateLastUpdatedSts(sts.Name) return nil } } @@ -505,6 +504,20 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error) return pods, nil } +func (c *RolloutController) canUpdateSts(sts string) bool { + if c.lastUpdatedSts == "" || c.lastUpdatedSts == sts { + // no need to wait within the same sts updates + return true + } + return time.Since(c.lastUpdatedTime) > c.delayBetweenSts +} + +func (c *RolloutController) updateLastUpdatedSts(sts string) { + level.Info(c.logger).Log("msg", "update last updated sts", "prev", c.lastUpdatedSts, "curr", sts) + c.lastUpdatedSts = sts + c.lastUpdatedTime = time.Now() +} + func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) { level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name) @@ -535,26 +548,10 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S return true, nil } - deletionHappened := false - - now := time.Now() - // Check if deletionReadyTime is set - if c.deletionReadyTime.IsZero() { - // If not set, schedule deletion by setting deletionReadyTime to now + deletionInterval - c.deletionReadyTime = now.Add(c.deletionInterval) - level.Info(c.logger).Log("msg", "Scheduled future deletion for sts pods", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime) - } - - if now.Before(c.deletionReadyTime) { - level.Info(c.logger).Log("msg", "Waiting deletion ready before deleting pod", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime) - return true, nil // Not yet time to delete; skip this loop iteration - } - for _, pod := range podsToUpdate[:numPods] { // Skip if the pod is terminating. Since "Terminating" is not a pod Phase, we can infer it by checking // if the pod is in the Running phase but the deletionTimestamp has been set (kubectl does something // similar too). 
- if pod.Status.Phase == corev1.PodRunning && pod.DeletionTimestamp != nil { level.Debug(c.logger).Log("msg", fmt.Sprintf("waiting for pod %s to be terminated", pod.Name)) continue @@ -563,17 +560,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S level.Info(c.logger).Log("msg", fmt.Sprintf("terminating pod %s", pod.Name)) if err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil { return false, errors.Wrapf(err, "failed to delete pod %s", pod.Name) - } else { - deletionHappened = true - level.Info(c.logger).Log("msg", fmt.Sprintf("pod %s successfully terminated", pod.Name), "deletionReadyTime", c.deletionReadyTime) } - - } - - //make sure no other pods can be deleted - if deletionHappened { - c.deletionReadyTime = time.Time{} - level.Info(c.logger).Log("msg", "reset deletion ready time since deletion just happened") } return true, nil From 86357e38ddc8d2ffb06b4f3b138a66b846fedf38 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Fri, 26 Apr 2024 17:32:55 -0700 Subject: [PATCH 07/25] [PLAT-106952] log when the delay happens Signed-off-by: Yi Jin --- pkg/controller/controller.go | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 9ee75756f..cbdf5ec8a 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -68,7 +68,7 @@ type RolloutController struct { //deletion interval related delayBetweenSts time.Duration - lastUpdatedSts string + lastUpdatedSts *v1.StatefulSet lastUpdatedTime time.Time // Metrics. @@ -115,7 +115,7 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM stopCh: make(chan struct{}), discoveredGroups: map[string]struct{}{}, delayBetweenSts: delayBetweenSts, - lastUpdatedSts: "", + lastUpdatedSts: nil, lastUpdatedTime: time.Time{}, groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "rollout_operator_group_reconciles_total", @@ -311,11 +311,13 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou level.Info(c.logger).Log("msg", "a StatefulSet has some not-Ready pods, reconcile it first", "statefulset", notReadySets[0].Name) sets = util.MoveStatefulSetToFront(sets, notReadySets[0]) // sts could become not ready by other activities like UKI, we should consider this as last updated sts - c.updateLastUpdatedSts(notReadySets[0].Name) + c.updateLastUpdatedSts(notReadySets[0]) } for _, sts := range sets { - if !c.canUpdateSts(sts.Name) { + if !c.canUpdateSts(sts) { + level.Info(c.logger).Log("msg", "delaying reconcile due to last updated sts is not ready yet", + "last", c.lastUpdatedSts.Name, "curr", sts.Name, "delay", c.delayBetweenSts) time.Sleep(c.delayBetweenSts) return nil } @@ -323,14 +325,14 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou if err != nil { // Do not continue with other StatefulSets because this StatefulSet // is expected to be successfully updated before proceeding. - c.updateLastUpdatedSts(sts.Name) + c.updateLastUpdatedSts(sts) return errors.Wrapf(err, "failed to update StatefulSet %s", sts.Name) } if ongoing { // Do not continue with other StatefulSets because this StatefulSet // update is still ongoing. 
- c.updateLastUpdatedSts(sts.Name) + c.updateLastUpdatedSts(sts) return nil } } @@ -504,16 +506,15 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error) return pods, nil } -func (c *RolloutController) canUpdateSts(sts string) bool { - if c.lastUpdatedSts == "" || c.lastUpdatedSts == sts { +func (c *RolloutController) canUpdateSts(sts *v1.StatefulSet) bool { + if c.lastUpdatedSts == nil || c.lastUpdatedSts.Name == sts.Name { // no need to wait within the same sts updates return true } return time.Since(c.lastUpdatedTime) > c.delayBetweenSts } -func (c *RolloutController) updateLastUpdatedSts(sts string) { - level.Info(c.logger).Log("msg", "update last updated sts", "prev", c.lastUpdatedSts, "curr", sts) +func (c *RolloutController) updateLastUpdatedSts(sts *v1.StatefulSet) { c.lastUpdatedSts = sts c.lastUpdatedTime = time.Now() } From 4071522410b269c56fd77b915a2dd27a2928e5e3 Mon Sep 17 00:00:00 2001 From: Yi Jin Date: Sun, 28 Apr 2024 15:10:29 -0700 Subject: [PATCH 08/25] [PLAT-106952] fix the bug that the delay should only happen if the sts has pods to be updated Signed-off-by: Yi Jin --- pkg/controller/controller.go | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index cbdf5ec8a..3d63e36a4 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -315,17 +315,10 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou } for _, sts := range sets { - if !c.canUpdateSts(sts) { - level.Info(c.logger).Log("msg", "delaying reconcile due to last updated sts is not ready yet", - "last", c.lastUpdatedSts.Name, "curr", sts.Name, "delay", c.delayBetweenSts) - time.Sleep(c.delayBetweenSts) - return nil - } ongoing, err := c.updateStatefulSetPods(ctx, sts) if err != nil { // Do not continue with other StatefulSets because this StatefulSet // is expected to be successfully updated before proceeding. - c.updateLastUpdatedSts(sts) return errors.Wrapf(err, "failed to update StatefulSet %s", sts.Name) } @@ -517,6 +510,7 @@ func (c *RolloutController) canUpdateSts(sts *v1.StatefulSet) bool { func (c *RolloutController) updateLastUpdatedSts(sts *v1.StatefulSet) { c.lastUpdatedSts = sts c.lastUpdatedTime = time.Now() + level.Debug(c.logger).Log("msg", "updated last updated sts", "sts", sts.Name, "time", c.lastUpdatedTime) } func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) { @@ -528,6 +522,19 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S } if len(podsToUpdate) > 0 { + if !c.canUpdateSts(sts) { + // only check canUpdateSts if the current sts has pods to be updated + level.Info(c.logger).Log("msg", "delaying reconcile between StatefulSets updates", + "curr", sts.Name, "last", c.lastUpdatedSts.Name, + "delay", c.delayBetweenSts, "pods_to_update", len(podsToUpdate)) + time.Sleep(c.delayBetweenSts) + // MUST return here: + // 1. pods state could change during the delay period, scan the entire cluster again + // 2. since we didn't actually update current sts, the last updated sts should not be changed + // 3. 
throw errors to not proceed with other StatefulSets + return false, errors.New("delaying reconcile between StatefulSets updates") + } + maxUnavailable := getMaxUnavailableForStatefulSet(sts, c.logger) numNotReady := int(sts.Status.Replicas - sts.Status.ReadyReplicas) From 99eef6f62c6b2af266c8cc075dec6e358f95d3c1 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Sun, 30 Jun 2024 02:08:44 -0700 Subject: [PATCH 09/25] update sts until all pods are available --- pkg/controller/controller.go | 12 +++++++++--- pkg/controller/controller_test.go | 12 ++++++++---- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 3d63e36a4..eedb4c9c6 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -536,12 +536,12 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S } maxUnavailable := getMaxUnavailableForStatefulSet(sts, c.logger) - numNotReady := int(sts.Status.Replicas - sts.Status.ReadyReplicas) + numNotAvailable := int(sts.Status.Replicas - sts.Status.AvailableReplicas) // Compute the number of pods we should update, honoring the configured maxUnavailable. numPods := max(0, min( - maxUnavailable-numNotReady, // No more than the configured maxUnavailable (including not-Ready pods). - len(podsToUpdate), // No more than the total number of pods that need to be updated. + maxUnavailable-numNotAvailable, // No more than the configured maxUnavailable (including not-Ready pods). + len(podsToUpdate), // No more than the total number of pods that need to be updated. )) if numPods == 0 { @@ -551,6 +551,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S "pods_to_update", len(podsToUpdate), "replicas", sts.Status.Replicas, "ready_replicas", sts.Status.ReadyReplicas, + "available_replicas", sts.Status.AvailableReplicas, "max_unavailable", maxUnavailable) return true, nil @@ -584,6 +585,11 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S "statefulset", sts.Name) return true, nil + } else if sts.Status.Replicas != sts.Status.AvailableReplicas { + level.Info(c.logger).Log( + "msg", "StatefulSet pods are all updated and ready but StatefulSet has some not-Available replicas", + "statefulset", sts.Name) + return true, nil } // At this point there are no pods to update, so we can update the currentRevision in the StatefulSet. 
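The switch from ReadyReplicas to AvailableReplicas matters because a pod becomes Ready as soon as its readiness probe passes, but only counts as Available once it has stayed Ready for the StatefulSet's minReadySeconds. A hypothetical helper (not from this repository) showing how the unavailability budget tightens under that stricter definition:

package main

import (
	"fmt"

	v1 "k8s.io/api/apps/v1"
)

// unavailableBudget returns how many more pods may be taken down, counting
// not-Available (rather than merely not-Ready) replicas against maxUnavailable.
func unavailableBudget(sts *v1.StatefulSet, maxUnavailable int) int {
	numNotAvailable := int(sts.Status.Replicas - sts.Status.AvailableReplicas)
	if b := maxUnavailable - numNotAvailable; b > 0 {
		return b
	}
	return 0
}

func main() {
	sts := &v1.StatefulSet{}
	sts.Status.Replicas = 3
	sts.Status.ReadyReplicas = 3     // all probes pass...
	sts.Status.AvailableReplicas = 2 // ...but one pod has not been Ready for minReadySeconds yet
	fmt.Println(unavailableBudget(sts, 1)) // 0: the not-yet-Available pod consumes the budget
}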
diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 2dfd5dc9f..251feff86 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -165,6 +165,7 @@ func TestRolloutController_Reconcile(t *testing.T) { mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) { sts.Status.Replicas = 3 sts.Status.ReadyReplicas = 2 + sts.Status.AvailableReplicas = 2 }), }, pods: []runtime.Object{ @@ -183,6 +184,7 @@ func TestRolloutController_Reconcile(t *testing.T) { mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) { sts.Status.Replicas = 3 sts.Status.ReadyReplicas = 1 + sts.Status.AvailableReplicas = 1 }), }, pods: []runtime.Object{ @@ -1110,10 +1112,11 @@ func mockStatefulSet(name string, overrides ...func(sts *v1.StatefulSet)) *v1.St }, }, Status: v1.StatefulSetStatus{ - Replicas: 3, - ReadyReplicas: 3, - CurrentRevision: testLastRevisionHash, - UpdateRevision: testLastRevisionHash, + Replicas: 3, + ReadyReplicas: 3, + AvailableReplicas: 3, + CurrentRevision: testLastRevisionHash, + UpdateRevision: testLastRevisionHash, }, } @@ -1167,6 +1170,7 @@ func withReplicas(totalReplicas, readyReplicas int32) func(sts *v1.StatefulSet) sts.Spec.Replicas = &totalReplicas sts.Status.Replicas = totalReplicas sts.Status.ReadyReplicas = readyReplicas + sts.Status.AvailableReplicas = readyReplicas } } From 95baac55cce10458e12ba2664e00439904e7b21f Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Sun, 30 Jun 2024 15:29:06 -0700 Subject: [PATCH 10/25] add debug hook --- pkg/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index eedb4c9c6..bde8aed64 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -217,7 +217,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error { span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()") defer span.Finish() - level.Info(c.logger).Log("msg", "reconcile started") + level.Info(c.logger).Log("msg", "reconcile started with minReadySeconds support") sets, err := c.listStatefulSetsWithRolloutGroup() if err != nil { From dfd7dbdb048d8a6572d992076e20f3beeedfe723 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Sun, 30 Jun 2024 16:10:23 -0700 Subject: [PATCH 11/25] rm debug hook --- pkg/controller/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index bde8aed64..eedb4c9c6 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -217,7 +217,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error { span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()") defer span.Finish() - level.Info(c.logger).Log("msg", "reconcile started with minReadySeconds support") + level.Info(c.logger).Log("msg", "reconcile started") sets, err := c.listStatefulSetsWithRolloutGroup() if err != nil { From 387b3ade65777d81af9eda42fc9328c972842cab Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Tue, 2 Jul 2024 18:10:35 -0700 Subject: [PATCH 12/25] rm legacy delay --- cmd/rollout-operator/main.go | 6 +-- .../manifests_rollout_operator_test.go | 1 - pkg/controller/controller.go | 42 +------------------ pkg/controller/controller_test.go | 6 +-- 4 files changed, 6 insertions(+), 49 deletions(-) diff --git a/cmd/rollout-operator/main.go b/cmd/rollout-operator/main.go index 
050839662..913e12485 100644 --- a/cmd/rollout-operator/main.go +++ b/cmd/rollout-operator/main.go @@ -62,8 +62,6 @@ type config struct { useZoneTracker bool zoneTrackerConfigMapName string - deletionInterval time.Duration - delayBetweenSts time.Duration } func (cfg *config) register(fs *flag.FlagSet) { @@ -90,8 +88,6 @@ func (cfg *config) register(fs *flag.FlagSet) { fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones") fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker") - fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "[Deprecated]time to wait before actually terminating the pod") - fs.DurationVar(&cfg.delayBetweenSts, "delay-between-sts", 5*time.Minute, "time to wait between stateful sets") } func (cfg config) validate() error { @@ -175,7 +171,7 @@ func main() { maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics) // Init the controller. - c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.delayBetweenSts, reg, logger) + c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger) check(errors.Wrap(c.Init(), "failed to init controller")) // Listen to sigterm, as well as for restart (like for certificate renewal). diff --git a/integration/manifests_rollout_operator_test.go b/integration/manifests_rollout_operator_test.go index f87211e10..9f7a201d2 100644 --- a/integration/manifests_rollout_operator_test.go +++ b/integration/manifests_rollout_operator_test.go @@ -59,7 +59,6 @@ func rolloutOperatorDeployment(namespace string, webhook bool) *appsv1.Deploymen fmt.Sprintf("-kubernetes.namespace=%s", namespace), "-reconcile.interval=1s", "-log.level=debug", - "-delay-between-sts=0s", } if webhook { args = append(args, diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index eedb4c9c6..bb4572551 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -66,11 +66,6 @@ type RolloutController struct { // Used to signal when the controller should stop. stopCh chan struct{} - //deletion interval related - delayBetweenSts time.Duration - lastUpdatedSts *v1.StatefulSet - lastUpdatedTime time.Time - // Metrics. 
groupReconcileTotal *prometheus.CounterVec groupReconcileFailed *prometheus.CounterVec @@ -82,7 +77,7 @@ type RolloutController struct { discoveredGroups map[string]struct{} } -func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, delayBetweenSts time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController { +func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController { namespaceOpt := informers.WithNamespace(namespace) // Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones @@ -114,9 +109,6 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM logger: logger, stopCh: make(chan struct{}), discoveredGroups: map[string]struct{}{}, - delayBetweenSts: delayBetweenSts, - lastUpdatedSts: nil, - lastUpdatedTime: time.Time{}, groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ Name: "rollout_operator_group_reconciles_total", Help: "Total number of reconciles started for a specific rollout group.", @@ -217,7 +209,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error { span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()") defer span.Finish() - level.Info(c.logger).Log("msg", "reconcile started") + level.Info(c.logger).Log("msg", "reconcile started (minReadySeconds version)") sets, err := c.listStatefulSetsWithRolloutGroup() if err != nil { @@ -310,8 +302,6 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou if len(notReadySets) == 1 { level.Info(c.logger).Log("msg", "a StatefulSet has some not-Ready pods, reconcile it first", "statefulset", notReadySets[0].Name) sets = util.MoveStatefulSetToFront(sets, notReadySets[0]) - // sts could become not ready by other activities like UKI, we should consider this as last updated sts - c.updateLastUpdatedSts(notReadySets[0]) } for _, sts := range sets { @@ -325,7 +315,6 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou if ongoing { // Do not continue with other StatefulSets because this StatefulSet // update is still ongoing. 
- c.updateLastUpdatedSts(sts) return nil } } @@ -499,20 +488,6 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error) return pods, nil } -func (c *RolloutController) canUpdateSts(sts *v1.StatefulSet) bool { - if c.lastUpdatedSts == nil || c.lastUpdatedSts.Name == sts.Name { - // no need to wait within the same sts updates - return true - } - return time.Since(c.lastUpdatedTime) > c.delayBetweenSts -} - -func (c *RolloutController) updateLastUpdatedSts(sts *v1.StatefulSet) { - c.lastUpdatedSts = sts - c.lastUpdatedTime = time.Now() - level.Debug(c.logger).Log("msg", "updated last updated sts", "sts", sts.Name, "time", c.lastUpdatedTime) -} - func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) { level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name) @@ -522,19 +497,6 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S } if len(podsToUpdate) > 0 { - if !c.canUpdateSts(sts) { - // only check canUpdateSts if the current sts has pods to be updated - level.Info(c.logger).Log("msg", "delaying reconcile between StatefulSets updates", - "curr", sts.Name, "last", c.lastUpdatedSts.Name, - "delay", c.delayBetweenSts, "pods_to_update", len(podsToUpdate)) - time.Sleep(c.delayBetweenSts) - // MUST return here: - // 1. pods state could change during the delay period, scan the entire cluster again - // 2. since we didn't actually update current sts, the last updated sts should not be changed - // 3. throw errors to not proceed with other StatefulSets - return false, errors.New("delaying reconcile between StatefulSets updates") - } - maxUnavailable := getMaxUnavailableForStatefulSet(sts, c.logger) numNotAvailable := int(sts.Status.Replicas - sts.Status.AvailableReplicas) diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 251feff86..8e75c47fb 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -618,7 +618,7 @@ func TestRolloutController_Reconcile(t *testing.T) { // Create the controller and start informers. reg := prometheus.NewPedanticRegistry() - c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger()) + c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger()) require.NoError(t, c.Init()) defer c.Stop() @@ -927,7 +927,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T) // Create the controller and start informers. reg := prometheus.NewPedanticRegistry() - c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 5*time.Second, 0, reg, log.NewNopLogger()) + c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger()) require.NoError(t, c.Init()) defer c.Stop() @@ -1030,7 +1030,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG // Create the controller and start informers. 
reg := prometheus.NewPedanticRegistry() - c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger()) + c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger()) require.NoError(t, c.Init()) defer c.Stop() From a1a4552d53f620e1135e223dad45dddb09056ef1 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Tue, 2 Jul 2024 18:27:20 -0700 Subject: [PATCH 13/25] update integration test --- integration/manifests_mock_service_test.go | 1 - pkg/controller/controller.go | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integration/manifests_mock_service_test.go b/integration/manifests_mock_service_test.go index 7605891c5..09a3735b8 100644 --- a/integration/manifests_mock_service_test.go +++ b/integration/manifests_mock_service_test.go @@ -22,7 +22,6 @@ func createMockServiceZone(t *testing.T, ctx context.Context, api *kubernetes.Cl _, err := api.AppsV1().StatefulSets(namespace).Create(ctx, mockServiceStatefulSet(name, "1", true), metav1.CreateOptions{}) require.NoError(t, err, "Can't create StatefulSet") } - { _, err := api.CoreV1().Services(namespace).Create(ctx, mockServiceService(name), metav1.CreateOptions{}) require.NoError(t, err, "Can't create Service") diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index bb4572551..216bf1f9b 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -498,7 +498,8 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S if len(podsToUpdate) > 0 { maxUnavailable := getMaxUnavailableForStatefulSet(sts, c.logger) - numNotAvailable := int(sts.Status.Replicas - sts.Status.AvailableReplicas) + //numNotAvailable := int(sts.Status.Replicas - sts.Status.AvailableReplicas) + numNotAvailable := int(sts.Status.Replicas - sts.Status.ReadyReplicas) // Compute the number of pods we should update, honoring the configured maxUnavailable. numPods := max(0, min( From ae93176866f1ba740449f90a7046c38da0dfdfec Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Tue, 2 Jul 2024 21:17:26 -0700 Subject: [PATCH 14/25] split code path with MinReadySeconds value --- pkg/controller/controller.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 216bf1f9b..bcab67ed8 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -498,8 +498,13 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S if len(podsToUpdate) > 0 { maxUnavailable := getMaxUnavailableForStatefulSet(sts, c.logger) - //numNotAvailable := int(sts.Status.Replicas - sts.Status.AvailableReplicas) - numNotAvailable := int(sts.Status.Replicas - sts.Status.ReadyReplicas) + var numNotAvailable int + if sts.Spec.MinReadySeconds > 0 { + level.Info(c.logger).Log("msg", "StatefulSet has minReadySeconds set, waiting before terminating pods", "statefulset", sts.Name, "min_ready_seconds", sts.Spec.MinReadySeconds) + numNotAvailable = int(sts.Status.Replicas - sts.Status.AvailableReplicas) + } else { + numNotAvailable = int(sts.Status.Replicas - sts.Status.ReadyReplicas) + } // Compute the number of pods we should update, honoring the configured maxUnavailable. 
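		// numNotAvailable above deliberately counts pods that are Ready but not
		// yet Available when minReadySeconds is in use: a freshly restarted pod
		// keeps consuming the maxUnavailable budget until it has stayed Ready
		// for minReadySeconds.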
numPods := max(0, min( @@ -548,7 +553,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S "statefulset", sts.Name) return true, nil - } else if sts.Status.Replicas != sts.Status.AvailableReplicas { + } else if sts.Spec.MinReadySeconds > 0 && sts.Status.Replicas != sts.Status.AvailableReplicas { level.Info(c.logger).Log( "msg", "StatefulSet pods are all updated and ready but StatefulSet has some not-Available replicas", "statefulset", sts.Name) From 4f538ecd4062894a6667a6aee781796596fcb4cb Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Sun, 13 Oct 2024 03:53:54 -0700 Subject: [PATCH 15/25] update downscale --- pkg/controller/delay.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go index 0762dded6..02c685807 100644 --- a/pkg/controller/delay.go +++ b/pkg/controller/delay.go @@ -20,6 +20,13 @@ import ( "github.com/grafana/rollout-operator/pkg/config" ) +func getStsSvcName(sts *v1.StatefulSet) string { + if sts.Spec.ServiceName != "" { + return sts.Spec.ServiceName + } + return sts.GetName() +} + func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, replicas int32) { delay, prepareURL, err := parseDelayedDownscaleAnnotations(sts.GetAnnotations()) if delay == 0 || prepareURL == nil { @@ -31,7 +38,7 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger, return } - endpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(replicas), prepareURL) + endpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), 0, int(replicas), prepareURL) callCancelDelayedDownscale(ctx, logger, httpClient, endpoints) } @@ -54,19 +61,19 @@ func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulS } if desiredReplicas >= currentReplicas { - callCancelDelayedDownscale(ctx, logger, httpClient, createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(currentReplicas), prepareURL)) + callCancelDelayedDownscale(ctx, logger, httpClient, createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), 0, int(currentReplicas), prepareURL)) // Proceed even if calling cancel of delayed downscale fails. We call cancellation repeatedly, so it will happen during next reconcile. return desiredReplicas, nil } { // Replicas in [0, desired) interval should cancel any delayed downscale, if they have any. - cancelEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(desiredReplicas), prepareURL) + cancelEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), 0, int(desiredReplicas), prepareURL) callCancelDelayedDownscale(ctx, logger, httpClient, cancelEndpoints) } // Replicas in [desired, current) interval are going to be stopped. 
- downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), int(desiredReplicas), int(currentReplicas), prepareURL) + downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), int(desiredReplicas), int(currentReplicas), prepareURL) elapsedTimeSinceDownscaleInitiated, err := callPrepareDownscaleAndReturnElapsedDurationsSinceInitiatedDownscale(ctx, logger, httpClient, downscaleEndpoints) if err != nil { return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err) @@ -131,7 +138,7 @@ type endpoint struct { } // Create prepare-downscale endpoints for pods with index in [from, to) range. URL is fully reused except for host, which is replaced with pod's FQDN. -func createPrepareDownscaleEndpoints(namespace, serviceName string, from, to int, url *url.URL) []endpoint { +func createPrepareDownscaleEndpoints(namespace, stsName, serviceName string, from, to int, url *url.URL) []endpoint { eps := make([]endpoint, 0, to-from) // The DNS entry for a pod of a stateful set is @@ -142,7 +149,7 @@ func createPrepareDownscaleEndpoints(namespace, serviceName string, from, to int for index := from; index < to; index++ { ep := endpoint{ namespace: namespace, - podName: fmt.Sprintf("%v-%v", serviceName, index), + podName: fmt.Sprintf("%v-%v", stsName, index), replica: index, } From 4c518ec53a97aa1d3186eb3def854a201a7537d3 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Thu, 17 Oct 2024 14:24:52 -0700 Subject: [PATCH 16/25] update downscale --- pkg/controller/custom_resource_replicas.go | 7 +- pkg/controller/delay.go | 109 +++++++++++++++++++++ 2 files changed, 115 insertions(+), 1 deletion(-) diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go index b81551b88..e7c251353 100644 --- a/pkg/controller/custom_resource_replicas.go +++ b/pkg/controller/custom_resource_replicas.go @@ -46,7 +46,12 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx // We're going to change number of replicas on the statefulset. // If there is delayed downscale configured on the statefulset, we will first handle delay part, and only if that succeeds, // continue with downscaling or upscaling. 
- desiredReplicas, err := checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas) + var desiredReplicas int32 + if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "null" { + desiredReplicas, err = checkScalable(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas) + } else { + desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas) + } if err != nil { level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check", "group", groupName, diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go index 02c685807..0cdec10c3 100644 --- a/pkg/controller/delay.go +++ b/pkg/controller/delay.go @@ -43,6 +43,46 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger, callCancelDelayedDownscale(ctx, logger, httpClient, endpoints) } +func checkScalable(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32) (updatedDesiredReplicas int32, _ error) { + if desiredReplicas >= currentReplicas { + return desiredReplicas, nil + } + + prepareURL, err := parseDownscaleURLAnnotation(sts.GetAnnotations()) + if prepareURL == nil || err != nil { + return currentReplicas, err + } + downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), int(desiredReplicas), int(currentReplicas), prepareURL) + scalableBooleans, err := callPerpareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints) + if err != nil { + return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err) + } + + // Find how many pods from the end of statefulset we can already scale down + allowedDesiredReplicas := currentReplicas + for replica := currentReplicas - 1; replica >= desiredReplicas; replica-- { + scalable, ok := scalableBooleans[int(replica)] + if !ok { + break + } + + if !scalable { + break + } + + // We can scale down this replica + allowedDesiredReplicas-- + } + + if allowedDesiredReplicas == currentReplicas { + return currentReplicas, fmt.Errorf("downscale not possible for any pods at the end of statefulset replicas range") + } + + // We can proceed with downscale on at least one pod. + level.Info(logger).Log("msg", "downscale possible on some pods", "name", sts.GetName(), "originalDesiredReplicas", desiredReplicas, "allowedDesiredReplicas", allowedDesiredReplicas) + return allowedDesiredReplicas, nil +} + // Checks if downscale delay has been reached on replicas in [desiredReplicas, currentReplicas) range. // If there is a range of replicas at the end of statefulset for which delay has been reached, this function // returns updated desired replicas that statefulset can be scaled to. 
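checkScalable above only ever trims replicas from the tail: it walks ordinals downward from currentReplicas-1 and stops at the first replica that is not ready to go, because a StatefulSet scales down by removing the highest ordinals first. A condensed sketch of that walk, with names of my own choosing (allowedReplicas is not a function in this codebase):

package main

import "fmt"

// allowedReplicas walks pod ordinals from the tail of the StatefulSet towards
// desired, stopping at the first replica that did not report it can be removed.
// Only a contiguous tail of removable pods can actually be dropped.
func allowedReplicas(current, desired int32, scalable map[int]bool) int32 {
	allowed := current
	for replica := current - 1; replica >= desired; replica-- {
		if !scalable[int(replica)] { // missing or false: stop the scan
			break
		}
		allowed--
	}
	return allowed
}

func main() {
	// Pods 4 and 3 are ready to go, pod 2 is not: we can only go from 5 to 3 replicas.
	scalable := map[int]bool{4: true, 3: true, 2: false}
	fmt.Println(allowedReplicas(5, 1, scalable)) // 3
}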
@@ -104,6 +144,20 @@ func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulS
 	return allowedDesiredReplicas, nil
 }
 
+func parseDownscaleURLAnnotation(annotations map[string]string) (*url.URL, error) {
+	urlStr := annotations[config.PrepareDownscalePathAnnotationKey]
+	if urlStr == "" {
+		return nil, nil
+	}
+
+	u, err := url.Parse(urlStr)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse %s annotation value as URL: %v", config.PrepareDownscalePathAnnotationKey, err)
+	}
+
+	return u, nil
+}
+
 func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Duration, *url.URL, error) {
 	delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey]
 	urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey]
@@ -166,6 +220,59 @@ func createPrepareDownscaleEndpoints(namespace, stsName, serviceName string, fro
 	return eps
 }
 
+func callPrepareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (map[int]bool, error) {
+	if len(endpoints) == 0 {
+		return nil, fmt.Errorf("no endpoints")
+	}
+
+	var (
+		scalableMu sync.Mutex
+		scalable   = map[int]bool{}
+	)
+	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(32)
+
+	for ix := range endpoints {
+		ep := endpoints[ix]
+		g.Go(func() error {
+			target := ep.url.String()
+
+			epLogger := log.With(logger, "pod", ep.podName, "url", target)
+
+			req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
+			if err != nil {
+				level.Error(epLogger).Log("msg", "error creating HTTP POST request to endpoint", "err", err)
+				return err
+			}
+
+			resp, err := client.Do(req)
+			if err != nil {
+				level.Error(epLogger).Log("msg", "error sending HTTP POST request to endpoint", "err", err)
+				return err
+			}
+
+			defer resp.Body.Close()
+
+			scalableMu.Lock()
+			if resp.StatusCode == 200 {
+				scalable[ep.replica] = true
+			} else {
+				if resp.StatusCode != 425 {
+					// Anything other than 200 or 425 (Too Early) is unexpected and worth surfacing.
+					level.Error(epLogger).Log("msg", "downscale POST got unexpected status", "status", resp.StatusCode)
+				}
+				scalable[ep.replica] = false
+			}
+			scalableMu.Unlock()
+
+			level.Debug(epLogger).Log("msg", "downscale POST got status", "status", resp.StatusCode)
+			return nil
+		})
+	}
+	err := g.Wait()
+	return scalable, err
+}
+
 func callPrepareDownscaleAndReturnElapsedDurationsSinceInitiatedDownscale(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (map[int]time.Duration, error) {
 	if len(endpoints) == 0 {
 		return nil, fmt.Errorf("no endpoints")
 	}

From b75ff1ec64d19371284895d97046b8a5d8a9a6c3 Mon Sep 17 00:00:00 2001
From: Yuchen Wang
Date: Fri, 18 Oct 2024 00:35:21 -0700
Subject: [PATCH 17/25] update

---
 pkg/controller/custom_resource_replicas.go |  2 +-
 pkg/controller/delay.go                    | 17 +++++++++++++++++
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go
index e7c251353..5874dc599 100644
--- a/pkg/controller/custom_resource_replicas.go
+++ b/pkg/controller/custom_resource_replicas.go
@@ -47,7 +47,7 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
 	// If there is delayed downscale configured on the statefulset, we will first handle delay part, and only if that succeeds,
 	// continue with downscaling or upscaling.
var desiredReplicas int32 - if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "null" { + if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "" { desiredReplicas, err = checkScalable(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas) } else { desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas) diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go index 0cdec10c3..764542246 100644 --- a/pkg/controller/delay.go +++ b/pkg/controller/delay.go @@ -158,6 +158,23 @@ func parseDownscaleURLAnnotation(annotations map[string]string) (*url.URL, error return u, nil } +func parseDownscaleDelayAnnotation(annotations map[string]string) (time.Duration, error) { + delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey] + if delayStr == "" { + return 0, nil + } + + d, err := model.ParseDuration(delayStr) + if err != nil { + return 0, fmt.Errorf("failed to parse %s annotation value as duration: %v", config.RolloutDelayedDownscaleAnnotationKey, err) + } + if d < 0 { + return 0, fmt.Errorf("negative value of %s annotation: %v", config.RolloutDelayedDownscaleAnnotationKey, delayStr) + } + + return time.Duration(d), nil +} + func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Duration, *url.URL, error) { delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey] urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey] From c91129345ad5363cd2177cf27b4b2c3d6425e6af Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Fri, 18 Oct 2024 15:14:48 -0700 Subject: [PATCH 18/25] update --- pkg/controller/custom_resource_replicas.go | 1 + pkg/controller/delay.go | 20 -------------------- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go index 5874dc599..3f9657adc 100644 --- a/pkg/controller/custom_resource_replicas.go +++ b/pkg/controller/custom_resource_replicas.go @@ -48,6 +48,7 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx // continue with downscaling or upscaling. 
var desiredReplicas int32 if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "" { + level.Debug(c.logger).Log("msg", "downscale delay annotation empty, using boolean scaling logic") desiredReplicas, err = checkScalable(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas) } else { desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas) diff --git a/pkg/controller/delay.go b/pkg/controller/delay.go index 764542246..0eec4c900 100644 --- a/pkg/controller/delay.go +++ b/pkg/controller/delay.go @@ -65,12 +65,9 @@ func checkScalable(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, if !ok { break } - if !scalable { break } - - // We can scale down this replica allowedDesiredReplicas-- } @@ -158,23 +155,6 @@ func parseDownscaleURLAnnotation(annotations map[string]string) (*url.URL, error return u, nil } -func parseDownscaleDelayAnnotation(annotations map[string]string) (time.Duration, error) { - delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey] - if delayStr == "" { - return 0, nil - } - - d, err := model.ParseDuration(delayStr) - if err != nil { - return 0, fmt.Errorf("failed to parse %s annotation value as duration: %v", config.RolloutDelayedDownscaleAnnotationKey, err) - } - if d < 0 { - return 0, fmt.Errorf("negative value of %s annotation: %v", config.RolloutDelayedDownscaleAnnotationKey, delayStr) - } - - return time.Duration(d), nil -} - func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Duration, *url.URL, error) { delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey] urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey] From 045745aedbe2136c1dcb7915934dacf0d36f7fd9 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Fri, 18 Oct 2024 16:20:22 -0700 Subject: [PATCH 19/25] update --- pkg/config/config.go | 2 ++ pkg/controller/custom_resource_replicas.go | 1 + 2 files changed, 3 insertions(+) diff --git a/pkg/config/config.go b/pkg/config/config.go index e7d6ae6a6..c5995f36e 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -45,4 +45,6 @@ const ( // RolloutDelayedDownscalePrepareUrlAnnotationKey is a full URL to prepare-downscale endpoint. Hostname will be replaced with pod's fully qualified domain name. 
RolloutDelayedDownscalePrepareUrlAnnotationKey = "grafana.com/rollout-prepare-delayed-downscale-url" + + StatefulsetDesiredReplicasAnnotationKey = "grafana.com/desired-replicas" ) diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go index 3f9657adc..fdb773ea4 100644 --- a/pkg/controller/custom_resource_replicas.go +++ b/pkg/controller/custom_resource_replicas.go @@ -36,6 +36,7 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx referenceResource := fmt.Sprintf("%s/%s", referenceGVR.Resource, referenceName) referenceResourceDesiredReplicas := scaleObj.Spec.Replicas + sts.Annotations[config.StatefulsetDesiredReplicasAnnotationKey] = fmt.Sprintf("%v", referenceResourceDesiredReplicas) if currentReplicas == referenceResourceDesiredReplicas { updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, referenceResourceDesiredReplicas) cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas) From ed6f31941a2d818355c3174f8e6a7c4dca33a374 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Fri, 18 Oct 2024 16:41:13 -0700 Subject: [PATCH 20/25] update --- pkg/controller/controller.go | 6 ++++++ pkg/controller/custom_resource_replicas.go | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index bcab67ed8..f29b29dc3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -635,3 +635,9 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st _, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) return err } + +func (c *RolloutController) patchStatefulSetDesiredReplicasAnnotation(ctx context.Context, sts *v1.StatefulSet, replicas int32) error { + patch := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%d"}}}`, config.StatefulsetDesiredReplicasAnnotationKey, replicas) + _, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) + return err +} diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go index fdb773ea4..583e03bdc 100644 --- a/pkg/controller/custom_resource_replicas.go +++ b/pkg/controller/custom_resource_replicas.go @@ -36,7 +36,9 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx referenceResource := fmt.Sprintf("%s/%s", referenceGVR.Resource, referenceName) referenceResourceDesiredReplicas := scaleObj.Spec.Replicas - sts.Annotations[config.StatefulsetDesiredReplicasAnnotationKey] = fmt.Sprintf("%v", referenceResourceDesiredReplicas) + if err := c.patchStatefulSetDesiredReplicasAnnotation(ctx, sts, referenceResourceDesiredReplicas); err != nil { + level.Error(c.logger).Log("msg", "failed to patch desired replicas annotation on statefulset", "err", err) + } if currentReplicas == referenceResourceDesiredReplicas { updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, referenceResourceDesiredReplicas) cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas) From 3af1fc59cbcc10b45edf04ba2c6fcbcc6fee41cb Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Fri, 18 Oct 2024 16:49:36 -0700 Subject: [PATCH 21/25] update --- 
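
Note on this sequence: PATCH 19 writes the desired-replicas annotation straight onto the in-memory StatefulSet object, which appears to only mutate the locally cached copy and is never persisted to the API server; PATCH 20 replaces that with a server-side strategic merge patch, and this patch adds a guard around it. A minimal self-contained sketch of the same patch pattern, for illustration only; the helper name annotateDesiredReplicas is hypothetical, while the annotation key and patch shape mirror PATCH 20:

package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
)

// annotateDesiredReplicas records the reference resource's desired replica
// count on the StatefulSet via a strategic merge patch. Only the single
// annotation is sent, so no shared cached object is mutated in place and
// concurrent changes to other fields are left untouched.
func annotateDesiredReplicas(ctx context.Context, client kubernetes.Interface, namespace, name string, replicas int32) error {
	patch := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%d"}}}`, "grafana.com/desired-replicas", replicas)
	_, err := client.AppsV1().StatefulSets(namespace).Patch(ctx, name, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
	return err
}

Because the patch body names a single annotation, a strategic merge is cheap enough to issue on every reconcile; the whole desired-replicas experiment is nevertheless reverted again in PATCHES 22-24 below.
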
pkg/controller/controller.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index f29b29dc3..16394d3a6 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -637,6 +637,9 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st } func (c *RolloutController) patchStatefulSetDesiredReplicasAnnotation(ctx context.Context, sts *v1.StatefulSet, replicas int32) error { + if _, ok := sts.ObjectMeta.Annotations[config.StatefulsetDesiredReplicasAnnotationKey]; !ok { + return nil + } patch := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%d"}}}`, config.StatefulsetDesiredReplicasAnnotationKey, replicas) _, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) return err From ef082159737044ba4ba393094ec0fd25b5acb6ac Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Thu, 7 Nov 2024 13:27:28 -0800 Subject: [PATCH 22/25] Revert "update" This reverts commit c0fe6744cb9ce13fad0ca892ead98ad5a2d0eeb1. --- pkg/controller/controller.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 16394d3a6..f29b29dc3 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -637,9 +637,6 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st } func (c *RolloutController) patchStatefulSetDesiredReplicasAnnotation(ctx context.Context, sts *v1.StatefulSet, replicas int32) error { - if _, ok := sts.ObjectMeta.Annotations[config.StatefulsetDesiredReplicasAnnotationKey]; !ok { - return nil - } patch := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%d"}}}`, config.StatefulsetDesiredReplicasAnnotationKey, replicas) _, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) return err From 71fe78e2da8b86679a02c3be8f517f517966f9aa Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Thu, 7 Nov 2024 13:27:28 -0800 Subject: [PATCH 23/25] Revert "update" This reverts commit 10cebe64902d177b49302d0a9f3874948fd61c59. 
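
For context on the scaling side of the series: the decision implemented by checkScalable in PATCHES 16-18 reduces to a scan from the highest replica downward that stops at the first pod which did not acknowledge the prepare-downscale call, since a StatefulSet can only be trimmed from the end. A standalone sketch under assumed names (trailingScalable and replicaReady are hypothetical, not identifiers from the patches):

// trailingScalable returns the lowest replica count the statefulset may
// shrink to right now. Walking from the last replica toward the desired
// count, it stops at the first pod whose prepare-downscale request did
// not succeed.
func trailingScalable(current, desired int32, replicaReady map[int]bool) int32 {
	allowed := current
	for replica := current - 1; replica >= desired; replica-- {
		if ready, ok := replicaReady[int(replica)]; !ok || !ready {
			break
		}
		allowed-- // this replica can be removed
	}
	return allowed
}

For current=5 and desired=2, with replicas 4 and 3 ready but replica 2 not, this returns 3: the operator scales to 3 immediately and retries the remainder on a later reconcile, matching the "downscale possible on some pods" log line added in PATCH 16.
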
--- pkg/controller/controller.go | 6 ------ pkg/controller/custom_resource_replicas.go | 4 +--- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index f29b29dc3..bcab67ed8 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -635,9 +635,3 @@ func (c *RolloutController) patchStatefulSetSpecReplicas(ctx context.Context, st _, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) return err } - -func (c *RolloutController) patchStatefulSetDesiredReplicasAnnotation(ctx context.Context, sts *v1.StatefulSet, replicas int32) error { - patch := fmt.Sprintf(`{"metadata":{"annotations":{"%s":"%d"}}}`, config.StatefulsetDesiredReplicasAnnotationKey, replicas) - _, err := c.kubeClient.AppsV1().StatefulSets(c.namespace).Patch(ctx, sts.GetName(), types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{}) - return err -} diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go index 583e03bdc..fdb773ea4 100644 --- a/pkg/controller/custom_resource_replicas.go +++ b/pkg/controller/custom_resource_replicas.go @@ -36,9 +36,7 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx referenceResource := fmt.Sprintf("%s/%s", referenceGVR.Resource, referenceName) referenceResourceDesiredReplicas := scaleObj.Spec.Replicas - if err := c.patchStatefulSetDesiredReplicasAnnotation(ctx, sts, referenceResourceDesiredReplicas); err != nil { - level.Error(c.logger).Log("msg", "failed to patch desired replicas annotation on statefulset", "err", err) - } + sts.Annotations[config.StatefulsetDesiredReplicasAnnotationKey] = fmt.Sprintf("%v", referenceResourceDesiredReplicas) if currentReplicas == referenceResourceDesiredReplicas { updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, referenceResourceDesiredReplicas) cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas) From c08d5eec08f27cd988f97e1446afcb17a382cf9a Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Thu, 7 Nov 2024 13:27:28 -0800 Subject: [PATCH 24/25] Revert "update" This reverts commit be5f3766fa78c1cbf61216177a22e5cee13ec7c7. --- pkg/config/config.go | 2 -- pkg/controller/custom_resource_replicas.go | 1 - 2 files changed, 3 deletions(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index c5995f36e..e7d6ae6a6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -45,6 +45,4 @@ const ( // RolloutDelayedDownscalePrepareUrlAnnotationKey is a full URL to prepare-downscale endpoint. Hostname will be replaced with pod's fully qualified domain name. 
RolloutDelayedDownscalePrepareUrlAnnotationKey = "grafana.com/rollout-prepare-delayed-downscale-url" - - StatefulsetDesiredReplicasAnnotationKey = "grafana.com/desired-replicas" ) diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go index fdb773ea4..3f9657adc 100644 --- a/pkg/controller/custom_resource_replicas.go +++ b/pkg/controller/custom_resource_replicas.go @@ -36,7 +36,6 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx referenceResource := fmt.Sprintf("%s/%s", referenceGVR.Resource, referenceName) referenceResourceDesiredReplicas := scaleObj.Spec.Replicas - sts.Annotations[config.StatefulsetDesiredReplicasAnnotationKey] = fmt.Sprintf("%v", referenceResourceDesiredReplicas) if currentReplicas == referenceResourceDesiredReplicas { updateStatusReplicasOnReferenceResourceIfNeeded(ctx, c.logger, c.dynamicClient, sts, scaleObj, referenceGVR, referenceName, referenceResourceDesiredReplicas) cancelDelayedDownscaleIfConfigured(ctx, c.logger, sts, client, referenceResourceDesiredReplicas) From 93ed429f33fce7f1dcc01923472046281bf85ee1 Mon Sep 17 00:00:00 2001 From: Yuchen Wang Date: Mon, 18 Nov 2024 14:37:41 -0800 Subject: [PATCH 25/25] fix utest --- pkg/controller/controller.go | 2 +- pkg/controller/custom_resource_replicas.go | 4 ++-- pkg/controller/delay_test.go | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index bcab67ed8..e6085a59a 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -209,7 +209,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error { span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()") defer span.Finish() - level.Info(c.logger).Log("msg", "reconcile started (minReadySeconds version)") + level.Info(c.logger).Log("msg", "reconcile started") sets, err := c.listStatefulSetsWithRolloutGroup() if err != nil { diff --git a/pkg/controller/custom_resource_replicas.go b/pkg/controller/custom_resource_replicas.go index 3f9657adc..3ee58ac60 100644 --- a/pkg/controller/custom_resource_replicas.go +++ b/pkg/controller/custom_resource_replicas.go @@ -47,8 +47,8 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx // If there is delayed downscale configured on the statefulset, we will first handle delay part, and only if that succeeds, // continue with downscaling or upscaling. 
var desiredReplicas int32 - if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "" { - level.Debug(c.logger).Log("msg", "downscale delay annotation empty, using boolean scaling logic") + if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "null" { + level.Debug(c.logger).Log("msg", "downscale delay annotation is null, using boolean scaling logic") desiredReplicas, err = checkScalable(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas) } else { desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas) diff --git a/pkg/controller/delay_test.go b/pkg/controller/delay_test.go index 771d29d35..6d1ac9f5a 100644 --- a/pkg/controller/delay_test.go +++ b/pkg/controller/delay_test.go @@ -69,7 +69,7 @@ func TestCreatePrepareDownscaleEndpoints(t *testing.T) { inputURL, err := url.Parse(tc.inputURL) require.NoError(t, err) - result := createPrepareDownscaleEndpoints(tc.namespace, tc.serviceName, tc.from, tc.to, inputURL) + result := createPrepareDownscaleEndpoints(tc.namespace, tc.serviceName, tc.serviceName, tc.from, tc.to, inputURL) assert.Equal(t, tc.expected, result) })
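
A natural follow-up to the delay_test.go fix above would be a test for the boolean path itself. A sketch, for illustration only: it assumes the endpoint struct from delay.go and that the package's httpClient abstraction is satisfied by any type with a Do(*http.Request) method such as *http.Client, and it targets callPrepareDownscaleAndReturnScalable from PATCH 16; the /ready and /not-ready paths are invented for the example:

package controller

import (
	"context"
	"net/http"
	"net/http/httptest"
	"net/url"
	"testing"

	"github.com/go-kit/log"
	"github.com/stretchr/testify/require"
)

func TestCallPrepareDownscaleAndReturnScalable(t *testing.T) {
	// Stub prepare-downscale endpoint: 200 means the pod may be scaled down,
	// 425 (Too Early) means it may not yet.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if r.URL.Path == "/ready" {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusTooEarly)
	}))
	defer srv.Close()

	base, err := url.Parse(srv.URL)
	require.NoError(t, err)
	withPath := func(p string) url.URL { u := *base; u.Path = p; return u }

	endpoints := []endpoint{
		{namespace: "ns", podName: "svc-0", replica: 0, url: withPath("/not-ready")},
		{namespace: "ns", podName: "svc-1", replica: 1, url: withPath("/ready")},
	}

	scalable, err := callPrepareDownscaleAndReturnScalable(context.Background(), log.NewNopLogger(), srv.Client(), endpoints)
	require.NoError(t, err)
	require.Equal(t, map[int]bool{0: false, 1: true}, scalable)
}

Because callPrepareDownscaleAndReturnScalable records a 425 as scalable=false without failing the errgroup, the call is expected to return a nil error here even though replica 0 is not ready.
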