import time
from bs4 import BeautifulSoup
import urllib.parse
import requests
from pyvis.network import Network
import networkx as nx
import argparse
import pickle
import scipy.sparse
import numpy as np
from collections import deque

INTERNAL_COLOR = '#0072BB'
EXTERNAL_COLOR = '#FF9F40'
ERROR_COLOR = '#FF0800'
RESOURCE_COLOR = '#2ECC71'
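
# Color legend: INTERNAL_COLOR marks pages under the crawled site, EXTERNAL_COLOR marks
# off-site links, ERROR_COLOR marks URLs whose request failed or returned an error status,
# and RESOURCE_COLOR marks internal links whose content type is not HTML.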


def handle_error(error, error_obj, r, url, visited, error_codes):
    # Record the failure (exception text or HTTP status code) and mark the URL as visited.
    error = str(error_obj) if error else r.status_code
    visited.add(url)
    error_codes[url] = error
    print(f'{error} ERROR while visiting {url}')


def has_been_visited(url, visited):
    # Treat URLs with and without a trailing slash as the same page.
    return url in visited or url.rstrip('/') in visited or url + '/' in visited
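
# For example (hypothetical URL): if 'https://example.com/about' is already in `visited`,
# then has_been_visited('https://example.com/about/', visited) is also True.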


def crawl(url, visit_external, keep_queries):
    """Crawl the site starting at `url`; return (edges, error_codes, resource_pages)."""
    visited = set()
    edges = set()
    resource_pages = set()
    error_codes = dict()
    redirect_target_url = dict()

    # Resolve any redirect on the base URL so internal links are matched consistently
    head = requests.head(url, timeout=10)
    site_url = head.url
    redirect_target_url[url] = site_url

    to_visit = deque()
    to_visit.append((site_url, None))

    while to_visit:
        url, from_url = to_visit.pop()
        print('Visiting', url, 'from', from_url)

        error = False
        error_obj = None
        page = None  # Guard against an unbound name if the request below raises
        try:
            page = requests.get(url, timeout=10)
        except requests.exceptions.RequestException as e:
            error = True
            error_obj = e

        if error or not page:
            handle_error(error, error_obj, page, url, visited, error_codes)
            continue

        # Don't look for links in external pages
        if not url.startswith(site_url):
            continue

        soup = BeautifulSoup(page.text, 'html.parser')

        # Handle <base> tags
        base_url = soup.find('base')
        base_url = '' if base_url is None else base_url.get('href', '')

        for link in soup.find_all('a', href=True):
            link_url = link['href']
            if link_url.startswith('mailto:'):
                continue

            # Resolve relative paths
            if not link_url.startswith('http'):
                link_url = urllib.parse.urljoin(url, urllib.parse.urljoin(base_url, link_url))

            # Remove queries/fragments from internal links
            if not keep_queries and link_url.startswith(site_url):
                link_url = urllib.parse.urljoin(link_url, urllib.parse.urlparse(link_url).path)

            # Use the known redirect target if we have already seen this link
            if link_url in redirect_target_url:
                link_url = redirect_target_url[link_url]

            if not has_been_visited(link_url, visited) and (visit_external or link_url.startswith(site_url)):
                is_html = False
                error = False
                error_obj = None
                head = None  # Guard against reusing a stale response if the request below raises
                try:
                    head = requests.head(link_url, timeout=10)
                    if head and 'html' in head.headers.get('content-type', ''):
                        is_html = True
                except requests.exceptions.RequestException as e:
                    error = True
                    error_obj = e

                if error or not head:
                    handle_error(error, error_obj, head, link_url, visited, error_codes)
                    edges.add((url, link_url))
                    continue

                visited.add(link_url)
                redirect_target_url[link_url] = head.url
                link_url = redirect_target_url[link_url]
                visited.add(link_url)

                if is_html:
                    if url.startswith(site_url):
                        to_visit.append((link_url, url))
                else:
                    resource_pages.add(link_url)

            # print(f'adding edge from {url} to {link_url}')
            edges.add((url, link_url))

    return edges, error_codes, resource_pages
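
# Shape of crawl()'s return values (URLs are hypothetical examples):
#   edges          - set of (from_url, to_url) tuples, e.g. ('https://example.com/', 'https://example.com/about')
#   error_codes    - dict mapping a URL to an HTTP status code or exception message, e.g. {'https://example.com/old': 404}
#   resource_pages - set of linked URLs whose content type is not HTML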


def get_node_info(nodes, error_codes, resource_pages, args):
    # Label each node as an error, resource, internal, or external page.
    node_info = []
    for node in nodes:
        if node in error_codes:
            node_info.append(f'Error: {error_codes[node]}')
        elif node in resource_pages:
            node_info.append('resource')
        elif node.startswith(args.site_url):
            node_info.append('internal')
        else:
            node_info.append('external')
    return node_info
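
# For example (hypothetical URLs), with args.site_url = 'https://example.com',
# error_codes = {'https://example.com/old': 404} and an empty resource_pages, the nodes
# ['https://example.com/', 'https://example.com/old', 'https://other.org/'] are
# labelled ['internal', 'Error: 404', 'external'].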


def visualize(edges, error_codes, resource_pages, args):
    """Build a pyvis visualization of the link graph and save it to args.vis_file."""
    G = nx.DiGraph()
    G.add_edges_from(edges)

    # Contract nodes that differ only by a trailing slash
    nodes = set(G.nodes)
    for node in nodes:
        alias = node + '/'
        if alias in nodes:
            print(f'Contracting {node} and {alias}')
            G = nx.contracted_nodes(G, alias, node)

    # Optionally save the adjacency matrix and node labels
    if args.save_txt is not None or args.save_npz is not None:
        nodes = list(G.nodes())
        adj_matrix = nx.to_numpy_array(G, nodelist=nodes, dtype=int)
        if args.save_npz is not None:
            base_fname = args.save_npz.replace('.npz', '')
            scipy.sparse.save_npz(args.save_npz, scipy.sparse.coo_matrix(adj_matrix))
        else:
            base_fname = args.save_txt.replace('.txt', '')
            np.savetxt(args.save_txt, adj_matrix, fmt='%d')
        node_info = get_node_info(nodes, error_codes, resource_pages, args)
        with open(base_fname + '_nodes.txt', 'w') as f:
            f.write('\n'.join([nodes[i] + '\t' + node_info[i] for i in range(len(nodes))]))

    net = Network(width=args.width, height=args.height, directed=True)
    net.from_nx(G)

    if args.show_buttons:
        net.show_buttons()
    elif args.options is not None:
        try:
            with open(args.options, 'r') as f:
                net.set_options(f.read())
        except FileNotFoundError:
            print('Error: options file', args.options, 'not found.')
        except Exception as e:
            print('Error applying options:', e)

    # Color and annotate nodes
    for node in net.nodes:
        node['size'] = 15
        node['label'] = ''
        if node['id'].startswith(args.site_url):
            node['color'] = INTERNAL_COLOR
            if node['id'] in resource_pages:
                node['color'] = RESOURCE_COLOR
        else:
            node['color'] = EXTERNAL_COLOR
        if node['id'] in error_codes:
            node['title'] = f'{error_codes[node["id"]]} Error: <a href="{node["id"]}">{node["id"]}</a>'
            if not args.only_404 or error_codes[node['id']] == 404:
                node['color'] = ERROR_COLOR
        else:
            node['title'] = f'<a href="{node["id"]}">{node["id"]}</a>'

    # Remove saved contractions (otherwise save_graph crashes)
    for edge in net.edges:
        edge.pop('contraction', None)

    net.save_graph(args.vis_file)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Visualize the link graph of a website.')
    parser.add_argument('site_url', type=str, help='the base URL of the website', nargs='?', default='')

    # Defaults
    vis_file = 'site.html'
    data_file = 'crawl.pickle'
    width = 1000
    height = 800

    parser.add_argument('--vis-file', type=str, help=f'filename in which to save HTML graph visualization (default: {vis_file})', default=vis_file)
    parser.add_argument('--data-file', type=str, help=f'filename in which to save crawled graph data (default: {data_file})', default=data_file)
    parser.add_argument('--width', type=int, help=f'width of graph visualization in pixels (default: {width})', default=width)
    parser.add_argument('--height', type=int, help=f'height of graph visualization in pixels (default: {height})', default=height)
    parser.add_argument('--visit-external', action='store_true', help='detect broken external links (slower)')
    parser.add_argument('--show-buttons', action='store_true', help='show visualization settings UI')
    parser.add_argument('--options', type=str, help='file with drawing options (use --show-buttons to configure, then generate options)')
    parser.add_argument('--from-data-file', type=str, help='create visualization from given data file', default=None)
    parser.add_argument('--force', action='store_true', help='override warnings about base URL')
    parser.add_argument('--save-txt', type=str, nargs='?', help='filename in which to save adjacency matrix (if no argument, uses adj_matrix.txt). Also saves node labels to [filename]_nodes.txt', const='adj_matrix.txt', default=None)
    parser.add_argument('--save-npz', type=str, nargs='?', help='filename in which to save sparse adjacency matrix (if no argument, uses adj_matrix.npz). Also saves node labels to [filename]_nodes.txt', const='adj_matrix.npz', default=None)
    parser.add_argument('--keep-queries', action='store_true', help='keep queries and fragments on internal link URLs (by default they are stripped)')
    parser.add_argument('--only-404', action='store_true', help='only color 404 error nodes in the error color')
    args = parser.parse_args()

    if args.from_data_file is None:
        if not args.site_url.startswith('https'):
            if not args.force:
                print('Warning: not using https. If you really want to use http, run with --force')
                exit(1)
        edges, error_codes, resource_pages = crawl(args.site_url, args.visit_external, args.keep_queries)
        print('Crawl complete.')
        with open(args.data_file, 'wb') as f:
            pickle.dump((edges, error_codes, resource_pages, args.site_url), f)
        print(f'Saved crawl data to {args.data_file}')
    else:
        with open(args.from_data_file, 'rb') as f:
            edges, error_codes, resource_pages, site_url = pickle.load(f)
        args.site_url = site_url

    visualize(edges, error_codes, resource_pages, args)
    print('Saved graph to', args.vis_file)
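
# Example invocations (the site URL below is a hypothetical placeholder):
#   python site_graph.py https://example.com
#   python site_graph.py https://example.com --visit-external --save-npz adj_matrix.npz
#   python site_graph.py --from-data-file crawl.pickle --only-404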