-
Notifications
You must be signed in to change notification settings - Fork 2
/
ac_PT_multithread.py
177 lines (135 loc) · 6.67 KB
/
ac_PT_multithread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
""""Script to apply the autocorrelation function"""
from multiprocessing import Pool, cpu_count
import time
import os
from pathlib import Path
import csv
import networkx as nx
import numpy as np
import matplotlib.pyplot as plt
from graph_info import node_info, metal_index, edge_info, vector_feature_PT, vector_feature_NBO
from ac_funtions import *
from utilities import round_csv, save_vectors, join_vectors, round_numbers
# Parameters to modify
PARAMS = dict(
    # Autocorrelation operator code; read_graph recognizes
    # MA, MD, MR, MS, FA, FD, FR and FS.
    ac_operator='FR',
    # AABBA(II) model for PT (1, 2, 3); any number for the
    # AA, BBavg, BB, or AB walks.
    model_number=3,
    # Maximum depth handed to the feature and autocorrelation helpers.
    depth_max=1,
    # Type of walk: AA, BBavg, BB, AB, ABBAavg, or ABBA.
    # NOTE(review): read_graph only computes a result for ABBAavg with the
    # M* operators and for ABBA with the F* operators -- confirm that the
    # operator/walk pairing selected here is one it handles.
    walk='ABBAavg',
)
# Define feature set mapping functions
def get_feature_for_walk(walk, fs, model_number):
    """Select the entry of ``fs`` that corresponds to a walk type.

    Args:
        walk: Walk identifier; one of 'AA', 'BBavg', 'BB', 'AB',
            'ABBAavg' or 'ABBA'.
        fs: Indexable feature set (the call sites in this file pass the
            result of ``vector_feature_PT``).
        model_number: AABBA(II) model number. Only consulted for the
            'ABBAavg' and 'ABBA' walks, where it must be 1, 2 or 3.

    Returns:
        ``fs[2]`` for 'AA', ``fs[3]`` for 'BBavg', 'BB' and 'AB', and
        ``fs[3 + model_number]`` (i.e. ``fs[4]``, ``fs[5]`` or ``fs[6]``)
        for 'ABBAavg' and 'ABBA'.

    Raises:
        ValueError: If ``walk`` is not recognized, or if an ABBA walk is
            requested with a ``model_number`` other than 1, 2 or 3.
    """
    if walk == 'AA':
        return fs[2]  # feature_node_depth
    if walk in ('BBavg', 'BB', 'AB'):
        return fs[3]  # feature_edge_depth
    if walk in ('ABBAavg', 'ABBA'):
        # Without this check an out-of-range model number would silently
        # select an unrelated entry (0 -> fs[3], negatives wrap around) or
        # fail with a bare IndexError.
        if model_number not in (1, 2, 3):
            raise ValueError(
                f"Model number '{model_number}' is not valid for walk type "
                f"'{walk}'; expected 1, 2 or 3."
            )
        # feature_new1_edge_depth, feature_new2_edge_depth,
        # or feature_new3_edge_depth
        return fs[3 + model_number]
    raise ValueError(f"Walk type '{walk}' is not recognized.")
