Skip to content

Commit

Permalink
format
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonray committed Dec 25, 2023
1 parent 30d5219 commit dd18bcf
Showing 1 changed file with 41 additions and 95 deletions.
136 changes: 41 additions & 95 deletions pulpo_messaging/tests/test_fqa_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class TestFqaCli(unittest.TestCase):
fqa_config = 'pulpo-config-fqa.json'

def run_cli(self, command, config: str=None, fqa_base_directory: str = None, additional_args=None, error_on_fail:bool = True):
def run_cli(self, command, config: str = None, fqa_base_directory: str = None, additional_args=None, error_on_fail: bool = True):
if not config:
config = self.fqa_config

Expand All @@ -30,32 +30,38 @@ def run_cli(self, command, config: str=None, fqa_base_directory: str = None, add
print(f'{command=} => {result=}')
return result

def run_put(self, message: str=None, fqa_base_directory:str=None , error_on_fail:bool = True ) :
def run_put(self, message: str = None, fqa_base_directory: str = None, error_on_fail: bool = True):
if not message:
message = 'hello world'

additional_args = [f'--payload="{message}"']
result = self.run_cli(command='queue.put', config=self.fqa_config,fqa_base_directory=fqa_base_directory, additional_args=additional_args, error_on_fail=error_on_fail)
result = self.run_cli(command='queue.put', config=self.fqa_config, fqa_base_directory=fqa_base_directory, additional_args=additional_args, error_on_fail=error_on_fail)

if result.returncode==0:
if result.returncode == 0:
message_id = self.get_message_id_from_output(result)
print(f'run put cli [{message_id=}]')
return result, message_id
else:
return result, None
message_id = None


def run_pop(self, fqa_base_directory:str=None , error_on_fail:bool = True ) :
result = self.run_cli(command='queue.pop', config=self.fqa_config, fqa_base_directory=fqa_base_directory)
return result, message_id

def run_pop(self, fqa_base_directory: str = None, error_on_fail: bool = True):
result = self.run_cli(command='queue.pop', config=self.fqa_config, fqa_base_directory=fqa_base_directory, error_on_fail=error_on_fail)

message_id = self.get_message_id_from_output(result)
print(f'run pop cli [{message_id=}]')
return result, message_id
return result, message_id

def run_peek(self, message_id: str, fqa_base_directory:str=None , error_on_fail:bool = True ) :
def run_peek(self, message_id: str, fqa_base_directory: str = None, error_on_fail: bool = True):
additional_args = [f'--id={message_id}']
result = self.run_cli(command='queue.peek', config=self.fqa_config,fqa_base_directory=fqa_base_directory, additional_args=additional_args, error_on_fail=error_on_fail)
print(f'run peek cli')
result = self.run_cli(command='queue.peek', config=self.fqa_config, fqa_base_directory=fqa_base_directory, additional_args=additional_args, error_on_fail=error_on_fail)
print('run peek cli')
return result

def run_delete(self, message_id: str, fqa_base_directory: str = None, error_on_fail: bool = True):
additional_args = [f'--id={message_id}']
result = self.run_cli(command='queue.delete', config=self.fqa_config, fqa_base_directory=fqa_base_directory, additional_args=additional_args, error_on_fail=error_on_fail)
print('run delete cli')
return result

def get_message_id_from_output(self, result):
Expand All @@ -65,28 +71,30 @@ def get_message_id_from_output(self, result):
# print(f'attempt to extract message id from string: {output_str}')
match = re.search(r"(?:message\.id|message_id)='([^']+)'", output_str)

self.assertIsNotNone(match, f'unable to determine job id from output [{output_str}]')
message_id = match.group(1)
# print(f'extracted {message_id=}')
if match:
message_id = match.group(1)
else:
message_id = None

return message_id

def test_put(self):
result , message_id = self.run_put(message = 'hello world!')
_, message_id = self.run_put(message='hello world!')
self.assertIsNotNone(message_id)

def test_fail_put(self):
# create a directory which the application has no permissions to write
# create a directory which the application has no permissions to write
# this will cause failure
fqa_base_directory = os.path.join(get_unique_base_path("cli"), 'invalid')
os.makedirs(fqa_base_directory)
os.chmod(path=fqa_base_directory, mode=0)

with self.assertRaises(Exception):
self.run_cli(command='queue.peek', config=self.fqa_config, fqa_base_directory=fqa_base_directory , error_on_fail=True )
self.run_put(message='this will fail', fqa_base_directory=fqa_base_directory)

def test_put_peek(self):
fqa_base_directory = get_unique_base_path()
result , message_id = self.run_put(message='hello world', fqa_base_directory=fqa_base_directory)
fqa_base_directory = get_unique_base_path("cli")
result, message_id = self.run_put(message='hello world', fqa_base_directory=fqa_base_directory)

result = self.run_peek(message_id=message_id, fqa_base_directory=fqa_base_directory, error_on_fail=True)
assert result.returncode == 0
Expand All @@ -98,89 +106,27 @@ def test_put_pop_peek(self):
result, put_message_id = self.run_put(message='hello world', fqa_base_directory=fqa_base_directory, error_on_fail=True)
self.assertIsNotNone(put_message_id)

result , pop_message_id = self.run_pop(fqa_base_directory=fqa_base_directory , error_on_fail=True)
result, pop_message_id = self.run_pop(fqa_base_directory=fqa_base_directory, error_on_fail=True)
self.assertIsNotNone(pop_message_id)
self.assertEqual(pop_message_id, put_message_id)

additional_args = [f'--id={put_message_id}']
result = self.run_cli(command='queue.peek', config=self.fqa_config, fqa_base_directory=fqa_base_directory, additional_args=additional_args)

assert result.returncode == 0
result = self.run_peek(put_message_id, fqa_base_directory=fqa_base_directory)
assert 'no message' in str(result.stdout)

def test_put_delete_peek(self):
fqa_base_directory = get_unique_base_path("cli")

additional_args = ['--payload="hello world!"']
result = self.run_cli(command='queue.put', config=self.fqa_config, fqa_base_directory=fqa_base_directory, additional_args=additional_args)
assert result.returncode == 0

put_message_id = self.get_message_id_from_output(result)
print(f'put {put_message_id=}')
result, put_message_id = self.run_put(message='hello world', fqa_base_directory=fqa_base_directory)
self.assertIsNotNone(put_message_id)

additional_args = [f'--id={put_message_id}']
result = self.run_cli(command='queue.delete', config=self.fqa_config, fqa_base_directory=fqa_base_directory, additional_args=additional_args)
assert result.returncode == 0

delete_message_id = self.get_message_id_from_output(result)
print(f'delete {delete_message_id=}')
self.assertIsNotNone(delete_message_id)
self.assertEqual(delete_message_id, put_message_id)
result = self.run_delete(message_id=put_message_id, fqa_base_directory=fqa_base_directory)

additional_args = [f'--id={put_message_id}']
result = self.run_cli(command='queue.peek', config=self.fqa_config, fqa_base_directory=fqa_base_directory, additional_args=additional_args)
assert result.returncode == 0
result = self.run_peek(put_message_id, fqa_base_directory=fqa_base_directory)
assert 'no message' in str(result.stdout)

# @with_beanstalkd()
# def test_put_delete_peek(self):
# tubename = self.get_tubename()
# additional_args = ['--body=sample']
# result =self.run_cli(command='put', tubename=tubename , additional_args=additional_args )
# assert result.returncode == 0

# job_id = self.get_job_id_from_output(result)
# print(f'{job_id=}')
# self.assertIsNotNone(job_id)

# additional_args = [f'--id={job_id}']
# result =self.run_cli(command='peek', tubename=tubename , additional_args=additional_args )
# assert result.returncode == 0
# assert 'sample' in str(result.stdout)

# print(f'pop {job_id=}')
# additional_args = [f'--id={job_id}']
# result =self.run_cli(command='delete', tubename=tubename , additional_args=additional_args )
# assert result.returncode == 0

# additional_args = [f'--id={job_id}']
# result =self.run_cli(command='peek', tubename=tubename , additional_args=additional_args )
# assert result.returncode == 1

# @with_beanstalkd()
# def test_pop_empty(self):
# tubename = self.get_tubename()
# result =self.run_cli(command='pop', tubename=tubename )
# assert result.returncode == 0
# assert 'no message available' in str( result.stdout)

# @with_beanstalkd()
# def test_peek_no_job_id(self):
# tubename = self.get_tubename()
# result =self.run_cli(command='peek', tubename=tubename )
# assert result.returncode == 1
# assert 'invalid job id' in str( result.stderr)

# @with_beanstalkd()
# def test_delete_no_job_id(self):
# tubename = self.get_tubename()
# result =self.run_cli(command='peek', tubename=tubename )
# assert result.returncode == 1
# assert 'invalid job id' in str( result.stderr)

# @with_beanstalkd()
# def test_invalid_command(self):
# tubename = self.get_tubename()
# result =self.run_cli(command='invalidcode', tubename=tubename , additional_args=[] )
# assert result.returncode != 0
def test_put_empty(self):
fqa_base_directory = get_unique_base_path("cli")

result, pop_message_id = self.run_pop(fqa_base_directory=fqa_base_directory, error_on_fail=True)
self.assertIsNone(pop_message_id)
assert 'no message' in str(result.stdout)

0 comments on commit dd18bcf

Please sign in to comment.