Skip to content

Commit

Permalink
CARRY: Add watcher to controller and webhook
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristianZaccaria committed May 30, 2024
1 parent fdafd7a commit 2736a06
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 20 deletions.
8 changes: 8 additions & 0 deletions config/components/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ rules:
- list
- update
- watch
- apiGroups:
- apiextensions.k8s.io
resources:
- customresourcedefinitions
verbs:
- get
- list
- watch
- apiGroups:
- autoscaling.x-k8s.io
resources:
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ require (
github.com/prometheus/client_model v0.5.0
github.com/ray-project/kuberay/ray-operator v1.1.0-alpha.0
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230905200255-921286631fa9
k8s.io/api v0.29.2
k8s.io/apiextensions-apiserver v0.29.0
k8s.io/apimachinery v0.29.2
k8s.io/apiserver v0.29.1
k8s.io/autoscaler/cluster-autoscaler/apis v0.0.0-20240325113845-0130d33747bb
Expand Down Expand Up @@ -100,7 +102,6 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
golang.org/x/mod v0.14.0 // indirect
golang.org/x/net v0.20.0 // indirect
golang.org/x/oauth2 v0.12.0 // indirect
Expand All @@ -121,7 +122,6 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.29.0 // indirect
k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
k8s.io/kms v0.29.1 // indirect
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 // indirect
Expand Down
145 changes: 127 additions & 18 deletions pkg/controller/jobframework/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,32 @@ import (
"fmt"
"os"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"

"github.com/go-logr/logr"
"golang.org/x/exp/slices"
"k8s.io/apimachinery/pkg/api/meta"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/watch"
retrywatch "k8s.io/client-go/tools/watch"

"sigs.k8s.io/kueue/pkg/controller/jobs/noop"
)

const (
pytorchjobAPI = "pytorchjobs.kubeflow.org"
rayclusterAPI = "rayclusters.ray.io"
)

var (
errFailedMappingResource = errors.New("restMapper failed mapping resource")
)
Expand All @@ -48,7 +65,6 @@ func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
options := ProcessOptions(opts...)

return ForEachIntegration(func(name string, cb IntegrationCallbacks) error {
logger := log.WithValues("jobFrameworkName", name)
fwkNamePrefix := fmt.Sprintf("jobFrameworkName %q", name)

if options.EnabledFrameworks.Has(name) {
Expand All @@ -62,24 +78,14 @@ func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
if err != nil {
return fmt.Errorf("%s: %w: %w", fwkNamePrefix, errFailedMappingResource, err)
}
if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
if !meta.IsNoMatchError(err) {
return fmt.Errorf("%s: %w", fwkNamePrefix, err)
}
logger.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
if !isAPIAvailable(context.TODO(), mgr, rayclusterAPI) {
log.Info("API not available, waiting for it to become available... - Skipping setup of controller and webhook")
waitForAPI(context.TODO(), log, mgr, rayclusterAPI, func() {
setupComponents(mgr, log, gvk, fwkNamePrefix, cb, opts...)
})
} else {
if err = cb.NewReconciler(
mgr.GetClient(),
mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, options.ManagerName)),
opts...,
).SetupWithManager(mgr); err != nil {
return fmt.Errorf("%s: %w", fwkNamePrefix, err)
}
if err = cb.SetupWebhook(mgr, opts...); err != nil {
return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
}
logger.Info("Set up controller and webhook for job framework")
return nil
log.Info("API is available, setting up components...")
setupComponents(mgr, log, gvk, fwkNamePrefix, cb, opts...)
}
}
if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
Expand All @@ -89,6 +95,39 @@ func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
})
}

// setupComponents sets up the controller and webhook for one job framework,
// provided the manager's REST mapper knows the given GVK.
//
// Outcomes are reported through log only; nothing is returned to the caller:
//   - mapping found: setupControllerAndWebhook is invoked and its result logged;
//   - "no match" mapping error: setup is skipped with an informational message;
//   - any other mapping error: the error is logged and setup is skipped.
func setupComponents(mgr ctrl.Manager, log logr.Logger, gvk schema.GroupVersionKind, fwkNamePrefix string, cb IntegrationCallbacks, opts ...Option) {
	// Ask the REST mapper whether the API server currently serves this GVK.
	_, mappingErr := mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version)

	switch {
	case mappingErr == nil:
		if setupErr := setupControllerAndWebhook(mgr, gvk, fwkNamePrefix, cb, opts...); setupErr != nil {
			log.Error(setupErr, "Failed to set up controller and webhook")
			return
		}
		log.Info("Set up controller and webhook for job framework")
	case meta.IsNoMatchError(mappingErr):
		// The API is simply not installed; this is not treated as an error.
		log.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
	default:
		log.Error(mappingErr, fmt.Sprintf("%s: unable to get REST mapping", fwkNamePrefix))
	}
}

