forked from scylladb/scylla-cluster-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
performance_regression_operator_multi_tenant_test.py
166 lines (135 loc) · 6.65 KB
/
performance_regression_operator_multi_tenant_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
#!/usr/bin/env python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright (c) 2022 ScyllaDB
from performance_regression_test import PerformanceRegressionTest
from sdcm.utils.common import ParallelObject
from sdcm.utils.operator.multitenant_common import MultiTenantTestMixin
# pylint: disable=too-many-public-methods
class PerformanceRegressionOperatorMultiTenantTest(MultiTenantTestMixin, PerformanceRegressionTest):
load_iteration_timeout_sec = 7200
def create_test_stats(self, *args, **kwargs): # pylint: disable=unused-argument
self.log.info(
"Suppress the test class stats creation. "
"Leave it for the per-DB classes.")
self._stats = self._init_stats()
def update_test_with_errors(self):
self.log.info("update_test_with_errors: Suppress writing errors to ES")
def run_fstrim_on_all_db_nodes(self):
for tenant in self.tenants:
self.log.info("Running fstrim command on the '%s' DB cluster", tenant.db_cluster.name)
tenant.db_cluster.fstrim_scylla_disks_on_nodes()
def wait_no_compactions_running(self, *args, **kwargs):
for tenant in self.tenants:
self.log.info("Waiting for compactions to finish on the '%s' DB cluster",
tenant.db_cluster.name)
tenant.wait_no_compactions_running(*args, **kwargs)
def preload_data(self, compaction_strategy=None):
def _preload_data(tenant):
prepare_write_cmd = tenant.params.get('prepare_write_cmd')
db_cluster_name = tenant.db_cluster.name
if not prepare_write_cmd:
self.log.warning(
"No prepare command defined in YAML for the '%s' cluster",
db_cluster_name)
return
self.log.info("Running preload command for the '%s' cluster", db_cluster_name)
tenant.create_test_stats(
sub_type='write-prepare', doc_id_with_timestamp=True)
stress_queue, params = [], {
'prefix': 'preload-',
}
if self.params.get('round_robin'):
self.log.debug(
"'%s' DB cluster: Populating data using round_robin", db_cluster_name)
params.update({'stress_num': 1, 'round_robin': True})
for stress_cmd in prepare_write_cmd:
params.update({'stress_cmd': stress_cmd})
# Run all stress commands
params.update(dict(stats_aggregate_cmds=False))
self.log.debug("'%s' DB cluster: RUNNING stress cmd: %s",
db_cluster_name, stress_cmd)
stress_queue.append(tenant.run_stress_thread(**params))
for stress in stress_queue:
tenant.get_stress_results(queue=stress, store_results=False)
tenant.update_test_details()
self.log.info("Running preload operation in parallel on all the DB clusters")
object_set = ParallelObject(
timeout=self.load_iteration_timeout_sec,
objects=[[tenant] for tenant in self.tenants],
num_workers=len(self.tenants),
)
object_set.run(func=_preload_data, unpack_objects=True, ignore_exceptions=False)
def run_read_workload(self, nemesis=False):
def _run_read_workload(tenant, nemesis):
tenant.run_read_workload(nemesis=nemesis)
self.log.info("Running 'read' workload operation in parallel")
object_set = ParallelObject(
timeout=self.load_iteration_timeout_sec,
objects=[[tenant, nemesis] for tenant in self.tenants],
num_workers=len(self.tenants),
)
object_set.run(func=_run_read_workload, unpack_objects=True, ignore_exceptions=False)
def run_write_workload(self, nemesis=False):
def _run_write_workload(tenant, nemesis):
tenant.run_write_workload(nemesis=nemesis)
self.log.info("Running 'write' workload operation in parallel")
object_set = ParallelObject(
timeout=self.load_iteration_timeout_sec,
objects=[[tenant, nemesis] for tenant in self.tenants],
num_workers=len(self.tenants),
)
object_set.run(func=_run_write_workload, unpack_objects=True, ignore_exceptions=False)
def run_mixed_workload(self, nemesis: bool = False):
def _run_mixed_workload(tenant, nemesis):
tenant.run_mixed_workload(nemesis=nemesis)
self.log.info("Running 'mixed' workload operation in parallel")
object_set = ParallelObject(
timeout=self.load_iteration_timeout_sec,
objects=[[tenant, nemesis] for tenant in self.tenants],
num_workers=len(self.tenants),
)
object_set.run(func=_run_mixed_workload, unpack_objects=True, ignore_exceptions=False)
def run_workload(self, stress_cmd, nemesis=False, sub_type=None):
def _run_workload(tenant, stress_cmd, nemesis):
tenant.run_workload(stress_cmd=stress_cmd, nemesis=nemesis)
self.log.info("Running workload in parallel with following command:\n%s", stress_cmd)
object_set = ParallelObject(
timeout=self.load_iteration_timeout_sec,
objects=[[tenant, stress_cmd, nemesis] for tenant in self.tenants],
num_workers=len(self.tenants),
)
object_set.run(func=_run_workload, unpack_objects=True, ignore_exceptions=False)
def test_write(self):
raise NotImplementedError()
def test_read(self):
raise NotImplementedError()
def test_mixed(self):
raise NotImplementedError()
def test_mv_write(self):
raise NotImplementedError()
def test_mv_write_populated(self):
raise NotImplementedError()
def test_mv_write_not_populated(self):
raise NotImplementedError()
def test_mv_read_populated(self):
raise NotImplementedError()
def test_mv_read_not_populated(self):
raise NotImplementedError()
def test_mv_mixed_populated(self):
raise NotImplementedError()
def test_mv_mixed_not_populated(self):
raise NotImplementedError()
def test_uniform_counter_update_bench(self):
raise NotImplementedError()
def test_timeseries_bench(self):
raise NotImplementedError()