diff --git a/src/omnipy/data/dataset.py b/src/omnipy/data/dataset.py index 099ec708..d2c1c267 100644 --- a/src/omnipy/data/dataset.py +++ b/src/omnipy/data/dataset.py @@ -506,6 +506,11 @@ def _set_data_file_and_validate(self, key: str, val: ModelT) -> None: prev_value = self.data[key] try: + # if is_model_instance(val): + # self.data[key] = self.get_model_class()(val) + # else: + # self.data[key] = val + self.data[key] = val self._validate_data_file(key) except Exception: diff --git a/src/omnipy/engine/job_runner.py b/src/omnipy/engine/job_runner.py index f1c227e5..882db011 100644 --- a/src/omnipy/engine/job_runner.py +++ b/src/omnipy/engine/job_runner.py @@ -112,6 +112,15 @@ def _inner_run_linear_flow(*args: object, **kwargs: object): result = None with linear_flow.flow_context: for i, job in enumerate(linear_flow.task_templates): + # run_kwargs = kwargs if i == 0 else {} + # if job.has_coroutine_func(): + # async def resolve_async_job(*inner_args, **inner_kwargs): + # return await resolve(job(*inner_args, **inner_kwargs)) + # + # return resolve_async_job(*args, **run_kwargs) + # # TODO: Better handling of kwargs + # result = job(*args, **run_kwargs) + # TODO: Better handling of kwargs if i == 0: result = job(*args, **kwargs) diff --git a/src/omnipy/modules/fairtracks/create_filter.py b/src/omnipy/modules/fairtracks/create_filter.py index 1c431bf3..4b4ef25e 100644 --- a/src/omnipy/modules/fairtracks/create_filter.py +++ b/src/omnipy/modules/fairtracks/create_filter.py @@ -1,37 +1,36 @@ #!/usr/bin/env python -from copy import copy import json import sys from typing import List, Optional -from pydantic import BaseModel +from pydantic import BaseModel, HttpUrl import requests +from omnipy import create_row_index_from_column, Model from omnipy.compute.flow import LinearFlowTemplate from omnipy.compute.task import TaskTemplate -from omnipy.modules.json.datasets import (JsonDataset, - JsonDictOfDictsOfScalarsDataset, - JsonListOfDictsOfScalarsDataset) -from omnipy.modules.json.models import JsonDictOfDictsOfScalarsModel, JsonListOfDictsOfScalarsModel -from omnipy.modules.json.typedefs import Json +from omnipy.modules.json.datasets import JsonDataset, JsonDictOfDictsOfScalarsDataset +from omnipy.modules.json.models import JsonDictOfDictsOfScalarsModel, JsonModel +from omnipy.modules.json.typedefs import JsonScalar ######################################################### # user parameters ####################################################### -download_all_projects = False -download_all_cases = False -number_of_projects = 2 -number_of_cases = 2 -number_of_files = 2 +DOWNLOAD_ALL_PROJECTS = False +DOWNLOAD_ALL_CASES = False +NUMBER_OF_PROJECTS = 2 +NUMBER_OF_CASES = 2 +NUMBER_OF_FILES = 2 # NB very few annotations per case, get all ####################################################### # endpoints definition ######################################################## -projects_endpt = 'https://api.gdc.cancer.gov/projects' -cases_endpt = 'https://api.gdc.cancer.gov/cases' -annotations_endpt = 'https://api.gdc.cancer.gov/annotations' +TOP_LEVEL_URL = 'https://api.gdc.cancer.gov/' +CASES_ENDPT = 'https://api.gdc.cancer.gov/cases' +ANNOTATIONS_ENDPT = 'https://api.gdc.cancer.gov/annotations' +PROJECTS_ENDPT = 'https://api.gdc.cancer.gov/projects' ########################################################### # step1: get total number of TCGA projects (program.name) @@ -52,42 +51,31 @@ def create_filter(field: str, value: List[str]) -> Filter: return Filter(content=Content(field=field, value=value)) -def call_endpoint(endpoint: str, filters: Optional[Filter] = None, **kwargs: object) -> Json: +def call_endpoint(top_level_url: HttpUrl, + endpoint: str, + filters: Optional[Filter] = None, + **kwargs: JsonScalar) -> JsonModel: if filters: kwargs['filters'] = json.dumps(filters.dict()) - return requests.get(endpoint, params=kwargs).json() # type: ignore + # full_endpoint = top_level_url + endpoint + return JsonModel(requests.get(endpoint, params=kwargs).json()) + + +class ProjectCountDataset(Model[int]): + ... @TaskTemplate() -def get_project_counts() -> JsonListOfDictsOfScalarsDataset: +def get_project_counts(top_level_url: HttpUrl = HttpUrl(url=TOP_LEVEL_URL)) -> ProjectCountDataset: response = call_endpoint( - projects_endpt, + 'projects', facets='program.name', size=0, - **{'from': 0}, # due to 'from' being a reserved keyword + **{'from': 0}, # type: ignore[arg-type] # needed as 'from' being a reserved keyword ) - buckets = response['data']['aggregations']['program.name']['buckets'] - - dataset = JsonListOfDictsOfScalarsDataset() - dataset['buckets'] = buckets - return dataset - - -@TaskTemplate(iterate_over_data_files=True) -def create_dict_from_list_of_dicts( - list_of_dicts: JsonListOfDictsOfScalarsModel, - field_whose_values_will_be_new_keys: str) -> JsonDictOfDictsOfScalarsModel: - output_dict = {} - for item in list_of_dicts: - item_copy = copy(item.contents) - new_key = item_copy[field_whose_values_will_be_new_keys] - del item_copy[field_whose_values_will_be_new_keys] - output_dict[new_key] = item_copy - return output_dict - + buckets = response['data']['aggregations']['program.name']['buckets'] # type: ignore[index] -# TODO: Figure out why JsonDictOfDictsOfScalarsModel was serialized as CSV -# TODO: Somehow fix so that we do not need to call Model.contents within a task + return ProjectCountDataset(buckets) @TaskTemplate() @@ -99,14 +87,14 @@ def rest_of_the_steps( # noqa: C901 program_info_dict = project_count_per_program['buckets']['TCGA'].contents size_max = program_info_dict['doc_count'] - if download_all_projects: + if DOWNLOAD_ALL_PROJECTS: size = size_max else: - size = min(size_max, number_of_projects) + size = min(size_max, NUMBER_OF_PROJECTS) fields = ','.join(['summary.case_count', 'summary.file_count']) response = call_endpoint( - projects_endpt, filters=create_filter('program.name', ['TCGA']), fields=fields, size=size) + PROJECTS_ENDPT, filters=create_filter('program.name', ['TCGA']), fields=fields, size=size) projects = response['data']['hits'] if len(projects) != size: print('size mismatch') @@ -118,12 +106,12 @@ def rest_of_the_steps( # noqa: C901 ############################################################################################## for project in projects: - if download_all_cases: + if DOWNLOAD_ALL_CASES: size = project['summary']['case_count'] else: - size = min(project['summary']['case_count'], number_of_cases) + size = min(project['summary']['case_count'], NUMBER_OF_CASES) response = call_endpoint( - cases_endpt, + CASES_ENDPT, filter=create_filter('project.project_id', [project['id']]), fields=','.join(['files.file_id', 'summary.file_count', 'annotation']), size=size) @@ -138,7 +126,7 @@ def rest_of_the_steps( # noqa: C901 for project in projects: for case in project['cases']: response = call_endpoint( - annotations_endpt, + ANNOTATIONS_ENDPT, filter=create_filter('case_id', [case['id']]), fields=['annotation_id'], size=size) @@ -159,7 +147,7 @@ def rest_of_the_steps( # noqa: C901 for case in project['cases']: case_id_list.append(case['id']) for i, file in enumerate(case['files']): - if i >= number_of_files: + if i >= NUMBER_OF_FILES: continue file_id_list.append(file['file_id']) for annotation in case['annotations']: @@ -180,12 +168,12 @@ def rest_of_the_steps( # noqa: C901 @LinearFlowTemplate( get_project_counts, - create_dict_from_list_of_dicts.refine( - name='create_dict_from_list_of_dicts_using_key_field', + create_row_index_from_column.refine( + name='create_dict_of_dicts_from_list_of_dicts_using_key_field', fixed_params=dict(field_whose_values_will_be_new_keys='key')), rest_of_the_steps) def get_filters_for_tcga() -> JsonDictOfDictsOfScalarsModel: ... -# get_filters_for_tcga.run() +get_filters_for_tcga.run()