// setupControllerAndWebhook registers the integration's reconciler and then
// its webhook with mgr.
//
// The reconciler's event recorder is named "<gvk.Kind>-<managerName>-controller",
// where managerName is taken from the processed opts. Any failure is returned
// wrapped with fwkNamePrefix; the webhook is not set up if the reconciler
// setup fails.
func setupControllerAndWebhook(mgr ctrl.Manager, gvk schema.GroupVersionKind, fwkNamePrefix string, cb IntegrationCallbacks, opts ...Option) error {
	// Resolve the configured manager name instead of embedding the literal
	// placeholder "managerName" in the recorder name.
	options := ProcessOptions(opts...)
	recorderName := fmt.Sprintf("%s-%s-controller", gvk.Kind, options.ManagerName)

	reconciler := cb.NewReconciler(
		mgr.GetClient(),
		mgr.GetEventRecorderFor(recorderName),
		opts...,
	)
	if err := reconciler.SetupWithManager(mgr); err != nil {
		return fmt.Errorf("%s: %w", fwkNamePrefix, err)
	}

	if err := cb.SetupWebhook(mgr, opts...); err != nil {
		return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
	}

	return nil
}

// SetupIndexes sets up the indexers for integrations.
// When the platform developers implement a separate kueue-manager to manage the in-house custom jobs,
// they can easily set up indexers for the in-house custom jobs.
Expand All @@ -105,3 +144,73 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Opti
return nil
})
}

// isAPIAvailable reports whether a CustomResourceDefinition whose name equals
// apiName is present in the cluster reachable through mgr's REST config.
//
// Only the CRD name is compared; its status conditions are not inspected.
// Failure to build the CRD client or to list CRDs terminates the process via
// exitOnError.
func isAPIAvailable(ctx context.Context, mgr ctrl.Manager, apiName string) bool {
	clientset, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
	exitOnError(err, "unable to create CRD client")

	crds, err := clientset.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
	exitOnError(err, "unable to list CRDs")

	// Index into the slice to avoid copying each CRD while scanning.
	for i := range crds.Items {
		if crds.Items[i].Name == apiName {
			return true
		}
	}
	return false
}

// waitForAPI blocks until the CustomResourceDefinition named apiName is
// installed, then calls action.
//
// It first lists all CRDs; if one named apiName already exists, action runs
// immediately. Otherwise a retry watcher is started from the list's resource
// version, and action runs once an Added or Modified event carries the CRD
// with its Established condition set to True.
//
// The function returns without calling action when ctx is cancelled or when
// the watcher's result channel is closed. Failure to create the CRD client,
// list CRDs or create the watcher, as well as any watch.Error event,
// terminates the process via exitOnError.
func waitForAPI(ctx context.Context, log logr.Logger, mgr ctrl.Manager, apiName string, action func()) {
	crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
	exitOnError(err, "unable to create CRD client")

	crdList, err := crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, metav1.ListOptions{})
	exitOnError(err, "unable to list CRDs")

	// If API is already available, just invoke action
	if slices.ContainsFunc(crdList.Items, func(crd apiextensionsv1.CustomResourceDefinition) bool {
		return crd.Name == apiName
	}) {
		action()
		return
	}

	// Wait for the API to become available then invoke action
	log.Info(fmt.Sprintf("API %v not available, setting up retry watcher", apiName))
	retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{
		// Forward the options supplied by the watcher: they carry the
		// resource version to resume from, so they must not be replaced by
		// empty ListOptions.
		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
			return crdClient.ApiextensionsV1().CustomResourceDefinitions().List(ctx, options)
		},
		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
			return crdClient.ApiextensionsV1().CustomResourceDefinitions().Watch(ctx, options)
		},
	})
	exitOnError(err, "unable to create retry watcher")

	defer retryWatcher.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case event, ok := <-retryWatcher.ResultChan():
			if !ok {
				// A closed channel would otherwise deliver zero-value events
				// forever and spin this loop.
				log.Info(fmt.Sprintf("watch for API %v ended before the API became available", apiName))
				return
			}
			switch event.Type {
			case watch.Error:
				exitOnError(apierrors.FromObject(event.Object), fmt.Sprintf("error watching for API %v", apiName))

			case watch.Added, watch.Modified:
				crd, isCRD := event.Object.(*apiextensionsv1.CustomResourceDefinition)
				if !isCRD || crd.Name != apiName {
					// Not the CRD we are waiting for (or not a CRD at all).
					continue
				}
				established := slices.ContainsFunc(crd.Status.Conditions, func(condition apiextensionsv1.CustomResourceDefinitionCondition) bool {
					return condition.Type == apiextensionsv1.Established && condition.Status == apiextensionsv1.ConditionTrue
				})
				if established {
					log.Info(fmt.Sprintf("API %v installed, invoking deferred action", apiName))
					action()
					return
				}
			}
		}
	}
}

// exitOnError terminates the process with exit status 1 when err is non-nil,
// after writing msg and err to standard error. It does nothing when err is nil.
func exitOnError(err error, msg string) {
	if err == nil {
		return
	}
	// Report the failure before exiting; the result of a bare fmt.Sprint
	// would be discarded and the process would die silently.
	fmt.Fprintf(os.Stderr, "%s: %v\n", msg, err)
	os.Exit(1)
}
2 changes: 2 additions & 0 deletions pkg/controller/jobs/raycluster/raycluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func init() {
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloadpriorityclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups=ray.io,resources=rayclusters/finalizers,verbs=get;update
// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch


var NewReconciler = jobframework.NewGenericReconcilerFactory(func() jobframework.GenericJob { return &RayCluster{} })

Expand Down

0 comments on commit 2736a06

Please sign in to comment.