diff --git a/.github/workflows/trigger_ab_tests.yml b/.github/workflows/trigger_ab_tests.yml
index bed66f08ddf..bee5f4af191 100644
--- a/.github/workflows/trigger_ab_tests.yml
+++ b/.github/workflows/trigger_ab_tests.yml
@@ -2,6 +2,7 @@ on:
   push:
     branches:
       - main
+      - firecracker-v*
 
 jobs:
   trigger_ab_test:
diff --git a/tests/framework/ab_test.py b/tests/framework/ab_test.py
index 316ecc5704b..8b7204baf20 100644
--- a/tests/framework/ab_test.py
+++ b/tests/framework/ab_test.py
@@ -100,7 +100,9 @@ def git_ab_test(
     return result_a, result_b, comparison
 
 
-def check_regression(a_samples: List[float], b_samples: List[float]):
+def check_regression(
+    a_samples: List[float], b_samples: List[float], *, n_resamples: int = 9999
+):
     """Checks for a regression by performing a permutation test.
 
     A permutation test is a non-parametric test that takes three parameters: Two populations (sets of samples)
     and a function computing a "statistic" based on two populations. First, the test computes the statistic for the
     initial populations. It then randomly
@@ -120,6 +122,7 @@
         # Compute the difference of means, such that a positive different indicates potential for regression.
         lambda x, y: statistics.mean(y) - statistics.mean(x),
         vectorized=False,
+        n_resamples=n_resamples,
     )
diff --git a/tests/framework/utils_iperf.py b/tests/framework/utils_iperf.py
index ab7117e405c..4b1b5691e41 100644
--- a/tests/framework/utils_iperf.py
+++ b/tests/framework/utils_iperf.py
@@ -57,11 +57,13 @@ def run_test(self, first_free_cpu):
         assert self._num_clients < CpuMap.len() - self._microvm.vcpus_count - 2
 
         for server_idx in range(self._num_clients):
-            cmd = self.host_command(server_idx).build()
             assigned_cpu = CpuMap(first_free_cpu)
-            utils.run_cmd(
-                f"taskset --cpu-list {assigned_cpu} {self._microvm.jailer.netns_cmd_prefix()} {cmd}"
+            cmd = (
+                self.host_command(server_idx)
+                .with_arg("--affinity", assigned_cpu)
+                .build()
             )
+            utils.run_cmd(f"{self._microvm.jailer.netns_cmd_prefix()} {cmd}")
             first_free_cpu += 1
 
         time.sleep(SERVER_STARTUP_TIME_SEC)
@@ -105,12 +107,14 @@ def spawn_iperf3_client(self, client_idx):
         mode = MODE_MAP[self._mode][client_idx % len(MODE_MAP[self._mode])]
 
         # Add the port where the iperf3 client is going to send/receive.
-        cmd = self.guest_command(client_idx).with_arg(mode).build()
-
-        pinned_cmd = (
-            f"taskset --cpu-list {client_idx % self._microvm.vcpus_count} {cmd}"
+        cmd = (
+            self.guest_command(client_idx)
+            .with_arg(mode)
+            .with_arg("--affinity", client_idx % self._microvm.vcpus_count)
+            .build()
         )
-        rc, stdout, stderr = self._microvm.ssh.run(pinned_cmd)
+
+        rc, stdout, stderr = self._microvm.ssh.run(cmd)
 
         assert rc == 0, stderr
@@ -176,18 +180,24 @@ def emit_iperf3_metrics(metrics, iperf_result, omit):
     )[0]:
         metrics.put_metric("cpu_utilization_vmm", cpu_util_data_point, "Percent")
 
-    for time_series in iperf_result["g2h"]:
-        for interval in time_series["intervals"][omit:]:
-            metrics.put_metric(
-                "throughput_guest_to_host",
-                interval["sum"]["bits_per_second"],
-                "Bits/Second",
-            )
+    data_points = zip(
+        *[time_series["intervals"][omit:] for time_series in iperf_result["g2h"]]
+    )
 
-    for time_series in iperf_result["h2g"]:
-        for interval in time_series["intervals"][omit:]:
-            metrics.put_metric(
-                "throughput_host_to_guest",
-                interval["sum"]["bits_per_second"],
-                "Bits/Second",
-            )
+    for point_in_time in data_points:
+        metrics.put_metric(
+            "throughput_guest_to_host",
+            sum(interval["sum"]["bits_per_second"] for interval in point_in_time),
+            "Bits/Second",
+        )
+
+    data_points = zip(
+        *[time_series["intervals"][omit:] for time_series in iperf_result["h2g"]]
+    )
+
+    for point_in_time in data_points:
+        metrics.put_metric(
+            "throughput_host_to_guest",
+            sum(interval["sum"]["bits_per_second"] for interval in point_in_time),
+            "Bits/Second",
+        )
diff --git a/tests/host_tools/metrics.py b/tests/host_tools/metrics.py
index ef636373295..7a5d15f1bfa 100644
--- a/tests/host_tools/metrics.py
+++ b/tests/host_tools/metrics.py
@@ -105,7 +105,8 @@ def emit_raw_emf(emf_msg: dict):
         "AWS_EMF_LOG_GROUP_NAME", f"{namespace}-metrics"
     )
     emf_msg["_aws"]["LogStreamName"] = os.environ.get("AWS_EMF_LOG_STREAM_NAME", "")
-    emf_msg["_aws"]["Namespace"] = namespace
+    for metrics in emf_msg["_aws"]["CloudWatchMetrics"]:
+        metrics["Namespace"] = namespace
 
     emf_endpoint = urlparse(os.environ["AWS_EMF_AGENT_ENDPOINT"])
     with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
diff --git a/tests/integration_tests/performance/test_block_ab.py b/tests/integration_tests/performance/test_block_ab.py
index 75729aee465..f0946d4a996 100644
--- a/tests/integration_tests/performance/test_block_ab.py
+++ b/tests/integration_tests/performance/test_block_ab.py
@@ -106,25 +106,38 @@ def run_fio(microvm, mode, block_size):
 
 def process_fio_logs(vm, fio_mode, logs_dir, metrics):
     """Parses the fio logs in `{logs_dir}/{fio_mode}_bw.*.log and emits their contents as CloudWatch metrics"""
-    for job_id in range(vm.vcpus_count):
-        data = Path(f"{logs_dir}/{fio_mode}_bw.{job_id + 1}.log").read_text("UTF-8")
-        for line in data.splitlines():
+    data = [
+        Path(f"{logs_dir}/{fio_mode}_bw.{job_id + 1}.log")
+        .read_text("UTF-8")
+        .splitlines()
+        for job_id in range(vm.vcpus_count)
+    ]
+
+    for tup in zip(*data):
+        bw_read = 0
+        bw_write = 0
+
+        for line in tup:
             _, value, direction, _ = line.split(",", maxsplit=3)
             value = int(value.strip())
 
             # See https://fio.readthedocs.io/en/latest/fio_doc.html#log-file-formats
             match direction.strip():
                 case "0":
-                    metrics.put_metric("bw_read", value, "Kilobytes/Second")
+                    bw_read += value
                 case "1":
-                    metrics.put_metric("bw_write", value, "Kilobytes/Second")
+                    bw_write += value
                 case _:
                     assert False
 
+        if bw_read:
+            metrics.put_metric("bw_read", bw_read, "Kilobytes/Second")
+        if bw_write:
+            metrics.put_metric("bw_write", bw_write, "Kilobytes/Second")
+
 
 @pytest.mark.nonci
-@pytest.mark.timeout(RUNTIME_SEC * 1000)  # 1.40 hours
 @pytest.mark.parametrize("vcpus", [1, 2], ids=["1vcpu", "2vcpu"])
 @pytest.mark.parametrize("fio_mode", ["randread", "randwrite"])
 @pytest.mark.parametrize("fio_block_size", [4096], ids=["bs4096"])
diff --git a/tests/integration_tests/performance/test_network_ab.py b/tests/integration_tests/performance/test_network_ab.py
index 55769348ce2..46bf9516014 100644
--- a/tests/integration_tests/performance/test_network_ab.py
+++ b/tests/integration_tests/performance/test_network_ab.py
@@ -6,7 +6,6 @@
 
 import pytest
 
-from framework.utils import CpuMap
 from framework.utils_iperf import IPerf3Test, emit_iperf3_metrics
 
 # each iteration is 30 * 0.2s = 6s
@@ -46,50 +45,57 @@ def consume_ping_output(ping_putput):
         yield float(time[0])
 
 
-@pytest.mark.nonci
-@pytest.mark.timeout(3600)
-def test_network_latency(microvm_factory, guest_kernel, rootfs, metrics):
-    """
-    Test network latency for multiple vm configurations.
-
-    Send a ping from the guest to the host.
-    """
+@pytest.fixture
+def network_microvm(request, microvm_factory, guest_kernel, rootfs):
+    """Creates a microvm with the networking setup used by the performance tests in this file.
+    This fixture receives its vcpu count via indirect parameterization"""
 
     vm = microvm_factory.build(guest_kernel, rootfs, monitor_memory=False)
     vm.spawn(log_level="Info")
-    vm.basic_config(vcpu_count=GUEST_VCPUS, mem_size_mib=GUEST_MEM_MIB)
-    iface = vm.add_net_iface()
+    vm.basic_config(vcpu_count=request.param, mem_size_mib=GUEST_MEM_MIB)
+    vm.add_net_iface()
     vm.start()
 
-    # Check if the needed CPU cores are available. We have the API thread, VMM
-    # thread and then one thread for each configured vCPU.
-    assert CpuMap.len() >= 2 + vm.vcpus_count
-
     # Pin uVM threads to physical cores.
     assert vm.pin_vmm(0), "Failed to pin firecracker thread."
    assert vm.pin_api(1), "Failed to pin fc_api thread."
     for i in range(vm.vcpus_count):
         assert vm.pin_vcpu(i, i + 2), f"Failed to pin fc_vcpu {i} thread."
 
+    return vm
+
+
+@pytest.mark.nonci
+@pytest.mark.parametrize("network_microvm", [1], indirect=True)
+def test_network_latency(
+    network_microvm, metrics
+):  # pylint:disable=redefined-outer-name
+    """
+    Test network latency for multiple vm configurations.
+
+    Send a ping from the guest to the host.
+ """ + samples = [] + host_ip = network_microvm.iface["eth0"]["iface"].host_ip for _ in range(ITERATIONS): - rc, ping_output, stderr = vm.ssh.run( - f"ping -c {REQUEST_PER_ITERATION} -i {DELAY} {iface.host_ip}" + rc, ping_output, stderr = network_microvm.ssh.run( + f"ping -c {REQUEST_PER_ITERATION} -i {DELAY} {host_ip}" ) assert rc == 0, stderr samples.extend(consume_ping_output(ping_output)) metrics.set_dimensions( - {"performance_test": "test_network_latency", **vm.dimensions} + {"performance_test": "test_network_latency", **network_microvm.dimensions} ) for sample in samples: metrics.put_metric("ping_latency", sample, "Milliseconds") -class TCPIPerf3Test(IPerf3Test): +class TcpIPerf3Test(IPerf3Test): """IPerf3 runner for the TCP throughput performance test""" BASE_PORT = 5000 @@ -120,18 +126,15 @@ def __init__(self, microvm, mode, host_ip, payload_length): @pytest.mark.nonci @pytest.mark.timeout(3600) -@pytest.mark.parametrize("vcpus", [1, 2]) +@pytest.mark.parametrize("network_microvm", [1, 2], indirect=True) @pytest.mark.parametrize("payload_length", ["128K", "1024K"], ids=["p128K", "p1024K"]) @pytest.mark.parametrize("mode", ["g2h", "h2g", "bd"]) def test_network_tcp_throughput( - microvm_factory, - guest_kernel, - rootfs, - vcpus, + network_microvm, payload_length, mode, metrics, -): +): # pylint:disable=redefined-outer-name """ Iperf between guest and host in both directions for TCP workload. """ @@ -139,36 +142,24 @@ def test_network_tcp_throughput( # We run bi-directional tests only on uVM with more than 2 vCPus # because we need to pin one iperf3/direction per vCPU, and since we # have two directions, we need at least two vCPUs. - if mode == "bd" and vcpus < 2: + if mode == "bd" and network_microvm.vcpus_count < 2: pytest.skip("bidrectional test only done with at least 2 vcpus") - vm = microvm_factory.build(guest_kernel, rootfs, monitor_memory=False) - vm.spawn(log_level="Info") - vm.basic_config(vcpu_count=vcpus, mem_size_mib=GUEST_MEM_MIB) - iface = vm.add_net_iface() - vm.start() - - # Check if the needed CPU cores are available. We have the API thread, VMM - # thread and then one thread for each configured vCPU. Lastly, we need one for - # the iperf server on the host. - assert CpuMap.len() > 2 + vm.vcpus_count - - # Pin uVM threads to physical cores. - assert vm.pin_vmm(0), "Failed to pin firecracker thread." - assert vm.pin_api(1), "Failed to pin fc_api thread." - for i in range(vm.vcpus_count): - assert vm.pin_vcpu(i, i + 2), f"Failed to pin fc_vcpu {i} thread." 
-
-    test = TCPIPerf3Test(vm, mode, iface.host_ip, payload_length)
-    data = test.run_test(vm.vcpus_count + 2)
+    test = TcpIPerf3Test(
+        network_microvm,
+        mode,
+        network_microvm.iface["eth0"]["iface"].host_ip,
+        payload_length,
+    )
+    data = test.run_test(network_microvm.vcpus_count + 2)
 
     metrics.set_dimensions(
         {
             "performance_test": "test_network_tcp_throughput",
             "payload_length": payload_length,
             "mode": mode,
-            **vm.dimensions,
+            **network_microvm.dimensions,
         }
     )
 
-    emit_iperf3_metrics(metrics, data, TCPIPerf3Test.WARMUP_SEC)
+    emit_iperf3_metrics(metrics, data, TcpIPerf3Test.WARMUP_SEC)
diff --git a/tests/integration_tests/performance/test_vsock_ab.py b/tests/integration_tests/performance/test_vsock_ab.py
index 5d3f2b64560..9a2f068535c 100644
--- a/tests/integration_tests/performance/test_vsock_ab.py
+++ b/tests/integration_tests/performance/test_vsock_ab.py
@@ -6,7 +6,6 @@
 
 import pytest
 
-from framework.utils import CpuMap
 from framework.utils_iperf import IPerf3Test, emit_iperf3_metrics
 from framework.utils_vsock import VSOCK_UDS_PATH, make_host_port_path
 
@@ -93,11 +92,6 @@ def test_vsock_throughput(
     vm.api.vsock.put(vsock_id="vsock0", guest_cid=3, uds_path="/" + VSOCK_UDS_PATH)
     vm.start()
 
-    # Check if the needed CPU cores are available. We have the API thread, VMM
-    # thread and then one thread for each configured vCPU. Lastly, we need one for
-    # the iperf server on the host.
-    assert CpuMap.len() > 2 + vm.vcpus_count
-
     # Pin uVM threads to physical cores.
     assert vm.pin_vmm(0), "Failed to pin firecracker thread."
     assert vm.pin_api(1), "Failed to pin fc_api thread."
diff --git a/tools/ab_test.py b/tools/ab_test.py
index 0efd89b2fae..3084f5a51e6 100755
--- a/tools/ab_test.py
+++ b/tools/ab_test.py
@@ -40,6 +40,30 @@
     get_metrics_logger,
 )
 
+# Performance tests that are known to be unstable and exhibit variances of up to 60% of the mean
+IGNORED = [
+    # Network throughput on m6a.metal
+    {"instance": "m6a.metal", "performance_test": "test_network_tcp_throughput"},
+    # Block throughput for 1 vcpu on m6g.metal/5.10
+    {
+        "performance_test": "test_block_performance",
+        "instance": "m6g.metal",
+        "host_kernel": "linux-5.10",
+        "vcpus": "1",
+    },
+]
+
+
+def is_ignored(dimensions) -> bool:
+    """Checks whether the given dimensions match an entry in the IGNORED list above"""
+    for high_variance in IGNORED:
+        matching = {key: dimensions[key] for key in high_variance}
+
+        if matching == high_variance:
+            return True
+
+    return False
+
 
 def extract_dimensions(emf):
     """Extracts the cloudwatch dimensions from an EMF log message"""
@@ -145,7 +169,7 @@ def collect_data(firecracker_checkout: Path, test: str):
     return load_data_series(revision)
 
 
-def analyze_data(processed_emf_a, processed_emf_b):
+def analyze_data(processed_emf_a, processed_emf_b, *, n_resamples: int = 9999):
     """
     Analyzes the A/B-test data produced by `collect_data`, by performing regression tests
     as described this script's doc-comment.
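
Aside (not part of the patch): the is_ignored() helper added above treats each IGNORED entry as a subset of a result's CloudWatch dimensions — extra dimensions are irrelevant, and every key listed in the entry must match exactly. A minimal, self-contained sketch of that behaviour, with hypothetical dimension values:

```python
# Illustration only: mirrors the is_ignored() helper added in this patch and shows
# that matching is subset-based. The dimension values below are made up.
IGNORED = [
    {"instance": "m6a.metal", "performance_test": "test_network_tcp_throughput"},
]


def is_ignored(dimensions) -> bool:
    """True if any IGNORED entry is contained (key and value) in the given dimensions."""
    for high_variance in IGNORED:
        # Project the result's dimensions down to the keys the IGNORED entry cares about.
        matching = {key: dimensions[key] for key in high_variance}
        if matching == high_variance:
            return True
    return False


dims = {
    "instance": "m6a.metal",
    "host_kernel": "linux-6.1",          # hypothetical extra dimension, ignored by the match
    "performance_test": "test_network_tcp_throughput",
}
assert is_ignored(dims)                  # extra keys do not prevent a match
assert not is_ignored({**dims, "instance": "m6g.metal"})  # any listed key differing -> not ignored
```
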
@@ -175,7 +199,9 @@
             print(
                 f"Doing A/B-test for dimensions {dimension_set} and property {metric}"
             )
-            result = check_regression(values_a, metrics_b[metric][0])
+            result = check_regression(
+                values_a, metrics_b[metric][0], n_resamples=n_resamples
+            )
 
             metrics_logger.set_dimensions({"metric": metric, **dict(dimension_set)})
             metrics_logger.put_metric("p_value", float(result.pvalue), "None")
@@ -189,7 +215,9 @@
     return results
 
 
-def ab_performance_test(a_revision, b_revision, test, p_thresh, strength_thresh):
+def ab_performance_test(
+    a_revision, b_revision, test, p_thresh, strength_thresh, noise_threshold
+):
     """Does an A/B-test of the specified test across the given revisions"""
     _, commit_list, _ = utils.run_cmd(
         f"git --no-pager log --oneline {a_revision}..{b_revision}"
     )
@@ -201,17 +229,62 @@
 
     processed_emf_a, processed_emf_b, results = git_ab_test(
         lambda checkout, _: collect_data(checkout, test),
-        analyze_data,
+        lambda ah, be: analyze_data(ah, be, n_resamples=int(100 / p_thresh)),
        a_revision=a_revision,
         b_revision=b_revision,
     )
 
+    # We sort our A/B-Testing results keyed by metric here. The resulting lists of values
+    # will be approximately normally distributed, and we will use this property as a means of error correction.
+    # The idea behind this is that testing the same metric (say, restore_latency) across different scenarios (e.g.
+    # different vcpu counts) will be related in some unknown way (meaning most scenarios will show a change in the same
+    # direction). In particular, if one scenario yields a slight improvement and the next yields a
+    # slight degradation, we take this as evidence towards both being mere noise that cancels out.
+    #
+    # Empirical evidence for this assumption is that
+    # 1. Historically, a true performance change has never shown up in just a single test; it always showed up
+    #    across most (if not all) tests for a specific metric.
+    # 2. Analyzing data collected from historical runs shows that across different parameterizations of the same
+    #    metric, the collected samples approximately follow mean / variance = const, with the constant independent
+    #    of the parameterization.
+    #
+    # Mathematically, this has the following justification: By the central
+    # limit theorem, the means of samples are (approximately) normally distributed. Denote by A
+    # and B the distributions of the mean of samples from the 'A' and 'B'
+    # tests respectively. Under our null hypothesis, the distributions of the
+    # 'A' and 'B' samples are identical (although we don't know what the exact
+    # distributions are), meaning so are A and B, say A ~ B ~ N(mu, sigma^2).
+    # The difference of two normal distributions is also normally distributed,
+    # with the means being subtracted and the variances being added.
+    # Therefore, A - B ~ N(0, 2sigma^2). If we now normalize this distribution by mu (which
+    # corresponds to considering the distribution of relative regressions instead), we get (A-B)/mu ~ N(0, c), with c
+    # being the constant from point 2. above. This means that we can combine the relative means across
+    # different parameterizations, and get a distribution whose expected
+    # value is 0, provided our null hypothesis was true. It is exactly this distribution
+    # for which we collect samples in the dictionary below. Therefore, the average
+    # of the relative performance changes for a single metric is a good candidate
+    # for a sanity check against false-positives.
+    #
+    # Note that with this approach, for performance changes to "cancel out", we would need essentially a perfect split
+    # between scenarios that improve performance and scenarios that degrade performance, something we have
+    # never actually observed to happen.
+    relative_changes_by_metric = {}
+
     failures = []
     for (dimension_set, metric), (result, unit) in results.items():
+        if is_ignored(dict(dimension_set)):
+            continue
+
         values_a = processed_emf_a[dimension_set][metric][0]
+        baseline_mean = statistics.mean(values_a)
+
+        if metric not in relative_changes_by_metric:
+            relative_changes_by_metric[metric] = []
+        relative_changes_by_metric[metric].append(result.statistic / baseline_mean)
+
         if (
             result.pvalue < p_thresh
-            and abs(result.statistic) > abs(statistics.mean(values_a)) * strength_thresh
+            and abs(result.statistic) > baseline_mean * strength_thresh
         ):
             failures.append((dimension_set, metric, result, unit))
 
@@ -225,8 +298,10 @@
         f"characteristics did not change across the tested commits, has a probability of {result.pvalue:.2%}. "
         f"Tested Dimensions:\n{json.dumps(dict(dimension_set), indent=2)}"
         for (dimension_set, metric, result, unit) in failures
+        # Sanity check as described above
+        if abs(statistics.mean(relative_changes_by_metric[metric])) > noise_threshold
     )
-    assert not failures, "\n" + failure_report
+    assert not failure_report, "\n" + failure_report
     print("No regressions detected!")
 
 
@@ -251,13 +326,16 @@ def canonicalize_revision(revision):
     parser.add_argument(
         "--significance",
         help="The p-value threshold that needs to be crossed for a test result to be considered significant",
+        type=float,
         default=0.01,
     )
     parser.add_argument(
         "--relative-strength",
         help="The minimal delta required before a regression will be considered valid",
-        default=0.2,
+        type=float,
+        default=0.0,
     )
+    parser.add_argument("--noise-threshold", type=float, default=0.05)
 
     args = parser.parse_args()
 
     ab_performance_test(
@@ -267,4 +345,5 @@
         args.test,
         args.significance,
         args.relative_strength,
+        args.noise_threshold,
     )
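
Aside (not part of the patch): check_regression() builds on SciPy's permutation_test — the vectorized and n_resamples keywords visible in the hunk are that function's parameters — and the new n_resamples knob is what ab_performance_test() tunes via int(100 / p_thresh), presumably so that the smallest p-value the test can resolve (roughly 1 / n_resamples) sits well below the significance threshold. A standalone sketch of the equivalent call, with invented sample data:

```python
# Sketch only: mirrors what check_regression() does with its new n_resamples parameter.
# Sample values and the 0.01 threshold are invented; SciPy defaults (e.g. two-sided
# alternative) are assumed.
import statistics

from scipy.stats import permutation_test

a_samples = [102.1, 99.8, 101.4, 100.9, 98.7]   # hypothetical baseline measurements
b_samples = [104.0, 103.2, 105.1, 102.8, 104.6]  # hypothetical candidate measurements

result = permutation_test(
    (a_samples, b_samples),
    # Difference of means, positive when the B samples have the higher mean,
    # which the harness interprets as potential for regression.
    lambda x, y: statistics.mean(y) - statistics.mean(x),
    vectorized=False,
    n_resamples=int(100 / 0.01),  # p_thresh = 0.01 -> 10000 resamples
)
print(result.statistic, result.pvalue)
```
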
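
Aside (not part of the patch): a small numeric illustration of the noise-threshold sanity check that the long comment in ab_performance_test() describes — the relative changes for one metric are averaged across all parameterizations, and failures for that metric are only reported if the average clears --noise-threshold. All numbers below are invented:

```python
# Worked example of the noise-cancellation gate, under the default noise threshold of 0.05.
import statistics

noise_threshold = 0.05

relative_changes_by_metric = {
    # Half the scenarios improve, half degrade -> mean ~ +0.5% -> treated as noise.
    "throughput_guest_to_host": [-0.04, 0.03, -0.02, 0.05],
    # Nearly all scenarios degrade -> mean ~ +10% -> failures for this metric are reported.
    "restore_latency": [0.09, 0.11, 0.08, 0.12],
}

for metric, changes in relative_changes_by_metric.items():
    mean_change = statistics.mean(changes)
    verdict = "report" if abs(mean_change) > noise_threshold else "ignore"
    print(f"{metric}: mean relative change {mean_change:+.2%} -> {verdict}")
```
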