From 34e47490f75367bdf10f57a682e3df857c2a4b16 Mon Sep 17 00:00:00 2001 From: Bradley Laney Date: Thu, 17 Oct 2024 19:26:02 -0400 Subject: [PATCH] chore: remove redundant rm.ExternalPreemptionPending interface (#10071) The ExternalPreemptionPending method always essentially called allocation.Signal, but through 2/3 pointless other calls/message passes. This just cuts out a few middlemen. I did it now because it stood in the way of a test @rb-determined-ai was trying to remove. --- master/internal/api_trials.go | 12 ++++++----- .../rm/agentrm/agent_resource_manager.go | 5 ----- .../dispatcher_resource_manager.go | 20 ------------------- .../kubernetes_resource_manager.go | 5 ----- master/internal/rm/multirm/multirm.go | 7 ------- .../internal/rm/multirm/multirm_intg_test.go | 16 --------------- master/internal/rm/resource_manager_iface.go | 1 - 7 files changed, 7 insertions(+), 59 deletions(-) diff --git a/master/internal/api_trials.go b/master/internal/api_trials.go index 71c680e03c5..3be8b83b295 100644 --- a/master/internal/api_trials.go +++ b/master/internal/api_trials.go @@ -684,7 +684,7 @@ func (a *apiServer) GetTrialRemainingLogRetentionDays( } q := ` -SELECT +SELECT CASE WHEN MIN(t.end_time) <= ( NOW() - make_interval(days => ?) ) THEN 0 ELSE extract(day from MIN(end_time) + make_interval(days => ?) - NOW())::int @@ -1327,12 +1327,14 @@ func (a *apiServer) AllocationPendingPreemptionSignal( return nil, err } - if err := a.m.rm.ExternalPreemptionPending( - sproto.PendingPreemption{AllocationID: model.AllocationID(req.AllocationId)}, - ); err != nil { + err := task.DefaultService.Signal( + model.AllocationID(req.AllocationId), + task.TerminateAllocation, + "preempted by the scheduler", + ) + if err != nil { return nil, err } - return &apiv1.AllocationPendingPreemptionSignalResponse{}, nil } diff --git a/master/internal/rm/agentrm/agent_resource_manager.go b/master/internal/rm/agentrm/agent_resource_manager.go index 2482e320297..268f64ce352 100644 --- a/master/internal/rm/agentrm/agent_resource_manager.go +++ b/master/internal/rm/agentrm/agent_resource_manager.go @@ -230,11 +230,6 @@ func (a *ResourceManager) CheckMaxSlotsExceeded(v *sproto.ValidateResourcesReque return resp.CapacityExceeded, nil } -// ExternalPreemptionPending implements rm.ResourceManager. -func (*ResourceManager) ExternalPreemptionPending(sproto.PendingPreemption) error { - return rmerrors.ErrNotSupported -} - // GetAgent implements rm.ResourceManager. func (a *ResourceManager) GetAgent(msg *apiv1.GetAgentRequest) (*apiv1.GetAgentResponse, error) { agent, ok := a.agentService.get(aproto.ID(msg.AgentId)) diff --git a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go index 0777552369f..5214154cdd1 100644 --- a/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go +++ b/master/internal/rm/dispatcherrm/dispatcher_resource_manager.go @@ -243,26 +243,6 @@ func (m *DispatcherResourceManager) DeleteJob( return sproto.EmptyDeleteJobResponse(), nil } -// ExternalPreemptionPending notifies a task of a preemption from the underlying resource manager. -func (m *DispatcherResourceManager) ExternalPreemptionPending(msg sproto.PendingPreemption) error { - m.mu.Lock() - defer m.mu.Unlock() - - m.syslog.WithField("allocation-id", msg.AllocationID). - Info("pending preemption of allocation, terminating") - allocReq, ok := m.reqList.TaskByID(msg.AllocationID) - if ok { - rmevents.Publish(allocReq.AllocationID, &sproto.ReleaseResources{ - Reason: "preempted by the scheduler", - ForcePreemption: true, - }) - } else { - m.syslog.WithField("allocation-id", msg.AllocationID). - Errorf("unable to find allocation actor for allocation") - } - return nil -} - // HealthCheck tries to call launcher and check if it is reachable. func (m *DispatcherResourceManager) HealthCheck() []model.ResourceManagerHealth { status := model.Healthy diff --git a/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go b/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go index 8eadb1291b2..e558239304f 100644 --- a/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go +++ b/master/internal/rm/kubernetesrm/kubernetes_resource_manager.go @@ -173,11 +173,6 @@ func (ResourceManager) DeleteJob(sproto.DeleteJob) (sproto.DeleteJobResponse, er return sproto.EmptyDeleteJobResponse(), nil } -// ExternalPreemptionPending implements rm.ResourceManager. -func (ResourceManager) ExternalPreemptionPending(sproto.PendingPreemption) error { - return rmerrors.ErrNotSupported -} - // HealthCheck tries to call the KubeAPI. func (k *ResourceManager) HealthCheck() []model.ResourceManagerHealth { return []model.ResourceManagerHealth{ diff --git a/master/internal/rm/multirm/multirm.go b/master/internal/rm/multirm/multirm.go index ae20726064b..9252d27752c 100644 --- a/master/internal/rm/multirm/multirm.go +++ b/master/internal/rm/multirm/multirm.go @@ -131,13 +131,6 @@ func (m *MultiRMRouter) SetGroupPriority(req sproto.SetGroupPriority) error { return m.rms[resolvedRMName].SetGroupPriority(req) } -// ExternalPreemptionPending routes an ExternalPreemptionPending request to the specified resource manager. -func (m *MultiRMRouter) ExternalPreemptionPending(sproto.PendingPreemption) error { - // MultiRM is currently only implemented for Kubernetes, which doesn't support this. - m.syslog.WithError(fmt.Errorf("ExternalPreemptionPending is not implemented for agent, kubernetes, or multi-rm")) - return rmerrors.ErrNotSupported -} - // IsReattachableOnlyAfterStarted routes a IsReattachableOnlyAfterStarted call to a specified resource manager/pool. func (m *MultiRMRouter) IsReattachableOnlyAfterStarted() bool { resolvedRMName, err := m.getRMName("") diff --git a/master/internal/rm/multirm/multirm_intg_test.go b/master/internal/rm/multirm/multirm_intg_test.go index c313402eae1..670e807834e 100644 --- a/master/internal/rm/multirm/multirm_intg_test.go +++ b/master/internal/rm/multirm/multirm_intg_test.go @@ -209,22 +209,6 @@ func TestSetGroupPriority(t *testing.T) { } } -func TestExternalPreemptionPending(t *testing.T) { - cases := []struct { - name string - req sproto.PendingPreemption - err error - }{ - {"MultiRM doesn't implement ExternalPreemptionPending", sproto.PendingPreemption{}, rmerrors.ErrNotSupported}, - } - for _, tt := range cases { - t.Run(tt.name, func(t *testing.T) { - err := testMultiRM.ExternalPreemptionPending(tt.req) - require.Equal(t, tt.err, err) - }) - } -} - func TestIsReattachable(t *testing.T) { val := testMultiRM.IsReattachableOnlyAfterStarted() require.True(t, val) diff --git a/master/internal/rm/resource_manager_iface.go b/master/internal/rm/resource_manager_iface.go index 6b487e2c5bc..8aa2760500e 100644 --- a/master/internal/rm/resource_manager_iface.go +++ b/master/internal/rm/resource_manager_iface.go @@ -24,7 +24,6 @@ type ResourceManager interface { SetGroupMaxSlots(sproto.SetGroupMaxSlots) SetGroupWeight(sproto.SetGroupWeight) error SetGroupPriority(sproto.SetGroupPriority) error - ExternalPreemptionPending(sproto.PendingPreemption) error IsReattachableOnlyAfterStarted() bool SmallerValueIsHigherPriority() (bool, error)