Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert ovn-tester to use asyncio #66

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
1e99a56
ovn-tester: Initial asyncio support.
putnopvut Jul 28, 2021
d095866
ovn-tester: Add stats collection for asynchronous contexts.
putnopvut Jul 29, 2021
d664974
ovn-tester: Get ext_cmds working with asyncio.
putnopvut Aug 3, 2021
5428430
ovn-tester: Rearrange how next lport index is calculated.
putnopvut Aug 2, 2021
7e328a0
ovn-tester: Add locking to load balancer provisioning.
putnopvut Aug 3, 2021
30b50a2
ovn-tester: ping ports concurrently.
putnopvut Jul 29, 2021
6be9a4c
Create generalized QPS testing routine.
putnopvut Aug 3, 2021
1a93c37
ovn-tester: Use QPS model for base cluster bringup.
putnopvut Jul 29, 2021
07525c8
ovn-tester: Use QPS method for density_light test.
putnopvut Jul 30, 2021
0c712bb
ovn-tester: Use QPS method for density_heavy test.
putnopvut Aug 3, 2021
ca660f4
ovn-tester: Use QPS method for netpol_small and netpol_large tests.
putnopvut Aug 3, 2021
8281252
ovn-tester: Use QPS method for netpol_cross_ns test.
putnopvut Aug 3, 2021
e44d369
ovn-tester: Use QPS method for netpol_multitenant test.
putnopvut Aug 3, 2021
f93e2a1
ovn-tester: Use QPS method for cluster_density test.
putnopvut Aug 3, 2021
e068771
ovn-tester: Remove unused __iter__ and __next__ from Context.
putnopvut Aug 9, 2021
8a7c595
ovn-tester: Provision ports simultaneously.
putnopvut Aug 31, 2021
6ed476e
ovn-tester: Don't use pings for testing if port is up.
putnopvut Sep 21, 2021
aa2ce69
ovn-tester: Allow configurable connections.
putnopvut Oct 1, 2021
a737f67
ovn-tester: Allow tests to manually end iterations.
putnopvut Oct 7, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 91 additions & 22 deletions ovn-tester/ovn_context.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import logging
import ovn_stats
import time
import asyncio
from collections import Counter

log = logging.getLogger(__name__)

Expand All @@ -11,15 +13,18 @@


class Context(object):
def __init__(self, test_name, max_iterations=1, brief_report=False,
def __init__(self, test_name, max_iterations=None, brief_report=False,
test=None):
self.iteration = -1
self.test_name = test_name
self.max_iterations = max_iterations
self.brief_report = brief_report
self.iteration_start = None
self.failed = False
self.test = test
self.iterations = dict()
if self.max_iterations is None:
self.iteration_singleton = ContextIteration(0, self)
else:
self.iteration_singleton = None

def __enter__(self):
global active_context
Expand All @@ -32,27 +37,91 @@ def __exit__(self, type, value, traceback):
ovn_stats.report(self.test_name, brief=self.brief_report)
log.info(f'Exiting context {self.test_name}')

def __iter__(self):
return self
async def iteration_started(self, iteration):
'''Explicitly begin iteration 'n'. This is necessary when running
asynchronously since task starts and ends can overlap.'''
iteration.start()
log.info(f'Context {self.test_name}, Iteration {iteration.num}')
if self.test:
await self.test.exec_cmd(iteration.num, self.test_name)

def __next__(self):
now = time.perf_counter()
if self.iteration_start:
duration = now - self.iteration_start
ovn_stats.add(ITERATION_STAT_NAME, duration, failed=self.failed)
log.log(logging.WARNING if self.failed else logging.INFO,
f'Context {self.test_name}, Iteration {self.iteration}, '
f'Result: {"FAILURE" if self.failed else "SUCCESS"}')
self.failed = False
def iteration_completed(self, iteration):
''' Explicitly end iteration 'n'. This is necessary when running
asynchronously since task starts and ends can overlap.'''
iteration.end()
duration = iteration.end_time - iteration.start_time
ovn_stats.add(ITERATION_STAT_NAME, duration, iteration.failed,
iteration)
self.iterations = {task_name: it for task_name, it in
self.iterations.items() if it != iteration}
if self.test:
# exec external cmd
self.test.exec_cmd(self.iteration, self.test_name)
self.iteration_start = now
if self.iteration < self.max_iterations - 1:
self.iteration += 1
log.info(f'Context {self.test_name}, Iteration {self.iteration}')
return self.iteration
raise StopIteration
self.test.terminate_process(self.test_name)
log.log(logging.WARNING if iteration.failed else logging.INFO,
f'Context {self.test_name}, Iteration {iteration.num}, '
f'Result: {"FAILURE" if iteration.failed else "SUCCESS"}')

def all_iterations_completed(self):
if self.iteration_singleton is not None:
# Weird, but you do you man.
self.iteration_completed(self.iteration_singleton)
return

# self.iterations may have the same iteration value for multiple
# keys. We need to first get the unique list of iterations.
iter_list = Counter(self.iterations.values())
for iteration in iter_list:
self.iteration_completed(iteration)

def create_task(self, coro, iteration=None):
'''Create a task to run in this context.'''
if iteration is None:
iteration = get_current_iteration()
task = asyncio.create_task(coro)
self.iterations[task.get_name()] = iteration
return task

async def qps_test(self, qps, coro, *args, **kwargs):
tasks = []
for i in range(self.max_iterations):
iteration = ContextIteration(i, self)
tasks.append(self.create_task(
self.qps_task(iteration, coro, *args, **kwargs), iteration)
)
# Use i+1 so that we don't sleep on task 0 and so that
# we sleep after 20 iterations instead of 21.
if (i + 1) % qps == 0:
await asyncio.sleep(1)
putnopvut marked this conversation as resolved.
Show resolved Hide resolved
await asyncio.gather(*tasks)

async def qps_task(self, iteration, coro, *args, **kwargs):
await self.iteration_started(iteration)
await coro(*args)
if kwargs.get('end_iteration', True):
self.iteration_completed(iteration)


def get_current_iteration():
ctx = active_context
if ctx.iteration_singleton is not None:
return ctx.iteration_singleton

cur_task = asyncio.current_task()
return active_context.iterations[cur_task.get_name()]


class ContextIteration(object):
def __init__(self, num, context):
self.num = num
self.context = context
self.start_time = None
self.end_time = None
self.failed = False

def fail(self):
self.failed = True

def start(self):
self.start_time = time.perf_counter()

def end(self):
self.end_time = time.perf_counter()
51 changes: 39 additions & 12 deletions ovn-tester/ovn_ext_cmd.py
Original file line number Diff line number Diff line change
@@ -1,31 +1,46 @@
from collections import namedtuple
from io import StringIO
import logging


log = logging.getLogger(__name__)


ExtCmdCfg = namedtuple('ExtCmdCfg',
['iteration',
'duration',
'cmd',
'node',
'test',
'pid_name',
'pid_opt',
'background_opt'])
'pid_opt'])


