Add deletion interval option between pod restarts #1

Merged 3 commits on Apr 4, 2024
cmd/rollout-operator/main.go (4 changes: 3 additions & 1 deletion)
@@ -62,6 +62,7 @@ type config struct {

useZoneTracker bool
zoneTrackerConfigMapName string
+ deletionInterval time.Duration
}

func (cfg *config) register(fs *flag.FlagSet) {
@@ -88,6 +89,7 @@ func (cfg *config) register(fs *flag.FlagSet) {

fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones")
fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker")
+ fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "Time to wait before actually terminating the pod")
}

func (cfg config) validate() error {
@@ -171,7 +173,7 @@ func main() {
maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)

// Init the controller.
- c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
+ c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.deletionInterval, reg, logger)
check(errors.Wrap(c.Init(), "failed to init controller"))

// Listen to sigterm, as well as for restart (like for certificate renewal).
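Reviewer note: as a quick illustration of the new flag wiring above, here is a minimal, self-contained sketch of registering and parsing -deletion-interval with Go's standard flag package, mirroring cfg.register. The 10m value passed in is illustrative only, not taken from this PR.

```go
package main

import (
	"flag"
	"fmt"
	"time"
)

func main() {
	fs := flag.NewFlagSet("rollout-operator", flag.ExitOnError)

	// Same registration as cfg.register above: a time.Duration flag with a
	// 5-minute default.
	var deletionInterval time.Duration
	fs.DurationVar(&deletionInterval, "deletion-interval", 5*time.Minute, "Time to wait before actually terminating the pod")

	// The 10m value is illustrative, not taken from this PR.
	if err := fs.Parse([]string{"-deletion-interval=10m"}); err != nil {
		panic(err)
	}
	fmt.Println(deletionInterval) // 10m0s
}
```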
integration/manifests_rollout_operator_test.go (1 change: 1 addition & 0 deletions)
@@ -59,6 +59,7 @@ func rolloutOperatorDeployment(namespace string, webhook bool) *appsv1.Deploymen
fmt.Sprintf("-kubernetes.namespace=%s", namespace),
"-reconcile.interval=1s",
"-log.level=debug",
"-deletion-interval=0s",
}
if webhook {
args = append(args,
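Reviewer note: the integration manifests pin -deletion-interval=0s so the existing tests keep their pre-PR timing. With a zero interval the "deletion ready" time is never in the future, so the gate opens immediately. A minimal sketch of that property:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	var deletionInterval time.Duration // 0s, as in the test manifest above

	now := time.Now()
	deletionReadyTime := now.Add(deletionInterval)

	// With a zero interval the ready time is never in the future, so the
	// controller's "wait before deleting" check passes immediately.
	fmt.Println(now.Before(deletionReadyTime)) // false: no extra wait
}
```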
pkg/controller/controller.go (45 changes: 44 additions & 1 deletion)
@@ -66,6 +66,11 @@ type RolloutController struct {
// Used to signal when the controller should stop.
stopCh chan struct{}

+ // Deletion interval fields.
+ deletionInterval time.Duration
+ // TODO: convert this to a map of rollout group to deletion ready time.
+ deletionReadyTime time.Time

// Metrics.
groupReconcileTotal *prometheus.CounterVec
groupReconcileFailed *prometheus.CounterVec
@@ -77,7 +82,7 @@ type RolloutController struct {
discoveredGroups map[string]struct{}
}

- func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, deletionInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
namespaceOpt := informers.WithNamespace(namespace)

// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -109,6 +114,7 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
logger: logger,
stopCh: make(chan struct{}),
discoveredGroups: map[string]struct{}{},
+ deletionInterval: deletionInterval,
groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_group_reconciles_total",
Help: "Total number of reconciles started for a specific rollout group.",
@@ -288,6 +294,17 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
}
}

+ // If the cluster is stable and the deletion ready time is not set, then set the deletion ready time.
+ if len(notReadySets) == 0 && c.deletionReadyTime.IsZero() {
+ c.deletionReadyTime = time.Now().Add(c.deletionInterval)
+ level.Info(c.logger).Log("msg", "sts group healthy, setting deletion ready time", "deletionReadyTime", c.deletionReadyTime)

+ // Reset deletionReadyTime since the cluster is not ready anymore.
+ } else if len(notReadySets) != 0 && !c.deletionReadyTime.IsZero() {
+ c.deletionReadyTime = time.Time{}
+ level.Info(c.logger).Log("msg", "sts group unhealthy, reset deletion ready time", "deletionReadyTime", c.deletionReadyTime)
+ }

// Ensure there are not 2+ StatefulSets with not-Ready pods. If there are, we shouldn't proceed
// rolling out pods and we should wait until these pods are Ready. The reason is that if there are
// unavailable pods in multiple StatefulSets, this could lead to an outage, so we want pods to
@@ -518,10 +535,26 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
return true, nil
}

+ deletionHappened := false

+ now := time.Now()
+ // Check if deletionReadyTime is set.
+ if c.deletionReadyTime.IsZero() {
+ // If not set, schedule deletion by setting deletionReadyTime to now + deletionInterval.
+ c.deletionReadyTime = now.Add(c.deletionInterval)
+ level.Info(c.logger).Log("msg", "scheduled future deletion for sts pods", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime)
+ }

+ if now.Before(c.deletionReadyTime) {
+ level.Info(c.logger).Log("msg", "waiting for deletion ready time before deleting pod", "sts", sts.Name, "deletionReadyTime", c.deletionReadyTime)
+ return true, nil // Not yet time to delete; skip until the next reconcile.
+ }

for _, pod := range podsToUpdate[:numPods] {
// Skip if the pod is terminating. Since "Terminating" is not a pod Phase, we can infer it by checking
// if the pod is in the Running phase but the deletionTimestamp has been set (kubectl does something
// similar too).

if pod.Status.Phase == corev1.PodRunning && pod.DeletionTimestamp != nil {
level.Debug(c.logger).Log("msg", fmt.Sprintf("waiting for pod %s to be terminated", pod.Name))
continue
@@ -530,7 +563,17 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
level.Info(c.logger).Log("msg", fmt.Sprintf("terminating pod %s", pod.Name))
if err := c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}); err != nil {
return false, errors.Wrapf(err, "failed to delete pod %s", pod.Name)
+ } else {
+ deletionHappened = true
+ level.Info(c.logger).Log("msg", fmt.Sprintf("pod %s successfully terminated", pod.Name), "deletionReadyTime", c.deletionReadyTime)
}

}

+ // Make sure no other pods can be deleted until the interval has elapsed again.
+ if deletionHappened {
+ c.deletionReadyTime = time.Time{}
+ level.Info(c.logger).Log("msg", "reset deletion ready time since deletion just happened")
+ }

return true, nil
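Reviewer note: putting the controller.go changes together, this PR adds a time gate that arms when the StatefulSet group is healthy, blocks pod deletion until deletionInterval has elapsed, and re-arms after each deletion. Below is a standalone sketch of that state machine under the PR's single-group assumption (the TODO above notes a per-group map as follow-up). The deletionGate type is illustrative, not from the PR.

```go
package main

import (
	"fmt"
	"time"
)

// deletionGate condenses the deletionReadyTime handling added in this PR.
type deletionGate struct {
	interval time.Duration
	readyAt  time.Time // zero value means "not scheduled"
}

// observeHealth mirrors reconcileStatefulSetsGroup: arm the gate when the
// group becomes healthy, clear it when the group becomes unhealthy.
func (g *deletionGate) observeHealth(healthy bool, now time.Time) {
	if healthy && g.readyAt.IsZero() {
		g.readyAt = now.Add(g.interval)
	} else if !healthy && !g.readyAt.IsZero() {
		g.readyAt = time.Time{}
	}
}

// allowDeletion mirrors updateStatefulSetPods: deletion is allowed only once
// the interval has elapsed, and the gate resets after a deletion happens so
// the next pod waits a full interval again.
func (g *deletionGate) allowDeletion(now time.Time) bool {
	if g.readyAt.IsZero() {
		g.readyAt = now.Add(g.interval)
		return false
	}
	if now.Before(g.readyAt) {
		return false
	}
	g.readyAt = time.Time{}
	return true
}

func main() {
	g := &deletionGate{interval: 5 * time.Minute}
	now := time.Now()

	g.observeHealth(true, now)
	fmt.Println(g.allowDeletion(now))                      // false: interval not yet elapsed
	fmt.Println(g.allowDeletion(now.Add(6 * time.Minute))) // true: gate open
	fmt.Println(g.allowDeletion(now.Add(6 * time.Minute))) // false: gate re-armed
}
```

Because the gate is shared controller state rather than per-pod, the effective pacing is one deletion per interval, which matches the reset-after-deletion logic in the diff.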
pkg/controller/controller_test.go (6 changes: 3 additions & 3 deletions)
@@ -540,7 +540,7 @@ func TestRolloutController_Reconcile(t *testing.T) {

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
- c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger())
+ c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -825,7 +825,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
- c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 5*time.Second, reg, log.NewNopLogger())
+ c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 5*time.Second, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -928,7 +928,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
- c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger())
+ c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()
