Skip to content

Commit

Permalink
adding ability to create new trajectory based on own frame index file
Browse files Browse the repository at this point in the history
  • Loading branch information
gwirn committed Aug 26, 2024
1 parent 6754592 commit 71dbbbb
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 20 deletions.
9 changes: 7 additions & 2 deletions examples/prepare_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,20 @@ def main():
# simply striding over the trajectories with a step size computed to result in n_cluster frames
tm.stride()
tm.create_trajectories()
# creating a new trajectory based on provided indices
tm.own_idx("index_sample.txt")
tm.create_trajectories()
"""
the given example will create the following files in a new directory named 'clustered'
* MurDopen_CLUSTER_aggl_train.dcd
* MurDopen_CLUSTER_aggl_train_frames.txt
* MurDopen_CLUSTER_pca_train.dcd
* MurDopen_CLUSTER_pca_train_frames.txt
* MurDopen_NEW_TOPO.pdb
* MurDopen_STRIDE_5_train.dcd
* MurDopen_STRIDE_5_train_frames.txt
* MurDopen_STRIDE_train.dcd
* MurDopen_STRIDE_train_frames.txt
* MurDopen_PROVIDED_train.dcd
* MurDopen_PROVIDED_train_frames.txd
the txt files contain the indices of frames of the original trajectory
the dcd files contain the new trajectory
the pdb file is the new topology for the new trajectory
Expand Down
58 changes: 40 additions & 18 deletions src/molearn/data/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from sklearn.cluster import AgglomerativeClustering, KMeans
from sklearn.decomposition import PCA

np.random.seed(42)


class DataAssembler:
def __init__(
Expand All @@ -20,6 +18,7 @@ def __init__(
image_mol: bool = False,
outpath: str = "",
verbose: bool = False,
dist_mat: bool = True,
):
"""
Create clustered trajectories, stride trajectories and randomly sampled test frames
Expand All @@ -34,6 +33,7 @@ def __init__(
:param bool image_mol: True to image to molecule (center it in the box)
:param str outpath: directory path where the new trajector(y)ies should be stored
:param bool verbose: True to get info which steps are currently performed
:param bool verbose: True to calculate the n_frames x n_frames distance matrix
"""
self.traj_path = traj_path
self.topo_path = topo_path
Expand All @@ -42,6 +42,7 @@ def __init__(
self.image_mol = image_mol
self.outpath = outpath
self.verbose = verbose
self.dist_mat = dist_mat
assert os.path.exists(self.outpath), "Outpath does not exist"

def _loading_fallback(self, traj_path, topo_path):
Expand Down Expand Up @@ -222,18 +223,19 @@ def read_traj(self, atom_indices=None, ref_atom_indices=None) -> None:
# number of frames for training/validation set
n_train_frames = len(self.train_idx)

# remove H atoms from distance calculation
atom_indices = [
a.index for a in train_traj.topology.atoms if a.element.symbol != "H"
]
if self.verbose:
print("Calculating disance matrix")
# distance matrix between all frames
self.traj_dists = np.empty((n_train_frames, n_train_frames))
for i in range(n_train_frames):
self.traj_dists[i] = md.rmsd(
train_traj, train_traj, i, atom_indices=atom_indices
)
if self.dist_mat:
# remove H atoms from distance calculation
atom_indices = [
a.index for a in train_traj.topology.atoms if a.element.symbol != "H"
]
if self.verbose:
print("Calculating disance matrix")
# distance matrix between all frames
self.traj_dists = np.empty((n_train_frames, n_train_frames))
for i in range(n_train_frames):
self.traj_dists[i] = md.rmsd(
train_traj, train_traj, i, atom_indices=atom_indices
)

def calc_rmsd_f(
self,
Expand Down Expand Up @@ -294,7 +296,7 @@ def distance_cluster(
"""
assert hasattr(
self, "traj_dists"
), "No pairweise frame distances present - read in trajectory first"
), "No pairweise frame distances present - read in trajectory first and make sure `dist_mat=True`"
if self.verbose:
print("Distance clustering")
# replace AgglomerativeClustering with any distance matrix based clustering function
Expand Down Expand Up @@ -363,6 +365,21 @@ def stride(self) -> None:
self.cluster_idx = stride_idx
self.cluster_method = f"STRIDE_{self.n_cluster}"

def own_idx(self, file_path: str):
"""
Provide indices for frames to create a new trajectory.
Useful if trajectory should be sub sampled by some external metric.
:param str file_path: path where the file storing the indices is located. Needs to have each index in a separate line.
"""
provided_idx = []
with open(file_path, "r") as ifile:
for i in ifile:
provided_idx.append(int(i))
self.train_idx = np.asarray(provided_idx)
self.cluster_idx = np.arange(len(self.train_idx))
self.cluster_method = "PROVIDED"

def _save_idx(
self, filepath: str, idxs: list[int] | np.ndarray[tuple[int], np.dtype[np.int_]]
) -> None:
Expand All @@ -389,12 +406,17 @@ def create_trajectories(
[
hasattr(self, "traj"),
hasattr(self, "traj_name"),
hasattr(self, "traj_dists"),
hasattr(self, "train_idx"),
hasattr(self, "frame_idx"),
hasattr(self, "test_border"),
hasattr(self, "cluster_idx"),
]
), "Read in trajectory first"
assert all(
[
hasattr(self, "train_idx"),
hasattr(self, "cluster_idx"),
hasattr(self, "cluster_method"),
]
), "Cluster the trajectory first"

assert hasattr(
self, "cluster_idx"
Expand Down

0 comments on commit 71dbbbb

Please sign in to comment.