Skip to content

Commit

Permalink
fix: perf issue with too many API reqs when listing pods in all ns
Browse files Browse the repository at this point in the history
  • Loading branch information
amandavialva01 committed Nov 6, 2024
1 parent b189d2a commit 0316375
Show file tree
Hide file tree
Showing 2 changed files with 243 additions and 23 deletions.
95 changes: 79 additions & 16 deletions master/internal/rm/kubernetesrm/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
batchV1 "k8s.io/api/batch/v1"
k8sV1 "k8s.io/api/core/v1"
k8error "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -71,6 +72,7 @@ const (
ReleaseNamespaceEnvVar = "DET_RELEASE_NAMESPACE"
// ResourceTypeNvidia describes the GPU resource type.
ResourceTypeNvidia = "nvidia.com/gpu"
DefaultClientBurst = 10
)

var cacheSyncs []cache.InformerSynced
Expand Down Expand Up @@ -348,6 +350,7 @@ func (j *jobsService) startClientSet(namespaces []string) error {
return fmt.Errorf("failed to initialize kubernetes clientSet: %w", err)
}

j.podInterfaces[""] = j.clientSet.CoreV1().Pods("")
for _, ns := range namespaces {
j.podInterfaces[ns] = j.clientSet.CoreV1().Pods(ns)
j.configMapInterfaces[ns] = j.clientSet.CoreV1().ConfigMaps(ns)
Expand Down Expand Up @@ -1082,20 +1085,44 @@ func (j *jobsService) GetSlot(msg *apiv1.GetSlotRequest) *apiv1.GetSlotResponse
return j.getSlot(msg.AgentId, msg.SlotId)
}

func (j *jobsService) healthStatusFallback(ctx context.Context) model.HealthStatus {
g := errgroup.Group{}
for n, podInterface := range j.podInterfaces {
if len(n) == 0 {
continue
}
g.Go(func() error {
_, err := podInterface.List(ctx, metaV1.ListOptions{Limit: 1})
if err != nil {
return err
}
return nil
})
}
err := g.Wait()
if err != nil {
return model.Unhealthy
}
return model.Healthy
}

func (j *jobsService) HealthStatus() model.HealthStatus {
j.mu.Lock()
defer j.mu.Unlock()
for _, podInterface := range j.podInterfaces {
_, err := podInterface.List(context.TODO(), metaV1.ListOptions{Limit: 1})
if err != nil {
j.syslog.WithError(err).Error("kubernetes resource manager marked as unhealthy")
return model.Unhealthy
}
return model.Healthy
ctx := context.TODO()
if len(j.podInterfaces) == 0 {
logrus.Error("expected podInterface to be non empty")
return model.Unhealthy
}

logrus.Error("expected jobInterface to be non empty")
return model.Unhealthy
_, err := j.podInterfaces[""].List(ctx, metaV1.ListOptions{Limit: 1})
if err != nil {
if k8error.IsForbidden(err) {
return j.healthStatusFallback(ctx)
}
return model.Unhealthy
}
return model.Healthy
}

func (j *jobsService) startNodeInformer() error {
Expand Down Expand Up @@ -2114,20 +2141,56 @@ func (j *jobsService) listJobsInAllNamespaces(
return res, nil
}

func (j *jobsService) listImportantPods(ctx context.Context, opts metaV1.ListOptions) (*k8sV1.PodList, error) {
resLock := sync.Mutex{}
res := &k8sV1.PodList{}
g := errgroup.Group{}
cnt := 0
for n, podInterface := range j.podInterfaces {
if len(n) == 0 {
continue
}
g.Go(func() error {
time.Sleep(time.Duration(cnt/DefaultClientBurst) * 1050 * time.Millisecond)
pods, err := podInterface.List(ctx, opts)
if err != nil {
return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
resLock.Lock()
res.Items = append(res.Items, pods.Items...)
resLock.Unlock()
return nil
})
cnt += 1
}
err := g.Wait()
if err != nil {
return nil, err
}
return res, nil
}

func (j *jobsService) listPodsInAllNamespaces(
ctx context.Context, opts metaV1.ListOptions,
) (*k8sV1.PodList, error) {
res := &k8sV1.PodList{}
for n, i := range j.podInterfaces {
pods, err := i.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
allPods, err := j.podInterfaces[""].List(ctx, opts)
if err != nil {
if k8error.IsForbidden(err) {
return j.listImportantPods(ctx, opts)
}
return nil, err
}

res.Items = append(res.Items, pods.Items...)
var podsWeWant k8sV1.PodList
namespaces := set.FromKeys(j.podInterfaces)
for _, pod := range allPods.Items {
if namespaces.Contains(pod.Namespace) {
podsWeWant.Items = append(podsWeWant.Items, pod)
}
}

return res, nil
allPods.Items, podsWeWant.Items = podsWeWant.Items, allPods.Items
return allPods, nil
}

func (j *jobsService) listConfigMapsInAllNamespaces(
Expand Down
171 changes: 164 additions & 7 deletions master/internal/rm/kubernetesrm/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,17 @@
package kubernetesrm

import (
"context"
"fmt"
"os"
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
k8sV1 "k8s.io/api/core/v1"
k8error "k8s.io/apimachinery/pkg/api/errors"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
typedV1 "k8s.io/client-go/kubernetes/typed/core/v1"
_ "k8s.io/client-go/plugin/pkg/client/auth"

Expand Down Expand Up @@ -60,24 +63,178 @@ func TestGetNonDetPods(t *testing.T) {
},
}

ns1 := &mocks.PodInterface{}
ns1.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[0])}, nil)
emptyNS := &mocks.PodInterface{}
emptyNS.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[0], expectedPods[1])}, nil)

