-
Notifications
You must be signed in to change notification settings - Fork 3
/
shell.py
307 lines (257 loc) · 13 KB
/
shell.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
import binascii
import json
import logging
import os
import re
import time
import urllib
from Queue import Queue
from threading import Thread

import pika
from flask import current_app as app, render_template, request, redirect, abort, jsonify, json as json_mod, url_for, session, Blueprint
from itsdangerous import TimedSerializer, BadTimeSignature, Signer, BadSignature
from passlib.hash import bcrypt_sha256

from CTFd.utils import sha512, is_safe_url, authed, can_send_mail, sendmail, can_register, get_config, verify_email
from CTFd.models import db, Teams, Pages
from CTFd.plugins import register_plugin_assets_directory
import CTFd.auth
import CTFd.views
from CTFd import utils
# Markup stored as the body of the "shell" page created by load().
# NOTE(review): the iframe source is hard-coded to https://192.168.1.1/shell/
# -- confirm it matches the deployment before shipping.
_SHELL_PAGE_HTML = """<style>
#shell-container {
width: calc(100vw - 20px);
padding: 0 0 0 0;
}
#shell-help {
text-align: center;
height:75px;
margin: 0 0 0 0;
border-radius: 5px;
}
#shell {
height: calc(100vh - 200px);
width: 100%;
}
</style>
<div id="shell-container" class="container shell-container">
<div id="shell-help" class="alert alert-info alert-dismissable">
<a href="#" class="close" data-dismiss="alert" aria-label="close">&times;</a>
<p>Use the username and password you registered with to log in.</p>
<p>You may also log in over ssh on port 2222</p>
</div>
<div class>
<iframe id="shell" src="https://192.168.1.1/shell/" id="gadget0" name="gadget0" frameborder="1"></iframe>
</div>
</div>
"""


