diff --git a/MANIFEST.in b/MANIFEST.in index ddae2813..f1d4aacf 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -4,5 +4,6 @@ include pipestat/schemas/* include pipestat/backends/* include pipestat/backends/file_backend/* include pipestat/backends/db_backend/* +include pipestat/backends/pephub_backend/* include pipestat/pipestatreader/* include pipestat/jinja_templates/* diff --git a/pipestat/__init__.py b/pipestat/__init__.py index 407f62ca..dd436b5e 100644 --- a/pipestat/__init__.py +++ b/pipestat/__init__.py @@ -3,15 +3,9 @@ import logmuse from ._version import __version__ -from .exceptions import PipestatError from .const import PKG_NAME -from .pipestat import ( - PipestatManager, - SamplePipestatManager, - ProjectPipestatManager, - PipestatBoss, -) - +from .exceptions import PipestatError +from .pipestat import PipestatBoss, PipestatManager, ProjectPipestatManager, SamplePipestatManager __all__ = [ "PipestatError", diff --git a/pipestat/argparser.py b/pipestat/argparser.py index f3793056..b24350ac 100644 --- a/pipestat/argparser.py +++ b/pipestat/argparser.py @@ -2,9 +2,10 @@ import argparse import os + from ubiquerg import VersionInHelpParser -from ._version import __version__ +from ._version import __version__ from .const import ENV_VARS, PKG_NAME, STATUS_SCHEMA REPORT_CMD = "report" diff --git a/pipestat/backends/abstract.py b/pipestat/backends/abstract.py index 48db5426..c65e5e63 100644 --- a/pipestat/backends/abstract.py +++ b/pipestat/backends/abstract.py @@ -1,13 +1,13 @@ import os from abc import ABC from logging import getLogger +from typing import Any, Dict, List, Optional, Tuple, Union + from ubiquerg import expandpath -from typing import List, Dict, Any, Optional, Union, Tuple from ..const import PKG_NAME, STATUS -from ..helpers import force_symlink from ..exceptions import SchemaError - +from ..helpers import force_symlink _LOGGER = getLogger(PKG_NAME) diff --git a/pipestat/backends/db_backend/db_helpers.py b/pipestat/backends/db_backend/db_helpers.py index 4addf5fc..0d7b387c 100644 --- a/pipestat/backends/db_backend/db_helpers.py +++ b/pipestat/backends/db_backend/db_helpers.py @@ -2,9 +2,7 @@ from typing import Any, Dict, List, Optional, Union from urllib.parse import quote_plus - -from sqlmodel import and_, or_, Integer, Float, String, Boolean - +from sqlmodel import Boolean, Float, Integer, String, and_, or_ from pipestat.exceptions import MissingConfigDataError diff --git a/pipestat/backends/db_backend/db_parsed_schema.py b/pipestat/backends/db_backend/db_parsed_schema.py index b909b5d6..e324adb4 100644 --- a/pipestat/backends/db_backend/db_parsed_schema.py +++ b/pipestat/backends/db_backend/db_parsed_schema.py @@ -7,11 +7,10 @@ from typing import Any, Dict, List, Mapping, Optional from pydantic import ConfigDict, create_model - - from sqlalchemy import Column, null from sqlalchemy.dialects.postgresql import JSONB from sqlmodel import Field, SQLModel + from pipestat.const import ( CANONICAL_TYPES, CLASSES_BY_TYPE, @@ -22,13 +21,13 @@ PROJECT_NAME, RECORD_IDENTIFIER, SAMPLE_NAME, - STATUS, SCHEMA_DESC_KEY, SCHEMA_ITEMS_KEY, SCHEMA_PROP_KEY, SCHEMA_TYPE_KEY, + STATUS, ) -from pipestat.exceptions import SchemaError, PipestatError +from pipestat.exceptions import PipestatError, SchemaError from pipestat.parsed_schema import ParsedSchema _LOGGER = logging.getLogger(__name__) diff --git a/pipestat/backends/db_backend/dbbackend.py b/pipestat/backends/db_backend/dbbackend.py index 797f5b2e..572192f6 100644 --- a/pipestat/backends/db_backend/dbbackend.py +++ 
b/pipestat/backends/db_backend/dbbackend.py @@ -1,23 +1,24 @@ import copy import datetime -from logging import getLogger from contextlib import contextmanager -from typing import List, Dict, Any, Optional, Union, NoReturn, Tuple +from logging import getLogger +from typing import Any, Dict, List, NoReturn, Optional, Tuple, Union -from sqlmodel import SQLModel, Session, create_engine, select as sql_select +from sqlmodel import Session, SQLModel, create_engine +from sqlmodel import select as sql_select -from pipestat.backends.db_backend.db_helpers import selection_filter from pipestat.backends.abstract import PipestatBackend +from pipestat.backends.db_backend.db_helpers import selection_filter + +from ...const import CREATED_TIME, MODIFIED_TIME, PKG_NAME, RECORD_IDENTIFIER, STATUS from ...exceptions import ( + ColumnNotFoundError, PipestatDatabaseError, RecordNotFoundError, SchemaError, - ColumnNotFoundError, - UnrecognizedStatusError, SchemaNotFoundError, + UnrecognizedStatusError, ) -from ...const import PKG_NAME, STATUS, RECORD_IDENTIFIER, CREATED_TIME, MODIFIED_TIME - _LOGGER = getLogger(PKG_NAME) diff --git a/pipestat/backends/file_backend/filebackend.py b/pipestat/backends/file_backend/filebackend.py index 8a22de50..e6ae935b 100644 --- a/pipestat/backends/file_backend/filebackend.py +++ b/pipestat/backends/file_backend/filebackend.py @@ -1,25 +1,21 @@ import datetime -import os.path import operator +import os.path from copy import deepcopy from functools import reduce -from itertools import chain -from ...helpers import get_all_result_files - - from glob import glob +from itertools import chain from logging import getLogger -from yacman import FutureYAMLConfigManager as YAMLConfigManager -from yacman import read_lock, write_lock +from typing import Any, Callable, Dict, List, Literal, Optional, Tuple, Union from ubiquerg import create_lock, remove_lock +from yacman import FutureYAMLConfigManager as YAMLConfigManager +from yacman import read_lock, write_lock -from typing import List, Dict, Any, Optional, Union, Literal, Callable, Tuple - -from ...exceptions import UnrecognizedStatusError, PipestatError from ...backends.abstract import PipestatBackend -from ...const import DATE_FORMAT, PKG_NAME, CREATED_TIME, MODIFIED_TIME, META_KEY, HISTORY_KEY - +from ...const import CREATED_TIME, DATE_FORMAT, HISTORY_KEY, META_KEY, MODIFIED_TIME, PKG_NAME +from ...exceptions import PipestatError, UnrecognizedStatusError +from ...helpers import get_all_result_files _LOGGER = getLogger(PKG_NAME) @@ -46,7 +42,7 @@ def __init__( this object method calls :param str pipeline_name: name of pipeline associated with result :param str pipeline_type: "sample" or "project" - :param str parsed_schema: results output schema. Used to construct DB columns. + :param str parsed_schema: results output schema. :param str status_schema: schema containing pipeline statuses e.g. 
'running' :param str status_file_dir: directory for placing status flags :param str result_formatter: function for formatting result diff --git a/pipestat/backends/pephub_backend/__init__.py b/pipestat/backends/pephub_backend/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/pipestat/backends/pephub_backend/pephubbackend.py b/pipestat/backends/pephub_backend/pephubbackend.py new file mode 100644 index 00000000..d3df7160 --- /dev/null +++ b/pipestat/backends/pephub_backend/pephubbackend.py @@ -0,0 +1,471 @@ +import copy +from logging import getLogger +from typing import Any, Dict, List, Literal, NoReturn, Optional, Union + +import pephubclient +from pephubclient import PEPHubClient +from pephubclient.constants import RegistryPath +from ubiquerg import parse_registry_path + +from ...backends.abstract import PipestatBackend +from ...const import PKG_NAME, STATUS +from ...exceptions import PipestatPEPHubError, RecordNotFoundError, UnrecognizedStatusError + +_LOGGER = getLogger(PKG_NAME) + + +class PEPHUBBACKEND(PipestatBackend): + def __init__( + self, + record_identifier: Optional[str] = None, + pephub_path: Optional[str] = None, + pipeline_name: Optional[str] = None, + pipeline_type: Optional[str] = None, + parsed_schema: Optional[str] = None, + status_schema: Optional[str] = None, + result_formatter: Optional[staticmethod] = None, + ): + """ + Class representing a PEPHub backend + :param str record_identifier: record identifier to report for. This + creates a weak bound to the record, which can be overridden in + this object method calls + :param str pephub_path: registry path to PEP + :param str pipeline_name: name of pipeline associated with result + :param str pipeline_type: "sample" or "project" + :param str parsed_schema: results output schema. + :param str status_schema: schema containing pipeline statuses e.g. 
'running' + :param str result_formatter: function for formatting result + """ + super().__init__(pipeline_type) + + self.phc = PEPHubClient() + self.record_identifier = record_identifier + self.pephub_path = pephub_path + self.pipeline_name = pipeline_name + self.parsed_schema = parsed_schema + self.status_schema = status_schema + self.result_formatter = result_formatter + + if pephubclient.is_registry_path(pephub_path): + + _LOGGER.debug("Initialize PEPHub Backend") + + # Deconstruct registry path so that phc can use it to create/update/delete samples + self.pep_registry = RegistryPath(**parse_registry_path(pephub_path)) + + _LOGGER.debug( + f"Registry namespace: {self.pep_registry.namespace} item: {self.pep_registry.item} tag: {self.pep_registry.tag}" + ) + + else: + raise PipestatPEPHubError(msg=f"Registry path to PEP is invalid: {pephub_path}") + + def check_record_exists( + self, + record_identifier: str, + ) -> bool: + """ + Check if the specified record exists in the table + + :param str record_identifier: record to check for + :return bool: whether the record exists in the table + """ + + query_hit = self.select_records( + filter_conditions=[ + { + "key": "record_identifier", + "operator": "eq", + "value": record_identifier, + } + ] + ) + + return bool(query_hit["records"]) + + def list_results( + self, + restrict_to: Optional[List[str]] = None, + record_identifier: Optional[str] = None, + ) -> Union[List[str], List[None]]: + """ + Check if the specified results exist in the table + + :param List[str] restrict_to: results identifiers to check for + :param str record_identifier: record to check for + :return List[str] existing: if no result identifier specified, return all results for the record + :return List[str]: results identifiers that exist or an empty list if nothing was found + """ + rid = record_identifier + record = self.select_records( + filter_conditions=[ + { + "key": "record_identifier", + "operator": "eq", + "value": rid, + } + ] + ) + try: + record = record["records"][0][rid] + except IndexError: + return [] + + if restrict_to is None: + # record is a plain dict here, so use .get rather than attribute access + return ( + [ + key + for key in self.parsed_schema.results_data.keys() + if record.get(key) is not None + ] + if record + else [] + ) + else: + return [r for r in restrict_to if record.get(r, None) is not None] if record else [] + + def remove( + self, + record_identifier: Optional[str] = None, + result_identifier: Optional[str] = None, + ) -> bool: + """ + Remove a result. + + If no result ID specified, the entire record + will be removed. + + :param str record_identifier: unique identifier of the record + :param str result_identifier: name of the result to be removed or None + if the record should be removed.
+ :return bool: whether the result has been removed + """ + + record_identifier = record_identifier or self.record_identifier + + if not self.check_record_exists( + record_identifier=record_identifier, + ): + _LOGGER.error(f"Record '{record_identifier}' not found") + return False + + if result_identifier and not self.check_result_exists( + result_identifier, record_identifier + ): + _LOGGER.error(f"'{result_identifier}' has not been reported for '{record_identifier}'") + return False + + if result_identifier: + # Blank the result's cell on PEPHub (an empty string reads back as a missing value) + values = {result_identifier: ""} + self.phc.sample.update( + namespace=self.pep_registry.namespace, + name=self.pep_registry.item, + tag=self.pep_registry.tag, + sample_name=record_identifier, + sample_dict=values, + ) + return True + else: + self.remove_record( + record_identifier=record_identifier, + rm_record=True, + ) + return True + + def remove_record( + self, + record_identifier: Optional[str] = None, + rm_record: Optional[bool] = False, + ) -> NoReturn: + """ + Remove a record; requires rm_record to be True + + :param str record_identifier: unique identifier of the record + :param bool rm_record: whether the record should actually be removed + :raises RecordNotFoundError: if record not found + """ + if rm_record: + self.phc.sample.remove( + namespace=self.pep_registry.namespace, + name=self.pep_registry.item, + tag=self.pep_registry.tag, + sample_name=record_identifier, + ) + else: + _LOGGER.info(f"rm_record flag is False; aborting removal of record '{record_identifier}'") + + def report( + self, + values: Dict[str, Any], + record_identifier: Optional[str] = None, + force_overwrite: bool = True, + result_formatter: Optional[staticmethod] = None, + history_enabled: Optional[bool] = False, + ) -> Union[List[str], bool]: + """ + Update the value of a result in the current namespace. + + This method overwrites any existing data and creates the required + hierarchical mapping structure if needed.
+ + :param history_enabled: this parameter is currently ignored, as history is handled by PEPHub + :param Dict[str, Any] values: dict of results identifiers and values + to be reported + :param str record_identifier: unique identifier of the record + :param bool force_overwrite: Toggles force overwriting results, defaults to True + :param str result_formatter: function for formatting result + :return bool | list[str] results_formatted: list of formatted result strings, or False if existing results were not overwritten + """ + if history_enabled: + _LOGGER.warning( + msg="history_enabled is set to True, but history is handled by PEPHub, not pipestat" + ) + + record_identifier = record_identifier or self.record_identifier + + result_formatter = result_formatter or self.result_formatter + results_formatted = [] + + result_identifiers = list(values.keys()) + + if self.parsed_schema is not None: + self.assert_results_defined( + results=result_identifiers, pipeline_type=self.pipeline_type + ) + + existing = self.list_results( + record_identifier=record_identifier, + restrict_to=result_identifiers, + ) + + if not existing: + + self.phc.sample.create( + namespace=self.pep_registry.namespace, + name=self.pep_registry.item, + tag=self.pep_registry.tag, + sample_name=record_identifier, + sample_dict=values, + overwrite=force_overwrite, + ) + + else: + existing_str = ", ".join(existing) + _LOGGER.warning(f"These results exist for '{record_identifier}': {existing_str}") + if not force_overwrite: + return False + _LOGGER.info(f"Overwriting existing results: {existing_str}") + + self.phc.sample.update( + namespace=self.pep_registry.namespace, + name=self.pep_registry.item, + tag=self.pep_registry.tag, + sample_name=record_identifier, + sample_dict=values, + ) + + for res_id, val in values.items(): + results_formatted.append( + result_formatter( + pipeline_name=self.pipeline_name, + record_identifier=record_identifier, + res_id=res_id, + value=val, + ) + ) + return results_formatted + + def set_status( + self, + status_identifier: str, + record_identifier: str = None, + ) -> None: + """ + Set pipeline run status. + + The status identifier needs to match one of the identifiers specified in + the status schema. A basic, ready-to-use status schema is shipped with + this package. + + :param str status_identifier: status to set, one of statuses defined + in the status schema + :param str record_identifier: record identifier to set the + pipeline status for + """ + + record_identifier = record_identifier or self.record_identifier + known_status_identifiers = self.status_schema.keys() + if status_identifier not in known_status_identifiers: + raise UnrecognizedStatusError( + f"'{status_identifier}' is not a defined status identifier. " + f"These are allowed: {known_status_identifiers}" + ) + prev_status = self.get_status(record_identifier) + try: + self.report( + values={STATUS: status_identifier}, + record_identifier=record_identifier, + ) + except Exception as e: + _LOGGER.error( + f"Could not report status to PEPHub.
Exception: {e}" + ) + raise + if prev_status: + _LOGGER.debug(f"Changed status from '{prev_status}' to '{status_identifier}'") + + def get_status(self, record_identifier: str) -> Optional[str]: + """ + Get pipeline status + + :param str record_identifier: record identifier to set the + pipeline status for + :return str status + """ + + try: + result = self.select_records( + columns=[STATUS], + filter_conditions=[ + { + "key": "record_identifier", + "operator": "eq", + "value": record_identifier, + } + ], + ) + except RecordNotFoundError: + return None + try: + status = result["records"][0][record_identifier]["status"] + except IndexError or KeyError: + status = None + + if status == "": # PEPhub returns '' for empty cell + status = None + return status + + def select_records( + self, + columns: Optional[List[str]] = None, + filter_conditions: Optional[List[Dict[str, Any]]] = None, + limit: Optional[int] = 1000, + cursor: Optional[int] = None, + bool_operator: Optional[str] = "AND", + ) -> Dict[str, Any]: + """ + Perform a `SELECT` on the table + + :param list[str] columns: columns to include in the result + :param list[dict] filter_conditions: e.g. [{"key": ["id"], "operator": "eq", "value": 1)], operator list: + - eq for == + - lt for < + - ge for >= + - in for in_ + - like for like + :param int limit: maximum number of results to retrieve per page + :param int cursor: cursor position to begin retrieving records + :param bool bool_operator: Perform filtering with AND or OR Logic. + :return dict records_dict = { + "total_size": int, + "page_size": int, + "next_page_token": int, + "records": List[Dict[{key, Any}]], + } + """ + + if cursor: + _LOGGER.warning("Cursor not supported for PEPHubBackend, ignoring cursor") + + def get_operator(op: Literal["eq", "lt", "ge", "gt", "in"]) -> Any: + """ + Get python operator for a given string + + :param str op: desired operator, "eq", "lt" + :return: operator function + """ + + if op == "eq": + return "==" + if op == "lt": + return "<" + if op == "ge": + return ">=" + if op == "gt": + return ">" + if op == "in": + return "in" + raise ValueError(f"Invalid filter operator: {op}") + + project = self.phc.load_project(project_registry_path=self.pephub_path) + + if columns is not None: + columns = copy.deepcopy(columns) + for i in ["sample_name"]: # PEPHub uses sample_name not record_identifier + if i not in columns: + columns.insert(0, i) + try: + df = project.sample_table[columns] + except KeyError: + records_dict = { + "total_size": 0, + "page_size": limit, + "next_page_token": 0, + "records": [], + } + return records_dict + + else: + df = project.sample_table + + total_count = len(df) + + if filter_conditions: + filter_expression = "" + all_filter_expressions = [] + for filter_condition in filter_conditions: + retrieved_operator = get_operator(filter_condition["operator"]) + if filter_condition["key"] == "record_identifier": + filter_condition["key"] = "sample_name" + + key = filter_condition["key"] + value = filter_condition["value"] + + # Create querry for df based on filter conditions + if isinstance(value, list): + filter_expression = f"{key} {retrieved_operator} {value}" + else: + filter_expression = f"{key} {retrieved_operator} '{value}'" + all_filter_expressions.append(filter_expression) + + if len(all_filter_expressions) > 1: + if bool_operator == "AND": + for filter in all_filter_expressions: + df = df.query(filter) + if bool_operator == "OR": + filter = f"({' | '.join(str(cond) for cond in all_filter_expressions)})" + df = df.query(filter) + + 
else: + df = df.query(filter_expression) + + # Once we have the dataframe (filtered or unfiltered), convert to a dict using the sample_name/record_identifier as the primary key + df2dict = df.set_index("sample_name").transpose().to_dict(orient="dict") + + # Must do this to align output structure with that of db_backend and file_backends + records_list = [] + for key, value in df2dict.items(): + records_list.append({key: value}) + + records_dict = { + "total_size": total_count, + "page_size": limit, + "next_page_token": 0, + "records": records_list, + } + + return records_dict diff --git a/pipestat/cli.py b/pipestat/cli.py index 5d13f8ac..212a1686 100644 --- a/pipestat/cli.py +++ b/pipestat/cli.py @@ -1,37 +1,29 @@ -import sys import os +import sys from logging import getLogger import logmuse from ubiquerg import expandpath from .argparser import ( - build_argparser, - REPORT_CMD, + HISTORY_CMD, + INIT_CMD, INSPECT_CMD, + LINK_CMD, REMOVE_CMD, + REPORT_CMD, RETRIEVE_CMD, + SERVE_CMD, STATUS_CMD, STATUS_GET_CMD, STATUS_SET_CMD, - INIT_CMD, SUMMARIZE_CMD, - SERVE_CMD, - LINK_CMD, - HISTORY_CMD, -) -from .const import ( - SCHEMA_KEY, - SCHEMA_TYPE_KEY, - CANONICAL_TYPES, - PKG_NAME, -) -from .exceptions import ( - SchemaNotFoundError, - PipestatStartupError, + build_argparser, ) -from .pipestat import PipestatManager, check_dependencies +from .const import CANONICAL_TYPES, PKG_NAME, SCHEMA_KEY, SCHEMA_TYPE_KEY +from .exceptions import PipestatStartupError, SchemaNotFoundError from .helpers import init_generic_config +from .pipestat import PipestatManager, check_dependencies try: from pipestat.pipestatreader.reader import main as readermain diff --git a/pipestat/const.py b/pipestat/const.py index cc2804cb..2e5d42e9 100644 --- a/pipestat/const.py +++ b/pipestat/const.py @@ -6,7 +6,7 @@ # Can be removed when 3.8 is deprecated if int(sys.version.split(".")[1]) < 9: - from typing import List, Dict + from typing import Dict, List list_of_dicts = List[Dict] else: @@ -128,6 +128,7 @@ "string": str, "path": Path, "boolean": bool, + "bool": bool, "file": str, "image": str, "link": str, diff --git a/pipestat/exceptions.py b/pipestat/exceptions.py index a13c2d18..ecb44cbb 100644 --- a/pipestat/exceptions.py +++ b/pipestat/exceptions.py @@ -1,6 +1,7 @@ """ Package exception types """ from typing import Iterable, Optional + from .const import CLASSES_BY_TYPE, ENV_VARS __all__ = [ @@ -20,6 +21,7 @@ "PipestatDependencyError", "ColumnNotFoundError", "SchemaValidationErrorDuringReport", + "PipestatPEPHubError", ] @@ -123,6 +125,13 @@ def __init__(self, msg): super(PipestatDatabaseError, self).__init__(msg) +class PipestatPEPHubError(PipestatError): + """PEPHub backend error""" + + def __init__(self, msg): + super(PipestatPEPHubError, self).__init__(msg) + + class InvalidTypeError(PipestatError): """Type of the reported value is not supported""" diff --git a/pipestat/helpers.py b/pipestat/helpers.py index 9f99b604..5f5a6473 100644 --- a/pipestat/helpers.py +++ b/pipestat/helpers.py @@ -1,25 +1,19 @@ """Assorted project utilities""" -import logging +import errno import glob +import logging import os -import errno - -import jsonschema from json import dumps from pathlib import Path from shutil import make_archive -from typing import Any, Dict, Optional, Tuple, Union, List +from typing import Any, Dict, List, Optional, Tuple, Union +import jsonschema from yaml import dump -from .exceptions import SchemaValidationErrorDuringReport -from .const import ( - PIPESTAT_GENERIC_CONFIG, - SCHEMA_PROP_KEY,
- SCHEMA_TYPE_KEY, - CLASSES_BY_TYPE, -) +from .const import CLASSES_BY_TYPE, PIPESTAT_GENERIC_CONFIG, SCHEMA_PROP_KEY, SCHEMA_TYPE_KEY +from .exceptions import SchemaValidationErrorDuringReport _LOGGER = logging.getLogger(__name__) diff --git a/pipestat/parsed_schema.py b/pipestat/parsed_schema.py index 022fb231..ed4ea6c8 100644 --- a/pipestat/parsed_schema.py +++ b/pipestat/parsed_schema.py @@ -4,7 +4,9 @@ import logging from pathlib import Path from typing import Any, Dict, List, Mapping, Optional, Union + import yacman + from .const import ( CANONICAL_TYPES, CLASSES_BY_TYPE, @@ -16,7 +18,6 @@ ) from .exceptions import SchemaError - _LOGGER = logging.getLogger(__name__) diff --git a/pipestat/pipestat.py b/pipestat/pipestat.py index 661109e2..3fd48801 100644 --- a/pipestat/pipestat.py +++ b/pipestat/pipestat.py @@ -1,81 +1,77 @@ -import os import datetime -from logging import getLogger -from copy import deepcopy - +import os from abc import ABC from collections.abc import MutableMapping +from copy import deepcopy +from logging import getLogger +from typing import Any, Dict, Iterator, List, Optional, Union from jsonschema import validate -from yacman import FutureYAMLConfigManager as YAMLConfigManager -from yacman.yacman_future import select_config from ubiquerg import mkabs +from yacman import FutureYAMLConfigManager as YAMLConfigManager from yacman import load_yaml +from yacman.yacman_future import select_config - -from typing import Optional, Union, Dict, Any, List, Iterator - - -from .exceptions import ( - ColumnNotFoundError, - NoBackendSpecifiedError, - InvalidTimeFormatError, - PipestatDependencyError, - PipestatDatabaseError, - RecordNotFoundError, - SchemaNotFoundError, -) from pipestat.backends.file_backend.filebackend import FileBackend -from .reports import HTMLReportBuilder, _create_stats_objs_summaries -from .helpers import ( - validate_type, - default_formatter, - zip_report, - make_subdirectories, -) + from .const import ( - PKG_NAME, - DEFAULT_PIPELINE_NAME, - ENV_VARS, CFG_DATABASE_KEY, - SCHEMA_PATH, - STATUS_SCHEMA, - STATUS_SCHEMA_SOURCE_KEY, - STATUS_SCHEMA_KEY, - STATUS_FILE_DIR, - FILE_KEY, + CFG_SCHEMA, + CONFIG_KEY, + CREATED_TIME, + DATA_KEY, DB_ONLY_KEY, DB_URL, + DEFAULT_PIPELINE_NAME, + ENV_VARS, + FILE_KEY, + MODIFIED_TIME, + MULTI_PIPELINE, + OUTPUT_DIR, PIPELINE_NAME, PIPELINE_TYPE, + PKG_NAME, PROJECT_NAME, RECORD_IDENTIFIER, RESULT_FORMATTER, - MULTI_PIPELINE, - OUTPUT_DIR, - CREATED_TIME, - MODIFIED_TIME, - CFG_SCHEMA, - CONFIG_KEY, - SCHEMA_KEY, SAMPLE_NAME_ID_KEY, - DATA_KEY, + SCHEMA_KEY, + SCHEMA_PATH, + STATUS_FILE_DIR, + STATUS_SCHEMA, + STATUS_SCHEMA_KEY, + STATUS_SCHEMA_SOURCE_KEY, ) +from .exceptions import ( + ColumnNotFoundError, + InvalidTimeFormatError, + NoBackendSpecifiedError, + PipestatDatabaseError, + PipestatDependencyError, + RecordNotFoundError, + SchemaNotFoundError, +) +from .helpers import default_formatter, make_subdirectories, validate_type, zip_report +from .reports import HTMLReportBuilder, _create_stats_objs_summaries try: - from pipestat.backends.db_backend.db_parsed_schema import ( - ParsedSchemaDB as ParsedSchema, - ) + from pipestat.backends.db_backend.db_parsed_schema import ParsedSchemaDB as ParsedSchema except ImportError: from .parsed_schema import ParsedSchema try: - from pipestat.backends.db_backend.dbbackend import DBBackend from pipestat.backends.db_backend.db_helpers import construct_db_url + from pipestat.backends.db_backend.dbbackend import DBBackend except ImportError: # We let this pass, but if the user 
attempts to create DBBackend, check_dependencies raises exception. pass +try: + from pipestat.backends.pephub_backend.pephubbackend import PEPHUBBACKEND +except ImportError: + # Let this pass; if pephubclient cannot be imported, attempting to use the PEPHub backend will fail + pass + _LOGGER = getLogger(PKG_NAME) @@ -138,6 +134,7 @@ def __init__( result_formatter: staticmethod = default_formatter, multi_pipelines: bool = False, output_dir: Optional[str] = None, + pephub_path: Optional[str] = None, ): """ Initialize the PipestatManager object @@ -169,7 +166,7 @@ def __init__( # Load and validate database configuration # If results_file_path exists, backend is a file else backend is database. - + self.cfg["pephub_path"] = pephub_path self.cfg["config_path"] = select_config(config_file, ENV_VARS["config"]) if config_dict is not None: @@ -247,6 +244,8 @@ def __init__( if self.cfg[FILE_KEY]: self.initialize_filebackend(record_identifier, results_file_path, flag_file_dir) + elif pephub_path: + self.initialize_pephubbackend(record_identifier, pephub_path) else: self.initialize_dbbackend(record_identifier, show_db_logs) @@ -350,7 +349,19 @@ def resolve_results_file_path(self, results_file_path): return results_file_path return results_file_path - def initialize_filebackend(self, record_identifier, results_file_path, flag_file_dir): + def initialize_filebackend( + self, + record_identifier: str = None, + results_file_path: str = None, + flag_file_dir: str = None, + ): + """ + Initializes the file backend + :param str record_identifier: the record identifier + :param str results_file_path: the path to the results file used for the backend + :param str flag_file_dir: the path to the flag file directory + """ + # Check if there will be multiple results_file_paths _LOGGER.debug(f"Determined file as backend: {results_file_path}") @@ -383,11 +394,32 @@ def initialize_filebackend(self, record_identifier, results_file_path, flag_file return + def initialize_pephubbackend(self, record_identifier: str = None, pephub_path: str = None): + """ + Initializes the PEPHub backend + :param str record_identifier: the record identifier + :param str pephub_path: registry path to the PEPHub project + """ + self.backend = PEPHUBBACKEND( + record_identifier, + pephub_path, + self.cfg[PIPELINE_NAME], + self.cfg[PIPELINE_TYPE], + self.cfg[SCHEMA_KEY], + self.cfg[STATUS_SCHEMA_KEY], + self.cfg[RESULT_FORMATTER], + ) + @check_dependencies( dependency_list=["DBBackend"], msg="Missing required dependencies for this usage, e.g.
try pip install pipestat['dbbackend']", ) - def initialize_dbbackend(self, record_identifier, show_db_logs): + def initialize_dbbackend(self, record_identifier: str = None, show_db_logs: bool = False): + """ + Initializes the database backend + :param str record_identifier: the record identifier + :param bool show_db_logs: whether to show database logs + """ _LOGGER.debug("Determined database as backend") if self.cfg[SCHEMA_KEY] is None: raise SchemaNotFoundError("Output schema must be supplied for DB backends.") @@ -471,6 +503,9 @@ def list_recent_results( :return dict results: a dict containing start, end, num of records, and list of retrieved records """ + if self.cfg["pephub_path"]: + _LOGGER.warning("List recent results not supported for PEPHub backend") + return {} date_format = "%Y-%m-%d %H:%M:%S" if start is None: start = datetime.datetime.now() @@ -790,6 +825,9 @@ def retrieve_history( _LOGGER.warning(f"No history available for Record: {record_identifier}") return {} + elif self.cfg["pephub_path"]: + _LOGGER.warning("Retrieving history not supported for PEPHub backend") + return None else: if result_identifier: history = self.backend.retrieve_history_db(record_identifier, result_identifier)[ @@ -860,12 +898,16 @@ def set_status( self.backend.set_status(status_identifier, r_id) @require_backend - def link(self, link_dir) -> str: + def link(self, link_dir) -> Union[str, None]: """ This function creates a link structure such that results are organized by type. :param str link_dir: path to desired symlink output directory - :return str linked_results_path: path to symlink directory + :return str | None linked_results_path: path to symlink directory or None + """ + if self.cfg["pephub_path"]: + _LOGGER.warning("Linking results is not supported for PEPHub backend.") + return None + self.check_multi_results() linked_results_path = self.backend.link(link_dir=link_dir) @@ -877,7 +919,7 @@ def summarize( looper_samples: Optional[list] = None, amendment: Optional[str] = None, portable: Optional[bool] = False, - ) -> None: + ) -> Union[str, None]: """ Builds a browsable html report for reported results. :param Iterable[str] looper_samples: list of looper Samples from PEP @@ -886,6 +928,11 @@ def summarize( :return str: report_path """ + if self.cfg["pephub_path"]: + _LOGGER.warning( + "Summarize not supported for PEPHub backend. Please generate report via PEPHub website."
+ ) + return None self.check_multi_results() @@ -904,11 +951,7 @@ def summarize( def check_multi_results(self): # Check to see if the user used a path with "{record-identifier}" if self.file: - # TODO this needs rework: remove self.cfg["unresolved_result_path"] and just use self.file - if ( - "{record_identifier}" in self.file - or self.cfg["unresolved_result_path"] != self.file - ): + if "{record_identifier}" in self.cfg["unresolved_result_path"]: # assume there are multiple result files in sub-directories self.cfg["multi_result_files"] = True results_directory = self.cfg["unresolved_result_path"].split( diff --git a/pipestat/pipestatreader/reader.py b/pipestat/pipestatreader/reader.py index e384fc86..a10973d2 100644 --- a/pipestat/pipestatreader/reader.py +++ b/pipestat/pipestatreader/reader.py @@ -1,14 +1,14 @@ -import fastapi -import os import logging +import os +from typing import List, Optional, Tuple, Union + +import fastapi import uvicorn -from typing import Optional, List, Union, Tuple +from pydantic import BaseModel from pipestat import SamplePipestatManager from pipestat.exceptions import RecordNotFoundError from pipestat.reports import fetch_pipeline_results -from pydantic import BaseModel - _LOGGER = logging.getLogger(__name__) diff --git a/pipestat/reports.py b/pipestat/reports.py index feb80f02..4f7a6109 100644 --- a/pipestat/reports.py +++ b/pipestat/reports.py @@ -1,39 +1,37 @@ """ Generate HTML reports """ -import shutil - -import jinja2 -import os -import pandas as _pd -import sys import csv -import yaml import glob - +import os +import shutil +import sys +from copy import deepcopy from datetime import timedelta -from eido import read_schema from json import dumps from logging import getLogger -from peppy.const import AMENDMENTS_KEY from typing import List -from copy import deepcopy +import jinja2 +import pandas as _pd +import yaml +from eido import read_schema +from peppy.const import AMENDMENTS_KEY from ubiquerg import mkabs from ._version import __version__ from .const import ( - PIPELINE_NAME, - PKG_NAME, - OUTPUT_DIR, - OBJECT_TYPES, BUTTON_APPEARANCE_BY_FLAG, + FILE_KEY, NO_DATA_PLACEHOLDER, + OBJECT_TYPES, + OUTPUT_DIR, + PIPELINE_NAME, PIPELINE_TYPE, - PROJECT_NAME, - TEMPLATES_DIRNAME, + PKG_NAME, PROFILE_COLNAMES, + PROJECT_NAME, STATUS_FILE_DIR, - FILE_KEY, + TEMPLATES_DIRNAME, ) from .helpers import make_subdirectories @@ -114,6 +112,12 @@ def __call__( self.create_index_html(navbar, self.create_footer()) return self.index_html_path + def _reset_pipeline_type(self): + """ + Report generation sets the backend's pipeline type while rendering multi-result pages; it must be reset to the manager's pipeline type afterward so subsequent backend operations behave correctly.
+ """ + self.prj.backend.pipeline_type = self.prj.pipeline_type + def _create_copy_for_porting(self, parent_path: str, record_identifier: str) -> str: """ Helper function that assists with copying images (pdfs) @@ -212,6 +216,7 @@ def create_sample_parent_html(self, navbar, footer): pages.append(page_relpath) labels.append(sample_name) + self._reset_pipeline_type() template_vars = dict( navbar=navbar, footer=footer, @@ -481,6 +486,7 @@ def create_object_htmls(self, navbar, footer): html_page_path, render_jinja_template("object.html", self.jinja_env, args=template_vars), ) + self._reset_pipeline_type() def create_glossary_html(self, glossary_table, navbar, footer): template_vars = dict( @@ -755,7 +761,7 @@ def create_index_html(self, navbar, footer): table_cell_data = [[rel_sample_html, sample_name]] table_cell_data += list(sorted_sample_stat_results.values()) table_row_data.append(table_cell_data) - + self._reset_pipeline_type() # Create parent samples page with links to each sample save_html( path=os.path.join(self.pipeline_reports, "records.html"), @@ -921,7 +927,7 @@ def create_project_objects(self): if "description" in self.prj.result_schemas[image_result] else "No description in schema" ) - + self._reset_pipeline_type() template_vars = dict(figures=figures, links=links) return render_jinja_template("project_object.html", self.jinja_env, template_vars) @@ -962,6 +968,7 @@ def _stats_to_json_str(self): inclusion_fun=lambda x: x not in OBJECT_TYPES, casting_fun=str, ) + self._reset_pipeline_type() return dumps(results) def _get_navbar_dropdown_data_objects(self, objs, wd, context): @@ -995,7 +1002,7 @@ def _get_navbar_dropdown_data_samples(self, wd, context): ) relpaths.append(_make_relpath(page_name, wd, context)) sample_names.append(sample_name) - + self._reset_pipeline_type() return relpaths, sample_names @@ -1282,6 +1289,8 @@ def _warn(what, e, sn): times.append(NO_DATA_PLACEHOLDER) mems.append(NO_DATA_PLACEHOLDER) + project.backend.pipeline_type = project.pipeline_type + template_vars = dict( sample_names=sample_names, log_paths=log_paths, @@ -1439,7 +1448,7 @@ def _create_stats_objs_summaries(prj, pipeline_name: str) -> List[str]: reported_stats.append(v) stats.append(reported_stats) - + prj.backend.pipeline_type = prj.pipeline_type # Stats File tsv_outfile_path = get_file_for_table(prj, pipeline_name, "stats_summary.tsv") stats.insert(0, columns) diff --git a/requirements/requirements-pephub-backend.txt b/requirements/requirements-pephub-backend.txt new file mode 100644 index 00000000..51564440 --- /dev/null +++ b/requirements/requirements-pephub-backend.txt @@ -0,0 +1 @@ +pephubclient>=0.4.2 \ No newline at end of file diff --git a/requirements/requirements-test.txt b/requirements/requirements-test.txt index 0383dfbd..fbfe980b 100644 --- a/requirements/requirements-test.txt +++ b/requirements/requirements-test.txt @@ -17,5 +17,6 @@ uvicorn fastapi coverage smokeshow +pephubclient diff --git a/tests/conftest.py b/tests/conftest.py index 1476e457..6be8c349 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,12 +1,13 @@ """Test fixtures and helpers to make widely available in the package""" import os -import pytest import subprocess +from atexit import register -from pipestat.const import STATUS_SCHEMA +import pytest from yacman import load_yaml -from atexit import register + +from pipestat.const import STATUS_SCHEMA REC_ID = "constant_record_id" BACKEND_KEY_DB = "db" @@ -22,6 +23,8 @@ """ STANDARD_TEST_PIPE_ID = "default_pipeline_name" +PEPHUB_URL = 
"databio/pipestat_demo:default" + try: subprocess.check_output( "docker inspect pipestat_test_db --format '{{.State.Status}}'", shell=True diff --git a/tests/test_db_only_mode.py b/tests/test_db_only_mode.py index 28f316cf..af648f20 100644 --- a/tests/test_db_only_mode.py +++ b/tests/test_db_only_mode.py @@ -2,13 +2,11 @@ from pipestat import SamplePipestatManager from pipestat.const import * -from .conftest import DB_URL -from .conftest import SERVICE_UNAVAILABLE, DB_DEPENDENCIES +from .conftest import DB_DEPENDENCIES, DB_URL, SERVICE_UNAVAILABLE try: from sqlmodel import SQLModel, create_engine - from sqlmodel.main import default_registry except ModuleNotFoundError: pass diff --git a/tests/test_init.py b/tests/test_init.py index 80cb5f12..3ec18956 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -1,17 +1,16 @@ -from tempfile import mkdtemp +import os +from tempfile import NamedTemporaryFile, TemporaryDirectory, mkdtemp import pytest -import os from yaml import dump -from pipestat import PipestatManager, SamplePipestatManager, ProjectPipestatManager +from pipestat import PipestatManager, ProjectPipestatManager, SamplePipestatManager +from pipestat.const import PIPESTAT_GENERIC_CONFIG, SCHEMA_KEY from pipestat.exceptions import * -from pipestat.parsed_schema import SCHEMA_PIPELINE_NAME_KEY -from tempfile import NamedTemporaryFile, TemporaryDirectory -from .conftest import STANDARD_TEST_PIPE_ID, DB_DEPENDENCIES -from .conftest import SERVICE_UNAVAILABLE from pipestat.helpers import init_generic_config -from pipestat.const import PIPESTAT_GENERIC_CONFIG, SCHEMA_KEY +from pipestat.parsed_schema import SCHEMA_PIPELINE_NAME_KEY + +from .conftest import DB_DEPENDENCIES, SERVICE_UNAVAILABLE, STANDARD_TEST_PIPE_ID @pytest.mark.skipif(not DB_DEPENDENCIES, reason="Requires dependencies") diff --git a/tests/test_parsed_schema.py b/tests/test_parsed_schema.py index 7c636112..c1c2cf96 100644 --- a/tests/test_parsed_schema.py +++ b/tests/test_parsed_schema.py @@ -3,17 +3,16 @@ from functools import partial from pathlib import Path from typing import * + import pytest import yaml -from pipestat.const import SAMPLE_NAME, STATUS, RECORD_IDENTIFIER + +from pipestat.const import RECORD_IDENTIFIER, SAMPLE_NAME, STATUS from pipestat.exceptions import SchemaError, SchemaValidationErrorDuringReport -from pipestat.parsed_schema import ( - NULL_MAPPING_VALUE, - ParsedSchema, - SCHEMA_PIPELINE_NAME_KEY, -) -from .conftest import COMMON_CUSTOM_STATUS_DATA, DEFAULT_STATUS_DATA, get_data_file_path from pipestat.helpers import validate_type +from pipestat.parsed_schema import NULL_MAPPING_VALUE, SCHEMA_PIPELINE_NAME_KEY, ParsedSchema + +from .conftest import COMMON_CUSTOM_STATUS_DATA, DEFAULT_STATUS_DATA, get_data_file_path TEMP_SCHEMA_FILENAME = "schema.tmp.yaml" diff --git a/tests/test_pipestat.py b/tests/test_pipestat.py index 0deb25dc..c9fd17bf 100644 --- a/tests/test_pipestat.py +++ b/tests/test_pipestat.py @@ -2,31 +2,33 @@ import os.path import time from collections.abc import Mapping -from yacman import YAMLConfigManager +from tempfile import NamedTemporaryFile, TemporaryDirectory +import pephubclient.exceptions import pytest from jsonschema import ValidationError +from yacman import YAMLConfigManager -from pipestat import SamplePipestatManager, ProjectPipestatManager, PipestatBoss, PipestatManager +from pipestat import PipestatBoss, PipestatManager, ProjectPipestatManager, SamplePipestatManager +from pipestat.cli import main from pipestat.const import * from pipestat.exceptions import * -from 
pipestat.parsed_schema import ParsedSchema from pipestat.helpers import default_formatter, markdown_formatter -from pipestat.cli import main +from pipestat.parsed_schema import ParsedSchema + from .conftest import ( - get_data_file_path, BACKEND_KEY_DB, BACKEND_KEY_FILE, COMMON_CUSTOM_STATUS_DATA, - DEFAULT_STATUS_DATA, - STANDARD_TEST_PIPE_ID, - SERVICE_UNAVAILABLE, + DB_DEPENDENCIES, DB_URL, + DEFAULT_STATUS_DATA, + PEPHUB_URL, REC_ID, - DB_DEPENDENCIES, + SERVICE_UNAVAILABLE, + STANDARD_TEST_PIPE_ID, + get_data_file_path, ) -from tempfile import NamedTemporaryFile, TemporaryDirectory - from .test_db_only_mode import ContextManagerDBTesting CONST_REC_ID = "constant_record_id" @@ -2513,3 +2515,188 @@ def test_select_history_complex_objects( assert len(history_result.keys()) == 1 assert "output_image" in history_result assert len(history_result["output_image"].keys()) == 2 + + +@pytest.mark.skip(reason="requires pephub login to function") +class TestPEPHUBBackend: + """ + THESE TESTS WILL FAIL IF YOU ARE NOT SIGNED IN TO PEPHUB + + use `phc login` to sign in. + """ + + @pytest.mark.parametrize( + ["rec_id", "val"], + [ + ( + "test_pipestat_01", + { + "name_of_something": "test_name", + "number_of_things": 42, + "md5sum": "example_md5sum", + "percentage_of_things": 10, + }, + ), + ( + "test_pipestat_02", + { + "name_of_something": "test_name_02", + "number_of_things": 52, + "md5sum": "example_md5sum_02", + "percentage_of_things": 30, + }, + ), + ], + ) + def test_pephub_backend_report( + self, + rec_id, + val, + config_file_path, + schema_file_path, + results_file_path, + range_values, + ): + + psm = PipestatManager(pephub_path=PEPHUB_URL, schema_path=schema_file_path) + + # Reporting a value that already exists should fail unless overwriting is forced; + # force_overwrite defaults to True, so this report succeeds + psm.report(record_identifier=rec_id, values=val) + + @pytest.mark.parametrize( + ["rec_id", "val"], + [ + ("test_pipestat_01", {"name_of_something": "test_name"}), + ], + ) + def test_pephub_backend_retrieve_one( + self, + rec_id, + val, + config_file_path, + schema_file_path, + results_file_path, + range_values, + ): + + psm = PipestatManager(pephub_path=PEPHUB_URL, schema_path=schema_file_path) + + result = psm.retrieve_one(record_identifier=rec_id) + + assert len(result.keys()) == 1 + + def test_pephub_backend_retrieve_many( + self, + config_file_path, + schema_file_path, + results_file_path, + range_values, + ): + + rec_ids = ["test_pipestat_01", "test_pipestat_02"] + + psm = PipestatManager(pephub_path=PEPHUB_URL, schema_path=schema_file_path) + + results = psm.retrieve_many(record_identifiers=rec_ids) + + assert len(results["records"]) == 2 + + def test_set_status_pephub_backend( + self, + config_file_path, + schema_file_path, + results_file_path, + range_values, + ): + rec_ids = ["test_pipestat_01"] + + psm = PipestatManager(pephub_path=PEPHUB_URL, schema_path=schema_file_path) + + result = psm.set_status(record_identifier=rec_ids[0], status_identifier="completed") + + assert result is None + + def test_get_status_pephub_backend( + self, + config_file_path, + schema_file_path, + results_file_path, + range_values, + ): + rec_ids = ["sample1", "test_pipestat_01"] + + psm = PipestatManager(pephub_path=PEPHUB_URL, schema_path=schema_file_path) + + result = psm.get_status(record_identifier=rec_ids[0]) + + assert result is None + + result = psm.get_status(record_identifier=rec_ids[1]) + + assert result == "completed" + + def test_pephub_backend_remove(
self, + config_file_path, + schema_file_path, + results_file_path, + range_values, + ): + + rec_ids = ["test_pipestat_01"] + + psm = PipestatManager(pephub_path=PEPHUB_URL, schema_path=schema_file_path) + + results = psm.remove(record_identifier=rec_ids[0], result_identifier="name_of_something") + + assert results is True + + def test_pephub_backend_remove_record( + self, + config_file_path, + schema_file_path, + results_file_path, + range_values, + ): + + rec_ids = ["test_pipestat_01"] + + psm = PipestatManager(pephub_path=PEPHUB_URL, schema_path=schema_file_path) + + results = psm.remove_record(record_identifier=rec_ids[0], rm_record=False) + + results = psm.remove_record(record_identifier=rec_ids[0], rm_record=True) + + def test_pephub_unsupported_funcs( + self, + config_file_path, + schema_file_path, + results_file_path, + range_values, + ): + + rec_ids = ["test_pipestat_01"] + + psm = PipestatManager(pephub_path=PEPHUB_URL, schema_path=schema_file_path) + + results = psm.retrieve_history(record_identifier=rec_ids[0]) + + assert results is None + + psm.link("somedir") + psm.list_recent_results() + psm.summarize() + + def test_pephub_invalid_registry_path( + self, + config_file_path, + schema_file_path, + results_file_path, + range_values, + ): + with pytest.raises(PipestatPEPHubError): + psm = PipestatManager(pephub_path="bogus_path", schema_path=schema_file_path) diff --git a/tests/test_status.py b/tests/test_status.py index 27ad7c72..26e0f9cd 100644 --- a/tests/test_status.py +++ b/tests/test_status.py @@ -1,22 +1,22 @@ """Tests for pipestat's status checking/management functionality""" import os +from tempfile import NamedTemporaryFile + import pytest from pipestat import SamplePipestatManager +from pipestat.const import FILE_KEY, STATUS_FILE_DIR +from pipestat.exceptions import UnrecognizedStatusError -from pipestat.const import STATUS_FILE_DIR, FILE_KEY from .conftest import ( BACKEND_KEY_DB, BACKEND_KEY_FILE, + DB_DEPENDENCIES, DB_URL, SERVICE_UNAVAILABLE, - DB_DEPENDENCIES, ) - from .test_db_only_mode import ContextManagerDBTesting -from pipestat.exceptions import UnrecognizedStatusError -from tempfile import NamedTemporaryFile @pytest.mark.skipif(not DB_DEPENDENCIES, reason="Requires dependencies")
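For reviewers trying the new backend end to end, here is a minimal usage sketch distilled from the TestPEPHUBBackend tests above. It assumes pephubclient>=0.4.2 is installed, the user is signed in via `phc login`, the registry path points to a PEP the user can write to, and output_schema.yaml is a placeholder results schema file:

    from pipestat import PipestatManager

    # Registry path format is namespace/name:tag, parsed into a RegistryPath by the backend.
    psm = PipestatManager(
        pephub_path="databio/pipestat_demo:default",  # assumed writable PEP
        schema_path="output_schema.yaml",  # placeholder schema file
    )

    # Results are reported as PEPHub sample attributes; force_overwrite defaults to True here.
    psm.report(
        record_identifier="test_pipestat_01",
        values={"name_of_something": "test_name", "number_of_things": 42},
    )

    # Statuses round-trip through the same sample table; an empty cell reads back as None.
    psm.set_status(record_identifier="test_pipestat_01", status_identifier="completed")
    assert psm.get_status(record_identifier="test_pipestat_01") == "completed"

    # Removing a single result blanks its cell; removing the record deletes the sample.
    psm.remove(record_identifier="test_pipestat_01", result_identifier="name_of_something")
    psm.remove_record(record_identifier="test_pipestat_01", rm_record=True)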