-
Notifications
You must be signed in to change notification settings - Fork 24
/
test_v1.py
102 lines (92 loc) · 3.98 KB
/
test_v1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from test import BaseTestCase
from common.isolated_creds import *
import time
import sys
class BaseTestCase_v1(BaseTestCase):
    """Base test case that prepares credentials, project and connections.

    ``setUpClass`` builds an ``IsolatedCreds`` (and, when needed, an
    ``AdminIsolatedCreds``) object from ``cls.inputs``, makes sure a project
    exists, and then replaces ``cls.inputs`` / sets ``cls.connections`` with
    the objects returned for that project.  ``tearDownClass`` removes what
    was created through the admin credentials.
    """

    # Subclasses set this to False to run with the admin credentials
    # instead of per-class ones (see setUpClass).
    isolation = True

    @classmethod
    def _new_admin_isolated_creds(cls):
        """Return an AdminIsolatedCreds built from the current class inputs."""
        return AdminIsolatedCreds(
            cls.inputs,
            domain_name=cls.inputs.admin_domain,
            input_file=cls.input_file,
            logger=cls.logger)

    @classmethod
    def setUpClass(cls):
        """Set up credentials, project, inputs and connections for the class.

        Order matters here:

        1. Reset the class-level handles and run the parent ``setUpClass``.
        2. Turn isolation off for k8s test classes.
        3. Pick the domain name (only when the auth URL contains 'v3') and
           the project name.
        4. Build ``cls.isolated_creds``; depending on ``cls.isolation`` and
           ``inputs.tenant_isolation`` also build ``cls.admin_isolated_creds``
           and use it to create the domain, project and user.
        5. Call ``setUp`` on the credentials, create the project if none
           exists yet, then store the project's inputs and connections on
           the class.
        """
        cls.project = None
        cls.admin_inputs = None
        cls.admin_connections = None
        cls.domain_name = None
        cls.domain_obj = None
        super(BaseTestCase_v1, cls).setUpClass()

        # Only consult the k8s base class if that module was already
        # imported; this avoids importing it just for the check.
        k8s_base = sys.modules.get('common.k8s.base')
        if k8s_base is not None and issubclass(cls, k8s_base.BaseK8sTest):
            # no isolation for k8s, all tests run in same domain & project
            cls.isolation = False
            cls.inputs.tenant_isolation = False

        # cls.inputs is not rebound until the end of this method, so the
        # check can be evaluated once and reused below.
        is_v3_auth = 'v3' in cls.inputs.auth_url
        if is_v3_auth:
            if cls.isolation and cls.inputs.domain_isolation:
                cls.domain_name = cls.__name__
            else:
                # The user wants to run the tests in an already allocated
                # domain.
                cls.domain_name = cls.inputs.stack_domain

        if cls.inputs.tenant_isolation:
            project_name = cls.__name__
        else:
            project_name = cls.inputs.stack_tenant

        cls.isolated_creds = IsolatedCreds(
            cls.inputs,
            domain_name=cls.domain_name,
            project_name=project_name,
            input_file=cls.input_file,
            logger=cls.logger)

        # Deliberately an identity check: only an explicit ``False``
        # selects the admin-credentials path.
        if cls.isolation is False:
            cls.admin_isolated_creds = cls._new_admin_isolated_creds()
            cls.isolated_creds = cls.admin_isolated_creds
        elif cls.inputs.tenant_isolation:
            cls.admin_isolated_creds = cls._new_admin_isolated_creds()
            cls.admin_isolated_creds.setUp()
            if is_v3_auth:
                cls.domain_obj = cls.admin_isolated_creds.create_domain(
                    cls.isolated_creds.domain_name)
            cls.project = cls.admin_isolated_creds.create_tenant(
                cls.isolated_creds.project_name,
                cls.isolated_creds.domain_name)
            cls.admin_inputs = cls.admin_isolated_creds.get_inputs(
                cls.project)
            cls.admin_isolated_creds.create_and_attach_user_to_tenant(
                cls.project,
                cls.isolated_creds.username,
                cls.isolated_creds.password)
            cls.admin_connections = cls.admin_isolated_creds.get_connections(
                cls.admin_inputs)
        # endif

        cls.isolated_creds.setUp()
        if not cls.project:
            # No project was created through the admin credentials above.
            cls.project = cls.isolated_creds.create_tenant(
                cls.isolated_creds.project_name)
        cls.inputs = cls.isolated_creds.get_inputs(cls.project)
        cls.connections = cls.isolated_creds.get_connections(cls.inputs)
        cls.create_flood_vmi_if_vcenter_gw_setup()
    # end setUpClass

    @classmethod
    def tearDownClass(cls):
        """Delete the project, user and domain created in ``setUpClass``.

        The parent ``tearDownClass`` always runs, even when one of the
        delete calls raises; the exception still propagates afterwards.
        """
        try:
            if cls.isolation and cls.inputs.tenant_isolation:
                cls.admin_isolated_creds.delete_tenant(cls.project)
                cls.admin_isolated_creds.delete_user(
                    cls.isolated_creds.username)
            # domain_obj stays None unless setUpClass actually created a
            # domain, so there is nothing to delete in that case.
            if (cls.isolation and cls.inputs.domain_isolation
                    and cls.domain_obj is not None):
                cls.admin_isolated_creds.delete_domain(cls.domain_obj)
        finally:
            super(BaseTestCase_v1, cls).tearDownClass()
    # end tearDownClass

    def if_vcenter_gw_setup_return_gw_orch_class(self):
        """Return ``connections.slave_orch`` when ``inputs.vcenter_gw_setup``
        is truthy, otherwise ``connections.orch``."""
        if self.inputs.vcenter_gw_setup:
            return self.connections.slave_orch
        return self.connections.orch

    @property
    def orchestrator(self):
        """Orchestrator handle chosen by
        ``if_vcenter_gw_setup_return_gw_orch_class``."""
        return self.if_vcenter_gw_setup_return_gw_orch_class()

    @classmethod
    def create_flood_vmi_if_vcenter_gw_setup(cls):
        """On a vcenter gateway setup, store ``connections.slave_orch`` as
        ``cls.vcenter_orch`` and call its
        ``create_vn_vmi_for_stp_bpdu_to_be_flooded``; otherwise do nothing.
        """
        if not cls.inputs.vcenter_gw_setup:
            return
        # For vcenter gateway setup.
        # NOTE(review): going by its name, the helper creates a VN/VMI so
        # that STP BPDUs are flooded -- confirm against the orchestrator.
        cls.vcenter_orch = cls.connections.slave_orch
        cls.vcenter_orch.create_vn_vmi_for_stp_bpdu_to_be_flooded()