-
Notifications
You must be signed in to change notification settings - Fork 0
/
BookmarkServer.py
121 lines (104 loc) · 4.03 KB
/
BookmarkServer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#!/usr/bin/env python3
#
# A *bookmark server* or URI shortener that maintains a mapping (dictionary)
# between short names and long URIs, checking that each new URI added to the
# mapping actually works (i.e. returns a 200 OK).
import http.server
import os
import requests
import threading
from socketserver import ThreadingMixIn
from urllib.parse import unquote, parse_qs
memory = {}
form = '''<!DOCTYPE html>
<title>Bookmark Server</title>
<form method="POST">
<label>Long URI:
<input name="longuri">
</label>
<br>
<label>Short name:
<input name="shortname">
</label>
<br>
<button type="submit">Save it!</button>
</form>
<p>URIs I know about:
<pre>
{}
</pre>
'''
def CheckURI(uri, timeout=5):
'''Check whether this URI is reachable, i.e. does it return a 200 OK?
This function returns True if a GET request to uri returns a 200 OK, and
False if that GET request returns any other response, or doesn't return
(i.e. times out).
'''
# 1. Write this function.
try:
r = requests.get(uri, timeout=timeout)
return r.status_code == 200
except requests.RequestException:
print('Request Exception')
return False
class Shortener(http.server.BaseHTTPRequestHandler):
def do_GET(self):
# A GET request will either be for / (the root path) or for /some-name.
# Strip off the / and we have either empty string or a name.
name = unquote(self.path[1:])
if name:
if name in memory:
# 2. Send a 303 redirect to the long URI in memory[name].
self.send_response(303)
self.send_header('Location', memory[name])
self.end_headers()
else:
# We don't know that name! Send a 404 error.
self.send_response(404)
self.send_header('Content-type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write("I don't know '{}'.".format(name).encode())
else:
# Root path. Send the form.
self.send_response(200)
self.send_header('Content-type', 'text/html')
self.end_headers()
# List the known associations in the form.
known = "\n".join("{} : {}".format(key, memory[key])
for key in sorted(memory.keys()))
self.wfile.write(form.format(known).encode())
def do_POST(self):
# Decode the form data.
length = int(self.headers.get('Content-length', 0))
body = self.rfile.read(length).decode()
params = parse_qs(body)
# Check that the user submitted the form fields.
if "longuri" not in params or "shortname" not in params:
# 3. Serve a 400 error with a useful message.
self.send_response(400)
self.send_header('Content-type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write('Form not completed!'.encode())
longuri = params["longuri"][0]
shortname = params["shortname"][0]
if CheckURI(longuri):
# This URI is good! Remember it under the specified name.
memory[shortname] = longuri
# 4. Serve a redirect to the root page (the form).
self.send_response(303)
self.send_header('Location', '/')
self.end_headers()
else:
# Didn't successfully fetch the long URI.
# 5. Send a 404 error with a useful message.
self.send_response(404)
self.send_header('Content-type', 'text/plain; charset=utf-8')
self.end_headers()
self.wfile.write('Cannot find the webpage {}'.format(longuri).encode())
class ThreadHTTPServer(ThreadingMixIn, http.server.HTTPServer):
"This is an HTTPServer that supports thread-based concurrency."
if __name__ == '__main__':
port = int(os.environ.get('PORT', 8000))
server_address = ('', port)
httpd = ThreadHTTPServer(server_address, Shortener)
httpd.serve_forever()