diff --git a/master/internal/rm/kubernetesrm/jobs.go b/master/internal/rm/kubernetesrm/jobs.go
index 790b90dd014..09ced04f2fd 100644
--- a/master/internal/rm/kubernetesrm/jobs.go
+++ b/master/internal/rm/kubernetesrm/jobs.go
@@ -17,6 +17,7 @@ import (
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
 	"golang.org/x/exp/maps"
+	"golang.org/x/sync/errgroup"
 	batchV1 "k8s.io/api/batch/v1"
 	k8sV1 "k8s.io/api/core/v1"
 	k8error "k8s.io/apimachinery/pkg/api/errors"
@@ -37,6 +38,9 @@ import (
 	gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1"
 	alphaGateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1alpha2"
 
+	// Used to load all auth plugins.
+	_ "k8s.io/client-go/plugin/pkg/client/auth"
+
 	"github.com/determined-ai/determined/master/internal/config"
 	"github.com/determined-ai/determined/master/internal/db"
 	"github.com/determined-ai/determined/master/internal/rm"
@@ -51,9 +55,6 @@ import (
 	"github.com/determined-ai/determined/master/pkg/syncx/waitgroupx"
 	"github.com/determined-ai/determined/master/pkg/tasks"
 	"github.com/determined-ai/determined/proto/pkg/apiv1"
-
-	// Used to load all auth plugins.
-	_ "k8s.io/client-go/plugin/pkg/client/auth"
 )
 
 const (
@@ -116,13 +117,15 @@ type jobsService struct {
 	internalTaskGWConfig *config.InternalTaskGatewayConfig
 
 	// System dependencies. Also set in initialization and never modified after.
-	syslog              *logrus.Entry
-	clientSet           k8sClient.Interface
+	syslog    *logrus.Entry
+	clientSet k8sClient.Interface
+	// TODO: the interface maps below are no longer set only at initialization; they are also mutated when namespaces are synced.
 	podInterfaces       map[string]typedV1.PodInterface
 	configMapInterfaces map[string]typedV1.ConfigMapInterface
 	jobInterfaces       map[string]typedBatchV1.JobInterface
 	serviceInterfaces   map[string]typedV1.ServiceInterface
 	tcpRouteInterfaces  map[string]alphaGateway.TCPRouteInterface
+	// End of fields that are mutated after initialization.
 
 	resourceRequestQueue *requestQueue
 	requestQueueWorkers  []*requestProcessingWorker
@@ -256,6 +259,7 @@ func newJobsService(
 }
 
 func (j *jobsService) syncNamespaces(ns []string, hasJSLock bool) error {
+	// TODO: probably move to one informer per cluster here as well.
 	for _, namespace := range ns {
 		// Since we don't want to do duplicate namespace informers, don't start any
 		// listeners or informers that have already been added to namespacesWithInformers.
@@ -351,6 +355,8 @@ func (j *jobsService) startClientSet(namespaces []string) error {
 		return fmt.Errorf("failed to initialize kubernetes clientSet: %w", err)
 	}
 
+	j.jobInterfaces[""] = j.clientSet.BatchV1().Jobs("")
+	j.podInterfaces[""] = j.clientSet.CoreV1().Pods("")
 	for _, ns := range namespaces {
 		j.podInterfaces[ns] = j.clientSet.CoreV1().Pods(ns)
 		j.configMapInterfaces[ns] = j.clientSet.CoreV1().ConfigMaps(ns)
@@ -397,7 +403,17 @@ func readClientConfig(kubeconfigPath string) (*rest.Config, error) {
 		// and it expects to find files:
 		//   - /var/run/secrets/kubernetes.io/serviceaccount/token
 		//   - /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
-		return rest.InClusterConfig()
+		c, err := rest.InClusterConfig()
+		if err != nil {
+			return nil, err
+		}
+		if c.QPS == 0.0 {
+			c.QPS = 20
+		}
+		if c.Burst == 0 {
+			c.Burst = 100
+		}
+		return c, nil
 	}
 
 	if parts := strings.Split(kubeconfigPath, string(os.PathSeparator)); parts[0] == "~" {
@@ -1040,7 +1056,7 @@ func (j *jobsService) refreshPodStates(allocationID model.AllocationID) error {
 		return fmt.Errorf("failed to get namespaces for resource manager: %w", err)
 	}
 
-	for _, pod := range pods.Items {
+	for _, pod := range pods {
 		if !slices.Contains(ns, pod.Namespace) {
 			continue
 		}
@@ -1085,20 +1101,40 @@ func (j *jobsService) GetSlot(msg *apiv1.GetSlotRequest) *apiv1.GetSlotResponse
 	return j.getSlot(msg.AgentId, msg.SlotId)
 }
 
-func (j *jobsService) HealthStatus() model.HealthStatus {
-	j.mu.Lock()
-	defer j.mu.Unlock()
-	for _, podInterface := range j.podInterfaces {
-		_, err := podInterface.List(context.TODO(), metaV1.ListOptions{Limit: 1})
-		if err != nil {
-			j.syslog.WithError(err).Error("kubernetes resource manager marked as unhealthy")
-			return model.Unhealthy
-		}
-		return model.Healthy
+func (j *jobsService) HealthStatus(ctx context.Context) model.HealthStatus {
+	if len(j.podInterfaces) == 0 {
+		logrus.Error("expected podInterface to be non empty")
+		return model.Unhealthy
 	}
-	logrus.Error("expected jobInterface to be non empty")
-	return model.Unhealthy
+	_, err := j.podInterfaces[""].List(ctx, metaV1.ListOptions{Limit: 1})
+	if k8error.IsForbidden(err) {
+		return j.healthStatusFallback(ctx)
+	} else if err != nil {
+		return model.Unhealthy
+	}
+	return model.Healthy
+}
+
+func (j *jobsService) healthStatusFallback(ctx context.Context) model.HealthStatus {
+	var g errgroup.Group
+	for n, podInterface := range j.podInterfaces {
+		if len(n) == 0 { // TODO: We store a non-namespaced client with key "".
+			continue
+		}
+		g.Go(func() error {
+			_, err := podInterface.List(ctx, metaV1.ListOptions{Limit: 1})
+			if err != nil {
+				return err
+			}
+			return nil
+		})
+	}
+	err := g.Wait()
+	if err != nil {
+		return model.Unhealthy
+	}
+	return model.Healthy
 }
 
 func (j *jobsService) startNodeInformer() error {
@@ -1493,7 +1529,7 @@ func (j *jobsService) releaseAllocationsOnDisabledNode(nodeName string) error {
 	}
 
 	notifiedAllocations := make(map[model.AllocationID]bool)
-	for _, pod := range pods.Items {
+	for _, pod := range pods {
 		jobName, ok := resolvePodJobName(&pod)
 		if !ok {
 			j.syslog.Debugf("found pod when disabling node without %s label", kubernetesJobNameLabel)
@@ -1586,7 +1622,6 @@ type computeUsageSummary struct {
 	slotsAvailable int
 }
 
-// TODO(!!!): good func comment.
 func (j *jobsService) summarizeComputeUsage(poolName string) (*computeUsageSummary, error) {
 	summary, err := j.summarize()
 	if err != nil {
 		return nil, err
 	}
@@ -1926,7 +1961,6 @@ func (j *jobsService) summarizeClusterByNodes() map[string]model.AgentSummary {
 		}
 		podByNode[podInfo.nodeName] = append(podByNode[podInfo.nodeName], podInfo)
 	}
-
 	nodeToTasks, taskSlots := j.getNonDetSlots(j.slotType)
 	summary := make(map[string]model.AgentSummary, len(j.currentNodes))
 	for _, node := range j.currentNodes {
@@ -2042,7 +2076,7 @@ func (j *jobsService) getNonDetPods() ([]k8sV1.Pod, error) {
 	}
 
 	var nonDetPods []k8sV1.Pod
-	for _, p := range allPods.Items {
+	for _, p := range allPods {
 		_, isDet := p.Labels[determinedLabel]
 		_, isDetSystem := p.Labels[determinedSystemLabel]
 
@@ -2058,7 +2092,6 @@ func (j *jobsService) getNonDetSlots(deviceType device.Type) (map[string][]string, map[string]int64) {
 	nodeToTasks := make(map[string][]string, len(j.currentNodes))
 	taskSlots := make(map[string]int64)
-
 	nonDetPods, err := j.getNonDetPods()
 	if err != nil {
 		j.syslog.WithError(err).Warn("getting non determined pods, " +
@@ -2134,32 +2167,96 @@ func numSlots(slots model.SlotsSummary) int {
 func (j *jobsService) listJobsInAllNamespaces(
 	ctx context.Context, opts metaV1.ListOptions,
 ) ([]batchV1.Job, error) {
-	var res []batchV1.Job
-	for n, i := range j.jobInterfaces {
-		pods, err := i.List(ctx, opts)
-		if err != nil {
-			return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
-		}
+	allJobs, err := j.jobInterfaces[""].List(ctx, opts)
+	if k8error.IsForbidden(err) {
+		return j.listJobsInAllNamespacesFallback(ctx, opts)
+	} else if err != nil {
+		logrus.WithError(err).WithField("function", "listJobsInAllNamespaces").
+			Error("error listing jobs in all namespaces")
+		return nil, err
+	}
 
-		res = append(res, pods.Items...)
+	namespaces := set.FromKeys(j.jobInterfaces)
+	var jobsWeCareAbout []batchV1.Job
+	for _, job := range allJobs.Items {
+		if namespaces.Contains(job.Namespace) {
+			jobsWeCareAbout = append(jobsWeCareAbout, job)
+		}
 	}
+	return jobsWeCareAbout, nil
+}
 
+func (j *jobsService) listJobsInAllNamespacesFallback(
+	ctx context.Context,
+	opts metaV1.ListOptions,
+) ([]batchV1.Job, error) {
+	var g errgroup.Group
+	var res []batchV1.Job
+	var resLock sync.Mutex
+	for n, i := range j.jobInterfaces {
+		if len(n) == 0 { // Skip the cluster-scoped client stored under the "" key.
+			continue
+		}
+		g.Go(func() error {
+			jobs, err := i.List(ctx, opts)
+			if err != nil {
+				return fmt.Errorf("error listing jobs for namespace %s: %w", n, err)
+			}
+			resLock.Lock()
+			res = append(res, jobs.Items...)
+			resLock.Unlock()
+			return nil
+		})
+	}
+	err := g.Wait()
+	if err != nil {
+		return nil, err
+	}
 	return res, nil
 }
 
 func (j *jobsService) listPodsInAllNamespaces(
 	ctx context.Context, opts metaV1.ListOptions,
-) (*k8sV1.PodList, error) {
-	res := &k8sV1.PodList{}
-	for n, i := range j.podInterfaces {
-		pods, err := i.List(ctx, opts)
-		if err != nil {
-			return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
-		}
+) ([]k8sV1.Pod, error) {
+	allPods, err := j.podInterfaces[""].List(ctx, opts)
+	if k8error.IsForbidden(err) {
+		return j.listPodsInAllNamespacesFallback(ctx, opts)
+	} else if err != nil {
+		return nil, err
+	}
 
-		res.Items = append(res.Items, pods.Items...)
+	namespaces := set.FromKeys(j.podInterfaces)
+	var podsWeWant []k8sV1.Pod
+	for _, pod := range allPods.Items {
+		if namespaces.Contains(pod.Namespace) {
+			podsWeWant = append(podsWeWant, pod)
+		}
 	}
+	return podsWeWant, nil
+}
+
+func (j *jobsService) listPodsInAllNamespacesFallback(
+	ctx context.Context,
+	opts metaV1.ListOptions,
+) ([]k8sV1.Pod, error) {
+	var g errgroup.Group
+	var res []k8sV1.Pod
+	var resLock sync.Mutex
+	for n, podInterface := range j.podInterfaces {
+		if len(n) == 0 { // Skip the cluster-scoped client stored under the "" key.
+			continue
+		}
+		g.Go(func() error {
+			pods, err := podInterface.List(ctx, opts)
+			if err != nil {
+				return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
+			}
+			resLock.Lock()
+			res = append(res, pods.Items...)
+			resLock.Unlock()
+			return nil
+		})
+	}
+	err := g.Wait()
+	if err != nil {
+		return nil, err
+	}
 	return res, nil
 }
diff --git a/master/internal/rm/kubernetesrm/jobs_test.go b/master/internal/rm/kubernetesrm/jobs_test.go
index 470e9f85b7c..15d304e1a9d 100644
--- a/master/internal/rm/kubernetesrm/jobs_test.go
+++ b/master/internal/rm/kubernetesrm/jobs_test.go
@@ -3,6 +3,7 @@ package kubernetesrm
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"testing"
@@ -10,7 +11,9 @@ import (
 	"github.com/stretchr/testify/mock"
 	"github.com/stretchr/testify/require"
 	k8sV1 "k8s.io/api/core/v1"
+	k8error "k8s.io/apimachinery/pkg/api/errors"
 	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime/schema"
 	typedV1 "k8s.io/client-go/kubernetes/typed/core/v1"
 	_ "k8s.io/client-go/plugin/pkg/client/auth"
 
@@ -60,24 +63,180 @@ func TestGetNonDetPods(t *testing.T) {
 		},
 	}
 
-	ns1 := &mocks.PodInterface{}
-	ns1.On("List", mock.Anything, mock.Anything).Once().
-		Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[0])}, nil)
+	emptyNS := &mocks.PodInterface{}
+	emptyNS.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[0], expectedPods[1])}, nil)
+
+	js := jobsService{
+		podInterfaces: map[string]typedV1.PodInterface{
+			"ns1": &mocks.PodInterface{},
+			"ns2": &mocks.PodInterface{},
+			"":    emptyNS,
+		},
+	}
+
+	actualPods, err := js.getNonDetPods()
+	require.NoError(t, err)
+	require.ElementsMatch(t, expectedPods, actualPods)
+}
+
+func TestListPodsInAllNamespaces(t *testing.T) {
+	detPods := []k8sV1.Pod{
+		{
+			ObjectMeta: metaV1.ObjectMeta{
+				Namespace: "ns1",
+			},
+			Spec: k8sV1.PodSpec{
+				NodeName: "a",
+			},
+		},
+		{
+			ObjectMeta: metaV1.ObjectMeta{
+				Namespace: "ns2",
+			},
+			Spec: k8sV1.PodSpec{
+				NodeName: "a",
+			},
+		},
+	}
+
+	ns1 := &mocks.PodInterface{}
 	ns2 := &mocks.PodInterface{}
-	ns2.On("List", mock.Anything, mock.Anything).Once().
-		Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[1])}, nil)
+
+	emptyNS := &mocks.PodInterface{}
 
 	js := jobsService{
 		podInterfaces: map[string]typedV1.PodInterface{
 			"ns1": ns1,
 			"ns2": ns2,
+			"":    emptyNS,
 		},
 	}
 
-	actualPods, err := js.getNonDetPods()
+	// This pod is in a namespace that is not part of js.podInterfaces, so the
+	// cluster-wide List path should filter it out.
+	outsidePod := k8sV1.Pod{
+		ObjectMeta: metaV1.ObjectMeta{
+			Namespace: "ns3",
+		},
+		Spec: k8sV1.PodSpec{
+			NodeName: "b",
+		},
+	}
+
+	// The cluster-scoped client returns pods from every namespace; only pods in
+	// managed namespaces are expected back.
+	allClusterPods := append(append([]k8sV1.Pod{}, detPods...), outsidePod)
+	expectedPodList := k8sV1.PodList{Items: detPods}
+	emptyNS.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: allClusterPods}, nil)
+
+	ctx := context.Background()
+	opts := metaV1.ListOptions{}
+	actualPodList, err := js.listPodsInAllNamespaces(ctx, opts)
 	require.NoError(t, err)
-	require.ElementsMatch(t, expectedPods, actualPods)
+	require.NotNil(t, actualPodList)
+	require.ElementsMatch(t, expectedPodList.Items, actualPodList)
+
+	forbiddenErr := k8error.NewForbidden(schema.GroupResource{}, "forbidden",
+		fmt.Errorf("forbidden"))
+
+	emptyNS.On("List", mock.Anything, mock.Anything).Twice().
+		Return(nil, forbiddenErr)
+
+	ns1.On("List", mock.Anything, mock.Anything).Twice().
+		Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[0]}}, nil)
+
+	ns2.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[1]}}, nil)
+
+	actualPodList, err = js.listPodsInAllNamespaces(ctx, opts)
+	require.NoError(t, err)
+	require.NotNil(t, actualPodList)
+	require.ElementsMatch(t, detPods, actualPodList)
+
+	listErr := fmt.Errorf("something bad happened")
+	ns2.On("List", mock.Anything, mock.Anything).Once().
+		Return(nil, listErr)
+	actualPodList, err = js.listPodsInAllNamespaces(ctx, opts)
+	require.ErrorIs(t, err, listErr)
+	require.Nil(t, actualPodList)
+}
+
+func TestHealthStatus(t *testing.T) {
+	detPods := []k8sV1.Pod{
+		{
+			ObjectMeta: metaV1.ObjectMeta{
+				Name: "ns1",
+			},
+			Spec: k8sV1.PodSpec{
+				NodeName: "a",
+			},
+		},
+		{
+			ObjectMeta: metaV1.ObjectMeta{
+				Name: "ns2",
+			},
+			Spec: k8sV1.PodSpec{
+				NodeName: "a",
+			},
+		},
+	}
+
+	ns1 := &mocks.PodInterface{}
+	ns2 := &mocks.PodInterface{}
+
+	emptyNS := &mocks.PodInterface{}
+
+	js := jobsService{
+		podInterfaces: map[string]typedV1.PodInterface{
+			"ns1": ns1,
+			"ns2": ns2,
+			"":    emptyNS,
+		},
+	}
+
+	// This pod is not part of js.podInterfaces.
+	outsidePod := k8sV1.Pod{
+		ObjectMeta: metaV1.ObjectMeta{
+			Name: "ns3",
+		},
+		Spec: k8sV1.PodSpec{
+			NodeName: "b",
+		},
+	}
+
+	allPods := append(append([]k8sV1.Pod{}, detPods...), outsidePod)
+	emptyNS.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: allPods}, nil)
+
+	health := js.HealthStatus(context.TODO())
+	require.Equal(t, model.Healthy, health)
+
+	emptyNS.On("List", mock.Anything, mock.Anything).Once().
+		Return(nil, fmt.Errorf("couldn't list all pods"))
+
+	health = js.HealthStatus(context.TODO())
+	require.Equal(t, model.Unhealthy, health)
+
+	forbiddenErr := k8error.NewForbidden(schema.GroupResource{}, "forbidden",
+		fmt.Errorf("forbidden"))
+
+	emptyNS.On("List", mock.Anything, mock.Anything).Twice().
+		Return(nil, forbiddenErr)
+
+	ns1.On("List", mock.Anything, mock.Anything).Twice().
+		Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[0]}}, nil)
+
+	ns2.On("List", mock.Anything, mock.Anything).Once().
+		Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[1]}}, nil)
+
+	health = js.HealthStatus(context.TODO())
+	require.Equal(t, model.Healthy, health)
+
+	ns2.On("List", mock.Anything, mock.Anything).Once().
+ Return(nil, fmt.Errorf("couldnt list pods in namespace ns2")) + health = js.HealthStatus(context.TODO()) + require.Equal(t, model.Unhealthy, health) } func TestJobScheduledStatus(t *testing.T) { diff --git a/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go b/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go index d22f62313a4..e9b0a40906d 100644 --- a/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go +++ b/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go @@ -178,7 +178,7 @@ func (k *ResourceManager) HealthCheck() []model.ResourceManagerHealth { return []model.ResourceManagerHealth{ { ClusterName: k.config.ClusterName, - Status: k.jobsService.HealthStatus(), + Status: k.jobsService.HealthStatus(context.TODO()), }, } } @@ -278,7 +278,6 @@ func (k *ResourceManager) GetResourcePools() (*apiv1.GetResourcePoolsResponse, e // But best to handle it anyway in case the implementation changes in the future. return nil, err } - jobStats, err := k.getPoolJobStats(pool) if err != nil { return nil, err diff --git a/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go b/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go index b01fa963028..55824f29a13 100644 --- a/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go +++ b/master/internal/rm/kubernetesrm/kubernetes_resource_manager_intg_test.go @@ -1605,6 +1605,10 @@ func createMockJobsService(nodes map[string]*k8sV1.Node, devSlotType device.Type jobsClientSet.On("CoreV1").Return(coreV1Interface) jobsClientSet.On("BatchV1").Return(batchV1Interface) + emptyNS := &mocks.PodInterface{} + emptyNS.On("List", mock.Anything, mock.Anything).Return(&podsList, nil) + + podInterfaces := map[string]typedV1.PodInterface{"": emptyNS} return &jobsService{ namespace: "default", clusterName: "", @@ -1619,5 +1623,6 @@ func createMockJobsService(nodes map[string]*k8sV1.Node, devSlotType device.Type slotResourceRequests: config.PodSlotResourceRequests{CPU: 2}, clientSet: jobsClientSet, namespacesWithInformers: make(map[string]bool), + podInterfaces: podInterfaces, } }