Skip to content

Commit

Permalink
Tmp commit of omnification + start of bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sveinugu committed Dec 13, 2024
1 parent 018b5e4 commit ac9e078
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 52 deletions.
5 changes: 5 additions & 0 deletions src/omnipy/data/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,11 @@ def _set_data_file_and_validate(self, key: str, val: ModelT) -> None:
prev_value = self.data[key]

try:
# if is_model_instance(val):
# self.data[key] = self.get_model_class()(val)
# else:
# self.data[key] = val

self.data[key] = val
self._validate_data_file(key)
except Exception:
Expand Down
9 changes: 9 additions & 0 deletions src/omnipy/engine/job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,15 @@ def _inner_run_linear_flow(*args: object, **kwargs: object):
result = None
with linear_flow.flow_context:
for i, job in enumerate(linear_flow.task_templates):
# run_kwargs = kwargs if i == 0 else {}
# if job.has_coroutine_func():
# async def resolve_async_job(*inner_args, **inner_kwargs):
# return await resolve(job(*inner_args, **inner_kwargs))
#
# return resolve_async_job(*args, **run_kwargs)
# # TODO: Better handling of kwargs
# result = job(*args, **run_kwargs)

# TODO: Better handling of kwargs
if i == 0:
result = job(*args, **kwargs)
Expand Down
92 changes: 40 additions & 52 deletions src/omnipy/modules/fairtracks/create_filter.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,36 @@
#!/usr/bin/env python

from copy import copy
import json
import sys
from typing import List, Optional

from pydantic import BaseModel
from pydantic import BaseModel, HttpUrl
import requests

from omnipy import create_row_index_from_column, Model
from omnipy.compute.flow import LinearFlowTemplate
from omnipy.compute.task import TaskTemplate
from omnipy.modules.json.datasets import (JsonDataset,
JsonDictOfDictsOfScalarsDataset,
JsonListOfDictsOfScalarsDataset)
from omnipy.modules.json.models import JsonDictOfDictsOfScalarsModel, JsonListOfDictsOfScalarsModel
from omnipy.modules.json.typedefs import Json
from omnipy.modules.json.datasets import JsonDataset, JsonDictOfDictsOfScalarsDataset
from omnipy.modules.json.models import JsonDictOfDictsOfScalarsModel, JsonModel
from omnipy.modules.json.typedefs import JsonScalar

#########################################################
# user parameters
#######################################################
download_all_projects = False
download_all_cases = False
number_of_projects = 2
number_of_cases = 2
number_of_files = 2
DOWNLOAD_ALL_PROJECTS = False
DOWNLOAD_ALL_CASES = False
NUMBER_OF_PROJECTS = 2
NUMBER_OF_CASES = 2
NUMBER_OF_FILES = 2
# NB very few annotations per case, get all

#######################################################
# endpoints definition
########################################################
projects_endpt = 'https://api.gdc.cancer.gov/projects'
cases_endpt = 'https://api.gdc.cancer.gov/cases'
annotations_endpt = 'https://api.gdc.cancer.gov/annotations'
TOP_LEVEL_URL = 'https://api.gdc.cancer.gov/'
CASES_ENDPT = 'https://api.gdc.cancer.gov/cases'
ANNOTATIONS_ENDPT = 'https://api.gdc.cancer.gov/annotations'
PROJECTS_ENDPT = 'https://api.gdc.cancer.gov/projects'

###########################################################
# step1: get total number of TCGA projects (program.name)
Expand All @@ -52,42 +51,31 @@ def create_filter(field: str, value: List[str]) -> Filter:
return Filter(content=Content(field=field, value=value))


def call_endpoint(endpoint: str, filters: Optional[Filter] = None, **kwargs: object) -> Json:
def call_endpoint(top_level_url: HttpUrl,
endpoint: str,
filters: Optional[Filter] = None,
**kwargs: JsonScalar) -> JsonModel:
if filters:
kwargs['filters'] = json.dumps(filters.dict())
return requests.get(endpoint, params=kwargs).json() # type: ignore
# full_endpoint = top_level_url + endpoint
return JsonModel(requests.get(endpoint, params=kwargs).json())


class ProjectCountDataset(Model[int]):
...


