Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #131 from sofastack/batch_confirm
Browse files Browse the repository at this point in the history
Feature: support batch confirm & GrayTime & useBeta
  • Loading branch information
gold300jin authored Sep 28, 2023
2 parents af3b8e9 + c5fdd1f commit c201977
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 76 deletions.
2 changes: 2 additions & 0 deletions module-controller/api/v1alpha1/moduledeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ type ReleaseStatus struct {

// Last time the release transitioned from one status to another.
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`

NextReconcileTime metav1.Time `json:"nextReconcileTime,omitempty"`
}

type ModuleDeploymentCondition struct {
Expand Down
1 change: 1 addition & 0 deletions module-controller/api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion module-controller/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ package main

import (
"flag"
"go.uber.org/zap/zapcore"
"os"

"go.uber.org/zap/zapcore"

"github.com/sofastack/sofa-serverless/internal/controller"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand All @@ -30,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,9 @@ spec:
to another.
format: date-time
type: string
nextReconcileTime:
format: date-time
type: string
progress:
description: The phase current release reach
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ spec:
url: http://serverless-opensource.oss-cn-shanghai.aliyuncs.com/module-packages/stable/dynamic-provider-1.0.0-ark-biz.jar
replicas: 1
operationStrategy:
needConfirm: true
needConfirm: false
grayTimeBetweenBatchSeconds: 120
useBeta: false
batchCount: 1
schedulingStrategy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,22 @@ package controller

import (
"context"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

"github.com/sofastack/sofa-serverless/api/v1alpha1"
"github.com/sofastack/sofa-serverless/internal/constants/finalizer"
"github.com/sofastack/sofa-serverless/internal/constants/label"
"github.com/sofastack/sofa-serverless/internal/utils"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"time"
)

var _ = Describe("Module Controller", func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,23 @@ import (
"strconv"
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"

moduledeploymentv1alpha1 "github.com/sofastack/sofa-serverless/api/v1alpha1"
"github.com/sofastack/sofa-serverless/internal/constants/finalizer"
"github.com/sofastack/sofa-serverless/internal/constants/label"
"github.com/sofastack/sofa-serverless/internal/utils"

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

moduledeploymentv1alpha1 "github.com/sofastack/sofa-serverless/api/v1alpha1"
"github.com/sofastack/sofa-serverless/internal/constants/finalizer"
"github.com/sofastack/sofa-serverless/internal/constants/label"
"github.com/sofastack/sofa-serverless/internal/utils"
)

// ModuleDeploymentReconciler reconciles a ModuleDeployment object
Expand Down Expand Up @@ -111,15 +110,7 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}
case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting:
// update moduleReplicaSet
enqueue, err := r.updateModuleReplicaSet(ctx, moduleDeployment, newRS)
if err != nil {
return ctrl.Result{}, err
}
if enqueue {
requeueAfter := utils.GetNextReconcileTime(time.Now())
return ctrl.Result{RequeueAfter: requeueAfter}, nil
}
return r.updateModuleReplicaSet(ctx, moduleDeployment, newRS)
case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressCompleted:
if moduleDeployment.Spec.Replicas != newRS.Spec.Replicas {
moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting
Expand All @@ -128,13 +119,29 @@ func (r *ModuleDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Req
return ctrl.Result{}, err
}
}

if !moduleVersionChanged && isUrlChange(moduleDeployment.Spec.Template.Spec.Module, newRS.Spec.Template.Spec.Module) {
newRS.Spec.Template.Spec.Module = moduleDeployment.Spec.Template.Spec.Module
if err := r.Client.Update(ctx, newRS); err != nil {
return ctrl.Result{}, err
}
}
case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressWaitingForConfirmation:
moduleDeployment.Spec.Pause = true
if err := r.Update(ctx, moduleDeployment); err != nil {
return ctrl.Result{}, err
}

moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressPaused
if err := r.Status().Update(ctx, moduleDeployment); err != nil {
return ctrl.Result{}, err
}
case moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressPaused:
if !moduleDeployment.Spec.Pause && time.Since(moduleDeployment.Status.ReleaseStatus.NextReconcileTime.Time) >= 0 {
moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting
if err := r.Status().Update(ctx, moduleDeployment); err != nil {
return ctrl.Result{}, err
}
}
}

// update moduleDeployment owner reference
Expand Down Expand Up @@ -302,7 +309,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicas(
}

func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(ctx context.Context, moduleDeployment *moduledeploymentv1alpha1.ModuleDeployment,
newRS *moduledeploymentv1alpha1.ModuleReplicaSet) (bool, error) {
newRS *moduledeploymentv1alpha1.ModuleReplicaSet) (ctrl.Result, error) {
var (
batchCount = moduleDeployment.Spec.OperationStrategy.BatchCount
curBatch = moduleDeployment.Status.ReleaseStatus.CurrentBatch
Expand All @@ -311,6 +318,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(ctx context.Context,
expReplicas = moduleDeployment.Spec.Replicas
deltaReplicas = expReplicas - newRS.Spec.Replicas
)

if deltaReplicas == 0 {
moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressCompleted
moduleDeployment.Status.ReleaseStatus.LastTransitionTime = metav1.Now()
Expand All @@ -320,7 +328,7 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(ctx context.Context,
LastTransitionTime: metav1.Now(),
Message: "deployment release progress completed",
})
return false, r.Status().Update(ctx, moduleDeployment)
return ctrl.Result{}, r.Status().Update(ctx, moduleDeployment)
}

if expReplicas < batchCount {
Expand All @@ -334,12 +342,13 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(ctx context.Context,
// wait moduleReplicaset ready
if newRS.Spec.Replicas != curReplicas {
log.Log.Info(fmt.Sprintf("newRs is not ready, expect replicas %v, but got %v", newRS.Spec.Replicas, curReplicas))
return true, nil
return ctrl.Result{Requeue: true, RequeueAfter: utils.GetNextReconcileTime(time.Now())}, nil
}

replicas := int32(0)
// use beta strategy
if batchCount != 1 && curBatch == 1 && moduleDeployment.Spec.OperationStrategy.UseBeta {
useBeta := batchCount != 1 && curBatch == 1 && moduleDeployment.Spec.OperationStrategy.UseBeta
if useBeta {
replicas = 1
} else if curBatch == batchCount { // if it's the last batch
replicas = expReplicas
Expand All @@ -349,20 +358,45 @@ func (r *ModuleDeploymentReconciler) updateModuleReplicaSet(ctx context.Context,

err := r.updateModuleReplicas(ctx, replicas, moduleDeployment, newRS)
if err != nil {
return false, err
return ctrl.Result{}, err
}

var message string
now := metav1.Now()

if useBeta {
moduleDeployment.Status.ReleaseStatus.CurrentBatch = 1
message = "deployment release: beta deployment"
} else {
moduleDeployment.Status.ReleaseStatus.CurrentBatch += 1
message = fmt.Sprintf("deployment release: curbatch %v, batchCount %v", curBatch, batchCount)
}

moduleDeployment.Status.ReleaseStatus.CurrentBatch += 1
moduleDeployment.Status.ReleaseStatus.LastTransitionTime = now
moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting
moduleDeployment.Status.ReleaseStatus.LastTransitionTime = metav1.Now()

moduleDeployment.Status.Conditions = append(moduleDeployment.Status.Conditions, moduledeploymentv1alpha1.ModuleDeploymentCondition{
Type: moduledeploymentv1alpha1.DeploymentProgressing,
Status: corev1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Message: fmt.Sprintf("deployment release: curbatch %v, batchCount %v", curBatch, batchCount),
LastTransitionTime: now,
Message: message,
})

return false, r.Status().Update(ctx, moduleDeployment)
var grayTime int
if curBatch != batchCount {
if moduleDeployment.Spec.OperationStrategy.NeedConfirm { // use NeedConfirm Strategy
moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressWaitingForConfirmation
} else if grayTime = int(moduleDeployment.Spec.OperationStrategy.GrayTimeBetweenBatchSeconds); grayTime != 0 {
if curBatch == batchCount {
moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressExecuting
} else {
moduleDeployment.Status.ReleaseStatus.NextReconcileTime = metav1.NewTime(now.Add(time.Duration(grayTime) * time.Second))
moduleDeployment.Status.ReleaseStatus.Progress = moduledeploymentv1alpha1.ModuleDeploymentReleaseProgressPaused
}
}
}

return ctrl.Result{Requeue: true, RequeueAfter: time.Duration(grayTime) * time.Second}, r.Status().Update(ctx, moduleDeployment)
}

// generate module replicas
Expand Down
Loading

0 comments on commit c201977

Please sign in to comment.