diff --git a/examples/prepare_example.py b/examples/prepare_example.py index 8ee33d7..0217bf6 100644 --- a/examples/prepare_example.py +++ b/examples/prepare_example.py @@ -38,6 +38,9 @@ def main(): # simply striding over the trajectories with a step size computed to result in n_cluster frames tm.stride() tm.create_trajectories() + # creating a new trajectory based on provided indices + tm.own_idx("index_sample.txt") + tm.create_trajectories() """ the given example will create the following files in a new directory named 'clustered' * MurDopen_CLUSTER_aggl_train.dcd @@ -45,8 +48,10 @@ def main(): * MurDopen_CLUSTER_pca_train.dcd * MurDopen_CLUSTER_pca_train_frames.txt * MurDopen_NEW_TOPO.pdb - * MurDopen_STRIDE_5_train.dcd - * MurDopen_STRIDE_5_train_frames.txt + * MurDopen_STRIDE_train.dcd + * MurDopen_STRIDE_train_frames.txt + * MurDopen_PROVIDED_train.dcd + * MurDopen_PROVIDED_train_frames.txd the txt files contain the indices of frames of the original trajectory the dcd files contain the new trajectory the pdb file is the new topology for the new trajectory diff --git a/src/molearn/data/prepare.py b/src/molearn/data/prepare.py index e0663ff..59e0119 100644 --- a/src/molearn/data/prepare.py +++ b/src/molearn/data/prepare.py @@ -7,8 +7,6 @@ from sklearn.cluster import AgglomerativeClustering, KMeans from sklearn.decomposition import PCA -np.random.seed(42) - class DataAssembler: def __init__( @@ -20,6 +18,7 @@ def __init__( image_mol: bool = False, outpath: str = "", verbose: bool = False, + dist_mat: bool = True, ): """ Create clustered trajectories, stride trajectories and randomly sampled test frames @@ -34,6 +33,7 @@ def __init__( :param bool image_mol: True to image to molecule (center it in the box) :param str outpath: directory path where the new trajector(y)ies should be stored :param bool verbose: True to get info which steps are currently performed + :param bool verbose: True to calculate the n_frames x n_frames distance matrix """ self.traj_path = traj_path self.topo_path = topo_path @@ -42,6 +42,7 @@ def __init__( self.image_mol = image_mol self.outpath = outpath self.verbose = verbose + self.dist_mat = dist_mat assert os.path.exists(self.outpath), "Outpath does not exist" def _loading_fallback(self, traj_path, topo_path): @@ -222,18 +223,19 @@ def read_traj(self, atom_indices=None, ref_atom_indices=None) -> None: # number of frames for training/validation set n_train_frames = len(self.train_idx) - # remove H atoms from distance calculation - atom_indices = [ - a.index for a in train_traj.topology.atoms if a.element.symbol != "H" - ] - if self.verbose: - print("Calculating disance matrix") - # distance matrix between all frames - self.traj_dists = np.empty((n_train_frames, n_train_frames)) - for i in range(n_train_frames): - self.traj_dists[i] = md.rmsd( - train_traj, train_traj, i, atom_indices=atom_indices - ) + if self.dist_mat: + # remove H atoms from distance calculation + atom_indices = [ + a.index for a in train_traj.topology.atoms if a.element.symbol != "H" + ] + if self.verbose: + print("Calculating disance matrix") + # distance matrix between all frames + self.traj_dists = np.empty((n_train_frames, n_train_frames)) + for i in range(n_train_frames): + self.traj_dists[i] = md.rmsd( + train_traj, train_traj, i, atom_indices=atom_indices + ) def calc_rmsd_f( self, @@ -294,7 +296,7 @@ def distance_cluster( """ assert hasattr( self, "traj_dists" - ), "No pairweise frame distances present - read in trajectory first" + ), "No pairweise frame distances present - read in trajectory first and make sure `dist_mat=True`" if self.verbose: print("Distance clustering") # replace AgglomerativeClustering with any distance matrix based clustering function @@ -363,6 +365,21 @@ def stride(self) -> None: self.cluster_idx = stride_idx self.cluster_method = f"STRIDE_{self.n_cluster}" + def own_idx(self, file_path: str): + """ + Provide indices for frames to create a new trajectory. + Useful if trajectory should be sub sampled by some external metric. + + :param str file_path: path where the file storing the indices is located. Needs to have each index in a separate line. + """ + provided_idx = [] + with open(file_path, "r") as ifile: + for i in ifile: + provided_idx.append(int(i)) + self.train_idx = np.asarray(provided_idx) + self.cluster_idx = np.arange(len(self.train_idx)) + self.cluster_method = "PROVIDED" + def _save_idx( self, filepath: str, idxs: list[int] | np.ndarray[tuple[int], np.dtype[np.int_]] ) -> None: @@ -389,12 +406,17 @@ def create_trajectories( [ hasattr(self, "traj"), hasattr(self, "traj_name"), - hasattr(self, "traj_dists"), hasattr(self, "train_idx"), - hasattr(self, "frame_idx"), - hasattr(self, "test_border"), + hasattr(self, "cluster_idx"), ] ), "Read in trajectory first" + assert all( + [ + hasattr(self, "train_idx"), + hasattr(self, "cluster_idx"), + hasattr(self, "cluster_method"), + ] + ), "Cluster the trajectory first" assert hasattr( self, "cluster_idx"