diff --git a/config/components/rbac/role.yaml b/config/components/rbac/role.yaml
index c7ed22eb2d..f634072dfd 100644
--- a/config/components/rbac/role.yaml
+++ b/config/components/rbac/role.yaml
@@ -92,6 +92,14 @@ rules:
   - list
   - update
   - watch
+- apiGroups:
+  - apiextensions.k8s.io
+  resources:
+  - customresourcedefinitions
+  verbs:
+  - get
+  - list
+  - watch
 - apiGroups:
   - autoscaling.x-k8s.io
   resources:
diff --git a/go.mod b/go.mod
index 43ba6dceaf..dfcc91b11d 100644
--- a/go.mod
+++ b/go.mod
@@ -15,7 +15,9 @@ require (
 	github.com/prometheus/client_model v0.5.0
 	github.com/ray-project/kuberay/ray-operator v1.1.0-alpha.0
 	go.uber.org/zap v1.26.0
+	golang.org/x/exp v0.0.0-20230905200255-921286631fa9
 	k8s.io/api v0.29.2
+	k8s.io/apiextensions-apiserver v0.29.0
 	k8s.io/apimachinery v0.29.2
 	k8s.io/apiserver v0.29.1
 	k8s.io/autoscaler/cluster-autoscaler/apis v0.0.0-20240325113845-0130d33747bb
@@ -100,7 +102,6 @@ require (
 	go.uber.org/atomic v1.11.0 // indirect
 	go.uber.org/multierr v1.11.0 // indirect
 	golang.org/x/crypto v0.18.0 // indirect
-	golang.org/x/exp v0.0.0-20230905200255-921286631fa9 // indirect
 	golang.org/x/mod v0.14.0 // indirect
 	golang.org/x/net v0.20.0 // indirect
 	golang.org/x/oauth2 v0.12.0 // indirect
@@ -121,7 +122,6 @@ require (
 	gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 	gopkg.in/yaml.v3 v3.0.1 // indirect
-	k8s.io/apiextensions-apiserver v0.29.0 // indirect
 	k8s.io/gengo v0.0.0-20230829151522-9cce18d56c01 // indirect
 	k8s.io/kms v0.29.1 // indirect
 	sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 // indirect
diff --git a/pkg/controller/jobframework/setup.go b/pkg/controller/jobframework/setup.go
index 39541ad3c4..7373c3e9a2 100644
--- a/pkg/controller/jobframework/setup.go
+++ b/pkg/controller/jobframework/setup.go
@@ -22,8 +22,18 @@ import (
 	"fmt"
 	"os"
 
 	"github.com/go-logr/logr"
+	"golang.org/x/exp/slices"
+	apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
+	apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/tools/cache"
+	retrywatch "k8s.io/client-go/tools/watch"
 	ctrl "sigs.k8s.io/controller-runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
@@ -62,24 +72,30 @@ func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
 			if err != nil {
 				return fmt.Errorf("%s: %w: %w", fwkNamePrefix, errFailedMappingResource, err)
 			}
 			if _, err = mgr.GetRESTMapper().RESTMapping(gvk.GroupKind(), gvk.Version); err != nil {
 				if !meta.IsNoMatchError(err) {
 					return fmt.Errorf("%s: %w", fwkNamePrefix, err)
 				}
-				logger.Info("No matching API in the server for job framework, skipped setup of controller and webhook")
+				logger.Info("No matching API in the server for job framework, deferring setup of controller and webhook until the API is installed")
+				// Wait inside a manager runnable: SetupControllers must not block
+				// on a CRD that may never be installed, and the wait must end with the manager.
+				if err = mgr.Add(deferredSetup(func(ctx context.Context) error {
+					return waitForAPI(ctx, logger, mgr, gvk, func() error {
+						if err := setupControllerAndWebhook(mgr, name, options.ManagerName, fwkNamePrefix, cb, opts...); err != nil {
+							return err
+						}
+						logger.Info("Set up controller and webhook for job framework")
+						return nil
+					})
+				})); err != nil {
+					return fmt.Errorf("%s: unable to schedule deferred setup: %w", fwkNamePrefix, err)
+				}
 			} else {
-				if err = cb.NewReconciler(
-					mgr.GetClient(),
-					mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, options.ManagerName)),
-					opts...,
-				).SetupWithManager(mgr); err != nil {
-					return fmt.Errorf("%s: %w", fwkNamePrefix, err)
-				}
-				if err = cb.SetupWebhook(mgr, opts...); err != nil {
-					return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
-				}
+				if err = setupControllerAndWebhook(mgr, name, options.ManagerName, fwkNamePrefix, cb, opts...); err != nil {
+					return err
+				}
 				logger.Info("Set up controller and webhook for job framework")
 				return nil
 			}
 		}
 		if err := noop.SetupWebhook(mgr, cb.JobType); err != nil {
@@ -89,6 +105,34 @@ func SetupControllers(mgr ctrl.Manager, log logr.Logger, opts ...Option) error {
 	})
 }
 
