agent/schedwatch: Replace trackcurrent with global value (#675)
ref #665 (comment)

In short: We've hit *many* issues with the existing `trackcurrent`
setup, so now that we've merged some changes to improve the semantics of
agent->scheduler requests, we can completely replace this complicated
state machine with a global value.

This commit also removes the scheduler lifecycle tracking from
pkg/agent/{execbridge,executor,core} - which is a big simplification as
well.
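
As a rough illustration of the "global value" approach (a sketch with hypothetical names, not the actual pkg/agent/schedwatch implementation): the scheduler watcher writes the most recently observed scheduler into a single mutex-guarded slot, and request paths read that slot at the moment they build a request, instead of driving a per-runner lifecycle state machine.

package schedwatch

import (
	"sync"
	"time"
)

// SchedulerInfo identifies a scheduler pod observed by the watcher.
// (Field names here are illustrative assumptions.)
type SchedulerInfo struct {
	PodName string
	IP      string
	SeenAt  time.Time
}

// Tracker holds the current scheduler as one shared value.
type Tracker struct {
	mu      sync.Mutex
	current *SchedulerInfo
}

// Set records a scheduler pod that became ready (or replaced the previous one).
func (t *Tracker) Set(info SchedulerInfo) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.current = &info
}

// Clear forgets the current scheduler when its pod goes away.
func (t *Tracker) Clear() {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.current = nil
}

// Get returns the most recently observed scheduler, or nil if there is none.
func (t *Tracker) Get() *SchedulerInfo {
	t.mu.Lock()
	defer t.mu.Unlock()
	return t.current
}

With something like this, a request path simply checks Get() when it is about to contact the scheduler and skips (or retries later) if it returns nil, which is what allows the NewScheduler()/SchedulerGone() transitions and the plugin `alive` flag in the diff below to be deleted.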
sharnoff committed Dec 8, 2023
1 parent 201ed8a commit c392180
Showing 12 changed files with 171 additions and 935 deletions.
2 changes: 0 additions & 2 deletions pkg/agent/core/dumpstate.go
@@ -40,7 +40,6 @@ func (s *State) Dump() StateDump {
}

type pluginStateDump struct {
Alive bool `json:"alive"`
OngoingRequest bool `json:"ongoingRequest"`
ComputeUnit *api.Resources `json:"computeUnit"`
LastRequest *pluginRequestedDump `json:"lastRequest"`
@@ -62,7 +61,6 @@ func (s *pluginState) dump() pluginStateDump {
}

return pluginStateDump{
Alive: s.alive,
OngoingRequest: s.ongoingRequest,
ComputeUnit: shallowCopy(s.computeUnit),
LastRequest: lastRequest,
37 changes: 2 additions & 35 deletions pkg/agent/core/state.go
@@ -115,7 +115,6 @@ type State struct {
}

type pluginState struct {
alive bool
// ongoingRequest is true iff there is currently an ongoing request to *this* scheduler plugin.
ongoingRequest bool
// computeUnit, if not nil, gives the value of the compute unit we most recently got from a
@@ -127,8 +126,7 @@ type pluginState struct {
// lastFailureAt, if not nil, gives the time of the most recent request failure
lastFailureAt *time.Time
// permit, if not nil, stores the Permit in the most recent PluginResponse. This field will be
// nil if we have not been able to contact *any* scheduler. If we switch schedulers, we trust
// the old one.
// nil if we have not been able to contact *any* scheduler.
permit *api.Resources
}

@@ -205,7 +203,6 @@ func NewState(vm api.VmInfo, config Config) *State {
debug: false,
vm: vm,
plugin: pluginState{
alive: false,
ongoingRequest: false,
computeUnit: nil,
lastRequest: nil,
@@ -353,7 +350,7 @@ func (s *State) calculatePluginAction(
timeUntilNextRequestTick = s.config.PluginRequestTick - now.Sub(s.plugin.lastRequest.at)
}

timeForRequest := timeUntilNextRequestTick <= 0 && s.plugin.alive
timeForRequest := timeUntilNextRequestTick <= 0

var timeUntilRetryBackoffExpires time.Duration
requestPreviouslyDenied := !s.plugin.ongoingRequest &&
@@ -377,14 +374,6 @@
permittedRequestResources = currentResources
}

// Can't make a request if the plugin isn't active/alive
if !s.plugin.alive {
if timeForRequest || shouldRequestNewResources {
logFailureReason("there isn't one active right now")
}
return nil, nil
}

// Can't make a duplicate request
if s.plugin.ongoingRequest {
// ... but if the desired request is different from what we would be making,
@@ -947,28 +936,6 @@ func (s *State) Plugin() PluginHandle {
return PluginHandle{s}
}

func (h PluginHandle) NewScheduler() {
h.s.plugin = pluginState{
alive: true,
ongoingRequest: false,
computeUnit: h.s.plugin.computeUnit, // Keep the previous scheduler's CU unless told otherwise
lastRequest: nil,
lastFailureAt: nil,
permit: h.s.plugin.permit, // Keep this; trust the previous scheduler.
}
}

func (h PluginHandle) SchedulerGone() {
h.s.plugin = pluginState{
alive: false,
ongoingRequest: false,
computeUnit: h.s.plugin.computeUnit,
lastRequest: nil,
lastFailureAt: nil,
permit: h.s.plugin.permit, // Keep this; trust the previous scheduler.
}
}

func (h PluginHandle) StartingRequest(now time.Time, resources api.Resources) {
h.s.plugin.lastRequest = &pluginRequested{
at: now,
167 changes: 0 additions & 167 deletions pkg/agent/core/state_test.go
@@ -136,7 +136,6 @@ func Test_DesiredResourcesFromMetricsOrRequestedUpscaling(t *testing.T) {
now := time.Now()

// set the compute unit and lastApproved by simulating a scheduler request/response
state.Plugin().NewScheduler()
state.Plugin().StartingRequest(now, c.schedulerApproved)
err := state.Plugin().RequestSuccessful(now, api.PluginResponse{
Permit: c.schedulerApproved,
@@ -261,7 +260,6 @@ func TestBasicScaleUpAndDownFlow(t *testing.T) {
return state.NextActions(clock.Now())
}

state.Plugin().NewScheduler()
state.Monitor().Active(true)

// Send initial scheduler request:
@@ -424,7 +422,6 @@ func TestPeriodicPluginRequest(t *testing.T) {
helpers.WithStoredWarnings(a.StoredWarnings()),
)

state.Plugin().NewScheduler()
state.Monitor().Active(true)

metrics := api.Metrics{
@@ -505,7 +502,6 @@ func TestDeniedDownscalingIncreaseAndRetry(t *testing.T) {
return state.NextActions(clock.Now())
}

state.Plugin().NewScheduler()
state.Monitor().Active(true)

doInitialPluginRequest(a, state, clock, duration("0.1s"), DefaultComputeUnit, nil, resForCU(6))
@@ -787,7 +783,6 @@ func TestRequestedUpscale(t *testing.T) {
return state.NextActions(clock.Now())
}

state.Plugin().NewScheduler()
state.Monitor().Active(true)

// Send initial scheduler request:
@@ -1060,7 +1055,6 @@ func TestDownscalePivotBack(t *testing.T) {
helpers.WithCurrentCU(2),
)

state.Plugin().NewScheduler()
state.Monitor().Active(true)

doInitialPluginRequest(a, state, clock, duration("0.1s"), DefaultComputeUnit, nil, resForCU(2))
Expand Down Expand Up @@ -1094,162 +1088,6 @@ func TestDownscalePivotBack(t *testing.T) {
}
}

// Checks that if we're disconnected from the scheduler plugin, we're able to downscale and
// re-upscale back to the last allocation before disconnection, but not beyond that.
// Also checks that when we reconnect, we first *inform* the scheduler plugin of the current
// resource allocation, and *then* send a follow-up request asking for additional resources.
func TestSchedulerDownscaleReupscale(t *testing.T) {
a := helpers.NewAssert(t)
clock := helpers.NewFakeClock(t)
clockTick := func() {
clock.Inc(100 * time.Millisecond)
}
resForCU := DefaultComputeUnit.Mul

state := helpers.CreateInitialState(
DefaultInitialStateConfig,
helpers.WithStoredWarnings(a.StoredWarnings()),
helpers.WithMinMaxCU(1, 3),
helpers.WithCurrentCU(2),
)
nextActions := func() core.ActionSet {
return state.NextActions(clock.Now())
}

state.Plugin().NewScheduler()
state.Monitor().Active(true)

// Send initial scheduler request:
doInitialPluginRequest(a, state, clock, duration("0.1s"), DefaultComputeUnit, nil, resForCU(2))

clockTick()

// Set metrics
a.Do(state.UpdateMetrics, api.Metrics{
LoadAverage1Min: 0.3, // <- means desired scale = 2
LoadAverage5Min: 0.0, // unused
MemoryUsageBytes: 0.0,
})
// Check we're not supposed to do anything
a.Call(nextActions).Equals(core.ActionSet{
Wait: &core.ActionWait{Duration: duration("4.8s")},
})

clockTick()

// Record the scheduler as disconnected
state.Plugin().SchedulerGone()
// ... and check that there's nothing we can do:
a.Call(nextActions).Equals(core.ActionSet{})

clockTick()

// First:
// 1. Change the metrics so we want to downscale to 1 CU
// 2. Request downscaling from the vm-monitor
// 3. Do the NeonVM request
// 4. But *don't* do the request to the scheduler plugin (because it's not there)
a.Do(state.UpdateMetrics, api.Metrics{
LoadAverage1Min: 0.0,
LoadAverage5Min: 0.0,
MemoryUsageBytes: 0.0,
})
// Check that we agree about desired resources
a.Call(getDesiredResources, state, clock.Now()).
Equals(resForCU(1))
// Do vm-monitor request:
a.Call(nextActions).Equals(core.ActionSet{
MonitorDownscale: &core.ActionMonitorDownscale{
Current: resForCU(2),
Target: resForCU(1),
},
})
a.Do(state.Monitor().StartingDownscaleRequest, clock.Now(), resForCU(1))
clockTick()
a.Do(state.Monitor().DownscaleRequestAllowed, clock.Now())
// Do the NeonVM request
a.Call(nextActions).Equals(core.ActionSet{
NeonVMRequest: &core.ActionNeonVMRequest{
Current: resForCU(2),
Target: resForCU(1),
},
})
a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(1))
clockTick()
a.Do(state.NeonVM().RequestSuccessful, clock.Now())

// Now the current state reflects the desired state, so there shouldn't be anything else we need
// to do or wait on.
a.Call(nextActions).Equals(core.ActionSet{})

// Next:
// 1. Change the metrics so we want to upscale to 3 CU
// 2. Can't do the scheduler plugin request (not active), but we previously got a permit for 2 CU
// 3. Do the NeonVM request for 2 CU (3 isn't approved)
// 4. Do vm-monitor upscale request for 2 CU
lastMetrics := api.Metrics{
LoadAverage1Min: 0.5,
LoadAverage5Min: 0.0,
MemoryUsageBytes: 0.0,
}
a.Do(state.UpdateMetrics, lastMetrics)
a.Call(getDesiredResources, state, clock.Now()).
Equals(resForCU(3))
// Do NeonVM request
a.Call(nextActions).Equals(core.ActionSet{
NeonVMRequest: &core.ActionNeonVMRequest{
Current: resForCU(1),
Target: resForCU(2),
},
})
a.Do(state.NeonVM().StartingRequest, clock.Now(), resForCU(2))
clockTick()
a.Do(state.NeonVM().RequestSuccessful, clock.Now())
// Do vm-monitor request
a.Call(nextActions).Equals(core.ActionSet{
MonitorUpscale: &core.ActionMonitorUpscale{
Current: resForCU(1),
Target: resForCU(2),
},
})
a.Do(state.Monitor().StartingUpscaleRequest, clock.Now(), resForCU(2))
clockTick()
a.Do(state.Monitor().UpscaleRequestSuccessful, clock.Now())

// Nothing left to do in the meantime, because again, the current state reflects the desired
// state (at least, given that the we can't request anything from the scheduler plugin)

// Finally:
// 1. Update the state so we can now communicate with the scheduler plugin
// 2. Make an initial request to the plugin to inform it of *current* resources
// 3. Make another request to the plugin to request up to 3 CU
// We could test after that too, but this should be enough.
a.Do(state.Plugin().NewScheduler)
// Initial request: informative about current usage
a.Call(nextActions).Equals(core.ActionSet{
PluginRequest: &core.ActionPluginRequest{
LastPermit: ptr(resForCU(2)),
Target: resForCU(2),
Metrics: &lastMetrics,
},
})
a.Do(state.Plugin().StartingRequest, clock.Now(), resForCU(2))
clockTick()
a.NoError(state.Plugin().RequestSuccessful, clock.Now(), api.PluginResponse{
Permit: resForCU(2),
Migrate: nil,
ComputeUnit: DefaultComputeUnit,
})
// Follow-up request: request additional resources
a.Call(nextActions).Equals(core.ActionSet{
PluginRequest: &core.ActionPluginRequest{
LastPermit: ptr(resForCU(2)),
Target: resForCU(3),
Metrics: &lastMetrics,
},
})
}

// Checks that if the VM's min/max bounds change so that the maximum is below the current and
// desired usage, we try to downscale
func TestBoundsChangeRequiresDownsale(t *testing.T) {
Expand All @@ -1270,7 +1108,6 @@ func TestBoundsChangeRequiresDownsale(t *testing.T) {
return state.NextActions(clock.Now())
}

state.Plugin().NewScheduler()
state.Monitor().Active(true)

// Send initial scheduler request:
@@ -1367,7 +1204,6 @@ func TestBoundsChangeRequiresUpscale(t *testing.T) {
return state.NextActions(clock.Now())
}

state.Plugin().NewScheduler()
state.Monitor().Active(true)

// Send initial scheduler request:
@@ -1466,7 +1302,6 @@ func TestFailedRequestRetry(t *testing.T) {
return state.NextActions(clock.Now())
}

state.Plugin().NewScheduler()
state.Monitor().Active(true)

// Send initial scheduler request
@@ -1582,8 +1417,6 @@ func TestMetricsConcurrentUpdatedDuringDownscale(t *testing.T) {
return state.NextActions(clock.Now())
}

state.Plugin().NewScheduler()

// Send initial scheduler request - without the monitor active, so we're stuck at 4 CU for now
doInitialPluginRequest(a, state, clock, duration("0.1s"), DefaultComputeUnit, nil, resForCU(4))

12 changes: 3 additions & 9 deletions pkg/agent/entrypoint.go
@@ -5,7 +5,6 @@ import (
"fmt"

"github.com/tychoish/fun/pubsub"
"github.com/tychoish/fun/srv"
"go.uber.org/zap"

"k8s.io/client-go/kubernetes"
@@ -46,18 +45,13 @@ func (r MainRunner) Run(logger *zap.Logger, ctx context.Context) error {
defer vmWatchStore.Stop()
logger.Info("VM watcher started")

broker := pubsub.NewBroker[schedwatch.WatchEvent](ctx, pubsub.BrokerOptions{})
if err := srv.GetOrchestrator(ctx).Add(srv.Broker(broker)); err != nil {
return err
}

schedulerStore, err := schedwatch.StartSchedulerWatcher(ctx, logger, r.KubeClient, watchMetrics, broker, r.Config.Scheduler.SchedulerName)
schedTracker, err := schedwatch.StartSchedulerWatcher(ctx, logger, r.KubeClient, watchMetrics, r.Config.Scheduler.SchedulerName)
if err != nil {
return fmt.Errorf("Starting scheduler watch server: %w", err)
}
defer schedulerStore.Stop()
defer schedTracker.Stop()

globalState, globalPromReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, broker, schedulerStore)
globalState, globalPromReg := r.newAgentState(logger, r.EnvArgs.K8sPodIP, schedTracker)
watchMetrics.MustRegister(globalPromReg)

if r.Config.Billing != nil {
