Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding ability to create new trajectory based on own frame index file #22

Merged
merged 1 commit into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions examples/prepare_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,20 @@ def main():
# simply striding over the trajectories with a step size computed to result in n_cluster frames
tm.stride()
tm.create_trajectories()
# creating a new trajectory based on provided indices
tm.own_idx("index_sample.txt")
tm.create_trajectories()
"""
the given example will create the following files in a new directory named 'clustered'
* MurDopen_CLUSTER_aggl_train.dcd
* MurDopen_CLUSTER_aggl_train_frames.txt
* MurDopen_CLUSTER_pca_train.dcd
* MurDopen_CLUSTER_pca_train_frames.txt
* MurDopen_NEW_TOPO.pdb
* MurDopen_STRIDE_5_train.dcd
* MurDopen_STRIDE_5_train_frames.txt
* MurDopen_STRIDE_train.dcd
* MurDopen_STRIDE_train_frames.txt
* MurDopen_PROVIDED_train.dcd
* MurDopen_PROVIDED_train_frames.txd
the txt files contain the indices of frames of the original trajectory
the dcd files contain the new trajectory
the pdb file is the new topology for the new trajectory
Expand Down
58 changes: 40 additions & 18 deletions src/molearn/data/prepare.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
from sklearn.cluster import AgglomerativeClustering, KMeans
from sklearn.decomposition import PCA

np.random.seed(42)


class DataAssembler:
def __init__(
Expand All @@ -20,6 +18,7 @@ def __init__(
image_mol: bool = False,
outpath: str = "",
verbose: bool = False,
dist_mat: bool = True,
):
"""
Create clustered trajectories, stride trajectories and randomly sampled test frames
Expand All @@ -34,6 +33,7 @@ def __init__(
:param bool image_mol: True to image to molecule (center it in the box)
:param str outpath: directory path where the new trajector(y)ies should be stored
:param bool verbose: True to get info which steps are currently performed
:param bool verbose: True to calculate the n_frames x n_frames distance matrix
"""
self.traj_path = traj_path
self.topo_path = topo_path
Expand All @@ -42,6 +42,7 @@ def __init__(
self.image_mol = image_mol
self.outpath = outpath
self.verbose = verbose
self.dist_mat = dist_mat
assert os.path.exists(self.outpath), "Outpath does not exist"

def _loading_fallback(self, traj_path, topo_path):
Expand Down Expand Up @@ -222,18 +223,19 @@ def read_traj(self, atom_indices=None, ref_atom_indices=None) -> None:
# number of frames for training/validation set
n_train_frames = len(self.train_idx)

# remove H atoms from distance calculation
atom_indices = [
a.index for a in train_traj.topology.atoms if a.element.symbol != "H"
]
if self.verbose:
print("Calculating disance matrix")
# distance matrix between all frames
self.traj_dists = np.empty((n_train_frames, n_train_frames))
for i in range(n_train_frames):
self.traj_dists[i] = md.rmsd(
train_traj, train_traj, i, atom_indices=atom_indices
)
if self.dist_mat:
# remove H atoms from distance calculation
atom_indices = [
a.index for a in train_traj.topology.atoms if a.element.symbol != "H"
]
if self.verbose:
print("Calculating disance matrix")
# distance matrix between all frames
self.traj_dists = np.empty((n_train_frames, n_train_frames))
for i in range(n_train_frames):
self.traj_dists[i] = md.rmsd(
train_traj, train_traj, i, atom_indices=atom_indices
)

def calc_rmsd_f(
self,
Expand Down Expand Up @@ -294,7 +296,7 @@ def distance_cluster(
"""
assert hasattr(
self, "traj_dists"
), "No pairweise frame distances present - read in trajectory first"
), "No pairweise frame distances present - read in trajectory first and make sure `dist_mat=True`"
if self.verbose:
print("Distance clustering")
# replace AgglomerativeClustering with any distance matrix based clustering function
Expand Down Expand Up @@ -363,6 +365,21 @@ def stride(self) -> None:
self.cluster_idx = stride_idx
self.cluster_method = f"STRIDE_{self.n_cluster}"

def own_idx(self, file_path: str):
"""
Provide indices for frames to create a new trajectory.
Useful if trajectory should be sub sampled by some external metric.

:param str file_path: path where the file storing the indices is located. Needs to have each index in a separate line.
"""
provided_idx = []
with open(file_path, "r") as ifile:
for i in ifile:
provided_idx.append(int(i))
self.train_idx = np.asarray(provided_idx)
self.cluster_idx = np.arange(len(self.train_idx))
self.cluster_method = "PROVIDED"

def _save_idx(
self, filepath: str, idxs: list[int] | np.ndarray[tuple[int], np.dtype[np.int_]]
) -> None:
Expand All @@ -389,12 +406,17 @@ def create_trajectories(
[
hasattr(self, "traj"),
hasattr(self, "traj_name"),
hasattr(self, "traj_dists"),
hasattr(self, "train_idx"),
hasattr(self, "frame_idx"),
hasattr(self, "test_border"),
hasattr(self, "cluster_idx"),
]
), "Read in trajectory first"
assert all(
[
hasattr(self, "train_idx"),
hasattr(self, "cluster_idx"),
hasattr(self, "cluster_method"),
]
), "Cluster the trajectory first"

assert hasattr(
self, "cluster_idx"
Expand Down
Loading