@TaskTemplate()
def get_project_counts() -> JsonListOfDictsOfScalarsDataset:
def get_project_counts(top_level_url: HttpUrl = HttpUrl(url=TOP_LEVEL_URL)) -> ProjectCountDataset:
response = call_endpoint(
projects_endpt,
'projects',
facets='program.name',
size=0,
**{'from': 0}, # due to 'from' being a reserved keyword
**{'from': 0}, # type: ignore[arg-type] # needed as 'from' being a reserved keyword
)
buckets = response['data']['aggregations']['program.name']['buckets']

dataset = JsonListOfDictsOfScalarsDataset()
dataset['buckets'] = buckets
return dataset


@TaskTemplate(iterate_over_data_files=True)
def create_dict_from_list_of_dicts(
list_of_dicts: JsonListOfDictsOfScalarsModel,
field_whose_values_will_be_new_keys: str) -> JsonDictOfDictsOfScalarsModel:
output_dict = {}
for item in list_of_dicts:
item_copy = copy(item.contents)
new_key = item_copy[field_whose_values_will_be_new_keys]
del item_copy[field_whose_values_will_be_new_keys]
output_dict[new_key] = item_copy
return output_dict

buckets = response['data']['aggregations']['program.name']['buckets'] # type: ignore[index]

# TODO: Figure out why JsonDictOfDictsOfScalarsModel was serialized as CSV
# TODO: Somehow fix so that we do not need to call Model.contents within a task
return ProjectCountDataset(buckets)


@TaskTemplate()
Expand All @@ -99,14 +87,14 @@ def rest_of_the_steps( # noqa: C901
program_info_dict = project_count_per_program['buckets']['TCGA'].contents
size_max = program_info_dict['doc_count']

if download_all_projects:
if DOWNLOAD_ALL_PROJECTS:
size = size_max
else:
size = min(size_max, number_of_projects)
size = min(size_max, NUMBER_OF_PROJECTS)

fields = ','.join(['summary.case_count', 'summary.file_count'])
response = call_endpoint(
projects_endpt, filters=create_filter('program.name', ['TCGA']), fields=fields, size=size)
PROJECTS_ENDPT, filters=create_filter('program.name', ['TCGA']), fields=fields, size=size)
projects = response['data']['hits']
if len(projects) != size:
print('size mismatch')
Expand All @@ -118,12 +106,12 @@ def rest_of_the_steps( # noqa: C901
##############################################################################################

for project in projects:
if download_all_cases:
if DOWNLOAD_ALL_CASES:
size = project['summary']['case_count']
else:
size = min(project['summary']['case_count'], number_of_cases)
size = min(project['summary']['case_count'], NUMBER_OF_CASES)
response = call_endpoint(
cases_endpt,
CASES_ENDPT,
filter=create_filter('project.project_id', [project['id']]),
fields=','.join(['files.file_id', 'summary.file_count', 'annotation']),
size=size)
Expand All @@ -138,7 +126,7 @@ def rest_of_the_steps( # noqa: C901
for project in projects:
for case in project['cases']:
response = call_endpoint(
annotations_endpt,
ANNOTATIONS_ENDPT,
filter=create_filter('case_id', [case['id']]),
fields=['annotation_id'],
size=size)
Expand All @@ -159,7 +147,7 @@ def rest_of_the_steps( # noqa: C901
for case in project['cases']:
case_id_list.append(case['id'])
for i, file in enumerate(case['files']):
if i >= number_of_files:
if i >= NUMBER_OF_FILES:
continue
file_id_list.append(file['file_id'])
for annotation in case['annotations']:
Expand All @@ -180,12 +168,12 @@ def rest_of_the_steps( # noqa: C901

@LinearFlowTemplate(
get_project_counts,
create_dict_from_list_of_dicts.refine(
name='create_dict_from_list_of_dicts_using_key_field',
create_row_index_from_column.refine(
name='create_dict_of_dicts_from_list_of_dicts_using_key_field',
fixed_params=dict(field_whose_values_will_be_new_keys='key')),
rest_of_the_steps)
def get_filters_for_tcga() -> JsonDictOfDictsOfScalarsModel:
...


# get_filters_for_tcga.run()
get_filters_for_tcga.run()

0 comments on commit ac9e078

Please sign in to comment.