forked from scylladb/scylla-cluster-tests
-
Notifications
You must be signed in to change notification settings - Fork 0
/
gemini_test.py
155 lines (116 loc) · 5.65 KB
/
gemini_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
#
# See LICENSE for more details.
#
# Copyright (c) 2018 ScyllaDB
import time
from sdcm.tester import ClusterTester
class GeminiTest(ClusterTester):
"""
Test Scylla with gemini - tool for testing data integrity
https://github.com/scylladb/gemini
"""
gemini_results = {
"cmd": ["N/A"],
"status": "Not Running",
"results": [],
'errors': {}
}
def test_random_load(self):
"""
Run gemini tool
"""
cmd = self.params.get('gemini_cmd')
self.log.debug('Start gemini benchmark')
gemini_thread = self.run_gemini(cmd=cmd)
self.gemini_results["cmd"] = gemini_thread.gemini_commands
self.gemini_results.update(self.verify_gemini_results(queue=gemini_thread))
self.verify_results()
def test_load_random_with_nemesis(self):
cmd = self.params.get('gemini_cmd')
self.db_cluster.add_nemesis(nemesis=self.get_nemesis_class(),
tester_obj=self)
self.log.debug('Start gemini benchmark')
gemini_thread = self.run_gemini(cmd=cmd)
self.gemini_results["cmd"] = gemini_thread.gemini_commands
# sleep before run nemesis test_duration * .25
sleep_before_start = float(self.params.get('test_duration')) * 60 * .1
self.log.info('Sleep interval {}'.format(sleep_before_start))
time.sleep(sleep_before_start)
self.db_cluster.start_nemesis()
self.gemini_results.update(self.verify_gemini_results(queue=gemini_thread))
self.db_cluster.stop_nemesis(timeout=1600)
self.verify_results()
def test_load_random_with_nemesis_cdc_reader(self):
cmd = self.params.get('gemini_cmd')
cdc_stress = self.params.get('stress_cdclog_reader_cmd')
update_es = self.params.get('store_cdclog_reader_stats_in_es')
self.db_cluster.add_nemesis(nemesis=self.get_nemesis_class(),
tester_obj=self)
self.log.debug('Start gemini benchmark')
gemini_thread = self.run_gemini(cmd=cmd)
self.gemini_results["cmd"] = gemini_thread.gemini_commands
# wait gemini create schema and write some data
self.db_cluster.wait_for_schema_agreement()
cdc_stress_queue = self.run_cdclog_reader_thread(stress_cmd=cdc_stress,
keyspace_name="ks1",
base_table_name="table1")
# sleep before run nemesis test_duration * .1
sleep_before_start = float(self.params.get('test_duration')) * 60 * .1
self.log.info('Sleep interval {}'.format(sleep_before_start))
time.sleep(sleep_before_start)
self.db_cluster.start_nemesis()
self.gemini_results.update(self.verify_gemini_results(queue=gemini_thread))
cdc_stress_results = self.verify_cdclog_reader_results(cdc_stress_queue, update_es)
self.log.debug(cdc_stress_results)
self.db_cluster.stop_nemesis(timeout=1600)
self.verify_results()
def test_gemini_and_cdc_reader(self):
gemini_cmd = self.params.get('gemini_cmd')
cdc_stress = self.params.get('stress_cdclog_reader_cmd')
update_es = self.params.get('store_cdclog_reader_stats_in_es')
gemini_thread = self.run_gemini(cmd=gemini_cmd)
self.gemini_results["cmd"] = gemini_thread.gemini_commands
# wait gemini create schema
time.sleep(10)
cdc_stress_queue = self.run_cdclog_reader_thread(stress_cmd=cdc_stress,
keyspace_name="ks1",
base_table_name="table1")
self.gemini_results.update(self.verify_gemini_results(queue=gemini_thread))
cdc_stress_results = self.verify_cdclog_reader_results(cdc_stress_queue, update_es)
self.log.debug(cdc_stress_results)
self.verify_results()
def verify_results(self):
if self.gemini_results['status'] == 'FAILED':
self.fail(self.gemini_results['results'])
def get_email_data(self):
self.log.info('Prepare data for email')
email_data = self._get_common_email_data()
if self.loaders:
gemini_version = self.loaders.gemini_version
else:
self.log.warning("Failed to get gemini version as loader instance is not created")
gemini_version = ""
email_data.update({
"gemini_cmd": self.gemini_results["cmd"],
"gemini_version": gemini_version,
"nemesis_details": self.get_nemesises_stats(),
"nemesis_name": self.params.get("nemesis_class_name"),
"number_of_oracle_nodes": self.params.get("n_test_oracle_db_nodes"),
"oracle_ami_id": self.params.get("ami_id_db_oracle"),
"oracle_db_version":
self.cs_db_cluster.nodes[0].scylla_version if self.cs_db_cluster else "N/A",
"oracle_instance_type": self.params.get("instance_type_db_oracle"),
"results": self.gemini_results["results"],
"scylla_ami_id": self.params.get("ami_id_db_scylla"),
"status": self.gemini_results["status"],
})
return email_data