class ExtCmd(object):
def __init__(self, config, central_node, worker_nodes):
cmd_list = [
ExtCmdCfg(
iteration=ext_cmd.get('iteration', None),
duration=ext_cmd.get('duration', 0),
cmd=ext_cmd.get('cmd', ''),
node=ext_cmd.get('node', ''),
test=ext_cmd.get('test', ''),
pid_name=ext_cmd.get('pid_name', ''),
pid_opt=ext_cmd.get('pid_opt', ''),
background_opt=ext_cmd.get('background_opt', False),
) for ext_cmd in config.get('ext_cmd', list())
]
cmd_list.sort(key=lambda x: x.iteration)
self.cmd_map = self.build_cmd_map(cmd_list, central_node, worker_nodes)
self.processes = []

def __del__(self):
# Just in case someone set some daft duration for a command, terminate
# it here.
while len(self.processes) > 0:
process, _ = self.processes.pop(0)
log.warning(f'Terminating command "{process.command}" '
f'(bad duration)')
process.terminate()

def build_cmd_map(self, cmd_list, central_node, worker_nodes):
cmd_map = dict()
Expand All @@ -42,7 +57,7 @@ def build_cmd_map(self, cmd_list, central_node, worker_nodes):
'cmd': c.cmd,
'pid_name': c.pid_name,
'pid_opt': c.pid_opt,
'background_opt': c.background_opt,
'duration': c.duration,
}
continue
for w in worker_nodes:
Expand All @@ -53,12 +68,12 @@ def build_cmd_map(self, cmd_list, central_node, worker_nodes):
'cmd': c.cmd,
'pid_name': c.pid_name,
'pid_opt': c.pid_opt,
'background_opt': c.background_opt,
'duration': c.duration,
}
break
return cmd_map

def exec_cmd(self, iteration, test):
async def exec_cmd(self, iteration, test):
ext_cmd = self.cmd_map.get((iteration, test), None)
if not ext_cmd:
return
Expand All @@ -68,13 +83,25 @@ def exec_cmd(self, iteration, test):

if len(ext_cmd['pid_name']):
stdout = StringIO()
w.run("pidof -s " + ext_cmd['pid_name'], stdout=stdout)
await w.run("pidof -s " + ext_cmd['pid_name'], stdout=stdout)
pid = stdout.getvalue().strip()
cmd = cmd + " " + ext_cmd['pid_opt'] + " " + pid

