Skip to content

Commit

Permalink
stdlib: Move file filters into runners (#1646)
Browse files Browse the repository at this point in the history
* stdlib: Move file filters into runners

* address comments

* Push cleanable files into RunnerOutput

* cleanup
  • Loading branch information
V-FEXrt authored Sep 16, 2024
1 parent 82da532 commit bdc0112
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 35 deletions.
24 changes: 20 additions & 4 deletions share/wake/lib/system/io.wake
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ def writeRunner (content: String) =
def primWrite (mode: Integer) (path: String) (content: String): Result String String =
(\_ \_ \_ prim "write") mode path content

def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error =
def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _ fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error =
# Command must be ("<write>", "-m", "{string mode}", "{string path}", Nil)
require "<write>", "-m", smode, path, Nil = cmd
else panic "writeImp violated command-line contract"
Expand All @@ -161,7 +161,15 @@ def writeRunner (content: String) =
match writeTask
Fail f -> failWithError f
Pass path ->
RunnerOutput (vis | map getPathName) (path,) reality
def fileInputs =
vis
| map getPathName
| fnInputs

def cleanable = (path, Nil)
def fileOutputs = fnOutputs cleanable

RunnerOutput fileInputs fileOutputs cleanable reality
| Pass

makeRunner "write" run
Expand Down Expand Up @@ -277,7 +285,7 @@ def mkdirRunner: Runner =
def primMkdir (mode: Integer) (path: String): Result String String =
(\_ \_ prim "mkdir") mode path

def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error =
def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _ fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error =
# Command must be ("<mkdir>", "-m", "{string mode}", "{string path}", Nil)
require "<mkdir>", "-m", smode, path, Nil = cmd
else panic "mkdirImp violated command-line contract"
Expand All @@ -300,7 +308,15 @@ def mkdirRunner: Runner =
match mkdirTask
Fail f -> failWithError f
Pass path ->
RunnerOutput (vis | map getPathName) (path,) reality
def fileInputs =
vis
| map getPathName
| fnInputs

def cleanable = (path, Nil)
def fileOutputs = fnOutputs cleanable

RunnerOutput fileInputs fileOutputs cleanable reality
| Pass

makeRunner "mkdir" run
Expand Down
15 changes: 5 additions & 10 deletions share/wake/lib/system/job.wake
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,17 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st
| getOrElse uusage

def output =
run job (Pass (RunnerInput label cmd vis env dir stdin res prefix usage isatty))
run job (Pass (RunnerInput label cmd vis env dir stdin res prefix usage isatty finputs foutputs))

def final _ = match output
Fail e -> primJobFailLaunch job e
Pass (RunnerOutput inputs outputs reality) ->
def input =
finputs inputs
| map simplify
| implode

def output =
foutputs outputs
Pass (RunnerOutput inputs outputs cleanable reality) ->
def hashedOutputs =
outputs
| computeHashes prefix
| implode

primJobFinish job input output (implode outputs) reality
primJobFinish job inputs.implode hashedOutputs cleanable.implode reality

# Make sure we don't hash files before the job has stopped running
def _ = waitJobMerged final job
Expand Down
8 changes: 4 additions & 4 deletions share/wake/lib/system/job_cache_runner.wake
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero
def job_cache_add str = prim "job_cache_add"

def run (job: Job) (input: RunnerInput): Result RunnerOutput Error =
def (RunnerInput label cmd vis env dir stdin _ prefix _ _) = input
def (RunnerInput label cmd vis env dir stdin _ prefix _ _ _ _) = input

def mkVisJson (Path path hash) =
JObject (
Expand Down Expand Up @@ -166,7 +166,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero
def predict = Usage status runtime cputime mem ibytes obytes
def _ = primJobVirtual job stdout stderr predict

Pass (RunnerOutput inputs outputs predict)
Pass (RunnerOutput inputs outputs Nil predict)

def _ =
require True = isDebugOn
Expand All @@ -176,7 +176,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero
True

# Now we need to run the job
require Pass (RunnerOutput inputs outputs usage) = baseDoIt job (Pass input)
require Pass (RunnerOutput inputs outputs cleanable usage) = baseDoIt job (Pass input)

def Usage status runtime cputime mem ibytes obytes = usage
def inputsTree = listToTree scmpCanonical inputs
Expand Down Expand Up @@ -243,6 +243,6 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero

job_cache_add jobCacheAddJson

Pass (RunnerOutput (map getPathName vis) outputs usage)
Pass (RunnerOutput (map getPathName vis) outputs cleanable usage)

makeRunner "job-cache: {name}" run
14 changes: 6 additions & 8 deletions share/wake/lib/system/remote_cache_runner.wake
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput =>

def _ = primJobVirtual job stdout stderr predict

Pass (RunnerOutput inputs outputs predict)
Pass (RunnerOutput inputs outputs Nil predict)

def run (job: Job) (input: RunnerInput): Result RunnerOutput Error =
def label = input.getRunnerInputLabel
Expand All @@ -237,7 +237,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput =>
require True = shouldDebugRemoteCache Unit

def _ =
writeTempFile "remote.cache.lookup.fail" "label: {input.getRunnerInputLabel}\nerror: {err | format}"
writeTempFile "remote.cache.lookup.fail" "label: {label}\nerror: {err | format}"

True

Expand All @@ -252,9 +252,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput =>
require True = shouldDebugRemoteCache Unit

def _ = breadcrumb "{label}: Did not find a match"

def _ =
writeTempFile "remote.cache.lookup.miss" "label: {input.getRunnerInputLabel}"
def _ = writeTempFile "remote.cache.lookup.miss" "label: {label}"

True

Expand All @@ -277,11 +275,11 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput =>
## --- Helper functions ---

# Creates a CacheSearchRequest from the various inputs to a runner
def mkSearchRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _usage isAtty): RunnerInput) (hidden: String) =
def mkSearchRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _usage isAtty _ _): RunnerInput) (hidden: String) =
CacheSearchRequest label cmd dir env hidden isAtty stdin vis

