[PLAT-111199] Terminate Statefulset only if enough pods are available #5

Merged 6 commits on Jul 3, 2024
6 changes: 1 addition & 5 deletions cmd/rollout-operator/main.go
@@ -62,8 +62,6 @@ type config struct {

useZoneTracker bool
zoneTrackerConfigMapName string
- deletionInterval time.Duration
- delayBetweenSts time.Duration
}

func (cfg *config) register(fs *flag.FlagSet) {
@@ -90,8 +88,6 @@ func (cfg *config) register(fs *flag.FlagSet) {

fs.BoolVar(&cfg.useZoneTracker, "use-zone-tracker", false, "Use the zone tracker to prevent simultaneous downscales in different zones")
fs.StringVar(&cfg.zoneTrackerConfigMapName, "zone-tracker.config-map-name", "rollout-operator-zone-tracker", "The name of the ConfigMap to use for the zone tracker")
- fs.DurationVar(&cfg.deletionInterval, "deletion-interval", 5*time.Minute, "[Deprecated]time to wait before actually terminating the pod")
- fs.DurationVar(&cfg.delayBetweenSts, "delay-between-sts", 5*time.Minute, "time to wait between stateful sets")
}

func (cfg config) validate() error {
@@ -175,7 +171,7 @@ func main() {
maybeStartTLSServer(cfg, logger, kubeClient, restart, metrics)

// Init the controller.
- c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, cfg.delayBetweenSts, reg, logger)
+ c := controller.NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, cfg.kubeNamespace, httpClient, cfg.reconcileInterval, reg, logger)
check(errors.Wrap(c.Init(), "failed to init controller"))

// Listen to sigterm, as well as for restart (like for certificate renewal).
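Note on the change above: with -deletion-interval and -delay-between-sts gone, the operator no longer paces updates with its own timer. The pacing signal now lives on the StatefulSets themselves as spec.minReadySeconds, which the controller changes below consult. A minimal sketch of what that looks like on the workload side, assuming the ingester-zone-a name from this PR's test fixtures and the operator's rollout-group label, with 300 seconds as an illustrative value only:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ingesterZoneA sketches a StatefulSet whose pods must stay Ready for
// 5 minutes before they count as Available; until then the operator
// holds off terminating more pods in the rollout group.
func ingesterZoneA() *appsv1.StatefulSet {
	return &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "ingester-zone-a", // hypothetical, mirrors the test fixtures
			Labels: map[string]string{"rollout-group": "ingester"},
		},
		Spec: appsv1.StatefulSetSpec{
			// Replaces the old controller-side delay: a pod becomes
			// Available only after being Ready for this long.
			MinReadySeconds: 300,
		},
	}
}

func main() {
	fmt.Println(ingesterZoneA().Spec.MinReadySeconds)
}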
1 change: 0 additions & 1 deletion integration/manifests_mock_service_test.go
@@ -22,7 +22,6 @@ func createMockServiceZone(t *testing.T, ctx context.Context, api *kubernetes.Cl
_, err := api.AppsV1().StatefulSets(namespace).Create(ctx, mockServiceStatefulSet(name, "1", true), metav1.CreateOptions{})
require.NoError(t, err, "Can't create StatefulSet")
}
-
{
_, err := api.CoreV1().Services(namespace).Create(ctx, mockServiceService(name), metav1.CreateOptions{})
require.NoError(t, err, "Can't create Service")
1 change: 0 additions & 1 deletion integration/manifests_rollout_operator_test.go
@@ -59,7 +59,6 @@ func rolloutOperatorDeployment(namespace string, webhook bool) *appsv1.Deploymen
fmt.Sprintf("-kubernetes.namespace=%s", namespace),
"-reconcile.interval=1s",
"-log.level=debug",
"-delay-between-sts=0s",
}
if webhook {
args = append(args,
60 changes: 17 additions & 43 deletions pkg/controller/controller.go
@@ -66,11 +66,6 @@ type RolloutController struct {
// Used to signal when the controller should stop.
stopCh chan struct{}

- //deletion interval related
- delayBetweenSts time.Duration
- lastUpdatedSts *v1.StatefulSet
- lastUpdatedTime time.Time

// Metrics.
groupReconcileTotal *prometheus.CounterVec
groupReconcileFailed *prometheus.CounterVec
@@ -82,7 +77,7 @@ type RolloutController struct {
discoveredGroups map[string]struct{}
}

- func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, delayBetweenSts time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
+ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTMapper, scaleClient scale.ScalesGetter, dynamic dynamic.Interface, namespace string, client httpClient, reconcileInterval time.Duration, reg prometheus.Registerer, logger log.Logger) *RolloutController {
namespaceOpt := informers.WithNamespace(namespace)

// Initialise the StatefulSet informer to restrict the returned StatefulSets to only the ones
@@ -114,9 +109,6 @@ func NewRolloutController(kubeClient kubernetes.Interface, restMapper meta.RESTM
logger: logger,
stopCh: make(chan struct{}),
discoveredGroups: map[string]struct{}{},
- delayBetweenSts: delayBetweenSts,
- lastUpdatedSts: nil,
- lastUpdatedTime: time.Time{},
groupReconcileTotal: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "rollout_operator_group_reconciles_total",
Help: "Total number of reconciles started for a specific rollout group.",
@@ -217,7 +209,7 @@ func (c *RolloutController) reconcile(ctx context.Context) error {
span, ctx := opentracing.StartSpanFromContext(ctx, "RolloutController.reconcile()")
defer span.Finish()

- level.Info(c.logger).Log("msg", "reconcile started")
+ level.Info(c.logger).Log("msg", "reconcile started (minReadySeconds version)")

sets, err := c.listStatefulSetsWithRolloutGroup()
if err != nil {
@@ -310,8 +302,6 @@ func (c *RolloutController) reconcileStatefulSetsGroup(ctx context.Context, grou
if len(notReadySets) == 1 {
level.Info(c.logger).Log("msg", "a StatefulSet has some not-Ready pods, reconcile it first", "statefulset", notReadySets[0].Name)
sets = util.MoveStatefulSetToFront(sets, notReadySets[0])
- // sts could become not ready by other activities like UKI, we should consider this as last updated sts
- c.updateLastUpdatedSts(notReadySets[0])
}

for _, sts := range sets {
@@ -325,7 +315,6 @@
if ongoing {
// Do not continue with other StatefulSets because this StatefulSet
// update is still ongoing.
- c.updateLastUpdatedSts(sts)
return nil
}
}
@@ -499,20 +488,6 @@ func (c *RolloutController) listPods(sel labels.Selector) ([]*corev1.Pod, error)
return pods, nil
}

- func (c *RolloutController) canUpdateSts(sts *v1.StatefulSet) bool {
- if c.lastUpdatedSts == nil || c.lastUpdatedSts.Name == sts.Name {
- // no need to wait within the same sts updates
- return true
- }
- return time.Since(c.lastUpdatedTime) > c.delayBetweenSts
- }
-
- func (c *RolloutController) updateLastUpdatedSts(sts *v1.StatefulSet) {
- c.lastUpdatedSts = sts
- c.lastUpdatedTime = time.Now()
- level.Debug(c.logger).Log("msg", "updated last updated sts", "sts", sts.Name, "time", c.lastUpdatedTime)
- }
-
func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.StatefulSet) (bool, error) {
level.Debug(c.logger).Log("msg", "reconciling StatefulSet", "statefulset", sts.Name)

@@ -522,26 +497,19 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
}

if len(podsToUpdate) > 0 {
- if !c.canUpdateSts(sts) {
- // only check canUpdateSts if the current sts has pods to be updated
- level.Info(c.logger).Log("msg", "delaying reconcile between StatefulSets updates",
- "curr", sts.Name, "last", c.lastUpdatedSts.Name,
- "delay", c.delayBetweenSts, "pods_to_update", len(podsToUpdate))
- time.Sleep(c.delayBetweenSts)
- // MUST return here:
- // 1. pods state could change during the delay period, scan the entire cluster again
- // 2. since we didn't actually update current sts, the last updated sts should not be changed
- // 3. throw errors to not proceed with other StatefulSets
- return false, errors.New("delaying reconcile between StatefulSets updates")
- }
-
maxUnavailable := getMaxUnavailableForStatefulSet(sts, c.logger)
- numNotReady := int(sts.Status.Replicas - sts.Status.ReadyReplicas)
+ var numNotAvailable int
+ if sts.Spec.MinReadySeconds > 0 {
+ level.Info(c.logger).Log("msg", "StatefulSet has minReadySeconds set, waiting before terminating pods", "statefulset", sts.Name, "min_ready_seconds", sts.Spec.MinReadySeconds)
+ numNotAvailable = int(sts.Status.Replicas - sts.Status.AvailableReplicas)
+ } else {
+ numNotAvailable = int(sts.Status.Replicas - sts.Status.ReadyReplicas)
+ }

// Compute the number of pods we should update, honoring the configured maxUnavailable.
numPods := max(0, min(
- maxUnavailable-numNotReady, // No more than the configured maxUnavailable (including not-Ready pods).
- len(podsToUpdate), // No more than the total number of pods that need to be updated.
+ maxUnavailable-numNotAvailable, // No more than the configured maxUnavailable (including not-Ready pods).
+ len(podsToUpdate), // No more than the total number of pods that need to be updated.
))

if numPods == 0 {
@@ -551,6 +519,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
"pods_to_update", len(podsToUpdate),
"replicas", sts.Status.Replicas,
"ready_replicas", sts.Status.ReadyReplicas,
"available_replicas", sts.Status.AvailableReplicas,
"max_unavailable", maxUnavailable)

return true, nil
@@ -584,6 +553,11 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
"statefulset", sts.Name)

return true, nil
+ } else if sts.Spec.MinReadySeconds > 0 && sts.Status.Replicas != sts.Status.AvailableReplicas {
+ level.Info(c.logger).Log(
+ "msg", "StatefulSet pods are all updated and ready but StatefulSet has some not-Available replicas",
+ "statefulset", sts.Name)
+ return true, nil
}

// At this point there are no pods to update, so we can update the currentRevision in the StatefulSet.
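Net effect of the controller.go changes: the time-based delay between StatefulSets is replaced by an availability gate. When minReadySeconds is set, the number of pods eligible for termination is bounded by maxUnavailable minus the pods that are not yet Available, so a pod that restarts quickly still blocks further terminations until its minReadySeconds window has elapsed. A standalone sketch of that gate (maxUnavailable is passed in directly here, whereas the real code derives it per StatefulSet via getMaxUnavailableForStatefulSet; Go 1.21+ is assumed for the built-in min and max):

package main

import "fmt"

// podsSafeToTerminate mirrors the gating logic in updateStatefulSetPods:
// with minReadySeconds set, pods that are Ready but not yet Available
// still count against maxUnavailable.
func podsSafeToTerminate(replicas, ready, available int32, minReadySeconds, maxUnavailable, podsToUpdate int) int {
	var numNotAvailable int
	if minReadySeconds > 0 {
		numNotAvailable = int(replicas - available)
	} else {
		numNotAvailable = int(replicas - ready)
	}
	return max(0, min(maxUnavailable-numNotAvailable, podsToUpdate))
}

func main() {
	// 3 replicas, all Ready, but one still inside its minReadySeconds
	// window: nothing may be terminated yet.
	fmt.Println(podsSafeToTerminate(3, 3, 2, 300, 1, 2)) // 0
	// Once all 3 replicas are Available, one more pod may go down.
	fmt.Println(podsSafeToTerminate(3, 3, 3, 300, 1, 2)) // 1
}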
18 changes: 11 additions & 7 deletions pkg/controller/controller_test.go
@@ -165,6 +165,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) {
sts.Status.Replicas = 3
sts.Status.ReadyReplicas = 2
+ sts.Status.AvailableReplicas = 2
}),
},
pods: []runtime.Object{
@@ -183,6 +184,7 @@
mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) {
sts.Status.Replicas = 3
sts.Status.ReadyReplicas = 1
+ sts.Status.AvailableReplicas = 1
}),
},
pods: []runtime.Object{
@@ -540,7 +542,7 @@ func TestRolloutController_Reconcile(t *testing.T) {

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
- c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger())
+ c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -825,7 +827,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
- c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 5*time.Second, 0, reg, log.NewNopLogger())
+ c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -928,7 +930,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
- c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 5*time.Second, 0, reg, log.NewNopLogger())
+ c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -1010,10 +1012,11 @@ func mockStatefulSet(name string, overrides ...func(sts *v1.StatefulSet)) *v1.St
},
},
Status: v1.StatefulSetStatus{
- Replicas: 3,
- ReadyReplicas: 3,
- CurrentRevision: testLastRevisionHash,
- UpdateRevision: testLastRevisionHash,
+ Replicas: 3,
+ ReadyReplicas: 3,
+ AvailableReplicas: 3,
+ CurrentRevision: testLastRevisionHash,
+ UpdateRevision: testLastRevisionHash,
},
}

@@ -1067,6 +1070,7 @@ func withReplicas(totalReplicas, readyReplicas int32) func(sts *v1.StatefulSet)
sts.Spec.Replicas = &totalReplicas
sts.Status.Replicas = totalReplicas
sts.Status.ReadyReplicas = readyReplicas
+ sts.Status.AvailableReplicas = readyReplicas
}
}

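A side note on the fixtures: the updated withReplicas pins AvailableReplicas to readyReplicas, so it cannot express a pod that is Ready but still inside its minReadySeconds window, which is exactly the state the new branch in updateStatefulSetPods guards. A hypothetical helper in the same style (not part of this PR; it assumes the test file's existing v1 StatefulSet import and the mockStatefulSet fixture shown above) would make that state testable:

// withAvailableReplicas is a hypothetical test helper, mirroring withReplicas,
// that lets AvailableReplicas lag behind ReadyReplicas.
func withAvailableReplicas(total, ready, available int32) func(sts *v1.StatefulSet) {
	return func(sts *v1.StatefulSet) {
		sts.Spec.Replicas = &total
		sts.Status.Replicas = total
		sts.Status.ReadyReplicas = ready
		sts.Status.AvailableReplicas = available
	}
}

Usage would follow the existing pattern, e.g. mockStatefulSet("ingester-zone-b", withAvailableReplicas(3, 3, 2)) to model one replica that is Ready but still waiting out its minReadySeconds window.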