def load(app):
    """Plugin entry point: wire shell-account provisioning into CTFd.

    The function does three things:

    * creates a ``shell`` page (if no page with that route exists yet),
    * opens a RabbitMQ channel to the durable ``shell_queue`` on localhost,
    * replaces the ``auth.register``, ``views.profile`` and
      ``auth.reset_password`` view functions with versions that also publish
      ``["add" | "change", team_name, password]`` JSON messages to that queue
      whenever a password is set.

    :param app: the Flask application object CTFd passes to plugins. It
        shadows the module-level ``current_app`` import inside this function.
    """
    # register_plugin_assets_directory(app, base_path='/plugins/ctfd-shell-plugin/assets/')
    app.db.create_all()

    if not Pages.query.filter_by(route='shell').first():
        page = Pages('Shell', 'shell', _SHELL_PAGE_HTML, draft=False, auth_required=True)
        db.session.add(page)
        db.session.commit()

    # NOTE(review): one blocking connection is opened at load time and shared
    # by every request handler below -- confirm this is acceptable for the
    # deployment's worker model and broker heartbeat settings.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='shell_queue', durable=True)

    def publish_shell_event(action, name, password):
        """Queue an account event (``"add"`` or ``"change"``) for ``name``.

        SECURITY NOTE: the plaintext password is placed on the queue as-is.
        """
        channel.basic_publish(
            exchange='',
            routing_key='shell_queue',
            body=json.dumps([action, name, password]),
            properties=pika.BasicProperties(
                delivery_mode=2,  # make message persistent
            ),
        )

    def new_register():
        """Replacement for ``auth.register`` that also queues an "add" event."""
        logger = logging.getLogger('regs')
        if not utils.can_register():
            return redirect(url_for('auth.login'))
        if request.method != 'POST':
            return render_template('register.html')

        errors = []
        name = request.form['name']
        email = request.form['email']
        password = request.form['password']

        name_len = len(name) == 0
        names = Teams.query.add_columns('name', 'id').filter_by(name=name).first()
        emails = Teams.query.add_columns('email', 'id').filter_by(email=email).first()
        pass_short = len(password) == 0
        pass_long = len(password) > 128
        valid_email = utils.check_email_format(email)
        team_name_email_check = utils.check_email_format(name)

        if not valid_email:
            errors.append("Please enter a valid email address")
        if names:
            errors.append('That team name is already taken')
        if team_name_email_check is True:
            errors.append('Your team name cannot be an email address')
        if emails:
            errors.append('That email has already been used')
        if pass_short:
            errors.append('Pick a longer password')
        if pass_long:
            errors.append('Pick a shorter password')
        if name_len:
            errors.append('Pick a longer team name')

        if errors:
            return render_template('register.html', errors=errors, name=name,
                                   email=email, password=password)

        with app.app_context():
            team = Teams(name, email.lower(), password)
            db.session.add(team)
            db.session.commit()
            db.session.flush()

            # The team row is committed; now ask for the shell account.
            publish_shell_event("add", name, password)

            session['username'] = team.name
            session['id'] = team.id
            session['admin'] = team.admin
            session['nonce'] = utils.sha512(os.urandom(10))

            if utils.can_send_mail() and utils.get_config('verify_emails'):
                # Confirming users is enabled and we can send email.
                logger.warning("[{date}] {ip} - {username} registered (UNCONFIRMED) with {email}".format(
                    date=time.strftime("%m/%d/%Y %X"),
                    ip=utils.get_ip(),
                    username=name.encode('utf-8'),
                    email=email.encode('utf-8')
                ))
                utils.verify_email(team.email)
                db.session.close()
                return redirect(url_for('auth.confirm_user'))

            # Don't care about confirming users.
            if utils.can_send_mail():
                # We want to notify the user that they have registered.
                utils.sendmail(email, "You've successfully registered for {}".format(utils.get_config('ctf_name')))

        logger.warning("[{date}] {ip} - {username} registered with {email}".format(
            date=time.strftime("%m/%d/%Y %X"),
            ip=utils.get_ip(),
            username=name.encode('utf-8'),
            email=email.encode('utf-8')
        ))
        db.session.close()
        return redirect(url_for('challenges.challenges_view'))

    def new_reset_pass(data=None):
        """Replacement for ``auth.reset_password``.

        With ``data`` (a signed, time-limited token carrying the team name)
        the view sets a new password and queues a "change" event; without it
        the view handles the "email me a reset link" form.
        """
        logger = logging.getLogger('logins')
        if data is not None:
            try:
                s = TimedSerializer(app.config['SECRET_KEY'])
                name = s.loads(utils.base64decode(data, urldecode=True), max_age=1800)
            except BadTimeSignature:
                return render_template('reset_password.html', errors=['Your link has expired'])
            except (BadSignature, TypeError, binascii.Error):
                return render_template('reset_password.html', errors=['Your reset token is invalid'])

            if request.method == "GET":
                return render_template('reset_password.html', mode='set')
            if request.method == "POST":
                team = Teams.query.filter_by(name=name).first_or_404()
                password = request.form['password'].strip()
                team.password = bcrypt_sha256.encrypt(password)
                db.session.commit()
                logger.warning("[{date}] {ip} - successful password reset for {username}".format(
                    date=time.strftime("%m/%d/%Y %X"),
                    ip=utils.get_ip(),
                    username=team.name.encode('utf-8')
                ))
                # Only propagate non-empty passwords to the shell account.
                if password:
                    publish_shell_event("change", name, password)
                db.session.close()
                return redirect(url_for('auth.login'))

        if request.method == 'POST':
            email = request.form['email'].strip()
            team = Teams.query.filter_by(email=email).first()

            if utils.can_send_mail() is False:
                return render_template(
                    'reset_password.html',
                    errors=['Email could not be sent due to server misconfiguration']
                )

            # Same response whether or not the account exists, so the form
            # cannot be used to probe for registered addresses.
            if not team:
                return render_template(
                    'reset_password.html',
                    errors=['If that account exists you will receive an email, please check your inbox']
                )

            utils.forgot_password(email, team.name)
            return render_template(
                'reset_password.html',
                errors=['If that account exists you will receive an email, please check your inbox']
            )
        return render_template('reset_password.html')

    def new_profile():
        """Replacement for ``views.profile``.

        Behaves like the stock profile form and, when the user sets a new
        password, queues a "change" event for the shell account.
        """
        if not utils.authed():
            return redirect(url_for('auth.login'))

        if request.method != "POST":
            user = Teams.query.filter_by(id=session['id']).first()
            prevent_name_change = utils.get_config('prevent_name_change')
            confirm_email = utils.get_config('verify_emails') and not user.verified
            return render_template('profile.html', name=user.name, email=user.email,
                                   website=user.website, affiliation=user.affiliation,
                                   country=user.country,
                                   prevent_name_change=prevent_name_change,
                                   confirm_email=confirm_email)

        errors = []
        # Default to '' so an omitted field is validated instead of raising
        # AttributeError on None.strip().
        name = request.form.get('name', '').strip()
        email = request.form.get('email', '').strip()
        website = request.form.get('website', '').strip()
        affiliation = request.form.get('affiliation', '').strip()
        country = request.form.get('country', '').strip()
        new_password = request.form.get('password', '')

        team = Teams.query.filter_by(id=session['id']).first()
        allow_rename = not utils.get_config('prevent_name_change')

        names = None
        name_len = False
        if allow_rename:
            names = Teams.query.filter_by(name=name).first()
            name_len = len(name) == 0

        emails = Teams.query.filter_by(email=email).first()
        valid_email = utils.check_email_format(email)

        if utils.check_email_format(name) is True:
            errors.append('Team name cannot be an email address')
        # A password change requires the current password in "confirm".
        if new_password and \
                not bcrypt_sha256.verify(request.form.get('confirm', '').strip(), team.password):
            errors.append("Your old password doesn't match what we have.")
        if not valid_email:
            errors.append("That email doesn't look right")
        if allow_rename and names and name != session['username']:
            errors.append('That team name is already taken')
        if emails and emails.id != session['id']:
            errors.append('That email has already been used')
        if allow_rename and name_len:
            errors.append('Pick a longer team name')
        if website and not utils.validate_url(website):
            errors.append("That doesn't look like a valid URL")

        if errors:
            return render_template('profile.html', name=name, email=email, website=website,
                                   affiliation=affiliation, country=country, errors=errors)

        if team.name != name and allow_rename:
            # NOTE(review): no event is queued for a rename, so the shell
            # account presumably keeps the old name -- confirm with the
            # queue consumer.
            team.name = name
            session['username'] = team.name
        if team.email != email.lower():
            team.email = email.lower()
            if utils.get_config('verify_emails'):
                team.verified = False
        if new_password:
            team.password = bcrypt_sha256.encrypt(new_password)
        team.website = website
        team.affiliation = affiliation
        team.country = country

        # Use the persisted team name, never the raw form value: when renames
        # are disabled the submitted name is not validated and must not pick
        # which shell account gets its password changed.
        shell_name = team.name
        db.session.commit()
        if new_password:
            # Publish only after the commit succeeded so the shell password
            # never runs ahead of the CTFd password.
            publish_shell_event("change", shell_name, new_password)
        db.session.close()
        return redirect(url_for('views.profile'))

    # Override the stock CTFd endpoints with the shell-aware versions.
    app.view_functions['auth.register'] = new_register
    app.view_functions['views.profile'] = new_profile
    app.view_functions['auth.reset_password'] = new_reset_pass