forked from pypi/legacy
-
Notifications
You must be signed in to change notification settings - Fork 0
/
pypi.wsgi
executable file
·158 lines (119 loc) · 4.88 KB
/
pypi.wsgi
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
#!/usr/bin/python
import sys
import os
prefix = os.path.dirname(__file__)
sys.path.insert(0, prefix)
import cStringIO
import webui
import store
import config
import re
from functools import partial
store.keep_conn = True
CONFIG_FILE = os.environ.get("PYPI_CONFIG", os.path.join(prefix, 'config.ini'))
class Request:
def __init__(self, environ, start_response):
self.start_response = start_response
try:
length = int(environ.get('CONTENT_LENGTH', 0))
except ValueError:
length = 0
self.rfile = cStringIO.StringIO(environ['wsgi.input'].read(length))
self.wfile = cStringIO.StringIO()
self.config = config.Config(CONFIG_FILE)
self.status = None
self.headers = []
def set_status(self, status):
self.status = status
def send_response(self, code, message='no details available'):
self.status = '%s %s' % (code, message)
self.headers = []
def send_header(self, keyword, value):
self.headers.append((keyword, value))
def set_content_type(self, content_type):
self.send_header('Content-Type', content_type)
def end_headers(self):
self.start_response(self.status, self.headers)
class CacheControlMiddleware(object):
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
def _start_response(status, headers, exc_info=None):
script = environ.get("SCRIPT_NAME", None)
if script in set(["/simple"]):
# Cache for a day in Fastly, but 10 minutes in browsers
headers += [
("Surrogate-Control", "max-age=86400"),
("Cache-Control", "max-age=600, public"),
]
elif script in set(["/packages"]):
if status[:3] in ["200", "304"]:
# Cache for a year
headers += [("Cache-Control", "max-age=31557600, public")]
else:
# Cache for an hour
headers += [("Cache-Control", "max-age=3600, public")]
elif script in set(["/mirrors", "/security"]):
# Cache these for a week
headers += [("Cache-Control", "max-age=604800, public")]
# http://www.gnuterrypratchett.com/
headers += [("X-Clacks-Overhead", "GNU Terry Pratchett")]
return start_response(status, headers, exc_info)
return self.app(environ, _start_response)
class SecurityHeaderMiddleware(object):
def __init__(self, app):
self.app = app
def __call__(self, environ, start_response):
def _start_response(status, headers, exc_info=None):
headers += [
("X-Frame-Options", "deny"),
("X-XSS-Protection", "1; mode=block"),
("X-Content-Type-Options", "nosniff"),
]
return start_response(status, headers, exc_info)
return self.app(environ, _start_response)
def debug(environ, start_response):
if environ['PATH_INFO'].startswith("/auth") and \
"HTTP_AUTHORIZATION" not in environ:
start_response("401 login",
[('WWW-Authenticate', 'Basic realm="foo"')])
return
start_response("200 ok", [('Content-type', 'text/plain')])
environ = environ.items()
environ.sort()
for k, v in environ:
yield "%s=%s\n" % (k, v)
return
def application(environ, start_response):
if "HTTP_AUTHORIZATION" in environ:
environ["HTTP_CGI_AUTHORIZATION"] = environ["HTTP_AUTHORIZATION"]
try:
r = Request(environ, start_response)
webui.WebUI(r, environ).run()
return [r.wfile.getvalue()]
except Exception, e:
import traceback;traceback.print_exc()
return ['Ooops, there was a problem (%s)' % e]
#application=debug
# Handle Caching at the WSGI layer
application = CacheControlMiddleware(application)
# Add some Security Headers to every response
application = SecurityHeaderMiddleware(application)
# pretend to be like the UWSGI configuration - set SCRIPT_NAME to the first
# part of the PATH_INFO if it's valid and remove that part from the PATH_INFO
def site_fake(app, environ, start_response):
PATH_INFO = environ['PATH_INFO']
m = re.match('^/(pypi|simple|daytime|serversig|mirrors|id|oauth|google_login|'
'security|packages)(.*)', PATH_INFO)
if not m:
start_response("404 not found", [('Content-type', 'text/plain')])
return ['Not Found: %s' % PATH_INFO]
environ['SCRIPT_NAME'] = '/' + m.group(1)
environ['PATH_INFO'] = m.group(2)
return app(environ, start_response)
if __name__ == '__main__':
# very simple wsgi server so we can play locally
from wsgiref.simple_server import make_server
httpd = make_server('', 8000, partial(site_fake, application))
print "Serving on port 8000..."
httpd.serve_forever()