js := jobsService{
podInterfaces: map[string]typedV1.PodInterface{
"ns1": &mocks.PodInterface{},
"ns2": &mocks.PodInterface{},
"": emptyNS,
},
}

actualPods, err := js.getNonDetPods()
require.NoError(t, err)
require.ElementsMatch(t, expectedPods, actualPods)
}

func TestListPodsInAllNamespaces(t *testing.T) {
detPods := []k8sV1.Pod{
{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns1",
},
Spec: k8sV1.PodSpec{
NodeName: "a",
},
},
{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns2",
},
Spec: k8sV1.PodSpec{
NodeName: "a",
},
},
}

ns1 := &mocks.PodInterface{}
ns2 := &mocks.PodInterface{}
ns2.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: append(hiddenPods, expectedPods[1])}, nil)

emptyNS := &mocks.PodInterface{}

js := jobsService{
podInterfaces: map[string]typedV1.PodInterface{
"ns1": ns1,
"ns2": ns2,
"": emptyNS,
},
}

actualPods, err := js.getNonDetPods()
// This pod is not part of js.podInterfaces.
outsidePod := k8sV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns3",
},
Spec: k8sV1.PodSpec{
NodeName: "b",
},
}

expectedPods := append(detPods, outsidePod)
expectedPodList := k8sV1.PodList{Items: expectedPods}
emptyNS.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: expectedPods}, nil)

ctx := context.Background()
opts := metaV1.ListOptions{}
actualPodList, err := js.listPodsInAllNamespaces(ctx, opts)
require.NoError(t, err)
require.ElementsMatch(t, expectedPods, actualPods)
require.NotNil(t, actualPodList)
require.ElementsMatch(t, expectedPodList.Items, actualPodList.Items)

forbiddenErr := k8error.NewForbidden(schema.GroupResource{}, "forbidden",
fmt.Errorf("forbidden"))

emptyNS.On("List", mock.Anything, mock.Anything).Twice().
Return(nil, forbiddenErr)

ns1.On("List", mock.Anything, mock.Anything).Twice().
Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[0]}}, nil)

ns2.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[1]}}, nil)

actualPodList, err = js.listPodsInAllNamespaces(ctx, opts)
require.NoError(t, err)
require.NotNil(t, actualPodList)
require.ElementsMatch(t, detPods, actualPodList.Items)

listErr := fmt.Errorf("something bad happened")
ns2.On("List", mock.Anything, mock.Anything).Once().
Return(nil, listErr)
actualPodList, err = js.listPodsInAllNamespaces(ctx, opts)
require.ErrorIs(t, err, listErr)
require.Nil(t, actualPodList)
}

func TestHealthStatus(t *testing.T) {
detPods := []k8sV1.Pod{
{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns1",
},
Spec: k8sV1.PodSpec{
NodeName: "a",
},
},
{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns2",
},
Spec: k8sV1.PodSpec{
NodeName: "a",
},
},
}

ns1 := &mocks.PodInterface{}
ns2 := &mocks.PodInterface{}

emptyNS := &mocks.PodInterface{}

js := jobsService{
podInterfaces: map[string]typedV1.PodInterface{
"ns1": ns1,
"ns2": ns2,
"": emptyNS,
},
}

// This pod is not part of js.podInterfaces.
outsidePod := k8sV1.Pod{
ObjectMeta: metaV1.ObjectMeta{
Name: "ns3",
},
Spec: k8sV1.PodSpec{
NodeName: "b",
},
}

expectedPods := append(detPods, outsidePod)
emptyNS.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: expectedPods}, nil)

health := js.HealthStatus()
require.Equal(t, model.Healthy, health)

emptyNS.On("List", mock.Anything, mock.Anything).Once().
Return(nil, fmt.Errorf("couldnt list all pods"))

health = js.HealthStatus()
require.Equal(t, model.Unhealthy, health)

forbiddenErr := k8error.NewForbidden(schema.GroupResource{}, "forbidden",
fmt.Errorf("forbidden"))

emptyNS.On("List", mock.Anything, mock.Anything).Twice().
Return(nil, forbiddenErr)

ns1.On("List", mock.Anything, mock.Anything).Twice().
Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[0]}}, nil)

ns2.On("List", mock.Anything, mock.Anything).Once().
Return(&k8sV1.PodList{Items: []k8sV1.Pod{detPods[1]}}, nil)

health = js.HealthStatus()
require.Equal(t, model.Healthy, health)

ns2.On("List", mock.Anything, mock.Anything).Once().
Return(nil, fmt.Errorf("couldnt list pods in namespace ns2"))
health = js.HealthStatus()
require.Equal(t, model.Unhealthy, health)
}

func TestJobScheduledStatus(t *testing.T) {
Expand Down

0 comments on commit 0316375

Please sign in to comment.