From bdc01121470005965384a7ab0dffd07982bdaeaa Mon Sep 17 00:00:00 2001 From: Ashley Coleman Date: Mon, 16 Sep 2024 11:05:54 -0600 Subject: [PATCH] stdlib: Move file filters into runners (#1646) * stdlib: Move file filters into runners * address comments * Push cleanable files into RunnerOutput * cleanup --- share/wake/lib/system/io.wake | 24 ++++- share/wake/lib/system/job.wake | 15 ++-- share/wake/lib/system/job_cache_runner.wake | 8 +- .../wake/lib/system/remote_cache_runner.wake | 14 ++- share/wake/lib/system/runner.wake | 90 +++++++++++++++++-- 5 files changed, 116 insertions(+), 35 deletions(-) diff --git a/share/wake/lib/system/io.wake b/share/wake/lib/system/io.wake index 73bd439a2..7c0972e89 100644 --- a/share/wake/lib/system/io.wake +++ b/share/wake/lib/system/io.wake @@ -138,7 +138,7 @@ def writeRunner (content: String) = def primWrite (mode: Integer) (path: String) (content: String): Result String String = (\_ \_ \_ prim "write") mode path content - def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error = + def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _ fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = # Command must be ("", "-m", "{string mode}", "{string path}", Nil) require "", "-m", smode, path, Nil = cmd else panic "writeImp violated command-line contract" @@ -161,7 +161,15 @@ def writeRunner (content: String) = match writeTask Fail f -> failWithError f Pass path -> - RunnerOutput (vis | map getPathName) (path,) reality + def fileInputs = + vis + | map getPathName + | fnInputs + + def cleanable = (path, Nil) + def fileOutputs = fnOutputs cleanable + + RunnerOutput fileInputs fileOutputs cleanable reality | Pass makeRunner "write" run @@ -277,7 +285,7 @@ def mkdirRunner: Runner = def primMkdir (mode: Integer) (path: String): Result String String = (\_ \_ prim "mkdir") mode path - def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error = + def run (job: Job) ((RunnerInput _ cmd vis _ _ _ _ _ predict _ fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = # Command must be ("", "-m", "{string mode}", "{string path}", Nil) require "", "-m", smode, path, Nil = cmd else panic "mkdirImp violated command-line contract" @@ -300,7 +308,15 @@ def mkdirRunner: Runner = match mkdirTask Fail f -> failWithError f Pass path -> - RunnerOutput (vis | map getPathName) (path,) reality + def fileInputs = + vis + | map getPathName + | fnInputs + + def cleanable = (path, Nil) + def fileOutputs = fnOutputs cleanable + + RunnerOutput fileInputs fileOutputs cleanable reality | Pass makeRunner "mkdir" run diff --git a/share/wake/lib/system/job.wake b/share/wake/lib/system/job.wake index 263141c1c..0729e2bc4 100644 --- a/share/wake/lib/system/job.wake +++ b/share/wake/lib/system/job.wake @@ -150,22 +150,17 @@ def runAlways cmd env dir stdin res uusage finputs foutputs vis keep run echo st | getOrElse uusage def output = - run job (Pass (RunnerInput label cmd vis env dir stdin res prefix usage isatty)) + run job (Pass (RunnerInput label cmd vis env dir stdin res prefix usage isatty finputs foutputs)) def final _ = match output Fail e -> primJobFailLaunch job e - Pass (RunnerOutput inputs outputs reality) -> - def input = - finputs inputs - | map simplify - | implode - - def output = - foutputs outputs + Pass (RunnerOutput inputs outputs cleanable reality) -> + def hashedOutputs = + outputs | computeHashes prefix | implode - primJobFinish job input output (implode outputs) reality + primJobFinish job inputs.implode hashedOutputs cleanable.implode reality # Make sure we don't hash files before the job has stopped running def _ = waitJobMerged final job diff --git a/share/wake/lib/system/job_cache_runner.wake b/share/wake/lib/system/job_cache_runner.wake index 8ed0c1d36..fb3a51da5 100644 --- a/share/wake/lib/system/job_cache_runner.wake +++ b/share/wake/lib/system/job_cache_runner.wake @@ -60,7 +60,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero def job_cache_add str = prim "job_cache_add" def run (job: Job) (input: RunnerInput): Result RunnerOutput Error = - def (RunnerInput label cmd vis env dir stdin _ prefix _ _) = input + def (RunnerInput label cmd vis env dir stdin _ prefix _ _ _ _) = input def mkVisJson (Path path hash) = JObject ( @@ -166,7 +166,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero def predict = Usage status runtime cputime mem ibytes obytes def _ = primJobVirtual job stdout stderr predict - Pass (RunnerOutput inputs outputs predict) + Pass (RunnerOutput inputs outputs Nil predict) def _ = require True = isDebugOn @@ -176,7 +176,7 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero True # Now we need to run the job - require Pass (RunnerOutput inputs outputs usage) = baseDoIt job (Pass input) + require Pass (RunnerOutput inputs outputs cleanable usage) = baseDoIt job (Pass input) def Usage status runtime cputime mem ibytes obytes = usage def inputsTree = listToTree scmpCanonical inputs @@ -243,6 +243,6 @@ export def mkJobCacheRunner (hashFn: RunnerInput => Result String Error) (wakero job_cache_add jobCacheAddJson - Pass (RunnerOutput (map getPathName vis) outputs usage) + Pass (RunnerOutput (map getPathName vis) outputs cleanable usage) makeRunner "job-cache: {name}" run diff --git a/share/wake/lib/system/remote_cache_runner.wake b/share/wake/lib/system/remote_cache_runner.wake index b75d159ec..04936d635 100644 --- a/share/wake/lib/system/remote_cache_runner.wake +++ b/share/wake/lib/system/remote_cache_runner.wake @@ -216,7 +216,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => def _ = primJobVirtual job stdout stderr predict - Pass (RunnerOutput inputs outputs predict) + Pass (RunnerOutput inputs outputs Nil predict) def run (job: Job) (input: RunnerInput): Result RunnerOutput Error = def label = input.getRunnerInputLabel @@ -237,7 +237,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => require True = shouldDebugRemoteCache Unit def _ = - writeTempFile "remote.cache.lookup.fail" "label: {input.getRunnerInputLabel}\nerror: {err | format}" + writeTempFile "remote.cache.lookup.fail" "label: {label}\nerror: {err | format}" True @@ -252,9 +252,7 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => require True = shouldDebugRemoteCache Unit def _ = breadcrumb "{label}: Did not find a match" - - def _ = - writeTempFile "remote.cache.lookup.miss" "label: {input.getRunnerInputLabel}" + def _ = writeTempFile "remote.cache.lookup.miss" "label: {label}" True @@ -277,11 +275,11 @@ export def mkRemoteCacheRunner (rscApi: RemoteCacheApi) (hashFn: RunnerInput => ## --- Helper functions --- # Creates a CacheSearchRequest from the various inputs to a runner -def mkSearchRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _usage isAtty): RunnerInput) (hidden: String) = +def mkSearchRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _usage isAtty _ _): RunnerInput) (hidden: String) = CacheSearchRequest label cmd dir env hidden isAtty stdin vis # Creates a CachePostJobRequest from the various inputs and outputs of a runner -def mkPostJobRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty): RunnerInput) (output: RunnerOutput) (hidden: String) (stdoutBlobId: String) (stderrBlobId: String) (files: List CachePostRequestOutputFile) (directories: List CachePostRequestOutputDirectory) (symlinks: List CachePostRequestOutputSymlink) = +def mkPostJobRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty _ _): RunnerInput) (output: RunnerOutput) (hidden: String) (stdoutBlobId: String) (stderrBlobId: String) (files: List CachePostRequestOutputFile) (directories: List CachePostRequestOutputDirectory) (symlinks: List CachePostRequestOutputSymlink) = def Usage status runtime cputime mem ibytes obytes = output.getRunnerOutputUsage CachePostRequest @@ -306,7 +304,7 @@ def mkPostJobRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ is obytes # Creates a CacheAllowedRequest from the various inputs and outputs of a runner -def mkCacheAllowedRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty): RunnerInput) (output: RunnerOutput) (hidden: String) = +def mkCacheAllowedRequest ((RunnerInput label cmd vis env dir stdin _res _prefix _ isAtty _ _): RunnerInput) (output: RunnerOutput) (hidden: String) = def Usage status runtime cputime mem _ibytes obytes = output.getRunnerOutputUsage CacheAllowedRequest label cmd dir env hidden isAtty stdin vis status runtime cputime mem obytes diff --git a/share/wake/lib/system/runner.wake b/share/wake/lib/system/runner.wake index 2caf4b23a..6bd5da0da 100644 --- a/share/wake/lib/system/runner.wake +++ b/share/wake/lib/system/runner.wake @@ -47,10 +47,46 @@ export tuple RunnerInput = export Record: Usage # Determines if job should run in psuedoterminal export IsAtty: Boolean - + # Modify the Runner's reported inputs (files read). + # Must be called exactly once for a given job. + # + # FnInputs may only be called once as the user provided function may not be idempotent. + # Ex: FnInputs = tail being called multiple times would drop an extra file each time its called + # while the user only intentded for the very first file to be dropped + # + # Note for wrapping runners: + # It is expected that a wrapping runner maintain this invariant. What that looks like will + # depend heavily on the wrapper and requires due diligence. A cache wrapper for example + # should pass it along unmodified since it doesn't care about the actual inputs while + # a smart wrapper tracking file reads must pass a no-op function to the inner runner + # so that it can apply filtering after collecting the files read. + export FnInputs: (List String => List String) + # Modify the Runner's reported outputs (files created). + # Must be called exactly once for a given job. + # + # FnOutputs may only be called once as the user provided function may not be idempotent. + # Ex: FnOutputs = tail being called multiple times would drop an extra file each time its called + # while the user only intentded for the very first file to be dropped + # + # Note for wrapping runners: + # It is expected that a wrapping runner maintain this invariant. What that looks like will + # depend heavily on the wrapper and requires due diligence. A cache wrapper for example + # should pass it along unmodified since it doesn't care about the actual outputs while + # a smart wrapper tracking file writes must pass a no-op function to the inner runner + # so that it can apply filtering after collecting the files written. + export FnOutputs: (List String => List String) + +# The result returned by a Runner after running a job export tuple RunnerOutput = + # The filtered list of files actually read by the job. It can be used for more precise reruns export Inputs: List String + # The filtered list of files output by the job. These files are hashed and elevated to Path export Outputs: List String + # The list of files output by the job that should be deleted by wake --clean. May include files + # listed in Outputs as they will be removed later. The files in the list that are not in + # Outputs will not be hashed, published, or elevated to Path but will be deleted by wake --clean + export CleanableOutputs: List String + # The actual resource usage of the job that will be written to the database export Usage: Usage # A Runner describes a way to invoke a Plan to get a Job @@ -84,12 +120,21 @@ export def makeRunner (name: String) (run: Job => RunnerInput => Result RunnerOu # You must use Fn{Inputs,Outputs} to fill in this information for wake to maintain safety and reusability # Advanced usage only, proceed with caution export def localRunner: Runner = - def run (job: Job) ((RunnerInput _ cmd vis env dir stdin _ _ predict isatty): RunnerInput): Result RunnerOutput Error = - def jobKey = JobKey dir stdin env.implode cmd.implode 0 "" isatty.booleanToInteger + def run (job: Job) ((RunnerInput _ cmd vis env dir stdin _ _ predict isAtty fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = + def jobKey = JobKey dir stdin env.implode cmd.implode 0 "" isAtty.booleanToInteger def _ = primJobLaunch job jobKey predict + def fileInputs = + vis + | map getPathName + | fnInputs + + # Caller needs to fill this in from nothing + def cleanable = Nil + def fileOutputs = fnOutputs cleanable + job.getJobReality - |< RunnerOutput (vis | map getPathName) Nil + |< RunnerOutput fileInputs fileOutputs cleanable makeRunner "local" run @@ -97,11 +142,20 @@ export def localRunner: Runner = # # This runner is useful for tracking a unit of work that is job like but not launched as a process export def virtualRunner: Runner = - def run (job: Job) ((RunnerInput _ _ vis _ _ _ _ _ predict _): RunnerInput): Result RunnerOutput Error = + def run (job: Job) ((RunnerInput _ _ vis _ _ _ _ _ predict _ fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = def _ = primJobVirtual job "" "" predict + def fileInputs = + vis + | map getPathName + | fnInputs + + # Caller needs to fill this in from nothing + def cleanable = Nil + def fileOutputs = fnOutputs cleanable + job.getJobReality - |< RunnerOutput (vis | map getPathName) Nil + |< RunnerOutput fileInputs fileOutputs cleanable makeRunner "virtual" run @@ -172,7 +226,7 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate def script = which (simplify rawScript) def executeOk = access script xOK - def run (job: Job) ((RunnerInput label command visible environment directory stdin res prefix record isatty): RunnerInput): Result RunnerOutput Error = + def run (job: Job) ((RunnerInput label command visible environment directory stdin res prefix record isatty fnInputs fnOutputs): RunnerInput): Result RunnerOutput Error = require True = executeOk else failWithError "Runner {script} is not executable" @@ -219,8 +273,21 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate def cmd = script, "-I", "-p", specFile, "-o", resultFile, extraArgs # Rewrite input so that the local runner can run the job with a configured sandbox + # The identity function is passsed to the file input/output filters so no info is lost def localInput = - RunnerInput label cmd Nil (extraEnv ++ environment) "." "" Nil prefix (estimate record) isatty + RunnerInput + label + cmd + Nil + (extraEnv ++ environment) + "." + "" + Nil + prefix + (estimate record) + isatty + identity + identity # Dispatch to the local runner via composition and get the outputs def (Runner _ localRun) = localRunner @@ -258,8 +325,13 @@ export def makeJSONRunner ((JSONRunnerPlan rawScript extraArgs extraEnv estimate | getOrElse Nil | mapPartial getJString + def jsonInputs = getK `inputs` + def jsonOutputs = getK `outputs` + def filteredInputs = fnInputs jsonInputs + def filteredOutputs = fnOutputs jsonOutputs + match usageResult Fail f -> Fail (makeError f) - Pass usage -> Pass (RunnerOutput (getK `inputs`) (getK `outputs`) usage) + Pass usage -> Pass (RunnerOutput filteredInputs filteredOutputs jsonOutputs usage) makeRunner "json-{script}" run