Skip to content

Commit

Permalink
Add test cases, run tests in subprocesses
Browse files Browse the repository at this point in the history
  • Loading branch information
julvo committed Nov 4, 2019
1 parent 88e911a commit 3759922
Showing 1 changed file with 64 additions and 20 deletions.
84 changes: 64 additions & 20 deletions reloading/test_reloading.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from reloading import reloading

SRC_FILE_NAME = 'temporary_testing_file.py'
SRC_FILE_CONTENT = '''

TEST_CHANGING_SOURCE_CONTENT = '''
from reloading import reloading
from time import sleep
Expand All @@ -15,6 +16,57 @@
print('INITIAL_FILE_CONTENTS')
'''

TEST_KEEP_LOCAL_VARIABLES_CONTENT = '''
from reloading import reloading
from time import sleep
fpath = "DON'T CHANGE ME"
for epoch in reloading(range(1)):
assert fpath == "DON'T CHANGE ME"
'''

TEST_PERSIST_AFTER_LOOP = '''
from reloading import reloading
from time import sleep
state = 'INIT'
for epoch in reloading(range(1)):
state = 'CHANGED'
assert state == 'CHANGED'
'''


def run_and_update_source(init_src, updated_src=None, update_after=0.2):
'''Runs init_src in a subprocess and updates source to updated_src after
update_after seconds. Returns the standard output of the subprocess and
whether the subprocess produced an uncaught exception.
'''
with open(SRC_FILE_NAME, 'w') as f:
f.write(init_src)

cmd = ['python', SRC_FILE_NAME]
with sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE) as proc:
if updated_src != None:
time.sleep(update_after)
with open(SRC_FILE_NAME, 'w') as f:
f.write(updated_src)

try:
stdout, _ = proc.communicate(timeout=2)
stdout = stdout.decode('utf-8')
has_error = False
except:
stdout = ''
has_error = True
proc.terminate()

if os.path.isfile(SRC_FILE_NAME):
os.remove(SRC_FILE_NAME)

return stdout, has_error


class TestReloading(unittest.TestCase):

def test_simple_looping(self):
Expand All @@ -23,29 +75,21 @@ def test_simple_looping(self):
iters += 1

def test_changing_source(self):
with open(SRC_FILE_NAME, 'w') as f:
f.write(SRC_FILE_CONTENT)
stdout, _ = run_and_update_source(
init_src=TEST_CHANGING_SOURCE_CONTENT,
updated_src=TEST_CHANGING_SOURCE_CONTENT.replace('INITIAL', 'CHANGED').rstrip('\n'))

cmd = ['python', SRC_FILE_NAME]
with sp.Popen(cmd, stdout=sp.PIPE) as proc:
# wait for first loop iterations to run before changing source file
time.sleep(0.2)
with open(SRC_FILE_NAME, 'w') as f:
f.write(SRC_FILE_CONTENT.replace('INITIAL', 'CHANGED').rstrip('\n'))
self.assertTrue('INITIAL_FILE_CONTENTS' in stdout and
'CHANGED_FILE_CONTENTS' in stdout)

# check if output contains results from before and after change
stdout = proc.stdout.read().decode('utf-8')
self.assertTrue('INITIAL_FILE_CONTENTS' in stdout and
'CHANGED_FILE_CONTENTS' in stdout)
def test_keep_local_variables(self):
_, has_error = run_and_update_source(init_src=TEST_KEEP_LOCAL_VARIABLES_CONTENT)
self.assertFalse(has_error)

def test_local_vars_are_not_overwritten(self):
fpath = "test"
for _ in reloading(range(1)):
self.assertEqual(fpath, "test")
def test_persist_after_loop(self):
_, has_error = run_and_update_source(init_src=TEST_PERSIST_AFTER_LOOP)
self.assertFalse(has_error)

def tearDown(self):
if os.path.isfile(SRC_FILE_NAME):
os.remove(SRC_FILE_NAME)

if __name__ == '__main__':
unittest.main()

0 comments on commit 3759922

Please sign in to comment.