Skip to content

Commit

Permalink
Merge pull request #372 from travis-ci/meat-job-delivery-whackamole
Browse files Browse the repository at this point in the history
HTTP job delivery whackamole
  • Loading branch information
meatballhat authored Oct 17, 2017
2 parents 44fa026 + 3d4228a commit cdce890
Show file tree
Hide file tree
Showing 14 changed files with 253 additions and 155 deletions.
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,23 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/).
### Added
- backend/docker: support for bind-mounted volumes via space-delimited,
colon-paired values in `TRAVIS_WORKER_DOCKER_BINDS`
- http-job-queue: configurable new job polling interval
- http-job: configurable job refresh claim interval

### Changed
- backend/gce:
- add site tag to job vms
- backend/gce: add site tag to job vms
- http-job:
- account for transitional states when handling state update conflicts
- delete self under various error conditions indicative of a requeue

### Deprecated

### Removed

### Fixed
- backend/docker: switch to container hostname with dashes instead of dots
- http-job: conditional inclusion of state-related timestamps in state updates
- step-run-script: mark job errored on unknown execution error such as poweroff

### Security

Expand Down
6 changes: 3 additions & 3 deletions amqp_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,6 @@ func (j *amqpJob) sendStateUpdate(ctx gocontext.Context, event, state string) er
})
}

// Name returns the fixed identifier "amqp" for this job implementation.
func (j *amqpJob) Name() string {
	const jobName = "amqp"
	return jobName
}
// SetupContext returns the given context unchanged; the AMQP job adds
// nothing to it.
func (j *amqpJob) SetupContext(ctx gocontext.Context) gocontext.Context {
	return ctx
}

// Name returns the fixed identifier "amqp" for this job implementation.
func (j *amqpJob) Name() string {
	return "amqp"
}
12 changes: 6 additions & 6 deletions amqp_job_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err erro

err := json.Unmarshal(delivery.Body, buildJob.payload)
if err != nil {
logger.WithField("err", err).Error("payload JSON parse error, attempting to nack delivery")
logger.WithField("err", err).Error("payload JSON parse error, attempting to ack+drop delivery")
err := delivery.Ack(false)
if err != nil {
logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't nack delivery")
logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't ack+drop delivery")
}
continue
}
Expand All @@ -116,20 +116,20 @@ func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err erro

err = json.Unmarshal(delivery.Body, &startAttrs)
if err != nil {
logger.WithField("err", err).Error("start attributes JSON parse error, attempting to nack delivery")
logger.WithField("err", err).Error("start attributes JSON parse error, attempting to ack+drop delivery")
err := delivery.Ack(false)
if err != nil {
logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't nack delivery")
logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't ack+drop delivery")
}
continue
}

buildJob.rawPayload, err = simplejson.NewJson(delivery.Body)
if err != nil {
logger.WithField("err", err).Error("raw payload JSON parse error, attempting to nack delivery")
logger.WithField("err", err).Error("raw payload JSON parse error, attempting to ack+drop delivery")
err := delivery.Ack(false)
if err != nil {
logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't nack delivery")
logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't ack+drop delivery")
}
continue
}
Expand Down
6 changes: 4 additions & 2 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,9 +645,10 @@ func (i *CLI) buildHTTPJobQueue() (*HTTPJobQueue, error) {
return nil, errors.Wrap(err, "error parsing job board URL")
}

