Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Resumable job execution prototype #553

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](http://keepachangelog.com/).
## [Unreleased]

### Added
- backend/gce: first edition of worker-agent (building block for resumable job execution)

### Changed
- ratelimit: trace redis connection pool checkout
2 changes: 2 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@ var (
)

var ErrDownloadTraceNotImplemented = errors.New("DownloadTrace not implemented")
var ErrInstallAgentNotImplemented = errors.New("InstallAgent not implemented")
var ErrInstanceIPNotImplemented = errors.New("Instance.IP not implemented")

// Backend wraps up an alias, backend provider help, and a factory func for a
// given backend provider.
8 changes: 8 additions & 0 deletions backend/docker.go
Original file line number Diff line number Diff line change
@@ -662,6 +662,10 @@ func (i *dockerInstance) uploadScriptSCP(ctx gocontext.Context, script []byte) e
return nil
}

// InstallAgent is not implemented for docker instances; it always returns
// ErrInstallAgentNotImplemented.
func (i *dockerInstance) InstallAgent(ctx gocontext.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *dockerInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
if i.runNative {
return i.runScriptExec(ctx, output)
@@ -851,6 +855,10 @@ func (i *dockerInstance) ID() string {
return i.container.ID[0:7]
}

// IP is not implemented for docker instances; it always returns an empty
// string together with ErrInstanceIPNotImplemented.
func (i *dockerInstance) IP(ctx gocontext.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

// ImageName returns the image name stored on the instance.
func (i *dockerInstance) ImageName() string {
return i.imageName
}
8 changes: 8 additions & 0 deletions backend/fake.go
Original file line number Diff line number Diff line change
@@ -69,6 +69,10 @@ func (i *fakeInstance) UploadScript(ctx context.Context, script []byte) error {
return nil
}

// InstallAgent is not implemented for fake instances; it always returns
// ErrInstallAgentNotImplemented.
func (i *fakeInstance) InstallAgent(ctx context.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *fakeInstance) RunScript(ctx context.Context, writer io.Writer) (*RunResult, error) {
if i.p.cfg.Get("ERROR") == "true" {
return &RunResult{Completed: false}, errors.New("fake provider is configured to error all jobs")
@@ -102,6 +106,10 @@ func (i *fakeInstance) ID() string {
return "fake"
}

// IP is not implemented for fake instances; it always returns an empty
// string together with ErrInstanceIPNotImplemented.
func (i *fakeInstance) IP(ctx context.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

// ImageName always reports the constant name "fake".
func (i *fakeInstance) ImageName() string {
return "fake"
}
60 changes: 60 additions & 0 deletions backend/gce.go
Original file line number Diff line number Diff line change
@@ -1756,6 +1756,62 @@ func (i *gceInstance) isPreempted(ctx gocontext.Context) (bool, error) {
return preempted, err
}

// InstallAgent uploads the worker-agent binary to the instance over SSH,
// marks it executable, and launches it as a detached background process
// whose output is redirected to "<upload destination>.out".
//
// It returns an error when the instance OS is windows, when the local agent
// binary cannot be read, when the SSH connection, upload, or chmod fails,
// or when the launch command fails or exits with a non-zero status.
// ErrStaleVM is returned (after reporting an interrupting progress failure)
// when the upload reports that the destination file already existed.
func (i *gceInstance) InstallAgent(ctx gocontext.Context) error {
	// TODO: windows support
	if i.os == "windows" {
		return errors.New("agent is not supported on windows yet")
	}

	// FIXME: this is a developer-local path and only works on the machine
	// the prototype was written on; it needs to come from configuration
	// before this can run anywhere else.
	agentBinaryFile := "/Users/bogdana/go/src/github.com/travis-ci/worker-agent/worker-agent-linux-amd64"

	agentBinaryBytes, err := ioutil.ReadFile(agentBinaryFile)
	if err != nil {
		return errors.Wrap(err, "couldn't read local agent binary")
	}

	var conn remote.Remoter
	conn, err = i.sshConnection(ctx)
	if err != nil {
		return errors.Wrap(err, "couldn't connect to remote server for agent upload")
	}
	defer conn.Close()

	uploadDest := "/tmp/worker-agent"

	context.LoggerFromContext(ctx).WithFields(logrus.Fields{
		"dest": uploadDest,
		"self": "backend/gce_instance",
	}).Debug("uploading agent")

	// A pre-existing destination file is treated as a stale VM and takes
	// precedence over whatever error the upload returned.
	existed, err := conn.UploadFile(uploadDest, agentBinaryBytes)
	if existed {
		i.progresser.Progress(&ProgressEntry{
			Message:    "existing agent detected",
			State:      ProgressFailure,
			Interrupts: true,
		})
		return ErrStaleVM
	}
	if err != nil {
		return errors.Wrap(err, "couldn't upload agent")
	}

	err = conn.Chmod(uploadDest, 0755)
	if err != nil {
		return errors.Wrap(err, "couldn't chmod agent")
	}

	// Build the launch command from uploadDest so the path is defined in
	// exactly one place. nohup plus the trailing '&' detach the agent from
	// this SSH session; its output goes to "<uploadDest>.out".
	agentCommand := "nohup bash -c '" + uploadDest + " &> " + uploadDest + ".out &'"
	exitCode, err := conn.RunCommand(agentCommand, ioutil.Discard)
	if err != nil {
		return errors.Wrap(err, "couldn't run agent")
	}
	// Do not silently accept a launch command that reported failure through
	// its exit status rather than through err.
	if exitCode != 0 {
		return errors.Errorf("couldn't run agent: launch command exited with status %d", exitCode)
	}

	return nil
}

func (i *gceInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
var conn remote.Remoter
var err error
@@ -1919,6 +1975,10 @@ func (i *gceInstance) ID() string {
return i.instance.Name
}

// IP returns the instance address by delegating to getCachedIP, passing
// through both its result and its error.
func (i *gceInstance) IP(ctx gocontext.Context) (string, error) {
return i.getCachedIP(ctx)
}

// ImageName returns the image name stored on the instance.
func (i *gceInstance) ImageName() string {
return i.imageName
}
8 changes: 8 additions & 0 deletions backend/jupiterbrain.go
Original file line number Diff line number Diff line change
@@ -441,6 +441,10 @@ func (i *jupiterBrainInstance) UploadScript(ctx gocontext.Context, script []byte
return nil
}

// InstallAgent is not implemented for jupiter-brain instances; it always
// returns ErrInstallAgentNotImplemented.
func (i *jupiterBrainInstance) InstallAgent(ctx gocontext.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *jupiterBrainInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
conn, err := i.sshConnection()
if err != nil {
@@ -499,6 +503,10 @@ func (i *jupiterBrainInstance) ID() string {
return i.payload.ID
}

// IP is not implemented for jupiter-brain instances; it always returns an
// empty string together with ErrInstanceIPNotImplemented.
func (i *jupiterBrainInstance) IP(ctx gocontext.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

func (i *jupiterBrainInstance) ImageName() string {
if i.payload == nil {
return "{unidentified}"
8 changes: 8 additions & 0 deletions backend/local.go
Original file line number Diff line number Diff line change
@@ -92,6 +92,10 @@ func (i *localInstance) UploadScript(ctx gocontext.Context, script []byte) error
return err
}

// InstallAgent is not implemented for local instances; it always returns
// ErrInstallAgentNotImplemented.
func (i *localInstance) InstallAgent(ctx gocontext.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *localInstance) RunScript(ctx gocontext.Context, writer io.Writer) (*RunResult, error) {
if i.scriptPath == "" {
return &RunResult{Completed: false}, errNoScriptUploaded
@@ -138,6 +142,10 @@ func (i *localInstance) ID() string {
return fmt.Sprintf("local:%s", i.scriptPath)
}

// IP is not implemented for local instances; it always returns an empty
// string together with ErrInstanceIPNotImplemented.
func (i *localInstance) IP(ctx gocontext.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

// ImageName always returns the empty string for local instances.
func (i *localInstance) ImageName() string {
return ""
}
8 changes: 8 additions & 0 deletions backend/openstack.go
Original file line number Diff line number Diff line change
@@ -629,6 +629,10 @@ func (i *osInstance) UploadScript(ctx gocontext.Context, script []byte) error {
return nil
}

// InstallAgent is not implemented for openstack instances; it always
// returns ErrInstallAgentNotImplemented.
func (i *osInstance) InstallAgent(ctx gocontext.Context) error {
return ErrInstallAgentNotImplemented
}

func (i *osInstance) RunScript(ctx gocontext.Context, output io.Writer) (*RunResult, error) {
conn, err := i.sshConnection()
if err != nil {
@@ -670,6 +674,10 @@ func (i *osInstance) ID() string {
return i.instance.ID
}

// IP is not implemented for openstack instances; it always returns an empty
// string together with ErrInstanceIPNotImplemented.
func (i *osInstance) IP(ctx gocontext.Context) (string, error) {
return "", ErrInstanceIPNotImplemented
}

// ImageName returns the image name stored on the instance.
func (i *osInstance) ImageName() string {
return i.imageName
}
9 changes: 9 additions & 0 deletions backend/package.go
Original file line number Diff line number Diff line change
@@ -85,6 +85,10 @@ type Instance interface {
// method should not be called multiple times.
UploadScript(gocontext.Context, []byte) error

// InstallAgent uploads the worker-agent binary and starts
// it as a background process
InstallAgent(gocontext.Context) error

// RunScript runs the build script that was uploaded with the
// UploadScript method.
RunScript(gocontext.Context, io.Writer) (*RunResult, error)
@@ -98,6 +102,11 @@ type Instance interface {
// ID is used when identifying the instance in logs and such
ID() string

// IP is the IP address or hostname of the instance
// TODO: think about a more general struct that includes
// the pod, project, maybe even region
IP(ctx gocontext.Context) (string, error)

// ImageName is the name of the image used to boot the instance
ImageName() string

4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
@@ -275,6 +275,9 @@ var (
Usage: "sample rate for trace as an inverse fraction - for sample rate n, every nth event will be sampled",
Value: 1,
}),
NewConfigDef("AgentEnabled", &cli.BoolFlag{
Usage: "Experimental: Use worker-agent for resumable job execution",
}),
}

// Flags is the list of all CLI flags accepted by travis-worker
@@ -437,6 +440,7 @@ type Config struct {
StackdriverProjectID string `config:"stackdriver-project-id"`
OpencensusTracingEnabled bool `config:"opencensus-tracing-enabled"`
OpencensusSamplingRate int `config:"opencensus-sampling-rate"`
AgentEnabled bool `config:"agent-enabled"`

ProviderConfig *ProviderConfig
}
2 changes: 2 additions & 0 deletions processor.go
Original file line number Diff line number Diff line change
@@ -243,6 +243,7 @@ func (p *Processor) process(ctx gocontext.Context, buildJob Job) {
&stepCheckCancellation{},
&stepUploadScript{
uploadTimeout: p.config.ScriptUploadTimeout,
agentEnabled: p.config.AgentEnabled,
},
&stepCheckCancellation{},
&stepUpdateState{},
@@ -252,6 +253,7 @@ func (p *Processor) process(ctx gocontext.Context, buildJob Job) {
logTimeout: logTimeout,
hardTimeout: buildJob.StartAttributes().HardTimeout,
skipShutdownOnLogTimeout: p.config.SkipShutdownOnLogTimeout,
agentEnabled: p.config.AgentEnabled,
},
&stepDownloadTrace{
persister: p.persister,
6 changes: 5 additions & 1 deletion remote/package.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package remote

import "io"
import (
"io"
"os"
)

// Remoter is the set of file-transfer and command-execution operations the
// backends use against a remote host.
type Remoter interface {
// UploadFile writes data to path on the remote host.
// NOTE(review): the bool result appears to report that the file already
// existed (callers name it "existed") — confirm against implementations.
UploadFile(path string, data []byte) (bool, error)
// DownloadFile returns the contents of path on the remote host.
DownloadFile(path string) ([]byte, error)
// RunCommand runs command on the remote host, writing to output.
// NOTE(review): the int32 result is presumably the command's exit
// status — confirm against implementations.
RunCommand(command string, output io.Writer) (int32, error)
// Chmod sets the mode of path on the remote host.
Chmod(path string, mode os.FileMode) error
// Close releases the underlying connection.
Close() error
}
16 changes: 16 additions & 0 deletions ssh/package.go
Original file line number Diff line number Diff line change
@@ -22,6 +22,7 @@ type Connection interface {
UploadFile(path string, data []byte) (bool, error)
DownloadFile(path string) ([]byte, error)
RunCommand(command string, output io.Writer) (int32, error)
Chmod(path string, mode os.FileMode) error
Close() error
}

@@ -193,6 +194,21 @@ func (c *sshConnection) RunCommand(command string, output io.Writer) (int32, err
}
}

// Chmod sets the permission bits of the remote file at path to mode.
//
// A new SFTP client is created on top of the connection's SSH client for the
// duration of the call and closed again before returning. Failures to create
// the client or to change the mode are returned wrapped with context.
func (c *sshConnection) Chmod(path string, mode os.FileMode) error {
	sftpClient, err := sftp.NewClient(c.client)
	if err != nil {
		return errors.Wrap(err, "couldn't create SFTP client")
	}
	defer sftpClient.Close()

	if chmodErr := sftpClient.Chmod(path, mode); chmodErr != nil {
		return errors.Wrap(chmodErr, "couldn't chmod file")
	}
	return nil
}

// Close closes the underlying client and returns its error, if any.
func (c *sshConnection) Close() error {
return c.client.Close()
}
Loading