Merge pull request #490 from travis-ci/igor-log-part-received-at
measure log processing SLI
igorwwwwwwwwwwwwwwwwwwww authored Aug 29, 2018
2 parents 3d2df5f + babc057 commit ae267b5
Showing 10 changed files with 58 additions and 23 deletions.
6 changes: 4 additions & 2 deletions amqp_job.go
@@ -134,7 +134,7 @@ func (j *amqpJob) LogWriter(ctx gocontext.Context, defaultLogTimeout time.Durati
logTimeout = defaultLogTimeout
}

- return newAMQPLogWriter(ctx, j.logWriterChan, j.payload.Job.ID, logTimeout, j.withLogSharding)
+ return newAMQPLogWriter(ctx, j.logWriterChan, j.payload.Job.ID, j.Payload().Job.QueuedAt, logTimeout, j.withLogSharding)
}

func (j *amqpJob) createStateUpdateBody(ctx gocontext.Context, state string) map[string]interface{} {
@@ -186,7 +186,9 @@ func (j *amqpJob) sendStateUpdate(ctx gocontext.Context, event, state string) er
return err.(error)
}

- func (j *amqpJob) SetupContext(ctx gocontext.Context) gocontext.Context { return ctx }
+ func (j *amqpJob) SetupContext(ctx gocontext.Context) gocontext.Context {
+ return ctx
+ }

func (j *amqpJob) Name() string { return "amqp" }

51 changes: 34 additions & 17 deletions amqp_log_writer.go
@@ -16,23 +16,26 @@ import (
)

type amqpLogPart struct {
- JobID uint64 `json:"id"`
- Content string `json:"log"`
- Number int `json:"number"`
- UUID string `json:"uuid"`
- Final bool `json:"final"`
+ JobID uint64 `json:"id"`
+ Content string `json:"log"`
+ Number int `json:"number"`
+ UUID string `json:"uuid"`
+ Final bool `json:"final"`
+ QueuedAt *time.Time `json:"queued_at,omitempty"`
}

type amqpLogWriter struct {
- ctx gocontext.Context
- jobID uint64
- sharded bool
+ ctx gocontext.Context
+ jobID uint64
+ jobQueuedAt *time.Time
+ sharded bool

closeChan chan struct{}

bufferMutex sync.Mutex
buffer *bytes.Buffer
logPartNumber int
+ jobStarted bool

bytesWritten int
maxLength int
@@ -44,17 +44,18 @@ type amqpLogWriter struct {
timeout time.Duration
}

- func newAMQPLogWriter(ctx gocontext.Context, logWriterChan *amqp.Channel, jobID uint64, timeout time.Duration, sharded bool) (*amqpLogWriter, error) {
+ func newAMQPLogWriter(ctx gocontext.Context, logWriterChan *amqp.Channel, jobID uint64, jobQueuedAt *time.Time, timeout time.Duration, sharded bool) (*amqpLogWriter, error) {

writer := &amqpLogWriter{
- ctx: context.FromComponent(ctx, "log_writer"),
- amqpChan: logWriterChan,
- jobID: jobID,
- closeChan: make(chan struct{}),
- buffer: new(bytes.Buffer),
- timer: time.NewTimer(time.Hour),
- timeout: timeout,
- sharded: sharded,
+ ctx: context.FromComponent(ctx, "log_writer"),
+ amqpChan: logWriterChan,
+ jobID: jobID,
+ jobQueuedAt: jobQueuedAt,
+ closeChan: make(chan struct{}),
+ buffer: new(bytes.Buffer),
+ timer: time.NewTimer(time.Hour),
+ timeout: timeout,
+ sharded: sharded,
}

context.LoggerFromContext(ctx).WithFields(logrus.Fields{
@@ -127,6 +131,10 @@ func (w *amqpLogWriter) SetMaxLogLength(bytes int) {
w.maxLength = bytes
}

+ func (w *amqpLogWriter) SetJobStarted() {
+ w.jobStarted = true
+ }

// WriteAndClose works like a Write followed by a Close, but ensures that no
// other Writes are allowed in between.
func (w *amqpLogWriter) WriteAndClose(p []byte) (int, error) {
@@ -222,6 +230,15 @@ func (w *amqpLogWriter) flush() {
func (w *amqpLogWriter) publishLogPart(part amqpLogPart) error {
part.UUID, _ = context.UUIDFromContext(w.ctx)

+ // we emit the queued_at field on the first log part published after
+ // the job has started running. downstream consumers of the log parts
+ // (travis-logs) can then use the timestamp to compute a
+ // "time to first log line" metric.
+ if w.jobStarted {
+ part.QueuedAt = w.jobQueuedAt
+ w.jobStarted = false
+ }

partBody, err := json.Marshal(part)
if err != nil {
return err
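
For context: a minimal, hypothetical sketch of how a downstream consumer such as travis-logs could use the new queued_at field — compare it against the time the part is received to get a "time to first log line" measurement. The logPart struct and timeToFirstLogLine function below are illustrative names under that assumption, not travis-logs code.

package main

import (
    "encoding/json"
    "fmt"
    "time"
)

// logPart mirrors the JSON shape of amqpLogPart; only the fields needed
// for the measurement are declared here.
type logPart struct {
    JobID    uint64     `json:"id"`
    Number   int        `json:"number"`
    QueuedAt *time.Time `json:"queued_at,omitempty"`
}

// timeToFirstLogLine returns the queued-to-first-log-line duration for a
// part, and false when the part carries no queued_at (i.e. it is not the
// first part published after the job started).
func timeToFirstLogLine(body []byte, receivedAt time.Time) (time.Duration, bool, error) {
    var part logPart
    if err := json.Unmarshal(body, &part); err != nil {
        return 0, false, err
    }
    if part.QueuedAt == nil {
        return 0, false, nil
    }
    return receivedAt.Sub(*part.QueuedAt), true, nil
}

func main() {
    body := []byte(`{"id":4,"number":0,"queued_at":"2018-08-29T12:00:00Z"}`)
    received := time.Date(2018, 8, 29, 12, 0, 42, 0, time.UTC)
    if d, ok, err := timeToFirstLogLine(body, received); err == nil && ok {
        fmt.Printf("job 4: time to first log line = %s\n", d) // prints 42s
    }
}
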
2 changes: 1 addition & 1 deletion amqp_log_writer_factory.go
@@ -50,7 +50,7 @@ func (l *AMQPLogWriterFactory) LogWriter(ctx gocontext.Context, defaultLogTimeou
logTimeout = defaultLogTimeout
}

- return newAMQPLogWriter(ctx, l.logWriterChan, job.Payload().Job.ID, logTimeout, l.withLogSharding)
+ return newAMQPLogWriter(ctx, l.logWriterChan, job.Payload().Job.ID, job.Payload().Job.QueuedAt, logTimeout, l.withLogSharding)
}

func (l *AMQPLogWriterFactory) Cleanup() error {
9 changes: 6 additions & 3 deletions amqp_log_writer_test.go
@@ -17,9 +17,10 @@ func TestAMQPLogWriterWrite(t *testing.T) {
defer amqpChan.Close()

uuid := uuid.NewRandom()
+ queuedAt := time.Now()
ctx := workerctx.FromUUID(context.TODO(), uuid.String())

- logWriter, err := newAMQPLogWriter(ctx, amqpChan, 4, time.Hour, false)
+ logWriter, err := newAMQPLogWriter(ctx, amqpChan, 4, &queuedAt, time.Hour, false)
if err != nil {
t.Fatal(err)
}
@@ -74,9 +75,10 @@ func TestAMQPLogWriterClose(t *testing.T) {
defer amqpChan.Close()

uuid := uuid.NewRandom()
+ queuedAt := time.Now()
ctx := workerctx.FromUUID(context.TODO(), uuid.String())

- logWriter, err := newAMQPLogWriter(ctx, amqpChan, 4, time.Hour, false)
+ logWriter, err := newAMQPLogWriter(ctx, amqpChan, 4, &queuedAt, time.Hour, false)
if err != nil {
t.Fatal(err)
}
@@ -122,9 +124,10 @@ func TestAMQPMaxLogLength(t *testing.T) {
defer amqpChan.Close()

uuid := uuid.NewRandom()
+ queuedAt := time.Now()
ctx := workerctx.FromUUID(context.TODO(), uuid.String())

- logWriter, err := newAMQPLogWriter(ctx, amqpChan, 4, time.Hour, false)
+ logWriter, err := newAMQPLogWriter(ctx, amqpChan, 4, &queuedAt, time.Hour, false)
if err != nil {
t.Fatal(err)
}
2 changes: 2 additions & 0 deletions file_log_writer.go
@@ -44,6 +44,8 @@ func (w *fileLogWriter) SetMaxLogLength(n int) {
return
}

+ func (w *fileLogWriter) SetJobStarted() {}

func (w *fileLogWriter) Timeout() <-chan time.Time {
return w.timer.C
}
2 changes: 2 additions & 0 deletions http_log_writer.go
@@ -130,6 +130,8 @@ func (w *httpLogWriter) SetMaxLogLength(bytes int) {
w.maxLength = bytes
}

+ func (w *httpLogWriter) SetJobStarted() {}

func (w *httpLogWriter) WriteAndClose(p []byte) (int, error) {
if w.closed() {
return 0, fmt.Errorf("log already closed")
1 change: 1 addition & 0 deletions log_writer.go
@@ -45,4 +45,5 @@ type LogWriter interface {
WriteAndClose([]byte) (int, error)
Timeout() <-chan time.Time
SetMaxLogLength(int)
+ SetJobStarted()
}
2 changes: 2 additions & 0 deletions package_test.go
@@ -146,3 +146,5 @@ func (flw *fakeLogWriter) Timeout() <-chan time.Time {
}

func (flw *fakeLogWriter) SetMaxLogLength(_ int) {}

+ func (flw *fakeLogWriter) SetJobStarted() {}
3 changes: 3 additions & 0 deletions step_update_state.go
@@ -17,6 +17,7 @@ func (s *stepUpdateState) Run(state multistep.StateBag) multistep.StepAction {
buildJob := state.Get("buildJob").(Job)
instance := state.Get("instance").(backend.Instance)
processedAt := state.Get("processedAt").(time.Time)
+ logWriter := state.Get("logWriter").(LogWriter)

logger := context.LoggerFromContext(ctx).WithField("self", "step_update_state")

@@ -26,6 +27,8 @@ func (s *stepUpdateState) Run(state multistep.StateBag) multistep.StepAction {
state.Put("ctx", ctx)
}

+ logWriter.SetJobStarted()

err := buildJob.Started(ctx)
if err != nil {
context.LoggerFromContext(ctx).WithFields(logrus.Fields{
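
For context: stepUpdateState arms the log writer via SetJobStarted() just before marking the job as started, so the first log part flushed afterwards is the one that carries queued_at. Below is a standalone, hypothetical sketch of that one-shot behavior; the writer and part types are illustrative, not the worker's actual implementation.

package main

import (
    "fmt"
    "time"
)

// part carries the optional queued_at timestamp, like amqpLogPart does.
type part struct {
    Number   int
    QueuedAt *time.Time
}

// writer mimics the one-shot jobStarted flag used by amqpLogWriter: the
// queued_at timestamp is attached only to the first part published after
// SetJobStarted, then the flag is cleared.
type writer struct {
    queuedAt   *time.Time
    jobStarted bool
    number     int
}

func (w *writer) SetJobStarted() { w.jobStarted = true }

func (w *writer) publish() part {
    p := part{Number: w.number}
    w.number++
    if w.jobStarted {
        p.QueuedAt = w.queuedAt
        w.jobStarted = false
    }
    return p
}

func main() {
    q := time.Now().Add(-30 * time.Second)
    w := &writer{queuedAt: &q}
    w.SetJobStarted()
    fmt.Println(w.publish().QueuedAt != nil) // true: first part after start carries queued_at
    fmt.Println(w.publish().QueuedAt != nil) // false: later parts omit it
}
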
3 changes: 3 additions & 0 deletions step_write_worker_info_test.go
@@ -32,6 +32,9 @@ func (w *byteBufferLogWriter) Timeout() <-chan time.Time {
func (w *byteBufferLogWriter) SetMaxLogLength(m int) {
}

+ func (w *byteBufferLogWriter) SetJobStarted() {
+ }

func setupStepWriteWorkerInfo() (*stepWriteWorkerInfo, *byteBufferLogWriter, multistep.StateBag) {
s := &stepWriteWorkerInfo{}

