Skip to content

Commit

Permalink
feat: calling-convention=stdio
Browse files Browse the repository at this point in the history
Adds support for workers accepting input on stdout and producing output on stdout.

This adds add1c and add1d test coverage in pipelines.sh. add1d also adds python test coverage for pipelines. Also adds mix-and-match pipelines, where we mix-and-match calling conventions, and python vs bash.

Improved pipeline error handling and error handling test coverage.

Separate task failure watching from redirect logic.

Signed-off-by: Nick Mitchell <[email protected]>
  • Loading branch information
starpit committed Oct 25, 2024
1 parent 5b80ac5 commit 8fa709a
Show file tree
Hide file tree
Showing 16 changed files with 317 additions and 92 deletions.
2 changes: 2 additions & 0 deletions cmd/options/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ func AddBuildOptions(cmd *cobra.Command) (*build.Options, error) {
return nil, err
}

AddCallingConventionOptionsTo(cmd, &options)

cmd.Flags().StringVarP(&options.ImagePullSecret, "image-pull-secret", "s", options.ImagePullSecret, "Of the form <user>:<token>@ghcr.io")
cmd.Flags().StringVar(&options.Queue, "queue", options.Queue, "Use the queue defined by this Secret (data: accessKeyID, secretAccessKey, endpoint)")
cmd.Flags().BoolVar(&options.HasGpuSupport, "gpu", options.HasGpuSupport, "Run with GPUs (if supported by the application)")
Expand Down
18 changes: 18 additions & 0 deletions cmd/options/calling-conventions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package options

import (
"github.com/spf13/cobra"

"lunchpail.io/pkg/build"
)

func AddCallingConventionOptions(cmd *cobra.Command) *build.Options {
opts := &build.Options{}
AddCallingConventionOptionsTo(cmd, opts)
cmd.MarkFlagRequired("calling-convention")
return opts
}

