Skip to content

Commit

Permalink
chore: remove redundant rm.ExternalPreemptionPending interface (#10071)
Browse files Browse the repository at this point in the history
The ExternalPreemptionPending method always essentially called allocation.Signal, but through 2/3 pointless other calls/message passes. This just cuts out a few middlemen.

I did it now because it stood in the way of a test @rb-determined-ai was trying to remove.
  • Loading branch information
stoksc authored Oct 17, 2024
1 parent 28bc072 commit 34e4749
Show file tree
Hide file tree
Showing 7 changed files with 7 additions and 59 deletions.
12 changes: 7 additions & 5 deletions master/internal/api_trials.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (a *apiServer) GetTrialRemainingLogRetentionDays(
}

q := `
SELECT
SELECT
CASE
WHEN MIN(t.end_time) <= ( NOW() - make_interval(days => ?) ) THEN 0
ELSE extract(day from MIN(end_time) + make_interval(days => ?) - NOW())::int
Expand Down Expand Up @@ -1327,12 +1327,14 @@ func (a *apiServer) AllocationPendingPreemptionSignal(
return nil, err
}

if err := a.m.rm.ExternalPreemptionPending(
sproto.PendingPreemption{AllocationID: model.AllocationID(req.AllocationId)},
); err != nil {
err := task.DefaultService.Signal(
model.AllocationID(req.AllocationId),
task.TerminateAllocation,
"preempted by the scheduler",
)
if err != nil {
return nil, err
}

return &apiv1.AllocationPendingPreemptionSignalResponse{}, nil
}

Expand Down
5 changes: 0 additions & 5 deletions master/internal/rm/agentrm/agent_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,11 +230,6 @@ func (a *ResourceManager) CheckMaxSlotsExceeded(v *sproto.ValidateResourcesReque
return resp.CapacityExceeded, nil
}

// ExternalPreemptionPending implements rm.ResourceManager.
func (*ResourceManager) ExternalPreemptionPending(sproto.PendingPreemption) error {
return rmerrors.ErrNotSupported
}

// GetAgent implements rm.ResourceManager.
func (a *ResourceManager) GetAgent(msg *apiv1.GetAgentRequest) (*apiv1.GetAgentResponse, error) {
agent, ok := a.agentService.get(aproto.ID(msg.AgentId))
Expand Down
20 changes: 0 additions & 20 deletions master/internal/rm/dispatcherrm/dispatcher_resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,26 +243,6 @@ func (m *DispatcherResourceManager) DeleteJob(
return sproto.EmptyDeleteJobResponse(), nil
}

// ExternalPreemptionPending notifies a task of a preemption from the underlying resource manager.
func (m *DispatcherResourceManager) ExternalPreemptionPending(msg sproto.PendingPreemption) error {
m.mu.Lock()
defer m.mu.Unlock()

m.syslog.WithField("allocation-id", msg.AllocationID).
Info("pending preemption of allocation, terminating")
allocReq, ok := m.reqList.TaskByID(msg.AllocationID)
if ok {
rmevents.Publish(allocReq.AllocationID, &sproto.ReleaseResources{
Reason: "preempted by the scheduler",
ForcePreemption: true,
})
} else {
m.syslog.WithField("allocation-id", msg.AllocationID).
Errorf("unable to find allocation actor for allocation")
}
return nil
}

// HealthCheck tries to call launcher and check if it is reachable.
func (m *DispatcherResourceManager) HealthCheck() []model.ResourceManagerHealth {
status := model.Healthy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,6 @@ func (ResourceManager) DeleteJob(sproto.DeleteJob) (sproto.DeleteJobResponse, er
return sproto.EmptyDeleteJobResponse(), nil
}

// ExternalPreemptionPending implements rm.ResourceManager.
func (ResourceManager) ExternalPreemptionPending(sproto.PendingPreemption) error {
return rmerrors.ErrNotSupported
}

// HealthCheck tries to call the KubeAPI.
func (k *ResourceManager) HealthCheck() []model.ResourceManagerHealth {
return []model.ResourceManagerHealth{
Expand Down
7 changes: 0 additions & 7 deletions master/internal/rm/multirm/multirm.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,6 @@ func (m *MultiRMRouter) SetGroupPriority(req sproto.SetGroupPriority) error {
return m.rms[resolvedRMName].SetGroupPriority(req)
}

// ExternalPreemptionPending routes an ExternalPreemptionPending request to the specified resource manager.
func (m *MultiRMRouter) ExternalPreemptionPending(sproto.PendingPreemption) error {
// MultiRM is currently only implemented for Kubernetes, which doesn't support this.
m.syslog.WithError(fmt.Errorf("ExternalPreemptionPending is not implemented for agent, kubernetes, or multi-rm"))
return rmerrors.ErrNotSupported
}

// IsReattachableOnlyAfterStarted routes a IsReattachableOnlyAfterStarted call to a specified resource manager/pool.
func (m *MultiRMRouter) IsReattachableOnlyAfterStarted() bool {
resolvedRMName, err := m.getRMName("")
Expand Down
16 changes: 0 additions & 16 deletions master/internal/rm/multirm/multirm_intg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,22 +209,6 @@ func TestSetGroupPriority(t *testing.T) {
}
}

func TestExternalPreemptionPending(t *testing.T) {
cases := []struct {
name string
req sproto.PendingPreemption
err error
}{
{"MultiRM doesn't implement ExternalPreemptionPending", sproto.PendingPreemption{}, rmerrors.ErrNotSupported},
}
for _, tt := range cases {
t.Run(tt.name, func(t *testing.T) {
err := testMultiRM.ExternalPreemptionPending(tt.req)
require.Equal(t, tt.err, err)
})
}
}

func TestIsReattachable(t *testing.T) {
val := testMultiRM.IsReattachableOnlyAfterStarted()
require.True(t, val)
Expand Down
1 change: 0 additions & 1 deletion master/internal/rm/resource_manager_iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type ResourceManager interface {
SetGroupMaxSlots(sproto.SetGroupMaxSlots)
SetGroupWeight(sproto.SetGroupWeight) error
SetGroupPriority(sproto.SetGroupPriority) error
ExternalPreemptionPending(sproto.PendingPreemption) error
IsReattachableOnlyAfterStarted() bool
SmallerValueIsHigherPriority() (bool, error)

Expand Down

0 comments on commit 34e4749

Please sign in to comment.