+// setupControllerAndWebhook registers the reconciler and the webhook of the
+// integration name with mgr. managerName becomes part of the name of the
+// reconciler's event recorder; fwkNamePrefix prefixes the returned errors.
+func setupControllerAndWebhook(mgr ctrl.Manager, name, managerName, fwkNamePrefix string, cb IntegrationCallbacks, opts ...Option) error {
+	if err := cb.NewReconciler(
+		mgr.GetClient(),
+		mgr.GetEventRecorderFor(fmt.Sprintf("%s-%s-controller", name, managerName)),
+		opts...,
+	).SetupWithManager(mgr); err != nil {
+		return fmt.Errorf("%s: %w", fwkNamePrefix, err)
+	}
+	if err := cb.SetupWebhook(mgr, opts...); err != nil {
+		return fmt.Errorf("%s: unable to create webhook: %w", fwkNamePrefix, err)
+	}
+	return nil
+}
+
+// deferredSetup adapts a function to a runnable that can be handed to the
+// manager with mgr.Add, so that the function receives the manager's context.
+type deferredSetup func(ctx context.Context) error
+
+// Start runs the wrapped function.
+func (d deferredSetup) Start(ctx context.Context) error { return d(ctx) }
+
+// NeedLeaderElection returns false so that the deferred setup, like the
+// synchronous setup in SetupControllers, runs on every replica.
+func (deferredSetup) NeedLeaderElection() bool { return false }
+
 // SetupIndexes setups the indexers for integrations.
 // When the platform developers implement a separate kueue-manager to manage the in-house custom jobs,
 // they can easily setup indexers for the in-house custom jobs.
@@ -105,3 +149,78 @@ func SetupIndexes(ctx context.Context, indexer client.FieldIndexer, opts ...Opti
 		return nil
 	})
 }
+
+// waitForAPI blocks until a CustomResourceDefinition that serves gvk is
+// established, then calls action once and returns its result. It returns nil
+// without calling action when ctx is cancelled first, and an error when the
+// CRDs cannot be listed or watched.
+func waitForAPI(ctx context.Context, log logr.Logger, mgr ctrl.Manager, gvk schema.GroupVersionKind, action func() error) error {
+	crdClient, err := apiextensionsclientset.NewForConfig(mgr.GetConfig())
+	if err != nil {
+		return fmt.Errorf("unable to create CRD client: %w", err)
+	}
+	crds := crdClient.ApiextensionsV1().CustomResourceDefinitions()
+
+	crdList, err := crds.List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return fmt.Errorf("unable to list CRDs: %w", err)
+	}
+	// The API may have been installed since the caller checked for it.
+	for i := range crdList.Items {
+		if servesAPI(&crdList.Items[i], gvk) {
+			return action()
+		}
+	}
+
+	log.Info("API not available, watching CRDs until it is installed", "gvk", gvk.String())
+	retryWatcher, err := retrywatch.NewRetryWatcher(crdList.ResourceVersion, &cache.ListWatch{
+		// The options carry the resource version to resume from; they must be
+		// forwarded for the retry watcher to work.
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			return crds.List(ctx, options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			return crds.Watch(ctx, options)
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("unable to create retry watcher: %w", err)
+	}
+	defer retryWatcher.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case event, ok := <-retryWatcher.ResultChan():
+			if !ok {
+				return fmt.Errorf("watch for API %v closed unexpectedly", gvk)
+			}
+			switch event.Type {
+			case watch.Error:
+				return fmt.Errorf("error watching for API %v: %w", gvk, apierrors.FromObject(event.Object))
+			case watch.Added, watch.Modified:
+				crd, isCRD := event.Object.(*apiextensionsv1.CustomResourceDefinition)
+				if isCRD && servesAPI(crd, gvk) {
+					log.Info("API installed, invoking deferred action", "gvk", gvk.String())
+					return action()
+				}
+			}
+		}
+	}
+}
+
+// servesAPI reports whether crd is established and serves the group, version
+// and kind of gvk.
+func servesAPI(crd *apiextensionsv1.CustomResourceDefinition, gvk schema.GroupVersionKind) bool {
+	if crd.Spec.Group != gvk.Group || crd.Spec.Names.Kind != gvk.Kind {
+		return false
+	}
+	established := slices.ContainsFunc(crd.Status.Conditions, func(c apiextensionsv1.CustomResourceDefinitionCondition) bool {
+		return c.Type == apiextensionsv1.Established && c.Status == apiextensionsv1.ConditionTrue
+	})
+	served := slices.ContainsFunc(crd.Spec.Versions, func(v apiextensionsv1.CustomResourceDefinitionVersion) bool {
+		return v.Name == gvk.Version && v.Served
+	})
+	return established && served
+}
diff --git a/pkg/controller/jobs/raycluster/raycluster_controller.go b/pkg/controller/jobs/raycluster/raycluster_controller.go
index 0dd8babd81..1483c1f76f 100644
--- a/pkg/controller/jobs/raycluster/raycluster_controller.go
+++ b/pkg/controller/jobs/raycluster/raycluster_controller.go
@@ -59,6 +59,8 @@ func init() {
 // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=resourceflavors,verbs=get;list;watch
 // +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloadpriorityclasses,verbs=get;list;watch
 // +kubebuilder:rbac:groups=ray.io,resources=rayclusters/finalizers,verbs=get;update
+// +kubebuilder:rbac:groups="apiextensions.k8s.io",resources=customresourcedefinitions,verbs=get;list;watch
+
 var NewReconciler = jobframework.NewGenericReconcilerFactory(func() jobframework.GenericJob {
 	return &RayCluster{}
 })