func AddCallingConventionOptionsTo(cmd *cobra.Command, options *build.Options) {
cmd.Flags().VarP(&options.CallingConvention, "calling-convention", "C", "Task input and output calling convention [files, stdio]")
}
10 changes: 6 additions & 4 deletions cmd/subcommands/component/worker/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func Run() *cobra.Command {
var startupDelay int
cmd.Flags().IntVar(&startupDelay, "delay", 0, "Delay (in seconds) before engaging in any work")

ccOpts := options.AddCallingConventionOptions(cmd)
logOpts := options.AddLogOptions(cmd)

cmd.RunE = func(cmd *cobra.Command, args []string) error {
Expand All @@ -46,10 +47,11 @@ func Run() *cobra.Command {
}

return worker.Run(context.Background(), args, worker.Options{
StartupDelay: startupDelay,
PollingInterval: pollingInterval,
LogOptions: *logOpts,
RunContext: run.ForPool(poolName).ForWorker(workerName),
CallingConvention: ccOpts.CallingConvention,
StartupDelay: startupDelay,
PollingInterval: pollingInterval,
LogOptions: *logOpts,
RunContext: run.ForPool(poolName).ForWorker(workerName),
})
}

Expand Down
66 changes: 66 additions & 0 deletions pkg/boot/failures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package boot

import (
"context"
"fmt"
"os"
"strings"

"lunchpail.io/pkg/be"
"lunchpail.io/pkg/build"
"lunchpail.io/pkg/ir/queue"
s3 "lunchpail.io/pkg/runtime/queue"
)

func lookForTaskFailures(ctx context.Context, backend be.Backend, run queue.RunContext, opts build.LogOptions) error {
client, err := s3.NewS3ClientForRun(ctx, backend, run.RunName)
if err != nil {
return err
}
defer client.Stop()

if err := client.Mkdirp(run.Bucket); err != nil {
return err
}

failures := run.AsFileForAnyWorker(queue.FinishedWithFailed) // we want to be notified if a task fails in *any* worker
objc, errc := client.Listen(run.Bucket, failures, "", false)

done := false
for !done {
select {
case err := <-errc:
if err == nil || strings.Contains(err.Error(), "EOF") {
done = true
} else {
fmt.Fprintln(os.Stderr, err)
}
case object := <-objc:
// Oops, a task failed. Fetch the stderr and show it.
if opts.Verbose {
fmt.Fprintf(os.Stderr, "Got indication of task failure %s\n", object)
}

// We need to find the FinishedWithStderr file
// that corresponds to the given object, which
// is an AssignedAndFinished file. To do so,
// we can parse the object to extract the task
// instance (`ForObjectTask`) and then use
// that `fortask` to templatize the
// FinishedWithCode
forobject, err := run.ForObject(queue.FinishedWithFailed, object)
if err != nil {
return err
}

errorContent, err := client.Get(run.Bucket, forobject.AsFile(queue.FinishedWithStderr))
if err != nil {
return err
}

return fmt.Errorf("\033[0;31m" + errorContent + "\033[0m\n")
}
}

return nil
}
4 changes: 3 additions & 1 deletion pkg/boot/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"path/filepath"
"slices"

"github.com/dustin/go-humanize/english"

"lunchpail.io/pkg/be"
"lunchpail.io/pkg/build"
"lunchpail.io/pkg/ir/llir"
Expand All @@ -29,7 +31,7 @@ func catAndRedirect(ctx context.Context, inputs []string, backend be.Backend, ir
if len(inputs) > 0 {
// "cat" the inputs into the queue
if opts.Verbose {
fmt.Fprintf(os.Stderr, "up is using 'cat' to inject %d input files\n", len(inputs))
fmt.Fprintf(os.Stderr, "Using 'cat' to inject %s\n", english.Plural(len(inputs), "input file", ""))
}
if err := builtins.Cat(ctx, client.S3Client, ir.Context.Run, inputs, opts); err != nil {
return err
Expand Down
37 changes: 27 additions & 10 deletions pkg/boot/up.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,50 +107,61 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption

// We need to chain the isRunning channel to our 0-2 consumers
// below. This is because golang channels are not multicast.
isRunning3 := make(chan struct{})
isRunning4 := make(chan struct{})
needsCatAndRedirect := len(opts.Inputs) > 0 || ir.Context.Run.Step > 0
go func() {
<-isRunning
isRunning3 <- struct{}{}
isRunning4 <- struct{}{}
isRunning4 <- struct{}{}
if needsCatAndRedirect {
isRunning3 <- struct{}{}
isRunning4 <- struct{}{}
}
if opts.Watch {
isRunning3 <- struct{}{}
isRunning4 <- struct{}{}
}
}()

var errorFromIo error
redirectDone := make(chan struct{})
if needsCatAndRedirect {
// Behave like `cat inputs | ... > outputs`
go func() {
// wait for the run to be ready for us to enqueue
<-isRunning3
<-isRunning4

defer func() { redirectDone <- struct{}{} }()
if err := catAndRedirect(cancellable, opts.Inputs, backend, ir, *opts.BuildOptions.Log); err != nil {
fmt.Fprintln(os.Stderr, err)
errorFromIo = err
cancel()
}
}()
} else if opts.Watch {
verbose := opts.BuildOptions.Log.Verbose
go func() {
<-isRunning3
<-isRunning4
go watchLogs(cancellable, backend, ir, WatchOptions{Verbose: verbose})
go watchUtilization(cancellable, backend, ir, WatchOptions{Verbose: verbose})
}()
}

go func() {
<-isRunning3
<-isRunning4
if err := handlePipelineStdout(ir.Context); err != nil {
fmt.Fprintln(os.Stderr, err)
}
}()

var errorFromTask error
go func() {
<-isRunning4
if err := lookForTaskFailures(cancellable, backend, ir.Context.Run, *opts.BuildOptions.Log); err != nil {
errorFromTask = err
// fail fast? cancel()
}
}()

defer cancel()
err := backend.Up(cancellable, ir, opts.BuildOptions, isRunning)
errorFromUp := backend.Up(cancellable, ir, opts.BuildOptions, isRunning)

if needsCatAndRedirect {
<-redirectDone
Expand All @@ -162,5 +173,11 @@ func upLLIR(ctx context.Context, backend be.Backend, ir llir.LLIR, opts UpOption
default:
}

return err
if errorFromTask != nil {
return errorFromTask
}
if errorFromIo != nil {
return errorFromIo
}
return errorFromUp
}
28 changes: 15 additions & 13 deletions pkg/build/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"path/filepath"

"lunchpail.io/pkg/be/target"
"lunchpail.io/pkg/ir/hlir"
)

type TargetOptions struct {
Expand All @@ -23,19 +24,20 @@ type Options struct {
Target *TargetOptions
Log *LogOptions

ImagePullSecret string `yaml:"imagePullSecret,omitempty"`
OverrideValues []string `yaml:"overrideValues,omitempty"`
OverrideFileValues []string `yaml:"overrideFileValues,omitempty"`
Queue string `yaml:",omitempty"`
HasGpuSupport bool `yaml:"hasGpuSupport,omitempty"`
ApiKey string `yaml:"apiKey,omitempty"`
ResourceGroupID string `yaml:"resourceGroupID,omitempty"`
SSHKeyType string `yaml:"SSHKeyType,omitempty"`
PublicSSHKey string `yaml:"publicSSHKey,omitempty"`
Zone string `yaml:"zone,omitempty"`
Profile string `yaml:"profile,omitempty"`
ImageID string `yaml:"imageID,omitempty"`
CreateNamespace bool `yaml:"createNamespace,omitempty"`
hlir.CallingConvention `yaml:"callingConvention,omitempty"`
ImagePullSecret string `yaml:"imagePullSecret,omitempty"`
OverrideValues []string `yaml:"overrideValues,omitempty"`
OverrideFileValues []string `yaml:"overrideFileValues,omitempty"`
Queue string `yaml:",omitempty"`
HasGpuSupport bool `yaml:"hasGpuSupport,omitempty"`
ApiKey string `yaml:"apiKey,omitempty"`
ResourceGroupID string `yaml:"resourceGroupID,omitempty"`
SSHKeyType string `yaml:"SSHKeyType,omitempty"`
PublicSSHKey string `yaml:"publicSSHKey,omitempty"`
Zone string `yaml:"zone,omitempty"`
Profile string `yaml:"profile,omitempty"`
ImageID string `yaml:"imageID,omitempty"`
CreateNamespace bool `yaml:"createNamespace,omitempty"`
}

//go:embed buildOptions.json
Expand Down
18 changes: 16 additions & 2 deletions pkg/fe/transformer/api/workerpool/lower.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,25 @@ func Lower(buildName string, ctx llir.Context, app hlir.Application, pool hlir.W
app.Spec.Env = make(map[string]string)
}

queueArgs := fmt.Sprintf("--pool %s --worker $LUNCHPAIL_POD_NAME --verbose=%v --debug=%v ", pool.Metadata.Name, opts.Log.Verbose, opts.Log.Debug)
queueArgs := fmt.Sprintf("--pool %s --worker $LUNCHPAIL_POD_NAME --verbose=%v --debug=%v ",
pool.Metadata.Name,
opts.Log.Verbose,
opts.Log.Debug,
)

callingConvention := opts.CallingConvention
if callingConvention == "" {
callingConvention = app.Spec.CallingConvention
}
if callingConvention == "" {
callingConvention = hlir.CallingConventionFiles
}

app.Spec.Command = fmt.Sprintf(`trap "$LUNCHPAIL_EXE component worker prestop %s" EXIT
$LUNCHPAIL_EXE component worker run --delay %d %s -- %s`,
$LUNCHPAIL_EXE component worker run --delay %d --calling-convention %v %s -- %s`,
queueArgs,
startupDelay,
callingConvention,
queueArgs,
app.Spec.Command,
)
Expand Down
1 change: 1 addition & 0 deletions pkg/ir/hlir/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Application struct {
SecurityContext SecurityContext `yaml:"securityContext,omitempty"`
ContainerSecurityContext ContainerSecurityContext `yaml:"containerSecurityContext,omitempty"`
Needs []Needs `yaml:"needs,omitempty"`
CallingConvention `yaml:"callingConvention,omitempty"`
}
}

Expand Down
41 changes: 41 additions & 0 deletions pkg/ir/hlir/calling-convention.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package hlir

import "fmt"

type CallingConvention string

const (
CallingConventionFiles CallingConvention = "files"
CallingConventionStdio = "stdio"
)

func lookup(maybe string) (CallingConvention, error) {
switch maybe {
case string(CallingConventionFiles):
return CallingConventionFiles, nil
case string(CallingConventionStdio):
return CallingConventionStdio, nil
}

return "", fmt.Errorf("Unsupported calling convention %s\n", maybe)
}

// String is used both by fmt.Print and by Cobra in help text
func (cc *CallingConvention) String() string {
return string(*cc)
}

// Set must have pointer receiver so it doesn't change the value of a copy
func (cc *CallingConvention) Set(v string) error {
p, err := lookup(v)
if err != nil {
return err
}
*cc = p
return nil
}

// Type is only used in help text
func (cc *CallingConvention) Type() string {
return "CallingConvention"
}
13 changes: 8 additions & 5 deletions pkg/ir/queue/as.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,14 @@ func (run RunContext) AsFile(path Path) string {
if err != nil {
return ""
}
return s
return anyPoolP.ReplaceAllString(
anyWorkerP.ReplaceAllString(
anyTaskP.ReplaceAllString(s, ""),
""),
"")
}

// As with AsFile() but returning the enclosing directory (i.e. not
// specific to a pool, a worker, or a task)
func (run RunContext) AsFileForAnyWorker(path Path) string {
return filepath.Dir(filepath.Dir(run.ForPool("").ForWorker("").ForTask("").AsFile(path)))
// As with AsFile(), but independent of any particular worker
func (ctx RunContext) AsFileForAnyWorker(path Path) string {
return ctx.ForPool(any).ForWorker(any).ForTask(any).AsFile(path)
}
5 changes: 5 additions & 0 deletions pkg/ir/queue/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ package queue

import "regexp"

var any = "*"
var anyPoolP = regexp.MustCompile("/pool/\\" + any)
var anyWorkerP = regexp.MustCompile("/worker/\\" + any)
var anyTaskP = regexp.MustCompile("\\" + any + "$") // task comes at the end

var placeholder = "xxxxxxxxxxxxxx"
var placeholderR = regexp.MustCompile(placeholder)

Expand Down
Loading

0 comments on commit 8fa709a

Please sign in to comment.