# Creates a CachePostJobRequest from the various inputs and outputs of a runner
def mkPostJobRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty): RunnerInput) (output: RunnerOutput) (hidden: String) (stdoutBlobId: String) (stderrBlobId: String) (files: List CachePostRequestOutputFile) (directories: List CachePostRequestOutputDirectory) (symlinks: List CachePostRequestOutputSymlink) =
def mkPostJobRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty _ _): RunnerInput) (output: RunnerOutput) (hidden: String) (stdoutBlobId: String) (stderrBlobId: String) (files: List CachePostRequestOutputFile) (directories: List CachePostRequestOutputDirectory) (symlinks: List CachePostRequestOutputSymlink) =
def Usage status runtime cputime mem ibytes obytes = output.getRunnerOutputUsage

CachePostRequest
Expand All @@ -306,7 +304,7 @@ def mkPostJobRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ is
obytes

# Creates a CacheAllowedRequest from the various inputs and outputs of a runner
def mkCacheAllowedRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty): RunnerInput) (output: RunnerOutput) (hidden: String) =
def mkCacheAllowedRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty _ _): RunnerInput) (output: RunnerOutput) (hidden: String) =
def Usage status runtime cputime mem _ibytes obytes = output.getRunnerOutputUsage

CacheAllowedRequest label cmd dir env hidden isAtty stdin vis status runtime cputime mem obytes
Expand Down
90 changes: 81 additions & 9 deletions share/wake/lib/system/runner.wake
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,46 @@ export tuple RunnerInput =
export Record: Usage
# Determines if job should run in psuedoterminal
export IsAtty: Boolean

# Modify the Runner's reported inputs (files read).
# Must be called exactly once for a given job.
#
# FnInputs may only be called once as the user provided function may not be idempotent.
# Ex: FnInputs = tail being called multiple times would drop an extra file each time its called
# while the user only intentded for the very first file to be dropped
#
# Note for wrapping runners:
# It is expected that a wrapping runner maintain this invariant. What that looks like will
# depend heavily on the wrapper and requires due diligence. A cache wrapper for example
# should pass it along unmodified since it doesn't care about the actual inputs while
# a smart wrapper tracking file reads must pass a no-op function to the inner runner
# so that it can apply filtering after collecting the files read.
export FnInputs: (List String => List String)
# Modify the Runner's reported outputs (files created).
# Must be called exactly once for a given job.
#
# FnOutputs may only be called once as the user provided function may not be idempotent.
# Ex: FnOutputs = tail being called multiple times would drop an extra file each time its called
# while the user only intentded for the very first file to be dropped
#
# Note for wrapping runners:
# It is expected that a wrapping runner maintain this invariant. What that looks like will
# depend heavily on the wrapper and requires due diligence. A cache wrapper for example
# should pass it along unmodified since it doesn't care about the actual outputs while
# a smart wrapper tracking file writes must pass a no-op function to the inner runner
# so that it can apply filtering after collecting the files written.
export FnOutputs: (List String => List String)

