Skip to content
This repository has been archived by the owner on Feb 20, 2018. It is now read-only.

Commit

Permalink
Merge pull request #116 from 2gis/response_generator
Browse files Browse the repository at this point in the history
Response generator
  • Loading branch information
z00sts committed Sep 28, 2015
2 parents 07bb9b3 + 686863d commit 7564fe3
Show file tree
Hide file tree
Showing 21 changed files with 555 additions and 477 deletions.
19 changes: 0 additions & 19 deletions core/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ def __init__(self, connection_string):
expire_on_commit=False)
self.DBSession = scoped_session(self.session_maker)

@transaction
def get_session(self, session_id, dbsession=None):
from core.sessions import Session as WrappedSession
return dbsession.query(WrappedSession).get(session_id)

@transaction
def get_log_steps_for_session(self, session_id, dbsession=None):
return dbsession.query(SessionLogStep).filter_by(
Expand All @@ -78,20 +73,6 @@ def get_last_step(self, session, dbsession=None):
session_id=session.id, milestone=True).order_by(
desc(SessionLogStep.id)).first()

@transaction
def get_sessions(self, dbsession=None):
from core.sessions import Session as WrappedSession
return dbsession.query(WrappedSession).filter_by(
closed=False, timeouted=False, status='running').all()

@transaction
def get_all_active_sessions(self, dbsession=None):
from core.sessions import Session as WrappedSession
sessions = dbsession.query(WrappedSession).filter_by(
closed=False, timeouted=False).all()
return [session for session in sessions
if session.status in ('running', 'waiting')]

@transaction
def get_queue(self, dbsession=None):
from core.sessions import Session as WrappedSession
Expand Down
6 changes: 6 additions & 0 deletions core/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# coding: utf-8

from libvirt import libvirtError # NOQA


Expand Down Expand Up @@ -31,3 +33,7 @@ class ConnectionError(Exception):

class NoSuchEndpoint(Exception):
pass


class QueueItemNotFound(Exception):
pass
114 changes: 55 additions & 59 deletions core/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from core.config import config
from core.logger import log
from core.exceptions import SessionException
from vmpool.endpoint import delete_vm

from flask import current_app

Expand Down Expand Up @@ -66,6 +67,7 @@ def __init__(self, status_code=None, headers=None, content=None):
class Session(SessionModel):
def __init__(self, name=None, dc=None):
super(Session, self).__init__(name, dc)
current_app.sessions.put(self)

@property
def inactivity(self):
Expand All @@ -75,17 +77,8 @@ def inactivity(self):
def duration(self):
return (datetime.now() - self.created).total_seconds()

def is_timeouted(self):
self.refresh()
return self.timeouted

def is_closed(self):
self.refresh()
return self.closed