def read_graph(file, path_to_gml, params):
    """Read one GML graph and run the autocorrelation helper chosen by ``params``.

    Args:
        file: Name of the GML file, relative to ``path_to_gml``.
        path_to_gml: Directory that contains the GML files.
        params: Mapping with the keys 'ac_operator', 'model_number',
            'depth_max' and 'walk'.

    Returns:
        The value the selected helper returns in its ``AC_vname`` position
        (the second returned value, except for the 'BBavg' walk, where the
        fourth value of ``bond_bond_MC`` is used).
        NOTE(review): the accompanying vector is discarded here -- confirm
        that this is what ``join_vectors`` expects.

    Raises:
        ValueError: If the walk type is not recognized, or if no helper is
            wired up for the requested walk/operator combination.
    """
    ac_operator = params['ac_operator']
    model_number = params['model_number']
    depth_max = params['depth_max']
    walk = params['walk']

    # Element-wise operation associated with each operator code.
    comp_dict = {
        'MA': np.multiply,
        'FA': np.multiply,
        'MD': np.subtract,
        'FD': np.subtract,
        'MR': np.divide,
        'FR': np.divide,
        'MS': np.add,
        'FS': np.add,
    }

    # 'M*' operators are routed to the *_MC helpers, 'F*' operators to the
    # *_F helpers.
    use_mc = ac_operator in ('MA', 'MD', 'MR', 'MS')
    use_f = ac_operator in ('FA', 'FD', 'FR', 'FS')

    # Feature list for this configuration.
    feature_set_PT = vector_feature_PT(depth_max, ac_operator, model_number, walk)
    # The result is not needed here; the call is kept so that an invalid walk
    # is rejected before any file is read.
    get_feature_for_walk(walk, feature_set_PT, model_number)

    # Read the graph and tag every node and edge with feature_identity = 1.
    G = nx.read_gml(os.path.join(path_to_gml, file))
    nx.set_node_attributes(G, 1, "feature_identity")
    nx.set_edge_attributes(G, 1, "feature_identity")

    # Starting node of the walk, then the per-depth node/edge information.
    indx = metal_index(G)
    node_dict = node_info(G, depth_max, indx)
    edge_dict = edge_info(G, depth_max, indx)

    # Sentinel that lets us tell "no helper ran" apart from any real result.
    unset = object()
    AC_vname = unset

    if walk == 'AA':
        if use_mc:
            _, AC_vname = atom_atom_MC(
                G, indx, depth_max, node_dict, feature_set_PT[0], ac_operator, comp_dict)
        elif use_f:
            _, AC_vname = atom_atom_F(
                G, depth_max, feature_set_PT[0], ac_operator, comp_dict)
    elif walk == 'BBavg':
        if use_mc:
            # The averaged result sits in the fourth position.
            _, _, _, AC_vname = bond_bond_MC(
                G, depth_max, edge_dict, feature_set_PT[1], ac_operator, comp_dict)
    elif walk == 'BB':
        if use_mc:
            # The non-averaged result sits in the second position.
            _, AC_vname, _, _ = bond_bond_MC(
                G, depth_max, edge_dict, feature_set_PT[1], ac_operator, comp_dict)
        elif use_f:
            _, AC_vname = bond_bond_F(
                G, depth_max, feature_set_PT[1], ac_operator, comp_dict)
    elif walk == 'AB':
        if use_mc:
            _, AC_vname = bond_atom_MC(
                G, indx, depth_max, edge_dict, feature_set_PT[0], feature_set_PT[1],
                ac_operator, comp_dict)
        elif use_f:
            _, AC_vname = bond_atom_F(
                G, depth_max, feature_set_PT[0], feature_set_PT[1], ac_operator, comp_dict)
    elif walk == 'ABBAavg':
        if use_mc:
            _, AC_vname = new_gp_bond_bond_MC(
                G, depth_max, node_dict, edge_dict, model_number, ac_operator, comp_dict)
    elif walk == 'ABBA':
        if use_f:
            _, AC_vname = new_gp_bond_bond_F(
                G, depth_max, node_dict, model_number, ac_operator, comp_dict)
    else:
        raise ValueError(f"Walk type '{walk}' is not recognized.")

    # Previously an unsupported walk/operator pair fell through every branch
    # and crashed on the return with UnboundLocalError; report it explicitly.
    if AC_vname is unset:
        raise ValueError(
            f"AC operator '{ac_operator}' is not supported for walk type '{walk}'."
        )
    return AC_vname
# Wrapper function for multiprocessing
def process_file(args):
    """Unpack one ``(file, path_to_gml, params)`` work item and run ``read_graph`` on it.

    Takes a single tuple so that it can be handed directly to ``Pool.map``,
    which passes exactly one argument per task.
    """
    gml_name, gml_dir, settings = args
    return read_graph(gml_name, gml_dir, settings)
if __name__ == "__main__":
    # Wall-clock start, used for the elapsed-time report at the end.
    start_time = time.time()

    # Report the selected configuration.
    print('Selected PT AABBA vector:\nOrigin/Operator:', PARAMS['ac_operator'],
          ' Type of walking:', PARAMS['walk'],
          ' Maximum depth:', PARAMS['depth_max'],
          ' Model number (applies for AABBA(II)):', PARAMS['model_number'])

    # All paths are resolved relative to the directory of this script.
    script_dir = Path(__file__).resolve().parent
    general_path = script_dir
    path_to_gml = general_path / 'PT_graphs'
    path_to_folder = general_path / f'vectors_AABBA/PT_{PARAMS["walk"]}'

    # Ensure the output directory exists (no separate existence check, so
    # there is no window between the check and the creation).
    path_to_folder.mkdir(parents=True, exist_ok=True)

    # Collect every .gml file name in the input directory.
    gml_list = [file for file in os.listdir(path_to_gml) if file.endswith('.gml')]

    # save maximum depth
    #get_max_depth(gml_list)

    # Process the files in a worker pool; one task per graph file.
    with Pool() as pool:
        poolReturn = pool.map(
            process_file,
            [(file, path_to_gml, PARAMS) for file in gml_list],
        )

    # Feature set and feature type for the selected configuration.
    feature_set_PT = vector_feature_PT(
        PARAMS['depth_max'], PARAMS['ac_operator'], PARAMS['model_number'], PARAMS['walk'])
    feature_type = get_feature_for_walk(PARAMS['walk'], feature_set_PT, PARAMS['model_number'])

    # Join the per-file results.
    out_dict = join_vectors(poolReturn, feature_type)

    # Save the derived vectors in a .csv file.
    print('Save vectors in a .csv file')
    save_vectors(path_to_folder, out_dict, PARAMS['depth_max'], PARAMS['ac_operator'],
                 PARAMS['walk'], PARAMS['model_number'])

    # Round the features of the CSV.
    round_csv(path_to_folder, PARAMS['depth_max'], PARAMS['ac_operator'],
              PARAMS['walk'], PARAMS['model_number'])

    # The CPU count used to be glued directly onto "minutes." with no
    # separator or label, which made the number unreadable.
    elapsed_minutes = round((time.time() - start_time) / 60, 4)
    print(f"Execution time: {elapsed_minutes} minutes. CPU count: {cpu_count()}")