From 6aee71cd99e0bad96a3cab9d419526a08226d010 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Fri, 15 Sep 2017 10:46:48 -0400 Subject: [PATCH 01/13] Allow configuration of HTTP job queue intervals --- canceller.go | 10 ++++++++++ cli.go | 4 +++- config/config.go | 26 +++++++++++++++++++------- http_job_queue.go | 33 ++++++++++++++++++--------------- http_job_queue_test.go | 8 ++++---- 5 files changed, 54 insertions(+), 27 deletions(-) diff --git a/canceller.go b/canceller.go index 6709e5498..fe2759ce0 100644 --- a/canceller.go +++ b/canceller.go @@ -2,6 +2,16 @@ package worker import "sync" +type JobIDBroadcaster interface { + Broadcast(uint64) +} + +type Canceller interface { + JobIDBroadcaster + Subscribe(uint64) <-chan struct{} + Unsubscribe(uint64, <-chan struct{}) +} + // A CancellationBroadcaster allows you to subscribe to and unsubscribe from // cancellation messages for a given job ID. type CancellationBroadcaster struct { diff --git a/cli.go b/cli.go index 0bf04382b..6076963b0 100644 --- a/cli.go +++ b/cli.go @@ -644,6 +644,7 @@ func (i *CLI) buildHTTPJobQueue() (*HTTPJobQueue, error) { jobQueue, err := NewHTTPJobQueue( jobBoardURL, i.Config.TravisSite, i.Config.ProviderName, i.Config.QueueName, + i.Config.HTTPPollingInterval, i.Config.HTTPRefreshClaimInterval, i.CancellationBroadcaster) if err != nil { return nil, errors.Wrap(err, "error creating HTTP job queue") @@ -658,7 +659,8 @@ func (i *CLI) buildHTTPJobQueue() (*HTTPJobQueue, error) { } func (i *CLI) buildFileJobQueue() (*FileJobQueue, error) { - jobQueue, err := NewFileJobQueue(i.Config.BaseDir, i.Config.QueueName, i.Config.FilePollingInterval) + jobQueue, err := NewFileJobQueue( + i.Config.BaseDir, i.Config.QueueName, i.Config.FilePollingInterval) if err != nil { return nil, err } diff --git a/config/config.go b/config/config.go index 6aba53c8a..370c43a3e 100644 --- a/config/config.go +++ b/config/config.go @@ -14,12 +14,14 @@ import ( ) var ( - defaultAmqpURI = "amqp://" - defaultBaseDir = "." - defaultFilePollingInterval, _ = time.ParseDuration("5s") - defaultPoolSize = 1 - defaultProviderName = "docker" - defaultQueueType = "amqp" + defaultAmqpURI = "amqp://" + defaultBaseDir = "." + defaultFilePollingInterval, _ = time.ParseDuration("5s") + defaultHTTPPollingInterval, _ = time.ParseDuration("1s") + defaultHTTPRefreshClaimInterval, _ = time.ParseDuration("5s") + defaultPoolSize = 1 + defaultProviderName = "docker" + defaultQueueType = "amqp" defaultHardTimeout, _ = time.ParseDuration("50m") defaultInitialSleep, _ = time.ParseDuration("1s") @@ -79,6 +81,14 @@ var ( NewConfigDef("QueueName", &cli.StringFlag{ Usage: "The AMQP queue to subscribe to for jobs", }), + NewConfigDef("HTTPPollingInterval", &cli.DurationFlag{ + Value: defaultHTTPPollingInterval, + Usage: `Sleep interval between new job requests (only valid for "http" queue type)`, + }), + NewConfigDef("HTTPRefreshClaimInterval", &cli.DurationFlag{ + Value: defaultHTTPRefreshClaimInterval, + Usage: `Sleep interval between job claim refresh requests (only valid for "http" queue type)`, + }), NewConfigDef("LibratoEmail", &cli.StringFlag{ Usage: "Librato metrics account email", }), @@ -317,7 +327,9 @@ type Config struct { JobBoardURL string `config:"job-board-url"` TravisSite string `config:"travis-site"` - FilePollingInterval time.Duration `config:"file-polling-interval"` + FilePollingInterval time.Duration `config:"file-polling-interval"` + HTTPPollingInterval time.Duration `config:"http-polling-interval"` + HTTPRefreshClaimInterval time.Duration `config:"http-refresh-claim-interval"` HardTimeout time.Duration `config:"hard-timeout"` InitialSleep time.Duration `config:"initial-sleep"` diff --git a/http_job_queue.go b/http_job_queue.go index d98565e80..3930b0719 100644 --- a/http_job_queue.go +++ b/http_job_queue.go @@ -27,12 +27,13 @@ var ( // HTTPJobQueue is a JobQueue that uses http type HTTPJobQueue struct { - jobBoardURL *url.URL - site string - providerName string - queue string - pollInterval time.Duration - cb *CancellationBroadcaster + jobBoardURL *url.URL + site string + providerName string + queue string + pollInterval time.Duration + refreshClaimInterval time.Duration + cb JobIDBroadcaster DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string } @@ -53,15 +54,17 @@ type jobBoardErrorResponse struct { // NewHTTPJobQueue creates a new job-board job queue func NewHTTPJobQueue(jobBoardURL *url.URL, site, providerName, queue string, - cb *CancellationBroadcaster) (*HTTPJobQueue, error) { + pollInterval, refreshClaimInterval time.Duration, + cb JobIDBroadcaster) (*HTTPJobQueue, error) { return &HTTPJobQueue{ - jobBoardURL: jobBoardURL, - site: site, - providerName: providerName, - queue: queue, - pollInterval: time.Second, - cb: cb, + jobBoardURL: jobBoardURL, + site: site, + providerName: providerName, + queue: queue, + pollInterval: pollInterval, + refreshClaimInterval: refreshClaimInterval, + cb: cb, }, nil } @@ -365,7 +368,7 @@ func (q *HTTPJobQueue) generateJobRefreshClaimFunc(jobID uint64) (func(gocontext context.LoggerFromContext(ctx).WithFields(logrus.Fields{ "err": err, "job_id": jobID, - }).Error("failed to refresh claim; cancelling") + }).Error("cancelling") q.cb.Broadcast(jobID) return } @@ -379,7 +382,7 @@ func (q *HTTPJobQueue) generateJobRefreshClaimFunc(jobID uint64) (func(gocontext select { case <-ctx.Done(): return - case <-time.After(q.pollInterval): + case <-time.After(q.refreshClaimInterval): } } }, (<-chan struct{})(readyChan) diff --git a/http_job_queue_test.go b/http_job_queue_test.go index 926f6ea14..9c2ef734b 100644 --- a/http_job_queue_test.go +++ b/http_job_queue_test.go @@ -15,7 +15,7 @@ import ( ) func TestHTTPJobQueue(t *testing.T) { - hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", nil) + hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", time.Second, time.Second, nil) assert.Nil(t, err) assert.NotNil(t, hjq) } @@ -64,7 +64,7 @@ func TestHTTPJobQueue_Jobs(t *testing.T) { defer jobBoardServer.Close() jobBoardURL, _ := url.Parse(jobBoardServer.URL) - hjq, err := NewHTTPJobQueue(jobBoardURL, "test", "fake", "fake", nil) + hjq, err := NewHTTPJobQueue(jobBoardURL, "test", "fake", "fake", time.Second, time.Second, nil) assert.Nil(t, err) assert.NotNil(t, hjq) @@ -82,13 +82,13 @@ func TestHTTPJobQueue_Jobs(t *testing.T) { } func TestHTTPJobQueue_Name(t *testing.T) { - hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", nil) + hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", time.Second, time.Second, nil) assert.Nil(t, err) assert.Equal(t, "http", hjq.Name()) } func TestHTTPJobQueue_Cleanup(t *testing.T) { - hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", nil) + hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", time.Second, time.Second, nil) assert.Nil(t, err) assert.Nil(t, hjq.Cleanup()) } From 1fe2205559ea441342791ff6663834ceb7e9e212 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Wed, 20 Sep 2017 10:35:09 -0400 Subject: [PATCH 02/13] Serialize http job state updates with same field names as amqp --- http_job.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/http_job.go b/http_job.go index 7bfb3fe91..098d52d55 100644 --- a/http_job.go +++ b/http_job.go @@ -56,10 +56,10 @@ type httpJobPayload struct { type httpJobStateUpdate struct { CurrentState string `json:"cur"` NewState string `json:"new"` - Queued *time.Time `json:"queued,omitempty"` - Received time.Time `json:"received,omitempty"` - Started time.Time `json:"started,omitempty"` - Finished time.Time `json:"finished,omitempty"` + Queued *time.Time `json:"queued_at,omitempty"` + Received time.Time `json:"received_at,omitempty"` + Started time.Time `json:"started_at,omitempty"` + Finished time.Time `json:"finished_at,omitempty"` Meta *httpJobStateUpdateMeta `json:"meta,omitempty"` } From 7588d85544455764f8d96c46921e2a8175a841bd Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Wed, 20 Sep 2017 11:00:03 -0400 Subject: [PATCH 03/13] Undo some bits in http queue constructor for compat reasons --- canceller.go | 10 ---------- cli.go | 2 +- config/config.go | 2 +- http_job_queue.go | 22 +++++++++++++++++++--- http_job_queue_test.go | 8 ++++---- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/canceller.go b/canceller.go index fe2759ce0..6709e5498 100644 --- a/canceller.go +++ b/canceller.go @@ -2,16 +2,6 @@ package worker import "sync" -type JobIDBroadcaster interface { - Broadcast(uint64) -} - -type Canceller interface { - JobIDBroadcaster - Subscribe(uint64) <-chan struct{} - Unsubscribe(uint64, <-chan struct{}) -} - // A CancellationBroadcaster allows you to subscribe to and unsubscribe from // cancellation messages for a given job ID. type CancellationBroadcaster struct { diff --git a/cli.go b/cli.go index 6076963b0..b50a030aa 100644 --- a/cli.go +++ b/cli.go @@ -641,7 +641,7 @@ func (i *CLI) buildHTTPJobQueue() (*HTTPJobQueue, error) { return nil, errors.Wrap(err, "error parsing job board URL") } - jobQueue, err := NewHTTPJobQueue( + jobQueue, err := NewHTTPJobQueueWithIntervals( jobBoardURL, i.Config.TravisSite, i.Config.ProviderName, i.Config.QueueName, i.Config.HTTPPollingInterval, i.Config.HTTPRefreshClaimInterval, diff --git a/config/config.go b/config/config.go index 370c43a3e..da67b0bbf 100644 --- a/config/config.go +++ b/config/config.go @@ -17,7 +17,7 @@ var ( defaultAmqpURI = "amqp://" defaultBaseDir = "." defaultFilePollingInterval, _ = time.ParseDuration("5s") - defaultHTTPPollingInterval, _ = time.ParseDuration("1s") + defaultHTTPPollingInterval, _ = time.ParseDuration("3s") defaultHTTPRefreshClaimInterval, _ = time.ParseDuration("5s") defaultPoolSize = 1 defaultProviderName = "docker" diff --git a/http_job_queue.go b/http_job_queue.go index 3930b0719..a5008aee2 100644 --- a/http_job_queue.go +++ b/http_job_queue.go @@ -33,7 +33,7 @@ type HTTPJobQueue struct { queue string pollInterval time.Duration refreshClaimInterval time.Duration - cb JobIDBroadcaster + cb *CancellationBroadcaster DefaultLanguage, DefaultDist, DefaultGroup, DefaultOS string } @@ -52,10 +52,26 @@ type jobBoardErrorResponse struct { UpstreamError string `json:"upstream_error,omitempty"` } -// NewHTTPJobQueue creates a new job-board job queue +// NewHTTPJobQueue creates a new http job queue func NewHTTPJobQueue(jobBoardURL *url.URL, site, providerName, queue string, + cb *CancellationBroadcaster) (*HTTPJobQueue, error) { + + return &HTTPJobQueue{ + jobBoardURL: jobBoardURL, + site: site, + providerName: providerName, + queue: queue, + pollInterval: 3 * time.Second, + refreshClaimInterval: 5 * time.Second, + cb: cb, + }, nil +} + +// NewHTTPJobQueueWithIntervals creates a new http job queue with the specified +// poll and refresh claim intervals +func NewHTTPJobQueueWithIntervals(jobBoardURL *url.URL, site, providerName, queue string, pollInterval, refreshClaimInterval time.Duration, - cb JobIDBroadcaster) (*HTTPJobQueue, error) { + cb *CancellationBroadcaster) (*HTTPJobQueue, error) { return &HTTPJobQueue{ jobBoardURL: jobBoardURL, diff --git a/http_job_queue_test.go b/http_job_queue_test.go index 9c2ef734b..926f6ea14 100644 --- a/http_job_queue_test.go +++ b/http_job_queue_test.go @@ -15,7 +15,7 @@ import ( ) func TestHTTPJobQueue(t *testing.T) { - hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", time.Second, time.Second, nil) + hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", nil) assert.Nil(t, err) assert.NotNil(t, hjq) } @@ -64,7 +64,7 @@ func TestHTTPJobQueue_Jobs(t *testing.T) { defer jobBoardServer.Close() jobBoardURL, _ := url.Parse(jobBoardServer.URL) - hjq, err := NewHTTPJobQueue(jobBoardURL, "test", "fake", "fake", time.Second, time.Second, nil) + hjq, err := NewHTTPJobQueue(jobBoardURL, "test", "fake", "fake", nil) assert.Nil(t, err) assert.NotNil(t, hjq) @@ -82,13 +82,13 @@ func TestHTTPJobQueue_Jobs(t *testing.T) { } func TestHTTPJobQueue_Name(t *testing.T) { - hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", time.Second, time.Second, nil) + hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", nil) assert.Nil(t, err) assert.Equal(t, "http", hjq.Name()) } func TestHTTPJobQueue_Cleanup(t *testing.T) { - hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", time.Second, time.Second, nil) + hjq, err := NewHTTPJobQueue(nil, "test", "fake", "fake", nil) assert.Nil(t, err) assert.Nil(t, hjq.Cleanup()) } From 1f166a4b4cf30a3b4d7eac90c97b758d599f4ad1 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Sun, 24 Sep 2017 22:45:04 -0400 Subject: [PATCH 04/13] Move HTTP job deletion responsibility to queue level and delete job in same conditions as when AMQP jobs are dropped. --- amqp_job_queue.go | 12 +++--- http_job.go | 71 ++---------------------------- http_job_queue.go | 108 +++++++++++++++++++++++++++++++++++++++++++--- http_job_test.go | 10 +---- 4 files changed, 111 insertions(+), 90 deletions(-) diff --git a/amqp_job_queue.go b/amqp_job_queue.go index 4dd0097d6..3ba1d3000 100644 --- a/amqp_job_queue.go +++ b/amqp_job_queue.go @@ -104,10 +104,10 @@ func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err erro err := json.Unmarshal(delivery.Body, buildJob.payload) if err != nil { - logger.WithField("err", err).Error("payload JSON parse error, attempting to nack delivery") + logger.WithField("err", err).Error("payload JSON parse error, attempting to ack+drop delivery") err := delivery.Ack(false) if err != nil { - logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't nack delivery") + logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't ack+drop delivery") } continue } @@ -116,20 +116,20 @@ func (q *AMQPJobQueue) Jobs(ctx gocontext.Context) (outChan <-chan Job, err erro err = json.Unmarshal(delivery.Body, &startAttrs) if err != nil { - logger.WithField("err", err).Error("start attributes JSON parse error, attempting to nack delivery") + logger.WithField("err", err).Error("start attributes JSON parse error, attempting to ack+drop delivery") err := delivery.Ack(false) if err != nil { - logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't nack delivery") + logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't ack+drop delivery") } continue } buildJob.rawPayload, err = simplejson.NewJson(delivery.Body) if err != nil { - logger.WithField("err", err).Error("raw payload JSON parse error, attempting to nack delivery") + logger.WithField("err", err).Error("raw payload JSON parse error, attempting to ack+drop delivery") err := delivery.Ack(false) if err != nil { - logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't nack delivery") + logger.WithField("err", err).WithField("delivery", delivery).Error("couldn't ack+drop delivery") } continue } diff --git a/http_job.go b/http_job.go index 098d52d55..3f8ebe2db 100644 --- a/http_job.go +++ b/http_job.go @@ -5,18 +5,14 @@ import ( "encoding/base64" "encoding/json" "fmt" - "io/ioutil" "net/http" - "net/url" "time" gocontext "context" "github.com/bitly/go-simplejson" - "github.com/cenk/backoff" "github.com/jtacoma/uritemplates" "github.com/pkg/errors" - "github.com/sirupsen/logrus" "github.com/travis-ci/worker/backend" "github.com/travis-ci/worker/context" "github.com/travis-ci/worker/metrics" @@ -32,10 +28,7 @@ type httpJob struct { stateCount uint refreshClaim func(gocontext.Context) - - jobBoardURL *url.URL - site string - processorID string + deleteSelf func(gocontext.Context) error } type jobScriptPayload struct { @@ -141,70 +134,12 @@ func (j *httpJob) currentState() string { } func (j *httpJob) Finish(ctx gocontext.Context, state FinishState) error { - logger := context.LoggerFromContext(ctx).WithFields(logrus.Fields{ - "state": state, - "self": "http_job", - }) - - logger.Info("finishing job") - - u := *j.jobBoardURL - u.Path = fmt.Sprintf("/jobs/%d", j.Payload().Job.ID) - u.User = nil - - req, err := http.NewRequest("DELETE", u.String(), nil) - if err != nil { - return err - } - - req.Header.Add("Travis-Site", j.site) - req.Header.Add("Authorization", "Bearer "+j.payload.JWT) - req.Header.Add("From", j.processorID) - - bo := backoff.NewExponentialBackOff() - bo.MaxInterval = 10 * time.Second - bo.MaxElapsedTime = 1 * time.Minute - - logger.WithField("url", u.String()).Debug("performing DELETE request") - - var resp *http.Response - err = backoff.Retry(func() (err error) { - resp, err = (&http.Client{}).Do(req) - if resp != nil && resp.StatusCode != http.StatusNoContent { - logger.WithFields(logrus.Fields{ - "expected_status": http.StatusNoContent, - "actual_status": resp.StatusCode, - }).Debug("delete failed") - - if resp.Body != nil { - resp.Body.Close() - } - return errors.Errorf("expected %d but got %d", http.StatusNoContent, resp.StatusCode) - } - - return - }, bo) - - if err != nil { - return errors.Wrap(err, "failed to mark job complete with retries") - } - - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + // FIXME: use processor context here? + err := j.deleteSelf(gocontext.TODO()) if err != nil { return err } - if resp.StatusCode != http.StatusNoContent { - var errorResp jobBoardErrorResponse - err := json.Unmarshal(body, &errorResp) - if err != nil { - return errors.Wrapf(err, "job board job delete request errored with status %d and didn't send an error response", resp.StatusCode) - } - - return errors.Errorf("job board job delete request errored with status %d: %s", resp.StatusCode, errorResp.Error) - } - j.finished = time.Now() if j.received.IsZero() { j.received = j.finished diff --git a/http_job_queue.go b/http_job_queue.go index a5008aee2..ca2ce0c52 100644 --- a/http_job_queue.go +++ b/http_job_queue.go @@ -154,6 +154,8 @@ func (q *HTTPJobQueue) pollForJob(ctx gocontext.Context, buildJobChan chan Job) }).Info("sent job to output channel") return true, readyChan case <-ctx.Done(): + // FIXME: use processor context here? + _ = q.deleteJob(gocontext.TODO(), jobID) logger.WithField("err", ctx.Err()).Warn("returning from jobs loop due to context done") return false, nil } @@ -216,6 +218,84 @@ func (q *HTTPJobQueue) fetchJobID(ctx gocontext.Context) (uint64, error) { return fetchedJobID, nil } +func (q *HTTPJobQueue) deleteJob(ctx gocontext.Context, jobID uint64) error { + logger := context.LoggerFromContext(ctx).WithFields(logrus.Fields{ + "self": "http_job_queue", + }) + + logger.Info("deleting job") + + jwt, ok := context.JWTFromContext(ctx) + if !ok { + return fmt.Errorf("failed to find jwt in context for job_id=%v", jobID) + } + + processorID, ok := context.ProcessorFromContext(ctx) + if !ok { + processorID = "unknown-processor" + } + + u := *q.jobBoardURL + u.Path = fmt.Sprintf("/jobs/%d", jobID) + u.User = nil + + req, err := http.NewRequest("DELETE", u.String(), nil) + if err != nil { + return err + } + + req.Header.Add("Travis-Site", q.site) + req.Header.Add("Authorization", "Bearer "+jwt) + req.Header.Add("From", processorID) + + bo := backoff.NewExponentialBackOff() + bo.MaxInterval = 10 * time.Second + bo.MaxElapsedTime = 1 * time.Minute + + logger.WithField("url", u.String()).Debug("performing DELETE request") + + var resp *http.Response + err = backoff.Retry(func() (err error) { + resp, err = http.DefaultClient.Do(req) + if resp != nil && resp.StatusCode != http.StatusNoContent { + logger.WithFields(logrus.Fields{ + "expected_status": http.StatusNoContent, + "actual_status": resp.StatusCode, + }).Debug("delete failed") + + if resp.Body != nil { + resp.Body.Close() + } + return errors.Errorf("expected %d but got %d", http.StatusNoContent, resp.StatusCode) + } + + return + }, bo) + + if err != nil { + return errors.Wrap(err, "failed to delete job with retries") + } + + defer resp.Body.Close() + + if resp.StatusCode == http.StatusNoContent { + return nil + } + + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + + var errorResp jobBoardErrorResponse + err = json.Unmarshal(body, &errorResp) + if err != nil { + return errors.Wrapf(err, "job board job delete request errored with status %d and didn't send an error response", resp.StatusCode) + } + + return errors.Errorf("job board job delete request errored with status %d: %s", resp.StatusCode, errorResp.Error) +} + func (q *HTTPJobQueue) refreshJobClaim(ctx gocontext.Context, jobID uint64) error { logger := context.LoggerFromContext(ctx).WithFields(logrus.Fields{ "self": "http_job_queue", @@ -290,10 +370,9 @@ func (q *HTTPJobQueue) fetchJob(ctx gocontext.Context, jobID uint64) (Job, <-cha startAttributes: &backend.StartAttributes{}, refreshClaim: refreshClaimFunc, - - jobBoardURL: q.jobBoardURL, - site: q.site, - processorID: processorID, + deleteSelf: func(ctx gocontext.Context) error { + return q.deleteJob(ctx, jobID) + }, } startAttrs := &httpJobPayloadStartAttrs{ Data: &jobPayloadStartAttrs{ @@ -351,17 +430,32 @@ func (q *HTTPJobQueue) fetchJob(ctx gocontext.Context, jobID uint64) (Job, <-cha err = json.Unmarshal(body, buildJob.payload) if err != nil { - return nil, nil, errors.Wrap(err, "failed to unmarshal job-board payload") + logger.WithField("err", err).Error("payload JSON parse error, attempting to delete job") + err := q.deleteJob(ctx, jobID) + if err != nil { + return nil, nil, errors.Wrap(err, "couldn't delete job") + } + return nil, nil, errors.Wrap(err, "payload JSON parse error") } err = json.Unmarshal(body, &startAttrs) if err != nil { - return nil, nil, errors.Wrap(err, "failed to unmarshal start attributes from job-board") + logger.WithField("err", err).Error("start attributes JSON parse error, attempting to delete job") + err := q.deleteJob(ctx, jobID) + if err != nil { + return nil, nil, errors.Wrap(err, "couldn't delete job") + } + return nil, nil, errors.Wrap(err, "start attributes JSON parse error") } rawPayload, err := simplejson.NewJson(body) if err != nil { - return nil, nil, errors.Wrap(err, "failed to parse raw payload with simplejson") + logger.WithField("err", err).Error("raw payload JSON parse error, attempting to delete job") + err := q.deleteJob(ctx, jobID) + if err != nil { + return nil, nil, errors.Wrap(err, "couldn't delete job") + } + return nil, nil, errors.Wrap(err, "raw payload JSON parse error") } buildJob.rawPayload = rawPayload.Get("data") diff --git a/http_job_test.go b/http_job_test.go index 626016b0a..2fd478127 100644 --- a/http_job_test.go +++ b/http_job_test.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "net/http/httptest" - "net/url" "strings" "testing" @@ -62,9 +61,7 @@ func newTestHTTPJob(t *testing.T) *httpJob { }, rawPayload: rawPayload, startAttributes: startAttributes, - - site: "test", - processorID: "whee", + deleteSelf: func(_ gocontext.Context) error { return nil }, } } @@ -108,7 +105,6 @@ func TestHTTPJob_Error(t *testing.T) { job := newTestHTTPJob(t) job.payload.JobStateURL = ts.URL job.payload.JobPartsURL = ts.URL - job.jobBoardURL, _ = url.Parse(ts.URL) err := job.Error(gocontext.TODO(), "wat") if err != nil { @@ -125,7 +121,6 @@ func TestHTTPJob_Requeue(t *testing.T) { job := newTestHTTPJob(t) job.payload.JobStateURL = ts.URL job.payload.JobPartsURL = ts.URL - job.jobBoardURL, _ = url.Parse(ts.URL) ctx := gocontext.TODO() @@ -143,7 +138,6 @@ func TestHTTPJob_Received(t *testing.T) { job := newTestHTTPJob(t) job.payload.JobStateURL = ts.URL job.payload.JobPartsURL = ts.URL - job.jobBoardURL, _ = url.Parse(ts.URL) err := job.Received(gocontext.TODO()) if err != nil { @@ -159,7 +153,6 @@ func TestHTTPJob_Started(t *testing.T) { job := newTestHTTPJob(t) job.payload.JobStateURL = ts.URL job.payload.JobPartsURL = ts.URL - job.jobBoardURL, _ = url.Parse(ts.URL) err := job.Started(gocontext.TODO()) if err != nil { @@ -179,7 +172,6 @@ func TestHTTPJob_Finish(t *testing.T) { job := newTestHTTPJob(t) job.payload.JobStateURL = ts.URL job.payload.JobPartsURL = ts.URL - job.jobBoardURL, _ = url.Parse(ts.URL) ctx := gocontext.TODO() From 640316e1aa507a8f09f63cce5e09a97eb93b09da Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Sun, 24 Sep 2017 22:54:57 -0400 Subject: [PATCH 05/13] Pass along processor context to http job deletion and continue using a TODO context in the best-effort case :shrugging: --- http_job.go | 3 +-- http_job_queue.go | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/http_job.go b/http_job.go index 3f8ebe2db..1a0d75046 100644 --- a/http_job.go +++ b/http_job.go @@ -134,8 +134,7 @@ func (j *httpJob) currentState() string { } func (j *httpJob) Finish(ctx gocontext.Context, state FinishState) error { - // FIXME: use processor context here? - err := j.deleteSelf(gocontext.TODO()) + err := j.deleteSelf(ctx) if err != nil { return err } diff --git a/http_job_queue.go b/http_job_queue.go index ca2ce0c52..f4266722b 100644 --- a/http_job_queue.go +++ b/http_job_queue.go @@ -154,7 +154,7 @@ func (q *HTTPJobQueue) pollForJob(ctx gocontext.Context, buildJobChan chan Job) }).Info("sent job to output channel") return true, readyChan case <-ctx.Done(): - // FIXME: use processor context here? + // best-effort delete with a meaningless context :ok_hand: _ = q.deleteJob(gocontext.TODO(), jobID) logger.WithField("err", ctx.Err()).Warn("returning from jobs loop due to context done") return false, nil From 54d1b468abfd8c66eca5152145a9226c20afa038 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Fri, 13 Oct 2017 21:57:24 -0400 Subject: [PATCH 06/13] Ensure all contexts have JWT for state updates and :scream_cat: change the signal for graceful shutdown + pause from SIGWINCH to SIGUSR2 --- amqp_job.go | 6 +++--- cli.go | 6 +++--- file_job.go | 6 +++--- http_job.go | 21 +++++++++++++++++++-- http_job_queue.go | 19 +++++++++++++++---- job.go | 1 + package_test.go | 2 ++ processor.go | 4 ++-- step_check_cancellation.go | 2 +- 9 files changed, 49 insertions(+), 18 deletions(-) diff --git a/amqp_job.go b/amqp_job.go index dd0726bce..2d7319fe1 100644 --- a/amqp_job.go +++ b/amqp_job.go @@ -184,6 +184,6 @@ func (j *amqpJob) sendStateUpdate(ctx gocontext.Context, event, state string) er }) } -func (j *amqpJob) Name() string { - return "amqp" -} +func (j *amqpJob) SetupContext(ctx gocontext.Context) gocontext.Context { return ctx } + +func (j *amqpJob) Name() string { return "amqp" } diff --git a/cli.go b/cli.go index e7534ca24..31aefdcb3 100644 --- a/cli.go +++ b/cli.go @@ -486,7 +486,7 @@ func (i *CLI) signalHandler() { signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT, syscall.SIGUSR1, syscall.SIGTTIN, syscall.SIGTTOU, - syscall.SIGWINCH) + syscall.SIGUSR2) for { select { @@ -504,8 +504,8 @@ func (i *CLI) signalHandler() { case syscall.SIGTTOU: i.logger.Info("SIGTTOU received, removing processor from pool") i.ProcessorPool.Decr() - case syscall.SIGWINCH: - i.logger.Warn("SIGWINCH received, toggling graceful shutdown and pause") + case syscall.SIGUSR2: + i.logger.Warn("SIGUSR2 received, toggling graceful shutdown and pause") i.ProcessorPool.GracefulShutdown(true) case syscall.SIGUSR1: i.logProcessorInfo("received SIGUSR1") diff --git a/file_job.go b/file_job.go index 5728a08ef..a84adb785 100644 --- a/file_job.go +++ b/file_job.go @@ -109,6 +109,6 @@ func (j *fileJob) LogWriter(ctx gocontext.Context, defaultLogTimeout time.Durati return newFileLogWriter(ctx, j.logFile, logTimeout) } -func (j *fileJob) Name() string { - return "file" -} +func (j *fileJob) SetupContext(ctx gocontext.Context) gocontext.Context { return ctx } + +func (j *fileJob) Name() string { return "file" } diff --git a/http_job.go b/http_job.go index 1a0d75046..dd2e7ad9c 100644 --- a/http_job.go +++ b/http_job.go @@ -29,6 +29,7 @@ type httpJob struct { refreshClaim func(gocontext.Context) deleteSelf func(gocontext.Context) error + cancelSelf func(gocontext.Context) } type jobScriptPayload struct { @@ -221,12 +222,28 @@ func (j *httpJob) sendStateUpdate(ctx gocontext.Context, curState, newState stri defer resp.Body.Close() if resp.StatusCode != http.StatusOK { + j.handleStateUpdateError(ctx, resp.StatusCode, newState) return errors.Errorf("expected %d, but got %d", http.StatusOK, resp.StatusCode) } return nil } -func (j *httpJob) Name() string { - return "http" +func (j *httpJob) handleStateUpdateError(ctx gocontext.Context, status int, newState string) { + if status != http.StatusConflict { + return + } + + if newState == "received" || newState == "started" { + // NOTE: receiving a conflict response when attempting to transition to + // 'received' or 'started' means that the job is potentially being run + // by multiple workers. Assume the worst and cancel self. + j.cancelSelf(ctx) + } } + +func (j *httpJob) SetupContext(ctx gocontext.Context) gocontext.Context { + return context.FromJWT(ctx, j.payload.JWT) +} + +func (j *httpJob) Name() string { return "http" } diff --git a/http_job_queue.go b/http_job_queue.go index f4266722b..6abda4b4e 100644 --- a/http_job_queue.go +++ b/http_job_queue.go @@ -154,8 +154,16 @@ func (q *HTTPJobQueue) pollForJob(ctx gocontext.Context, buildJobChan chan Job) }).Info("sent job to output channel") return true, readyChan case <-ctx.Done(): - // best-effort delete with a meaningless context :ok_hand: - _ = q.deleteJob(gocontext.TODO(), jobID) + if j, ok := buildJob.(*httpJob); ok { + if processorID, ok := context.ProcessorFromContext(ctx); ok { + // best-effort delete + delCtx := context.FromProcessor( + context.FromJWT(gocontext.TODO(), j.payload.JWT), + processorID) + logger.WithField("job_id", jobID).Warn("context done; deleting job") + _ = q.deleteJob(delCtx, jobID) + } + } logger.WithField("err", ctx.Err()).Warn("returning from jobs loop due to context done") return false, nil } @@ -227,7 +235,7 @@ func (q *HTTPJobQueue) deleteJob(ctx gocontext.Context, jobID uint64) error { jwt, ok := context.JWTFromContext(ctx) if !ok { - return fmt.Errorf("failed to find jwt in context for job_id=%v", jobID) + return errors.New("failed to delete job; no jwt in context") } processorID, ok := context.ProcessorFromContext(ctx) @@ -305,7 +313,7 @@ func (q *HTTPJobQueue) refreshJobClaim(ctx gocontext.Context, jobID uint64) erro jwt, ok := context.JWTFromContext(ctx) if !ok { - return fmt.Errorf("failed to find jwt in context for job_id=%v", jobID) + return errors.New("failed to refresh claim; no jwt in context") } processorID, ok := context.ProcessorFromContext(ctx) @@ -373,6 +381,9 @@ func (q *HTTPJobQueue) fetchJob(ctx gocontext.Context, jobID uint64) (Job, <-cha deleteSelf: func(ctx gocontext.Context) error { return q.deleteJob(ctx, jobID) }, + cancelSelf: func(ctx gocontext.Context) { + q.cb.Broadcast(jobID) + }, } startAttrs := &httpJobPayloadStartAttrs{ Data: &jobPayloadStartAttrs{ diff --git a/job.go b/job.go index efaeac7f1..b0d75fe15 100644 --- a/job.go +++ b/job.go @@ -93,4 +93,5 @@ type Job interface { LogWriter(gocontext.Context, time.Duration) (LogWriter, error) Name() string + SetupContext(gocontext.Context) gocontext.Context } diff --git a/package_test.go b/package_test.go index c641a793d..657cfdca7 100644 --- a/package_test.go +++ b/package_test.go @@ -112,6 +112,8 @@ func (fj *fakeJob) LogWriter(_ gocontext.Context, _ time.Duration) (LogWriter, e return &fakeLogWriter{broken: fj.hasBrokenLogWriter}, nil } +func (j *fakeJob) SetupContext(ctx gocontext.Context) gocontext.Context { return ctx } + func (fj *fakeJob) Name() string { return "fake" } type fakeLogWriter struct { diff --git a/processor.go b/processor.go index 1a711735a..e64bafc65 100644 --- a/processor.go +++ b/processor.go @@ -206,8 +206,8 @@ func (p *Processor) process(ctx gocontext.Context, buildJob Job) { state := new(multistep.BasicStateBag) state.Put("hostname", p.ID) state.Put("buildJob", buildJob) - state.Put("procCtx", p.ctx) - state.Put("ctx", ctx) + state.Put("procCtx", buildJob.SetupContext(p.ctx)) + state.Put("ctx", buildJob.SetupContext(ctx)) logger := context.LoggerFromContext(ctx).WithFields(logrus.Fields{ "job_id": buildJob.Payload().Job.ID, diff --git a/step_check_cancellation.go b/step_check_cancellation.go index a9e3421dd..aa035f48e 100644 --- a/step_check_cancellation.go +++ b/step_check_cancellation.go @@ -23,7 +23,7 @@ func (s *stepCheckCancellation) Run(state multistep.StateBag) multistep.StepActi } else { err := buildJob.Finish(procCtx, FinishStateCancelled) if err != nil { - context.LoggerFromContext(ctx).WithField("err", err).WithField("state", state).Error("couldn't update job state") + context.LoggerFromContext(ctx).WithField("err", err).WithField("state", FinishStateCancelled).Error("couldn't update job state") } } return multistep.ActionHalt From bc31e8d1f077938067ef3ab4d56dc80386a4a2b7 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Mon, 16 Oct 2017 12:18:24 -0400 Subject: [PATCH 07/13] Build HTTP state update body more like AMQP to hopefully correct inconsistencies! --- http_job.go | 53 ++++++++++++++++++++++++++++------------------------- 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/http_job.go b/http_job.go index dd2e7ad9c..df0f9b8d8 100644 --- a/http_job.go +++ b/http_job.go @@ -47,20 +47,6 @@ type httpJobPayload struct { ImageName string `json:"image_name"` } -type httpJobStateUpdate struct { - CurrentState string `json:"cur"` - NewState string `json:"new"` - Queued *time.Time `json:"queued_at,omitempty"` - Received time.Time `json:"received_at,omitempty"` - Started time.Time `json:"started_at,omitempty"` - Finished time.Time `json:"finished_at,omitempty"` - Meta *httpJobStateUpdateMeta `json:"meta,omitempty"` -} - -type httpJobStateUpdateMeta struct { - StateUpdateCount uint `json:"state_update_count,omitempty"` -} - func (j *httpJob) GoString() string { return fmt.Sprintf("&httpJob{payload: %#v, startAttributes: %#v}", j.payload, j.startAttributes) @@ -174,20 +160,37 @@ func (j *httpJob) Generate(ctx gocontext.Context, job Job) ([]byte, error) { return script, nil } -func (j *httpJob) sendStateUpdate(ctx gocontext.Context, curState, newState string) error { - j.stateCount++ - payload := &httpJobStateUpdate{ - CurrentState: curState, - NewState: newState, - Queued: j.Payload().Job.QueuedAt, - Received: j.received, - Started: j.started, - Finished: j.finished, - Meta: &httpJobStateUpdateMeta{ - StateUpdateCount: j.stateCount, +func (j *httpJob) createStateUpdateBody(curState, newState string) map[string]interface{} { + body := map[string]interface{}{ + "id": j.Payload().Job.ID, + "state": newState, + "cur": curState, + "new": newState, + "meta": map[string]interface{}{ + "state_update_count": j.stateCount, }, } + if j.Payload().Job.QueuedAt != nil { + body["queued_at"] = j.Payload().Job.QueuedAt.UTC().Format(time.RFC3339) + } + if !j.received.IsZero() { + body["received_at"] = j.received.UTC().Format(time.RFC3339) + } + if !j.started.IsZero() { + body["started_at"] = j.started.UTC().Format(time.RFC3339) + } + if !j.finished.IsZero() { + body["finished_at"] = j.finished.UTC().Format(time.RFC3339) + } + + return body +} + +func (j *httpJob) sendStateUpdate(ctx gocontext.Context, curState, newState string) error { + j.stateCount++ + payload := j.createStateUpdateBody(curState, newState) + encodedPayload, err := json.Marshal(payload) if err != nil { return errors.Wrap(err, "error encoding json") From c58c0fdc07b732584b9a1233fb11a30f59ab8d53 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Mon, 16 Oct 2017 14:16:02 -0400 Subject: [PATCH 08/13] Account for context done-ness when reporting some errors --- http_job_queue.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/http_job_queue.go b/http_job_queue.go index 6abda4b4e..70d97fa45 100644 --- a/http_job_queue.go +++ b/http_job_queue.go @@ -485,7 +485,8 @@ func (q *HTTPJobQueue) generateJobRefreshClaimFunc(jobID uint64) (func(gocontext for { err := q.refreshJobClaim(ctx, jobID) - if err == httpJobRefreshClaimErr { + if err == httpJobRefreshClaimErr && ctx.Err() == nil { + // NOTE: indicates an error while context is not yet done context.LoggerFromContext(ctx).WithFields(logrus.Fields{ "err": err, "job_id": jobID, @@ -494,7 +495,7 @@ func (q *HTTPJobQueue) generateJobRefreshClaimFunc(jobID uint64) (func(gocontext return } - if err != nil { + if err != nil && ctx.Err() == nil { context.LoggerFromContext(ctx).WithFields(logrus.Fields{ "err": err, "job_id": jobID, From 99cb6755fd3c3d508ce8bfbb1b551ee209139900 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Mon, 16 Oct 2017 11:52:23 -0400 Subject: [PATCH 09/13] Switch from dotty to dashy container hostname Closes #391 --- backend/docker.go | 4 ++-- backend/docker_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/docker.go b/backend/docker.go index f2c7d4139..2d474ed1b 100644 --- a/backend/docker.go +++ b/backend/docker.go @@ -707,7 +707,7 @@ func findDockerImageByTag(searchTags []string, images []dockertypes.ImageSummary } func containerNameFromContext(ctx gocontext.Context) string { - randName := fmt.Sprintf("travis-job.unk.unk.%s", uuid.NewRandom()) + randName := fmt.Sprintf("travis-job-unk-unk-%s", uuid.NewRandom()) jobID, ok := context.JobIDFromContext(ctx) if !ok { return randName @@ -733,5 +733,5 @@ func containerNameFromContext(ctx gocontext.Context) string { nameParts = append(nameParts, cleanedPart) } - return strings.Join(append(nameParts, fmt.Sprintf("%v", jobID)), ".") + return strings.Join(append(nameParts, fmt.Sprintf("%v", jobID)), "-") } diff --git a/backend/docker_test.go b/backend/docker_test.go index 766b73b1a..6f1938aea 100644 --- a/backend/docker_test.go +++ b/backend/docker_test.go @@ -589,11 +589,11 @@ func TestDocker_containerNameFromContext(t *testing.T) { for _, tc := range []struct{ r, n string }{ { r: "friendly/fribble", - n: fmt.Sprintf("travis-job.friendly.fribble.%v", jobID), + n: fmt.Sprintf("travis-job-friendly-fribble-%v", jobID), }, { r: "very-SiLlY.nAmE.wat/por-cu-pine", - n: fmt.Sprintf("travis-job.very-SiLlY-nAm.por-cu-pine.%v", jobID), + n: fmt.Sprintf("travis-job-very-SiLlY-nAm-por-cu-pine-%v", jobID), }, } { ctx := context.FromRepository(context.FromJobID(gocontext.TODO(), jobID), tc.r) From aa6701521e173d90baed33002eeb9b92bc0fb7dc Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Mon, 16 Oct 2017 11:54:02 -0400 Subject: [PATCH 10/13] Add changelog entry for dashy container hostname change --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 542b8660c..a29201341 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/). ### Removed ### Fixed +- backend/docker: switch to container hostname with dashes instead of dots ### Security From 673e2190ab2648147b5498b142e0637f2a4aef7a Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Mon, 16 Oct 2017 12:44:56 -0400 Subject: [PATCH 11/13] Fix test for docker container hostname :sweat_smile: --- backend/docker_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/docker_test.go b/backend/docker_test.go index 6f1938aea..55f77e3ae 100644 --- a/backend/docker_test.go +++ b/backend/docker_test.go @@ -601,8 +601,8 @@ func TestDocker_containerNameFromContext(t *testing.T) { } randName := containerNameFromContext(gocontext.TODO()) - randParts := strings.Split(randName, ".") - assert.Len(t, randParts, 4) - assert.Equal(t, "unk", randParts[1]) + randParts := strings.Split(randName, "-") + assert.Len(t, randParts, 9) assert.Equal(t, "unk", randParts[2]) + assert.Equal(t, "unk", randParts[3]) } From 098d68859978a0453ce212b24842060bb074a953 Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Mon, 16 Oct 2017 16:02:55 -0400 Subject: [PATCH 12/13] Update changelog for HTTP-based job delivery whackamole changes --- CHANGELOG.md | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index a29201341..12c3cbcc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,10 +6,14 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/). ### Added - backend/docker: support for bind-mounted volumes via space-delimited, colon-paired values in `TRAVIS_WORKER_DOCKER_BINDS` +- http-job-queue: configurable new job polling interval +- http-job: configurable job refresh claim interval ### Changed -- backend/gce: - - add site tag to job vms +- backend/gce: add site tag to job vms +- http-job: + - account for transitional states when handling state update conflicts + - delete self under various error conditions indicative of a requeue ### Deprecated @@ -17,6 +21,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/). ### Fixed - backend/docker: switch to container hostname with dashes instead of dots +- http-job: conditional inclusion of state-related timestamps in state updates ### Security From 3d4228a9326c3846a23e545285eaa5fa1caf3ffc Mon Sep 17 00:00:00 2001 From: Dan Buch Date: Mon, 16 Oct 2017 21:55:08 -0400 Subject: [PATCH 13/13] Mark job errored on unknown execution error --- CHANGELOG.md | 1 + step_run_script.go | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 12c3cbcc3..718463599 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/). ### Fixed - backend/docker: switch to container hostname with dashes instead of dots - http-job: conditional inclusion of state-related timestamps in state updates +- step-run-script: mark job errored on unknown execution error such as poweroff ### Security diff --git a/step_run_script.go b/step_run_script.go index f9116ba8b..cf085498b 100644 --- a/step_run_script.go +++ b/step_run_script.go @@ -73,6 +73,10 @@ func (s *stepRunScript) Run(state multistep.StateBag) multistep.StepAction { } } else { logger.WithField("err", r.err).WithField("completed", r.result.Completed).Error("couldn't run script") + err := buildJob.Finish(procCtx, FinishStateErrored) + if err != nil { + logger.WithField("err", err).Error("couldn't mark job errored") + } } return multistep.ActionHalt