diff --git a/CHANGELOG.md b/CHANGELOG.md
index c218a6ce1..5a0ec3106 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/).
 ## [Unreleased]
 
 ### Added
+- backend/gce: first edition of worker-agent (building block for resumable job execution)
 
 ### Changed
 - ratelimit: trace redis connection pool checkout
diff --git a/backend/backend.go b/backend/backend.go
index 92595fa9a..ada2272c6 100644
--- a/backend/backend.go
+++ b/backend/backend.go
@@ -15,6 +15,8 @@ var (
 )
 
 var ErrDownloadTraceNotImplemented = errors.New("DownloadTrace not implemented")
+var ErrInstallAgentNotImplemented = errors.New("InstallAgent not implemented")
+var ErrInstanceIPNotImplemented = errors.New("Instance.IP not implemented")
 
 // Backend wraps up an alias, backend provider help, and a factory func for a
 // given backend provider wheee
diff --git a/backend/docker.go b/backend/docker.go
index 32b54fd81..0eec2f303 100644
--- a/backend/docker.go
+++ b/backend/docker.go
@@ -662,6 +662,10 @@ func (i *dockerInstance) uploadScriptSCP(ctx gocontext.Context, script []byte) e
 	return nil
 }
 
+func (i *dockerInstance) InstallAgent(ctx gocontext.Context) error {
+	return ErrInstallAgentNotImplemented
+}
+
 func (i *dockerInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
 	if i.runNative {
 		return i.runScriptExec(ctx, output)
@@ -851,6 +855,10 @@ func (i *dockerInstance) ID() string {
 	return i.container.ID[0:7]
 }
 
+func (i *dockerInstance) IP(ctx gocontext.Context) (string, error) {
+	return "", ErrInstanceIPNotImplemented
+}
+
 func (i *dockerInstance) ImageName() string {
 	return i.imageName
 }
diff --git a/backend/fake.go b/backend/fake.go
index 04d716fc6..ad0a1c048 100644
--- a/backend/fake.go
+++ b/backend/fake.go
@@ -69,6 +69,10 @@ func (i *fakeInstance) UploadScript(ctx context.Context, script []byte) error {
 	return nil
 }
 
+func (i *fakeInstance) InstallAgent(ctx context.Context) error {
+	return ErrInstallAgentNotImplemented
+}
+
 func (i *fakeInstance) RunScript(ctx context.Context, writer io.Writer) (*RunResult, error) {
 	if i.p.cfg.Get("ERROR") == "true" {
 		return &RunResult{Completed: false}, errors.New("fake provider is configured to error all jobs")
@@ -102,6 +106,10 @@ func (i *fakeInstance) ID() string {
 	return "fake"
 }
 
+func (i *fakeInstance) IP(ctx context.Context) (string, error) {
+	return "", ErrInstanceIPNotImplemented
+}
+
 func (i *fakeInstance) ImageName() string {
 	return "fake"
 }
diff --git a/backend/gce.go b/backend/gce.go
index b2b1102ef..c620e1c85 100644
--- a/backend/gce.go
+++ b/backend/gce.go
@@ -1756,6 +1756,68 @@ func (i *gceInstance) isPreempted(ctx gocontext.Context) (bool, error) {
 	return preempted, err
 }
 
+// InstallAgent reads the worker-agent binary from the local filesystem, uploads
+// it to the instance over SSH, marks it executable and starts it as a detached
+// background process. Windows instances are rejected, and ErrStaleVM is
+// returned when UploadFile reports that the destination already existed.
+func (i *gceInstance) InstallAgent(ctx gocontext.Context) error {
+	var conn remote.Remoter
+	var err error
+
+	// TODO: windows support
+	if i.os == "windows" {
+		return errors.New("agent is not supported on windows yet")
+	}
+
+	// FIXME: developer-local path; the agent binary location must come from
+	// configuration before this can work anywhere but the author's machine.
+	agentBinaryFile := "/Users/bogdana/go/src/github.com/travis-ci/worker-agent/worker-agent-linux-amd64"
+
+	agentBinaryBytes, err := ioutil.ReadFile(agentBinaryFile)
+	if err != nil {
+		return errors.Wrap(err, "couldn't read local agent binary")
+	}
+
+	conn, err = i.sshConnection(ctx)
+	if err != nil {
+		return errors.Wrap(err, "couldn't connect to remote server for agent upload")
+	}
+	defer conn.Close()
+
+	uploadDest := "/tmp/worker-agent"
+
+	context.LoggerFromContext(ctx).WithFields(logrus.Fields{
+		"dest": uploadDest,
+		"self": "backend/gce_instance",
+	}).Debug("uploading agent")
+
+	existed, err := conn.UploadFile(uploadDest, agentBinaryBytes)
+	if existed {
+		i.progresser.Progress(&ProgressEntry{
+			Message:    "existing agent detected",
+			State:      ProgressFailure,
+			Interrupts: true,
+		})
+		return ErrStaleVM
+	}
+	if err != nil {
+		return errors.Wrap(err, "couldn't upload agent")
+	}
+
+	err = conn.Chmod(uploadDest, 0755)
+	if err != nil {
+		return errors.Wrap(err, "couldn't chmod agent")
+	}
+
+	agentCommand := "nohup bash -c '" + uploadDest + " &> " + uploadDest + ".out &'"
+	_, err = conn.RunCommand(agentCommand, ioutil.Discard)
+	if err != nil {
+		return errors.Wrap(err, "couldn't run agent")
+	}
+
+	return nil
+}
+
 func (i *gceInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
 	var conn remote.Remoter
 	var err error
@@ -1919,6 +1981,10 @@ func (i *gceInstance) ID() string {
 	return i.instance.Name
 }
 
+func (i *gceInstance) IP(ctx gocontext.Context) (string, error) {
+	return i.getCachedIP(ctx)
+}
+
 func (i *gceInstance) ImageName() string {
 	return i.imageName
 }
diff --git a/backend/jupiterbrain.go b/backend/jupiterbrain.go
index 70044cb1d..e71ad9c17 100644
--- a/backend/jupiterbrain.go
+++ b/backend/jupiterbrain.go
@@ -441,6 +441,10 @@ func (i *jupiterBrainInstance) UploadScript(ctx gocontext.Context, script []byte
 	return nil
 }
 
+func (i *jupiterBrainInstance) InstallAgent(ctx gocontext.Context) error {
+	return ErrInstallAgentNotImplemented
+}
+
 func (i *jupiterBrainInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
 	conn, err := i.sshConnection()
 	if err != nil {
@@ -499,6 +503,10 @@ func (i *jupiterBrainInstance) ID() string {
 	return i.payload.ID
 }
 
+func (i *jupiterBrainInstance) IP(ctx gocontext.Context) (string, error) {
+	return "", ErrInstanceIPNotImplemented
+}
+
 func (i *jupiterBrainInstance) ImageName() string {
 	if i.payload == nil {
 		return "{unidentified}"
diff --git a/backend/local.go b/backend/local.go
index 185ca01ac..1a08800f3 100644
--- a/backend/local.go
+++ b/backend/local.go
@@ -92,6 +92,10 @@ func (i *localInstance) UploadScript(ctx gocontext.Context, script []byte) error
 	return err
 }
 
+func (i *localInstance) InstallAgent(ctx gocontext.Context) error {
+	return ErrInstallAgentNotImplemented
+}
+
 func (i *localInstance) RunScript(ctx gocontext.Context, writer io.Writer) (*RunResult, error) {
 	if i.scriptPath == "" {
 		return &RunResult{Completed: false}, errNoScriptUploaded
@@ -138,6 +142,10 @@ func (i *localInstance) ID() string {
 	return fmt.Sprintf("local:%s", i.scriptPath)
 }
 
+func (i *localInstance) IP(ctx gocontext.Context) (string, error) {
+	return "", ErrInstanceIPNotImplemented
+}
+
 func (i *localInstance) ImageName() string {
 	return ""
 }
diff --git a/backend/openstack.go b/backend/openstack.go
index 4dc0b102b..fc146b925 100644
--- a/backend/openstack.go
+++ b/backend/openstack.go
@@ -629,6 +629,10 @@ func (i *osInstance) UploadScript(ctx gocontext.Context, script []byte) error {
 	return nil
 }
 
+func (i *osInstance) InstallAgent(ctx gocontext.Context) error {
+	return ErrInstallAgentNotImplemented
+}
+
 func (i *osInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
 	conn, err := i.sshConnection()
 	if err != nil {
@@ -670,6 +674,10 @@ func (i *osInstance) ID() string {
 	return i.instance.ID
 }
 
+func (i *osInstance) IP(ctx gocontext.Context) (string, error) {
+	return "", ErrInstanceIPNotImplemented
+}
+
 func (i *osInstance) ImageName() string {
 	return i.imageName
 }
diff --git a/backend/package.go b/backend/package.go
index 5cf513a6c..d3472846b 100644
--- a/backend/package.go
+++ b/backend/package.go
@@ -85,6 +85,10 @@ type Instance interface {
 	// method should not be called multiple times.
 	UploadScript(gocontext.Context, []byte) error
 
+	// InstallAgent uploads the worker-agent binary and starts
+	// it as a background process
+	InstallAgent(gocontext.Context) error
+
 	// RunScript runs the build script that was uploaded with the
 	// UploadScript method.
 	RunScript(gocontext.Context, io.Writer) (*RunResult, error)
@@ -98,6 +102,11 @@ type Instance interface {
 	// ID is used when identifying the instance in logs and such
 	ID() string
 
+	// IP is the ip or hostname of the instance
+	// TODO: think about a more general struct that includes
+	// the pod, project, maybe even region
+	IP(ctx gocontext.Context) (string, error)
+
 	// ImageName is the name of the image used to boot the instance
 	ImageName() string
 
diff --git a/config/config.go b/config/config.go
index 40ad8065c..c2e5cdb7c 100644
--- a/config/config.go
+++ b/config/config.go
@@ -275,6 +275,9 @@ var (
 			Usage: "sample rate for trace as an inverse fraction - for sample rate n, every nth event will be sampled",
 			Value: 1,
 		}),
+		NewConfigDef("AgentEnabled", &cli.BoolFlag{
+			Usage: "Experimental: Use worker-agent for resumable job execution",
+		}),
 	}
 
 	// Flags is the list of all CLI flags accepted by travis-worker
@@ -437,6 +440,7 @@ type Config struct {
 	StackdriverProjectID     string `config:"stackdriver-project-id"`
 	OpencensusTracingEnabled bool   `config:"opencensus-tracing-enabled"`
 	OpencensusSamplingRate   int    `config:"opencensus-sampling-rate"`
+	AgentEnabled             bool   `config:"agent-enabled"`
 
 	ProviderConfig *ProviderConfig
 }
diff --git a/processor.go b/processor.go
index 62abfd96d..2c2467aaa 100644
--- a/processor.go
+++ b/processor.go
@@ -243,6 +243,7 @@ func (p *Processor) process(ctx gocontext.Context, buildJob Job) {
 		&stepCheckCancellation{},
 		&stepUploadScript{
 			uploadTimeout: p.config.ScriptUploadTimeout,
+			agentEnabled:  p.config.AgentEnabled,
 		},
 		&stepCheckCancellation{},
 		&stepUpdateState{},
@@ -252,6 +253,7 @@ func (p *Processor) process(ctx gocontext.Context, buildJob Job) {
 			logTimeout:               logTimeout,
 			hardTimeout:              buildJob.StartAttributes().HardTimeout,
 			skipShutdownOnLogTimeout: p.config.SkipShutdownOnLogTimeout,
+			agentEnabled:             p.config.AgentEnabled,
 		},
 		&stepDownloadTrace{
 			persister: p.persister,
diff --git a/remote/package.go b/remote/package.go
index 173a62570..97ebd6890 100644
--- a/remote/package.go
+++ b/remote/package.go
@@ -1,10 +1,14 @@
 package remote
 
-import "io"
+import (
+	"io"
+	"os"
+)
 
 type Remoter interface {
 	UploadFile(path string, data []byte) (bool, error)
 	DownloadFile(path string) ([]byte, error)
 	RunCommand(command string, output io.Writer) (int32, error)
+	Chmod(path string, mode os.FileMode) error
 	Close() error
 }
diff --git a/ssh/package.go b/ssh/package.go
index b85f35657..6b307e059 100644
--- a/ssh/package.go
+++ b/ssh/package.go
@@ -22,6 +22,7 @@ type Connection interface {
 	UploadFile(path string, data []byte) (bool, error)
 	DownloadFile(path string) ([]byte, error)
 	RunCommand(command string, output io.Writer) (int32, error)
+	Chmod(path string, mode os.FileMode) error
 	Close() error
 }
 
@@ -193,6 +194,22 @@ func (c *sshConnection) RunCommand(command string, output io.Writer) (int32, err
 	}
 }
 
+// Chmod changes the mode of the remote file at path using a new SFTP client.
+func (c *sshConnection) Chmod(path string, mode os.FileMode) error {
+	sftpClient, err := sftp.NewClient(c.client)
+	if err != nil {
+		return errors.Wrap(err, "couldn't create SFTP client")
+	}
+	defer sftpClient.Close()
+
+	err = sftpClient.Chmod(path, mode)
+	if err != nil {
+		return errors.Wrap(err, "couldn't chmod file")
+	}
+
+	return nil
+}
+
 func (c *sshConnection) Close() error {
 	return c.client.Close()
 }
diff --git a/step_run_script.go b/step_run_script.go
index 529e152ec..adeac5142 100644
--- a/step_run_script.go
+++ b/step_run_script.go
@@ -2,6 +2,7 @@ package worker
 
 import (
 	"fmt"
+	"io"
 	"time"
 
 	gocontext "context"
@@ -9,9 +10,11 @@ import (
 	"github.com/mitchellh/multistep"
 	"github.com/pkg/errors"
 	"github.com/sirupsen/logrus"
+	agent "github.com/travis-ci/worker-agent/agent"
 	"github.com/travis-ci/worker/backend"
 	"github.com/travis-ci/worker/context"
 	"go.opencensus.io/trace"
+	"google.golang.org/grpc"
 )
 
 var MaxLogLengthExceeded = errors.New("maximum log length exceeded")
@@ -26,6 +29,7 @@ type stepRunScript struct {
 	logTimeout               time.Duration
 	hardTimeout              time.Duration
 	skipShutdownOnLogTimeout bool
+	agentEnabled             bool
 }
 
 func (s *stepRunScript) Run(state multistep.StateBag) multistep.StepAction {
@@ -52,7 +56,13 @@ func (s *stepRunScript) Run(state multistep.StateBag) multistep.StepAction {
 
 	resultChan := make(chan runScriptReturn, 1)
 	go func() {
-		result, err := instance.RunScript(ctx, logWriter)
+		var result *backend.RunResult
+		var err error
+		if s.agentEnabled {
+			result, err = s.runScriptWithAgent(ctx, logWriter, instance)
+		} else {
+			result, err = instance.RunScript(ctx, logWriter)
+		}
 		resultChan <- runScriptReturn{
 			result: result,
 			err:    err,
@@ -163,6 +173,82 @@ func (s *stepRunScript) Run(state multistep.StateBag) multistep.StepAction {
 	}
 }
 
+// runScriptWithAgent runs the job through the worker-agent gRPC service on the
+// instance instead of instance.RunScript: it asks the agent to run the job,
+// copies the streamed log parts into logWriter until the stream ends, then
+// reports the exit code returned by GetJobStatus.
+func (s *stepRunScript) runScriptWithAgent(ctx gocontext.Context, logWriter LogWriter, instance backend.Instance) (*backend.RunResult, error) {
+	ip, err := instance.IP(ctx)
+	if err != nil {
+		return &backend.RunResult{Completed: false}, errors.Wrap(err, "could not get instance ip")
+	}
+	address := ip + ":" + agent.PORT
+
+	// TODO: figure out grpc security
+	conn, err := grpc.Dial(address, grpc.WithInsecure())
+	if err != nil {
+		return &backend.RunResult{Completed: false}, errors.Wrap(err, "could not connect to agent")
+	}
+	defer conn.Close()
+
+	c := agent.NewAgentClient(conn)
+
+	// TODO: get these values from state
+	// TODO: get command and args from instance
+	_, err = c.RunJob(ctx, &agent.RunJobRequest{
+		JobId:        "123",
+		Command:      "bash",
+		CommandArgs:  []string{"build.sh"},
+		LogTimeoutS:  10,
+		HardTimeoutS: 10,
+		MaxLogLength: 10,
+	})
+
+	logger := context.LoggerFromContext(ctx).WithField("self", "step_run_script")
+
+	if err != nil {
+		return &backend.RunResult{Completed: false}, errors.Wrap(err, "could not run job")
+	}
+
+	stream, err := c.GetLogParts(ctx, &agent.LogPartsRequest{})
+	if err != nil {
+		logger.WithField("err", err).Error("error trying to GetLogParts")
+		return &backend.RunResult{Completed: false}, errors.Wrap(err, "could not get log parts")
+	}
+
+	// TODO: figure out how to persist offset
+	// offset := int64(0)
+
+	for {
+		part, err := stream.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			logger.WithField("err", err).Error("couldn't get log parts from stream")
+			return &backend.RunResult{Completed: false}, errors.Wrap(err, "could not get log parts")
+		}
+		// A failed write must surface to the caller instead of silently
+		// dropping log parts while the loop keeps draining the stream.
+		if _, err := logWriter.Write([]byte(part.Content)); err != nil {
+			return &backend.RunResult{Completed: false}, errors.Wrap(err, "could not write log part")
+		}
+
+		// offset = part.Number
+	}
+
+	st, err := c.GetJobStatus(ctx, &agent.WorkerRequest{})
+	if err != nil {
+		logger.WithField("err", err).Error("couldn't get job status")
+		return &backend.RunResult{Completed: false}, errors.Wrap(err, "could not get job status")
+	}
+
+	return &backend.RunResult{
+		Completed: true,
+		ExitCode:  st.ExitCode,
+	}, nil
+}
+
 func (s *stepRunScript) writeLogAndFinishWithState(preTimeoutCtx, ctx gocontext.Context, logWriter LogWriter, buildJob Job, state FinishState, logMessage string) {
 	ctx, span := trace.StartSpan(ctx, "WriteLogAndFinishWithState.RunScript")
 	defer span.End()
diff --git a/step_upload_script.go b/step_upload_script.go
index a1da403ab..ca1e13cf1 100644
--- a/step_upload_script.go
+++ b/step_upload_script.go
@@ -16,6 +16,7 @@ import (
 
 type stepUploadScript struct {
 	uploadTimeout time.Duration
+	agentEnabled  bool
 }
 
 func (s *stepUploadScript) Run(state multistep.StateBag) multistep.StepAction {
@@ -77,6 +78,35 @@ func (s *stepUploadScript) Run(state multistep.StateBag) multistep.StepAction {
 		"since_processed_ms": time.Since(processedAt).Seconds() * 1e3,
 	}).Info("uploaded script")
 
+	if s.agentEnabled {
+		err := instance.InstallAgent(ctx)
+		if err != nil {
+			state.Put("err", err)
+
+			span.SetStatus(trace.Status{
+				Code:    trace.StatusCodeUnavailable,
+				Message: err.Error(),
+			})
+
+			logger.WithFields(logrus.Fields{
+				"err":            err,
+				"upload_timeout": s.uploadTimeout,
+			}).Error("couldn't install agent, attempting requeue")
+			context.CaptureError(ctx, err)
+
+			err := buildJob.Requeue(preTimeoutCtx)
+			if err != nil {
+				logger.WithField("err", err).Error("couldn't requeue job")
+			}
+
+			return multistep.ActionHalt
+		}
+
+		logger.WithFields(logrus.Fields{
+			"since_processed_ms": time.Since(processedAt).Seconds() * 1e3,
+		}).Info("installed agent")
+	}
+
 	return multistep.ActionContinue
 }
 
diff --git a/vendor/manifest b/vendor/manifest
index 5b22f7598..5794a2f3e 100644
--- a/vendor/manifest
+++ b/vendor/manifest
@@ -814,6 +814,14 @@
 			"path": "vendor/github.com/pmezard/go-difflib/difflib",
 			"notests": true
 		},
+		{
+			"importpath": "github.com/travis-ci/worker-agent",
+			"repository": "https://github.com/travis-ci/worker-agent",
+			"vcs": "git",
+			"revision": "3496f3023d395c7c0c0ff9545a53d1fac5851471",
+			"branch": "master",
+			"notests": true
+		},
 		{
 			"importpath": "go.opencensus.io",
 			"repository": "https://github.com/census-instrumentation/opencensus-go",
@@ -1581,4 +1589,4 @@
 			"notests": true
 		}
 	]
-}
\ No newline at end of file
+}
diff --git a/winrm/package.go b/winrm/package.go
index 09aee3dcb..5040f2c8d 100644
--- a/winrm/package.go
+++ b/winrm/package.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"fmt"
 	"io"
+	"os"
 	"time"
 
 	"github.com/masterzen/winrm"
@@ -13,7 +14,6 @@ import (
 var errNotImplemented = fmt.Errorf("method not implemented")
 
 func New(host string, port int, username, password string) (*Remoter, error) {
-
 	endpoint := &winrm.Endpoint{
 		Host: host,
 		Port: port,
@@ -77,6 +77,10 @@ func (r *Remoter) Close() error {
 	return nil
 }
 
+func (r *Remoter) Chmod(path string, mode os.FileMode) error {
+	return errNotImplemented
+}
+
 func (r *Remoter) newCopyClient() (*winrmcp.Winrmcp, error) {
 	addr := fmt.Sprintf("%s:%d", r.endpoint.Host, r.endpoint.Port)
 