jobQueue, err := NewHTTPJobQueue(
jobQueue, err := NewHTTPJobQueueWithIntervals(
jobBoardURL, i.Config.TravisSite,
i.Config.ProviderName, i.Config.QueueName,
i.Config.HTTPPollingInterval, i.Config.HTTPRefreshClaimInterval,
i.CancellationBroadcaster)
if err != nil {
return nil, errors.Wrap(err, "error creating HTTP job queue")
Expand All @@ -662,7 +663,8 @@ func (i *CLI) buildHTTPJobQueue() (*HTTPJobQueue, error) {
}

func (i *CLI) buildFileJobQueue() (*FileJobQueue, error) {
jobQueue, err := NewFileJobQueue(i.Config.BaseDir, i.Config.QueueName, i.Config.FilePollingInterval)
jobQueue, err := NewFileJobQueue(
i.Config.BaseDir, i.Config.QueueName, i.Config.FilePollingInterval)
if err != nil {
return nil, err
}
Expand Down
26 changes: 19 additions & 7 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@ import (
)

var (
defaultAmqpURI = "amqp://"
defaultBaseDir = "."
defaultFilePollingInterval, _ = time.ParseDuration("5s")
defaultPoolSize = 1
defaultProviderName = "docker"
defaultQueueType = "amqp"
defaultAmqpURI = "amqp://"
defaultBaseDir = "."
defaultFilePollingInterval, _ = time.ParseDuration("5s")
defaultHTTPPollingInterval, _ = time.ParseDuration("3s")
defaultHTTPRefreshClaimInterval, _ = time.ParseDuration("5s")
defaultPoolSize = 1
defaultProviderName = "docker"
defaultQueueType = "amqp"

defaultHardTimeout, _ = time.ParseDuration("50m")
defaultInitialSleep, _ = time.ParseDuration("1s")
Expand Down Expand Up @@ -79,6 +81,14 @@ var (
NewConfigDef("QueueName", &cli.StringFlag{
Usage: "The AMQP queue to subscribe to for jobs",
}),
NewConfigDef("HTTPPollingInterval", &cli.DurationFlag{
Value: defaultHTTPPollingInterval,
Usage: `Sleep interval between new job requests (only valid for "http" queue type)`,
}),
NewConfigDef("HTTPRefreshClaimInterval", &cli.DurationFlag{
Value: defaultHTTPRefreshClaimInterval,
Usage: `Sleep interval between job claim refresh requests (only valid for "http" queue type)`,
}),
NewConfigDef("LibratoEmail", &cli.StringFlag{
Usage: "Librato metrics account email",
}),
Expand Down Expand Up @@ -317,7 +327,9 @@ type Config struct {
JobBoardURL string `config:"job-board-url"`
TravisSite string `config:"travis-site"`

FilePollingInterval time.Duration `config:"file-polling-interval"`
FilePollingInterval time.Duration `config:"file-polling-interval"`
HTTPPollingInterval time.Duration `config:"http-polling-interval"`
HTTPRefreshClaimInterval time.Duration `config:"http-refresh-claim-interval"`

HardTimeout time.Duration `config:"hard-timeout"`
InitialSleep time.Duration `config:"initial-sleep"`
Expand Down
6 changes: 3 additions & 3 deletions file_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,6 @@ func (j *fileJob) LogWriter(ctx gocontext.Context, defaultLogTimeout time.Durati
return newFileLogWriter(ctx, j.logFile, logTimeout)
}

// Name returns the fixed identifier "file" for this job implementation.
func (j *fileJob) Name() string {
	const jobName = "file"
	return jobName
}
// SetupContext returns the given context unchanged; the file job adds
// nothing to it.
func (j *fileJob) SetupContext(ctx gocontext.Context) gocontext.Context {
	return ctx
}

// Name returns the fixed identifier "file" for this job implementation.
func (j *fileJob) Name() string {
	return "file"
}
144 changes: 49 additions & 95 deletions http_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,14 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"time"

gocontext "context"

"github.com/bitly/go-simplejson"
"github.com/cenk/backoff"
"github.com/jtacoma/uritemplates"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/travis-ci/worker/backend"
"github.com/travis-ci/worker/context"
"github.com/travis-ci/worker/metrics"
Expand All @@ -32,10 +28,8 @@ type httpJob struct {
stateCount uint

refreshClaim func(gocontext.Context)

jobBoardURL *url.URL
site string
processorID string
deleteSelf func(gocontext.Context) error
cancelSelf func(gocontext.Context)
}

type jobScriptPayload struct {
Expand All @@ -53,20 +47,6 @@ type httpJobPayload struct {
ImageName string `json:"image_name"`
}

// httpJobStateUpdate is the JSON-encoded payload describing a job state
// transition: the state being left ("cur"), the state being entered ("new"),
// the job's lifecycle timestamps, and optional metadata.
//
// NOTE(review): encoding/json's omitempty never treats a struct value as
// empty, so the non-pointer time.Time fields below (Received, Started,
// Finished) are always serialized, even when they hold the zero time. Only
// the pointer-typed Queued and Meta fields are actually dropped when nil.
type httpJobStateUpdate struct {
// CurrentState is the state the job is transitioning from.
CurrentState string `json:"cur"`
// NewState is the state the job is transitioning to.
NewState string `json:"new"`
// Queued is omitted from the JSON when nil.
Queued *time.Time `json:"queued,omitempty"`
// Received, Started and Finished are emitted unconditionally (see NOTE above).
Received time.Time `json:"received,omitempty"`
Started time.Time `json:"started,omitempty"`
Finished time.Time `json:"finished,omitempty"`
// Meta is omitted from the JSON when nil.
Meta *httpJobStateUpdateMeta `json:"meta,omitempty"`
}

// httpJobStateUpdateMeta holds auxiliary data serialized under the "meta" key
// of a state update payload.
type httpJobStateUpdateMeta struct {
// StateUpdateCount is serialized as "state_update_count" and is omitted from
// the JSON when it is zero.
StateUpdateCount uint `json:"state_update_count,omitempty"`
}

func (j *httpJob) GoString() string {
return fmt.Sprintf("&httpJob{payload: %#v, startAttributes: %#v}",
j.payload, j.startAttributes)
Expand Down Expand Up @@ -141,70 +121,11 @@ func (j *httpJob) currentState() string {
}

func (j *httpJob) Finish(ctx gocontext.Context, state FinishState) error {
logger := context.LoggerFromContext(ctx).WithFields(logrus.Fields{
"state": state,
"self": "http_job",
})

logger.Info("finishing job")

u := *j.jobBoardURL
u.Path = fmt.Sprintf("/jobs/%d", j.Payload().Job.ID)
u.User = nil

req, err := http.NewRequest("DELETE", u.String(), nil)
if err != nil {
return err
}

req.Header.Add("Travis-Site", j.site)
req.Header.Add("Authorization", "Bearer "+j.payload.JWT)
req.Header.Add("From", j.processorID)

bo := backoff.NewExponentialBackOff()
bo.MaxInterval = 10 * time.Second
bo.MaxElapsedTime = 1 * time.Minute

logger.WithField("url", u.String()).Debug("performing DELETE request")

var resp *http.Response
err = backoff.Retry(func() (err error) {
resp, err = (&http.Client{}).Do(req)
if resp != nil && resp.StatusCode != http.StatusNoContent {
logger.WithFields(logrus.Fields{
"expected_status": http.StatusNoContent,
"actual_status": resp.StatusCode,
}).Debug("delete failed")

if resp.Body != nil {
resp.Body.Close()
}
return errors.Errorf("expected %d but got %d", http.StatusNoContent, resp.StatusCode)
}

return
}, bo)

if err != nil {
return errors.Wrap(err, "failed to mark job complete with retries")
}

defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
err := j.deleteSelf(ctx)
if err != nil {
return err
}

if resp.StatusCode != http.StatusNoContent {
var errorResp jobBoardErrorResponse
err := json.Unmarshal(body, &errorResp)
if err != nil {
return errors.Wrapf(err, "job board job delete request errored with status %d and didn't send an error response", resp.StatusCode)
}

return errors.Errorf("job board job delete request errored with status %d: %s", resp.StatusCode, errorResp.Error)
}

j.finished = time.Now()
if j.received.IsZero() {
j.received = j.finished
Expand Down Expand Up @@ -239,20 +160,37 @@ func (j *httpJob) Generate(ctx gocontext.Context, job Job) ([]byte, error) {
return script, nil
}

func (j *httpJob) sendStateUpdate(ctx gocontext.Context, curState, newState string) error {
j.stateCount++
payload := &httpJobStateUpdate{
CurrentState: curState,
NewState: newState,
Queued: j.Payload().Job.QueuedAt,
Received: j.received,
Started: j.started,
Finished: j.finished,
Meta: &httpJobStateUpdateMeta{
StateUpdateCount: j.stateCount,
// createStateUpdateBody assembles the key/value body for a state update.
//
// The body always contains the job ID ("id"), the target state under both
// "state" and "new", the state being left under "cur", and a "meta" map with
// the current state update count. Each of the queued/received/started/finished
// timestamps is added only when it has actually been set, formatted as an
// RFC 3339 string in UTC.
func (j *httpJob) createStateUpdateBody(curState, newState string) map[string]interface{} {
	update := map[string]interface{}{}
	update["id"] = j.Payload().Job.ID
	update["state"] = newState
	update["cur"] = curState
	update["new"] = newState
	update["meta"] = map[string]interface{}{
		"state_update_count": j.stateCount,
	}

	// The queued timestamp is a pointer and may be absent from the payload.
	if queuedAt := j.Payload().Job.QueuedAt; queuedAt != nil {
		update["queued_at"] = queuedAt.UTC().Format(time.RFC3339)
	}

	// Zero-valued timestamps have not happened yet, so they are left out
	// rather than sent as the zero time.
	setTimestamp := func(key string, at time.Time) {
		if at.IsZero() {
			return
		}
		update[key] = at.UTC().Format(time.RFC3339)
	}
	setTimestamp("received_at", j.received)
	setTimestamp("started_at", j.started)
	setTimestamp("finished_at", j.finished)

	return update
}

func (j *httpJob) sendStateUpdate(ctx gocontext.Context, curState, newState string) error {
j.stateCount++
payload := j.createStateUpdateBody(curState, newState)

encodedPayload, err := json.Marshal(payload)
if err != nil {
return errors.Wrap(err, "error encoding json")
Expand Down Expand Up @@ -287,12 +225,28 @@ func (j *httpJob) sendStateUpdate(ctx gocontext.Context, curState, newState stri
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
j.handleStateUpdateError(ctx, resp.StatusCode, newState)
return errors.Errorf("expected %d, but got %d", http.StatusOK, resp.StatusCode)
}

return nil
}

func (j *httpJob) Name() string {
return "http"
// handleStateUpdateError reacts to a failed state update response.
//
// Only an HTTP 409 Conflict is acted upon, and only when the attempted
// transition was to "received" or "started"; in that case the job cancels
// itself via cancelSelf. Every other status/state combination is a no-op.
func (j *httpJob) handleStateUpdateError(ctx gocontext.Context, status int, newState string) {
	if status != http.StatusConflict {
		return
	}

	switch newState {
	case "received", "started":
		// NOTE: a conflict while moving into 'received' or 'started' suggests
		// the job may be running on more than one worker at once. Assume the
		// worst and cancel self.
		j.cancelSelf(ctx)
	}
}

// SetupContext returns the context produced by context.FromJWT for the given
// ctx and the JWT carried in this job's payload.
func (j *httpJob) SetupContext(ctx gocontext.Context) gocontext.Context {
	jwt := j.payload.JWT
	return context.FromJWT(ctx, jwt)
}

// Name returns the fixed identifier "http" for this job implementation.
func (j *httpJob) Name() string {
	return "http"
}
Loading

0 comments on commit cdce890

Please sign in to comment.