From b515fc99e36aa295ccedcd93f7e3cd6abf4e050e Mon Sep 17 00:00:00 2001 From: Britt Cyr Date: Wed, 20 Nov 2024 23:06:02 +0100 Subject: [PATCH] add updated script --- programs/manifest/verify-manifest.py | 235 +++++++++++++++++++++++++++ 1 file changed, 235 insertions(+) create mode 100755 programs/manifest/verify-manifest.py diff --git a/programs/manifest/verify-manifest.py b/programs/manifest/verify-manifest.py new file mode 100755 index 000000000..dff00860a --- /dev/null +++ b/programs/manifest/verify-manifest.py @@ -0,0 +1,235 @@ +from enum import Enum +import subprocess +import io +import sys +import datetime +import time +import os +import json +import argparse + + +class VerificationResult(Enum): + Verified = 0 + Violated = 1 + Timeout = 2 + UnexpectedError = 3 + + def __str__(self): + return f"{self.name}" + + @staticmethod + def from_string(result: str) -> 'VerificationResult': + return VerificationResult[result] + + @staticmethod + def from_command_result(command_result: subprocess.CompletedProcess[str]) -> 'VerificationResult': + if command_result.returncode == 0: + assert "|Not violated" in command_result.stdout, "The verification terminated successfully, but cannot find the '|Not violated' substring in stdout" + return VerificationResult.Verified + elif "|Violated" in command_result.stdout: + # If the return code is not zero, and in the stdout we find `|Violated`, + # then the verification of the rule failed. + return VerificationResult.Violated + elif "|Timeout" in command_result.stdout: + # If the return code is not zero, and in the stdout we find `|Timeout`, + # then the verification of the rule failed. + return VerificationResult.Timeout + else: + # If the return code is not zero, but we cannot find `|Violated` in + # stdout, then we had an unexpected error while calling `just`. + return VerificationResult.UnexpectedError + + +class ProverOption: + '''Option to give to the prover.''' + pass + + +class Bmc(ProverOption): + def __init__(self, n: int): + self.n = n + + def __str__(self) -> str: + return f'--loop_iter {self.n}' + + +class AssumeUnwindCond(ProverOption): + def __str__(self) -> str: + return f'--optimistic_loop' + + +class CargoFeature: + '''Cargo feature to use when calling `cargo build-sbf`.''' + pass + + +class CvtDbMock(CargoFeature): + def __str__(self) -> str: + return 'cvt-db-mock' + + +class Rule: + def __init__(self, name: str, expected_result: VerificationResult, prover_options: list[ProverOption], cargo_features: list[CargoFeature]): + self.name = name + self.expected_result = expected_result + self.prover_options = prover_options + self.cargo_features = cargo_features + + def __str__(self) -> str: + return f"{self.name}" + + @staticmethod + def list_from_json(rules_config_file_path: str) -> list['Rule']: + with open(rules_config_file_path, 'r') as f: + data = json.load(f) + + rules = [] + for rule_data in data['rules']: + name = rule_data['name'] + + # Load expected result + expected_result_str = rule_data.get('expected_result') + if expected_result_str not in VerificationResult.__members__: + raise ValueError(f"Invalid expected result '{expected_result_str}' for rule '{name}'") + expected_result = VerificationResult.from_string( + expected_result_str) + + # Load prover options + prover_options = [] + for option in rule_data['prover_options']: + if 'bmc' in option: + if not isinstance(option['bmc'], int): + raise TypeError(f"Invalid type for 'bmc' option in rule '{name}', expected integer") + prover_options.append(Bmc(option['bmc'])) + elif 'assumeUnwindCond' in option: + if not isinstance(option['assumeUnwindCond'], bool): + raise TypeError(f"Invalid type for 'assumeUnwindCond' option in rule '{name}', expected boolean") + prover_options.append(AssumeUnwindCond()) + else: + raise ValueError(f"Unknown prover option '{option}' in rule '{name}'") + + # Load cargo features + cargo_features = [] + valid_features = { + 'cvt-db-mock': lambda: CvtDbMock() + } + for feature in rule_data['cargo_features']: + if feature not in valid_features: + raise ValueError(f"Unknown cargo feature '{feature}' in rule '{name}'") + else: + cargo_features.append(valid_features[feature]()) + + rules.append(Rule(name, expected_result, + prover_options, cargo_features)) + + return rules + + +class VerificationRunner: + def __init__(self): + self.had_error = False + + def verify_all(self, rules: list[Rule]): + logfile_name = VerificationRunner.generate_logfile_name() + with open(logfile_name, 'w') as logfile: + for (index, rule) in enumerate(rules): + print(f'[{index+1:2}/{len(rules)}] {rule.name} ... ', end='') + self.verify_rule(logfile, rule) + VerificationRunner.clean() + if self.had_error: + sys.exit(1) + + def verify_rule(self, logfile: io.TextIOWrapper, rule: Rule): + sys.stdout.flush() + command = VerificationRunner.build_command(rule) + try: + start_time = time.time() + verification_result = self.run_verification(command, logfile, rule) + end_time = time.time() # Record the end time + elapsed_time = end_time - start_time # Calculate the elapsed time + self.check_verification_result( + verification_result, rule.expected_result, command, elapsed_time) + except FileNotFoundError: + print(f"Failed to run command: `{command}`") + + @staticmethod + def build_command(rule: Rule) -> str: + command = f'just verify-remote {rule.name}' + for option in rule.prover_options: + command += f' {option}' + return command + + def run_verification(self, command: str, logfile: io.TextIOWrapper, rule: Rule) -> VerificationResult: + verification_env = VerificationRunner.build_verification_env(rule) + # Run the verifier and capure the output. + command_result = subprocess.run( + command.split(), check=False, text=True, capture_output=True, env=verification_env) + VerificationRunner.log_output( + logfile, rule.name, command, command_result) + return VerificationResult.from_command_result(command_result) + + @staticmethod + def build_verification_env(rule: Rule) -> dict[str, str]: + # Start with the current environment variables + verification_env = os.environ.copy() + cargo_features = '' + for feature in rule.cargo_features: + cargo_features += f' {feature}' + verification_env["CARGO_FEATURES"] = cargo_features + return verification_env + + def check_verification_result(self, verification_result: VerificationResult, expected_result: VerificationResult, command: str, elapsed_seconds: float) -> None: + # Assert that we did not have an unexpected error (i.e., compilation + # error), since all the rules should be verified or not verified. + assert verification_result != VerificationResult.UnexpectedError, \ + f"Had unexpected error running `{command}`" + + # A timeout is an unexpected event + assert verification_result != VerificationResult.Timeout, \ + f"Had a timeout event running `{command}`" + + if verification_result == expected_result: + print(f'ok ({elapsed_seconds:.2f}s)') + else: + print(f'error ({elapsed_seconds:.2f}s)') + print(f'\tExpected result: {expected_result}') + print(f'\tActual result: {verification_result}') + self.had_error = True + + @staticmethod + def generate_logfile_name() -> str: + current_time = datetime.datetime.now() + return current_time.strftime("log_verification_%Y-%m-%d_%H:%M:%S") + + @staticmethod + def log_output(logfile: io.TextIOWrapper, rule_name: str, command: str, command_result: subprocess.CompletedProcess[str]) -> None: + print(f'---- Rule {rule_name} ----', file=logfile) + print(f'Command: `{command}`', file=logfile) + print(f'Return code: {command_result.returncode}', file=logfile) + print(f'Stdout:', file=logfile) + print(command_result.stdout, file=logfile) + print(f'Stderr:', file=logfile) + print(command_result.stderr, file=logfile) + + @staticmethod + def clean() -> None: + ''' Call `just clean`. ''' + subprocess.run(["just", "clean"], check=True, capture_output=True) + +# TODO: @phreppo - should sanity rules be added? + + +# Parse the CLI options +parser = argparse.ArgumentParser( + description="Run the rules as specified in a JSON configuration file") +parser.add_argument( + "-r", "--rules", + help="Path to the JSON configuration file with the rules specification.", + type=str, + default="rules.json" +) + +rules = Rule.list_from_json(args.rules) +runner = VerificationRunner() +runner.verify_all(rules)