Skip to content

Commit

Permalink
Finished main code for history
Browse files Browse the repository at this point in the history
  • Loading branch information
khoroshevskyi committed Jul 3, 2024
1 parent a10f1e5 commit 5054510
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 8 deletions.
5 changes: 5 additions & 0 deletions pepdbagent/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,8 @@ def __init__(self, msg=""):
class NamespaceNotFoundError(PEPDatabaseAgentError):
def __init__(self, msg=""):
super().__init__(f"""Project does not exist. {msg}""")


class HistoryNotFoundError(PEPDatabaseAgentError):
def __init__(self, msg=""):
super().__init__(f"""History does not exist. {msg}""")
193 changes: 185 additions & 8 deletions pepdbagent/modules/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,23 @@
from sqlalchemy.orm.attributes import flag_modified

from pepdbagent.const import DEFAULT_TAG, DESCRIPTION_KEY, NAME_KEY, PEPHUB_SAMPLE_ID_KEY, PKG_NAME
from pepdbagent.db_utils import BaseEngine, Projects, Samples, Subsamples, User, HistoryProjects, HistorySamples, UpdateTypes
from pepdbagent.db_utils import (
BaseEngine,
Projects,
Samples,
Subsamples,
User,
HistoryProjects,
HistorySamples,
UpdateTypes,
)
from pepdbagent.exceptions import (
PEPDatabaseAgentError,
ProjectDuplicatedSampleGUIDsError,
ProjectNotFoundError,
ProjectUniqueNameError,
SampleTableUpdateError,
HistoryNotFoundError,
)
from pepdbagent.models import ProjectDict, UpdateItems, UpdateModel
from pepdbagent.utils import create_digest, generate_guid, order_samples, registry_path_converter
Expand Down Expand Up @@ -127,6 +137,31 @@ def _get_samples(self, session: Session, prj_id: int, with_id: bool) -> List[Dic
:param prj_id: project id
:param with_id: retrieve sample with id
"""
result_dict = self._get_samples_dict(prj_id, session, with_id)

result_dict = order_samples(result_dict)

ordered_samples_list = [sample["sample"] for sample in result_dict]
return ordered_samples_list

@staticmethod
def _get_samples_dict(prj_id: int, session: Session, with_id: bool) -> Dict:
"""
Get not ordered samples from the project. This method is used to retrieve samples from the project
:param prj_id: project id
:param session: open session object
:param with_id: retrieve sample with id
:return: dictionary with samples:
{guid:
{
"sample": sample_dict,
"guid": guid,
"parent_guid": parent_guid
}
}
"""
samples_results = session.scalars(select(Samples).where(Samples.project_id == prj_id))
result_dict = {}
for sample in samples_results:
Expand All @@ -140,10 +175,7 @@ def _get_samples(self, session: Session, prj_id: int, with_id: bool) -> List[Dic
"parent_guid": sample.parent_guid,
}

result_dict = order_samples(result_dict)

ordered_samples_list = [sample["sample"] for sample in result_dict]
return ordered_samples_list
return result_dict

@staticmethod
def _create_select_statement(name: str, namespace: str, tag: str = DEFAULT_TAG) -> Select:
Expand Down Expand Up @@ -575,7 +607,6 @@ def update(
SAMPLE_TABLE_INDEX_KEY, "sample_name"
),
history_sa_model=new_history,

)

if "subsamples" in update_dict:
Expand Down Expand Up @@ -664,7 +695,8 @@ def _update_samples(
session.add(new_sample)

if history_sa_model:
history_sa_model.sample_changes_mapping.append(HistorySamples(
history_sa_model.sample_changes_mapping.append(
HistorySamples(
guid=new_sample.guid,
parent_guid=new_sample.parent_guid,
sample_json=new_sample.sample,
Expand Down Expand Up @@ -709,7 +741,8 @@ def _update_samples(
for remove_id in deleted_ids:

if history_sa_model:
history_sa_model.sample_changes_mapping.append(HistorySamples(
history_sa_model.sample_changes_mapping.append(
HistorySamples(
guid=old_samples_mapping[remove_id].guid,
parent_guid=old_samples_mapping[remove_id].parent_guid,
sample_json=old_samples_mapping[remove_id].sample,
Expand Down Expand Up @@ -1024,3 +1057,147 @@ def get_samples(
.sample_table.replace({np.nan: None})
.to_dict(orient="records")
)

def get_project_history(self, namespace: str, name: str, tag: str) -> Union[dict, None]:
"""
Get project history annotation by providing namespace, name, and tag
:param namespace: project namespace
:param name: project name
:param tag: project tag
:return: project history annotation
"""

with Session(self._sa_engine) as session:
statement = (
select(HistoryProjects)
.where(
HistoryProjects.project_id
== select(Projects.id)
.where(
and_(
Projects.namespace == namespace,
Projects.name == name,
Projects.tag == tag,
)
)
.subquery()
)
.order_by(HistoryProjects.update_time.desc())
)
results = session.scalars(statement)

# TODO: it's not working, need to be fixed
if results:
return {result.id: result.__dict__ for result in results}

def get_project_from_history(
self, namespace: str, name: str, tag: str, history_id: int, raw: bool = True
) -> Union[dict, peppy.Project]:
"""
Get project sample history annotation by providing namespace, name, and tag
:param namespace: project namespace
:param name: project name
:param tag: project tag
:param history_id: history id
:param raw: if True, retrieve unprocessed (raw) PEP dict. [Default: True]
:return: project sample history annotation
"""

with Session(self._sa_engine) as session:
project_mapping = session.scalar(
select(Projects).where(
and_(
Projects.namespace == namespace,
Projects.name == name,
Projects.tag == tag,
)
)
)
if not project_mapping:
raise ProjectNotFoundError(
f"No project found for supplied input: '{namespace}/{name}:{tag}'. "
f"Did you supply a valid namespace and project?"
)

sample_dict = self._get_samples_dict(
prj_id=project_mapping.id, session=session, with_id=True
)

changes_mappings = session.scalars(
select(HistoryProjects)
.where(
and_(
HistoryProjects.project_id == project_mapping.id,
)
)
.order_by(HistoryProjects.update_time.desc())
)

changes_ids = [result.id for result in changes_mappings]

if history_id not in changes_ids:
raise HistoryNotFoundError(
f"No history found for supplied input: '{namespace}/{name}:{tag}'. "
f"Did you supply a valid history id?"
)

# Changes mapping is a ordered list from most early to latest changes
# We have to loop through each change and apply it to the sample list
# It should be done before we found the history_id that user is looking for
project_config = None

for result in changes_mappings:
sample_dict = self._apply_history_changes(sample_dict, result)

if result.id == history_id:
project_config = result.project_yaml
break

samples_list = order_samples(sample_dict)
ordered_samples_list = [sample["sample"] for sample in samples_list]

if raw:
return {
CONFIG_KEY: project_config or project_mapping.config,
SAMPLE_RAW_DICT_KEY: ordered_samples_list,
SUBSAMPLE_RAW_LIST_KEY: self.get_subsamples(namespace, name, tag),
}
return peppy.Project.from_dict(
pep_dictionary={
CONFIG_KEY: project_config or project_mapping.config,
SAMPLE_RAW_DICT_KEY: ordered_samples_list,
SUBSAMPLE_RAW_LIST_KEY: self.get_subsamples(namespace, name, tag),
}
)

@staticmethod
def _apply_history_changes(sample_list: dict, change: HistoryProjects) -> dict:
"""
Apply changes from the history to the sample list
:param sample_list: dictionary with samples
:param change: history change
:return: updated sample list
"""
for sample_change in change.sample_changes_mapping:
sample_id = sample_change.guid

if sample_change.change_type == UpdateTypes.UPDATE:
sample_list[sample_id]["sample"] = sample_change.sample_json
sample_list[sample_id]["parent_guid"] = sample_change.parent_guid

elif sample_change.change_type == UpdateTypes.DELETE:
sample_list[sample_id] = {
"sample": sample_change.sample_json,
"guid": sample_id,
"parent_guid": sample_change.parent_guid,
}

elif sample_change.change_type == UpdateTypes.INSERT:
del sample_list[sample_id]

return sample_list
Empty file added tests/test_project_history.py
Empty file.

0 comments on commit 5054510

Please sign in to comment.