diff --git a/dev/CB_extension_remod.ipynb b/dev/CB_extension_remod.ipynb index 3d3be01..c2aa908 100644 --- a/dev/CB_extension_remod.ipynb +++ b/dev/CB_extension_remod.ipynb @@ -52,942 +52,15 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### generally useful functions\n", - "\n", - "-> maybe something for refinegems \n", - "--> where to put them, if yes" - ] - }, - { - "cell_type": "code", - "execution_count": 158, - "metadata": {}, - "outputs": [], - "source": [ - "# put in entities in refinegems\n", - "def make_reaction_annotation_dict(model:cobra.Model, db:Literal['KEGG','BiGG']) -> dict:\n", - " \"\"\"Create a dictionary of a model's reaction IDs and a choosen database ID as\n", - " saved in the annotations of the model.\n", - " The database ID can be choosen based on the strings for the namespace options\n", - " in other functions.\n", - "\n", - " Args:\n", - " model (cobra.Model): A model loaded with COBRApy.\n", - " db (Literal['KEGG','BiGG']): The string denoting the database to map to.\n", - "\n", - " Raises:\n", - " ValueError: Unknown database string for paramezer db\n", - "\n", - " Returns:\n", - " dict: The mapping of the reaction IDs to the database IDs found in the annotations\n", - " \"\"\"\n", - "\n", - " react_dict = {}\n", - "\n", - " match db:\n", - " case 'KEGG':\n", - " db_string = 'kegg.reaction'\n", - " case 'BiGG':\n", - " db_string = 'bigg.reaction'\n", - " case _:\n", - " mes = f'Unknown database string for parameter db: {db}'\n", - " raise ValueError(mes)\n", - "\n", - " for r in model.reactions:\n", - " if db_string in r.annotation.keys():\n", - " react_dict[r.id] = r.annotation[db_string]\n", - " else:\n", - " react_dict[r.id] = '-'\n", - "\n", - " return react_dict\n", - "\n", - "\n", - "\n", - "def create_random_id(model:cobra.Model, entity_type:Literal['reac','meta']='reac', prefix:str='') -> str:\n", - " \"\"\"Generate a unique, random ID for a model entity for a model.\n", - "\n", - " Args:\n", - " model (cobra.Model): A model loaded with COBRApy.\n", - " entity_type (Literal['reac','meta'], optional): Type of model entity. \n", - " Can be 'reac' for Reaction or 'meta' for Metabolite.\n", - " Defaults to 'reac'.\n", - " prefix (str, optional): prefix to set for the randomised part.\n", - " Useful to identify the random IDs later on. \n", - " Defaults to ''.\n", - "\n", - " Raises:\n", - " ValueError: Unknown entity_type\n", - "\n", - " Returns:\n", - " str: The generate new and unique ID.\n", - " \"\"\"\n", - "\n", - " match entity_type:\n", - " case 'reac':\n", - " all_ids = [_.id for _ in model.reactions]\n", - " case 'meta':\n", - " all_ids = [_.id for _ in model.metabolites]\n", - " case _:\n", - " mes = f'Unkown entity_type: {entity_type}'\n", - " raise ValueError(mes)\n", - "\n", - " prefix = f'{prefix}{entity_type}'\n", - " var = ''.join(choice(ascii_uppercase + digits) for i in range(6))\n", - " label = prefix + var\n", - " j = 6\n", - " \n", - " while True:\n", - " \n", - " for i in range(36**6): # make sure it does not run endlessly\n", - " if label in all_ids:\n", - " label = prefix + ''.join(choice(ascii_uppercase + digits) for x in range(j))\n", - " else:\n", - " return label\n", - " \n", - " j = j + 1\n", - "\n", - "\n", - "# @TODO: \n", - "# more namespace options\n", - "def match_id_to_namespace(model_entity:[cobra.Reaction, cobra.Metabolite], namespace:Literal['BiGG']) -> None:\n", - " \"\"\"Based on a given namespace, change the ID of a given model entity to it the set namespace.\n", - "\n", - " Currently working namespaces:\n", - " - BiGG \n", - "\n", - " Args:\n", - " model_entity (cobra.Reaction, cobra.Metabolite]): The model entity. \n", - " Can be either a cobra.Reaction or cobra.Metabolite object.\n", - " namespace (Literal['BiGG']): The chosen namespace.\n", - "\n", - " Raises:\n", - " ValueError: Unknown input for namespace\n", - " TypeError: Unknown type for model_entity\n", - " \"\"\"\n", - "\n", - " match model_entity:\n", - "\n", - " # Reaction\n", - " # --------\n", - " case cobra.Reaction():\n", - " match namespace:\n", - "\n", - " case 'BiGG':\n", - " if 'bigg.reaction' in model_entity.annotation.keys():\n", - " # @TODO : currently takes first entry is annotation is list\n", - " model_entity.id = model_entity.annotation['bigg.reaction'] if isinstance(model_entity.annotation['bigg.reaction'],str) else model_entity.annotation['bigg.reaction'][0]\n", - "\n", - " case _:\n", - " mes = f'Unknown input for namespace: {namespace}'\n", - " raise ValueError(mes)\n", - " \n", - " # Metabolite\n", - " # ----------\n", - " case cobra.Metabolite():\n", - " match namespace:\n", - "\n", - " case 'BiGG':\n", - " if 'bigg.metabolite' in model_entity.annotation.keys():\n", - " model_entity.id = model_entity.annotation['bigg.metabolite'] + '_' + model_entity.compartment if isinstance(model_entity.annotation['bigg.metabolite'],str) else model_entity.annotation['bigg.metabolite'][0]\n", - "\n", - " case _:\n", - " mes = f'Unknown input for namespace: {namespace}'\n", - " raise ValueError(mes)\n", - " # Error\n", - " # -----\n", - " case _:\n", - " mes = f'Unknown type for model_entity: {type(model_entity)}'\n", - " raise TypeError(mes)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### reworking functions for extension\n", - "\n", - "#### mapping" + "### reworking functions for extension\n" ] }, { "cell_type": "code", - "execution_count": 21, - "metadata": {}, - "outputs": [], - "source": [ - "# @NOTE changed\n", - "def map_BiGG_reactions_row(row, namespace):\n", - " \"\"\"Map a single entry from the table in map_BiGG_reactions() to the BiGG reaction namespace.\n", - "\n", - " :param row: A single row of the table.\n", - " :type row: pd.Series\n", - " :param namespace: The BiGG reaction namespace table.\n", - " :type namespace: pd.DataFrame\n", - " :returns: The edited row.\n", - " :rtype: pd.Series\n", - " \"\"\"\n", - "\n", - " \"\"\"\n", - " @TODO\n", - " NOTE: only works for cases, where KEGG.reaction in row contains EXACTLY one entry\n", - " in the rare case that multiple reactions belong to one enzyme, they are omitted\n", - " in this search\n", - " \"\"\"\n", - "\n", - " # match by EC number AND KEGG id\n", - " matches = namespace.loc[namespace['EC Number'].str.contains(row['EC number']) & namespace['KEGG Reaction'].str.contains(row['KEGG.reaction'])]\n", - "\n", - " # case 1 : no matches\n", - " if matches.empty:\n", - " return row\n", - "\n", - " # case 2 : exactly one match\n", - " elif len(matches) == 1:\n", - " row['bigg_id'] = matches['id'].values[0]\n", - "\n", - " # case 3 : multiple matches\n", - " # often due to reaction being in different compartments\n", - " else:\n", - " row['bigg_id'] = ' '.join(matches['id'].values)\n", - "\n", - " return row\n", - "\n", - "\n", - "# @TEST : fitted to refinegems\n", - "# @CHECK : connections, e.g. input is now a param short \n", - "def map_BiGG_reactions(table_file):\n", - " \"\"\"Map the output of map_to_KEGG() to a BiGG namespace file (rewritten-type, see auxilliaries).\n", - "\n", - " :param table_file: The path to the saved table from running map_to_KEGG().\n", - " :type table_file: string\n", - " :returns: The table with an additional column for the mapping to BiGG reactions.\n", - " :rtype: pd.DataFrame\n", - " \"\"\"\n", - "\n", - " r_namespace = load_a_table_from_database('bigg_reactions', False)\n", - "\n", - " table = pd.read_csv(table_file)\n", - " table['bigg_id'] = pd.Series(dtype='str')\n", - "\n", - " table = table.apply(lambda row: map_BiGG_reactions_row(row,r_namespace), axis=1)\n", - "\n", - " return table\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### actual extension" - ] - }, - { - "cell_type": "code", - "execution_count": 73, + "execution_count": null, "metadata": {}, "outputs": [], - "source": [ - "# @TODO\n", - "def isreaction_complete(reac, exclude_dna=True, exclude_rna=True):\n", - " \"\"\"Check, if a reaction is complete and ready to be added to the model.\n", - " Additionally, it is possible to check for DNA and RNA reations\n", - " and set them to be excluded or included.\n", - "\n", - " :param reac: The reaction to be checked.\n", - " :type reac: cobra.Reaction\n", - " :param exclude_dna: Tag to include or exclude DNA reactions.\n", - " :type exclude_dna: bool, default is True.\n", - " :param exclude_rna: Tag to include or exclude RNA reactions.\n", - " :type exclude_rna: bool, default is True.\n", - " :returns: True if the check is successful, else false.\n", - " :rtype: bool\n", - " \"\"\"\n", - "\n", - " # ................\n", - " # @TODO\n", - " # extendable\n", - " # ................\n", - "\n", - " # check reaction\n", - " if exclude_dna and 'DNA' in reac.name:\n", - " return False\n", - " if exclude_rna and 'RNA' in reac.name:\n", - " return False\n", - "\n", - " # check metabolites\n", - " for m in reac.metabolites:\n", - " if m.id == '' or pd.isnull(m.id):\n", - " return False\n", - " if m.name == '' or pd.isnull(m.name):\n", - " return False\n", - " if m.formula == '' or pd.isnull(m.formula):\n", - " return False\n", - "\n", - " return True\n", - "\n", - "\n", - "# @TODO\n", - "# @DOCS wrong\n", - "# UNDER CONSTRUCTION\n", - "def build_metabolite_mnx(metabolite, model, mnx_chem_prop, mnx_chem_xref, bigg_metabolites, namespace):\n", - " \"\"\"Create or retrieve (from model) a metabolite based on its MetaNetX ID.\n", - "\n", - " :param metabolite: The MetaNetX ID of the metabolite.\n", - " :type metabolite: string\n", - " :param model: The underlying genome-scale metabolic model.\n", - " :type model: cobra.model\n", - " :param mnx_chem_xref: The chem_xref table from MetaNetX\n", - " :type mnx_chem_xref: pd.DataFrame\n", - " :param mnx_chem_prop: The chem_prop table from MetaNetX\n", - " :type mnx_chem_prop: pd.DataFrame\n", - " :param bigg_metabolites: The BiGG compound namespace table.\n", - " :type bigg_metabolites: pd.DataFrame\n", - " :returns: The retrieved or newly build metabolite.\n", - " :rtype: cobra.Metabolite\n", - " \"\"\"\n", - "\n", - " metabolite_prop = mnx_chem_prop[mnx_chem_prop['ID']==metabolite]\n", - " metabolite_anno = mnx_chem_xref[mnx_chem_xref['ID']==metabolite]\n", - " model_mnx = [x.annotation['metanetx.chemical'] for x in model.metabolites if 'metanetx.chemical' in x.annotation]\n", - "\n", - " # fast check if compound already in model\n", - " # ------------------------------------------\n", - " # @TODO ..........................................\n", - " # currently no checking for compartments\n", - " # first match will be taken (most often cytosol one)\n", - " # regardless of the compartment\n", - " #.............................................\n", - " # step 1: check if MetaNetX ID in model\n", - " if metabolite in model_mnx:\n", - " matches = [x.id for x in model.metabolites if 'metanetx.chemical' in x.annotation and x.annotation['metanetx.chemical']==metabolite]\n", - "\n", - " # step 2: if yes, retrieve metabolite from model\n", - " # case 1: multiple matches found\n", - " if len(matches) > 1:\n", - " # ................\n", - " # @TODO see above\n", - " # ................\n", - " match = model.metabolites.get_by_id(matches[0])\n", - " # case 2: only one match found\n", - " else:\n", - " match = model.metabolites.get_by_id(matches[0])\n", - "\n", - " # step 3: add metabolite\n", - " return match\n", - "\n", - " # if not, create new metabolite\n", - " # -----------------------------\n", - " else:\n", - "\n", - " # step 1: create a random metabolite ID\n", - " # ...........................\n", - " # @TODO : compartment problem \n", - " # - does it have to be in the name?\n", - " # ...........................\n", - " new_metabolite = cobra.Metabolite(create_random_id(model, 'meta','SPECIMEN')) \n", - "\n", - "\n", - " # step 2: add features\n", - " # --------------------\n", - " # @TODO ..........................................\n", - " # currently no checking for compartments\n", - " # defaults to c\n", - " # makes it difficult to add exchange reactions\n", - " #.................................................\n", - " new_metabolite.formula = metabolite_prop['formula'].iloc[0]\n", - " new_metabolite.name = metabolite_prop['name'].iloc[0]\n", - " new_metabolite.charge = metabolite_prop['charge'].iloc[0]\n", - " new_metabolite.compartment = 'c'\n", - "\n", - " # step 3: add notes\n", - " # -----------------\n", - " new_metabolite.notes['added via'] = 'metanetx.chemical'\n", - "\n", - " # step 4: add annotations\n", - " # -----------------------\n", - " new_metabolite.annotation['metanetx.chemical'] = metabolite_prop['ID'].iloc[0]\n", - " new_metabolite.annotation['chebi'] = metabolite_prop['reference'].iloc[0].upper()\n", - " if not pd.isnull(metabolite_prop['InChIKey'].iloc[0]):\n", - " new_metabolite.annotation['inchikey'] = metabolite_prop['InChIKey'].iloc[0].split('=')[1]\n", - " for db in ['kegg.compound','metacyc.compound','seed.compound','bigg.metabolite']:\n", - " db_matches = metabolite_anno[metabolite_anno['source'].str.contains(db)]\n", - " if len(db_matches) == 1:\n", - " new_metabolite.annotation[db] = db_matches['source'].iloc[0].split(':',1)[1]\n", - " elif len(db_matches) > 1:\n", - " new_metabolite.annotation[db] = [m.split(':',1)[1] for m in db_matches['source'].tolist()]\n", - "\n", - " # if no BiGG was found in MetaNetX, try reverse search in BiGG\n", - " if metabolite in bigg_metabolites['MetaNetX (MNX) Chemical']:\n", - " new_metabolite.annotation['bigg.metabolite'] = bigg_metabolites[bigg_metabolites['MetaNetX (MNX) Chemical']==metabolite].iloc[0]\n", - " \n", - " # add additional information from bigg if possible \n", - " if 'bigg.metabolite' in new_metabolite.annotation.keys():\n", - " bigg_information = bigg_metabolites[bigg_metabolites['bigg_id'].str.contains('|'.join(new_metabolite.annotation['bigg.metabolite']))]\n", - " db_id_bigg = {'BioCyc':'biocyc', 'MetaNetX (MNX) Chemical':'metanetx.chemical','SEED Compound':'seed.compound','InChI Key':'inchikey'}\n", - " for db in db_id_bigg:\n", - " info = bigg_information[db].dropna().to_list()\n", - " if len(info) > 0:\n", - " info = ','.join(info)\n", - " info = [x.strip() for x in info.split(',')] # make sure all entries are a separate list object\n", - " new_metabolite.annotation[db_id_bigg[db]] = info\n", - "\n", - " # step 5: change ID according to namespace\n", - " # ----------------------------------------\n", - " match_id_to_namespace(new_metabolite,namespace)\n", - " \n", - " # step 6: re-check existence of ID in model\n", - " # -----------------------------------------\n", - " # @TODO : check complete annotations? \n", - " # - or let those be covered by the duplicate check later on?\n", - " if new_metabolite.id in [_.id for _ in model.metabolites]:\n", - " return model.metabolites.get_by_id(new_metabolite.id)\n", - " \n", - " return new_metabolite\n", - "\n", - "# @TODO\n", - "# @DOCS\n", - "# UNDER CONSTRUCTION\n", - "def build_metabolite_kegg(kegg_id, model, model_kegg_ids, bigg_metabolites, namespace):\n", - " \"\"\"Create or retrieve (from model) a metabolite based on its KEGG ID.\n", - "\n", - " :param kegg_id: The KEGG.compound ID of the metabolite in question.\n", - " :type kegg_id: string\n", - " :param model: The model.\n", - " :type model: cobra.Model\n", - " :param model_kegg_ids: List of all annotated KEGG Compound IDs in the model.\n", - " :type model_kegg_ids: list\n", - " :param bigg_metabolites: The BiGG compound namespace table.\n", - " :type bigg_metabolites: pd.DataFrame\n", - " :returns: The retrieved or newly build metabolite.\n", - " :rytpe: cobra.Metabolite\n", - " \"\"\"\n", - "\n", - " # retrieve KEGG entry for compound\n", - " # --------------------------------\n", - " try:\n", - " kegg_handle = REST.kegg_get(kegg_id)\n", - " kegg_record = [r for r in Compound.parse(kegg_handle)][0]\n", - " except urllib.error.HTTPError:\n", - " print(F'HTTPError: {kegg_id}')\n", - " return cobra.Metabolite()\n", - " except ConnectionResetError:\n", - " print(F'ConnectionResetError: {kegg_id}')\n", - " return cobra.Metabolite()\n", - " except urllib.error.URLError:\n", - " print(F'URLError: {kegg_id}')\n", - " return cobra.Metabolite()\n", - "\n", - " # ---------------------------------------\n", - " # fast check if compound already in model\n", - " # ---------------------------------------\n", - " # @TODO ..........................................\n", - " # currently no checking for compartments\n", - " # first match will be taken (most often cytosol one)\n", - " # regardless of the compartment\n", - " #.............................................\n", - " # step 1: check via KEGG ID\n", - " if kegg_id in model_kegg_ids:\n", - " matches = [x.id for x in model.metabolites if ('kegg.compound' in x.annotation and x.annotation['kegg.compound'] == kegg_id)]\n", - "\n", - " # step 2: model id --> metabolite object\n", - " # case 1: multiple matches found\n", - " if len(matches) > 1:\n", - " match = model.metabolites.get_by_id(matches[0])\n", - " # case 2: only one match found\n", - " else:\n", - " match = model.metabolites.get_by_id(matches[0])\n", - "\n", - " # step 3: add metabolite\n", - " return match\n", - "\n", - " # -----------------------------\n", - " # if not, create new metabolite\n", - " # -----------------------------\n", - " # ...............\n", - " # @TODO\n", - " # compartment\n", - " # ...............\n", - " else:\n", - " # step 1: create a random metabolite ID\n", - " # -------------------------------------\n", - " # ...........................\n", - " # @TODO : compartment problem \n", - " # - does it have to be in the name?\n", - " # ...........................\n", - " new_metabolite = cobra.Metabolite(create_random_id(model, 'meta','SPECIMEN')) \n", - "\n", - " # step 2: add features\n", - " # --------------------\n", - " # @TODO ..........................................\n", - " # currently no checking for compartments\n", - " #.............................................\n", - " # set name from KEGG and additionally use it as ID if there is none yet\n", - " if isinstance(kegg_record.name, list):\n", - " if len(kegg_record.name) > 1:\n", - " new_metabolite.name = kegg_record.name[1]\n", - " else:\n", - " new_metabolite.name = kegg_record.name[0]\n", - " else:\n", - " new_metabolite.name = kegg_record.name\n", - " # set compartment\n", - " new_metabolite.compartment = 'c'\n", - " # set formula\n", - " new_metabolite.formula = kegg_record.formula\n", - "\n", - " # step 3: add notes\n", - " # -----------------\n", - " new_metabolite.notes['added via'] = 'KEGG.compound'\n", - "\n", - " # step 4: add annotations\n", - " # -----------------------\n", - " new_metabolite.annotation['kegg.compound'] = kegg_id\n", - " db_idtf = {'CAS':'cas','PubChem':'pubchem.compound','ChEBI':'chebi'}\n", - " for db,ids in kegg_record.dblinks:\n", - " if db in db_idtf:\n", - " if len(ids) > 1:\n", - " new_metabolite.annotation[db_idtf[db]] = ids\n", - " else:\n", - " new_metabolite.annotation[db_idtf[db]] = ids[0]\n", - "\n", - " # add additional information from BiGG\n", - " if kegg_id in bigg_metabolites['KEGG Compound']:\n", - "\n", - " bigg_information = bigg_metabolites[bigg_metabolites['KEGG Compound']==kegg_id]\n", - " if len(bigg_information) > 0:\n", - "\n", - " new_metabolite.annotation['bigg.metabolite'] = bigg_information['bigg_id'].to_list()\n", - "\n", - " db_id_bigg = {'BioCyc':'biocyc', 'MetaNetX (MNX) Chemical':'metanetx.chemical','SEED Compound':'seed.compound','InChI Key':'inchikey'}\n", - " for db in db_id_bigg:\n", - " info = bigg_information[db].dropna().to_list()\n", - " if len(info) > 0:\n", - " info = ','.join(info)\n", - " info = [x.strip() for x in info.split(',')] # make sure all entries are a separate list object\n", - " new_metabolite.annotation[db_id_bigg[db]] = info\n", - "\n", - " # step 5: change ID according to namespace\n", - " # ----------------------------------------\n", - " match_id_to_namespace(new_metabolite,namespace)\n", - " \n", - " # step 6: re-check existence of ID in model\n", - " # -----------------------------------------\n", - " # @TODO : check complete annotations? \n", - " # - or let those be covered by the duplicate check later on?\n", - " if new_metabolite.id in [_.id for _ in model.metabolites]:\n", - " return model.metabolites.get_by_id(new_metabolite.id)\n", - "\n", - " return new_metabolite\n", - "\n", - "# @TODO\n", - "# @DOCS\n", - "# UNDER CONSTRUCTION\n", - "def get_metabolites_mnx(model,equation,mnx_chem_xref,mnx_chem_prop,bigg_metabolites, namespace):\n", - " \"\"\"Based on a given MetaNetX equation and a model, get or\n", - " create metabolite entires in/for the model.\n", - "\n", - " :param model: A GEM.\n", - " :type model: cobra.Model\n", - " :param equation: The equation from MetaNetX\n", - " :type equation: string\n", - " :param mnx_chem_xref: The chem_xref table from MetaNetX\n", - " :type mnx_chem_xref: pd.DataFrame\n", - " :param mnx_chem_prop: The chem_prop table from MetaNetX\n", - " :type mnx_chem_prop: pd.DataFrame\n", - " :param bigg_metabolites: The BiGG compound namespace table.\n", - " :type bigg_metabolites: pd.DataFrame\n", - " :returns: Dictonary of metabolites and stoichiometric factors.\n", - " :rtype: dict\n", - " \"\"\"\n", - "\n", - " # @TODO ...................................\n", - " # currently no checking for compartments\n", - " #..........................................\n", - "\n", - " model_metabolites = [m.formula for m in model.metabolites]\n", - " metabolites = {}\n", - " produced = -1.0\n", - " factor = 0\n", - "\n", - " for s in equation.split(' '):\n", - " # switch from reactants to products\n", - " if s == '=':\n", - " produced = 1.0\n", - " # found stoichiometric factor\n", - " elif s.isnumeric():\n", - " factor = float(s)\n", - " # skip\n", - " elif s == '+':\n", - " continue\n", - " # found metabolite\n", - " else:\n", - " # get information from MetaNetX\n", - " metabolite, compartment = s.split('@')\n", - " # build or identify metabolite\n", - " new_metabolite = build_metabolite_mnx(metabolite, model, mnx_chem_prop, mnx_chem_xref,bigg_metabolites, namespace)\n", - " # add metabolite\n", - " if new_metabolite.id in [_.id for _ in metabolites]:\n", - " # ......................................................\n", - " # @TODO: \n", - " # check if metabolite if both reactant and product\n", - " # suggests exchange reaction \n", - " # -> maybe a good place to change compartment for one?\n", - " # -> what about name and directions???\n", - " # ......................................................\n", - " try:\n", - " test = model.metabolites.get_by_id(new_metabolite.id)\n", - " new_metabolite = new_metabolite.copy()\n", - " new_metabolite.id = new_metabolite.id + '_i'\n", - " except:\n", - " new_metabolite.id = new_metabolite.id + '_i'\n", - "\n", - " metabolites[new_metabolite] = factor * produced\n", - "\n", - " return metabolites\n", - "\n", - "\n", - "# @TODO\n", - "# @DOCS\n", - "# UNDER CONSTRUCTION\n", - "def get_metabolites_kegg(model,equation,chem_xref,chem_prop,bigg_metabolites, namespace):\n", - " \"\"\"Based on a given KEGG equation and a model, get or\n", - " create metabolite entires in/for the model.\n", - "\n", - " :param model: A GEM.\n", - " :type model: cobra.Model\n", - " :param equation: The equation from KEGG\n", - " :type equation: string\n", - " :param chem_xref: The chem_xref table from MetaNetX\n", - " :type chem_xref: pd.DataFrame\n", - " :param chem_prop: The chem_prop table from MetaNetX\n", - " :type chem_prop: pd.DataFrame\n", - " :param bigg_metabolites: The BiGG compound namespace table.\n", - " :type bigg_metabolites: pd.DataFrame\n", - " :returns: Dictonary of metabolites and stoichiometric factors.\n", - " :rtype: dict\n", - " \"\"\"\n", - "\n", - " # @TODO ...................................\n", - " # currently no checking for compartments\n", - " #..........................................\n", - "\n", - " model_metabolites = [m.formula for m in model.metabolites]\n", - " model_kegg_ids = [m.annotation['kegg.compound'] for m in model.metabolites if 'kegg.compound' in m.annotation]\n", - " metabolites = {}\n", - " produced = -1.0\n", - " factor = 1\n", - " mnx_id = ''\n", - "\n", - " for s in equation.split(' '):\n", - " # switch from reactants to products\n", - " if '=' in s:\n", - " produced = 1.0\n", - " # found stoichiometric factor\n", - " elif s.isnumeric():\n", - " factor = float(s)\n", - " # skip\n", - " elif s == '+':\n", - " continue\n", - " # found metabolite\n", - " else:\n", - " # check if s is a valid ID\n", - " if '(' in s:\n", - " s = s.split('(')[0]\n", - " # ..................................\n", - " # @TODO\n", - " # known case: DNA(n) --> DNA(n+1)\n", - " # currently note in brackets gets ignored\n", - " # ..................................\n", - " elif not s.isalnum():\n", - " print('Problem: unknown character in ID inside get_metabolites_kegg() detected.\\nPlease contact dev about your problem.')\n", - " sys.exit(1)\n", - "\n", - " mnx_id = chem_xref[chem_xref['source'] == F'kegg.compound:{s}']\n", - " # case 1:\n", - " # add metabolite via MetaNetX\n", - " # -> make sure, only 1 ID match is found (match is unambiguous)\n", - " if len(mnx_id) == 1:\n", - " mnx_id = mnx_id['ID'].item()\n", - " metabolite = build_metabolite_mnx(mnx_id, model, chem_prop, chem_xref, bigg_metabolites, namespace)\n", - " # case 2:\n", - " # add metabolite via KEGG\n", - " else:\n", - " metabolite = build_metabolite_kegg(s, model, model_kegg_ids, bigg_metabolites, namespace)\n", - "\n", - " # add new metabolite\n", - " # @TODO : place to check for exchanges?\n", - " if metabolite.id in [_.id for _ in metabolites]:\n", - " try:\n", - " test = model.metabolites.get_by_id(metabolite.id)\n", - " metabolite = metabolite.copy()\n", - " metabolite.id = metabolite.id + '_i'\n", - " except:\n", - " metabolite.id = metabolite.id + '_i'\n", - "\n", - " metabolites[metabolite] = factor * produced\n", - "\n", - " return metabolites\n", - "\n", - "\n", - "\n", - "def add_gene(model, reaction, row, first=False):\n", - " \"\"\"Add a new gene to a genome-scale metabolic cobra model.\n", - "\n", - " :param model: The model.\n", - " :type model: cobra.Model\n", - " :param reaction: The reaction id to add the gene to.\n", - " :type reaction: string\n", - " :param row: A single row of the output table of map_BiGG_reactions().\n", - " :type row: pd.Series\n", - " :param first: Shows, if gene is the first gene to be added to the reaction.\n", - " :type first: bool, true if gene is first to be added.\n", - " :returns: The updated model.\n", - " :rtype: cobra.Model\n", - " \"\"\"\n", - "\n", - " # add gene\n", - " if first or model.reactions.get_by_id(reaction).gene_reaction_rule == '':\n", - " model.reactions.get_by_id(reaction).gene_reaction_rule = row[0]\n", - " else:\n", - " model.reactions.get_by_id(reaction).gene_reaction_rule = model.reactions.get_by_id(reaction).gene_reaction_rule + ' or ' + row[0]\n", - "\n", - " # add name\n", - " model.genes.get_by_id(row[0]).name = row[1]\n", - "\n", - " # add annotations\n", - " if not pd.isnull(row[4]):\n", - " model.genes.get_by_id(row[0]).annotation['ncbigene'] = row[4]\n", - " model.genes.get_by_id(row[0]).annotation['ncbiprotein'] = row[2].split('.')[0]\n", - " # note: annotations like sbo, kegg.genes and uniprot missing\n", - "\n", - " return model\n", - "\n", - "\n", - "# UNDER CONSTRUCTION\n", - "def add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites, namespace:str='BiGG', exclude_dna=True, exclude_rna=True):\n", - "\n", - " # create reaction object\n", - " reac = cobra.Reaction(create_random_id(model,'reac','SPECIMEN'))\n", - "\n", - " # ----------------------------\n", - " # curate reaction via MetaNetX\n", - " # ----------------------------\n", - " # try kegg.reaction --> metanetx.reaction\n", - " if F'kegg.reaction:{row[\"KEGG.reaction\"]}' in list(reac_xref['source']):\n", - " \n", - " # get MetaNetX ID\n", - " met_reac_kegg = reac_xref[reac_xref['source']==F'kegg.reaction:{row[\"KEGG.reaction\"]}']\n", - " met_reac = reac_prop[reac_prop['ID']==met_reac_kegg['ID'].iloc[0]]\n", - "\n", - " # make sure exactly one entry is parsed\n", - " # @TODO : parallel parsing\n", - " if len(met_reac) > 1:\n", - " print(F'Warning: multiple matches for kegg.reaction {row[\"KEGG.reaction\"]} found. Only first one will be used.')\n", - " met_reac = met_reac.head(1)\n", - "\n", - " # add name\n", - " # --------\n", - " # from MetaNetX KEGG description\n", - " reac.name = met_reac_kegg['description'].iloc[0].split('|')[0]\n", - "\n", - " # add notes\n", - " # ---------\n", - " reac.notes['creation'] = 'via MetaNetX'\n", - " reac.notes['KEGG.information'] = row['KEGG.notes']\n", - "\n", - " # add metabolites\n", - " # ----------------\n", - " reac.add_metabolites(get_metabolites_mnx(model,met_reac['mnx equation'].iloc[0],chem_xref,chem_prop,bigg_metabolites, namespace))\n", - " #@TODO .............\n", - " # direction of reaction\n", - " # ---> current solution:\n", - " # use one direction only\n", - " # ..................\n", - "\n", - " # add annotations\n", - " # ---------------\n", - " reac.annotation['ec-code'] = row['EC number']\n", - " reac.annotation['kegg.reaction'] = row['KEGG.reaction']\n", - " reac.annotation['metanetx.reaction'] = met_reac_kegg['ID'].iloc[0]\n", - " met_reac_anno = reac_xref[reac_xref['ID']==met_reac_kegg['ID'].iloc[0]]\n", - " for db in ['metacyc.reaction','seed.reaction','rhea','bigg.reaction']:\n", - " db_matches = met_reac_anno[met_reac_anno['source'].str.contains(db)]\n", - " if len(db_matches) == 1:\n", - " reac.annotation[db] = db_matches['source'].iloc[0].split(':',1)[1]\n", - " elif len(db_matches) > 1:\n", - " reac.annotation[db] = [r.split(':',1)[1] for r in db_matches['source'].tolist()]\n", - " else:\n", - " continue\n", - " \n", - " # if not possible, use information from KEGG only\n", - " # ------------------------\n", - " # curate reaction via KEGG\n", - " # ------------------------\n", - " else:\n", - " \n", - " # retrieve reaction information from KEGG\n", - " reac_kegg = kegg_reaction_parser(row['KEGG.reaction'])\n", - "\n", - " # add name\n", - " # --------\n", - " # from KEGG name\n", - " reac.name = reac_kegg['name']\n", - "\n", - " # add notes\n", - " # ---------\n", - " reac.notes['creation'] = 'via KEGG'\n", - " reac.notes['KEGG.information'] = row['KEGG.notes']\n", - "\n", - " # add metabolites\n", - " # ----------------\n", - " reac.add_metabolites(get_metabolites_kegg(model,reac_kegg['equation'],chem_xref,chem_prop,bigg_metabolites, namespace))\n", - " #@TODO .............\n", - " # direction of reaction\n", - " # ---> current solution:\n", - " # use one direction only\n", - " # ..................\n", - "\n", - " # add annotations\n", - " # ---------------\n", - " reac.annotation['ec-code'] = row['EC number']\n", - " reac.annotation['kegg.reaction'] = row['KEGG.reaction']\n", - " for db, identifiers in reac_kegg['db'].items():\n", - " if len(identifiers) == 1:\n", - " reac.annotation[db] = identifiers[0]\n", - " else:\n", - " reac.annotation[db] = identifiers\n", - "\n", - "\n", - " # --------------------------------------\n", - " # re-set ID to fit namespace if possible\n", - " # --------------------------------------\n", - " match_id_to_namespace(reac, namespace)\n", - "\n", - " # ---------------------\n", - " # add reaction to model\n", - " # ---------------------\n", - " \n", - " # if the ID change results in an ID already in the model, use that reaction\n", - " if reac.id in [_.id for _ in model.reactions]:\n", - " print(f'{reac.id} already in model, not added a second time.')\n", - " else:\n", - " # check if reaction is complete\n", - " # and fullfills the requirements / parameters\n", - " if isreaction_complete(reac, exclude_dna, exclude_rna):\n", - " model.add_reactions([reac])\n", - " else:\n", - " print(F'reaction {reac.name} for gene {row[\"locus_tag\"]} could not be completely reconstructed, not added to model.')\n", - " return model\n", - "\n", - " # --------\n", - " # add gene\n", - " # --------\n", - " # check if gene is already in model\n", - " if row['locus_tag'] in model.genes:\n", - " # if - for whatever reason - gene already in gpr, skip\n", - " if row['locus_tag'] in model.reactions.get_by_id(reac.id).gene_reaction_rule:\n", - " return model\n", - " # create new gpr, if nonexistent\n", - " elif not model.reactions.get_by_id(reac.id).gene_reaction_rule or len(model.reactions.get_by_id(reac.id).gene_reaction_rule) == 0:\n", - " model.reactions.get_by_id(reac.id).gene_reaction_rule = row['locus_tag']\n", - " # add gene to existing gpr\n", - " else:\n", - " model.reactions.get_by_id(reac.id).gene_reaction_rule = model.reactions.get_by_id(reac.id).gene_reaction_rule + ' or ' + row['locus_tag']\n", - " else:\n", - " # add (to) gene reaction rule and curate new gene object\n", - " if not model.reactions.get_by_id(reac.id).gene_reaction_rule or len(model.reactions.get_by_id(reac.id).gene_reaction_rule) == 0:\n", - " model = add_gene(model, reac.id, row, first=True)\n", - " else:\n", - " model = add_gene(model, reac.id, row, first=False)\n", - "\n", - " return model\n", - "\n", - "# notes\n", - "# @CHECK : connections, e.g. input is now a param short \n", - "def extent_model(table, model,chem_prop_file,chem_xref_file,reac_prop_file,reac_xref_file, namespace, exclude_dna=True, exclude_rna=True):\n", - " \"\"\"Add reactions, metabolites and genes to a model based on the output of map_to_bigg().\n", - "\n", - " :param table: The table with the information to be added to the model.\n", - " :type table: pd.DataFrame, output of map_to_bigg\n", - " :param model: The genome-scale metabolic model to be extended\n", - " :type model: cobra.Model\n", - " :param chem_prop_file: Path to the MetaNetX chem_prop file.\n", - " :type chem_prop_file: string\n", - " :param chem_xref_file: Path to the MetaNetX chem_xref file.\n", - " :type chem_xref_file: string\n", - " :param reac_prop_file: Path to the MetaNetX reac_prop file.\n", - " :type reac_prop_file: string\n", - " :param reac_xref_file: Path to the MetaNetX reac_xref file.\n", - " :type reac_xref_file: string\n", - " :param exclude_dna: Tag to include or exclude DNA reactions.\n", - " :type exclude_dna: bool, default is True.\n", - " :param exclude_rna: Tag to include or exclude RNA reactions.\n", - " :type exclude_rna: bool, default is True.\n", - " :returns: The extended model.\n", - " :rytpe: cobra.Model\n", - " \"\"\"\n", - "\n", - " # load MetaNetX database / namespace\n", - " chem_prop = pd.read_csv(chem_prop_file, sep='\\t', comment='#', names=['ID','name','reference','formula','charge','mass','InChI','InChIKey','SMILES'])\n", - " chem_xref = pd.read_csv(chem_xref_file, sep='\\t', comment='#', names=['source','ID','description'])\n", - "\n", - " reac_prop = pd.read_csv(reac_prop_file, sep='\\t', comment='#', names=['ID','mnx equation','reference','classifs','is_balanced','is_transport'])\n", - " reac_xref = pd.read_csv(reac_xref_file, sep='\\t', comment='#', names=['source','ID','description'])\n", - "\n", - " # load bigg metabolite namespace\n", - " bigg_metabolites = load_a_table_from_database('bigg_metabolites', False)\n", - " bigg_metabolites.rename(columns={'id':'bigg_id'}, inplace=True)\n", - " bigg_metabolites = bigg_metabolites[['bigg_id','universal_bigg_id','name','CHEBI','BioCyc','KEGG Compound','MetaNetX (MNX) Chemical','SEED Compound','InChI Key']]\n", - "\n", - " # add genes one by one to model\n", - " print('\\tAdding genes and if needed reactions and metabolites to model:')\n", - " for row_idx in tqdm(table.index):\n", - "\n", - " # generate Name -> KEGG.reaction dictionary\n", - " react_dict = make_reaction_annotation_dict(model,'KEGG')\n", - " # generate Name -> BiGG.reaction dictionary\n", - " react_dict_2 = make_reaction_annotation_dict(model,'BiGG')\n", - "\n", - " # get row in pandas format\n", - " row = table.iloc[row_idx]\n", - "\n", - "\n", - " # case 1: BiGG name already in model = reaction in model\n", - " if not pd.isnull(row['bigg_id']) and any((True for _ in row['bigg_id'].split(' ') if _ in react_dict_2.values())):\n", - " # get matching reaction id(s)\n", - " reac_found = [_ for _ in row['bigg_id'].split(' ') if _ in react_dict_2.values()]\n", - " # add genes to all reactions\n", - " for r in reac_found:\n", - " model = add_gene(model, r, row)\n", - "\n", - " # case 1: KEGG reaction ID in model = reaction probably in model as well\n", - " elif row['KEGG.reaction'] in react_dict.values():\n", - " # get corresponding reaction\n", - " react_found = [_ for _ in react_dict.keys() if row['KEGG.reaction'] == react_dict[_]]\n", - " # add gene to all reactions found\n", - " for r in react_found:\n", - " model = add_gene(model,r,row)\n", - "\n", - " # case 3: reaction not in model\n", - " # -> add reaction(s), gene and metabolites if needed\n", - " else:\n", - " # case 3.1: one reaction\n", - " react = row['KEGG.reaction'].split(' ')\n", - " if len(react) == 1:\n", - " model = add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites, namespace, exclude_dna, exclude_rna)\n", - "\n", - " # case 3.2: multiple reactions\n", - " # add each reaction separatly, with the same gene for th gene reaction rule\n", - " # note: zero reactions not possible due to previous filtering\n", - " else:\n", - "\n", - " for r in react:\n", - " temp_row = row.copy(deep=True)\n", - " temp_row['KEGG.reaction'] = r\n", - " model = add_reaction(model,temp_row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites, namespace, exclude_dna, exclude_rna)\n", - "\n", - " return model\n", - "\n" - ] + "source": [] }, { "cell_type": "markdown", @@ -996,199 +69,6 @@ "### Test Area 51" ] }, - { - "cell_type": "code", - "execution_count": 28, - "metadata": {}, - "outputs": [], - "source": [ - "mg = pd.read_csv(mapped_genes_path)\n", - "mapped_genes_comp = map_BiGG_reactions(mapped_genes_path)" - ] - }, - { - "cell_type": "code", - "execution_count": 74, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\tAdding genes and if needed reactions and metabolites to model:\n" - ] - }, - { - "data": { - "application/vnd.jupyter.widget-view+json": { - "model_id": "e2bb8d34305c46d7b915363193783ee0", - "version_major": 2, - "version_minor": 0 - }, - "text/plain": [ - " 0%| | 0/53 [00:00\n", - " \n", - " Reaction identifierSPECIMENreacJU0MHQ\n", - " \n", - " NameGTP:GTP guanylyltransferase\n", - " \n", - " Memory address\n", - " 0x1fbd61060\n", - " \n", - " Stoichiometry\n", - " \n", - "

