From 510123b8ee98c2e65b5c4fbe776d16502f370241 Mon Sep 17 00:00:00 2001 From: Cloud User Date: Wed, 24 Apr 2024 05:44:26 -0400 Subject: [PATCH 1/3] fix: fixed enzyme optmization with Kcat fitness function Signed-off-by: Cloud User --- examples/enzeptional/example_enzeptional.py | 40 ++- src/gt4sd/frameworks/enzeptional/core.py | 336 +++++++++++--------- 2 files changed, 211 insertions(+), 165 deletions(-) diff --git a/examples/enzeptional/example_enzeptional.py b/examples/enzeptional/example_enzeptional.py index 0033013cf..6632aed56 100644 --- a/examples/enzeptional/example_enzeptional.py +++ b/examples/enzeptional/example_enzeptional.py @@ -5,12 +5,16 @@ from gt4sd.configuration import GT4SDConfiguration, sync_algorithm_with_s3 -def initialize_environment(): +def initialize_environment(model = "feasibility"): """Synchronize with GT4SD S3 storage and set up the environment.""" # NOTE: For those interested in optimizing kcat values, it is important to adjust the scorer path to reflect this focus, thereby selecting the appropriate model for kcat optimization: f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/kcat/model.pkl". The specification of the scaler, located within the same directory as the `scorer.pkl`, is mandatory for accurate model performance. 
configuration = GT4SDConfiguration.get_instance() sync_algorithm_with_s3("proteins/enzeptional/scorers", module="properties") - return f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/feasibility/model.pkl" + name = model.lower() + if name == "kcat": + return f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/{name}/model.pkl", f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/{name}/scaler.pkl" + else: + return f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/{name}/model.pkl", None def load_experiment_parameters(): @@ -20,7 +24,7 @@ def load_experiment_parameters(): def setup_optimizer( - substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path + substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, fitness_kcat ): """Set up and return the optimizer with all necessary components configured.""" model_tokenizer_paths = "facebook/esm2_t33_650M_UR50D" @@ -52,7 +56,9 @@ def setup_optimizer( "selection_ratio": 0.25, "perform_crossover": True, "crossover_type": "single_point", - "concat_order": ["substrate", "sequence", "product"], + "concat_order": concat_order, + "scaler_filepath": scaler_path, + "fitness_kcat": fitness_kcat } return EnzymeOptimizer(**optimizer_config) @@ -64,9 +70,10 @@ def optimize_sequences(optimizer): ) -def main(): +def main_kcat(): logging.basicConfig(level=logging.INFO) - scorer_path = initialize_environment() + scorer_path, scaler_path = initialize_environment(model="kcat") + concat_order, fitness_kcat = ["substrate", "sequence"], True ( substrate_smiles, product_smiles, @@ -74,11 +81,28 @@ def main(): intervals, ) = load_experiment_parameters() optimizer = setup_optimizer( - substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path + substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, 
fitness_kcat ) optimized_sequences, iteration_info = optimize_sequences(optimizer) logging.info("Optimization completed.") +def main_feasibility(): + logging.basicConfig(level=logging.INFO) + scorer_path, scaler_path = initialize_environment() + concat_order, fitness_kcat = ["substrate", "sequence", "product"], False + ( + substrate_smiles, + product_smiles, + sample_sequence, + intervals, + ) = load_experiment_parameters() + optimizer = setup_optimizer( + substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, fitness_kcat + ) + optimized_sequences, iteration_info = optimize_sequences(optimizer) + logging.info("Optimization completed.") + if __name__ == "__main__": - main() + main_feasibility() + main_kcat() diff --git a/src/gt4sd/frameworks/enzeptional/core.py b/src/gt4sd/frameworks/enzeptional/core.py index e37423194..d36ab8f85 100644 --- a/src/gt4sd/frameworks/enzeptional/core.py +++ b/src/gt4sd/frameworks/enzeptional/core.py @@ -1,7 +1,7 @@ # # MIT License # -# Copyright (c) 2024 GT4SD team +# Copyright (c) 2023 GT4SD team # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal @@ -30,6 +30,7 @@ from itertools import product as iter_product import time from joblib import load +import xgboost as xgb from .processing import ( HFandTAPEModelUtility, SelectionGenerator, @@ -38,9 +39,8 @@ sanitize_intervals_with_padding, ) - -logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +logger.addHandler(logging.NullHandler()) class MutationModelManager: @@ -89,7 +89,8 @@ class MutationStrategy(ABC): def mutate( self, sequence: str, num_mutations: int, intervals: List[List[int]] ) -> List[str]: - """Abstract method for mutating a sequence. + """ + Abstract method for mutating a sequence. Args: sequence (str): The original sequence to be mutated. 
@@ -107,7 +108,8 @@ class LanguageModelMutationStrategy(MutationStrategy): """ def __init__(self, mutation_model): - """Initializes the mutation strategy with a given model. + """ + Initializes the mutation strategy with a given model. Args: mutation_model: The model to be used for mutation. @@ -116,7 +118,8 @@ def __init__(self, mutation_model): self.top_k = 2 def set_top_k(self, top_k: int): - """Sets the top k mutations to consider during mutation. + """ + Sets the top k mutations to consider during mutation. Args: top_k (int): The number of top mutations to consider. @@ -126,7 +129,8 @@ def set_top_k(self, top_k: int): def mutate( self, sequence: str, num_mutations: int, intervals: List[List[int]] ) -> List[str]: - """Mutates a sequence within specified intervals using the model. + """ + Mutates a sequence within specified intervals using the model. Args: sequence (str): The original sequence to be mutated. @@ -139,14 +143,9 @@ def mutate( """ flat_intervals = [ - i - for interval in intervals - for i in range(interval[0], interval[1] + 1) - if i < len(sequence) + i for interval in intervals for i in range(interval[0], interval[1]) ] - num_mutations = random.randint(1, num_mutations) - chosen_positions = random.sample( flat_intervals, min(num_mutations, len(flat_intervals)) ) @@ -166,20 +165,21 @@ class TransitionMatrixMutationStrategy(MutationStrategy): """ def __init__(self, transition_matrix: str): - """Initializes the mutation strategy with a transition matrix. + """ + Initializes the mutation strategy with a transition matrix. Args: transition_matrix (str): Path to the CSV file containing the transition matrix. """ - logger.info(" USING TRNASITION MATRIX ") self.transition_matrix = pd.read_csv( transition_matrix, index_col=None, header=0 ) self.top_k = 2 def set_top_k(self, top_k: int): - """Sets the top k mutations to consider during mutation. + """ + Sets the top k mutations to consider during mutation. 
Args: top_k (int): The number of top mutations to consider. @@ -190,7 +190,8 @@ def set_top_k(self, top_k: int): def mutate( self, sequence: str, num_mutations: int, intervals: List[List[int]] ) -> List[str]: - """Mutates a sequence based on the transition matrix within + """ + Mutates a sequence based on the transition matrix within specified intervals. Args: @@ -202,33 +203,24 @@ def mutate( Returns: List[str]: A list of mutated sequences. """ - - flat_intervals = [ - i - for interval in intervals - for i in range(interval[0], interval[1] + 1) - if i < len(sequence) - ] - - num_mutations = random.randint(1, num_mutations) - - chosen_positions = random.sample( - flat_intervals, min(num_mutations, len(flat_intervals)) - ) - mutated_sequences = [] + for interval in intervals: + start, end = interval + mutation_positions = sorted( + random.sample(range(start, end), min(num_mutations, end - start)) + ) - mutation_options = [] - for pos in chosen_positions: - aa_probabilities = self.transition_matrix.iloc[pos] - top_mutations = aa_probabilities.nlargest(self.top_k).index.tolist() - mutation_options.append([(pos, aa) for aa in top_mutations]) + mutation_options = [] + for pos in mutation_positions: + aa_probabilities = self.transition_matrix.iloc[pos] + top_mutations = aa_probabilities.nlargest(self.top_k).index.tolist() + mutation_options.append([(pos, aa) for aa in top_mutations]) - for mutation_combination in iter_product(*mutation_options): - temp_sequence = list(sequence) - for pos, new_aa in mutation_combination: - temp_sequence[pos] = new_aa - mutated_sequences.append("".join(temp_sequence)) + for mutation_combination in iter_product(*mutation_options): + temp_sequence = list(sequence) + for pos, new_aa in mutation_combination: + temp_sequence[pos] = new_aa + mutated_sequences.append("".join(temp_sequence)) return mutated_sequences @@ -240,18 +232,19 @@ class MutationFactory: @staticmethod def get_mutation_strategy(mutation_config: Dict[str, Any]): - """Retrieves 
a mutation strategy based on the provided configuration. + """ + Retrieves a mutation strategy based on the provided configuration. Args: mutation_config (Dict[str, Any]): Configuration specifying the type of mutation strategy and its parameters. + Returns: + An instance of the specified mutation strategy. + Raises: - KeyError: If required configuration parameters are missing. ValueError: If the mutation type is unsupported. - - Returns: - _type_: An instance of the specified mutation strategy + KeyError: If required configuration parameters are missing. """ if mutation_config["type"] == "language-modeling": mutation_model = MutationModelManager.load_model( @@ -277,7 +270,8 @@ class SequenceMutator: """ def __init__(self, sequence: str, mutation_config: Dict[str, Any]): - """Initializes the mutator with a sequence and a mutation strategy. + """ + Initializes the mutator with a sequence and a mutation strategy. Args: sequence (str): The sequence to be mutated. @@ -289,7 +283,8 @@ def __init__(self, sequence: str, mutation_config: Dict[str, Any]): self.top_k = 2 def set_top_k(self, top_k: int): - """Sets the number of top mutations to consider in the mutation strategy. + """ + Sets the number of top mutations to consider in the mutation strategy. Args: top_k (int): The number of top mutations to consider. @@ -309,13 +304,14 @@ def get_mutations( current_population: List[str], already_evaluated_sequences: List[str], ) -> List[str]: - """Generates a set of mutated sequences. + """ + Generates a set of mutated sequences. Args: num_sequences (int): Number of mutated sequences to generate. number_of_mutations (int): Number of mutations to apply to each sequence. - intervals (List[Tuple[int]]): Intervals within the sequence + intervals (List[List[int]]): Intervals within the sequence where mutations are allowed. already_evaluated_sequences (List[str]): List of sequences that have already been evaluated. 
@@ -335,12 +331,41 @@ def get_mutations( new_mutations = self.mutation_strategy.mutate( temp_sequence, max_mutations, intervals ) - mutated_sequences_set.extend(new_mutations) + filtered_mutations = [ + element + for element in new_mutations + if element not in already_evaluated_sequences + ] + if not filtered_mutations: + break + mutated_sequences_set.extend(filtered_mutations) + if len(mutated_sequences_set) >= num_sequences: break return random.sample(mutated_sequences_set, num_sequences) +class Scorer: + def __init__(self, scorer_filepath: str, scaler_filepath: Optional[str] = None): + '''Initialize the scorer. + Args: + scorer_filepath (str): Pickled scorer filepath. + scaler_filepath (Optional[str], optional): Pickled scaler filepath. Defaults to None. + ''' + self.scorer_filepath = scorer_filepath + self.scorer = load(scorer_filepath) + if scaler_filepath is not None: + self.scaler = load(scaler_filepath) + + def predict_proba(self, feature_vector): + return self.scorer.predict_proba(feature_vector) + + def predict(self, feature_vector): + if self.scaler is not None: + feature_vector = self.scaler.transform(feature_vector) + return self.scorer.predict(xgb.DMatrix(feature_vector)) + + class EnzymeOptimizer: """ Optimizes protein sequences based on interaction with @@ -359,7 +384,6 @@ def __init__( mutator: SequenceMutator, intervals: List[Tuple[int, int]], batch_size: int = 2, - seed: int = 123, top_k: int = 2, selection_ratio: float = 0.5, perform_crossover: bool = False, @@ -367,30 +391,39 @@ def __init__( minimum_interval_length: int = 8, pad_intervals: bool = False, concat_order=["sequence", "substrate", "product"], + scaler_filepath: Optional[str] = None, + fitness_kcat: Optional[bool] = False, ): - """Initializes the optimizer with models, sequences, and + """ + Initializes the optimizer with models, sequences, and optimization parameters. - Args: sequence (str): The initial protein sequence. 
- protein_model (HFandTAPEModelUtility): Model for protein embeddings. - substrate_smiles (str): SMILES representation of the substrate. - product_smiles (str): SMILES representation of the product. + protein_model (HFandTAPEModelUtility): Model for + protein embeddings. + substrate_smiles (str): SMILES representation of + the substrate. + product_smiles (str): SMILES representation of the + product. chem_model_path (str): Path to the chemical model. chem_tokenizer_path (str): Path to the chemical tokenizer. - scorer_filepath (str): Path to the scoring model. - mutator (SequenceMutator): The mutator for generating sequence variants. - intervals (List[Tuple[int, int]]): Intervals for mutation. - batch_size (int, optional): The number of sequences to process in one batch. Defaults to 2. - seed (int, optional): Random seed. Defaults to 123. - top_k (int, optional): Number of top mutations to consider. Defaults to 2. - selection_ratio (float, optional): Ratio of sequences to select after scoring. Defaults to 0.5. - perform_crossover (bool, optional): Flag to perform crossover operation. Defaults to False. - crossover_type (str, optional): Type of crossover operation. Defaults to "uniform". - minimum_interval_length (int, optional): Minimum length of mutation intervals. Defaults to 8. - pad_intervals (bool, optional): Flag to pad the intervals. Defaults to False. - concat_order (list, optional): Order of concatenating embeddings. Defaults to ["sequence", "substrate", "product"]. + scorer_filepath (str): File path to the scoring model. + mutator (SequenceMutator): The mutator for generating + sequence variants. + intervals (List[List[int]]): Intervals for mutation. + batch_size (int): The number of sequences to process in one batch. + top_k (int): Number of top mutations to consider. + selection_ratio (float): Ratio of sequences to select + after scoring. + perform_crossover (bool): Flag to perform crossover operation. + crossover_type (str): Type of crossover operation. 
+ minimum_interval_length (int): Minimum length of + mutation intervals. + pad_intervals (bool): Flag to pad the intervals. + concat_order (list): Order of concatenating embeddings. + scaler_filepath (str): Path to the scaller in case you are usinh the Kcat model. + fitness_kcat (bool): flag to specify if the fitness function is the Kcat. """ self.sequence = sequence self.protein_model = protein_model @@ -404,18 +437,27 @@ def __init__( self.concat_order = concat_order self.minimum_interval_length = minimum_interval_length self.pad_intervals = pad_intervals - self.mutator.set_top_k(top_k) + self.mutator.set_top_k(top_k) # Set top_k for the mutation model self.concat_order = concat_order + self.scorer_filepath = scorer_filepath + self.scorer_filepath = scorer_filepath self.scorer = load(scorer_filepath) - self.seed = seed + if scaler_filepath is not None: + self.scaler = load(scaler_filepath) + self.fitness_kcat = fitness_kcat + # Initialize chem_model for SMILES embeddings self.chem_model = HFandTAPEModelUtility(chem_model_path, chem_tokenizer_path) + + # Compute embeddings for substrate and product self.substrate_embedding = self.chem_model.embed([substrate_smiles])[0] self.product_embedding = self.chem_model.embed([product_smiles])[0] + # Initialize selection and crossover generators self.selection_generator = SelectionGenerator() self.crossover_generator = CrossoverGenerator() + # Process intervals if intervals is None: self.intervals = [(0, len(sequence))] else: @@ -425,16 +467,15 @@ def __init__( self.intervals, minimum_interval_length, len(sequence) ) - random.seed(self.seed) - def optimize( self, num_iterations: int, num_sequences: int, num_mutations: int, - time_budget: Optional[int] = 360, + time_budget: Optional[int] = 50000, ): - """Runs the optimization process over a specified number + """ + Runs the optimization process over a specified number of iterations. Args: @@ -442,15 +483,15 @@ def optimize( the optimization. 
num_sequences (int): Number of sequences to generate per iteration. - num_mutations (int): Max number of mutations to apply. + num_mutations (int): Number of mutations to apply. time_budget (Optional[int]): Time budget for - optimizer (in seconds). Defaults to 360. + optimizer (in seconds). Returns: A tuple containing the list of all sequences and iteration information. """ - + current_population: List[str] = [] iteration_info = {} scored_original_sequence = self.score_sequence(self.sequence) @@ -467,55 +508,44 @@ def optimize( start_time = time.time() scored_sequences: List[Dict[str, Any]] = [scored_original_sequence] - - if iteration == 0: - current_population: List[str] = [self.sequence] - if len(current_population) < num_sequences: - while len(current_population) < num_sequences: - new_mutants = self.mutator.mutation_strategy.mutate( - self.sequence, num_mutations, self.intervals - ) - for mut in new_mutants: - if mut not in all_mutated_sequences: - current_population.append(mut) - else: - continue - if len(current_population) >= num_sequences: - break - - if len(current_population) >= num_sequences: - random.shuffle(current_population) - current_population = random.sample( - current_population, k=num_sequences + # Generate or use the existing population + if iteration > 0 and current_population: + all_mutated_sequences = current_population + for _ in range(0, len(all_mutated_sequences), self.batch_size): + batch_sequences = self.mutator.get_mutations( + self.batch_size, + num_mutations, + self.intervals, + current_population, + all_mutated_sequences, ) + scored_sequences.extend(self.score_sequences(batch_sequences)) + all_mutated_sequences.extend(batch_sequences) + else: + for _ in range(0, num_sequences, self.batch_size): + batch_sequences = self.mutator.get_mutations( + self.batch_size, + num_mutations, + self.intervals, + current_population, + all_mutated_sequences, + ) + if len(batch_sequences) == 1: + 
scored_sequences.append(self.score_sequence(batch_sequences[0])) - logger.info( - f"Number of sequences in current population: {len(current_population)}" - ) - - iteration_scored_sequences = [] - for _ in range(0, len(current_population), self.batch_size): - scored_sequences = self.score_sequences( - current_population[_ : _ + self.batch_size] - ) - all_mutated_sequences.extend( - current_population[_ : _ + self.batch_size] - ) - all_scored_sequences.extend(scored_sequences) - iteration_scored_sequences.extend(scored_sequences) + elif len(batch_sequences) > 1: + scored_sequences.extend(self.score_sequences(batch_sequences)) + else: + continue + all_mutated_sequences.extend(batch_sequences) + all_scored_sequences.extend(scored_sequences) if self.selection_ratio < 1.0: - - samples_with_higher_score = [ - i - for i in iteration_scored_sequences - if i["score"] > original_sequence_score_ - ] selected_sequences = self.selection_generator.selection( - samples_with_higher_score, self.selection_ratio + scored_sequences, self.selection_ratio ) else: - selected_sequences = iteration_scored_sequences + selected_sequences = scored_sequences offspring_sequences = [] if self.perform_crossover and len(selected_sequences) > 1: @@ -537,46 +567,31 @@ def optimize( ) offspring_sequences.extend([offspring1, offspring2]) - logger.info(f"Selected samples: {len(selected_sequences)}") - logger.info(f"Number Crossed-Over samples: {len(offspring_sequences)}") - current_population = [ seq["sequence"] for seq in selected_sequences ] + offspring_sequences - if len(current_population) < num_sequences: - while len(current_population) < num_sequences: - current_population.extend( - self.mutator.mutation_strategy.mutate( - self.sequence, num_mutations, self.intervals - ) - ) - if len(current_population) >= num_sequences: - break - - if len(current_population) >= num_sequences: - random.shuffle(current_population) - current_population = current_population[:num_sequences] - + # Update best sequences 
and count higher scoring sequences higher_scoring_sequences = 0 - for temp_seq in iteration_scored_sequences: + best_seq = "" + for temp_seq in scored_sequences: if temp_seq["score"] > current_best_score: current_best_score = temp_seq["score"] higher_scoring_sequences += 1 + best_seq = temp_seq["sequence"] end_time = time.time() elapsed_time = end_time - start_time iteration_info[iteration + 1] = { - "Iteration": iteration + 1, "best_score": current_best_score, "higher_scoring_sequences": higher_scoring_sequences, "elapsed_time": elapsed_time, + "best_sequence": best_seq, } logger.info( - f" Iteration {iteration + 1}: Best Score: {current_best_score}," - f" Higher Scoring Sequences: {higher_scoring_sequences}, " - f" Time: {elapsed_time} seconds," - f" Population length : {len(current_population)}" + f"Iteration {iteration + 1}: Best Score: {current_best_score}, " + f"Higher Scoring Sequences: {higher_scoring_sequences}, " + f"Time: {elapsed_time} seconds" ) if time_budget is not None and elapsed_time > time_budget: logger.warning(f"Used all the given time budget of {time_budget}s") @@ -585,22 +600,17 @@ def optimize( all_scored_sequences = sorted( all_scored_sequences, key=lambda x: x["score"], reverse=True ) - - df = pd.DataFrame(all_scored_sequences) - df = df.drop_duplicates() - - all_scored_sequences = df.to_dict(orient="records") - return all_scored_sequences, iteration_info def score_sequence(self, sequence: str) -> Dict[str, Any]: - """Scores a single protein sequence. + """ + Scores a single protein sequence. Args: sequence (str): The protein sequence to score. Returns: - Dict[str, Any]: The score of the sequence. + float: The score of the sequence. 
""" sequence_embedding = self.protein_model.embed([sequence])[0] embeddings = [ @@ -613,12 +623,19 @@ def score_sequence(self, sequence: str) -> Dict[str, Any]: ] combined_embedding = np.concatenate(ordered_embeddings) combined_embedding = combined_embedding.reshape(1, -1) - - score = self.scorer.predict_proba(combined_embedding)[0][1] + + if self.fitness_kcat: + if self.scaler is not None: + combined_embedding = self.scaler.transform(combined_embedding) + score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0] + else: + score = self.scorer.predict_proba(combined_embedding)[0][1] + return {"sequence": sequence, "score": score} - + def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]: - """Scores a list of protein sequences. + """ + Scores a list of protein sequences. Args: sequences (List[str]): The list of protein sequences to score. @@ -639,11 +656,16 @@ def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]: ] ordered_embeddings = [ embeddings[self.concat_order.index(item)] for item in self.concat_order - ] + ] combined_embedding = np.concatenate(ordered_embeddings) combined_embedding = combined_embedding.reshape(1, -1) - score = self.scorer.predict_proba(combined_embedding)[0][1] + if self.fitness_kcat: + if self.scaler is not None: + combined_embedding = self.scaler.transform(combined_embedding) + score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0] + else: + score = self.scorer.predict_proba(combined_embedding)[0][1] output.append({"sequence": sequences[position], "score": score}) return output From e73bac875e77f7dc89e284bf27d05e32f5a0cab6 Mon Sep 17 00:00:00 2001 From: yvesnana Date: Wed, 24 Apr 2024 06:56:45 -0400 Subject: [PATCH 2/3] fix: fixed enzyme optmization with Kcat fitness function Signed-off-by: yvesnana --- examples/enzeptional/example_enzeptional.py | 63 +++++++++++++++++---- src/gt4sd/frameworks/enzeptional/core.py | 10 ++-- 2 files changed, 56 insertions(+), 17 deletions(-) diff --git 
a/examples/enzeptional/example_enzeptional.py b/examples/enzeptional/example_enzeptional.py index 6632aed56..cfb8331ff 100644 --- a/examples/enzeptional/example_enzeptional.py +++ b/examples/enzeptional/example_enzeptional.py @@ -1,13 +1,20 @@ import logging import pandas as pd +from typing import Tuple, List, Optional from gt4sd.frameworks.enzeptional.processing import HFandTAPEModelUtility from gt4sd.frameworks.enzeptional.core import SequenceMutator, EnzymeOptimizer from gt4sd.configuration import GT4SDConfiguration, sync_algorithm_with_s3 -def initialize_environment(model = "feasibility"): - """Synchronize with GT4SD S3 storage and set up the environment.""" - # NOTE: For those interested in optimizing kcat values, it is important to adjust the scorer path to reflect this focus, thereby selecting the appropriate model for kcat optimization: f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/kcat/model.pkl". The specification of the scaler, located within the same directory as the `scorer.pkl`, is mandatory for accurate model performance. +def initialize_environment(model = "feasibility") -> Tuple[str, Optional[str]]: + """Synchronize with GT4SD S3 storage and set up the environment. + + Args: + model (str): Type of optimization ("feasibility" or "kcat"). + + Returns: + Tuple[str, Optional[str]]: The path to the scorer file and scaler file (if existing). 
+ """ configuration = GT4SDConfiguration.get_instance() sync_algorithm_with_s3("proteins/enzeptional/scorers", module="properties") name = model.lower() @@ -17,16 +24,39 @@ def initialize_environment(model = "feasibility"): return f"{configuration.gt4sd_local_cache_path}/properties/proteins/enzeptional/scorers/{name}/model.pkl", None -def load_experiment_parameters(): +def load_experiment_parameters() -> Tuple[List, List, List, List]: """Load experiment parameters from a CSV file.""" df = pd.read_csv("data.csv").iloc[1] return df["substrates"], df["products"], df["sequences"], eval(df["intervals"]) def setup_optimizer( - substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, fitness_kcat + substrate_smiles: str, + product_smiles: str, + sample_sequence: str, + intervals: List[List[int]], + scorer_path: str, + scaler_path: str, + concat_order: List[str], + use_xgboost_scorer: bool ): - """Set up and return the optimizer with all necessary components configured.""" + """Set up and return the optimizer with all necessary components configured + + Args: + substrate_smiles (str): SMILES representation of + the substrate. + product_smiles (str): SMILES representation of the + product. + sample_sequence (str): The initial protein sequence. + intervals (List[List[int]]): Intervals for mutation. + scorer_path (str): File path to the scoring model. + scaler_path (str): Path to the scaler in case you are using the Kcat model. + concat_order (List[str]): Order of concatenating embeddings. + use_xgboost_scorer (bool): flag to specify if the fitness function is the Kcat. 
+ + Returns: + Initialized optimizer + """ model_tokenizer_paths = "facebook/esm2_t33_650M_UR50D" chem_paths = "seyonec/ChemBERTa-zinc-base-v1" @@ -58,22 +88,30 @@ def setup_optimizer( "crossover_type": "single_point", "concat_order": concat_order, "scaler_filepath": scaler_path, - "fitness_kcat": fitness_kcat + "use_xgboost_scorer": use_xgboost_scorer } return EnzymeOptimizer(**optimizer_config) def optimize_sequences(optimizer): - """Optimize sequences using the configured optimizer.""" + """Optimize sequences using the configured optimizer. + + Args: + optimizer: Initialized optimizer + + Returns: + Optimized sequences + """ return optimizer.optimize( num_iterations=3, num_sequences=5, num_mutations=5, time_budget=3600 ) def main_kcat(): + """Optimization using Kcat model""" logging.basicConfig(level=logging.INFO) scorer_path, scaler_path = initialize_environment(model="kcat") - concat_order, fitness_kcat = ["substrate", "sequence"], True + concat_order, use_xgboost_scorer = ["substrate", "sequence"], True ( substrate_smiles, product_smiles, @@ -81,16 +119,17 @@ def main_kcat(): intervals, ) = load_experiment_parameters() optimizer = setup_optimizer( - substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, fitness_kcat + substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, use_xgboost_scorer ) optimized_sequences, iteration_info = optimize_sequences(optimizer) logging.info("Optimization completed.") def main_feasibility(): + """Optimization using Feasibility model""" logging.basicConfig(level=logging.INFO) scorer_path, scaler_path = initialize_environment() - concat_order, fitness_kcat = ["substrate", "sequence", "product"], False + concat_order, use_xgboost_scorer = ["substrate", "sequence", "product"], False ( substrate_smiles, product_smiles, @@ -98,7 +137,7 @@ def main_feasibility(): intervals, ) = load_experiment_parameters() optimizer = setup_optimizer( - 
substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, fitness_kcat + substrate_smiles, product_smiles, sample_sequence, intervals, scorer_path, scaler_path, concat_order, use_xgboost_scorer ) optimized_sequences, iteration_info = optimize_sequences(optimizer) logging.info("Optimization completed.") diff --git a/src/gt4sd/frameworks/enzeptional/core.py b/src/gt4sd/frameworks/enzeptional/core.py index d36ab8f85..9e528e7bb 100644 --- a/src/gt4sd/frameworks/enzeptional/core.py +++ b/src/gt4sd/frameworks/enzeptional/core.py @@ -392,7 +392,7 @@ def __init__( pad_intervals: bool = False, concat_order=["sequence", "substrate", "product"], scaler_filepath: Optional[str] = None, - fitness_kcat: Optional[bool] = False, + use_xgboost_scorer: Optional[bool] = False, ): """ Initializes the optimizer with models, sequences, and @@ -423,7 +423,7 @@ def __init__( pad_intervals (bool): Flag to pad the intervals. concat_order (list): Order of concatenating embeddings. scaler_filepath (str): Path to the scaller in case you are usinh the Kcat model. - fitness_kcat (bool): flag to specify if the fitness function is the Kcat. + use_xgboost_scorer (bool): flag to specify if the fitness function is the Kcat. 
""" self.sequence = sequence self.protein_model = protein_model @@ -444,7 +444,7 @@ def __init__( self.scorer = load(scorer_filepath) if scaler_filepath is not None: self.scaler = load(scaler_filepath) - self.fitness_kcat = fitness_kcat + self.use_xgboost_scorer = use_xgboost_scorer # Initialize chem_model for SMILES embeddings self.chem_model = HFandTAPEModelUtility(chem_model_path, chem_tokenizer_path) @@ -624,7 +624,7 @@ def score_sequence(self, sequence: str) -> Dict[str, Any]: combined_embedding = np.concatenate(ordered_embeddings) combined_embedding = combined_embedding.reshape(1, -1) - if self.fitness_kcat: + if self.use_xgboost_scorer: if self.scaler is not None: combined_embedding = self.scaler.transform(combined_embedding) score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0] @@ -660,7 +660,7 @@ def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]: combined_embedding = np.concatenate(ordered_embeddings) combined_embedding = combined_embedding.reshape(1, -1) - if self.fitness_kcat: + if self.use_xgboost_scorer: if self.scaler is not None: combined_embedding = self.scaler.transform(combined_embedding) score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0] From 4d4c6fcc1f1b451e1a1e889682be6b08857358dc Mon Sep 17 00:00:00 2001 From: yvesnana Date: Wed, 24 Apr 2024 07:04:14 -0400 Subject: [PATCH 3/3] fix: fixed enzyme optmization with Kcat fitness function Signed-off-by: yvesnana --- src/gt4sd/frameworks/enzeptional/core.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/gt4sd/frameworks/enzeptional/core.py b/src/gt4sd/frameworks/enzeptional/core.py index 9e528e7bb..a6be1710e 100644 --- a/src/gt4sd/frameworks/enzeptional/core.py +++ b/src/gt4sd/frameworks/enzeptional/core.py @@ -347,11 +347,11 @@ def get_mutations( class Scorer: def __init__(self, scorer_filepath: str, scaler_filepath: Optional[str] = None): - '''Initialize the scorer. + """Initialize the scorer. 
Args: scorer_filepath (str): Pickled scorer filepath. - scaler_filepath (Optional[str], optional): Pickled scaler filepath. Defaults to None. - ''' + scaler_filepath (Optional[str], optional): Pickled scaler filepath. Defaults to None. + """ self.scorer_filepath = scorer_filepath self.scorer = load(scorer_filepath) if scaler_filepath is not None: @@ -359,12 +359,12 @@ def __init__(self, scorer_filepath: str, scaler_filepath: Optional[str] = None): def predict_proba(self, feature_vector): return self.scorer.predict_proba(feature_vector) - + def predict(self, feature_vector): if self.scaler is not None: feature_vector = self.scaler.transform(feature_vector) return self.scorer.predict(xgb.DMatrix(feature_vector)) - + class EnzymeOptimizer: """ @@ -623,16 +623,16 @@ def score_sequence(self, sequence: str) -> Dict[str, Any]: ] combined_embedding = np.concatenate(ordered_embeddings) combined_embedding = combined_embedding.reshape(1, -1) - + if self.use_xgboost_scorer: if self.scaler is not None: combined_embedding = self.scaler.transform(combined_embedding) score = self.scorer.predict(xgb.DMatrix(combined_embedding))[0] else: score = self.scorer.predict_proba(combined_embedding)[0][1] - + return {"sequence": sequence, "score": score} - + def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]: """ Scores a list of protein sequences. @@ -656,7 +656,7 @@ def score_sequences(self, sequences: List[str]) -> List[Dict[str, float]]: ] ordered_embeddings = [ embeddings[self.concat_order.index(item)] for item in self.concat_order - ] + ] combined_embedding = np.concatenate(ordered_embeddings) combined_embedding = combined_embedding.reshape(1, -1)