diff --git a/.travis.yml b/.travis.yml index 79dd36ae3..ea9ee052c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,15 +10,9 @@ addons: - service1.example.com - service2.example.com - service3.example.com -install: - - travis_fold start "Install.Pip.Package" && make install-dev && pip3 show mockintosh && travis_fold end "Install.Pip.Package" - - travis_fold start "Build.Image" && make build && docker image ls mockintosh && travis_fold end "Build.Image" script: - stty cols 120 - - ./ps.sh & - - travis_fold start "Unit.Tests" && make test-with-coverage && travis_fold end "Unit.Tests" - - travis_fold start "StopContainers.Tests" && make stop-containers && travis_fold end "StopContainers.Tests" - - travis_fold start "Integration.Tests" && make test-integration && travis_fold end "Integration.Tests" + - ./tests.sh after_success: - if [[ $TRAVIS_TAG =~ ^([0-9]+\.?)+$ ]]; then git push --force https://${GH_TOKEN}@github.com/up9inc/mockintosh.git HEAD:gh-pages; else echo Not pushing "$TRAVIS_TAG"; fi - make coverage-after diff --git a/tests.sh b/tests.sh new file mode 100755 index 000000000..f289d33f7 --- /dev/null +++ b/tests.sh @@ -0,0 +1,19 @@ +#!/bin/bash -xe + +# this file is single entrypoint, to be used in local Dockerized tests +cd $(dirname $0) + +rm -f .coverage.* +docker ps # test if docker is operational +docker kill kafka || true + +make build +docker run -it mockintosh --help +docker image ls mockintosh + +tests/ps.sh & + +make install-dev +make test-with-coverage +make stop-containers +make test-integration \ No newline at end of file diff --git a/tests/Dockerfile b/tests/Dockerfile new file mode 100644 index 000000000..d9d8fc4d9 --- /dev/null +++ b/tests/Dockerfile @@ -0,0 +1,10 @@ +FROM up9inc/mockintosh + +RUN apt update && apt install -y rsync docker.io build-essential + +RUN pip uninstall -y mockintosh + +RUN echo 127.0.0.1 service1.example.com >> /etc/hosts +RUN echo 127.0.0.1 service2.example.com >> /etc/hosts +RUN echo 127.0.0.1 service3.example.com 
>> /etc/hosts + diff --git a/ps.sh b/tests/ps.sh similarity index 100% rename from ps.sh rename to tests/ps.sh diff --git a/tests/test_features_async.py b/tests/test_features_async.py index 521712aa8..1a555b85e 100644 --- a/tests/test_features_async.py +++ b/tests/test_features_async.py @@ -5,37 +5,28 @@ .. module:: __init__ :synopsis: Contains classes that tests mock server's asynchronous features. """ - -import sys +import json +import logging import os +import shlex +import subprocess +import sys +import tempfile +import threading import time -import json import warnings -import threading -import subprocess -from urllib.parse import urlparse from typing import ( Union ) +from urllib.parse import urlparse -import pytest import httpx +import pytest import yaml from jsonschema.validators import validate as jsonschema_validate -from mockintosh.constants import PROGRAM, PYBARS, JINJA, WARN_GPUBSUB_PACKAGE, WARN_AMAZONSQS_PACKAGE from mockintosh import start_render_queue -from mockintosh.services.asynchronous.kafka import ( # noqa: F401 - KafkaService, - KafkaActor, - KafkaConsumer, - KafkaConsumerGroup, - KafkaProducer, - KafkaProducerPayloadList, - KafkaProducerPayload, - _create_topic as kafka_create_topic, - build_single_payload_producer as kafka_build_single_payload_producer -) +from mockintosh.constants import PROGRAM, PYBARS, JINJA, WARN_GPUBSUB_PACKAGE, WARN_AMAZONSQS_PACKAGE from mockintosh.services.asynchronous.amqp import ( # noqa: F401 AmqpService, AmqpActor, @@ -47,6 +38,17 @@ _create_topic as amqp_create_topic, build_single_payload_producer as amqp_build_single_payload_producer ) +from mockintosh.services.asynchronous.kafka import ( # noqa: F401 + KafkaService, + KafkaActor, + KafkaConsumer, + KafkaConsumerGroup, + KafkaProducer, + KafkaProducerPayloadList, + KafkaProducerPayload, + _create_topic as kafka_create_topic, + build_single_payload_producer as kafka_build_single_payload_producer +) from mockintosh.services.asynchronous.redis import ( # noqa: 
F401 RedisService, RedisActor, @@ -134,21 +136,18 @@ HAR_JSON_SCHEMA = {"$ref": "https://raw.githubusercontent.com/undera/har-jsonschema/master/har-schema.json"} should_cov = os.environ.get('COVERAGE_PROCESS_START', False) -async_service_type = None -class AsyncBase(): - +class AsyncBase: + async_service_type = None mock_server_process = None @classmethod def setup_class(cls): - global async_service_type - - config = 'configs/yaml/hbs/%s/config.yaml' % async_service_type + config = 'configs/yaml/hbs/%s/config.yaml' % cls.async_service_type # Create the Async topics/queues - for topic in ( + topics_to_create = ( 'topic1', 'topic2', 'topic3', @@ -175,25 +174,41 @@ def setup_class(cls): 'binary-topic', 'chain1-step1', 'chain1-step2' - ): - getattr(sys.modules[__name__], '%s_create_topic' % async_service_type)(ASYNC_ADDR[async_service_type], topic) + ) + for topic in topics_to_create: + create_topic_method = getattr(sys.modules[__name__], '%s_create_topic' % cls.async_service_type) + create_topic_method(ASYNC_ADDR[cls.async_service_type], topic) time.sleep(ASYNC_CONSUME_TIMEOUT / 20) - - cmd = '%s %s' % (PROGRAM, get_config_path(config)) + logfile = tempfile.mktemp(prefix=PROGRAM, suffix=".log") + cmd = '%s -v -l %s %s' % (PROGRAM, logfile, get_config_path(config)) if should_cov: cmd = 'coverage run --parallel -m %s' % cmd + else: + cmd = sys.executable + " -m " + cmd this_env = os.environ.copy() + logging.info("Starting mockintosh: %s", cmd) AsyncBase.mock_server_process = subprocess.Popen( - cmd, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - shell=True, + shlex.split(cmd), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, env=this_env ) - time.sleep(ASYNC_CONSUME_TIMEOUT / 20) - getattr(sys.modules[__name__], '%s_create_topic' % async_service_type)(ASYNC_ADDR[async_service_type], 'topic2') + cnt = 0 + while cnt < ASYNC_CONSUME_TIMEOUT: + cnt += 1 + time.sleep(1) + assert AsyncBase.mock_server_process.poll() is None + if os.path.exists(logfile): + with 
open(logfile) as fp: + if "Mock server is ready!" in fp.read(): + break + else: + logging.debug("Waiting for Mockintosh to be ready") + + create_topic = getattr(sys.modules[__name__], '%s_create_topic' % cls.async_service_type) + create_topic(ASYNC_ADDR[cls.async_service_type], 'topic2') resp = httpx.post(MGMT + '/traffic-log', data={"enable": True}, verify=False) assert 204 == resp.status_code @@ -207,10 +222,11 @@ def teardown_class(cls): os.system('killall -2 %s' % name) def assert_consumer_log(self, data: dict, key: Union[str, None], value: str, headers: dict, invert: bool = False): - global async_service_type - if async_service_type not in ('redis', 'mqtt') and key is not None: - criteria = any(any(header['name'] == 'X-%s-Message-Key' % PROGRAM.capitalize() and header['value'] == key for header in entry['response']['headers']) for entry in data['log']['entries']) + if self.async_service_type not in ('redis', 'mqtt') and key is not None: + criteria = any(any( + header['name'] == 'X-%s-Message-Key' % PROGRAM.capitalize() and header['value'] == key for header in + entry['response']['headers']) for entry in data['log']['entries']) if invert: assert not criteria else: @@ -221,9 +237,11 @@ def assert_consumer_log(self, data: dict, key: Union[str, None], value: str, hea else: assert criteria - if async_service_type not in ('redis', 'mqtt'): + if self.async_service_type not in ('redis', 'mqtt'): for n, v in headers.items(): - criteria = any(any(header['name'] == n.title() and header['value'] == v for header in entry['response']['headers']) for entry in data['log']['entries']) + criteria = any( + any(header['name'] == n.title() and header['value'] == v for header in entry['response']['headers']) + for entry in data['log']['entries']) if invert: assert not criteria else: @@ -233,6 +251,7 @@ def assert_async_consume(self, callback, *args): start = time.time() while True: try: + logging.debug("Trying consume: %r(%r)...", callback, args) callback(*args) except 
AssertionError: time.sleep(ASYNC_CONSUME_WAIT) @@ -243,7 +262,6 @@ def assert_async_consume(self, callback, *args): break def test_get_async(self): - global async_service_type for _format in ('json', 'yaml'): resp = httpx.get(MGMT + '/async?format=%s' % _format, verify=False) @@ -256,22 +274,22 @@ def test_get_async(self): producers = data['producers'] consumers = data['consumers'] - assert len(producers) == 24 if async_service_type == 'gpubsub' else 25 - assert len(consumers) == 15 if async_service_type == 'gpubsub' else 16 + assert len(producers) == 24 if self.async_service_type == 'gpubsub' else 25 + assert len(consumers) == 15 if self.async_service_type == 'gpubsub' else 16 - assert producers[0]['type'] == async_service_type + assert producers[0]['type'] == self.async_service_type assert producers[0]['name'] is None assert producers[0]['index'] == 0 assert producers[0]['queue'] == 'topic1' assert producers[0]['producedMessages'] == 0 assert producers[0]['lastProduced'] is None - assert producers[3]['type'] == async_service_type + assert producers[3]['type'] == self.async_service_type assert producers[3]['name'] == 'actor6' assert producers[3]['index'] == 3 assert producers[3]['queue'] == 'topic6' - assert consumers[0]['type'] == async_service_type + assert consumers[0]['type'] == self.async_service_type assert consumers[0]['name'] is None assert consumers[0]['index'] == 0 assert consumers[0]['queue'] == 'topic2' @@ -279,7 +297,7 @@ def test_get_async(self): assert consumers[0]['consumedMessages'] == 0 assert consumers[0]['lastConsumed'] is None - assert consumers[3]['type'] == async_service_type + assert consumers[3]['type'] == self.async_service_type assert consumers[3]['name'] == 'actor9' assert consumers[3]['index'] == 3 assert consumers[3]['queue'] == 'topic9' @@ -310,14 +328,14 @@ def test_get_async_chain(self): ) def assert_get_async_consume( - self, - key, - value, - headers, - not_key, - not_value, - not_headers1, - not_headers2 + self, + key, + value, 
+ headers, + not_key, + not_value, + not_headers1, + not_headers2 ): resp = httpx.get(MGMT + '/async/consumers/0', verify=False) assert 200 == resp.status_code @@ -329,7 +347,6 @@ def assert_get_async_consume( self.assert_consumer_log(data, not_key, not_value, not_headers2, invert=True) def test_get_async_consume(self): - global async_service_type key = 'key2' value = """ @@ -347,13 +364,13 @@ def test_get_async_consume(self): value_json_decode_error = 'JSON Decode Error' queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, PYBARS, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( 'topic2', value, key=key, @@ -365,7 +382,7 @@ def test_get_async_consume(self): async_producer.produce() time.sleep(1) - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( 'topic2', value, key=not_key, @@ -375,7 +392,7 @@ def test_get_async_consume(self): async_producer.produce() time.sleep(1) - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( 'topic2', not_value, 
key=key, @@ -385,7 +402,7 @@ def test_get_async_consume(self): async_producer.produce() time.sleep(1) - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( 'topic2', value_json_decode_error, key=key, @@ -395,7 +412,7 @@ def test_get_async_consume(self): async_producer.produce() time.sleep(1) - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( 'topic2', value, key=key, @@ -405,7 +422,7 @@ def test_get_async_consume(self): async_producer.produce() time.sleep(1) - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( 'topic2', value, key=key, @@ -432,7 +449,7 @@ def test_get_async_consume(self): data = resp.json() consumers = data['consumers'] assert consumers[0]['captured'] == 1 - assert consumers[0]['consumedMessages'] == 2 if async_service_type not in ('redis', 'mqtt') else 3 + assert consumers[0]['consumedMessages'] == 2 if self.async_service_type not in ('redis', 'mqtt') else 3 job.kill() @@ -469,20 +486,19 @@ def assert_get_async_consume_no_key(self, key, value, headers): self.assert_consumer_log(data, key, value, headers) def test_get_async_consume_no_key(self): - global async_service_type key = None value = 'value10' headers = {} queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, 
JINJA, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( 'topic10', value, key=key, @@ -519,7 +535,6 @@ def assert_get_async_consume_capture_limit_part2(self, value11_1, value11_2): assert any(entry['response']['content']['text'] == value11_2 for entry in data['log']['entries']) def test_get_async_consume_capture_limit(self): - global async_service_type topic10 = 'topic10' value10_1 = 'value10_1' @@ -530,22 +545,22 @@ def test_get_async_consume_capture_limit(self): value11_2 = 'value11_2' queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, PYBARS, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) # topic10 START - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( topic10, value10_1 ) async_actor.set_producer(async_producer) async_producer.produce() - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], 
'%s_build_single_payload_producer' % self.async_service_type)( topic10, value10_2 ) @@ -560,14 +575,14 @@ def test_get_async_consume_capture_limit(self): # topic10 END # topic11 START - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( topic11, value11_1 ) async_actor.set_producer(async_producer) async_producer.produce() - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( topic11, value11_2 ) @@ -613,15 +628,13 @@ def test_get_async_bad_requests(self): assert resp.text == 'No consumer actor is found for: %r' % actor_name def assert_post_async_produce(self, async_consumer, key, value, headers): - global async_service_type - if async_service_type in ('redis', 'mqtt'): + if self.async_service_type in ('redis', 'mqtt'): assert any(row[1] == value for row in async_consumer.log) else: assert any(row[0] == key and row[1] == value and row[2] == headers for row in async_consumer.log) def test_post_async_produce(self): - global async_service_type key = 'key1' value = 'value1' @@ -632,21 +645,23 @@ def test_post_async_produce(self): } queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, PYBARS, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_consumer = 
getattr(sys.modules[__name__], '%sConsumer' % async_service_type.capitalize())('topic1', enable_topic_creation=True) + async_consumer = getattr(sys.modules[__name__], '%sConsumer' % self.async_service_type.capitalize())('topic1', + enable_topic_creation=True) async_actor.set_consumer(async_consumer) - async_consumer_group = getattr(sys.modules[__name__], '%sConsumerGroup' % async_service_type.capitalize())() + async_consumer_group = getattr(sys.modules[__name__], + '%sConsumerGroup' % self.async_service_type.capitalize())() async_consumer_group.add_consumer(async_consumer) t = threading.Thread(target=async_consumer_group.consume, args=(), kwargs={}) t.daemon = True t.start() - if async_service_type == 'gpubsub': + if self.async_service_type == 'gpubsub': time.sleep(ASYNC_CONSUME_TIMEOUT / 24) resp = httpx.post(MGMT + '/async/producers/0', verify=False) @@ -669,15 +684,13 @@ def test_post_async_binary_produce(self): assert 202 == resp.status_code def assert_post_async_produce_by_actor_name(self, async_consumer, key, value, headers): - global async_service_type - if async_service_type in ('redis', 'mqtt'): + if self.async_service_type in ('redis', 'mqtt'): assert any(row[1] == value for row in async_consumer.log) else: assert any(row[0] == key and row[1] == value and row[2] == headers for row in async_consumer.log) def test_post_async_produce_by_actor_name(self): - global async_service_type key = None value = 'value6' @@ -687,21 +700,22 @@ def test_post_async_produce_by_actor_name(self): } queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, JINJA, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = 
getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_consumer = getattr(sys.modules[__name__], '%sConsumer' % async_service_type.capitalize())('topic6') + async_consumer = getattr(sys.modules[__name__], '%sConsumer' % self.async_service_type.capitalize())('topic6') async_actor.set_consumer(async_consumer) - async_consumer_group = getattr(sys.modules[__name__], '%sConsumerGroup' % async_service_type.capitalize())() + async_consumer_group = getattr(sys.modules[__name__], + '%sConsumerGroup' % self.async_service_type.capitalize())() async_consumer_group.add_consumer(async_consumer) t = threading.Thread(target=async_consumer_group.consume, args=(), kwargs={}) t.daemon = True t.start() - if async_service_type == 'gpubsub': + if self.async_service_type == 'gpubsub': time.sleep(ASYNC_CONSUME_TIMEOUT / 24) resp = httpx.post(MGMT + '/async/producers/actor6', verify=False) @@ -720,18 +734,17 @@ def test_post_async_produce_by_actor_name(self): job.kill() def assert_post_async_reactive_consumer( - self, - async_consumer, - consumer_key, - consumer_value, - consumer_headers, - producer_key, - producer_value, - producer_headers + self, + async_consumer, + consumer_key, + consumer_value, + consumer_headers, + producer_key, + producer_value, + producer_headers ): - global async_service_type - if async_service_type in ('redis', 'mqtt'): + if self.async_service_type in ('redis', 'mqtt'): assert any( (row[1] == '%s and %s' % ( consumer_value, @@ -755,7 +768,6 @@ def assert_post_async_reactive_consumer( ) def test_post_async_reactive_consumer(self): - global async_service_type producer_topic = 'topic4' producer_key = 'key4' @@ -774,27 +786,29 @@ def test_post_async_reactive_consumer(self): } queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], 
'%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, PYBARS, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_consumer = getattr(sys.modules[__name__], '%sConsumer' % async_service_type.capitalize())(consumer_topic) + async_consumer = getattr(sys.modules[__name__], '%sConsumer' % self.async_service_type.capitalize())( + consumer_topic) async_actor.set_consumer(async_consumer) - async_consumer_group = getattr(sys.modules[__name__], '%sConsumerGroup' % async_service_type.capitalize())() + async_consumer_group = getattr(sys.modules[__name__], + '%sConsumerGroup' % self.async_service_type.capitalize())() async_consumer_group.add_consumer(async_consumer) t = threading.Thread(target=async_consumer_group.consume, args=(), kwargs={}) t.daemon = True t.start() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, PYBARS, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( producer_topic, producer_value, key=producer_key, @@ -831,10 +845,9 @@ def test_post_async_bad_requests(self): assert resp.text == 'Invalid producer index!' 
def assert_post_async_producer_templated(self, async_consumer): - global async_service_type for i in range(2): - if async_service_type in ('redis', 'mqtt'): + if self.async_service_type in ('redis', 'mqtt'): assert any( (row[1][0].isupper()) for row in async_consumer.log @@ -858,24 +871,25 @@ def assert_post_async_producer_templated(self, async_consumer): ) def test_post_async_producer_templated(self): - global async_service_type queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, PYBARS, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_consumer = getattr(sys.modules[__name__], '%sConsumer' % async_service_type.capitalize())('templated-producer', capture_limit=2) + async_consumer = getattr(sys.modules[__name__], '%sConsumer' % self.async_service_type.capitalize())( + 'templated-producer', capture_limit=2) async_actor.set_consumer(async_consumer) - async_consumer_group = getattr(sys.modules[__name__], '%sConsumerGroup' % async_service_type.capitalize())() + async_consumer_group = getattr(sys.modules[__name__], + '%sConsumerGroup' % self.async_service_type.capitalize())() async_consumer_group.add_consumer(async_consumer) t = threading.Thread(target=async_consumer_group.consume, args=(), kwargs={}) t.daemon = True t.start() - if async_service_type == 'gpubsub': + if self.async_service_type == 'gpubsub': time.sleep(ASYNC_CONSUME_TIMEOUT / 24) for _ in range(2): @@ -892,16 +906,15 @@ def test_post_async_producer_templated(self): job.kill() def 
test_async_producer_list_has_no_payloads_matching_tags(self): - global async_service_type queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, PYBARS, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % async_service_type)( + async_producer = getattr(sys.modules[__name__], '%s_build_single_payload_producer' % self.async_service_type)( 'topic12', 'value12-3', key='key12-3', @@ -1130,7 +1143,6 @@ def test_delete_async_consumer_bad_requests(self): assert resp.text == 'No consumer actor is found for: %r' % actor_name def test_traffic_log_async(self): - global async_service_type resp = httpx.get(MGMT + '/traffic-log', verify=False) assert 200 == resp.status_code @@ -1140,23 +1152,26 @@ def test_traffic_log_async(self): entries = data['log']['entries'] - parsed = urlparse(ASYNC_ADDR[async_service_type] if ASYNC_ADDR[async_service_type].startswith('http') else 'http://%s' % ASYNC_ADDR[async_service_type]) - netloc = parsed.netloc.split('@')[-1] if async_service_type == 'gpubsub' else parsed.netloc + parsed = urlparse(ASYNC_ADDR[self.async_service_type] if ASYNC_ADDR[self.async_service_type].startswith( + 'http') else 'http://%s' % ASYNC_ADDR[self.async_service_type]) + netloc = parsed.netloc.split('@')[-1] if self.async_service_type == 'gpubsub' else parsed.netloc assert any( entry['request']['method'] == 'PUT' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/topic1%s' % (async_service_type, netloc, 
'?key=key1' if async_service_type not in ('redis', 'mqtt') else '') + entry['request']['url'] == '%s://%s/topic1%s' % ( + self.async_service_type, netloc, + '?key=key1' if self.async_service_type not in ('redis', 'mqtt') else '') and # noqa: W504, W503 entry['response']['status'] == 202 for entry in entries ) - if async_service_type in ('redis', 'mqtt'): + if self.async_service_type in ('redis', 'mqtt'): assert any( entry['request']['method'] == 'GET' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/topic2' % (async_service_type, netloc) + entry['request']['url'] == '%s://%s/topic2' % (self.async_service_type, netloc) and # noqa: W504, W503 entry['response']['status'] == 200 for entry in entries @@ -1165,7 +1180,7 @@ def test_traffic_log_async(self): assert any( entry['request']['method'] == 'GET' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/topic2?key=key2' % (async_service_type, netloc) + entry['request']['url'] == '%s://%s/topic2?key=key2' % (self.async_service_type, netloc) and # noqa: W504, W503 entry['response']['status'] == 200 and # noqa: W504, W503 @@ -1175,11 +1190,11 @@ def test_traffic_log_async(self): for entry in entries ) - if async_service_type in ('redis', 'mqtt'): + if self.async_service_type in ('redis', 'mqtt'): assert any( entry['request']['method'] == 'GET' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/topic3' % (async_service_type, netloc) + entry['request']['url'] == '%s://%s/topic3' % (self.async_service_type, netloc) and # noqa: W504, W503 entry['response']['status'] == 200 for entry in entries @@ -1188,7 +1203,7 @@ def test_traffic_log_async(self): assert any( entry['request']['method'] == 'GET' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/topic3?key=key3' % (async_service_type, netloc) + entry['request']['url'] == '%s://%s/topic3?key=key3' % (self.async_service_type, netloc) and # noqa: W504, W503 entry['response']['status'] == 200 and # noqa: W504, W503 @@ -1201,7 +1216,9 @@ def 
test_traffic_log_async(self): assert any( entry['request']['method'] == 'PUT' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/topic3%s' % (async_service_type, netloc, '?key=key3' if async_service_type not in ('redis', 'mqtt') else '') + entry['request']['url'] == '%s://%s/topic3%s' % ( + self.async_service_type, netloc, + '?key=key3' if self.async_service_type not in ('redis', 'mqtt') else '') and # noqa: W504, W503 entry['response']['status'] == 202 for entry in entries @@ -1210,7 +1227,7 @@ def test_traffic_log_async(self): assert any( entry['request']['method'] == 'PUT' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/topic6' % (async_service_type, netloc) + entry['request']['url'] == '%s://%s/topic6' % (self.async_service_type, netloc) and # noqa: W504, W503 entry['response']['status'] == 202 for entry in entries @@ -1219,7 +1236,9 @@ def test_traffic_log_async(self): assert any( entry['request']['method'] == 'PUT' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/topic7%s' % (async_service_type, netloc, '?key=key7' if async_service_type not in ('redis', 'mqtt') else '') + entry['request']['url'] == '%s://%s/topic7%s' % ( + self.async_service_type, netloc, + '?key=key7' if self.async_service_type not in ('redis', 'mqtt') else '') and # noqa: W504, W503 entry['response']['status'] == 202 for entry in entries @@ -1228,17 +1247,19 @@ def test_traffic_log_async(self): assert any( entry['request']['method'] == 'PUT' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/topic8%s' % (async_service_type, netloc, '?key=key8' if async_service_type not in ('redis', 'mqtt') else '') + entry['request']['url'] == '%s://%s/topic8%s' % ( + self.async_service_type, netloc, + '?key=key8' if self.async_service_type not in ('redis', 'mqtt') else '') and # noqa: W504, W503 entry['response']['status'] == 202 for entry in entries ) - if async_service_type in ('redis', 'mqtt'): + if self.async_service_type in ('redis', 'mqtt'): assert any( 
entry['request']['method'] == 'PUT' and # noqa: W504, W503 - entry['request']['url'].startswith('%s://%s/templated-producer' % (async_service_type, netloc)) + entry['request']['url'].startswith('%s://%s/templated-producer' % (self.async_service_type, netloc)) and # noqa: W504, W503 entry['response']['status'] == 202 for entry in entries @@ -1247,7 +1268,8 @@ def test_traffic_log_async(self): assert any( entry['request']['method'] == 'PUT' and # noqa: W504, W503 - entry['request']['url'].startswith('%s://%s/templated-producer?key=prefix-' % (async_service_type, netloc)) + entry['request']['url'].startswith( + '%s://%s/templated-producer?key=prefix-' % (self.async_service_type, netloc)) and # noqa: W504, W503 entry['request']['headers'][-2]['value'].isnumeric() and # noqa: W504, W503 @@ -1258,7 +1280,6 @@ def test_traffic_log_async(self): ) def test_stats(self): - global async_service_type resp = httpx.get(MGMT + '/stats', verify=False) assert 200 == resp.status_code @@ -1283,7 +1304,7 @@ def test_stats(self): assert data['services'][0]['endpoints'][0]['status_code_distribution'] == {'202': 1} assert data['services'][0]['endpoints'][1]['hint'] == 'GET topic2 - 1' - if async_service_type not in ('redis', 'mqtt'): + if self.async_service_type not in ('redis', 'mqtt'): assert data['services'][0]['endpoints'][1]['request_counter'] == 2 assert data['services'][0]['endpoints'][1]['status_code_distribution'] == {'200': 2} assert data['services'][0]['endpoints'][1]['avg_resp_time'] == 0 @@ -1292,13 +1313,15 @@ def test_stats(self): assert data['services'][0]['endpoints'][2]['request_counter'] > 1 assert data['services'][0]['endpoints'][2]['avg_resp_time'] == 0 assert data['services'][0]['endpoints'][2]['status_code_distribution']['202'] > 1 - assert data['services'][0]['endpoints'][2]['status_code_distribution']['202'] == data['services'][0]['endpoints'][2]['request_counter'] + assert data['services'][0]['endpoints'][2]['status_code_distribution']['202'] == \ + 
data['services'][0]['endpoints'][2]['request_counter'] assert data['services'][0]['endpoints'][3]['hint'] == 'GET topic3 - 3' assert data['services'][0]['endpoints'][3]['request_counter'] > 0 assert data['services'][0]['endpoints'][3]['avg_resp_time'] == 0 assert data['services'][0]['endpoints'][3]['status_code_distribution']['200'] > 0 - assert data['services'][0]['endpoints'][3]['status_code_distribution']['200'] == data['services'][0]['endpoints'][3]['request_counter'] + assert data['services'][0]['endpoints'][3]['status_code_distribution']['200'] == \ + data['services'][0]['endpoints'][3]['request_counter'] assert data['services'][0]['endpoints'][4]['hint'] == 'GET topic4 - 4' assert data['services'][0]['endpoints'][4]['request_counter'] == 1 @@ -1319,7 +1342,8 @@ def test_stats(self): assert data['services'][0]['endpoints'][7]['request_counter'] > 0 assert data['services'][0]['endpoints'][7]['avg_resp_time'] == 0 assert data['services'][0]['endpoints'][7]['status_code_distribution']['202'] > 0 - assert data['services'][0]['endpoints'][7]['status_code_distribution']['202'] == data['services'][0]['endpoints'][7]['request_counter'] + assert data['services'][0]['endpoints'][7]['status_code_distribution']['202'] == \ + data['services'][0]['endpoints'][7]['request_counter'] assert data['services'][0]['endpoints'][8]['hint'] == 'PUT topic8 - 7 (actor: short-loop)' assert data['services'][0]['endpoints'][8]['request_counter'] > 4 @@ -1347,15 +1371,15 @@ def test_stats(self): assert data['services'][1]['status_code_distribution'] == {} assert len(data['services'][1]['endpoints']) == 6 - if async_service_type not in ('gpubsub', 'amazonsqs', 'mqtt'): - assert data['services'][2]['hint'] == '%s://localhost:%s' % (async_service_type, str(int(ASYNC_ADDR[async_service_type].split(':')[1]) + 1)) + if self.async_service_type not in ('gpubsub', 'amazonsqs', 'mqtt'): + assert data['services'][2]['hint'] == '%s://localhost:%s' % ( + self.async_service_type, 
str(int(ASYNC_ADDR[self.async_service_type].split(':')[1]) + 1)) assert data['services'][2]['request_counter'] == 0 assert data['services'][2]['avg_resp_time'] == 0 assert data['services'][2]['status_code_distribution'] == {} assert len(data['services'][2]['endpoints']) == 2 def assert_management_post_config(self, async_consumer): - global async_service_type key = 'key1' value = 'value101' @@ -1365,7 +1389,7 @@ def assert_management_post_config(self, async_consumer): 'global-hdr2': 'globalval2' } - if async_service_type in ('redis', 'mqtt'): + if self.async_service_type in ('redis', 'mqtt'): assert any(row[1] == value for row in async_consumer.log) else: assert any(row[0] == key and row[1] == value and row[2] == headers for row in async_consumer.log) @@ -1386,14 +1410,13 @@ def assert_management_post_config(self, async_consumer): self.assert_consumer_log(data, key, value, headers) def test_management_post_config(self): - global async_service_type resp = httpx.get(MGMT + '/config', verify=False) assert 200 == resp.status_code assert resp.headers['Content-Type'] == 'application/json; charset=UTF-8' data = resp.json() - if async_service_type not in ('redis', 'mqtt'): + if self.async_service_type not in ('redis', 'mqtt'): del data['globals']['headers']['global-hdr1'] data['globals']['headers']['global-hdrX'] = 'globalvalY' data['services'][0]['actors'][2]['produce']['key'] = 'key301' @@ -1408,21 +1431,23 @@ def test_management_post_config(self): time.sleep(ASYNC_CONSUME_WAIT / 10) queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, PYBARS, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = 
getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_consumer = getattr(sys.modules[__name__], '%sConsumer' % async_service_type.capitalize())('topic1', enable_topic_creation=True) + async_consumer = getattr(sys.modules[__name__], '%sConsumer' % self.async_service_type.capitalize())('topic1', + enable_topic_creation=True) async_actor.set_consumer(async_consumer) - async_consumer_group = getattr(sys.modules[__name__], '%sConsumerGroup' % async_service_type.capitalize())() + async_consumer_group = getattr(sys.modules[__name__], + '%sConsumerGroup' % self.async_service_type.capitalize())() async_consumer_group.add_consumer(async_consumer) t = threading.Thread(target=async_consumer_group.consume, args=(), kwargs={}) t.daemon = True t.start() - if async_service_type == 'gpubsub': + if self.async_service_type == 'gpubsub': time.sleep(ASYNC_CONSUME_TIMEOUT / 24) resp = httpx.post(MGMT + '/async/producers/0', verify=False) @@ -1438,16 +1463,16 @@ def test_management_post_config(self): job.kill() def test_management_get_resources(self): - global async_service_type resp = httpx.get(MGMT + '/resources', verify=False) assert 200 == resp.status_code assert resp.headers['Content-Type'] == 'application/json; charset=UTF-8' data = resp.json() - if async_service_type in ('redis', 'mqtt'): + if self.async_service_type in ('redis', 'mqtt'): assert data == {'files': ['dataset.json', 'image.png', 'value_schema.json', 'value_schema_error.json']} else: - assert data == {'files': ['dataset.json', 'image.png', 'templates/example.txt', 'value_schema.json', 'value_schema_error.json']} + assert data == {'files': ['dataset.json', 'image.png', 'templates/example.txt', 'value_schema.json', + 'value_schema_error.json']} @pytest.mark.parametrize(('topic', 'key', 'value', 'headers', 'endpoint'), [ ( @@ -1474,24 +1499,25 @@ def test_management_get_resources(self): ) ]) def test_trigger_async_producer(self, topic, key, value, 
headers, endpoint): - global async_service_type queue, job = start_render_queue() - async_service = getattr(sys.modules[__name__], '%sService' % async_service_type.capitalize())( - ASYNC_ADDR[async_service_type], + async_service = getattr(sys.modules[__name__], '%sService' % self.async_service_type.capitalize())( + ASYNC_ADDR[self.async_service_type], definition=DefinitionMockForAsync(None, PYBARS, queue) ) - async_actor = getattr(sys.modules[__name__], '%sActor' % async_service_type.capitalize())(0) + async_actor = getattr(sys.modules[__name__], '%sActor' % self.async_service_type.capitalize())(0) async_service.add_actor(async_actor) - async_consumer = getattr(sys.modules[__name__], '%sConsumer' % async_service_type.capitalize())(topic, enable_topic_creation=True) + async_consumer = getattr(sys.modules[__name__], '%sConsumer' % self.async_service_type.capitalize())(topic, + enable_topic_creation=True) async_actor.set_consumer(async_consumer) - async_consumer_group = getattr(sys.modules[__name__], '%sConsumerGroup' % async_service_type.capitalize())() + async_consumer_group = getattr(sys.modules[__name__], + '%sConsumerGroup' % self.async_service_type.capitalize())() async_consumer_group.add_consumer(async_consumer) t = threading.Thread(target=async_consumer_group.consume, args=(), kwargs={}) t.daemon = True t.start() - if async_service_type == 'gpubsub': + if self.async_service_type == 'gpubsub': time.sleep(ASYNC_CONSUME_TIMEOUT / 24) resp = httpx.get(SRV_8001 + endpoint, headers={'Host': SRV_8001_HOST}) @@ -1533,9 +1559,7 @@ class TestAsyncKafka(AsyncBase): @classmethod def setup_class(cls): - global async_service_type - - async_service_type = 'kafka' + cls.async_service_type = 'kafka' super().setup_class() @@ -1543,9 +1567,7 @@ class TestAsyncAMQP(AsyncBase): @classmethod def setup_class(cls): - global async_service_type - - async_service_type = 'amqp' + cls.async_service_type = 'amqp' super().setup_class() @@ -1553,9 +1575,7 @@ class TestAsyncRedis(AsyncBase): 
@classmethod      def setup_class(cls): -        global async_service_type - -        async_service_type = 'redis' +        cls.async_service_type = 'redis'          super().setup_class()   @@ -1563,9 +1583,7 @@ class TestAsyncGpubsub(AsyncBase):      @classmethod      def setup_class(cls): -        global async_service_type - -        async_service_type = 'gpubsub' +        cls.async_service_type = 'gpubsub'          super().setup_class()   @@ -1573,9 +1591,7 @@ class TestAsyncAmazonSQS(AsyncBase):      @classmethod      def setup_class(cls): -        global async_service_type - -        async_service_type = 'amazonsqs' +        cls.async_service_type = 'amazonsqs'          super().setup_class()   @@ -1583,16 +1599,14 @@ class TestAsyncMQTT(AsyncBase):      @classmethod      def setup_class(cls): -        global async_service_type - -        async_service_type = 'mqtt' +        cls.async_service_type = 'mqtt'          super().setup_class()   -@pytest.mark.parametrize(('config'), [ +@pytest.mark.parametrize('config', [      'configs/yaml/hbs/amqp_properties/config.yaml'  ]) -class TestAsyncAMQPProperties(): +class TestAsyncAMQPProperties:       def setup_method(self):          config = self._item.callspec.getparam('config') @@ -1602,11 +1616,11 @@ def teardown_method(self):          self.mock_server_process.terminate()       def test_scheduled_producer_and_consumer(self, config): -        async_service_type = 'amqp' +        self.async_service_type = 'amqp'          for topic in ( -            'shipping-task' +            'shipping-task',          ): -            amqp_create_topic(ASYNC_ADDR[async_service_type], topic) +            amqp_create_topic(ASYNC_ADDR[self.async_service_type], topic)          resp = httpx.post(MGMT + '/traffic-log', data={"enable": True}, verify=False)          assert 204 == resp.status_code @@ -1620,13 +1634,15 @@ def test_scheduled_producer_and_consumer(self, config):          entries = data['log']['entries']  -        parsed = urlparse(ASYNC_ADDR[async_service_type] if ASYNC_ADDR[async_service_type].startswith('http') else 'http://%s' % ASYNC_ADDR[async_service_type]) -        netloc = parsed.netloc.split('@')[-1] if async_service_type == 'gpubsub' else parsed.netloc +        parsed = urlparse(ASYNC_ADDR[self.async_service_type] if 
ASYNC_ADDR[self.async_service_type].startswith( + 'http') else 'http://%s' % ASYNC_ADDR[self.async_service_type]) + netloc = parsed.netloc.split('@')[-1] if self.async_service_type == 'gpubsub' else parsed.netloc assert any( entry['request']['method'] == 'PUT' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/shipping-task%s' % (async_service_type, netloc, '?key=shipping-task') + entry['request']['url'] == '%s://%s/shipping-task%s' % ( + self.async_service_type, netloc, '?key=shipping-task') and # noqa: W504, W503 entry['request']['headers'][0]['name'] == 'Global-Hdr1' and # noqa: W504, W503 @@ -1663,7 +1679,8 @@ def test_scheduled_producer_and_consumer(self, config): assert any( entry['request']['method'] == 'GET' and # noqa: W504, W503 - entry['request']['url'] == '%s://%s/shipping-task%s' % (async_service_type, netloc, '?key=shipping-task') + entry['request']['url'] == '%s://%s/shipping-task%s' % ( + self.async_service_type, netloc, '?key=shipping-task') and # noqa: W504, W503 entry['response']['status'] == 200 and # noqa: W504, W503