scale down #8

Closed · wants to merge 25 commits
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,10 @@

## main / unreleased

## v0.20.1

* [BUGFIX] Improved handling of URL ports in the `createPrepareDownscaleEndpoints` function. The function now correctly preserves the port when replacing the host in the URL. #176

## v0.20.0

* [ENHANCEMENT] Updated dependencies, including: #174
1 change: 0 additions & 1 deletion integration/manifests_mock_service_test.go
@@ -22,7 +22,6 @@ func createMockServiceZone(t *testing.T, ctx context.Context, api *kubernetes.Cl
_, err := api.AppsV1().StatefulSets(namespace).Create(ctx, mockServiceStatefulSet(name, "1", true), metav1.CreateOptions{})
require.NoError(t, err, "Can't create StatefulSet")
}

{
_, err := api.CoreV1().Services(namespace).Create(ctx, mockServiceService(name), metav1.CreateOptions{})
require.NoError(t, err, "Can't create Service")
18 changes: 15 additions & 3 deletions pkg/controller/controller.go
@@ -498,12 +498,18 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S

if len(podsToUpdate) > 0 {
maxUnavailable := getMaxUnavailableForStatefulSet(sts, c.logger)
numNotReady := int(sts.Status.Replicas - sts.Status.ReadyReplicas)
var numNotAvailable int
if sts.Spec.MinReadySeconds > 0 {
level.Info(c.logger).Log("msg", "StatefulSet has minReadySeconds set, waiting before terminating pods", "statefulset", sts.Name, "min_ready_seconds", sts.Spec.MinReadySeconds)
numNotAvailable = int(sts.Status.Replicas - sts.Status.AvailableReplicas)
} else {
numNotAvailable = int(sts.Status.Replicas - sts.Status.ReadyReplicas)
}

// Compute the number of pods we should update, honoring the configured maxUnavailable.
numPods := max(0, min(
maxUnavailable-numNotReady, // No more than the configured maxUnavailable (including not-Ready pods).
len(podsToUpdate), // No more than the total number of pods that need to be updated.
maxUnavailable-numNotAvailable, // No more than the configured maxUnavailable (counting pods that are not yet Ready or not yet Available).
len(podsToUpdate), // No more than the total number of pods that need to be updated.
))

if numPods == 0 {
@@ -513,6 +519,7 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
"pods_to_update", len(podsToUpdate),
"replicas", sts.Status.Replicas,
"ready_replicas", sts.Status.ReadyReplicas,
"available_replicas", sts.Status.AvailableReplicas,
"max_unavailable", maxUnavailable)

return true, nil
@@ -546,6 +553,11 @@ func (c *RolloutController) updateStatefulSetPods(ctx context.Context, sts *v1.S
"statefulset", sts.Name)

return true, nil
} else if sts.Spec.MinReadySeconds > 0 && sts.Status.Replicas != sts.Status.AvailableReplicas {
level.Info(c.logger).Log(
"msg", "StatefulSet pods are all updated and ready but StatefulSet has some not-Available replicas",
"statefulset", sts.Name)
return true, nil
}

// At this point there are no pods to update, so we can update the currentRevision in the StatefulSet.
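
The hunk above switches the unavailability accounting from ReadyReplicas to AvailableReplicas whenever minReadySeconds is set. A minimal standalone sketch of that budget computation follows; the helper name and the sample numbers are hypothetical, not part of this PR:

package main

import "fmt"

// updatablePods mirrors the computation in updateStatefulSetPods: maxUnavailable is
// the unavailability budget, and replicas that are not yet Available (when
// minReadySeconds is set) or not yet Ready (otherwise) already consume part of it.
func updatablePods(maxUnavailable int, minReadySeconds, replicas, readyReplicas, availableReplicas int32, podsToUpdate int) int {
    var numNotAvailable int
    if minReadySeconds > 0 {
        numNotAvailable = int(replicas - availableReplicas)
    } else {
        numNotAvailable = int(replicas - readyReplicas)
    }
    return max(0, min(maxUnavailable-numNotAvailable, podsToUpdate))
}

func main() {
    // All 3 replicas are Ready, but one has not been Available for minReadySeconds yet,
    // so with maxUnavailable=1 the budget is spent and no pod is terminated this cycle.
    fmt.Println(updatablePods(1, 30, 3, 3, 2, 2)) // 0
    // Without minReadySeconds only Readiness counts, so one pod can still be updated.
    fmt.Println(updatablePods(1, 0, 3, 3, 2, 2)) // 1
}
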
18 changes: 11 additions & 7 deletions pkg/controller/controller_test.go
@@ -165,6 +165,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) {
sts.Status.Replicas = 3
sts.Status.ReadyReplicas = 2
sts.Status.AvailableReplicas = 2
}),
},
pods: []runtime.Object{
@@ -183,6 +184,7 @@ func TestRolloutController_Reconcile(t *testing.T) {
mockStatefulSet("ingester-zone-b", withPrevRevision(), func(sts *v1.StatefulSet) {
sts.Status.Replicas = 3
sts.Status.ReadyReplicas = 1
sts.Status.AvailableReplicas = 1
}),
},
pods: []runtime.Object{
@@ -616,7 +618,7 @@ func TestRolloutController_Reconcile(t *testing.T) {

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger())
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, nil, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -925,7 +927,7 @@ func TestRolloutController_ReconcileStatefulsetWithDownscaleDelay(t *testing.T)

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 5*time.Second, reg, log.NewNopLogger())
c := NewRolloutController(kubeClient, restMapper, scaleClient, dynamicClient, testNamespace, httpClient, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -1028,7 +1030,7 @@ func TestRolloutController_ReconcileShouldDeleteMetricsForDecommissionedRolloutG

// Create the controller and start informers.
reg := prometheus.NewPedanticRegistry()
c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 5*time.Second, reg, log.NewNopLogger())
c := NewRolloutController(kubeClient, nil, nil, nil, testNamespace, nil, 0, reg, log.NewNopLogger())
require.NoError(t, c.Init())
defer c.Stop()

@@ -1110,10 +1112,11 @@ func mockStatefulSet(name string, overrides ...func(sts *v1.StatefulSet)) *v1.St
},
},
Status: v1.StatefulSetStatus{
Replicas: 3,
ReadyReplicas: 3,
CurrentRevision: testLastRevisionHash,
UpdateRevision: testLastRevisionHash,
Replicas: 3,
ReadyReplicas: 3,
AvailableReplicas: 3,
CurrentRevision: testLastRevisionHash,
UpdateRevision: testLastRevisionHash,
},
}

@@ -1167,6 +1170,7 @@ func withReplicas(totalReplicas, readyReplicas int32) func(sts *v1.StatefulSet)
sts.Spec.Replicas = &totalReplicas
sts.Status.Replicas = totalReplicas
sts.Status.ReadyReplicas = readyReplicas
sts.Status.AvailableReplicas = readyReplicas
}
}

8 changes: 7 additions & 1 deletion pkg/controller/custom_resource_replicas.go
@@ -46,7 +46,13 @@ func (c *RolloutController) adjustStatefulSetsGroupReplicasToMirrorResource(ctx
// We're going to change number of replicas on the statefulset.
// If there is delayed downscale configured on the statefulset, we will first handle delay part, and only if that succeeds,
// continue with downscaling or upscaling.
desiredReplicas, err := checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
var desiredReplicas int32
if sts.GetAnnotations()[config.RolloutDelayedDownscaleAnnotationKey] == "null" {
level.Debug(c.logger).Log("msg", "downscale delay annotation is null, using boolean scaling logic")
desiredReplicas, err = checkScalable(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
} else {
desiredReplicas, err = checkScalingDelay(ctx, c.logger, sts, client, currentReplicas, referenceResourceDesiredReplicas)
}
if err != nil {
level.Warn(c.logger).Log("msg", "not scaling statefulset due to failed scaling delay check",
"group", groupName,
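
To make the annotation-driven branching above concrete, here is a hedged sketch of the two code paths; the annotation keys are the ones referenced in this PR's config package, while the prepare URL value is an assumption chosen only for illustration:

package main

import (
    "fmt"

    "github.com/grafana/rollout-operator/pkg/config"
)

func main() {
    // Hypothetical StatefulSet annotations.
    annotations := map[string]string{
        config.RolloutDelayedDownscaleAnnotationKey: "null",
        config.PrepareDownscalePathAnnotationKey:    "http://pod:8080/ingester/prepare-shutdown",
    }

    if annotations[config.RolloutDelayedDownscaleAnnotationKey] == "null" {
        fmt.Println("boolean path: checkScalable asks each affected pod for a 200/425 answer")
    } else {
        fmt.Println("delay path: checkScalingDelay enforces the configured downscale delay first")
    }
}
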
131 changes: 124 additions & 7 deletions pkg/controller/delay.go
@@ -20,6 +20,13 @@ import (
"github.com/grafana/rollout-operator/pkg/config"
)

func getStsSvcName(sts *v1.StatefulSet) string {
if sts.Spec.ServiceName != "" {
return sts.Spec.ServiceName
}
return sts.GetName()
}

func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, replicas int32) {
delay, prepareURL, err := parseDelayedDownscaleAnnotations(sts.GetAnnotations())
if delay == 0 || prepareURL == nil {
@@ -31,11 +38,48 @@ func cancelDelayedDownscaleIfConfigured(ctx context.Context, logger log.Logger,
return
}

endpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(replicas), prepareURL)
endpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), 0, int(replicas), prepareURL)

callCancelDelayedDownscale(ctx, logger, httpClient, endpoints)
}

func checkScalable(ctx context.Context, logger log.Logger, sts *v1.StatefulSet, httpClient httpClient, currentReplicas, desiredReplicas int32) (updatedDesiredReplicas int32, _ error) {
if desiredReplicas >= currentReplicas {
return desiredReplicas, nil
}

prepareURL, err := parseDownscaleURLAnnotation(sts.GetAnnotations())
if prepareURL == nil || err != nil {
return currentReplicas, err
}
downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), int(desiredReplicas), int(currentReplicas), prepareURL)
scalableBooleans, err := callPrepareDownscaleAndReturnScalable(ctx, logger, httpClient, downscaleEndpoints)
if err != nil {
return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
}

// Find how many pods from the end of statefulset we can already scale down
allowedDesiredReplicas := currentReplicas
for replica := currentReplicas - 1; replica >= desiredReplicas; replica-- {
scalable, ok := scalableBooleans[int(replica)]
if !ok {
break
}
if !scalable {
break
}
allowedDesiredReplicas--
}

if allowedDesiredReplicas == currentReplicas {
return currentReplicas, fmt.Errorf("downscale not possible for any pods at the end of statefulset replicas range")
}

// We can proceed with downscale on at least one pod.
level.Info(logger).Log("msg", "downscale possible on some pods", "name", sts.GetName(), "originalDesiredReplicas", desiredReplicas, "allowedDesiredReplicas", allowedDesiredReplicas)
return allowedDesiredReplicas, nil
}
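
// Illustrative sketch (hypothetical helper, not part of this PR): the suffix scan in
// checkScalable only shrinks the replica count by a contiguous run of scalable pods at
// the end of the StatefulSet; the first not-scalable (or unanswered) ordinal stops it.
// For example, with current=5, desired=2, pods 4 and 3 scalable and pod 2 not, the
// allowed desired replicas is 3.
func exampleAllowedReplicas(current, desired int32, scalable map[int]bool) int32 {
    allowed := current
    for replica := current - 1; replica >= desired; replica-- {
        if ok, found := scalable[int(replica)]; !found || !ok {
            break
        }
        allowed--
    }
    return allowed
}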

// Checks if downscale delay has been reached on replicas in [desiredReplicas, currentReplicas) range.
// If there is a range of replicas at the end of statefulset for which delay has been reached, this function
// returns updated desired replicas that statefulset can be scaled to.
@@ -54,19 +98,19 @@ func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulS
}

if desiredReplicas >= currentReplicas {
callCancelDelayedDownscale(ctx, logger, httpClient, createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(currentReplicas), prepareURL))
callCancelDelayedDownscale(ctx, logger, httpClient, createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), 0, int(currentReplicas), prepareURL))
// Proceed even if calling cancel of delayed downscale fails. We call cancellation repeatedly, so it will happen during next reconcile.
return desiredReplicas, nil
}

{
// Replicas in [0, desired) interval should cancel any delayed downscale, if they have any.
cancelEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), 0, int(desiredReplicas), prepareURL)
cancelEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), 0, int(desiredReplicas), prepareURL)
callCancelDelayedDownscale(ctx, logger, httpClient, cancelEndpoints)
}

// Replicas in [desired, current) interval are going to be stopped.
downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), int(desiredReplicas), int(currentReplicas), prepareURL)
downscaleEndpoints := createPrepareDownscaleEndpoints(sts.Namespace, sts.GetName(), getStsSvcName(sts), int(desiredReplicas), int(currentReplicas), prepareURL)
elapsedTimeSinceDownscaleInitiated, err := callPrepareDownscaleAndReturnElapsedDurationsSinceInitiatedDownscale(ctx, logger, httpClient, downscaleEndpoints)
if err != nil {
return currentReplicas, fmt.Errorf("failed prepare pods for delayed downscale: %v", err)
@@ -97,6 +141,20 @@ func checkScalingDelay(ctx context.Context, logger log.Logger, sts *v1.StatefulS
return allowedDesiredReplicas, nil
}

func parseDownscaleURLAnnotation(annotations map[string]string) (*url.URL, error) {
urlStr := annotations[config.PrepareDownscalePathAnnotationKey]
if urlStr == "" {
return nil, nil
}

u, err := url.Parse(urlStr)
if err != nil {
return nil, fmt.Errorf("failed to parse %s annotation value as URL: %v", config.PrepareDownscalePathAnnotationKey, err)
}

return u, nil
}

func parseDelayedDownscaleAnnotations(annotations map[string]string) (time.Duration, *url.URL, error) {
delayStr := annotations[config.RolloutDelayedDownscaleAnnotationKey]
urlStr := annotations[config.RolloutDelayedDownscalePrepareUrlAnnotationKey]
@@ -131,7 +189,7 @@ type endpoint struct {
}

// Create prepare-downscale endpoints for pods with index in [from, to) range. URL is fully reused except for the host, which is replaced with the pod's FQDN (preserving the original port, if any).
func createPrepareDownscaleEndpoints(namespace, serviceName string, from, to int, url *url.URL) []endpoint {
func createPrepareDownscaleEndpoints(namespace, stsName, serviceName string, from, to int, url *url.URL) []endpoint {
eps := make([]endpoint, 0, to-from)

// The DNS entry for a pod of a stateful set is
@@ -142,19 +200,78 @@
for index := from; index < to; index++ {
ep := endpoint{
namespace: namespace,
podName: fmt.Sprintf("%v-%v", serviceName, index),
podName: fmt.Sprintf("%v-%v", stsName, index),
replica: index,
}

ep.url = *url
ep.url.Host = fmt.Sprintf("%s.%v.%v.svc.cluster.local.", ep.podName, serviceName, ep.namespace)
newHost := fmt.Sprintf("%s.%v.%v.svc.cluster.local.", ep.podName, serviceName, ep.namespace)
if url.Port() != "" {
newHost = fmt.Sprintf("%s:%s", newHost, url.Port())
}
ep.url.Host = newHost

eps = append(eps, ep)
}

return eps
}
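
// Illustrative usage sketch (not part of this PR; the namespace, names and prepare URL
// below are assumptions): with an explicit port in the prepare URL, each endpoint keeps
// that port while the host is replaced with the pod's FQDN, e.g.
// http://ingester-zone-a-0.ingester-zone-a.mimir.svc.cluster.local.:8080/ingester/prepare-shutdown
func exampleEndpointURLs() {
    u, _ := url.Parse("http://pod:8080/ingester/prepare-shutdown")
    for _, ep := range createPrepareDownscaleEndpoints("mimir", "ingester-zone-a", "ingester-zone-a", 0, 2, u) {
        fmt.Println(ep.url.String())
    }
}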

func callPrepareDownscaleAndReturnScalable(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (map[int]bool, error) {
if len(endpoints) == 0 {
return nil, fmt.Errorf("no endpoints")
}

var (
scalableMu sync.Mutex
scalable = map[int]bool{}
)
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(32)

for ix := range endpoints {
ep := endpoints[ix]
g.Go(func() error {
target := ep.url.String()

epLogger := log.With(logger, "pod", ep.podName, "url", target)

req, err := http.NewRequestWithContext(ctx, http.MethodPost, target, nil)
if err != nil {
level.Error(epLogger).Log("msg", "error creating HTTP POST request to endpoint", "err", err)
return err
}

resp, err := client.Do(req)
if err != nil {
level.Error(epLogger).Log("msg", "error sending HTTP POST request to endpoint", "err", err)
return err
}

defer resp.Body.Close()

scalableMu.Lock()
if resp.StatusCode == 200 {
scalable[ep.replica] = true
} else {
if resp.StatusCode != 425 {
// 425 means "too early": the pod is not ready to be scaled down yet.
level.Error(epLogger).Log("msg", "downscale POST got unexpected status", "status", resp.StatusCode)
}
scalable[ep.replica] = false
}
scalableMu.Unlock()

level.Debug(epLogger).Log("msg", "downscale POST got status", resp.StatusCode)
return nil
})
}
err := g.Wait()
return scalable, err
}
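
// Illustrative sketch of the pod-side contract assumed by the call above (hypothetical
// handler, not part of this PR or of this repository): the prepare-downscale endpoint is
// expected to answer 200 when the replica can be stopped and 425 (Too Early) when it
// cannot yet.
func examplePrepareDownscaleHandler(canStop func() bool) http.HandlerFunc {
    return func(w http.ResponseWriter, r *http.Request) {
        if r.Method != http.MethodPost {
            w.WriteHeader(http.StatusMethodNotAllowed)
            return
        }
        if canStop() {
            w.WriteHeader(http.StatusOK) // 200: this replica may be scaled down now.
            return
        }
        w.WriteHeader(http.StatusTooEarly) // 425: keep this replica for now.
    }
}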

func callPrepareDownscaleAndReturnElapsedDurationsSinceInitiatedDownscale(ctx context.Context, logger log.Logger, client httpClient, endpoints []endpoint) (map[int]time.Duration, error) {
if len(endpoints) == 0 {
return nil, fmt.Errorf("no endpoints")