2.0 gtp_c --> SPECIMENmeta0B2QH7 + 2.0 ppi_c

\n", - "

2.0 GTP C10H12N5O14P3 --> cyclic di-3',5'-guanylate + 2.0 Diphosphate

\n", - " \n", - " \n", - " GPRAB-1_S128_02424\n", - " \n", - " Lower bound0.0\n", - " \n", - " Upper bound1000.0\n", - " \n", - " \n", - " " - ], - "text/plain": [ - "" - ] - }, - "execution_count": 85, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "new_model.reactions.get_by_id('SPECIMENreacJU0MHQ')" - ] - }, - { - "cell_type": "code", - "execution_count": 87, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'metanetx.chemical': 'MNXM731193',\n", - " 'chebi': 'CHEBI:58805',\n", - " 'inchikey': 'PKFDLKSEZWEFGL-MHARETSRSA-L',\n", - " 'kegg.compound': 'C16463',\n", - " 'metacyc.compound': 'C-DI-GMP',\n", - " 'seed.compound': 'cpd15167'}" - ] - }, - "execution_count": 87, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "new_model.metabolites.get_by_id('SPECIMENmeta0B2QH7')" - ] - }, { "cell_type": "code", "execution_count": null, @@ -1198,89 +78,6 @@ " " ] }, - { - "cell_type": "code", - "execution_count": 160, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "{'sbo': 'SBO:0000185',\n", - " 'metanetx.reaction': 'MNXR94675',\n", - " 'bigg.reaction': '12DGR120tipp',\n", - " 'eco': 'ECO:0007759'}" - ] - }, - "execution_count": 160, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "draft_model = load_model(draft_model_path,'cobra') " - ] - }, - { - "cell_type": "code", - "execution_count": 161, - "metadata": {}, - "outputs": [ - { - "data": { - "text/plain": [ - "'12DGR120tipp'" - ] - }, - "execution_count": 161, - "metadata": {}, - "output_type": "execute_result" - } - ], - "source": [ - "draft_model.reactions[0].id" - ] - }, - { - "cell_type": "code", - "execution_count": 162, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "dud\n", - "12DGR120tipp\n" - ] - } - ], - "source": [ - "draft_model.reactions[0].id = 'dud'\n", - "print(draft_model.reactions[0].id)\n", - "match_id_to_namespace(draft_model.reactions[0], 'BiGG')\n", - "print(draft_model.reactions[0].id)" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [], - "source": [ - "from refinegems.utility.io import load_model" - ] - }, - { - "cell_type": "code", - "execution_count": 4, - "metadata": {}, - "outputs": [], - "source": [ - "test_model_path = '/Users/brune/Documents/11_Test_Data/test_SPECIMEN/thesis/Kp_std/03_refinement/step4-smoothing/Kp_std_smooth.xml'\n", - "test_model = load_model(test_model_path,'cobra')" - ] - }, { "cell_type": "code", "execution_count": null, diff --git a/src/specimen/core/generate_draft_model.py b/src/specimen/core/generate_draft_model.py index 07007c1..ca072b3 100644 --- a/src/specimen/core/generate_draft_model.py +++ b/src/specimen/core/generate_draft_model.py @@ -18,6 +18,7 @@ from refinegems.utility.io import load_model from refinegems.utility.entities import resolve_compartment_names from refinegems.curation.biomass import test_biomass_presence +from refinegems.analysis.investigate import run_memote from ..util.util import MIN_GROWTH_RATE @@ -286,6 +287,8 @@ def gen_draft_model(model, bbh, name, dir, edit, medium='default', namespace='Bi draft.id = name cobra.io.write_sbml_model(draft, F'{dir}{name}_draft.xml') + return draft + def run(template, bpbbh, dir, edit_names='no', pid=80.0, name=None, medium='default', namespace='BiGG', memote=False): """Generate a draft model from a blastp best hits tsv file and a template model. @@ -372,21 +375,18 @@ def run(template, bpbbh, dir, edit_names='no', pid=80.0, name=None, medium='defa print('\n# --------------------\n# generate draft model\n# --------------------') start = time.time() - gen_draft_model(template_model, bbh_data, name, dir, edit_names, medium=medium, namespace=namespace) + draft = gen_draft_model(template_model, bbh_data, name, dir, edit_names, medium=medium, namespace=namespace) end = time.time() print(F'\ttotal time: {end - start}s') # ------------------- # analyse with MEMOTE # ------------------- + if memote: - print('\n# -------------------\n# analyse with MEMOTE\n# -------------------') - start = time.time() - draft_path = F'{dir}{name}_draft.xml'.replace(" ", "\ ") - memote_path = F'{dir}{name}_draft.html'.replace(" ", "\ ") - subprocess.run([F'memote report snapshot --filename {memote_path} {draft_path}'], shell=True) - end = time.time() - print(F'\ttotal time: {end - start}s') + memote_path = F'{dir}step1-extension/{name}.html' + run_memote(draft, 'html', return_res=False, save_res=memote_path, verbose=True) total_time_e = time.time() print(F'total runtime: {total_time_e-total_time_s}') + diff --git a/src/specimen/core/refinement/annotation.py b/src/specimen/core/refinement/annotation.py index b7cd91f..f82b2ed 100644 --- a/src/specimen/core/refinement/annotation.py +++ b/src/specimen/core/refinement/annotation.py @@ -17,13 +17,13 @@ # refinegems from refinegems.utility.io import load_model, kegg_reaction_parser +from refinegems.analysis.investigate import run_memote # from SBOannotator import * from SBOannotator import sbo_annotator # further required programs: # - SBOannotator -# - MEMOTE, tested with version 0.13.0 ################################################################################ # functions @@ -223,3 +223,7 @@ def run(model, dir, kegg_viaEC=False, kegg_viaRC=False, memote=False): subprocess.run([F'memote report snapshot --skip test_consistency --filename {memote_path} {draft_path}'], shell=True) end = time.time() print(F'\ttotal time: {end - start}s') + + if memote: + memote_path = F'{dir}step3-annotation/{name}_SBOannotated.html' + run_memote(model, 'html', return_res=False, save_res=memote_path, verbose=True) diff --git a/src/specimen/core/refinement/cleanup.py b/src/specimen/core/refinement/cleanup.py index e6e6d37..3376f4f 100644 --- a/src/specimen/core/refinement/cleanup.py +++ b/src/specimen/core/refinement/cleanup.py @@ -24,9 +24,7 @@ from refinegems.classes.medium import medium_to_model from refinegems.curation.biomass import test_biomass_presence from refinegems.analysis.growth import read_media_config - -# further required programs: -# - MEMOTE, tested with version 0.13.0 +from refinegems.analysis.investigate import run_memote ################################################################################ # variables @@ -819,12 +817,7 @@ def run(model, dir, biocyc_db=None, check_dupl_reac = False, # --------------------------------- # assess model quality using memote # --------------------------------- - + if memote: - print('\n# -------------------\n# analyse with MEMOTE\n# -------------------') - start = time.time() - draft_path = F'{dir}step2-clean-up/{name}.xml'.replace(" ", "\ ") - memote_path = F'{dir}step2-clean-up/{name}.html'.replace(" ", "\ ") - subprocess.run([F'memote report snapshot --filename {memote_path} {draft_path}'], shell=True) - end = time.time() - print(F'\ttotal time: {end - start}s') + memote_path = F'{dir}step2-clean-up/{name}.html' + run_memote(model, 'html', return_res=False, save_res=memote_path, verbose=True) diff --git a/src/specimen/core/refinement/extension.py b/src/specimen/core/refinement/extension.py index bb6c2d3..30fa8d8 100644 --- a/src/specimen/core/refinement/extension.py +++ b/src/specimen/core/refinement/extension.py @@ -8,27 +8,31 @@ # requirements ################################################################################ -import time -import pandas as pd -from tqdm import tqdm -from tqdm.auto import tqdm -tqdm.pandas() -import cobra from pathlib import Path from Bio import SeqIO -import subprocess + +import pandas as pd import re +import subprocess import sys +import time import urllib.error + +from tqdm import tqdm +from tqdm.auto import tqdm +tqdm.pandas() +import cobra + from Bio.KEGG import REST from Bio.KEGG import Enzyme, Compound from refinegems.utility.io import kegg_reaction_parser, load_a_table_from_database +from refinegems.utility.entities import create_random_id, get_reaction_annotation_dict, match_id_to_namespace +from refinegems.analysis.investigate import run_memote # further required programs: # - DIAMOND, tested with version 0.9.14 (works only for certain sensitivity mode) # tested with version 2.1.8 (works for all sensitivity modes for that version) -# - MEMOTE, tested with version 0.13.0 ################################################################################ # functions @@ -98,6 +102,9 @@ def find_best_diamond_hits(file, pid): # ----------------------------- # map to additional information # ----------------------------- + +# mapping zo NCBI +# --------------- def map_to_NCBI_efetch_row(row): """Map a single entry from the table in get_ncbi_info() to NCBI using EntrezDirect. @@ -188,7 +195,8 @@ def get_ncbi_info(table, ncbi_map_file=None): return table - +# mapping to KEGG +# --------------- # @TODO def map_to_KEGG_row(row, loci_list=None): """Map a single entry of the table in map_to_KEGG() to KEGG. @@ -377,8 +385,10 @@ def map_to_KEGG(gene_table,working_dir,manual_dir,genome_summary=None): return F'{working_dir}genes_mapped.csv' +# mapping to BiGG +# --------------- -def map_Bigg_reactions_row(row, namespace): +def map_BiGG_reactions_row(row, namespace): """Map a single entry from the table in map_BiGG_reactions() to the BiGG reaction namespace. :param row: A single row of the table. @@ -405,12 +415,12 @@ def map_Bigg_reactions_row(row, namespace): # case 2 : exactly one match elif len(matches) == 1: - row['bigg_id'] = matches['bigg_id'].values[0] + row['bigg_id'] = matches['id'].values[0] # case 3 : multiple matches # often due to reaction being in different compartments else: - row['bigg_id'] = ' '.join(matches['bigg_id'].values) + row['bigg_id'] = ' '.join(matches['id'].values) return row @@ -431,36 +441,15 @@ def map_BiGG_reactions(table_file): table = pd.read_csv(table_file) table['bigg_id'] = pd.Series(dtype='str') - table = table.apply(lambda row: map_Bigg_reactions_row(row,r_namespace), axis=1) + table = table.apply(lambda row: map_BiGG_reactions_row(row,r_namespace), axis=1) return table - # ------------ # extent model # ------------ -def make_reaction_dict(model): - """For a given model, create a dictionary of reaction IDs and their - KEGG IDs, if annotated. - - :param model: A genome-scale metabolic model. - :type model: cobra.Model - :returns: The reaction.id - kegg.reaction.id mapping. - :rtype: dict - """ - react_dict = {} - - for r in model.reactions: - if 'kegg.reaction' in r.annotation.keys(): - react_dict[r.id] = r.annotation['kegg.reaction'] - else: - react_dict[r.id] = '-' - - return react_dict - - -#@Todo +# @TODO def isreaction_complete(reac, exclude_dna=True, exclude_rna=True): """Check, if a reaction is complete and ready to be added to the model. Additionally, it is possible to check for DNA and RNA reations @@ -477,6 +466,7 @@ def isreaction_complete(reac, exclude_dna=True, exclude_rna=True): """ # ................ + # @TODO # extendable # ................ @@ -497,42 +487,9 @@ def isreaction_complete(reac, exclude_dna=True, exclude_rna=True): return True -#@TODO -def create_reac_id(name): - """From a given reaction name, create an ID for a model. - - @TODO (if needed): - - - check, if created name already exists - - find a more sensible way to create an ID - - - :param name: The name of the reaction. - :type name: string - :returns: The generated ID - :rtype: string - """ - - # create new name from description string - id_str = [] - take = False - for idx,c in enumerate(name): - if c.isupper() or c.isnumeric() or (idx == 0) or (take and not c in ['[',']','(',')','-']): - id_str.append(c.upper()) - take = False - elif c in [' ','-','(']: - take = True - else: - take = False - - id_str = ''.join(id_str) - - # @TODO.......................................... - # check if name is already in model? - # ............................................... - - return id_str +# build genes +# ----------- def add_gene(model, reaction, row, first=False): """Add a new gene to a genome-scale metabolic cobra model. @@ -567,16 +524,18 @@ def add_gene(model, reaction, row, first=False): return model -#@TODO -def build_metabolite_mnx(metabolite, model, model_metabolites, mnx_chem_prop, mnx_chem_xref, bigg_metabolites): +# build metabolites +# ----------------- +# @TODO +# @DOCS wrong +# UNDER CONSTRUCTION +def build_metabolite_mnx(metabolite, model, mnx_chem_prop, mnx_chem_xref, bigg_metabolites, namespace): """Create or retrieve (from model) a metabolite based on its MetaNetX ID. :param metabolite: The MetaNetX ID of the metabolite. :type metabolite: string :param model: The underlying genome-scale metabolic model. :type model: cobra.model - :param model_metabolites: List of the formulas of all metabolites in the model. - :type model_metabolites: list :param mnx_chem_xref: The chem_xref table from MetaNetX :type mnx_chem_xref: pd.DataFrame :param mnx_chem_prop: The chem_prop table from MetaNetX @@ -591,24 +550,18 @@ def build_metabolite_mnx(metabolite, model, model_metabolites, mnx_chem_prop, mn metabolite_anno = mnx_chem_xref[mnx_chem_xref['ID']==metabolite] model_mnx = [x.annotation['metanetx.chemical'] for x in model.metabolites if 'metanetx.chemical' in x.annotation] - # check if compound already in model - # ---------------------------------- + # fast check if compound already in model + # ------------------------------------------ # @TODO .......................................... # currently no checking for compartments - # first match will be taken (_c in most cases) + # first match will be taken (most often cytosol one) # regardless of the compartment #............................................. + # step 1: check if MetaNetX ID in model if metabolite in model_mnx: - # step 1: transform metanetx.id --> model identifier matches = [x.id for x in model.metabolites if 'metanetx.chemical' in x.annotation and x.annotation['metanetx.chemical']==metabolite] - elif not pd.isnull(metabolite_prop['formula'].iloc[0]) and metabolite_prop['formula'].iloc[0] in model_metabolites: - # step 1: transform metabolite_prop['formula'].iloc[0] --> model identifier - matches = [x.id for x in model.metabolites if x.formula == metabolite_prop['formula'].iloc[0]] - else: - matches = None - if matches: - # step 2: model id --> metabolite object + # step 2: if yes, retrieve metabolite from model # case 1: multiple matches found if len(matches) > 1: # ................ @@ -625,84 +578,82 @@ def build_metabolite_mnx(metabolite, model, model_metabolites, mnx_chem_prop, mn # if not, create new metabolite # ----------------------------- else: - # create metabolite with id - if metabolite_anno['source'].str.contains('bigg.reaction').any(): - # use BiGG id, if one exists - # ........ - # the usual compartment problem - #........ - new_metabolite = cobra.Metabolite(metabolite_anno[metabolite_anno['source'].str.contains('bigg.reaction')].iloc[0]) - new_metabolite.annotation['bigg.metabolite'] = metabolite_anno[metabolite_anno['source'].str.contains('bigg.reaction')].iloc[0] - elif metabolite in bigg_metabolites['MetaNetX (MNX) Chemical']: - # ........ - # the usual compartment problem - #........ - bigg_id = bigg_metabolites[bigg_metabolites['MetaNetX (MNX) Chemical']==metabolite].iloc[0] - new_metabolite = cobra.Metabolite(bigg_id) - new_metabolite.annotation['bigg.metabolite'] = bigg_id - elif not pd.isnull(metabolite_prop['formula'].iloc[0]): - new_metabolite = cobra.Metabolite(metabolite_prop['formula'].iloc[0]) - else: - #@TODO ............. - # formula = NaN - # ---> current solution: - # use name (replace whitespaces) - # .................. - if ' ' in metabolite_prop['name'].iloc[0]: - new_metabolite = cobra.Metabolite(metabolite_prop['name'].iloc[0].replace(' ','_')) - else: - new_metabolite = cobra.Metabolite(metabolite_prop['name'].iloc[0]) - # add features - # ------------ + # step 1: create a random metabolite ID + # ........................... + # @TODO : compartment problem + # - does it have to be in the name? + # ........................... + new_metabolite = cobra.Metabolite(create_random_id(model, 'meta','SPECIMEN')) + + + # step 2: add features + # -------------------- # @TODO .......................................... # currently no checking for compartments - #............................................. + # defaults to c + # makes it difficult to add exchange reactions + #................................................. new_metabolite.formula = metabolite_prop['formula'].iloc[0] new_metabolite.name = metabolite_prop['name'].iloc[0] new_metabolite.charge = metabolite_prop['charge'].iloc[0] new_metabolite.compartment = 'c' - # add notes - # --------- + # step 3: add notes + # ----------------- new_metabolite.notes['added via'] = 'metanetx.chemical' - # add annotations - # --------------- + # step 4: add annotations + # ----------------------- new_metabolite.annotation['metanetx.chemical'] = metabolite_prop['ID'].iloc[0] new_metabolite.annotation['chebi'] = metabolite_prop['reference'].iloc[0].upper() if not pd.isnull(metabolite_prop['InChIKey'].iloc[0]): new_metabolite.annotation['inchikey'] = metabolite_prop['InChIKey'].iloc[0].split('=')[1] - for db in ['kegg.compound','metacyc.compound','seed.compound']: + for db in ['kegg.compound','metacyc.compound','seed.compound','bigg.metabolite']: db_matches = metabolite_anno[metabolite_anno['source'].str.contains(db)] if len(db_matches) == 1: new_metabolite.annotation[db] = db_matches['source'].iloc[0].split(':',1)[1] elif len(db_matches) > 1: new_metabolite.annotation[db] = [m.split(':',1)[1] for m in db_matches['source'].tolist()] - # add additional information from bigg if possible - if new_metabolite.id in bigg_metabolites['bigg_id']: - bigg_information = bigg_metabolites[bigg_metabolites['bigg_id']==new_metabolite.id] + + # if no BiGG was found in MetaNetX, try reverse search in BiGG + if metabolite in bigg_metabolites['MetaNetX (MNX) Chemical']: + new_metabolite.annotation['bigg.metabolite'] = bigg_metabolites[bigg_metabolites['MetaNetX (MNX) Chemical']==metabolite].iloc[0] + + # add additional information from bigg if possible + if 'bigg.metabolite' in new_metabolite.annotation.keys(): + bigg_information = bigg_metabolites[bigg_metabolites['bigg_id'].str.contains('|'.join(new_metabolite.annotation['bigg.metabolite']))] db_id_bigg = {'BioCyc':'biocyc', 'MetaNetX (MNX) Chemical':'metanetx.chemical','SEED Compound':'seed.compound','InChI Key':'inchikey'} for db in db_id_bigg: - if not pd.isnull(bigg_information[db]): - info = bigg_information[db] - if ',' in info: - info = [x.strip() for x in info.split(',')] + info = bigg_information[db].dropna().to_list() + if len(info) > 0: + info = ','.join(info) + info = [x.strip() for x in info.split(',')] # make sure all entries are a separate list object new_metabolite.annotation[db_id_bigg[db]] = info + # step 5: change ID according to namespace + # ---------------------------------------- + match_id_to_namespace(new_metabolite,namespace) + + # step 6: re-check existence of ID in model + # ----------------------------------------- + # @TODO : check complete annotations? + # - or let those be covered by the duplicate check later on? + if new_metabolite.id in [_.id for _ in model.metabolites]: + return model.metabolites.get_by_id(new_metabolite.id) + return new_metabolite - -#@TODO -def build_metabolite_kegg(kegg_id, model, model_metabolites, model_kegg_ids, bigg_metabolites): +# @TODO +# @DOCS +# UNDER CONSTRUCTION +def build_metabolite_kegg(kegg_id, model, model_kegg_ids, bigg_metabolites, namespace): """Create or retrieve (from model) a metabolite based on its KEGG ID. :param kegg_id: The KEGG.compound ID of the metabolite in question. :type kegg_id: string :param model: The model. :type model: cobra.Model - :param model_metabolites: List of the formulas of all metabolites in the model. - :type model_metabolites: list :param model_kegg_ids: List of all annotated KEGG Compound IDs in the model. :type model_kegg_ids: list :param bigg_metabolites: The BiGG compound namespace table. @@ -726,26 +677,21 @@ def build_metabolite_kegg(kegg_id, model, model_metabolites, model_kegg_ids, big print(F'URLError: {kegg_id}') return cobra.Metabolite() - # check, if compound in model - # via KEGG ID + # --------------------------------------- + # fast check if compound already in model + # --------------------------------------- + # @TODO .......................................... + # currently no checking for compartments + # first match will be taken (most often cytosol one) + # regardless of the compartment + #............................................. + # step 1: check via KEGG ID if kegg_id in model_kegg_ids: - # step 1: transform kegg_id --> model identifier matches = [x.id for x in model.metabolites if ('kegg.compound' in x.annotation and x.annotation['kegg.compound'] == kegg_id)] - # via formula - elif kegg_record.formula != '' and kegg_record.formula in model_metabolites: - # step 1: transform formula --> model identifier - matches = [x.id for x in model.metabolites if x.formula == kegg_record.formula] - else: - matches = None - if matches: # step 2: model id --> metabolite object # case 1: multiple matches found if len(matches) > 1: - # ................ - # @TODO - # compartment problem again - # ................ match = model.metabolites.get_by_id(matches[0]) # case 2: only one match found else: @@ -754,72 +700,46 @@ def build_metabolite_kegg(kegg_id, model, model_metabolites, model_kegg_ids, big # step 3: add metabolite return match - # create new metabolite - # --------------------- + # ----------------------------- + # if not, create new metabolite + # ----------------------------- # ............... - # note: - # even if annotation is in model, as long as no formula match can be found - # a new compound will be created for the model - # @TODO - # check if new metabolite ID already in model - # -> current ID is just the name of the compound + # compartment # ............... else: - # set name and id - # --------------- - # first try BIGG for ID - found_check = False - if kegg_id in bigg_metabolites['KEGG Compound']: - found = bigg_metabolites[bigg_metabolites['KEGG Compound']==kegg_id]['bigg_id'] - found_check = True - if len(found) > 1: - # ................ - # @TODO - # compartment problem again - # ................ - new_metabolite = cobra.Metabolite(found[0]) - else: - new_metabolite = cobra.Metabolite(found[0]) - + # step 1: create a random metabolite ID + # ------------------------------------- + # ........................... + # @TODO : compartment problem + # - does it have to be in the name? + # ........................... + new_metabolite = cobra.Metabolite(create_random_id(model, 'meta','SPECIMEN')) + + # step 2: add features + # -------------------- + # @TODO .......................................... + # currently no checking for compartments + #............................................. # set name from KEGG and additionally use it as ID if there is none yet if isinstance(kegg_record.name, list): - if not found_check: - if ' ' in kegg_record.name[0]: - meta_id = kegg_record.name[0].replace(' ','_') - else: - meta_id = kegg_record.name[0] - new_metabolite = cobra.Metabolite(meta_id) if len(kegg_record.name) > 1: new_metabolite.name = kegg_record.name[1] else: new_metabolite.name = kegg_record.name[0] else: - if not found_check: - if ' ' in kegg_record.name: - meta_id = kegg_record.name.replace(' ','_') - else: - meta_id = kegg_record.name - new_metabolite = cobra.Metabolite(meta_id) new_metabolite.name = kegg_record.name - - # set formula - # ----------- - new_metabolite.formula = kegg_record.formula - # set compartment - # --------------- - # @TODO .......................................... - # currently no checking for compartments - #............................................. new_metabolite.compartment = 'c' + # set formula + new_metabolite.formula = kegg_record.formula - # add note - # -------- + # step 3: add notes + # ----------------- new_metabolite.notes['added via'] = 'KEGG.compound' - # add annotations - # --------------- + # step 4: add annotations + # ----------------------- new_metabolite.annotation['kegg.compound'] = kegg_id db_idtf = {'CAS':'cas','PubChem':'pubchem.compound','ChEBI':'chebi'} for db,ids in kegg_record.dblinks: @@ -830,21 +750,38 @@ def build_metabolite_kegg(kegg_id, model, model_metabolites, model_kegg_ids, big new_metabolite.annotation[db_idtf[db]] = ids[0] # add additional information from BiGG - if found_check: - bigg_information = bigg_metabolites[bigg_metabolites['bigg_id']==new_metabolite.id] - db_id_bigg = {'BioCyc':'biocyc', 'MetaNetX (MNX) Chemical':'metanetx.chemical','SEED Compound':'seed.compound','InChI Key':'inchikey'} - for db in db_id_bigg: - if not pd.isnull(bigg_information[db]): - info = bigg_information[db] - if ',' in info: - info = [x.strip() for x in info.split(',')] - new_metabolite.annotation[db_id_bigg[db]] = info + if kegg_id in bigg_metabolites['KEGG Compound']: - return new_metabolite + bigg_information = bigg_metabolites[bigg_metabolites['KEGG Compound']==kegg_id] + if len(bigg_information) > 0: + + new_metabolite.annotation['bigg.metabolite'] = bigg_information['bigg_id'].to_list() + + db_id_bigg = {'BioCyc':'biocyc', 'MetaNetX (MNX) Chemical':'metanetx.chemical','SEED Compound':'seed.compound','InChI Key':'inchikey'} + for db in db_id_bigg: + info = bigg_information[db].dropna().to_list() + if len(info) > 0: + info = ','.join(info) + info = [x.strip() for x in info.split(',')] # make sure all entries are a separate list object + new_metabolite.annotation[db_id_bigg[db]] = info + + # step 5: change ID according to namespace + # ---------------------------------------- + match_id_to_namespace(new_metabolite,namespace) + + # step 6: re-check existence of ID in model + # ----------------------------------------- + # @TODO : check complete annotations? + # - or let those be covered by the duplicate check later on? + if new_metabolite.id in [_.id for _ in model.metabolites]: + return model.metabolites.get_by_id(new_metabolite.id) + return new_metabolite -#@TODO -def get_metabolites_mnx(model,equation,mnx_chem_xref,mnx_chem_prop,bigg_metabolites): +# @TODO +# @DOCS +# UNDER CONSTRUCTION +def get_metabolites_mnx(model,equation,mnx_chem_xref,mnx_chem_prop,bigg_metabolites, namespace): """Based on a given MetaNetX equation and a model, get or create metabolite entires in/for the model. @@ -870,7 +807,6 @@ def get_metabolites_mnx(model,equation,mnx_chem_xref,mnx_chem_prop,bigg_metaboli metabolites = {} produced = -1.0 factor = 0 - mnx_id = '' for s in equation.split(' '): # switch from reactants to products @@ -887,9 +823,16 @@ def get_metabolites_mnx(model,equation,mnx_chem_xref,mnx_chem_prop,bigg_metaboli # get information from MetaNetX metabolite, compartment = s.split('@') # build or identify metabolite - new_metabolite = build_metabolite_mnx(metabolite, model, model_metabolites, mnx_chem_prop, mnx_chem_xref,bigg_metabolites) + new_metabolite = build_metabolite_mnx(metabolite, model, mnx_chem_prop, mnx_chem_xref,bigg_metabolites, namespace) # add metabolite if new_metabolite.id in [_.id for _ in metabolites]: + # ...................................................... + # @TODO: + # check if metabolite if both reactant and product + # suggests exchange reaction + # -> maybe a good place to change compartment for one? + # -> what about name and directions??? + # ...................................................... try: test = model.metabolites.get_by_id(new_metabolite.id) new_metabolite = new_metabolite.copy() @@ -902,8 +845,10 @@ def get_metabolites_mnx(model,equation,mnx_chem_xref,mnx_chem_prop,bigg_metaboli return metabolites -#@TODO -def get_metabolites_kegg(model,equation,chem_xref,chem_prop,bigg_metabolites): +# @TODO +# @DOCS +# UNDER CONSTRUCTION +def get_metabolites_kegg(model,equation,chem_xref,chem_prop,bigg_metabolites, namespace): """Based on a given KEGG equation and a model, get or create metabolite entires in/for the model. @@ -953,7 +898,7 @@ def get_metabolites_kegg(model,equation,chem_xref,chem_prop,bigg_metabolites): # currently note in brackets gets ignored # .................................. elif not s.isalnum(): - print('Problem: unknown character in ID inside get_metabolites_kegg() detected.\nPlease adjust code accordingly.') + print('Problem: unknown character in ID inside get_metabolites_kegg() detected.\nPlease contact dev about your problem.') sys.exit(1) mnx_id = chem_xref[chem_xref['source'] == F'kegg.compound:{s}'] @@ -962,13 +907,14 @@ def get_metabolites_kegg(model,equation,chem_xref,chem_prop,bigg_metabolites): # -> make sure, only 1 ID match is found (match is unambiguous) if len(mnx_id) == 1: mnx_id = mnx_id['ID'].item() - metabolite = build_metabolite_mnx(mnx_id, model, model_metabolites, chem_prop, chem_xref, bigg_metabolites) + metabolite = build_metabolite_mnx(mnx_id, model, chem_prop, chem_xref, bigg_metabolites, namespace) # case 2: # add metabolite via KEGG else: - metabolite = build_metabolite_kegg(s, model, model_metabolites, model_kegg_ids, bigg_metabolites) + metabolite = build_metabolite_kegg(s, model, model_kegg_ids, bigg_metabolites, namespace) # add new metabolite + # @TODO : place to check for exchanges? if metabolite.id in [_.id for _ in metabolites]: try: test = model.metabolites.get_by_id(metabolite.id) @@ -982,65 +928,35 @@ def get_metabolites_kegg(model,equation,chem_xref,chem_prop,bigg_metabolites): return metabolites +# build reaction +# -------------- #@TODO -def add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites,exclude_dna=True, exclude_rna=True): - """Add a reaction to a given genome-scale metabolic model. - If possible, the reaction is added via MetaNetX way. If the reaction cannot - be found within the MetaNetX namespace, it will be added via KEGG. +# UNDER CONSTRUCTION +def add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites, namespace:str='BiGG', exclude_dna=True, exclude_rna=True): - :param model: A GEM. - :type model: cobra.Model - :param row: A single row of the output table of map_BiGG_reactions(). - :type row: pd.Series - :param reac_xref: The chem_xref table from MetaNetX - :type reac_xref: pd.DataFrame - :param reac_prop: The chem_prop table from MetaNetX - :type reac_prop: pd.DataFrame - :param chem_xref: The chem_xref table from MetaNetX - :type chem_xref: pd.DataFrame - :param chem_prop: The chem_prop table from MetaNetX - :type chem_prop: pd.DataFrame - :param bigg_metabolites: The BiGG compound namespace table. - :type bigg_metabolites: pd.DataFrame - :param exclude_dna: Tag to include or exclude DNA reactions. - :type exclude_dna: bool, default is True. - :param exclude_rna: Tag to include or exclude RNA reactions. - :type exclude_rna: bool, default is True. - :returns: The updated model. - :rtype: cobra.Model - """ + # create reaction object + reac = cobra.Reaction(create_random_id(model,'reac','SPECIMEN')) # ---------------------------- # curate reaction via MetaNetX # ---------------------------- # try kegg.reaction --> metanetx.reaction if F'kegg.reaction:{row["KEGG.reaction"]}' in list(reac_xref['source']): - + # get MetaNetX ID met_reac_kegg = reac_xref[reac_xref['source']==F'kegg.reaction:{row["KEGG.reaction"]}'] met_reac = reac_prop[reac_prop['ID']==met_reac_kegg['ID'].iloc[0]] # make sure exactly one entry is parsed + # @TODO : parallel parsing if len(met_reac) > 1: print(F'Warning: multiple matches for kegg.reaction {row["KEGG.reaction"]} found. Only first one will be used.') met_reac = met_reac.head(1) - # add id and name - # --------------- - # id: from BiGG or create anew - # name: from MetaNetX KEGG description - if not pd.isnull(row['bigg_id']): - #@TODO ............. - # multiple value for BiGG reaction - # ---> current solution: - # use first one only - # .................. - reac = cobra.Reaction(min(row['bigg_id'].split(' '),key=len)) - reac.name = met_reac_kegg['description'].iloc[0].split('|')[0] - - else: - reac = cobra.Reaction(create_reac_id(met_reac_kegg['description'].iloc[0].split('|')[0])) - reac.name = met_reac_kegg['description'].iloc[0].split('|')[0] + # add name + # -------- + # from MetaNetX KEGG description + reac.name = met_reac_kegg['description'].iloc[0].split('|')[0] # add notes # --------- @@ -1049,7 +965,7 @@ def add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metaboli # add metabolites # ---------------- - reac.add_metabolites(get_metabolites_mnx(model,met_reac['mnx equation'].iloc[0],chem_xref,chem_prop,bigg_metabolites)) + reac.add_metabolites(get_metabolites_mnx(model,met_reac['mnx equation'].iloc[0],chem_xref,chem_prop,bigg_metabolites, namespace)) #@TODO ............. # direction of reaction # ---> current solution: @@ -1070,33 +986,20 @@ def add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metaboli reac.annotation[db] = [r.split(':',1)[1] for r in db_matches['source'].tolist()] else: continue - - + # if not possible, use information from KEGG only # ------------------------ # curate reaction via KEGG # ------------------------ else: + # retrieve reaction information from KEGG reac_kegg = kegg_reaction_parser(row['KEGG.reaction']) - # add id and name - # --------------- - # id: from BiGG or create anew - # name: from KEGG name - if not pd.isnull(row['bigg_id']): - #@TODO ............. - # multiple value for BiGG reaction - # ---> current solution: - # use first one only - # .................. - reac = cobra.Reaction(min(row['bigg_id'].split(' '),key=len)) - reac.name = reac_kegg['name'] - reac.annotation['bigg.reaction'] = min(row['bigg_id'].split(' '),key=len) - - else: - reac = cobra.Reaction(create_reac_id(reac_kegg['name'])) - reac.name = reac_kegg['name'] + # add name + # -------- + # from KEGG name + reac.name = reac_kegg['name'] # add notes # --------- @@ -1105,7 +1008,7 @@ def add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metaboli # add metabolites # ---------------- - reac.add_metabolites(get_metabolites_kegg(model,reac_kegg['equation'],chem_xref,chem_prop,bigg_metabolites)) + reac.add_metabolites(get_metabolites_kegg(model,reac_kegg['equation'],chem_xref,chem_prop,bigg_metabolites, namespace)) #@TODO ............. # direction of reaction # ---> current solution: @@ -1123,35 +1026,56 @@ def add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metaboli reac.annotation[db] = identifiers + # -------------------------------------- + # re-set ID to fit namespace if possible + # -------------------------------------- + match_id_to_namespace(reac, namespace) + # --------------------- # add reaction to model # --------------------- - # check if reaction is complete - # and fullfills the requirements / parameters - if isreaction_complete(reac, exclude_dna, exclude_rna): - model.add_reactions([reac]) + + # if the ID change results in an ID already in the model, use that reaction + if reac.id in [_.id for _ in model.reactions]: + print(f'{reac.id} already in model, not added a second time.') else: - print(F'reaction {reac.name} for gene {row["locus_tag"]} could not be completely reconstructed, not added to model.') - return model + # check if reaction is complete + # and fullfills the requirements / parameters + if isreaction_complete(reac, exclude_dna, exclude_rna): + model.add_reactions([reac]) + else: + print(F'reaction {reac.name} for gene {row["locus_tag"]} could not be completely reconstructed, not added to model.') + return model # -------- # add gene # -------- # check if gene is already in model if row['locus_tag'] in model.genes: - # only need to add the gene reaction rule - model.reactions.get_by_id(reac.id).gene_reaction_rule = row['locus_tag'] + # if - for whatever reason - gene already in gpr, skip + if row['locus_tag'] in model.reactions.get_by_id(reac.id).gene_reaction_rule: + return model + # create new gpr, if nonexistent + elif not model.reactions.get_by_id(reac.id).gene_reaction_rule or len(model.reactions.get_by_id(reac.id).gene_reaction_rule) == 0: + model.reactions.get_by_id(reac.id).gene_reaction_rule = row['locus_tag'] + # add gene to existing gpr + else: + model.reactions.get_by_id(reac.id).gene_reaction_rule = model.reactions.get_by_id(reac.id).gene_reaction_rule + ' or ' + row['locus_tag'] else: - # add gene reaction rule and curate new gene object - model = add_gene(model, reac.id, row, first=True) + # add (to) gene reaction rule and curate new gene object + if not model.reactions.get_by_id(reac.id).gene_reaction_rule or len(model.reactions.get_by_id(reac.id).gene_reaction_rule) == 0: + model = add_gene(model, reac.id, row, first=True) + else: + model = add_gene(model, reac.id, row, first=False) return model +# extend model +# ------------ # notes -# @TEST : fitted to refinegems # @CHECK : connections, e.g. input is now a param short -def extent_model(table, model,chem_prop_file,chem_xref_file,reac_prop_file,reac_xref_file, exclude_dna=True, exclude_rna=True): +def extent_model(table, model,chem_prop_file,chem_xref_file,reac_prop_file,reac_xref_file, namespace, exclude_dna=True, exclude_rna=True): """Add reactions, metabolites and genes to a model based on the output of map_to_bigg(). :param table: The table with the information to be added to the model. @@ -1190,25 +1114,30 @@ def extent_model(table, model,chem_prop_file,chem_xref_file,reac_prop_file,reac_ print('\tAdding genes and if needed reactions and metabolites to model:') for row_idx in tqdm(table.index): - # generate BiGG name -> KEGG.reaction dictionary - react_dict = make_reaction_dict(model) + # generate Name -> KEGG.reaction dictionary + react_dict = get_reaction_annotation_dict(model,'KEGG') + # generate Name -> BiGG.reaction dictionary + react_dict_2 = get_reaction_annotation_dict(model,'BiGG') # get row in pandas format row = table.iloc[row_idx] - # case 1: BiGG name already in model = reaction in model - if not pd.isnull(row['bigg_id']) and len([x for x in react_dict.keys() if (x in row['bigg_id'].split(' '))])>0: - # get matching reaction id - react_found = [x for x in react_dict.keys() if (x in row['bigg_id'].split(' '))][0] - # add gene only - model = add_gene(model, react_found, row) - # case 2: KEGG reaction ID in model = reaction probably in model as well + # case 1: BiGG name already in model = reaction in model + if not pd.isnull(row['bigg_id']) and any((True for _ in row['bigg_id'].split(' ') if _ in react_dict_2.values())): + # get matching reaction id(s) + reac_found = [_ for _ in row['bigg_id'].split(' ') if _ in react_dict_2.values()] + # add genes to all reactions + for r in reac_found: + model = add_gene(model, r, row) + + # case 1: KEGG reaction ID in model = reaction probably in model as well elif row['KEGG.reaction'] in react_dict.values(): # get corresponding reaction - react_found = list(react_dict.keys())[list(react_dict.values()).index(row['KEGG.reaction'])] - # add gene only - model = add_gene(model, react_found, row) + react_found = [_ for _ in react_dict.keys() if row['KEGG.reaction'] == react_dict[_]] + # add gene to all reactions found + for r in react_found: + model = add_gene(model,r,row) # case 3: reaction not in model # -> add reaction(s), gene and metabolites if needed @@ -1216,7 +1145,7 @@ def extent_model(table, model,chem_prop_file,chem_xref_file,reac_prop_file,reac_ # case 3.1: one reaction react = row['KEGG.reaction'].split(' ') if len(react) == 1: - model = add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites, exclude_dna, exclude_rna) + model = add_reaction(model,row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites, namespace, exclude_dna, exclude_rna) # case 3.2: multiple reactions # add each reaction separatly, with the same gene for th gene reaction rule @@ -1226,10 +1155,12 @@ def extent_model(table, model,chem_prop_file,chem_xref_file,reac_prop_file,reac_ for r in react: temp_row = row.copy(deep=True) temp_row['KEGG.reaction'] = r - model = add_reaction(model,temp_row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites, exclude_dna, exclude_rna) + model = add_reaction(model,temp_row,reac_xref,reac_prop,chem_xref,chem_prop,bigg_metabolites, namespace, exclude_dna, exclude_rna) return model +# main +# ---- # @TEST : fitted to refinegems # @CHECK : connections, e.g. input is now two params short @@ -1422,10 +1353,5 @@ def run(draft, gene_list, fasta, db, dir, mnx_chem_prop, mnx_chem_xref, mnx_reac # --------------------------------- if memote: - print('\n# -------------------\n# analyse with MEMOTE\n# -------------------') - start = time.time() - draft_path = F'{dir}step1-extension/{name}.xml'.replace(" ", "\ ") - memote_path = F'{dir}step1-extension/{name}.html'.replace(" ", "\ ") - subprocess.run([F'memote report snapshot --filename {memote_path} {draft_path}'], shell=True) - end = time.time() - print(F'\ttotal time: {end - start}s') + memote_path = F'{dir}step1-extension/{name}.html' + run_memote(draft, 'html', return_res=False, save_res=memote_path, verbose=True) diff --git a/src/specimen/core/refinement/smoothing.py b/src/specimen/core/refinement/smoothing.py index 922ba6a..b27de86 100644 --- a/src/specimen/core/refinement/smoothing.py +++ b/src/specimen/core/refinement/smoothing.py @@ -23,9 +23,9 @@ from cobra import Reaction from refinegems.curation.biomass import test_biomass_presence +from refinegems.analysis.investigate import run_memote # further required programs: -# - MEMOTE, tested with version 0.13.0 # - BOFdat # - MassChargeCuration @@ -422,10 +422,6 @@ def run(genome,model,dir,mcc='skip',dna_weight_frac=0.023,ion_weight_frac=0.05, # --------------------------------- if memote: - print('\n# -------------------\n# analyse with MEMOTE\n# -------------------') - start = time.time() - draft_path = outname.replace(" ", "\ ") - memote_path = F'{dir}step4-smoothing/{model_name}.html'.replace(" ", "\ ") - subprocess.run([F'memote report snapshot --filename {memote_path} {draft_path}'], shell=True) - end = time.time() - print(F'\ttotal time: {end - start}s') + memote_path = F'{dir}step4-smoothing/{model_name}.html' + run_memote(model, 'html', return_res=False, save_res=memote_path, verbose=True) +