Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ExecutionProgress() method to function interface to report exec progress #2023

Merged
merged 21 commits into from
Oct 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8aacea2
Add phase level progress fields in actionset object
PrasadG193 Jun 6, 2023
508247c
Change weight based progress calc with func implementation
PrasadG193 Jun 6, 2023
ea1cdd6
Refactor and add unit tests for phase progress
PrasadG193 Jun 16, 2023
35c5578
Update generated deepcopy methods
PrasadG193 Jun 16, 2023
72c2bb6
Fix import cycle issue in test
PrasadG193 Jun 19, 2023
135b55c
Merge branch 'master' into phase-progress-interface
PrasadG193 Jul 17, 2023
b14874f
Merge branch 'master' into phase-progress-interface
PrasadG193 Jul 25, 2023
68a075e
Update pkg/apis/cr/v1alpha1/types.go
PrasadG193 Aug 4, 2023
b0e53a5
Update pkg/apis/cr/v1alpha1/types.go
PrasadG193 Aug 4, 2023
8da47fa
Update pkg/apis/cr/v1alpha1/types.go
PrasadG193 Aug 4, 2023
090539b
Update pkg/apis/cr/v1alpha1/types.go
PrasadG193 Aug 4, 2023
caff247
Merge branch 'master' into phase-progress-interface
PrasadG193 Aug 4, 2023
5529275
Use correct action index while updating action phase state
PrasadG193 Aug 11, 2023
65db94a
Merge branch 'master' into phase-progress-interface
pavannd1 Sep 13, 2023
afd882d
Merge branch 'master' into phase-progress-interface
PrasadG193 Sep 20, 2023
83d12c9
Merge branch 'master' into phase-progress-interface
PrasadG193 Sep 26, 2023
5822e8d
Implement ExecutionProgress method on Kanister functions (#2117)
PrasadG193 Sep 26, 2023
6603d94
Implement ExecProgress on test function
PrasadG193 Sep 27, 2023
d1aa60f
Remove unused mockPhase in test
PrasadG193 Sep 27, 2023
22eaade
Remove unnecessary whitespace in updateActionProgress func
PrasadG193 Oct 3, 2023
73445d4
Merge branch 'master' into phase-progress-interface
mergify[bot] Oct 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion pkg/apis/cr/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,8 @@ type ActionStatus struct {
DeferPhase Phase `json:"deferPhase,omitempty"`
}

// ActionProgress provides information on the progress of an action.
// ActionProgress provides information on the combined progress
// of all the phases in the action.
type ActionProgress struct {
// RunningPhase represents which phase of the action is being run
RunningPhase string `json:"runningPhase,omitempty"`
Expand Down Expand Up @@ -187,6 +188,28 @@ type Phase struct {
State State `json:"state"`
// Output is the map of output artifacts produced by the Blueprint phase.
Output map[string]interface{} `json:"output,omitempty"`
// Progress represents the phase execution progress.
Progress PhaseProgress `json:"progress,omitempty"`
PrasadG193 marked this conversation as resolved.
Show resolved Hide resolved
PrasadG193 marked this conversation as resolved.
Show resolved Hide resolved
}

// PhaseProgress represents the execution state of the phase.
type PhaseProgress struct {
// ProgressPercent represents the execution progress in percentage.
ProgressPercent string `json:"progressPercent,omitempty"`
// SizeUploadedB represents the size of data uploaded in Bytes at a given time during phase execution.
// This field will be empty for phases which do not involve data movement.
SizeUploadedB int64 `json:"sizeUploadedB,omitempty"`
// EstimatedUploadSizeB represents the total estimated size of data in Bytes
// that will be uploaded during the phase execution.
// This field will be empty for phases which do not involve data movement.
EstimatedUploadSizeB int64 `json:"estinatedUploadSizeB,omitempty"`
// EstimatedTimeSeconds is the estimated time required in seconds to upload the
// remaining data estimated with EstimatedUploadSizeB.
// This field will be empty for phases which do not involve data movement.
EstimatedTimeSeconds int64 `json:"estinatedTimeSeconds,omitempty"`
// LastTransitionTime represents the last date time when the progress status
// was received.
LastTransitionTime *metav1.Time `json:"lastTransitionTime,omitempty"`
}

// k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down
20 changes: 20 additions & 0 deletions pkg/apis/cr/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion pkg/blueprint/validate/validate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,9 @@ func blueprint() *crv1alpha1.Blueprint {
}
}

type nonDefaultVersionFunc struct{}
type nonDefaultVersionFunc struct {
progressPercent string
}

func (nd *nonDefaultVersionFunc) Name() string {
return "NonDefaultVersionFunc"
Expand All @@ -453,9 +455,15 @@ func (nd *nonDefaultVersionFunc) Arguments() []string {
}

func (nd *nonDefaultVersionFunc) Exec(context.Context, param.TemplateParams, map[string]interface{}) (map[string]interface{}, error) {
nd.progressPercent = "0"
defer func() { nd.progressPercent = "100" }()
return nil, nil
}

func (nd *nonDefaultVersionFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
return crv1alpha1.PhaseProgress{ProgressPercent: nd.progressPercent}, nil
}

var _ kanister.Func = (*nonDefaultVersionFunc)(nil)

func init() {
Expand Down
40 changes: 29 additions & 11 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,14 +394,6 @@ func (c *Controller) handleActionSet(ctx context.Context, t *tomb.Tomb, as *crv1
}
}

go func() {
// progress update is computed on a best-effort basis.
// if it exits with error, we will just log it.
if err := progress.TrackActionsProgress(ctx, c.crClient, as.GetName(), as.GetNamespace()); err != nil {
log.Error().WithError(err)
}
}()

for i, a := range as.Status.Actions {
var bp *crv1alpha1.Blueprint
if bp, err = c.crClient.CrV1alpha1().Blueprints(as.GetNamespace()).Get(ctx, a.Blueprint, v1.GetOptions{}); err != nil {
Expand Down Expand Up @@ -471,7 +463,7 @@ func (c *Controller) runAction(ctx context.Context, t *tomb.Tomb, as *crv1alpha1
defer func() {
var deferErr error
if deferPhase != nil {
c.updateActionSetRunningPhase(ctx, as, deferPhase.Name())
c.updateActionSetRunningPhase(ctx, aIDX, as, deferPhase.Name())
deferErr = c.executeDeferPhase(ctx, deferPhase, tp, bp, action.Name, aIDX, as)
}
// render artifacts only if all the phases are run successfully
Expand All @@ -490,11 +482,22 @@ func (c *Controller) runAction(ctx context.Context, t *tomb.Tomb, as *crv1alpha1
var output map[string]interface{}
var msg string
if err == nil {
c.updateActionSetRunningPhase(ctx, as, p.Name())
c.updateActionSetRunningPhase(ctx, aIDX, as, p.Name())
progressTrackCtx, doneProgressTrack := context.WithCancel(ctx)
defer doneProgressTrack()
go func() {
// progress update is computed on a best-effort basis.
// if it exits with error, we will just log it.
if err := progress.UpdateActionSetsProgress(progressTrackCtx, aIDX, c.crClient, as.GetName(), as.GetNamespace(), p); err != nil {
log.Error().WithError(err)
}
}()
output, err = p.Exec(ctx, *bp, action.Name, *tp)
doneProgressTrack()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to call it again even after calling it from defer ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the doneProgressTrack() needs to be called as soon as Exec() is done. The defer() gets called once all the phase Exec is done and action is completed (or failed). When the next phase execution starts during interaction in this function, the older phase's Progress() starts sending completion percent as 0. So before running next phase, the goroutine for current phase needs to be closed. Let me know if this makes sense or not.

} else {
msg = fmt.Sprintf("Failed to init phase params: %#v:", as.Status.Actions[aIDX].Phases[i])
}

var rf func(*crv1alpha1.ActionSet) error
if err != nil {
coreErr = err
Expand All @@ -511,8 +514,17 @@ func (c *Controller) runAction(ctx context.Context, t *tomb.Tomb, as *crv1alpha1
coreErr = nil
rf = func(ras *crv1alpha1.ActionSet) error {
ras.Status.Actions[aIDX].Phases[i].State = crv1alpha1.StateComplete
pp, err := p.Progress()
if err != nil {
log.Error().WithError(err)
return nil
}
ras.Status.Actions[aIDX].Phases[i].Progress = pp
pavannd1 marked this conversation as resolved.
Show resolved Hide resolved
// this updates the phase output in the actionset status
ras.Status.Actions[aIDX].Phases[i].Output = output
if err := progress.SetActionSetPercentCompleted(ras); err != nil {
log.Error().WithError(err)
}
return nil
}
}
Expand Down Expand Up @@ -545,9 +557,15 @@ func (c *Controller) runAction(ctx context.Context, t *tomb.Tomb, as *crv1alpha1
// updateActionSetRunningPhase updates the actionset's `status.Progress.RunningPhase` with the phase name
// that is being run currently. It doesn't fail if there was a problem updating the actionset. It just logs
// the failure.
func (c *Controller) updateActionSetRunningPhase(ctx context.Context, as *crv1alpha1.ActionSet, phase string) {
func (c *Controller) updateActionSetRunningPhase(ctx context.Context, aIDX int, as *crv1alpha1.ActionSet, phase string) {
err := reconcile.ActionSet(ctx, c.crClient.CrV1alpha1(), as.Namespace, as.Name, func(as *crv1alpha1.ActionSet) error {
as.Status.Progress.RunningPhase = phase
// Iterate through all the phases and set current phase state to running
for i := 0; i < len(as.Status.Actions[aIDX].Phases); i++ {
if as.Status.Actions[aIDX].Phases[i].Name == phase {
as.Status.Actions[aIDX].Phases[i].State = crv1alpha1.StateRunning
}
}
return nil
})
if err != nil {
Expand Down
28 changes: 28 additions & 0 deletions pkg/customresource/actionset.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,20 @@ spec:
type: object
state:
type: string
progress:
properties:
progressPercent:
type: string
sizeUploadedB:
type: integer
estimatedTimeSeconds:
type: integer
estimatedUploadSizeB:
type: integer
lastTransitionTime:
type: string
format: date-time
type: object
type: object
phases:
description: Phases are sub-actions an are executed sequentially.
Expand All @@ -279,6 +293,20 @@ spec:
type: object
state:
type: string
progress:
properties:
progressPercent:
type: string
sizeUploadedB:
type: integer
estimatedTimeSeconds:
type: integer
estimatedUploadSizeB:
type: integer
lastTransitionTime:
type: string
format: date-time
type: object
type: object
type: array
type: object
Expand Down
22 changes: 20 additions & 2 deletions pkg/function/backup_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,22 @@ package function

import (
"context"
"time"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/consts"
"github.com/kanisterio/kanister/pkg/field"
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/log"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/progress"
"github.com/kanisterio/kanister/pkg/restic"
)

Expand Down Expand Up @@ -64,13 +68,19 @@ func init() {

var _ kanister.Func = (*backupDataFunc)(nil)

type backupDataFunc struct{}
type backupDataFunc struct {
progressPercent string
}

func (*backupDataFunc) Name() string {
return BackupDataFuncName
}

func (*backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
func (b *backupDataFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
// Set progress percent
b.progressPercent = progress.StartedPercent
defer func() { b.progressPercent = progress.CompletedPercent }()

var namespace, pod, container, includePath, backupArtifactPrefix, encryptionKey string
var err error
if err = Arg(args, BackupDataNamespaceArg, &namespace); err != nil {
Expand Down Expand Up @@ -188,3 +198,11 @@ func backupData(ctx context.Context, cli kubernetes.Interface, namespace, pod, c
phySize: phySize,
}, nil
}

func (b *backupDataFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
metav1Time := metav1.NewTime(time.Now())
return crv1alpha1.PhaseProgress{
ProgressPercent: b.progressPercent,
LastTransitionTime: &metav1Time,
}, nil
}
22 changes: 20 additions & 2 deletions pkg/function/backup_data_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
crv1alpha1 "github.com/kanisterio/kanister/pkg/apis/cr/v1alpha1"
"github.com/kanisterio/kanister/pkg/consts"
"github.com/kanisterio/kanister/pkg/field"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/progress"
"github.com/kanisterio/kanister/pkg/restic"
)

Expand Down Expand Up @@ -62,13 +66,19 @@ func init() {

var _ kanister.Func = (*backupDataAllFunc)(nil)

type backupDataAllFunc struct{}
type backupDataAllFunc struct {
progressPercent string
}

func (*backupDataAllFunc) Name() string {
return BackupDataAllFuncName
}

func (*backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
func (b *backupDataAllFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
// Set progress percent
b.progressPercent = progress.StartedPercent
defer func() { b.progressPercent = progress.CompletedPercent }()

var namespace, pods, container, includePath, backupArtifactPrefix, encryptionKey string
var err error
if err = Arg(args, BackupDataAllNamespaceArg, &namespace); err != nil {
Expand Down Expand Up @@ -172,3 +182,11 @@ func backupDataAll(ctx context.Context, cli kubernetes.Interface, namespace stri
FunctionOutputVersion: kanister.DefaultVersion,
}, nil
}

func (b *backupDataAllFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
metav1Time := metav1.NewTime(time.Now())
return crv1alpha1.PhaseProgress{
ProgressPercent: b.progressPercent,
LastTransitionTime: &metav1Time,
}, nil
}
21 changes: 19 additions & 2 deletions pkg/function/backup_data_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ package function
import (
"bytes"
"context"
"time"

"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"

kanister "github.com/kanisterio/kanister/pkg"
Expand All @@ -27,6 +29,7 @@ import (
"github.com/kanisterio/kanister/pkg/format"
"github.com/kanisterio/kanister/pkg/kube"
"github.com/kanisterio/kanister/pkg/param"
"github.com/kanisterio/kanister/pkg/progress"
"github.com/kanisterio/kanister/pkg/restic"
)

Expand Down Expand Up @@ -56,7 +59,9 @@ func init() {

var _ kanister.Func = (*BackupDataStatsFunc)(nil)

type BackupDataStatsFunc struct{}
type BackupDataStatsFunc struct {
progressPercent string
}

func (*BackupDataStatsFunc) Name() string {
return BackupDataStatsFuncName
Expand Down Expand Up @@ -130,7 +135,11 @@ func backupDataStatsPodFunc(
}
}

func (*BackupDataStatsFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
func (b *BackupDataStatsFunc) Exec(ctx context.Context, tp param.TemplateParams, args map[string]interface{}) (map[string]interface{}, error) {
// Set progress percent
b.progressPercent = progress.StartedPercent
defer func() { b.progressPercent = progress.CompletedPercent }()

var namespace, backupArtifactPrefix, backupID, mode, encryptionKey string
var err error
if err = Arg(args, BackupDataStatsNamespaceArg, &namespace); err != nil {
Expand Down Expand Up @@ -183,3 +192,11 @@ func (*BackupDataStatsFunc) Arguments() []string {
BackupDataStatsEncryptionKeyArg,
}
}

func (b *BackupDataStatsFunc) ExecutionProgress() (crv1alpha1.PhaseProgress, error) {
metav1Time := metav1.NewTime(time.Now())
return crv1alpha1.PhaseProgress{
ProgressPercent: b.progressPercent,
LastTransitionTime: &metav1Time,
}, nil
}
Loading