@property
def info(self):
self.refresh()
stat = {
"id": self.id,
"name": self.name,
Expand Down Expand Up @@ -117,11 +110,11 @@ def restart_timer(self):
self.save()

def delete(self, message=""):
self.refresh()
if self.endpoint_name:
current_app.sessions.remove(self)
if hasattr(self, "endpoint"):
log.info("Deleting VM for session: %s" % self.id)

from vmpool.api.endpoint import delete_vm
self.endpoint.delete()
else:
delete_vm(self.endpoint_name)
log.info("Session %s deleted. %s" % (self.id, message))

Expand All @@ -139,8 +132,8 @@ def failed(self, tb="Session closed by user"):
self.delete(tb)

def set_vm(self, endpoint):
self.endpoint_ip = endpoint.get('ip', None)
self.endpoint_name = endpoint.get('name', None)
self.endpoint_ip = endpoint.ip
self.endpoint_name = endpoint.name

def run(self, endpoint):
self.restart_timer()
Expand All @@ -150,9 +143,7 @@ def run(self, endpoint):
log.info("Session %s starting on %s." % (self.id, self.endpoint_name))

def timeout(self):
self.refresh()
self.timeouted = True
self.save()
self.failed("Session timeout")

def add_sub_step(self, control_line, body=None):
Expand Down Expand Up @@ -180,36 +171,14 @@ def make_request(self, port, request):
t.start()

response = None
while not response:
if self.timeouted:
response = SimpleResponse(
status_code=500,
headers={},
content='{"status": 1, "value": "Session timeouted"}'
)
elif self.closed:
response = SimpleResponse(
status_code=500,
headers={},
content='{"status": 1, "value": "Session closed"}'
)
else:
if not t.isAlive():
response = q.get()
del q
del t
if isinstance(response, Exception):
raise response
response = SimpleResponse(
status_code=response.status_code,
headers=response.headers,
content=response.content
)
t = None
elif t is not None:
t.join(0.1)

return response.status_code, response.headers, response.content
while t.isAlive():
yield None, None, None

response = q.get()
if isinstance(response, Exception):
raise response

yield response.status_code, response.headers, response.content


class SessionWorker(Thread):
Expand All @@ -219,13 +188,10 @@ def __init__(self, app):
self.daemon = True
self.app = app

def active_sessions(self):
return self.app.database.get_sessions()

def run(self):
with self.app.app_context():
while self.running:
for session in self.active_sessions():
for session in self.app.sessions.active():
if session.inactivity > config.SESSION_TIMEOUT:
session.timeout()
time.sleep(1)
Expand All @@ -237,13 +203,43 @@ def stop(self):


class Sessions(object):
@staticmethod
def get_session(session_id):
session = current_app.database.get_session(session_id)

if not session or session.is_closed():
raise SessionException("There is no active session %s" %
session_id)
active_sessions = dict()

def put(self, session):
if str(session.id) not in self.active_sessions.keys():
self.active_sessions[str(session.id)] = session
else:
raise SessionException("Duplicate session id: %s" % session.id)

def remove(self, session):
try:
del self.active_sessions[str(session.id)]
except KeyError:
pass

def active(self):
return self.active_sessions.values()

def kill_all(self):
for session in self.active_sessions.values():
session.delete()

def get_session(self, session_id):
try:
session = self.active_sessions[str(session_id)]
except KeyError:
raise SessionException(
"There is no active session %s" % session_id
)

if session.closed:
if session.timeouted:
raise SessionException(
"Session %s timeouted" % session_id
)
else:
raise SessionException(
"Session %s closed" % session_id
)

session.refresh()
return session
7 changes: 7 additions & 0 deletions core/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,13 @@ def wait_for(condition, timeout=5):

return condition()

def generator_wait_for(condition, timeout=5):
start = time.time()
while not condition() and time.time() - start < timeout:
yield None

yield condition()


class BucketThread(Thread):
def __init__(self, bucket, *args, **kwargs):
Expand Down
55 changes: 28 additions & 27 deletions tests/integrational/test_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ def setUpClass(cls):

from flask import Flask
cls.app = Flask(__name__)
cls.app.sessions = None

from core import db
cls.app.database = db.Database(config.DATABASE)
Expand All @@ -24,24 +25,34 @@ def setUp(self):
self.ctx = self.app.app_context()
self.ctx.push()

def tearDown(self):
self.ctx.pop()

def test_file_deletion(self):
with patch(
'core.utils.init.home_dir', Mock(return_value=config.BASE_DIR)
), patch(
'core.logger.setup_logging', Mock(return_value=Mock())
), patch(
'flask.current_app.sessions', Mock()
), patch(
'core.network.Network', Mock()
), patch(
'core.connection.Virsh', Mock()
):
from vmmaster import cleanup
self.cleanup = cleanup

from core.sessions import Session
self.session = Session()
self.session.status = 'unknown'

session = Session()
session.status = 'unknown'
session.name = '__test_file_deletion'
session.save()
def tearDown(self):
self.ctx.pop()

session_dir = os.path.join(config.SCREENSHOTS_DIR, str(session.id))
def test_file_deletion(self):
self.session.name = '__test_file_deletion'
self.session.save()

session_dir = os.path.join(
config.SCREENSHOTS_DIR, str(self.session.id)
)
system_utils.run_command(
["mkdir", config.SCREENSHOTS_DIR],
silent=True)
Expand All @@ -52,30 +63,20 @@ def test_file_deletion(self):
["touch", os.path.join(session_dir, "file_for_deletion")],
silent=True)

cleanup.delete_session_data([session])
self.cleanup.delete_session_data([self.session])
self.assertEqual(os.path.isdir(session_dir), 0)
system_utils.run_command(
["rm", "-rf", config.SCREENSHOTS_DIR], silent=True)

def test_outdated_sessions(self):
with patch(
'core.utils.init.home_dir', Mock(return_value=config.BASE_DIR)
), patch(
'core.logger.setup_logging', Mock(return_value=Mock())
):
from vmmaster import cleanup
from core.sessions import Session

session = Session()
session.status = 'unknown'
session.closed = True
session.name = '__test_outdated_sessions'
session.created = \
self.session.closed = True
self.session.name = '__test_outdated_sessions'
self.session.created = \
datetime.now() - timedelta(days=config.SCREENSHOTS_DAYS, seconds=1)
session.save()
self.session.save()

outdated_sessions = cleanup.old_sessions()
outdated_sessions = self.cleanup.old_sessions()
outdated_ids = [s.id for s in outdated_sessions]

self.assertIn(session.id, outdated_ids)
cleanup.delete_session_data([session])
self.assertIn(self.session.id, outdated_ids)
self.cleanup.delete_session_data([self.session])
18 changes: 17 additions & 1 deletion tests/unit/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,14 +193,30 @@ def shortDescription(self):
return None


primary_key_mock = 1


def set_primary_key(_self):
global primary_key_mock
_self.id = primary_key_mock
primary_key_mock += 1
pass


class DatabaseMock(Mock):
def __init__(self, *args, **kwargs):
super(DatabaseMock, self).__init__(*args, **kwargs)
self.add = Mock(side_effect=set_primary_key)


def vmmaster_server_mock(port):
with patch(
'core.network.Network', Mock(
return_value=Mock(get_ip=Mock(return_value='0')))
), patch(
'core.connection.Virsh', Mock()
), patch(
'core.db.database', Mock()
'core.db.database', DatabaseMock()
), patch(
'core.utils.init.home_dir', Mock(return_value=fake_home_dir())
), patch(
Expand Down
Loading

0 comments on commit 7564fe3

Please sign in to comment.