Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add recover model from checkpoint first if it exists, store is local #376

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 28 additions & 3 deletions cmd/craned/app/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"flag"
"fmt"
"os"
"strings"

Expand All @@ -25,6 +26,7 @@ import (
predictionapi "github.com/gocrane/api/prediction/v1alpha1"

"github.com/gocrane/crane/cmd/craned/app/options"
"github.com/gocrane/crane/pkg/checkpoint"
"github.com/gocrane/crane/pkg/controller/analytics"
"github.com/gocrane/crane/pkg/controller/cnp"
"github.com/gocrane/crane/pkg/controller/ehpa"
Expand Down Expand Up @@ -108,7 +110,11 @@ func Run(ctx context.Context, opts *options.Options) error {
}
// initialize data sources and predictor
realtimeDataSources, histroyDataSources, _ := initDataSources(mgr, opts)
predictorMgr := initPredictorManager(opts, realtimeDataSources, histroyDataSources)
predictorMgr, err := initPredictorManager(opts, realtimeDataSources, histroyDataSources)
if err != nil {
klog.Error(err, "failed to init predictor mgr")
return err
}

initScheme()
initWebhooks(mgr, opts)
Expand Down Expand Up @@ -197,8 +203,27 @@ func initDataSources(mgr ctrl.Manager, opts *options.Options) (map[providers.Dat
return realtimeDataSources, historyDataSources, hybridDataSources
}

func initPredictorManager(opts *options.Options, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) predictor.Manager {
return predictor.NewManager(realtimeDataSources, historyDataSources, predictor.DefaultPredictorsConfig(opts.AlgorithmModelConfig))
func initPredictorManager(opts *options.Options, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) (predictor.Manager, error) {
cpStoreType := checkpoint.StoreType(opts.CheckpointerStore)
var checkpointer checkpoint.Checkpointer
var err error
if opts.EnableCheckpointer {
switch cpStoreType {
case checkpoint.StoreTypeLocal:
checkpointer, err = checkpoint.InitCheckpointer(checkpoint.StoreType(opts.CheckpointerStore), opts.CheckpointerLocalConfig)
if err != nil {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the InitCheckpointer be called exactly once? e.g., by sync.Once.Do

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the InitCheckpointer be called exactly once? e.g., by sync.Once.Do

It is OK to call InitCheckpointer more than once: because of the factory pattern, there is a map that stores the checkpointer of each storage type, and it is returned if it already exists.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

return nil, err
}
default:
return nil, fmt.Errorf("not supported checkpointer store type %v", cpStoreType)
}
}

ctx := predictor.CheckPointerContext{
Enable: opts.EnableCheckpointer,
Checkpointer: checkpointer,
}
return predictor.NewManager(realtimeDataSources, historyDataSources, predictor.DefaultPredictorsConfig(opts.AlgorithmModelConfig), ctx), nil
}

// initControllers setup controllers with manager
Expand Down
12 changes: 11 additions & 1 deletion cmd/craned/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/spf13/pflag"
componentbaseconfig "k8s.io/component-base/config"

"github.com/gocrane/crane/pkg/checkpoint"
"github.com/gocrane/crane/pkg/controller/ehpa"
"github.com/gocrane/crane/pkg/prediction/config"
"github.com/gocrane/crane/pkg/providers"
Expand Down Expand Up @@ -37,7 +38,10 @@ type Options struct {
DataSourceGrpcConfig providers.GrpcConfig

// AlgorithmModelConfig
AlgorithmModelConfig config.AlgorithmModelConfig
AlgorithmModelConfig config.AlgorithmModelConfig
EnableCheckpointer bool
CheckpointerStore string
CheckpointerLocalConfig checkpoint.LocalStoreConfig

// WebhookConfig
WebhookConfig webhooks.WebhookConfig
Expand Down Expand Up @@ -107,6 +111,12 @@ func (o *Options) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&o.DataSourceGrpcConfig.Address, "grpc-ds-address", "localhost:50051", "grpc data source server address")
flags.DurationVar(&o.DataSourceGrpcConfig.Timeout, "grpc-ds-timeout", time.Minute, "grpc timeout")
flags.DurationVar(&o.AlgorithmModelConfig.UpdateInterval, "model-update-interval", 12*time.Hour, "algorithm model update interval, now used for dsp model update interval")

flags.BoolVar(&o.EnableCheckpointer, "enable-checkpointer", false, "algorithm model checkpointer, if you want to do checkpoint, you can enable it")
flags.StringVar(&o.CheckpointerStore, "checkpointer-store", "local", "type of the checkpointer, different checkpointer has different storage type. default is local")
flags.StringVar(&o.CheckpointerLocalConfig.Root, "checkpointer-local-root", ".", "local checkpointer root path which checkpoint data stored in, make sure your app has permission to read/write")
flags.IntVar(&o.CheckpointerLocalConfig.MaxWorkers, "checkpointer-local-max-workers", 4, "local checkpointer max workers to do read/write")

flags.BoolVar(&o.WebhookConfig.Enabled, "webhook-enabled", true, "whether enable webhook or not, default to true")
flags.StringVar(&o.RecommendationConfigFile, "recommendation-config-file", "", "recommendation configuration file")
flags.StringSliceVar(&o.EhpaControllerConfig.PropagationConfig.LabelPrefixes, "ehpa-propagation-label-prefixes", []string{}, "propagate labels whose key has the prefix to hpa")
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ go 1.17

require (
github.com/go-echarts/go-echarts/v2 v2.2.4
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac
github.com/google/cadvisor v0.39.2
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
github.com/prometheus/client_golang v1.11.0
Expand Down Expand Up @@ -149,7 +148,6 @@ require (
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/warnings.v0 v0.1.1 // indirect
Expand All @@ -169,7 +167,8 @@ require (
require (
cloud.google.com/go v0.84.0 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/fsnotify/fsnotify v1.5.1 // indirect
github.com/fsnotify/fsnotify v1.5.1
github.com/gocrane/api v0.5.0
github.com/json-iterator/go v1.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/robfig/cron/v3 v3.0.1
Expand All @@ -181,6 +180,7 @@ require (
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/tools v0.1.8 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/protobuf v1.27.1
)

replace (
Expand Down
6 changes: 2 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -310,10 +310,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.1.0-rc.5 h1:QOAag7FoBaBYYHRqzqkhhd8fq5RTubvI4v3Ft/gDVVQ=
github.com/gobwas/ws v1.1.0-rc.5/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4 h1:vGDg3G6y661KAlhjf/8/r8JCjaIi6aV8szCP+MZRU3Y=
github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac h1:lBKVVOA4del0Plj80PCE+nglxaJxaXanCv5N6a3laVY=
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.5.0 h1:hKPt1T8T/vBEtMyWhz976ZHG8w+Z4NuHpp5+eixcw1A=
github.com/gocrane/api v0.5.0/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
Expand Down
70 changes: 70 additions & 0 deletions pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package checkpoint

import (
"context"
"fmt"
"sync"
"time"

"github.com/gocrane/crane/pkg/internal"
"github.com/gocrane/crane/pkg/metricnaming"
)

// Checkpointer is used to checkpoint metric namers. This package is only responsible for executing the store and load of checkpoint data.
// You can implement other checkpoint writers and readers backed by different storages such as local fs, S3, or etcd (Custom Resource Definition).
// The caller decides when to checkpoint; the checkpoint frequency depends on the caller.
// There are multiple ways to decide when to checkpoint:
// 1. The predictor checkpoints all metric namers together periodically in an independent routine. This does not guarantee that the checkpoint data is consistent with the latest updated model in memory.
// 2. The predictor checkpoints a metric namer each time after its model is updated, so the checkpoint is always the latest. For example, the percentile does a checkpoint after adding a sample for each metric namer.
// 3. An application caller such as evpa triggers the metric namer checkpoint, i.e. the trigger is delegated to the application caller.
type Checkpointer interface {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the checkpoint directory somewhere deeper, for example, into evpa's root dir?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the checkpoint directory somewhere deeper, for example, into evpa's root dir?

I think the checkpointer is a common lib, not only for evpa. Other models can be checkpointed too if they support it, so I put it at the top level.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A common checkpoint ability would be nice. For a common lib, though, maybe we need to change the interface functions to be more general.

Start(stopCh <-chan struct{})
Writer
Reader
}

// Writer is the store side of a checkpointer: it persists metric namer model checkpoints.
type Writer interface {
// StoreMetricModelCheckpoint stores the checkpoint of a metric namer model synchronously:
// it blocks until the store operation has finished.
// Only one replica is stored per metric namer model, so each call overrides the
// existing checkpoint data of the same metric namer if it exists.
StoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error
// AsyncStoreMetricModelCheckpoint is the asynchronous variant: it sends the checkpoint
// to a channel and returns immediately.
AsyncStoreMetricModelCheckpoint(ctx context.Context, checkpoint *internal.CheckpointContext, now time.Time) error
// Flush closes the checkpointer: it closes the queue and waits until all requests
// pending in the queue are done.
Flush()
}

// Reader is the load side of a checkpointer: it reads metric namer model checkpoints.
type Reader interface {
// LoadMetricModelCheckpoint loads the model checkpoint of the given metric namer.
LoadMetricModelCheckpoint(ctx context.Context, namer metricnaming.MetricNamer) (*internal.MetricNamerModelCheckpoint, error)
}

// StoreType names the kind of storage a checkpointer is backed by. It is the
// key under which factories are registered and looked up.
type StoreType string

// Known store types.
const (
// StoreTypeLocal is the "local" store type.
StoreTypeLocal StoreType = "local"
// StoreTypeK8s is the "k8s" store type.
StoreTypeK8s StoreType = "k8s"
)

// Factory builds a Checkpointer from a store-specific configuration value.
type Factory func(cfg interface{}) (Checkpointer, error)

var (
// checkpointFactorys maps each store type to the factory registered for it.
checkpointFactorys = make(map[StoreType]Factory)
// lock guards checkpointFactorys.
lock sync.Mutex
)

// RegisterFactory records factory as the constructor for storeType in the
// package-level registry. Registering the same store type again replaces the
// earlier factory. The registry lock is held while the map is updated.
func RegisterFactory(storeType StoreType, factory Factory) {
	lock.Lock()
	defer lock.Unlock()

	checkpointFactorys[storeType] = factory
}

// InitCheckpointer creates a Checkpointer for storeType by calling the factory
// registered for that store type (see RegisterFactory) with cfg passed through
// unchanged.
//
// It returns an error if no factory is registered for storeType; otherwise it
// returns exactly what the factory returns.
//
// The registry lock is held for the whole call, including while the factory
// runs. Because the lock is a plain mutex, a factory must not call
// RegisterFactory or InitCheckpointer itself.
func InitCheckpointer(storeType StoreType, cfg interface{}) (Checkpointer, error) {
	lock.Lock()
	defer lock.Unlock()

	factory, ok := checkpointFactorys[storeType]
	if !ok {
		return nil, fmt.Errorf("not registered checkpoint store type %v", storeType)
	}
	return factory(cfg)
}
3 changes: 3 additions & 0 deletions pkg/checkpoint/k8s.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package checkpoint

// todo: define the k8s crd for metricnamer model checkpoint
Loading