diff --git a/examples/generation_example.py b/examples/generation_example.py
new file mode 100644
index 0000000..2931d96
--- /dev/null
+++ b/examples/generation_example.py
@@ -0,0 +1,100 @@
+import os
+import numpy as np
+import torch
+import sys
+
+sys.path.insert(0, os.path.join(os.path.abspath(os.pardir), "src"))
+from molearn.models.foldingnet import AutoEncoder
+from molearn.analysis import MolearnAnalysis
+from molearn.data import PDBData
+
+
+def main():
+    # Note: running the code below within a function is necessary to ensure that
+    # multiprocessing (used to calculate DOPE and Ramachandran) runs correctly
+
+    print("> Loading network parameters...")
+
+    fname = f"xbb_foldingnet_checkpoints{os.sep}checkpoint_epoch208_loss-4.205589803059896.ckpt"
+    # if a GPU is available we will use it, else the CPU
+    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+    checkpoint = torch.load(fname, map_location=device)
+    net = AutoEncoder(**checkpoint["network_kwargs"])
+    net.load_state_dict(checkpoint["model_state_dict"])
+
+    # the network is currently on the CPU. If a GPU is available, move it there
+    if torch.cuda.is_available():
+        net.to(device)
+
+    print("> Loading training data...")
+
+    MA = MolearnAnalysis()
+    MA.set_network(net)
+
+    # increasing the batch size makes encoding/decoding operations faster,
+    # but more memory demanding
+    MA.batch_size = 4
+
+    # increasing processes makes DOPE and Ramachandran score calculations faster,
+    # but more memory demanding
+    MA.processes = 2
+
+    # what follows is a method to re-create the training and test set:
+    # by defining the manual seed and loading the dataset in the same order as when
+    # the neural network was trained, the same train-test split will be obtained
+    data = PDBData()
+    data.import_pdb(
+        "./clustered/MurD_open_selection_CLUSTER_aggl_train.dcd",
+        "./clustered/MurD_open_selection_NEW_TOPO.pdb",
+    )
+    data.fix_terminal()
+    data.atomselect(atoms=["CA", "C", "N", "CB", "O"])
+    data.prepare_dataset()
+    data_train, data_test = data.split(manual_seed=25)
+
+    # store the training and test set in the MolearnAnalysis instance;
+    # the second parameter of the following commands can be either a PDBData instance
+    # or a path to a multi-PDB file
+    MA.set_dataset("training", data_train)
+    MA.set_dataset("test", data_test)
+
+    print("> Generating error landscape")
+    # build a 50x50 grid. With the default padding=0.1 it would be 10% larger than
+    # the region occupied by all loaded datasets; padding=1.0 extends it further
+    grid_side_len = 50
+    MA.setup_grid(grid_side_len, padding=1.0)
+    landscape_rmsd, landscape_z_drift, xaxis, yaxis = MA.scan_error()
+
+    # argsort all RMSD errors as a 1D array
+    sort_by_err = landscape_rmsd.ravel().argsort()
+
+    # number of structures to generate
+    n_structs = 10
+    # only generate structures with an error below this threshold
+    err_th = 1.5
+    # boolean array indicating which of the sorted latent coords have a low enough error
+    err_not_reached = landscape_rmsd.ravel()[sort_by_err] < err_th
+    # get the latent coords with the lowest errors
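+    # note: setup_grid stores the grid row-major (via np.meshgrid), so a flat grid
+    # index i maps to x = xaxis[i % grid_side_len] and y = yaxis[i // grid_side_len];
+    # this is the convention used in the list comprehension below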
+    coords_oi = np.asarray(
+        [
+            [xaxis[i % grid_side_len], yaxis[i // grid_side_len]]
+            for i in sort_by_err[:n_structs]
+        ]
+    )
+
+    # still mask them to be below the error threshold
+    coords_oi = coords_oi[err_not_reached[:n_structs]].reshape(1, -1, 2)
+    assert (
+        coords_oi.shape[1] > 0
+    ), "No latent coords available, try raising your error threshold (err_th)"
+
+    # generated structures will be created in ./newly_generated_structs
+    if not os.path.isdir("newly_generated_structs"):
+        os.mkdir("newly_generated_structs")
+    # use relax=True to also relax the generated structures
+    # !!! relax=True will only work when trained on all atoms !!!
+    MA.generate(coords_oi, "newly_generated_structs")
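+    # note: generate() also returns the conformations as an (N, M, 3) numpy array,
+    # so a variant of the call above (sketch) is:
+    # structs = MA.generate(coords_oi, "newly_generated_structs")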
+
+
+if __name__ == "__main__":
+    main()
diff --git a/src/molearn/analysis/analyser.py b/src/molearn/analysis/analyser.py
index 39e1a0c..4cdeed0 100644
--- a/src/molearn/analysis/analyser.py
+++ b/src/molearn/analysis/analyser.py
@@ -11,40 +11,60 @@
 #
 # Authors: Matteo Degiacomi, Samuel Musson
 
+from __future__ import annotations
+import os
 from copy import deepcopy
 import numpy as np
 import torch.optim
+
 try:
-    from modeller import *
+    # from modeller import *
+    from modeller.selection import Selection
+    from modeller.environ import Environ
     from modeller.scripts import complete_pdb
 except Exception as e:
-    print('Error importing modeller: ')
+    print("Error importing modeller: ")
     print(e)
-
+
 try:
     from ..scoring import Parallel_DOPE_Score
 except ImportError as e:
-    print('Import Error captured while trying to import Parallel_DOPE_Score, it is likely that you dont have Modeller installed')
+    print(
+        "Import Error captured while trying to import Parallel_DOPE_Score, it is likely that you don't have Modeller installed"
+    )
     print(e)
 try:
     from ..scoring import Parallel_Ramachandran_Score
 except ImportError as e:
-    print('Import Error captured while trying to import Parallel_Ramachandran_Score, it is likely that you dont have cctbx/iotbx installed')
+    print(
+        "Import Error captured while trying to import Parallel_Ramachandran_Score, it is likely that you don't have cctbx/iotbx installed"
+    )
     print(e)
 from ..data import PDBData
 from ..utils import as_numpy
 from tqdm import tqdm
 import warnings
+
+from openmm.app.modeller import Modeller
+from openmm.app.forcefield import ForceField
+from openmm.app.pdbfile import PDBFile
+
+# from openmm.app import PME
+from openmm.app import NoCutoff
+from openmm.openmm import VerletIntegrator
+from openmm.app.simulation import Simulation
+from openmm.unit import picoseconds
+
 warnings.filterwarnings("ignore")
 
 
 class MolearnAnalysis:
-    '''
+    """
     This class provides methods dedicated to the quality analysis of a trained model.
-    '''
-
+    """
+
     def __init__(self):
         self._datasets = {}
         self._encoded = {}
@@ -54,26 +74,26 @@ def __init__(self):
         self.processes = 1
 
     def set_network(self, network):
-        '''
+        """
         :param network: a trained neural network defined in :func:`molearn.models <molearn.models>`
-        '''
+        """
         self.network = network
         self.network.eval()
         self.device = next(network.parameters()).device
 
     def get_dataset(self, key):
-        '''
+        """
         :param str key: key pointing to a dataset previously loaded with :func:`set_dataset <molearn.analysis.MolearnAnalysis.set_dataset>`
-        '''
+        """
         return self._datasets[key]
 
     def set_dataset(self, key, data, atomselect="*"):
-        '''
+        """
         :param data: :func:`PDBData <molearn.data.PDBData>` object containing atomic coordinates
         :param str key: label to be associated with data
         :param list/str atomselect: list of atom names to load, or '*' to indicate that all atoms are loaded.
-        '''
-        if isinstance(data, str) and data.endswith('.pdb'):
+        """
+        if isinstance(data, str) and data.endswith(".pdb"):
             d = PDBData()
             d.import_pdb(data)
             d.atomselect(atomselect)
@@ -82,82 +102,95 @@ def set_dataset(self, key, data, atomselect="*"):
         elif isinstance(data, PDBData):
             _data = data
         else:
-            raise NotImplementedError('data should be an PDBData instance')
+            raise NotImplementedError("data should be a PDBData instance")
 
         for _key, dataset in self._datasets.items():
-            assert dataset.shape[2]== _data.dataset.shape[2] and dataset.shape[1]==_data.dataset.shape[1], f'number of d.o.f differes: {key} has shape {_data.shape} while {_key} has shape {dataset.shape}'
+            assert (
+                dataset.shape[2] == _data.dataset.shape[2]
+                and dataset.shape[1] == _data.dataset.shape[1]
+            ), f"number of d.o.f. differs: {key} has shape {_data.shape} while {_key} has shape {dataset.shape}"
         self._datasets[key] = _data.dataset.float()
-        if not hasattr(self, 'meanval'):
+        if not hasattr(self, "meanval"):
             self.meanval = _data.mean
-        if not hasattr(self, 'stdval'):
+        if not hasattr(self, "stdval"):
             self.stdval = _data.std
-        if not hasattr(self, 'atoms'):
+        if not hasattr(self, "atoms"):
             self.atoms = _data.atoms
-        if not hasattr(self, 'mol'):
+        if not hasattr(self, "mol"):
             self.mol = _data.frame()
-        if not hasattr(self, 'shape'):
+        if not hasattr(self, "shape"):
             self.shape = (_data.dataset.shape[1], _data.dataset.shape[2])
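+        # note: the first dataset loaded fixes the normalisation constants
+        # (meanval, stdval), atom selection, reference frame (mol) and shape
+        # used for every dataset loaded afterwards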
 
     def get_encoded(self, key):
-        '''
+        """
         :param str key: key pointing to a dataset previously loaded with :func:`set_dataset <molearn.analysis.MolearnAnalysis.set_dataset>`
         :return: array containing the encoding in latent space of dataset associated with key
-        '''
+        """
        if key not in self._encoded:
-            assert key in self._datasets, f'key {key} does not exist in internal _datasets or in _latent_coords, add it with MolearnAnalysis.set_latent_coords(key, torch.Tensor) '\
-                'or add the corresponding dataset with MolearnAnalysis.set_dataset(name, PDBDataset)'
+            assert key in self._datasets, (
+                f"key {key} does not exist in internal _datasets or in _encoded, add it with MolearnAnalysis.set_encoded(key, coords) "
+                "or add the corresponding dataset with MolearnAnalysis.set_dataset(key, PDBData)"
+            )
             with torch.no_grad():
                 dataset = self.get_dataset(key)
                 batch_size = self.batch_size
                 encoded = None
-                for i in tqdm(range(0, dataset.shape[0], batch_size), desc=f'encoding {key}'):
-                    z = self.network.encode(dataset[i:i+batch_size].to(self.device)).cpu()
+                for i in tqdm(
+                    range(0, dataset.shape[0], batch_size), desc=f"Encoding {key}"
+                ):
+                    z = self.network.encode(
+                        dataset[i : i + batch_size].to(self.device)
+                    ).cpu()
                     if encoded is None:
                         encoded = torch.empty(dataset.shape[0], z.shape[1], z.shape[2])
-                    encoded[i:i+batch_size] = z
+                    encoded[i : i + batch_size] = z
             self._encoded[key] = encoded
-
+
         return self._encoded[key]
 
     def set_encoded(self, key, coords):
-        '''
+        """
         :param str key: label to be associated with the provided latent space coordinates
-        '''
+        """
         self._encoded[key] = torch.tensor(coords).float()
 
     def get_decoded(self, key):
-        '''
+        """
         :param str key: key pointing to a dataset previously loaded with :func:`set_dataset <molearn.analysis.MolearnAnalysis.set_dataset>`
-        '''
+        """
         if key not in self._decoded:
             with torch.no_grad():
                 batch_size = self.batch_size
                 encoded = self.get_encoded(key)
                 decoded = torch.empty(encoded.shape[0], *self.shape).float()
-                for i in tqdm(range(0, encoded.shape[0], batch_size), desc=f'Decoding {key}'):
-                    decoded[i:i+batch_size] = self.network.decode(encoded[i:i+batch_size].to(self.device))[:, :, :self.shape[1]].cpu()
+                for i in tqdm(
+                    range(0, encoded.shape[0], batch_size), desc=f"Decoding {key}"
+                ):
+                    decoded[i : i + batch_size] = self.network.decode(
+                        encoded[i : i + batch_size].to(self.device)
+                    )[:, :, : self.shape[1]].cpu()
                 self._decoded[key] = decoded
         return self._decoded[key]
 
     def set_decoded(self, key, structures):
-        '''
+        """
         :param str key: label to be associated with the provided structures
-        '''
+        """
         self._decoded[key] = structures
 
     def num_trainable_params(self):
-        '''
+        """
         :return: number of trainable parameters in the neural network previously loaded with :func:`set_network <molearn.analysis.MolearnAnalysis.set_network>`
-        '''
+        """
         return sum(p.numel() for p in self.network.parameters() if p.requires_grad)
 
     def get_error(self, key, align=True):
-        '''
+        """
         Calculate the reconstruction error of a dataset encoded and decoded by a trained neural network.
-
+
         :param str key: key pointing to a dataset previously loaded with :func:`set_dataset <molearn.analysis.MolearnAnalysis.set_dataset>`
         :param bool align: if True, the RMSD will be calculated by finding the optimal alignment between structures
         :return: 1D array containing the RMSD between input structures and their encoded-decoded counterparts
-        '''
+        """
         dataset = self.get_dataset(key)
         z = self.get_encoded(key)
         decoded = self.get_decoded(key)
@@ -165,8 +198,15 @@ def get_error(self, key, align=True):
         err = []
         m = deepcopy(self.mol)
         for i in range(dataset.shape[0]):
-            crd_ref = as_numpy(dataset[i].permute(1,0).unsqueeze(0))*self.stdval + self.meanval
-            crd_mdl = as_numpy(decoded[i].permute(1,0).unsqueeze(0))[:, :dataset.shape[2]]*self.stdval + self.meanval # clip the padding of models
+            crd_ref = (
+                as_numpy(dataset[i].permute(1, 0).unsqueeze(0)) * self.stdval
+                + self.meanval
+            )
+            crd_mdl = (
+                as_numpy(decoded[i].permute(1, 0).unsqueeze(0))[:, : dataset.shape[2]]
+                * self.stdval
+                + self.meanval
+            )  # clip the padding of models
 
             # use Molecule Biobox class to calculate RMSD
             if align:
                 m.coordinates = deepcopy(crd_ref)
                 m.set_current(0)
                 m.add_xyz(crd_mdl[0])
                 rmsd = m.rmsd(0, 1)
             else:
-                rmsd = np.sqrt(np.sum((crd_ref.flatten()-crd_mdl.flatten())**2)/crd_mdl.shape[1]) # Cartesian L2 norm
+                rmsd = np.sqrt(
+                    np.sum((crd_ref.flatten() - crd_mdl.flatten()) ** 2)
+                    / crd_mdl.shape[1]
+                )  # Cartesian L2 norm
 
             err.append(rmsd)
         return np.array(err)
 
     def get_dope(self, key, refine=True, **kwargs):
-        '''
+        """
         :param str key: key pointing to a dataset previously loaded with :func:`set_dataset <molearn.analysis.MolearnAnalysis.set_dataset>`
         :param bool refine: if True, refine structures before calculating DOPE score
         :return: dictionary containing DOPE score of dataset, and its decoded counterpart
-        '''
+        """
         dataset = self.get_dataset(key)
         decoded = self.get_decoded(key)
-
+
         dope_dataset = self.get_all_dope_score(dataset, refine=refine, **kwargs)
         dope_decoded = self.get_all_dope_score(decoded, refine=refine, **kwargs)
 
-        return dict(dataset_dope=dope_dataset,
-                    decoded_dope=dope_decoded)
+        return dict(dataset_dope=dope_dataset, decoded_dope=dope_decoded)
 
     def get_ramachandran(self, key):
-        '''
+        """
         :param str key: key pointing to a dataset previously loaded with :func:`set_dataset <molearn.analysis.MolearnAnalysis.set_dataset>`
-        '''
-
+        """
+
         dataset = self.get_dataset(key)
         decoded = self.get_decoded(key)
-        ramachandran = {f'dataset_{key}':value for key, value in self.get_all_ramachandran_score(dataset).items()}
-        ramachandran.update({f'decoded_{key}':value for key, value in self.get_all_ramachandran_score(decoded).items()})
+        ramachandran = {
+            f"dataset_{key}": value
+            for key, value in self.get_all_ramachandran_score(dataset).items()
+        }
+        ramachandran.update(
+            {
+                f"decoded_{key}": value
+                for key, value in self.get_all_ramachandran_score(decoded).items()
+            }
+        )
         return ramachandran
 
     def setup_grid(self, samples=64, bounds_from=None, bounds=None, padding=0.1):
-        '''
+        """
         Define a NxN point grid regularly sampling the latent space.
-
+
         :param int samples: grid size (build a samples x samples grid)
         :param str/list bounds_from: name(s) of datasets to use as reference, either as a single string, a list of strings, or 'all'
         :param tuple/list bounds: tuple (xmin, xmax, ymin, ymax) or None
         :param float padding: size of extra spacing around boundary conditions (as ratio of axis dimensions)
-        '''
-
-        key = 'grid'
+        """
+
+        key = "grid"
         if bounds is None:
             if bounds_from is None:
                 bounds_from = "all"
-
+
             bounds = self._get_bounds(bounds_from, exclude=key)
-
-        bx = (bounds[1]-bounds[0])*padding
-        by = (bounds[3]-bounds[2])*padding
-        self.xvals = np.linspace(bounds[0]-bx, bounds[1]+bx, samples)
-        self.yvals = np.linspace(bounds[2]-by, bounds[3]+by, samples)
+
+        bx = (bounds[1] - bounds[0]) * padding
+        by = (bounds[3] - bounds[2]) * padding
+        self.xvals = np.linspace(bounds[0] - bx, bounds[1] + bx, samples)
+        self.yvals = np.linspace(bounds[2] - by, bounds[3] + by, samples)
         self.n_samples = samples
         meshgrid = np.meshgrid(self.xvals, self.yvals)
         stack = np.stack(meshgrid, axis=2).reshape(-1, 1, 2)
         self.set_encoded(key, stack)
-
+
         return key
 
-    def _get_bounds(self, bounds_from, exclude=['grid', 'grid_decoded']):
-        '''
+    def _get_bounds(self, bounds_from, exclude=["grid", "grid_decoded"]):
+        """
         :param bounds_from: keys of datasets to be considered for identification of boundaries in latent space
         :param exclude: keys of datasets not to consider
         :return: four scalars as edges of x and y axis: xmin, xmax, ymin, ymax
-        '''
+        """
         if isinstance(exclude, str):
-            exclude = [exclude,]
-
-        if bounds_from == 'all':
+            exclude = [
+                exclude,
+            ]
+
+        if bounds_from == "all":
             bounds_from = [key for key in self._datasets.keys() if key not in exclude]
         elif isinstance(bounds_from, str):
-            bounds_from = [bounds_from,]
-
+            bounds_from = [
+                bounds_from,
+            ]
+
         xmin, ymin, xmax, ymax = [], [], [], []
         for key in bounds_from:
             z = self.get_encoded(key)
@@ -256,86 +310,116 @@ def _get_bounds(self, bounds_from, exclude=['grid', 'grid_decoded']):
             xmin.append(z[:, 0].min())
             ymin.append(z[:, 1].min())
             xmax.append(z[:, 0].max())
             ymax.append(z[:, 1].max())
-
+
         xmin, ymin = min(xmin), min(ymin)
         xmax, ymax = max(xmax), max(ymax)
         return xmin, xmax, ymin, ymax
 
     def scan_error_from_target(self, key, index=None, align=True):
-        '''
-        Calculate landscape of RMSD vs single target structure. Target should be previously loaded datset containing a single conformation.
-
+        """
+        Calculate landscape of RMSD vs a single target structure.
+        The target should be a previously loaded dataset containing a single conformation.
+
         :param str key: key pointing to a dataset previously loaded with :func:`set_dataset <molearn.analysis.MolearnAnalysis.set_dataset>`
         :param int index: index of conformation to be selected from a dataset containing multiple conformations.
         :param bool align: if True, structures generated from the grid are aligned to the target prior to RMSD calculation.
         :return: RMSD latent space NxN surface
         :return: x-axis values
         :return: y-axis values
-        '''
-        s_key = f'RMSD_from_{key}' if index is None else f'RMSD_from_{key}_index_{index}'
+        """
+        s_key = (
+            f"RMSD_from_{key}" if index is None else f"RMSD_from_{key}_index_{index}"
+        )
         if s_key not in self.surfaces:
-            assert 'grid' in self._encoded, 'make sure to call MolearnAnalysis.setup_grid first'
-            target = self.get_dataset(key) if index is None else self.get_dataset(key)[index].unsqueeze(0)
+            assert (
+                "grid" in self._encoded
+            ), "make sure to call MolearnAnalysis.setup_grid first"
+            target = (
+                self.get_dataset(key)
+                if index is None
+                else self.get_dataset(key)[index].unsqueeze(0)
+            )
             if target.shape[0] != 1:
-                msg = f'dataset {key} shape is {target.shape}. \
+                msg = f"dataset {key} shape is {target.shape}. \
 A dataset with a single conformation is expected.\
 Either pass a key that points to a single structure or pass the index of the \
-structure you want, e.g., analyser.scan_error_from_target(key, index=0)'
+structure you want, e.g., analyser.scan_error_from_target(key, index=0)"
                 raise Exception(msg)
-
-            decoded = self.get_decoded('grid')
+
+            decoded = self.get_decoded("grid")
             if align:
-                crd_ref = as_numpy(target.permute(0, 2, 1))*self.stdval
-                crd_mdl = as_numpy(decoded.permute(0, 2, 1))*self.stdval
+                crd_ref = as_numpy(target.permute(0, 2, 1)) * self.stdval
+                crd_mdl = as_numpy(decoded.permute(0, 2, 1)) * self.stdval
                 m = deepcopy(self.mol)
                 m.coordinates = np.concatenate([crd_ref, crd_mdl])
                 m.set_current(0)
                 rmsd = np.array([m.rmsd(0, i) for i in range(1, len(m.coordinates))])
             else:
-                rmsd = (((decoded-target)*self.stdval)**2).sum(axis=1).mean(axis=-1).sqrt()
-            self.surfaces[s_key] = as_numpy(rmsd.reshape(self.n_samples, self.n_samples))
-
+                rmsd = (
+                    (((decoded - target) * self.stdval) ** 2)
+                    .sum(axis=1)
+                    .mean(axis=-1)
+                    .sqrt()
+                )
+            self.surfaces[s_key] = as_numpy(
+                rmsd.reshape(self.n_samples, self.n_samples)
+            )
+
         return self.surfaces[s_key], self.xvals, self.yvals
 
-    def scan_error(self, s_key='Network_RMSD', z_key='Network_z_drift'):
-        '''
+    def scan_error(self, s_key="Network_RMSD", z_key="Network_z_drift"):
+        """
         Calculate RMSD and z-drift on a grid sampling the latent space.
         Requires a grid system to be defined via a prior call to :func:`setup_grid <molearn.analysis.MolearnAnalysis.setup_grid>`.
-
+
         :param str s_key: label for RMSD dataset
         :param str z_key: label for z-drift dataset
         :return: input-to-decoded RMSD latent space NxN surface
         :return: z-drift latent space NxN surface
         :return: x-axis values
         :return: y-axis values
-        '''
-        s_key = 'Network_RMSD'
-        z_key = 'Network_z_drift'
+        """
         if s_key not in self.surfaces:
-            assert 'grid' in self._encoded, 'make sure to call MolearnAnalysis.setup_grid first'
-            decoded = self.get_decoded('grid')  # decode grid
-            self._datasets['grid_decoded'] = decoded
-            decoded_2 = self.get_decoded('grid_decoded')  # encode, and decode a second time
-            grid = self.get_encoded('grid')  # retrieve original grid
-            grid_2 = self.get_encoded('grid_decoded')  # retrieve decoded encoded grid
-
-            rmsd = (((decoded-decoded_2)*self.stdval)**2).sum(axis=1).mean(axis=-1).sqrt()
-            z_drift = ((grid-grid_2)**2).mean(axis=2).mean(axis=1).sqrt()
+            assert (
+                "grid" in self._encoded
+            ), "make sure to call MolearnAnalysis.setup_grid first"
+            decoded = self.get_decoded("grid")  # decode grid
+            # self.set_dataset('grid_decoded', decoded)  # add back as dataset w. different name
+            self._datasets["grid_decoded"] = decoded
+            decoded_2 = self.get_decoded(
+                "grid_decoded"
+            )  # encode, and decode a second time
+            grid = self.get_encoded("grid")  # retrieve original grid
+            grid_2 = self.get_encoded("grid_decoded")  # retrieve decoded encoded grid
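+            # Network_RMSD measures how much a decoded structure changes when it is
+            # encoded and decoded a second time; Network_z_drift measures how far a
+            # grid point moves in latent space when its decoded structure is
+            # re-encoded (both should ideally be small)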
+
+            rmsd = (
+                (((decoded - decoded_2) * self.stdval) ** 2)
+                .sum(axis=1)
+                .mean(axis=-1)
+                .sqrt()
+            )
+            z_drift = ((grid - grid_2) ** 2).mean(axis=2).mean(axis=1).sqrt()
             self.surfaces[s_key] = rmsd.reshape(self.n_samples, self.n_samples).numpy()
-            self.surfaces[z_key] = z_drift.reshape(self.n_samples, self.n_samples).numpy()
-
+            self.surfaces[z_key] = z_drift.reshape(
+                self.n_samples, self.n_samples
+            ).numpy()
+
         return self.surfaces[s_key], self.surfaces[z_key], self.xvals, self.yvals
 
     def _ramachandran_score(self, frame):
-        '''
+        """
         returns multiprocessing AsyncResult
         AsyncResult.get() will return the result
-        '''
-        if not hasattr(self, 'ramachandran_score_class'):
-            self.ramachandran_score_class = Parallel_Ramachandran_Score(self.mol, self.processes)
-        assert len(frame.shape) == 2, f'We wanted 2D data but got {len(frame.shape)} dimensions'
+        """
+        if not hasattr(self, "ramachandran_score_class"):
+            self.ramachandran_score_class = Parallel_Ramachandran_Score(
+                self.mol, self.processes
+            )
+        assert (
+            len(frame.shape) == 2
+        ), f"Expected 2D data but got {len(frame.shape)} dimensions"
         if frame.shape[0] == 3:
             f = frame.permute(1, 0)
         else:
@@ -343,109 +427,117 @@ def _ramachandran_score(self, frame):
             assert frame.shape[1] == 3
             f = frame
         if isinstance(f, torch.Tensor):
             f = f.data.cpu().numpy()
-
-        return self.ramachandran_score_class.get_score(f*self.stdval)
+
+        return self.ramachandran_score_class.get_score(f * self.stdval)
         # nf, na, no, nt = self.ramachandran_score_class.get_score(f*self.stdval)
         # return {'favored':nf, 'allowed':na, 'outliers':no, 'total':nt}
 
     def _dope_score(self, frame, refine=True, **kwargs):
-        '''
+        """
         returns multiprocessing AsyncResult
         AsyncResult.get() will return the result
-        '''
-        if not hasattr(self, 'dope_score_class'):
+        """
+        if not hasattr(self, "dope_score_class"):
            self.dope_score_class = Parallel_DOPE_Score(self.mol, self.processes)
-        assert len(frame.shape) == 2, f'We wanted 2D data but got {len(frame.shape)} dimensions'
+        assert (
+            len(frame.shape) == 2
+        ), f"Expected 2D data but got {len(frame.shape)} dimensions"
         if frame.shape[0] == 3:
             f = frame.permute(1, 0)
         else:
             assert frame.shape[1] == 3
             f = frame
-        if isinstance(f,torch.Tensor):
+        if isinstance(f, torch.Tensor):
             f = f.data.cpu().numpy()
 
-        return self.dope_score_class.get_score(f*self.stdval, refine=refine, **kwargs)
+        return self.dope_score_class.get_score(f * self.stdval, refine=refine, **kwargs)
 
     def get_all_ramachandran_score(self, tensor):
-        '''
+        """
         Calculate Ramachandran score of an ensemble of atomic coordinates.
-
+
         :param tensor: ensemble of atomic coordinates
-        '''
+        """
         rama = dict(favored=[], allowed=[], outliers=[], total=[])
         results = []
         for f in tensor:
             results.append(self._ramachandran_score(f))
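+        # all frames are first submitted to the worker pool; each call returns a
+        # multiprocessing AsyncResult, and r.get() below blocks until the score
+        # for that frame is ready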
-        for r in tqdm(results,desc='Calc rama'):
+        for r in tqdm(results, desc="Calc rama"):
             favored, allowed, outliers, total = r.get()
-            rama['favored'].append(favored)
-            rama['allowed'].append(allowed)
-            rama['outliers'].append(outliers)
-            rama['total'].append(total)
-        return {key:np.array(value) for key, value in rama.items()}
+            rama["favored"].append(favored)
+            rama["allowed"].append(allowed)
+            rama["outliers"].append(outliers)
+            rama["total"].append(total)
+        return {key: np.array(value) for key, value in rama.items()}
 
     def get_all_dope_score(self, tensor, refine=True):
-        '''
+        """
         Calculate DOPE score of an ensemble of atom coordinates.
 
         :param tensor: ensemble of atomic coordinates
         :param bool refine: if True, return DOPE score of input and output structure after refinement
-        '''
+        """
         results = []
         for f in tensor:
             results.append(self._dope_score(f, refine=refine))
-        results = np.array([r.get() for r in tqdm(results, desc='Calc Dope')])
+        results = np.array([r.get() for r in tqdm(results, desc="Calc Dope")])
         return results
 
     def reference_dope_score(self, frame):
-        '''
+        """
         :param numpy.array frame: array with shape [1, N, 3] with Cartesian coordinates of atoms
         :return: DOPE score
-        '''
+        """
         self.mol.coordinates = deepcopy(frame)
-        self.mol.write_pdb('tmp.pdb', split_struc=False)
+        self.mol.write_pdb("tmp.pdb", split_struc=False)
         env = Environ()
-        env.libs.topology.read(file='$(LIB)/top_heav.lib')
-        env.libs.parameters.read(file='$(LIB)/par.lib')
-        mdl = complete_pdb(env, 'tmp.pdb')
+        env.libs.topology.read(file="$(LIB)/top_heav.lib")
+        env.libs.parameters.read(file="$(LIB)/par.lib")
+        mdl = complete_pdb(env, "tmp.pdb")
         atmsel = Selection(mdl.chains[0])
         score = atmsel.assess_dope()
         return score
 
     def scan_dope(self, key=None, refine=True, **kwargs):
-        '''
+        """
         Calculate DOPE score on a grid sampling the latent space.
         Requires a grid system to be defined via a prior call to :func:`setup_grid <molearn.analysis.MolearnAnalysis.setup_grid>`.
-
+
         :param str key: label for the DOPE score surface (default is DOPE_unrefined, DOPE_refined or DOPE_both)
         :param bool refine: if True, structures generated will be energy minimised before DOPE scoring
         :return: DOPE score latent space NxN surface
         :return: x-axis values
         :return: y-axis values
-        '''
-
+        """
+
         if key is None:
-            if refine=='both':
+            if refine == "both":
                 key = "DOPE_both"
             elif refine:
                 key = "DOPE_refined"
             else:
                 key = "DOPE_unrefined"
-
+
         if key not in self.surfaces:
-            assert 'grid' in self._encoded, 'make sure to call MolearnAnalysis.setup_grid first'
-            decoded = self.get_decoded('grid')
+            assert (
+                "grid" in self._encoded
+            ), "make sure to call MolearnAnalysis.setup_grid first"
+            decoded = self.get_decoded("grid")
             result = self.get_all_dope_score(decoded, refine=refine, **kwargs)
-            if refine=='both':
-                self.surfaces[key] = as_numpy(result.reshape(self.n_samples, self.n_samples, 2))
+            if refine == "both":
+                self.surfaces[key] = as_numpy(
+                    result.reshape(self.n_samples, self.n_samples, 2)
+                )
             else:
-                self.surfaces[key] = as_numpy(result.reshape(self.n_samples, self.n_samples))
-
+                self.surfaces[key] = as_numpy(
+                    result.reshape(self.n_samples, self.n_samples)
+                )
+
         return self.surfaces[key], self.xvals, self.yvals
 
     def scan_ramachandran(self):
-        '''
+        """
         Calculate Ramachandran scores on a grid sampling the latent space.
         Requires a grid system to be defined via a prior call to :func:`setup_grid <molearn.analysis.MolearnAnalysis.setup_grid>`.
         Saves four surfaces in memory, with keys 'Ramachandran_favored', 'Ramachandran_allowed', 'Ramachandran_outliers', and 'Ramachandran_total'.
@@ -453,50 +545,136 @@ def scan_ramachandran(self):
 
         :return: Ramachandran_favoured latent space NxN surface (ratio of residues in favourable conformation)
         :return: x-axis values
         :return: y-axis values
-        '''
-        keys = {i:f'Ramachandran_{i}' for i in ('favored', 'allowed', 'outliers', 'total')}
+        """
+        keys = {
+            i: f"Ramachandran_{i}" for i in ("favored", "allowed", "outliers", "total")
+        }
         if list(keys.values())[0] not in self.surfaces:
-            assert 'grid' in self._encoded, 'make sure to call MolearnAnalysis.setup_grid first'
-            decoded = self.get_decoded('grid')
+            assert (
+                "grid" in self._encoded
+            ), "make sure to call MolearnAnalysis.setup_grid first"
+            decoded = self.get_decoded("grid")
             rama = self.get_all_ramachandran_score(decoded)
             for key, value in rama.items():
                 self.surfaces[keys[key]] = value
-        return self.surfaces['Ramachandran_favored'], self.xvals, self.yvals
-
+        return self.surfaces["Ramachandran_favored"], self.xvals, self.yvals
+
     def scan_custom(self, fct, params, key):
-        '''
+        """
         Generate a surface coloured according to a user-defined function.
-
+
         :param fct: function taking atomic coordinates as input, an optional list of parameters, and returning a single value.
         :param list params: parameters to be passed to function fct. If no parameter is needed, pass an empty list.
         :param str key: name of the dataset generated by this function scan
         :return: latent space NxN surface, evaluated according to input function
         :return: x-axis values
         :return: y-axis values
-        '''
-        decoded = self.get_decoded('grid')
+        """
+        decoded = self.get_decoded("grid")
         results = []
         for i, j in enumerate(decoded):
-            s = (j.view(1, 3, -1).permute(0, 2, 1)*self.stdval).numpy()
+            s = (j.view(1, 3, -1).permute(0, 2, 1) * self.stdval).numpy()
             results.append(fct(s, *params))
         self.surfaces[key] = np.array(results).reshape(self.n_samples, self.n_samples)
-
+
         return self.surfaces[key], self.xvals, self.yvals
 
-    def generate(self, crd):
-        '''
+    def _relax(self, pdb_path: str, mini_path: str) -> None:
+        """
+        Relax a generated structure via a short energy minimisation.
+
+        :param str pdb_path: path of the pdb file to relax
+        :param str mini_path: path where the new relaxed structure should be saved
+        """
+        # read pdb
+        pdb = PDBFile(pdb_path)
+        # preparation
+        forcefield = ForceField("amber99sb.xml")
+        modeller = Modeller(pdb.topology, pdb.positions)
+        modeller.addHydrogens(forcefield)
+        system = forcefield.createSystem(modeller.topology, nonbondedMethod=NoCutoff)
+        integrator = VerletIntegrator(0.001 * picoseconds)
+        simulation = Simulation(modeller.topology, system, integrator)
+        simulation.context.setPositions(modeller.positions)
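+        # note: the integrator is only required to construct the Simulation object;
+        # minimizeEnergy() performs a local energy minimisation without integrating
+        # the equations of motion, so the timestep value has no effect here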
+        # minimize protein
+        simulation.minimizeEnergy(maxIterations=100)
+        positions = simulation.context.getState(getPositions=True).getPositions()
+        # write new pdb file
+        PDBFile.writeFile(simulation.topology, positions, open(mini_path, "w+"))
+
+    def _pdb_file(
+        self,
+        prot_coords: np.ndarray[tuple[int, int], np.dtype[np.float64]],
+        outpath: str,
+    ) -> None:
+        """
+        Create a pdb file for the given coordinates.
+
+        :param np.ndarray[tuple[int, int], np.dtype[np.float64]] prot_coords: coordinates of all atoms of a protein
+        :param str outpath: path where the pdb file should be stored
+        """
+        pdb_data = self.mol.data
+        with open(
+            outpath,
+            "w+",
+        ) as cfile:
+            for ck, k in enumerate(prot_coords):
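+                # one fixed-width ATOM record per atom, following the PDB column
+                # layout: record name, serial, atom name, altLoc, residue name,
+                # chain, residue number, x/y/z, occupancy, B-factor, element, charge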
+                cfile.write(
+                    f"{str(pdb_data['atom'][ck]):6s}{int(pdb_data['index'][ck]):5d} {str(pdb_data['name'][ck]):^4s}{'':1s}{str(pdb_data['resname'][ck]):3s} {str(pdb_data['chain'][ck]):1s}{int(pdb_data['resid'][ck]):4d}{'':1s}   {k[0]:8.3f}{k[1]:8.3f}{k[2]:8.3f}{float(pdb_data['occupancy'][ck]):6.2f}{float(pdb_data['beta'][ck]):6.2f}          {str(pdb_data['atomtype'][ck]):>2s}{str(pdb_data['charge'][ck]):2s}\n"
+                )
+
+    def generate(
+        self,
+        crd: np.ndarray[tuple[int, int, int], np.dtype[np.float64]],
+        pdb_path: str | None = None,
+        relax: bool = False,
+    ) -> np.ndarray[tuple[int, int, int], np.dtype[np.float64]]:
+        """
         Generate a collection of protein conformations, given coordinates in the latent space.
-
+
         :param numpy.array crd: coordinates in the latent space, as a (Nx2) array
+        :param str pdb_path: path where pdb files should be stored, as files named sN.pdb, where N is the index in the crd array
+        :param bool relax: whether relaxed structures should also be saved, as sN_relax.pdb files
         :return: collection of protein conformations in the Cartesian space (NxMx3, where M is the number of atoms in the protein)
-        '''
+        """
         with torch.no_grad():
-            z = torch.tensor(crd.transpose(1, 2, 0)).float()
             key = list(self._datasets)[0]
-            s = self.network.decode(z)[:, :, :self._datasets[key].shape[2]].numpy().transpose(0, 2, 1)
-
-        return s*self.stdval + self.meanval
+            # if not on the cpu, transfer data back to the cpu before converting it to a numpy array
+            if self.device == "cpu":
+                z = torch.tensor(crd.transpose(1, 2, 0)).float()
+                s = (
+                    self.network.decode(z)[:, :, : self._datasets[key].shape[2]]
+                    .numpy()
+                    .transpose(0, 2, 1)
+                )
+            else:
+                z = torch.tensor(crd.transpose(1, 2, 0)).float().to(self.device)
+                s = (
+                    self.network.decode(z)[:, :, : self._datasets[key].shape[2]]
+                    .cpu()
+                    .numpy()
+                    .transpose(0, 2, 1)
+                )
+
+        gen_prot_coords = s * self.stdval + self.meanval
+        # create pdb files
+        if pdb_path is not None:
+            for ci, i in enumerate(tqdm(gen_prot_coords, desc="Generating pdb files")):
+                struct_path = os.path.join(pdb_path, f"s{ci}.pdb")
+                self._pdb_file(i, struct_path)
+                # relax and save as a new file
+                if relax:
+                    self._relax(
+                        struct_path, f"{os.path.splitext(struct_path)[0]}_relax.pdb"
+                    )
+
+        return gen_prot_coords
 
     def __getstate__(self):
-        return {key:value for key, value in dict(self.__dict__).items() if key not in ['dope_score_class', 'ramachandran_score_class']}
+        return {
+            key: value
+            for key, value in dict(self.__dict__).items()
+            if key not in ["dope_score_class", "ramachandran_score_class"]
+        }