Skip to content

Commit

Permalink
Update rollout lib
Browse files Browse the repository at this point in the history
Signed-off-by: melserngawy <[email protected]>
  • Loading branch information
serngawy committed Sep 11, 2023
1 parent cf1ead4 commit bd8457a
Show file tree
Hide file tree
Showing 3 changed files with 678 additions and 437 deletions.
196 changes: 125 additions & 71 deletions cluster/v1alpha1/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,10 @@ const (
Skip
)

// ClusterRolloutStatusFunc defines a function to return the rollout status for a managed cluster.
type ClusterRolloutStatusFunc func(clusterName string) ClusterRolloutStatus

// ClusterRolloutStatus holds the rollout status information for a cluster.
type ClusterRolloutStatus struct {
// cluster name
ClusterName string
// GroupKey represents the cluster group key (optional field).
GroupKey clusterv1beta1.GroupKey
// Status is the required field indicating the rollout status.
Expand All @@ -55,24 +54,31 @@ type ClusterRolloutStatus struct {

// RolloutResult contains the clusters to be rolled out and the clusters that have timed out.
type RolloutResult struct {
// ClustersToRollout is a map where the key is the cluster name and the value is the ClusterRolloutStatus.
ClustersToRollout map[string]ClusterRolloutStatus
// ClustersTimeOut is a map where the key is the cluster name and the value is the ClusterRolloutStatus.
ClustersTimeOut map[string]ClusterRolloutStatus
// ClustersToRollout is a slice of ClusterRolloutStatus that will be rolled out.
ClustersToRollout []ClusterRolloutStatus
// ClustersTimeOut is a slice of ClusterRolloutStatus that are timedout.
ClustersTimeOut []ClusterRolloutStatus
// ClustersRemoved is a slice of ClusterRolloutStatus that are removed.
// TODO: implement removed clusters
ClustersRemoved []ClusterRolloutStatus
}

// ClusterRolloutStatusFunc defines a function to return the rollout status for a managed cluster.
type ClusterRolloutStatusFunc[T any] func(clusterName string, workload T) (ClusterRolloutStatus, error)

// +k8s:deepcopy-gen=false
type RolloutHandler struct {
type RolloutHandler[T any] struct {
// placement decision tracker
pdTracker *clusterv1beta1.PlacementDecisionClustersTracker
pdTracker *clusterv1beta1.PlacementDecisionClustersTracker
statusFunc ClusterRolloutStatusFunc[T]
}

func NewRolloutHandler(pdTracker *clusterv1beta1.PlacementDecisionClustersTracker) (*RolloutHandler, error) {
func NewRolloutHandler[T any](pdTracker *clusterv1beta1.PlacementDecisionClustersTracker, statusFunc ClusterRolloutStatusFunc[T]) (*RolloutHandler[T], error) {
if pdTracker == nil {
return nil, fmt.Errorf("invalid placement decision tracker %v", pdTracker)
}

return &RolloutHandler{pdTracker: pdTracker}, nil
return &RolloutHandler[T]{pdTracker: pdTracker, statusFunc: statusFunc}, nil
}

// The input is a duck type RolloutStrategy and a ClusterRolloutStatusFunc to return the rollout status on each managed cluster.
Expand All @@ -83,20 +89,21 @@ func NewRolloutHandler(pdTracker *clusterv1beta1.PlacementDecisionClustersTracke
//
// ClustersTimeOut: If the cluster status is Progressing or Failed, and the status lasts longer than timeout defined in strategy,
// will list them RolloutResult.ClustersTimeOut with status TimeOut.
func (r *RolloutHandler) GetRolloutCluster(rolloutStrategy RolloutStrategy, statusFunc ClusterRolloutStatusFunc) (*RolloutStrategy, RolloutResult, error) {
// func (r *RolloutHandler) GetRolloutCluster(rolloutStrategy RolloutStrategy, statusFunc ClusterRolloutStatusFunc) (*RolloutStrategy, RolloutResult, error) {
func (r *RolloutHandler[T]) GetRolloutCluster(rolloutStrategy RolloutStrategy, existingClusterStatus []ClusterRolloutStatus) (*RolloutStrategy, RolloutResult, error) {
switch rolloutStrategy.Type {
case All:
return r.getRolloutAllClusters(rolloutStrategy, statusFunc)
return r.getRolloutAllClusters(rolloutStrategy, existingClusterStatus)
case Progressive:
return r.getProgressiveClusters(rolloutStrategy, statusFunc)
return r.getProgressiveClusters(rolloutStrategy, existingClusterStatus)
case ProgressivePerGroup:
return r.getProgressivePerGroupClusters(rolloutStrategy, statusFunc)
return r.getProgressivePerGroupClusters(rolloutStrategy, existingClusterStatus)
default:
return nil, RolloutResult{}, fmt.Errorf("incorrect rollout strategy type %v", rolloutStrategy.Type)
}
}

func (r *RolloutHandler) getRolloutAllClusters(rolloutStrategy RolloutStrategy, statusFunc ClusterRolloutStatusFunc) (*RolloutStrategy, RolloutResult, error) {
func (r *RolloutHandler[T]) getRolloutAllClusters(rolloutStrategy RolloutStrategy, existingClusterStatus []ClusterRolloutStatus) (*RolloutStrategy, RolloutResult, error) {
// Prepare the rollout strategy
strategy := RolloutStrategy{Type: All}
strategy.All = rolloutStrategy.All.DeepCopy()
Expand All @@ -113,35 +120,38 @@ func (r *RolloutHandler) getRolloutAllClusters(rolloutStrategy RolloutStrategy,
// Get all clusters and perform progressive rollout
totalClusterGroups := r.pdTracker.ExistingClusterGroupsBesides()
totalClusters := totalClusterGroups.GetClusters().UnsortedList()
rolloutResult := progressivePerCluster(totalClusterGroups, len(totalClusters), failureTimeout, statusFunc)
rolloutResult := progressivePerCluster(totalClusterGroups, len(totalClusters), failureTimeout, existingClusterStatus)

return &strategy, rolloutResult, nil
}

func (r *RolloutHandler) getProgressiveClusters(rolloutStrategy RolloutStrategy, statusFunc ClusterRolloutStatusFunc) (*RolloutStrategy, RolloutResult, error) {
func (r *RolloutHandler[T]) getProgressiveClusters(rolloutStrategy RolloutStrategy, existingClusterStatus []ClusterRolloutStatus) (*RolloutStrategy, RolloutResult, error) {
// Prepare the rollout strategy
strategy := RolloutStrategy{Type: Progressive}
strategy.Progressive = rolloutStrategy.Progressive.DeepCopy()
if strategy.Progressive == nil {
strategy.Progressive = &RolloutProgressive{}
}

// Upgrade mandatory decision groups first
groupKeys := decisionGroupsToGroupKeys(strategy.Progressive.MandatoryDecisionGroups.MandatoryDecisionGroups)
clusterGroups := r.pdTracker.ExistingClusterGroups(groupKeys...)

// Perform progressive rollout for mandatory decision groups
rolloutResult := progressivePerGroup(clusterGroups, maxTimeDuration, statusFunc)
if len(rolloutResult.ClustersToRollout) > 0 {
return &strategy, rolloutResult, nil
}

// Parse timeout for non-mandatory decision groups
failureTimeout, err := parseTimeout(strategy.Progressive.Timeout.Timeout)
if err != nil {
return &strategy, RolloutResult{}, err
}

// Upgrade mandatory decision groups first
groupKeys := decisionGroupsToGroupKeys(strategy.Progressive.MandatoryDecisionGroups.MandatoryDecisionGroups)
clusterGroups := r.pdTracker.ExistingClusterGroups(groupKeys...)

// Perform progressive rollOut for mandatory decision groups first.
if len(clusterGroups) > 0 {
rolloutResult := progressivePerGroup(clusterGroups, failureTimeout, existingClusterStatus)
//fmt.Println("progressivePerGroup ", rolloutResult.ClustersToRollout)
if len(rolloutResult.ClustersToRollout) > 0 || len(rolloutResult.ClustersTimeOut) > 0 {
return &strategy, rolloutResult, nil
}
}

// Calculate the length for progressive rollout
totalClusters := r.pdTracker.ExistingClusterGroupsBesides().GetClusters()
length, err := calculateLength(strategy.Progressive.MaxConcurrency, len(totalClusters))
Expand All @@ -151,47 +161,52 @@ func (r *RolloutHandler) getProgressiveClusters(rolloutStrategy RolloutStrategy,

// Upgrade the remaining clusters
restClusterGroups := r.pdTracker.ExistingClusterGroupsBesides(clusterGroups.GetOrderedGroupKeys()...)
rolloutResult = progressivePerCluster(restClusterGroups, length, failureTimeout, statusFunc)
rolloutResult := progressivePerCluster(restClusterGroups, length, failureTimeout, existingClusterStatus)

return &strategy, rolloutResult, nil
}

func (r *RolloutHandler) getProgressivePerGroupClusters(rolloutStrategy RolloutStrategy, statusFunc ClusterRolloutStatusFunc) (*RolloutStrategy, RolloutResult, error) {
func (r *RolloutHandler[T]) getProgressivePerGroupClusters(rolloutStrategy RolloutStrategy, existingClusterStatus []ClusterRolloutStatus) (*RolloutStrategy, RolloutResult, error) {
// Prepare the rollout strategy
strategy := RolloutStrategy{Type: ProgressivePerGroup}
strategy.ProgressivePerGroup = rolloutStrategy.ProgressivePerGroup.DeepCopy()
if strategy.ProgressivePerGroup == nil {
strategy.ProgressivePerGroup = &RolloutProgressivePerGroup{}
}

// Parse timeout for non-mandatory decision groups
failureTimeout, err := parseTimeout(strategy.ProgressivePerGroup.Timeout.Timeout)
if err != nil {
return &strategy, RolloutResult{}, err
}

// Upgrade mandatory decision groups first
mandatoryDecisionGroups := strategy.ProgressivePerGroup.MandatoryDecisionGroups.MandatoryDecisionGroups
groupKeys := decisionGroupsToGroupKeys(mandatoryDecisionGroups)
clusterGroups := r.pdTracker.ExistingClusterGroups(groupKeys...)

// Perform progressive rollout per group for mandatory decision groups
rolloutResult := progressivePerGroup(clusterGroups, maxTimeDuration, statusFunc)
if len(rolloutResult.ClustersToRollout) > 0 {
return &strategy, rolloutResult, nil
}
// Perform progressive rollout per group for mandatory decision groups first
if len(clusterGroups) > 0 {
rolloutResult := progressivePerGroup(clusterGroups, failureTimeout, existingClusterStatus)

// Parse timeout for non-mandatory decision groups
failureTimeout, err := parseTimeout(strategy.ProgressivePerGroup.Timeout.Timeout)
if err != nil {
return &strategy, RolloutResult{}, err
if len(rolloutResult.ClustersToRollout) > 0 || len(rolloutResult.ClustersTimeOut) > 0 {
return &strategy, rolloutResult, nil
}
}

// Upgrade the rest of the decision groups
restClusterGroups := r.pdTracker.ExistingClusterGroupsBesides(clusterGroups.GetOrderedGroupKeys()...)

// Perform progressive rollout per group for the remaining decision groups
rolloutResult = progressivePerGroup(restClusterGroups, failureTimeout, statusFunc)
rolloutResult := progressivePerGroup(restClusterGroups, failureTimeout, existingClusterStatus)

return &strategy, rolloutResult, nil
}

func progressivePerCluster(clusterGroupsMap clusterv1beta1.ClusterGroupsMap, length int, timeout time.Duration, statusFunc ClusterRolloutStatusFunc) RolloutResult {
rolloutClusters := map[string]ClusterRolloutStatus{}
timeoutClusters := map[string]ClusterRolloutStatus{}
func progressivePerCluster(clusterGroupsMap clusterv1beta1.ClusterGroupsMap, length int, timeout time.Duration, existingClusterStatus []ClusterRolloutStatus) RolloutResult {
rolloutClusters := []ClusterRolloutStatus{}
timeoutClusters := []ClusterRolloutStatus{}
existingClusters := make(map[string]bool)

if length == 0 {
return RolloutResult{
Expand All @@ -200,28 +215,45 @@ func progressivePerCluster(clusterGroupsMap clusterv1beta1.ClusterGroupsMap, len
}
}

clusters := clusterGroupsMap.GetClusters().UnsortedList()
clusterToGroupKey := clusterGroupsMap.ClusterToGroupKey()
for _, status := range existingClusterStatus {
if status.ClusterName == "" {
continue
}

// Sort the clusters in alphabetical order to ensure consistency.
sort.Strings(clusters)
for _, cluster := range clusters {
status := statusFunc(cluster)
if groupKey, exists := clusterToGroupKey[cluster]; exists {
status.GroupKey = groupKey
existingClusters[status.ClusterName] = true
if status.Status == Succeeded || status.Status == TimeOut {
continue
}

newStatus, needToRollout := determineRolloutStatusAndContinue(status, timeout)
status.Status = newStatus.Status
status.TimeOutTime = newStatus.TimeOutTime

if status.Status == TimeOut {
timeoutClusters = append(timeoutClusters, status)
continue
}
if needToRollout {
rolloutClusters[cluster] = status
rolloutClusters = append(rolloutClusters, status)
}
if status.Status == TimeOut {
timeoutClusters[cluster] = status
}

clusters := clusterGroupsMap.GetClusters().UnsortedList()
clusterToGroupKey := clusterGroupsMap.ClusterToGroupKey()
// Sort the clusters in alphabetical order to ensure consistency.
sort.Strings(clusters)
for _, cluster := range clusters {
if existingClusters[cluster] {
continue
}

status := ClusterRolloutStatus{
ClusterName: cluster,
Status: ToApply,
GroupKey: clusterToGroupKey[cluster],
}
rolloutClusters = append(rolloutClusters, status)

if len(rolloutClusters)%length == 0 && len(rolloutClusters) > 0 {
return RolloutResult{
ClustersToRollout: rolloutClusters,
Expand All @@ -236,32 +268,54 @@ func progressivePerCluster(clusterGroupsMap clusterv1beta1.ClusterGroupsMap, len
}
}

func progressivePerGroup(clusterGroupsMap clusterv1beta1.ClusterGroupsMap, timeout time.Duration, statusFunc ClusterRolloutStatusFunc) RolloutResult {
rolloutClusters := map[string]ClusterRolloutStatus{}
timeoutClusters := map[string]ClusterRolloutStatus{}
func progressivePerGroup(clusterGroupsMap clusterv1beta1.ClusterGroupsMap, timeout time.Duration, existingClusterStatus []ClusterRolloutStatus) RolloutResult {
rolloutClusters := []ClusterRolloutStatus{}
timeoutClusters := []ClusterRolloutStatus{}
existingClusters := make(map[string]bool)

clusterGroupKeys := clusterGroupsMap.GetOrderedGroupKeys()
for _, status := range existingClusterStatus {
if status.ClusterName == "" {
continue
}
existingClusters[status.ClusterName] = true

if status.Status == Succeeded || status.Status == TimeOut {
continue
}

newStatus, needToRollout := determineRolloutStatusAndContinue(status, timeout)
status.Status = newStatus.Status
status.TimeOutTime = newStatus.TimeOutTime

if status.Status == TimeOut {
timeoutClusters = append(timeoutClusters, status)
continue
}
if needToRollout {
rolloutClusters = append(rolloutClusters, status)
}
}

clusterGroupKeys := clusterGroupsMap.GetOrderedGroupKeys()
for _, key := range clusterGroupKeys {
if subclusters, ok := clusterGroupsMap[key]; ok {
// Iterate through clusters in the group
for _, cluster := range subclusters.UnsortedList() {
status := statusFunc(cluster)
status.GroupKey = key

newStatus, needToRollout := determineRolloutStatusAndContinue(status, timeout)
status.Status = newStatus.Status
status.TimeOutTime = newStatus.TimeOutTime

if needToRollout {
rolloutClusters[cluster] = status
clusters := subclusters.UnsortedList()
sort.Strings(clusters)
for _, cluster := range clusters {
if existingClusters[cluster] {
continue
}
if status.Status == TimeOut {
timeoutClusters[cluster] = status

status := ClusterRolloutStatus{
ClusterName: cluster,
Status: ToApply,
GroupKey: key,
}
rolloutClusters = append(rolloutClusters, status)
}

// Return if there are clusters to rollout
// As it is perGroup Return if there are clusters to rollout
if len(rolloutClusters) > 0 {
return RolloutResult{
ClustersToRollout: rolloutClusters,
Expand Down
Loading

0 comments on commit bd8457a

Please sign in to comment.