This is an overview of the various command messages. Each entry lists the message's inputs, the next commands it may send, and the steps performed when it executes:
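
All of these messages follow the same basic contract: a message carries a small JSON-serializable body, performs its work in an execute step, and may emit follow-on messages. A minimal sketch of that contract in Python (the class and method names here are illustrative, not the actual implementation):

```python
class CommandMessage(object):
    """Illustrative base contract for a command message."""

    def __init__(self, message_type):
        self.type = message_type
        self.new_messages = []  # follow-on messages produced by execute()

    def to_json(self):
        """Return a JSON-serializable dict for this message's body."""
        raise NotImplementedError

    def execute(self):
        """Do the work; append any next commands to self.new_messages.

        Returns True on success, or False to have the message retried.
        """
        raise NotImplementedError
```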


blocked_jobs

  • Inputs: Up to 1000 job IDs, status change time
  • Next Commands: update_recipe_metrics
  • Execute:
    1. Select and lock applicable job models
    2. Update applicable jobs to BLOCKED status
    3. Send update_recipe_metrics messages for affected recipes
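
A sketch of this select-lock-update pattern using the Django ORM; the Job model fields (status, last_status_change, recipe_id) and the message helper are assumptions for illustration:

```python
from django.db import transaction

def execute_blocked_jobs(job_ids, status_change):
    with transaction.atomic():
        # 1. Select and lock the applicable job models
        jobs = list(Job.objects.select_for_update()
                    .filter(id__in=job_ids).order_by('id'))
        # 2. Only update jobs this message is still current for
        to_block = [j for j in jobs if j.status != 'BLOCKED'
                    and j.last_status_change < status_change]
        Job.objects.filter(id__in=[j.id for j in to_block]).update(
            status='BLOCKED', last_status_change=status_change)
    # 3. One update_recipe_metrics message for the affected recipes
    recipe_ids = sorted({j.recipe_id for j in to_block if j.recipe_id})
    return [make_update_recipe_metrics_message(recipe_ids)] if recipe_ids else []
```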

cancel_jobs

  • Inputs: Up to 100 job IDs, cancel time
  • Next Commands: update_recipe_metrics, update_recipe
  • Execute:
    1. Select and lock applicable job models
    2. Update applicable jobs to CANCELED status
    3. Mark applicable queue models as canceled
    4. Send update_recipe_metrics messages for affected recipes
    5. Send update_recipe messages to update dependent jobs that should now be BLOCKED

cancel_jobs_bulk

  • Inputs: Filter fields for started time, ended time, error categories, error IDs, job IDs, job type IDs, and status; a current job ID field to track progress while filtering across all jobs
  • Next Commands: cancel_jobs_bulk, cancel_jobs
  • Execute:
    1. Select up to a max batch size of job models that fit the filter criteria
    2. Compile list of job models that can be canceled
    3. Send new cancel_jobs_bulk message with current job ID set to lowest job retrieved so progress is made through the jobs table
    4. Send cancel_jobs messages for filtered jobs that can be canceled
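
The two *_bulk messages page through the jobs table with a descending ID cursor. A hedged sketch of that pattern (the batch size and helper names are assumptions):

```python
BATCH_SIZE = 1000  # assumed maximum batch size

def execute_cancel_jobs_bulk(filters, current_job_id, when):
    # 1. Select up to a batch of jobs below the cursor, newest first
    qs = Job.objects.filter(**filters).order_by('-id')
    if current_job_id is not None:
        qs = qs.filter(id__lt=current_job_id)
    jobs = list(qs[:BATCH_SIZE])

    messages = []
    # 2. Only some of the filtered jobs will be in a cancelable state
    cancelable = [j.id for j in jobs if j.can_be_canceled]
    if cancelable:
        messages.append(make_cancel_jobs_message(cancelable, when))
    # 3. A full batch means more rows may remain: re-send ourselves with
    # the cursor moved to the lowest job ID retrieved
    if len(jobs) == BATCH_SIZE:
        messages.append(make_cancel_jobs_bulk_message(filters, jobs[-1].id))
    return messages
```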

completed_jobs

  • Inputs: Up to 100 (job_id, exe_num) pairs, ended time
  • Next Commands: publish_job, update_recipe_metrics, update_recipe
  • Execute:
    1. Select and lock applicable job models
    2. Ignore jobs for which this message is obsolete
    3. Update jobs that are in a valid state to COMPLETED status
    4. Set job output if execution output has been set
    5. Query all jobs that are both COMPLETED and have output and create update_recipe and publish_job messages for them
    6. Send update_recipe_metrics messages for affected recipes

create_batch_recipes

  • Inputs: Batch ID, flag indicating whether the previous batch's recipes are done, current recipe ID field to track progress while re-processing the previous batch's recipes
  • Next Commands: create_batch_recipes, create_recipes
  • Execute:
    1. Retrieve batch
    2. If this batch is re-processing a previous batch, select up to a max batch size of recipes from the previous batch (recipes with IDs below the current recipe ID tracker)
    3. Send create_recipes messages to re-process the previous batch recipes
    4. If not finished with previous batch, send new create_batch_recipes message with current recipe ID set to lowest recipe retrieved so progress is made through the previous batch recipes

create_conditions

  • Inputs: root_recipe_id, recipe_id, batch_id, and recipe_condition tuples describing the conditions to create
  • Next Commands: process_condition
  • Execute:
    1. Perform locking in a database transaction
    2. Look for existing conditions to see if message has already run; if so, skip to (5)
    3. Bulk create new condition model(s)
    4. Bulk create new recipe_node models
    5. Database transaction is over, now send messages
    6. For each condition, if its process_input flag is true, send process_condition message
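
A sketch of this lock, check, bulk-create, then message pattern; the Condition/RecipeNode fields and helpers are illustrative, and bulk_create returning IDs assumes PostgreSQL:

```python
from django.db import transaction

def execute_create_conditions(recipe_id, condition_tuples):
    with transaction.atomic():
        # 2. Idempotency check: if the conditions exist, a previous copy
        # of this message already ran
        existing = set(Condition.objects.filter(recipe_id=recipe_id)
                       .values_list('name', flat=True))
        to_create = [t for t in condition_tuples if t.name not in existing]
        # 3-4. Bulk create the condition and recipe_node models
        conditions = Condition.objects.bulk_create(
            [Condition(recipe_id=recipe_id, name=t.name) for t in to_create])
        RecipeNode.objects.bulk_create(
            [RecipeNode(recipe_id=recipe_id, condition=c) for c in conditions])
    # 5-6. Transaction is over; now it is safe to send messages
    # (re-sending for pre-existing conditions is omitted for brevity)
    return [make_process_condition_message(c.id)
            for c, t in zip(conditions, to_create) if t.process_input]
```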

create_job_exe_end

  • Inputs: Up to 10 job_exe_end models
  • Next Commands: None
  • Execute:
    1. Retrieve job_exe_end models with the given IDs to filter out any models that already exist
    2. Bulk create the job_exe_end models that don't exist
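
A short sketch of the duplicate-safe bulk create, assuming a JobExecutionEnd model keyed by job_exe_id:

```python
def execute_create_job_exe_end(job_exe_end_models):
    # 1. Find which job_exe IDs already have an end record, so a
    # redelivered copy of this message cannot create duplicates
    existing = set(JobExecutionEnd.objects.filter(
        job_exe_id__in=[m.job_exe_id for m in job_exe_end_models]
    ).values_list('job_exe_id', flat=True))
    # 2. Bulk create only the models that do not exist yet
    JobExecutionEnd.objects.bulk_create(
        [m for m in job_exe_end_models if m.job_exe_id not in existing])
    return []  # no next commands
```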

create_jobs

  • Inputs: event_id; create_jobs_type (determines whether a single job is made from input data or jobs are made for a recipe); for the input data type: job_type_name, job_type_version, job_type_rev_num, and input_data; for the recipe type: root_recipe_id, superseded_recipe_id, recipe_id, batch_id, and recipe_jobs tuples describing the jobs to create
  • Next Commands: process_job_input, update_recipe_metrics
  • Execute:
    1. Perform locking in a database transaction
    2. Look for existing jobs to see if message has already run; if so, skip to (7)
    3. If jobs are within a recipe that is superseding another recipe, retrieve the superseded jobs
    4. Retrieve job type revision(s) for job(s) to create
    5. Bulk create new job model(s)
    6. Bulk create new recipe_node models if new jobs are in a recipe
    7. Database transaction is over, now send messages
    8. For each new job, if it has data, or if it is in a recipe and its process_input flag is true, send process_job_input message
    9. Send update_recipe_metrics message if in a recipe

create_recipes

  • Inputs: batch_id, event_id, and forced_nodes; create_recipes_type (determines a reprocess or sub-recipes type); for the reprocess type: recipe_type_name, recipe_type_rev_num, and root_recipe_ids; for the sub-recipes type: recipe_id, root_recipe_id, superseded_recipe_id, and sub_recipes
  • Next Commands: supersede_recipe_nodes, process_recipe_input, update_recipe_metrics
  • Execute:
    1. Perform locking in a database transaction
    2. Look for existing recipes to see if message has already run; if so, skip to (8)
    3. If superseding, get superseded models and create diffs between old and new revisions
    4. Bulk create new recipe models
    5. If this is a reprocess type, copy data from superseded recipe model and supersede reprocessed models
    6. Bulk create new recipe_node models if new recipes are sub-recipes
    7. Copy jobs and sub-recipes from superseded recipe (for identical nodes)
    8. Database transaction is over, now send messages
    9. If new recipe is superseding another recipe, it is a reprocess, so create supersede_recipe_nodes messages
    10. For each new recipe, if it has data, or if it is in a recipe and its process_input flag is true, send process_recipe_input message
    11. Send update_recipe_metrics message if in a recipe

delete_files

  • Inputs: Up to 100 ScaleFile IDs, job_id, trigger_id, source_file_id, purge flag
  • Next Commands: purge_jobs
  • Execute:
    • If purge flag is true: Execute query to delete given ScaleFile entries and call purge_jobs for associated jobs
    • If purge flag is false: Execute query to mark given ScaleFile entries as deleted and unpublished
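
A sketch of the purge-flag branch, assuming soft-delete fields on ScaleFile and illustrative message helpers:

```python
from django.utils import timezone

def execute_delete_files(file_ids, job_id, trigger_id, source_file_id, purge):
    files = ScaleFile.objects.filter(id__in=file_ids)
    if purge:
        # Hard delete the rows, then purge the jobs tied to these files
        files.delete()
        return [make_purge_jobs_message([job_id], trigger_id, source_file_id)]
    # Soft delete: keep the rows but mark them deleted and unpublished
    files.update(is_deleted=True, deleted=timezone.now(),
                 is_published=False, unpublished=timezone.now())
    return []
```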

failed_jobs

  • Inputs: Up to 100 (job_id, exe_num) pairs sorted by error_id, ended time
  • Next Commands: queued_jobs (for retried jobs), update_recipe_metrics, update_recipe (for failed jobs)
  • Execute:
    1. Select and lock applicable job models, then select related job fields
    2. Ignore jobs for which this message is obsolete
    3. Jobs to retry are sent in queued_jobs messages
    4. Jobs that aren't retried are updated to FAILED status and sent in update_recipe messages
    5. Send update_recipe_metrics messages for affected recipes
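
A hedged sketch of the retry-versus-fail split in steps (3) and (4); the retry rule shown (error allows retry and tries remain) is an assumption about how the decision might be made:

```python
def split_failed_jobs(jobs, error, ended):
    retried, failed = [], []
    for job in jobs:
        # Assumed rule: retry while executions remain and the error
        # category is one that should be retried
        if error.should_be_retried and job.num_exes < job.max_tries:
            retried.append((job.id, job.num_exes))
        else:
            failed.append(job)

    messages = []
    if retried:
        messages.append(make_queued_jobs_message(retried, requeue=True))
    if failed:
        mark_jobs_failed(failed, error, ended)  # illustrative bulk update
        messages.extend(make_update_recipe_message(j.root_recipe_id)
                        for j in failed if j.recipe_id)
    return messages
```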

pending_jobs

  • Inputs: Up to 1000 job IDs, status change time
  • Next Commands: update_recipe_metrics
  • Execute:
    1. Select and lock applicable job models
    2. Update applicable jobs to PENDING status
    3. Send update_recipe_metrics messages for affected recipes

process_condition

  • Inputs: Condition ID
  • Next Commands: update_recipe
  • Execute:
    1. Query for condition model with related recipe and recipe__recipe_type_rev models
    2. If condition has not been processed yet:
      1. Query condition's recipe dependencies to get their output data
      2. Combine dependency outputs into condition's data, and then validate and set data on the condition model
    3. Apply the condition's data filter to its data, and set the condition as processed with is_accepted corresponding to whether the filter accepted the data
    4. Send update_recipe message to update the condition's recipe

process_job_input

  • Inputs: Job ID
  • Next Commands: queued_jobs
  • Execute:
    1. Query for job model with related job_type_rev, recipe and recipe__recipe_type_rev models
    2. If job is in recipe and does not have input data yet:
      1. Query job's recipe dependencies to get their output data
      2. Combine dependency outputs into job's input data, and then validate and set input data on the job model
    3. In a transaction:
      1. Lock job model
      2. Bulk create job input file models
      3. Update job input metadata fields (input_file_size, source_started, etc.)
    4. If job has never been queued, send queued_jobs message
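
A sketch of step (2), combining dependency outputs into a recipe job's input; every helper method here is illustrative, not the actual API:

```python
def build_job_input(job):
    # Only recipe jobs that are still missing input need this step
    if not job.recipe_id or job.has_input():
        return
    # 2.1 Gather the output data of every upstream node this job depends on
    input_data = job.get_recipe_defined_input()
    for dependency in job.get_recipe_dependencies():
        input_data.merge(dependency.get_output_data())
    # 2.2 Validate against the job type revision before setting it
    job.job_type_rev.validate_input_data(input_data)  # raises if invalid
    job.set_input_data(input_data)
```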

process_recipe_input

  • Inputs: Recipe ID, forced nodes (optional)
  • Next Commands: update_recipe
  • Execute:
    1. Query for recipe model with related recipe_type_rev, recipe and recipe__recipe_type_rev models
    2. If recipe is a sub-recipe and does not have input data yet:
      1. Query sub-recipe's recipe dependencies to get their output data
      2. Combine dependency outputs into sub-recipe's input data, and then validate and set input data on the sub-recipe model
    3. In a transaction:
      1. Lock recipe model
      2. Bulk create recipe input file models
      3. Update recipe input metadata fields (input_file_size, source_started, etc.)
    4. Send update_recipe message for recipe

publish_job

  • Inputs: Job ID
  • Next Commands: None
  • Execute:
    1. Select job model with latest job_exe model
    2. Publish the job's products

queued_jobs

  • Inputs: Up to 100 (job_id, exe_num) pairs, requeue flag, optional priority
  • Next Commands: update_recipe_metrics
  • Execute:
    1. Select and lock applicable job models
    2. Ignore jobs for which this message is obsolete (also only accept jobs for first queue or re-queue depending on requeue flag)
    3. Update applicable jobs to QUEUED status
    4. Select QUEUED jobs with related fields
    5. Select input files for QUEUED jobs
    6. Bulk create new queue models
    7. Send update_recipe_metrics messages for affected recipes
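
A sketch of the obsolete-message check in step (2); the exe_num comparison and requeue rule are assumptions about how staleness might be detected:

```python
def filter_valid_for_queue(jobs, exe_num_by_job_id, requeue):
    valid = []
    for job in jobs:
        # The message carried the exe_num the job had when it was sent;
        # if the job has executed since, this message is obsolete
        if job.num_exes != exe_num_by_job_id[job.id]:
            continue
        # First queue applies to never-run jobs, re-queue to run jobs
        if requeue == (job.num_exes > 0):
            valid.append(job)
    return valid
```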

purge_jobs

  • Inputs: Up to 100 job IDs, trigger_id, source_file_id
  • Next Commands: purge_recipe if the job is part of a recipe; purge_source_file
  • Execute:
    1. Call purge_source_file for the given source_file_id
    2. Select JobExecution where job ID is in the list of given purge_job_ids
    3. Select and delete the following models (in order) where the job_exe was selected:
      1. TaskUpdate
      2. JobExecutionOutput
      3. JobExecutionEnd
    4. Delete selected job_exes
    5. Select and delete the following (in order) where job ID is in the list of given purge_job_ids:
      1. FileAncestryLink
      2. BatchJob
      3. RecipeNode
      4. JobInputFile
      5. Queue
      6. Job
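
A sketch of the ordered deletes: child rows are removed before the rows they reference, so foreign keys are never violated. Model names follow the lists above; the ORM calls are standard Django:

```python
def purge_job_rows(purge_job_ids):
    # 2. Find the executions belonging to the jobs being purged
    job_exe_ids = list(JobExecution.objects.filter(
        job_id__in=purge_job_ids).values_list('id', flat=True))
    # 3-4. Delete execution children first, then the executions
    TaskUpdate.objects.filter(job_exe_id__in=job_exe_ids).delete()
    JobExecutionOutput.objects.filter(job_exe_id__in=job_exe_ids).delete()
    JobExecutionEnd.objects.filter(job_exe_id__in=job_exe_ids).delete()
    JobExecution.objects.filter(id__in=job_exe_ids).delete()
    # 5. Delete job children in dependency order, finishing with Job
    for model in (FileAncestryLink, BatchJob, RecipeNode, JobInputFile, Queue):
        model.objects.filter(job_id__in=purge_job_ids).delete()
    Job.objects.filter(id__in=purge_job_ids).delete()
```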

purge_recipe

  • Inputs: recipe_id, trigger_id, source_file_id
  • Next Commands: spawn_delete_files_job if there are leaf jobs in the recipe; purge_recipe if there are leaf recipes in the recipe; purge_source_file for input source files
  • Execute:
    1. Call purge_source_file for the given source_file_id
    2. If there are leaf nodes (nodes with no dependent jobs/recipes) in the given recipe:
      1. spawn_delete_files_job for leaf jobs - purges leaf jobs
      2. purge_recipe for leaf recipes - purges leaf recipes
    3. If (2) does not apply and the given recipe is part of another (parent) recipe:
      1. purge_recipe for parent recipe - purges parent recipe
    4. If (2) does not apply and the given recipe is superseded:
      1. purge_recipe for superseding recipe - purges superseding recipe
    5. If (2) does not apply, delete the following where recipe == the given recipe:
      1. BatchRecipe
      2. RecipeNode
      3. RecipeInputFile
      4. Recipe

purge_source_file

  • Inputs: source_file_id, trigger_id
  • Next Commands: spawn_delete_files_job for jobs that take source file as an input; purge_recipe for recipes that take source file as an input
  • Execute:
    1. If there are jobs that are not part of a recipe and take the given source file as an input:
      1. spawn_delete_files_job for jobs in order to purge them
    2. If there are recipes that take the given source file as an input:
      1. purge_recipe for recipes in order to purge them
    3. If neither (1) nor (2) applies, delete the following where source_file_id == the given source file:
      1. Ingest
      2. ScaleFile

reprocess_recipes

  • NOTE: This message is deprecated. Please use create_recipes instead
  • Inputs: Up to 100 root recipe IDs, new recipe type revision ID, event ID, all_jobs boolean, job names, optional batch ID
  • Next Commands: process_recipe_input, cancel_jobs, unpublish_jobs
  • Execute:
    1. Select and lock latest (non-superseded) recipe models for the given root recipe IDs
    2. Select recipe type model and all needed recipe type revision models
    3. Bulk create new recipe models
    4. Supersede old recipe models
    5. Select recipe_job models for all jobs in superseded recipes
    6. Bulk create recipe_job models for jobs that are being copied into new recipes
    7. Supersede old job models
    8. Send cancel_jobs messages for all superseded jobs
    9. Send unpublish_jobs messages for all jobs "deleted" from the superseded recipes
    10. Send process_recipe_input messages for all new recipes

requeue_jobs

  • Inputs: Up to 100 (job_id, exe_num) pairs, optional priority
  • Next Commands: queued_jobs, uncancel_jobs
  • Execute:
    1. Select job models and collect jobs that are valid for re-queue
    2. Collect jobs that are valid for being uncanceled
    3. Increase max_tries field for jobs that will be re-queued
    4. Send queued_jobs messages to queue the jobs that will be re-queued
    5. Send uncancel_jobs messages to uncancel jobs

requeue_jobs_bulk

  • Inputs: Filter fields for started time, ended time, error categories, error IDs, job IDs, job type IDs, and status; a current job ID field to track progress while filtering across all jobs; priority
  • Next Commands: requeue_jobs_bulk, requeue_jobs
  • Execute:
    1. Select up to a max batch size of job models that fit the filter criteria
    2. Compile list of job models that can be re-queued
    3. Send new requeue_jobs_bulk message with current job ID set to lowest job retrieved so progress is made through the jobs table
    4. Send requeue_jobs messages for filtered jobs that can be re-queued

restart_scheduler

  • Inputs: When timestamp
  • Next Commands: failed_jobs, create_job_exe_end
  • Execute:
    1. Select job_exe models that match QUEUED/RUNNING job models
    2. Create failed_jobs messages for (job_id, exe_num) pairs with the scheduler-lost error
    3. Create create_job_exe_end messages containing failed job_exe_end models (with the scheduler-lost error) for those job_exe models

running_jobs

  • Inputs: Up to 100 (job_id, exe_num) pairs sorted by node_id, started time
  • Next Commands: update_recipe_metrics
  • Execute:
    1. Select and lock applicable job models
    2. Check job status and exe_num to determine whether each job model needs a full update (includes status), a partial update (just the node), or no update (exe_num has increased)
    3. Do a partial update per node to set node field
    4. Do a single update to set status to RUNNING for all jobs that need a full update
    5. Send update_recipe_metrics messages for affected recipes
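
A sketch of the per-job decision in step (2); the field names are assumptions:

```python
def classify_running_update(job, msg_exe_num):
    if job.num_exes > msg_exe_num:
        return 'none'     # a newer execution exists; this message is stale
    if job.status in ('QUEUED', 'RUNNING'):
        return 'full'     # set the node field and move status to RUNNING
    return 'partial'      # status changed elsewhere; only record the node
```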

spawn_delete_files_job

  • Inputs: job_id, trigger_id, source_file_id, purge flag
  • Next Commands: create_jobs
  • Execute:
    1. Get files produced by the input job_id
    2. Construct inputs for the job consisting of a files dict, a workspaces dict, the purge flag, and the job_id
    3. Send create_jobs message to spawn scale-delete-files system job and give it the constructed inputs

supersede_recipe_nodes

  • Inputs: Up to 100 recipe IDs, supersede timestamp, supersede_all flag, node names for superseded jobs, node names for superseded sub-recipes, unpublish_all flag, node names for jobs to unpublish, supersede_recursive_all flag, node names for sub-recipes to recursively supersede, unpublish_recursive_all flag, node names for sub-recipes to recursively supersede/unpublish
  • Next Commands: cancel_jobs, unpublish_jobs, supersede_recipe_nodes
  • Execute:
    1. Supersede applicable jobs
    2. Supersede applicable sub-recipes
    3. Send cancel_jobs messages for superseded jobs
    4. Send unpublish_jobs messages for applicable jobs
    5. Send supersede_recipe_nodes messages to recursively handle sub-recipes

uncancel_jobs

  • Inputs: Up to 1000 job IDs, uncancel time
  • Next Commands: update_recipe_metrics, update_recipe
  • Execute:
    1. Select and lock applicable job models
    2. Update applicable jobs to PENDING status
    3. Send update_recipe messages for the recipes of all uncanceled jobs
    4. Send update_recipe_metrics messages for affected recipes

unpublish_jobs

  • Inputs: Up to 100 job IDs, unpublish time
  • Next Commands: None
  • Execute:
    1. Mark all products produced by the given job IDs as unpublished

update_batch_metrics

  • Inputs: Up to 100 batch IDs
  • Next Commands: None
  • Execute:
    1. Execute query to generate recipe metrics for the batch
    2. Execute query to generate job metrics for the batch
    3. Execute query to generate job metrics per job name for the batch

update_recipe_metrics

  • Inputs: Up to 100 recipe IDs
  • Next Commands: update_recipe, update_recipe_metrics, update_batch_metrics
  • Execute:
    1. Update the job and sub-recipe count metrics in the given recipe models
    2. Send update_recipe_metrics messages for recipes that contain this message's recipes
    3. Send update_recipe messages for root recipes that contain this message's recipes
    4. Send update_batch_metrics messages for batches associated with any top-level (not a sub-recipe) recipe updated by this message
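
A sketch of the count metrics in step (1) using conditional aggregation (Django 2.x); the metric field names are assumptions:

```python
from django.db.models import Count, Q

def recount_recipe_metrics(recipe_ids):
    rows = (RecipeNode.objects.filter(recipe_id__in=recipe_ids)
            .values('recipe_id')
            .annotate(
                jobs_total=Count('job'),
                jobs_completed=Count('job', filter=Q(job__status='COMPLETED')),
                sub_recipes_total=Count('sub_recipe')))
    for row in rows:
        Recipe.objects.filter(id=row['recipe_id']).update(
            jobs_total=row['jobs_total'],
            jobs_completed=row['jobs_completed'],
            sub_recipes_total=row['sub_recipes_total'])
```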

update_recipe

  • Inputs: Root recipe ID, forced nodes (optional)
  • Next Commands: blocked_jobs, pending_jobs, create_jobs, create_recipes, process_job_input, process_recipe_input
  • Execute:
    1. Retrieve recipe with recipe type revision and recipe nodes
    2. Set the recipe as completed if all nodes have completed
    3. Create messages for jobs that need to be set to BLOCKED status
    4. Create messages for jobs that need to be set to PENDING status
    5. Create messages to create any needed recipe jobs
    6. Create messages to create any needed sub-recipes
    7. Create messages to process job input for jobs ready to be queued
    8. Create messages to process recipe input for sub-recipes ready to be processed
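
A sketch of how update_recipe might fan out: evaluate the recipe graph once, then emit one message batch per category of change. All the instance methods and message helpers here are illustrative:

```python
def execute_update_recipe(root_recipe_id, forced_nodes=None):
    recipe = get_recipe_instance(root_recipe_id)  # graph of recipe nodes
    if recipe.all_nodes_completed():
        recipe.set_completed()

    messages = []
    messages += make_blocked_jobs_messages(recipe.jobs_to_block())
    messages += make_pending_jobs_messages(recipe.jobs_to_unblock())
    messages += make_create_jobs_messages(recipe.job_nodes_to_create())
    messages += make_create_recipes_messages(recipe.recipe_nodes_to_create())
    # Nodes whose dependencies are all complete are ready for their input
    messages += [make_process_job_input_message(j.id)
                 for j in recipe.jobs_ready_for_input()]
    messages += [make_process_recipe_input_message(r.id)
                 for r in recipe.sub_recipes_ready_for_input()]
    return messages
```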

update_recipes

  • NOTE: This message is deprecated. Please use update_recipe instead
  • Inputs: Up to 100 recipe IDs
  • Next Commands: blocked_jobs, pending_jobs, process_job_input
  • Execute:
    1. Lock applicable recipe models
    2. Process recipe handlers in batches, ensuring that each batch does not exceed 1000 jobs (unless a single recipe exceeds that on its own)
    3. Bulk create job and recipe_job models for recipe jobs that are ready to be created
    4. Create messages for recipe jobs that should be updated to BLOCKED/PENDING status
    5. For jobs ready for their input, set the job input from the recipe input and ancestor inputs
    6. Create process_job_input messages for jobs that are ready for first queue
    7. Set is_completed for newly completed recipes