From 06030614e54d33ed45584fd8e5193e9db7c5e74d Mon Sep 17 00:00:00 2001 From: Robrecht Cannoodt Date: Sun, 22 Sep 2024 08:46:04 +0200 Subject: [PATCH] fix workflows --- scripts/create_resources/test_resources.sh | 18 ++- scripts/run_benchmark/run_test_local.sh | 10 +- src/api/comp_transformer.yaml | 2 +- .../transform/config.vsh.yaml | 0 .../transform/script.py | 0 .../process_datasets/config.vsh.yaml | 38 +++--- src/workflows/process_datasets/main.nf | 125 +---------------- src/workflows/run_benchmark/config.vsh.yaml | 60 +++++--- src/workflows/run_benchmark/main.nf | 128 ++++++++++++------ 9 files changed, 163 insertions(+), 218 deletions(-) rename src/{transformers => data_processors}/transform/config.vsh.yaml (100%) rename src/{transformers => data_processors}/transform/script.py (100%) diff --git a/scripts/create_resources/test_resources.sh b/scripts/create_resources/test_resources.sh index f588723..9269469 100755 --- a/scripts/create_resources/test_resources.sh +++ b/scripts/create_resources/test_resources.sh @@ -14,7 +14,7 @@ DATASET_DIR=resources_test/task_batch_integration mkdir -p $DATASET_DIR # process dataset -viash run src/process_dataset/config.vsh.yaml -- \ +viash run src/data_processors/process_dataset/config.vsh.yaml -- \ --input "$RAW_DATA/cxg_mouse_pancreas_atlas/dataset.h5ad" \ --output_dataset "$DATASET_DIR/cxg_mouse_pancreas_atlas/dataset.h5ad" \ --output_solution "$DATASET_DIR/cxg_mouse_pancreas_atlas/solution.h5ad" @@ -25,18 +25,28 @@ viash run src/methods/combat/config.vsh.yaml -- \ --output $DATASET_DIR/cxg_mouse_pancreas_atlas/integrated.h5ad # run transformer -viash run src/transformers/transform/config.vsh.yaml -- \ +viash run src/data_processors/transform/config.vsh.yaml -- \ --input_integrated $DATASET_DIR/cxg_mouse_pancreas_atlas/integrated.h5ad \ --input_dataset $DATASET_DIR/cxg_mouse_pancreas_atlas/dataset.h5ad \ --expected_method_types feature \ --output $DATASET_DIR/cxg_mouse_pancreas_atlas/integrated_full.h5ad # run one metric -viash run src/metrics/accuracy/config.vsh.yaml -- \ - --input_prediction $DATASET_DIR/cxg_mouse_pancreas_atlas/integrated.h5ad \ +viash run src/metrics/graph_connectivity/config.vsh.yaml -- \ + --input_integrated $DATASET_DIR/cxg_mouse_pancreas_atlas/integrated_full.h5ad \ --input_solution $DATASET_DIR/cxg_mouse_pancreas_atlas/solution.h5ad \ --output $DATASET_DIR/cxg_mouse_pancreas_atlas/score.h5ad +# write the state file +cat > $DATASET_DIR/state.yaml << HERE +id: cxg_mouse_pancreas_atlas +output_dataset: !file dataset.h5ad +output_solution: !file solution.h5ad +output_integrated: !file integrated.h5ad +output_integrated_full: !file integrated_full.h5ad +output_score: !file score.h5ad +HERE + # only run this if you have access to the openproblems-data bucket aws s3 sync --profile op \ "resources_test/task_batch_integration" \ diff --git a/scripts/run_benchmark/run_test_local.sh b/scripts/run_benchmark/run_test_local.sh index 4325e95..82e31a8 100755 --- a/scripts/run_benchmark/run_test_local.sh +++ b/scripts/run_benchmark/run_test_local.sh @@ -6,14 +6,6 @@ REPO_ROOT=$(git rev-parse --show-toplevel) # ensure that the command below is run from the root of the repository cd "$REPO_ROOT" -# remove this when you have implemented the script -echo "TODO: once the 'run_benchmark' workflow has been implemented, update this script to use it." -echo " Step 1: replace 'task_batch_integration' with the name of the task in the following command." 
-echo " Step 2: replace the rename keys parameters to fit your run_benchmark inputs" -echo " Step 3: replace the settings parameter to fit your run_benchmark outputs" -echo " Step 4: remove this message" -exit 1 - set -e echo "Running benchmark on test data" @@ -26,7 +18,7 @@ publish_dir="resources/results/${RUN_ID}" # write the parameters to file cat > /tmp/params.yaml << HERE input_states: s3://openproblems-data/resources_test/task_batch_integration/**/state.yaml -rename_keys: 'input_train:output_train;input_test:output_test;input_solution:output_solution' +rename_keys: 'input_dataset:output_dataset;input_solution:output_solution' output_state: "state.yaml" publish_dir: "$publish_dir" HERE diff --git a/src/api/comp_transformer.yaml b/src/api/comp_transformer.yaml index 317de17..eb34729 100644 --- a/src/api/comp_transformer.yaml +++ b/src/api/comp_transformer.yaml @@ -1,4 +1,4 @@ -namespace: transformers +namespace: data_processors info: type: transformer type_info: diff --git a/src/transformers/transform/config.vsh.yaml b/src/data_processors/transform/config.vsh.yaml similarity index 100% rename from src/transformers/transform/config.vsh.yaml rename to src/data_processors/transform/config.vsh.yaml diff --git a/src/transformers/transform/script.py b/src/data_processors/transform/script.py similarity index 100% rename from src/transformers/transform/script.py rename to src/data_processors/transform/script.py diff --git a/src/workflows/process_datasets/config.vsh.yaml b/src/workflows/process_datasets/config.vsh.yaml index 18efbba..032cc8e 100644 --- a/src/workflows/process_datasets/config.vsh.yaml +++ b/src/workflows/process_datasets/config.vsh.yaml @@ -1,29 +1,23 @@ name: process_datasets namespace: workflows -status: disabled - argument_groups: - # - name: Inputs - # arguments: - # - name: "--input" - # __merge__: /src/api/file_common_dataset.yaml - # required: true - # direction: input - # - name: Outputs - # arguments: - # - name: "--output_train" - # __merge__: /src/api/file_train_h5ad.yaml - # required: true - # direction: output - # - name: "--output_test" - # __merge__: /src/api/file_test_h5ad.yaml - # required: true - # direction: output - # - name: "--output_solution" - # __merge__: /src/api/file_solution.yaml - # required: true - # direction: output + - name: Inputs + arguments: + - name: "--input" + __merge__: /src/api/file_common_dataset.yaml + required: true + direction: input + - name: Outputs + arguments: + - name: "--output_dataset" + __merge__: /src/api/file_dataset.yaml + required: true + direction: output + - name: "--output_solution" + __merge__: /src/api/file_solution.yaml + required: true + direction: output resources: - type: nextflow_script diff --git a/src/workflows/process_datasets/main.nf b/src/workflows/process_datasets/main.nf index 8ffa666..58007ad 100644 --- a/src/workflows/process_datasets/main.nf +++ b/src/workflows/process_datasets/main.nf @@ -1,7 +1,7 @@ include { findArgumentSchema } from "${meta.resources_dir}/helper.nf" workflow auto { - findStatesTemp(params, meta.config) + findStates(params, meta.config) | meta.workflow.run( auto: [publish: "state"] ) @@ -41,133 +41,14 @@ workflow run_wf { | process_dataset.run( fromState: [ input: "dataset" ], toState: [ - output_train: "output_train", - output_test: "output_test", + output_dataset: "output_dataset", output_solution: "output_solution" ] ) // only output the files for which an output file was specified - | setState(["output_train", "output_test", "output_solution"]) + | setState(["output_dataset", 
"output_solution"]) emit: output_ch } - - -// temp fix for rename_keys typo - -def findStatesTemp(Map params, Map config) { - def auto_config = deepClone(config) - def auto_params = deepClone(params) - - auto_config = auto_config.clone() - // override arguments - auto_config.argument_groups = [] - auto_config.arguments = [ - [ - type: "string", - name: "--id", - description: "A dummy identifier", - required: false - ], - [ - type: "file", - name: "--input_states", - example: "/path/to/input/directory/**/state.yaml", - description: "Path to input directory containing the datasets to be integrated.", - required: true, - multiple: true, - multiple_sep: ";" - ], - [ - type: "string", - name: "--filter", - example: "foo/.*/state.yaml", - description: "Regex to filter state files by path.", - required: false - ], - // to do: make this a yaml blob? - [ - type: "string", - name: "--rename_keys", - example: ["newKey1:oldKey1", "newKey2:oldKey2"], - description: "Rename keys in the detected input files. This is useful if the input files do not match the set of input arguments of the workflow.", - required: false, - multiple: true, - multiple_sep: ";" - ], - [ - type: "string", - name: "--settings", - example: '{"output_dataset": "dataset.h5ad", "k": 10}', - description: "Global arguments as a JSON glob to be passed to all components.", - required: false - ] - ] - if (!(auto_params.containsKey("id"))) { - auto_params["id"] = "auto" - } - - // run auto config through processConfig once more - auto_config = processConfig(auto_config) - - workflow findStatesTempWf { - helpMessage(auto_config) - - output_ch = - channelFromParams(auto_params, auto_config) - | flatMap { autoId, args -> - - def globalSettings = args.settings ? readYamlBlob(args.settings) : [:] - - // look for state files in input dir - def stateFiles = args.input_states - - // filter state files by regex - if (args.filter) { - stateFiles = stateFiles.findAll{ stateFile -> - def stateFileStr = stateFile.toString() - def matcher = stateFileStr =~ args.filter - matcher.matches()} - } - - // read in states - def states = stateFiles.collect { stateFile -> - def state_ = readTaggedYaml(stateFile) - [state_.id, state_] - } - - // construct renameMap - if (args.rename_keys) { - def renameMap = args.rename_keys.collectEntries{renameString -> - def split = renameString.split(":") - assert split.size() == 2: "Argument 'rename_keys' should be of the form 'newKey:oldKey;newKey:oldKey'" - split - } - - // rename keys in state, only let states through which have all keys - // also add global settings - states = states.collectMany{id, state -> - def newState = [:] - - for (key in renameMap.keySet()) { - def origKey = renameMap[key] - if (!(state.containsKey(origKey))) { - return [] - } - newState[key] = state[origKey] - } - - [[id, globalSettings + newState]] - } - } - - states - } - emit: - output_ch - } - - return findStatesTempWf -} \ No newline at end of file diff --git a/src/workflows/run_benchmark/config.vsh.yaml b/src/workflows/run_benchmark/config.vsh.yaml index 3e66142..07629b5 100644 --- a/src/workflows/run_benchmark/config.vsh.yaml +++ b/src/workflows/run_benchmark/config.vsh.yaml @@ -1,26 +1,19 @@ name: run_benchmark namespace: workflows -status: disabled - argument_groups: - name: Inputs arguments: - # - name: "--input_train" - # __merge__: /src/api/file_train_h5ad.yaml - # type: file - # direction: input - # required: true - # - name: "--input_test" - # __merge__: /src/api/file_test_h5ad.yaml - # type: file - # direction: input - # required: 
true
-      # - name: "--input_solution"
-      #   __merge__: /src/api/file_solution.yaml
-      #   type: file
-      #   direction: input
-      #   required: true
+      - name: "--input_dataset"
+        __merge__: /src/api/file_dataset.yaml
+        type: file
+        direction: input
+        required: true
+      - name: "--input_solution"
+        __merge__: /src/api/file_solution.yaml
+        type: file
+        direction: input
+        required: true
   - name: Outputs
     arguments:
       - name: "--output_scores"
@@ -66,9 +59,36 @@ resources:
 dependencies:
   - name: h5ad/extract_uns_metadata
     repository: core
-  - name: control_methods/true_labels
-  - name: methods/logistic_regression
-  - name: metrics/accuracy
+  - name: control_methods/embed_cell_types
+  - name: control_methods/embed_cell_types_jittered
+  - name: control_methods/no_integration
+  - name: control_methods/no_integration_batch
+  - name: control_methods/shuffle_integration
+  - name: control_methods/shuffle_integration_by_batch
+  - name: control_methods/shuffle_integration_by_cell_type
+  - name: methods/bbknn
+  - name: methods/combat
+  - name: methods/fastmnn
+  - name: methods/liger
+  - name: methods/mnn_correct
+  - name: methods/mnnpy
+  - name: methods/pyliger
+  - name: methods/scalex
+  - name: methods/scanorama
+  - name: methods/scanvi
+  - name: methods/scvi
+  - name: metrics/asw_batch
+  - name: metrics/asw_label
+  - name: metrics/cell_cycle_conservation
+  - name: metrics/clustering_overlap
+  - name: metrics/graph_connectivity
+  - name: metrics/hvg_overlap
+  - name: metrics/isolated_label_asw
+  - name: metrics/isolated_label_f1
+  - name: metrics/kbet
+  - name: metrics/lisi
+  - name: metrics/pcr
+  - name: data_processors/transform
 
 runners:
   - type: nextflow
diff --git a/src/workflows/run_benchmark/main.nf b/src/workflows/run_benchmark/main.nf
index 2ea9016..331fc46 100644
--- a/src/workflows/run_benchmark/main.nf
+++ b/src/workflows/run_benchmark/main.nf
@@ -7,13 +7,39 @@ workflow auto {
 
 // construct list of methods and control methods
 methods = [
-  true_labels,
-  logistic_regression
+  embed_cell_types,
+  embed_cell_types_jittered,
+  no_integration,
+  no_integration_batch,
+  shuffle_integration,
+  shuffle_integration_by_batch,
+  shuffle_integration_by_cell_type,
+  bbknn,
+  combat,
+  fastmnn,
+  liger,
+  mnn_correct,
+  mnnpy,
+  pyliger,
+  scalex,
+  scanorama,
+  scanvi,
+  scvi
 ]
 
 // construct list of metrics
 metrics = [
-  accuracy
+  asw_batch,
+  asw_label,
+  cell_cycle_conservation,
+  clustering_overlap,
+  graph_connectivity,
+  hvg_overlap,
+  isolated_label_asw,
+  isolated_label_f1,
+  kbet,
+  lisi,
+  pcr
 ]
 
 workflow run_wf {
@@ -69,11 +95,11 @@ workflow run_wf {
 
       // use 'fromState' to fetch the arguments the component requires from the overall state
       fromState: { id, state, comp ->
-        def new_args = [
-          input_train: state.input_train,
-          input_test: state.input_test
-        ]
-        if (comp.config.info.type == "control_method") {
+        def new_args = [:]
+        if (comp.config.info.type == "method") {
+          new_args.input = state.input_dataset
+        } else if (comp.config.info.type == "control_method") {
+          new_args.input_dataset = state.input_dataset
           new_args.input_solution = state.input_solution
         }
         new_args
@@ -83,21 +109,46 @@ workflow run_wf {
       // use 'toState' to publish that component's outputs to the overall state
       toState: { id, output, state, comp ->
         state + [
          method_id: comp.config.name,
+          method_types: comp.config.info.method_types,
           method_output: output.output
         ]
       }
     )
 
+    | transform.run(
+      fromState: [input_integrated: "method_output", input_dataset: "input_dataset", expected_method_types: "method_types"],
+      toState: { id, output, state ->
+        def method_types_cleaned = []
+        if ("feature" in state.method_types) {
+          method_types_cleaned += ["feature", "embedding", "graph"]
+        } else if ("embedding" in state.method_types) {
+          method_types_cleaned += ["embedding", "graph"]
+        } else if ("graph" in state.method_types) {
+          method_types_cleaned += ["graph"]
+        }
+
+        def new_state = state + [
+          method_output_cleaned: output.output,
+          method_types_cleaned: method_types_cleaned
+        ]
+
+        new_state
+      }
+    )
+
     // run all metrics
     | runEach(
       components: metrics,
       id: { id, state, comp ->
        id + "." + comp.config.name
      },
+      filter: { id, state, comp ->
+        comp.config.info.metric_type in state.method_types_cleaned
+      },
       // use 'fromState' to fetch the arguments the component requires from the overall state
       fromState: [
         input_solution: "input_solution",
-        input_prediction: "method_output"
+        input_integrated: "method_output_cleaned"
       ],
       // use 'toState' to publish that component's outputs to the overall state
       toState: { id, output, state, comp ->
         state + [
           metric_id: comp.config.name,
@@ -108,6 +159,26 @@ workflow run_wf {
           metric_output: output.output
         ]
       }
     )
 
+    // extract the scores
+    | extract_uns_metadata.run(
+      key: "extract_scores",
+      fromState: [input: "metric_output"],
+      toState: { id, output, state ->
+        state + [
+          score_uns: readYaml(output.output).uns
+        ]
+      }
+    )
+
+    | joinStates { ids, states ->
+      // store the scores in a file
+      def score_uns = states.collect{it.score_uns}
+      def score_uns_yaml_blob = toYamlBlob(score_uns)
+      def score_uns_file = tempFile("score_uns.yaml")
+      score_uns_file.write(score_uns_yaml_blob)
+
+      ["output", [output_scores: score_uns_file]]
+    }
 
   /******************************
    * GENERATE OUTPUT YAML FILES *
@@ -115,7 +186,7 @@ workflow run_wf {
 
   // TODO: can we store everything below in a separate helper function?
   // extract the dataset metadata
-  dataset_meta_ch = dataset_ch
+  meta_ch = dataset_ch
     // only keep one of the normalization methods
     | filter{ id, state ->
       state.dataset_uns.normalization_id == "log_cp10k"
@@ -131,23 +202,6 @@ workflow run_wf {
       def dataset_uns_file = tempFile("dataset_uns.yaml")
       dataset_uns_file.write(dataset_uns_yaml_blob)
 
-      ["output", [output_dataset_info: dataset_uns_file]]
-    }
-
-  output_ch = score_ch
-
-    // extract the scores
-    | extract_metadata.run(
-      key: "extract_scores",
-      fromState: [input: "metric_output"],
-      toState: { id, output, state ->
-        state + [
-          score_uns: readYaml(output.output).uns
-        ]
-      }
-    )
-
-    | joinStates { ids, states ->
       // store the method configs in a file
       def method_configs = methods.collect{it.config}
       def method_configs_yaml_blob = toYamlBlob(method_configs)
       def method_configs_file = tempFile("method_configs.yaml")
       method_configs_file.write(method_configs_yaml_blob)
 
       // store the metric configs in a file
       def metric_configs = metrics.collect{it.config}
       def metric_configs_yaml_blob = toYamlBlob(metric_configs)
       def metric_configs_file = tempFile("metric_configs.yaml")
       metric_configs_file.write(metric_configs_yaml_blob)
 
+      // store the task info in a file
       def viash_file = meta.resources_dir.resolve("_viash.yaml")
-      def viash_file_content = toYamlBlob(readYaml(viash_file).info)
-      def task_info_file = tempFile("task_info.yaml")
-      task_info_file.write(viash_file_content)
-
-      // store the scores in a file
-      def score_uns = states.collect{it.score_uns}
-      def score_uns_yaml_blob = toYamlBlob(score_uns)
-      def score_uns_file = tempFile("score_uns.yaml")
-      score_uns_file.write(score_uns_yaml_blob)
 
+      // create output state
       def new_state = [
+        output_dataset_info: dataset_uns_file,
         output_method_configs: method_configs_file,
         output_metric_configs: metric_configs_file,
-        output_task_info: task_info_file,
-        output_scores: score_uns_file,
+        output_task_info: viash_file,
         _meta: states[0]._meta
       ]
 
       ["output", new_state]
     }
 
-  // merge all of the output data
-  | mix(dataset_meta_ch)
+  // merge all of the output data
+  output_ch = score_ch
+    | mix(meta_ch)
 
   | joinStates{ ids, states ->
     def mergedStates = states.inject([:]) { acc, m ->
acc + m }
    [ids[0], mergedStates]
  }
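Note (not part of the patch): the metric gating introduced in run_benchmark/main.nf can be read as a small standalone rule. The sketch below is plain Groovy with a hypothetical helper name, expandMethodTypes(); it only illustrates the assumed conversion order used by the transform step — a feature-level integration can be reduced to an embedding and a graph, an embedding to a graph, and a graph-only output serves only graph-based metrics — and a metric is skipped when its metric_type is not in the derived list.

// Illustration only -- a minimal sketch, not part of the patch or the viash API.
// Mirrors the method_types_cleaned logic used to decide which metrics to run.
def expandMethodTypes(List methodTypes) {
  if ("feature" in methodTypes) {
    return ["feature", "embedding", "graph"]
  } else if ("embedding" in methodTypes) {
    return ["embedding", "graph"]
  } else if ("graph" in methodTypes) {
    return ["graph"]
  }
  return []
}

// e.g. an embedding-only method can still be scored by graph metrics,
// but a graph-only method cannot be scored by embedding metrics:
assert "graph" in expandMethodTypes(["embedding"])
assert !("embedding" in expandMethodTypes(["graph"]))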