fix: perf issue with too many API reqs when listing pods in all ns #10200

Merged: 2 commits, Nov 8, 2024
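In short, the change swaps per-namespace List calls for a single cluster-wide List (namespace "") that is then filtered to the namespaces the resource manager tracks, falling back to concurrent per-namespace Lists when the service account lacks cluster-wide list permission (Forbidden). It also raises the client-side rate-limit defaults (QPS/Burst) for in-cluster configs. Below is a minimal, self-contained sketch of that pattern; the function and parameter names are illustrative only, not taken from the PR, and it assumes Go 1.22+ loop-variable scoping.

package podlist

import (
	"context"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
	k8sV1 "k8s.io/api/core/v1"
	k8error "k8s.io/apimachinery/pkg/api/errors"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// listManagedPods tries a single cluster-wide List and filters the result down
// to the namespaces we manage; if that is forbidden (no cluster-wide RBAC), it
// falls back to one List per namespace, fanned out concurrently.
func listManagedPods(
	ctx context.Context, cs kubernetes.Interface, namespaces map[string]bool,
) ([]k8sV1.Pod, error) {
	all, err := cs.CoreV1().Pods("").List(ctx, metaV1.ListOptions{})
	if err == nil {
		var pods []k8sV1.Pod
		for _, p := range all.Items {
			if namespaces[p.Namespace] {
				pods = append(pods, p)
			}
		}
		return pods, nil
	}
	if !k8error.IsForbidden(err) {
		return nil, err
	}

	// Fallback: one request per namespace, run concurrently.
	var (
		g    errgroup.Group
		mu   sync.Mutex
		pods []k8sV1.Pod
	)
	for ns := range namespaces {
		g.Go(func() error {
			res, err := cs.CoreV1().Pods(ns).List(ctx, metaV1.ListOptions{})
			if err != nil {
				return fmt.Errorf("listing pods in namespace %s: %w", ns, err)
			}
			mu.Lock()
			pods = append(pods, res.Items...)
			mu.Unlock()
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		return nil, err
	}
	return pods, nil
}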
175 changes: 136 additions & 39 deletions master/internal/rm/kubernetesrm/jobs.go
@@ -17,6 +17,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
batchV1 "k8s.io/api/batch/v1"
k8sV1 "k8s.io/api/core/v1"
k8error "k8s.io/apimachinery/pkg/api/errors"
@@ -37,6 +38,9 @@ import (
gateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1"
alphaGateway "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned/typed/apis/v1alpha2"

// Used to load all auth plugins.
_ "k8s.io/client-go/plugin/pkg/client/auth"

"github.com/determined-ai/determined/master/internal/config"
"github.com/determined-ai/determined/master/internal/db"
"github.com/determined-ai/determined/master/internal/rm"
@@ -51,9 +55,6 @@ import (
"github.com/determined-ai/determined/master/pkg/syncx/waitgroupx"
"github.com/determined-ai/determined/master/pkg/tasks"
"github.com/determined-ai/determined/proto/pkg/apiv1"

// Used to load all auth plugins.
_ "k8s.io/client-go/plugin/pkg/client/auth"
)

const (
@@ -113,13 +114,15 @@ type jobsService struct {
internalTaskGWConfig *config.InternalTaskGatewayConfig

// System dependencies. Also set in initialization and never modified after.
syslog *logrus.Entry
clientSet k8sClient.Interface
syslog *logrus.Entry
clientSet k8sClient.Interface
// TODO(!!!): Not set in initialization and never changed anymore.. RIP.
podInterfaces map[string]typedV1.PodInterface
configMapInterfaces map[string]typedV1.ConfigMapInterface
jobInterfaces map[string]typedBatchV1.JobInterface
serviceInterfaces map[string]typedV1.ServiceInterface
tcpRouteInterfaces map[string]alphaGateway.TCPRouteInterface
// TODO(!!!): end.

resourceRequestQueue *requestQueue
requestQueueWorkers []*requestProcessingWorker
@@ -253,6 +256,7 @@ func newJobsService(
}

func (j *jobsService) syncNamespaces(ns []string, hasJSLock bool) error {
// TODO(!!!): Prob one informer per cluster too.
for _, namespace := range ns {
// Since we don't want to do duplicate namespace informers, don't start any
// listeners or informers that have already been added to namespacesWithInformers.
@@ -348,6 +352,8 @@ func (j *jobsService) startClientSet(namespaces []string) error {
return fmt.Errorf("failed to initialize kubernetes clientSet: %w", err)
}

j.jobInterfaces[""] = j.clientSet.BatchV1().Jobs("")
j.podInterfaces[""] = j.clientSet.CoreV1().Pods("")
for _, ns := range namespaces {
j.podInterfaces[ns] = j.clientSet.CoreV1().Pods(ns)
j.configMapInterfaces[ns] = j.clientSet.CoreV1().ConfigMaps(ns)
@@ -394,7 +400,17 @@ func readClientConfig(kubeconfigPath string) (*rest.Config, error) {
// and it expects to find files:
// - /var/run/secrets/kubernetes.io/serviceaccount/token
// - /var/run/secrets/kubernetes.io/serviceaccount/ca.crt
return rest.InClusterConfig()
c, err := rest.InClusterConfig()
if err != nil {
return nil, err
}
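// Note: client-go treats a zero QPS/Burst as its defaults (5 QPS / 10 burst); the values below raise those limits so API requests from the master are throttled less aggressively client-side.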
if c.QPS == 0.0 {
c.QPS = 20
}
if c.Burst == 0 {
c.Burst = 100
}
return c, nil
}

if parts := strings.Split(kubeconfigPath, string(os.PathSeparator)); parts[0] == "~" {
@@ -1037,7 +1053,7 @@ func (j *jobsService) refreshPodStates(allocationID model.AllocationID) error {
return fmt.Errorf("failed to get namespaces for resource manager: %w", err)
}

for _, pod := range pods.Items {
for _, pod := range pods {
if !slices.Contains(ns, pod.Namespace) {
continue
}
@@ -1082,20 +1098,40 @@ func (j *jobsService) GetSlot(msg *apiv1.GetSlotRequest) *apiv1.GetSlotResponse
return j.getSlot(msg.AgentId, msg.SlotId)
}

func (j *jobsService) HealthStatus() model.HealthStatus {
j.mu.Lock()
defer j.mu.Unlock()
for _, podInterface := range j.podInterfaces {
_, err := podInterface.List(context.TODO(), metaV1.ListOptions{Limit: 1})
if err != nil {
j.syslog.WithError(err).Error("kubernetes resource manager marked as unhealthy")
return model.Unhealthy
}
return model.Healthy
func (j *jobsService) HealthStatus(ctx context.Context) model.HealthStatus {
if len(j.podInterfaces) == 0 {
logrus.Error("expected podInterface to be non empty")
return model.Unhealthy
}

logrus.Error("expected jobInterface to be non empty")
return model.Unhealthy
_, err := j.podInterfaces[""].List(ctx, metaV1.ListOptions{Limit: 1})
if k8error.IsForbidden(err) {
return j.healthStatusFallback(ctx)
} else if err != nil {
return model.Unhealthy
}
return model.Healthy
}

func (j *jobsService) healthStatusFallback(ctx context.Context) model.HealthStatus {
var g errgroup.Group
for n, podInterface := range j.podInterfaces {
if len(n) == 0 { // TODO: We store a non-namespaced client with key "".
continue
}
g.Go(func() error {
_, err := podInterface.List(ctx, metaV1.ListOptions{Limit: 1})
if err != nil {
return err
}
return nil
})
}
err := g.Wait()
if err != nil {
return model.Unhealthy
}
return model.Healthy
}

func (j *jobsService) startNodeInformer() error {
@@ -1479,7 +1515,7 @@ func (j *jobsService) releaseAllocationsOnDisabledNode(nodeName string) error {
}

notifiedAllocations := make(map[model.AllocationID]bool)
for _, pod := range pods.Items {
for _, pod := range pods {
jobName, ok := pod.Labels[kubernetesJobNameLabel]
if !ok {
j.syslog.Debugf("found pod when disabling node without %s label", kubernetesJobNameLabel)
@@ -1572,7 +1608,6 @@ type computeUsageSummary struct {
slotsAvailable int
}

// TODO(!!!): good func comment.
func (j *jobsService) summarizeComputeUsage(poolName string) (*computeUsageSummary, error) {
summary, err := j.summarize()
if err != nil {
@@ -1901,7 +1936,6 @@ func (j *jobsService) summarizeClusterByNodes() map[string]model.AgentSummary {
}
podByNode[podInfo.nodeName] = append(podByNode[podInfo.nodeName], podInfo)
}

nodeToTasks, taskSlots := j.getNonDetSlots(j.slotType)
summary := make(map[string]model.AgentSummary, len(j.currentNodes))
for _, node := range j.currentNodes {
@@ -2015,7 +2049,7 @@ func (j *jobsService) getNonDetPods() ([]k8sV1.Pod, error) {
}

var nonDetPods []k8sV1.Pod
for _, p := range allPods.Items {
for _, p := range allPods {
_, isDet := p.Labels[determinedLabel]
_, isDetSystem := p.Labels[determinedSystemLabel]

@@ -2031,7 +2065,6 @@ func (j *jobsService) getNonDetSlots(deviceType device.Type) (map[string][]string, map[string]int64) {
func (j *jobsService) getNonDetSlots(deviceType device.Type) (map[string][]string, map[string]int64) {
nodeToTasks := make(map[string][]string, len(j.currentNodes))
taskSlots := make(map[string]int64)

nonDetPods, err := j.getNonDetPods()
if err != nil {
j.syslog.WithError(err).Warn("getting non determined pods, " +
@@ -2101,32 +2134,96 @@ func numSlots(slots model.SlotsSummary) int {
func (j *jobsService) listJobsInAllNamespaces(
ctx context.Context, opts metaV1.ListOptions,
) ([]batchV1.Job, error) {
var res []batchV1.Job
for n, i := range j.jobInterfaces {
pods, err := i.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
allJobs, err := j.jobInterfaces[""].List(ctx, opts)
if k8error.IsForbidden(err) {
return j.listJobsInAllNamespacesFallback(ctx, opts)
} else if err != nil {
logrus.WithError(err).WithField("function", "listJobsInAllNamespaces").Error("error listing jobs in all namespaces")
return nil, err
}

res = append(res, pods.Items...)
namespaces := set.FromKeys(j.jobInterfaces)
var jobsWeCareAbout []batchV1.Job
for _, j := range allJobs.Items {
if namespaces.Contains(j.Namespace) {
jobsWeCareAbout = append(jobsWeCareAbout, j)
}
}
return jobsWeCareAbout, nil
}

func (j *jobsService) listJobsInAllNamespacesFallback(
ctx context.Context,
opts metaV1.ListOptions,
) ([]batchV1.Job, error) {
var g errgroup.Group
var res []batchV1.Job
var resLock sync.Mutex
for n, i := range j.jobInterfaces {
g.Go(func() error {
jobs, err := i.List(ctx, opts)
if err != nil {
return fmt.Errorf("error listing jobs for namespace %s: %w", n, err)
}
resLock.Lock()
res = append(res, jobs.Items...)
resLock.Unlock()
return nil
})
}
err := g.Wait()
if err != nil {
return nil, err
}
return res, nil
}

func (j *jobsService) listPodsInAllNamespaces(
ctx context.Context, opts metaV1.ListOptions,
) (*k8sV1.PodList, error) {
res := &k8sV1.PodList{}
for n, i := range j.podInterfaces {
pods, err := i.List(ctx, opts)
if err != nil {
return nil, fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
) ([]k8sV1.Pod, error) {
allPods, err := j.podInterfaces[""].List(ctx, opts)
if k8error.IsForbidden(err) {
return j.listPodsInAllNamespacesFallback(ctx, opts)
} else if err != nil {
return nil, err
}

res.Items = append(res.Items, pods.Items...)
namespaces := set.FromKeys(j.podInterfaces)
var podsWeWant []k8sV1.Pod
for _, pod := range allPods.Items {
if namespaces.Contains(pod.Namespace) {
podsWeWant = append(podsWeWant, pod)
}
}
return podsWeWant, nil
}

func (j *jobsService) listPodsInAllNamespacesFallback(
ctx context.Context,
opts metaV1.ListOptions,
) ([]k8sV1.Pod, error) {
var g errgroup.Group
Review thread on this function:

Contributor: Is there a limit to the number of go funcs started at the same time? Or is it possible to wait on hundreds of go funcs?

Contributor Author (@stoksc, Nov 8, 2024): Great question: no. I'd even added a g.SetLimit for this, but the client they all use is already throttled, so I decided I didn't care very much about limiting it. This is fallback-only code that is very unlikely to run.

Contributor: That makes sense!

var res []k8sV1.Pod
var resLock sync.Mutex
for n, podInterface := range j.podInterfaces {
if len(n) == 0 {
continue
}
g.Go(func() error {
pods, err := podInterface.List(ctx, opts)
if err != nil {
return fmt.Errorf("error listing pods for namespace %s: %w", n, err)
}
resLock.Lock()
res = append(res, pods.Items...)
resLock.Unlock()
return nil
})
}
err := g.Wait()
if err != nil {
return nil, err
}
return res, nil
}
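The review thread above asks about bounding the fan-out; errgroup supports this directly via SetLimit, which the author considered but skipped because the shared client is already rate limited. For reference, here is a sketch of how a limit would look; it is illustrative only, not code from this PR, the helper name is made up, and it reuses the imports from the sketch near the top of this page.

// healthCheckNamespaces pings each namespace's pod API concurrently, but runs
// at most maxConcurrent List calls at a time.
func healthCheckNamespaces(
	ctx context.Context, cs kubernetes.Interface, namespaces []string, maxConcurrent int,
) error {
	var g errgroup.Group
	g.SetLimit(maxConcurrent) // further g.Go calls block until a running goroutine finishes
	for _, ns := range namespaces {
		g.Go(func() error {
			_, err := cs.CoreV1().Pods(ns).List(ctx, metaV1.ListOptions{Limit: 1})
			return err
		})
	}
	return g.Wait() // first non-nil error from any goroutine, if any
}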
