More fixes
sharnoff committed Feb 6, 2023
1 parent 0ecffad commit 2ff0a7c
Showing 11 changed files with 441 additions and 118 deletions.
21 changes: 19 additions & 2 deletions README.md
@@ -71,16 +71,33 @@ Set up the cluster:
 scripts/cluster-init.sh
 ```
 
-Run the VM:
+Start the test VM:
 
 ```sh
 kubectl apply -f vm-deploy.yaml
 ```
 
-Run pgbench and watch the vCPU allocation grow:
+### Running pgbench
+
+Broadly, the `run-bench.sh` script exists just to be expensive on CPU, so that more vCPU will be
+allocated to the VM. You can run it with:
 
 ```sh
 scripts/run-bench.sh
+# or:
+VM_NAME=postgres14-disk-test scripts/run-bench.sh
 ```
 
+### Running `allocate-loop`
+
+To test on-demand memory reservation, the [`allocate-loop`] binary is built into the test VM, and
+can be used to slowly increase memory allocations of arbitrary size. For example:
+
+```sh
+# After ssh-ing into the VM:
+cgexec -g memory:neon-test allocate-loop 256 2280
+#^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^ ^^^^
+# run it in the neon-test cgroup ; use 256 <-> 2280 MiB
+```
 
+[`allocate-loop`]: vm_image/allocate-loop.c
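
For a sense of what such a tool does, here is a rough Go sketch of the same idea. It is purely illustrative: the real binary is implemented in C at vm_image/allocate-loop.c, and its exact flags and behavior may differ.

```go
// Illustrative sketch of an allocate-loop-style tool: slowly grow a heap
// allocation between a lower and upper bound (in MiB), then start over.
package main

import (
	"fmt"
	"os"
	"strconv"
	"time"
)

func main() {
	if len(os.Args) != 3 {
		fmt.Fprintln(os.Stderr, "usage: allocate-loop <min-MiB> <max-MiB>")
		os.Exit(1)
	}
	lo, _ := strconv.Atoi(os.Args[1])
	hi, _ := strconv.Atoi(os.Args[2])

	const mib = 1 << 20
	for {
		for size := lo; size <= hi; size++ {
			buf := make([]byte, size*mib)
			// Touch each page so the memory is actually committed, not just reserved.
			for i := 0; i < len(buf); i += 4096 {
				buf[i] = 1
			}
			fmt.Printf("holding %d MiB\n", size)
			time.Sleep(100 * time.Millisecond)
		}
	}
}
```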
2 changes: 2 additions & 0 deletions pkg/agent/informant.go
@@ -572,6 +572,8 @@ func doInformantRequest[Q any, R any](
 		return nil, statusCode, fmt.Errorf("Bad JSON response: %w", err)
 	}
 
+	s.runner.logger.Infof("Got informant %q response: %s", path, string(respBody))
+
 	return &respData, statusCode, nil
 }

4 changes: 2 additions & 2 deletions pkg/agent/runner.go
@@ -713,7 +713,7 @@ retryServer:
 		default:
 			r.logger.Infof(
 				"Informant server ended quickly, only %s ago. Respecting minimum wait of %s",
-				time.Since(lastStart),
+				time.Since(lastStart), minWait,
 			)
 			select {
 			case <-ctx.Done():
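
Context for the first hunk: the log message has two %s verbs but was passed only one argument, so the minimum-wait value would render as an error marker (assuming the logger follows fmt-style formatting, as klog does). A minimal reproduction:

```go
package main

import "fmt"

func main() {
	// Two %s verbs, one argument: the second renders as "%!s(MISSING)".
	fmt.Printf("Informant server ended quickly, only %s ago. Respecting minimum wait of %s\n", "5s")
}
```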
@@ -1457,7 +1457,7 @@ func (s *atomicUpdateState) desiredVMState(allowDecrease bool) api.Resources {
 	goalCU := currentCU
 	if s.metrics.LoadAverage1Min > 0.9*float32(s.vm.Cpu.Use) {
 		goalCU *= 2
-	} else if s.metrics.LoadAverage1Min < 0.4*float32(s.vm.Cpu.Use) {
+	} else if s.metrics.LoadAverage1Min < 0.4*float32(s.vm.Cpu.Use) && allowDecrease {
 		goalCU /= 2
 	}
 
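Isolated from the surrounding code, the scaling rule that this hunk gates behind `allowDecrease` looks roughly like the following. This is a simplified sketch with invented names, not the actual pkg/agent/runner.go code.

```go
package main

import "fmt"

// desiredCU mirrors the heuristic above: double the goal when the 1-minute
// load average exceeds 90% of the CPUs in use, and halve it below 40%, but
// only when the caller permits decreases.
func desiredCU(currentCU uint16, load1 float32, cpuUse uint16, allowDecrease bool) uint16 {
	goalCU := currentCU
	if load1 > 0.9*float32(cpuUse) {
		goalCU *= 2
	} else if load1 < 0.4*float32(cpuUse) && allowDecrease {
		goalCU /= 2
	}
	return goalCU
}

func main() {
	fmt.Println(desiredCU(2, 0.5, 2, false)) // low load, decrease disallowed -> 2
	fmt.Println(desiredCU(2, 0.5, 2, true))  // low load, decrease allowed    -> 1
	fmt.Println(desiredCU(2, 1.9, 2, true))  // high load                     -> 4
}
```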
118 changes: 81 additions & 37 deletions pkg/informant/agent.go
@@ -353,24 +353,33 @@ func (s *AgentSet) RequestUpscale() {
 	// FIXME: we should assign a timeout to these upscale requests, so that we don't continue trying
 	// to upscale after the demand has gone away.
 
-	s.lock.Lock()
-	defer s.lock.Unlock()
+	agent := func() *Agent {
+		s.lock.Lock()
+		defer s.lock.Unlock()
 
-	s.wantsMemoryUpscale = true
+		// If we already have an ongoing request, don't create a new one.
+		if s.wantsMemoryUpscale {
+			return nil
+		}
 
-	// explicitly copy s.current so that our error handling points to the right Agent
-	if agent := s.current; agent != nil {
-		agent.SpawnRequestUpscale(AgentUpscaleTimeout, func(err error) {
-			if errors.Is(err, context.Canceled) {
-				return
-			}
+		s.wantsMemoryUpscale = true
+		return s.current
+	}()
 
-			klog.Errorf(
-				"Error requesting upscale from Agent %s/%s: %s",
-				agent.serverAddr, agent.id, err,
-			)
-		})
+	if agent == nil {
+		return
 	}
+
+	agent.SpawnRequestUpscale(AgentUpscaleTimeout, func(err error) {
+		if errors.Is(err, context.Canceled) {
+			return
+		}
+
+		klog.Errorf(
+			"Error requesting upscale from Agent %s/%s: %s",
+			agent.serverAddr, agent.id, err,
+		)
+	})
 }
 
 // ReceivedUpscale marks any desired upscaling from a prior s.RequestUpscale() as resolved
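
The restructured RequestUpscale follows a common Go pattern: mutate shared state inside an immediately-invoked closure that holds the lock, then perform the follow-up work (here SpawnRequestUpscale, which must not run under the lock) after the closure returns. A generic, self-contained illustration with invented names:

```go
package main

import (
	"fmt"
	"sync"
)

type agentSet struct {
	mu      sync.Mutex
	pending bool
	current string
}

func (s *agentSet) requestUpscale() {
	// Take the lock only inside the closure; copy out what we need.
	target := func() string {
		s.mu.Lock()
		defer s.mu.Unlock()
		if s.pending {
			return "" // a request is already in flight; coalesce
		}
		s.pending = true
		return s.current
	}()
	if target == "" {
		return
	}
	// The lock is released here, so this call may itself take s.mu
	// (e.g. from an error callback) without deadlocking.
	fmt.Println("requesting upscale from", target)
}

func main() {
	s := &agentSet{current: "agent-1"}
	s.requestUpscale() // sends the request
	s.requestUpscale() // coalesced with the in-flight request
}
```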
@@ -519,6 +528,8 @@ func doRequestWithStartSignal[B any, R any](
 		return
 	}
 
+	klog.Infof("Sending agent %s %q request: %s", agent.id, path, string(bodyBytes))
+
 	resp, err := client.Do(req)
 	if err != nil {
 		requestErr = err
@@ -544,6 +555,8 @@
 		return
 	}
 
+	klog.Infof("Got agent %s response: %s", agent.id, string(respBodyBytes))
+
 	if responseBody.SequenceNumber == 0 {
 		requestErr = errors.New("Got invalid sequence number 0")
 		return
@@ -763,6 +776,13 @@ func (a *Agent) Resume(timeout time.Duration) error {
 }
 
 // SpawnRequestUpscale requests that the Agent increase the resource allocation to this VM
+//
+// This method blocks until the request is picked up by the message queue, and returns without
+// waiting for the request to complete (it'll do that on its own).
+//
+// The timeout applies only once the request is in-flight.
+//
+// This method MUST NOT be called while holding a.parent.lock; if that happens, it may deadlock.
 func (a *Agent) SpawnRequestUpscale(timeout time.Duration, handleError func(error)) {
 	// Quick unregistered check
 	select {
@@ -776,30 +796,54 @@ func (a *Agent) SpawnRequestUpscale(timeout time.Duration, handleError func(erro
 	default:
 	}
 
-	body := api.MoreResourcesRequest{
-		MoreResources: api.MoreResources{Cpu: false, Memory: true},
-		ExpectedID:    a.id,
-	}
-	id, _, err := doRequest[api.MoreResourcesRequest, api.AgentIdentification](
-		a, timeout, http.MethodPost, "/try-upscale", &body,
-	)
+	sendDone, recvDone := util.NewSingleSignalPair()
 
-	select {
-	case <-a.unregistered.Recv():
-		handleError(context.Canceled)
-		return
-	default:
-	}
+	go func() {
+		// If we exit early, signal that we're done.
+		defer sendDone.Send()
 
-	if err != nil {
-		a.EnsureUnregistered()
-		handleError(err)
-		return
-	}
+		unsetWantsUpscale := func() {
+			// Unset s.wantsMemoryUpscale if the agent is still current. We want to allow further
+			// requests to try again.
+			a.parent.lock.Lock()
+			defer a.parent.lock.Unlock()
 
-	if id.AgentID != a.id {
-		a.EnsureUnregistered()
-		handleError(fmt.Errorf("Bad agent identification: expected %q but got %q", a.id, id.AgentID))
-		return
-	}
+			if a.parent.current == a {
+				a.parent.wantsMemoryUpscale = false
+			}
+		}
+
+		body := api.MoreResourcesRequest{
+			MoreResources: api.MoreResources{Cpu: false, Memory: true},
+			ExpectedID:    a.id,
+		}
+		// Pass the signal sender into doRequestWithStartSignal so that the signalling on
+		// start-of-handling is done for us.
+		id, _, err := doRequestWithStartSignal[api.MoreResourcesRequest, api.AgentIdentification](
+			a, timeout, &sendDone, http.MethodPost, "/try-upscale", &body,
+		)
+
+		select {
+		case <-a.unregistered.Recv():
+			handleError(context.Canceled)
+			return
+		default:
+		}
+
+		if err != nil {
+			unsetWantsUpscale()
+			a.EnsureUnregistered()
+			handleError(err)
+			return
+		}
+
+		if id.AgentID != a.id {
+			unsetWantsUpscale()
+			a.EnsureUnregistered()
+			handleError(fmt.Errorf("Bad agent identification: expected %q but got %q", a.id, id.AgentID))
+			return
+		}
+	}()
+
+	<-recvDone.Recv()
 }
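
`util.NewSingleSignalPair` isn't part of this diff. Judging only from its usage here (a `Send` that is safe to call more than once, and a `Recv` that returns a channel to block on), it might look roughly like the following sketch. This is an assumption, not the repo's actual implementation:

```go
package util

import "sync"

// SignalSender is the sending half of a single-use signal.
type SignalSender struct {
	once *sync.Once
	ch   chan struct{}
}

// SignalReceiver is the receiving half of a single-use signal.
type SignalReceiver struct {
	ch chan struct{}
}

// NewSingleSignalPair returns a connected sender/receiver pair.
// Send may be called multiple times; only the first has an effect.
func NewSingleSignalPair() (SignalSender, SignalReceiver) {
	ch := make(chan struct{})
	return SignalSender{once: &sync.Once{}, ch: ch}, SignalReceiver{ch: ch}
}

// Send fires the signal by closing the shared channel.
func (s SignalSender) Send() {
	s.once.Do(func() { close(s.ch) })
}

// Recv returns a channel that is closed once Send has been called.
func (r SignalReceiver) Recv() <-chan struct{} {
	return r.ch
}
```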