# The result returned by a Runner after running a job
export tuple RunnerOutput =
# The filtered list of files actually read by the job. It can be used for more precise reruns
export Inputs: List String
# The filtered list of files output by the job. These files are hashed and elevated to Path
export Outputs: List String
# The list of files output by the job that should be deleted by wake --clean. May include files
# listed in Outputs as they will be removed later. The files in the list that are not in
# Outputs will not be hashed, published, or elevated to Path but will be deleted by wake --clean
export CleanableOutputs: List String
# The actual resource usage of the job that will be written to the database
export Usage: Usage

# A Runner describes a way to invoke a Plan to get a Job
Expand Down Expand Up @@ -84,24 +120,42 @@ export def makeRunner (name: String) (run: Job => RunnerInput => Result RunnerOu
# You must use Fn{Inputs,Outputs} to fill in this information for wake to maintain safety and reusability
# Advanced usage only, proceed with caution
export def localRunner: Runner =
def run (job: Job) ((RunnerInput _ cmd vis env dir stdin _ _ predict isatty): RunnerInput): Result RunnerOutput Error =
def jobKey = JobKey dir stdin env.implode cmd.implode 0 "" isatty.booleanToInteger
def run (job: Job) ((RunnerInput _ cmd vis env dir stdin _ _ predict isAtty fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error =
def jobKey = JobKey dir stdin env.implode cmd.implode 0 "" isAtty.booleanToInteger
def _ = primJobLaunch job jobKey predict

def fileInputs =
vis
| map getPathName
| fnInputs

# Caller needs to fill this in from nothing
def cleanable = Nil
def fileOutputs = fnOutputs cleanable

job.getJobReality
|< RunnerOutput (vis | map getPathName) Nil
|< RunnerOutput fileInputs fileOutputs cleanable

makeRunner "local" run

# virtualRunner: A runner that immediatly marks the job as complete using the predicted usage
#
# This runner is useful for tracking a unit of work that is job like but not launched as a process
export def virtualRunner: Runner =
def run (job: Job) ((RunnerInput _ _ vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error =
def run (job: Job) ((RunnerInput _ _ vis _ _ _ _ _ predict _ fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error =
def _ = primJobVirtual job "" "" predict

def fileInputs =
vis
| map getPathName
| fnInputs

# Caller needs to fill this in from nothing
def cleanable = Nil
def fileOutputs = fnOutputs cleanable

job.getJobReality
|< RunnerOutput (vis | map getPathName) Nil
|< RunnerOutput fileInputs fileOutputs cleanable

makeRunner "virtual" run

Expand Down Expand Up @@ -172,7 +226,7 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate
def script = which (simplify rawScript)
def executeOk = access script xOK

def run (job: Job) ((RunnerInput label command visible environment directory stdin res prefix record isatty): RunnerInput): Result RunnerOutput Error =
def run (job: Job) ((RunnerInput label command visible environment directory stdin res prefix record isatty fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error =
require True = executeOk
else failWithError "Runner {script} is not executable"

Expand Down Expand Up @@ -219,8 +273,21 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate
def cmd = script, "-I", "-p", specFile, "-o", resultFile, extraArgs

# Rewrite input so that the local runner can run the job with a configured sandbox
# The identity function is passsed to the file input/output filters so no info is lost
def localInput =
RunnerInput label cmd Nil (extraEnv ++ environment) "." "" Nil prefix (estimate record) isatty
RunnerInput
label
cmd
Nil
(extraEnv ++ environment)
"."
""
Nil
prefix
(estimate record)
isatty
identity
identity

# Dispatch to the local runner via composition and get the outputs
def (Runner _ localRun) = localRunner
Expand Down Expand Up @@ -258,8 +325,13 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate
| getOrElse Nil
| mapPartial getJString

def jsonInputs = getK `inputs`
def jsonOutputs = getK `outputs`
def filteredInputs = fnInputs jsonInputs
def filteredOutputs = fnOutputs jsonOutputs

match usageResult
Fail f -> Fail (makeError f)
Pass usage -> Pass (RunnerOutput (getK `inputs`) (getK `outputs`) usage)
Pass usage -> Pass (RunnerOutput filteredInputs filteredOutputs jsonOutputs usage)

makeRunner "json-{script}" run

0 comments on commit bdc0112

Please sign in to comment.