if ext_cmd['background_opt']:
cmd = cmd + " &"
if ext_cmd['duration'] <= 0:
await w.run(cmd, stdout=stdout)
else:
process = await w.run_deferred(cmd)
self.processes.append((process, ext_cmd['duration']))

def terminate_process(self, test):
new_processes = []
for process, duration in self.processes:
duration -= 1
if duration <= 0:
log.info(f'Terminating command "{process.command}"')
# XXX Should we print stdout/stderr from the command?
process.terminate()
else:
new_processes.append((process, duration))

stdout = StringIO()
w.run(cmd, stdout=stdout)
return stdout.getvalue().strip()
self.processes = new_processes
54 changes: 29 additions & 25 deletions ovn-tester/ovn_load_balancer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,21 @@ def __str__(self):
return f"Invalid Protocol: {self.args}"


async def create_load_balancer(lb_name, nbctl, vips=None,
dceara marked this conversation as resolved.
Show resolved Hide resolved
protocols=VALID_PROTOCOLS):
lb = OvnLoadBalancer(lb_name, nbctl)
valid_protocols = [prot for prot in protocols if prot in VALID_PROTOCOLS]
if len(valid_protocols) == 0:
raise InvalidProtocol(protocols)
for protocol in valid_protocols:
lb.lbs.append(await nbctl.create_lb(lb_name, protocol))
if vips:
await lb.add_vips(vips)
return lb


class OvnLoadBalancer(object):
def __init__(self, lb_name, nbctl, vips=None, protocols=VALID_PROTOCOLS):
def __init__(self, lb_name, nbctl):
'''
Create load balancers with optional vips.
lb_name: String used as basis for load balancer name.
Expand All @@ -21,20 +34,11 @@ def __init__(self, lb_name, nbctl, vips=None, protocols=VALID_PROTOCOLS):
protocols: List of protocols to use when creating Load Balancers.
'''
self.nbctl = nbctl
self.protocols = [
prot for prot in protocols if prot in VALID_PROTOCOLS
]
if len(self.protocols) == 0:
raise InvalidProtocol(protocols)
self.name = lb_name
self.vips = {}
self.vips = dict()
self.lbs = []
for protocol in self.protocols:
self.lbs.append(self.nbctl.create_lb(self.name, protocol))
if vips:
self.add_vips(vips)

def add_vips(self, vips):
async def add_vips(self, vips):
'''
Add VIPs to a load balancer.
vips: Dictionary with key being a VIP string, and value being a list of
Expand All @@ -52,17 +56,17 @@ def add_vips(self, vips):
updated_vips[vip] = cur_backends

for lb in self.lbs:
self.nbctl.lb_set_vips(lb.uuid, updated_vips)
await self.nbctl.lb_set_vips(lb.uuid, updated_vips)

def clear_vips(self):
async def clear_vips(self):
'''
Clear all VIPs from the load balancer.
'''
self.vips.clear()
for lb in self.lbs:
self.nbctl.lb_clear_vips(lb.uuid)
await self.nbctl.lb_clear_vips(lb.uuid)

def add_backends_to_vip(self, backends, vips=None):
async def add_backends_to_vip(self, backends, vips=None):
'''
Add backends to existing load balancer VIPs.
backends: A list of IP addresses to add as backends to VIPs.
Expand All @@ -74,20 +78,20 @@ def add_backends_to_vip(self, backends, vips=None):
cur_backends.extend(backends)

for lb in self.lbs:
self.nbctl.lb_set_vips(lb.uuid, self.vips)
await self.nbctl.lb_set_vips(lb.uuid, self.vips)

def add_to_routers(self, routers):
async def add_to_routers(self, routers):
for lb in self.lbs:
self.nbctl.lb_add_to_routers(lb.uuid, routers)
await self.nbctl.lb_add_to_routers(lb.uuid, routers)

def add_to_switches(self, switches):
async def add_to_switches(self, switches):
for lb in self.lbs:
self.nbctl.lb_add_to_switches(lb.uuid, switches)
await self.nbctl.lb_add_to_switches(lb.uuid, switches)

def remove_from_routers(self, routers):
async def remove_from_routers(self, routers):
for lb in self.lbs:
self.nbctl.lb_remove_from_routers(lb.uuid, routers)
await self.nbctl.lb_remove_from_routers(lb.uuid, routers)

def remove_from_switches(self, switches):
async def remove_from_switches(self, switches):
for lb in self.lbs:
self.nbctl.lb_remove_from_switches(lb.uuid, switches)
await self.nbctl.lb_remove_from_switches(lb.uuid, switches)
Loading