diff --git a/osv/impact_git_test.py b/osv/impact_git_test.py new file mode 100644 index 00000000000..7fefc5a52ae --- /dev/null +++ b/osv/impact_git_test.py @@ -0,0 +1,488 @@ +"""impact_git_test.py: Tests for the impact module using git repositories.""" + +from .test_tools.test_repository import TestRepository +import unittest +from . import impact + + +class GitImpactTest(unittest.TestCase): + """Tests for the impact module using git repositories.""" + + @classmethod + def setUpClass(cls): + cls.__repo_analyzer = impact.RepoAnalyzer(detect_cherrypicks=False) + + ######## 1rst : tests with only "introduced" and "fixed" + + def test_introduced_fixed_linear(self): + """Simple range, only two commits are vulnerable. """ + + repo = TestRepository("test_introduced_fixed_linear", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_branch_propagation(self): + """Ensures the detection of the propagation + of the vulnerability in created branches""" + repo = TestRepository( + "test_introduced_fixed_branch_propagation", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) + fourth = repo.add_empty_commit(parents=[second]) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_merge(self): + """Ensures that a merge without a fix does not + affect the propagation of a vulnerability""" + repo = TestRepository("test_introduced_fixed_merge", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit() + third = repo.add_empty_commit(parents=[first, second]) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_two_linear(self): + """Ensures that multiple introduced commit + in the same branch are correctly handled""" + repo = TestRepository("test_introduced_fixed_two_linear", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.FIXED) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_merge_propagation(self): + """Ensures that a vulnerability is propagated from + a branch, in spite of the main branch having a fix.""" + + repo = TestRepository( + "test_introduced_fixed_merge_propagation", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.FIXED) + third = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + fourth = repo.add_empty_commit(parents=[second, third]) + repo.add_empty_commit( + parents=[fourth], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_fixed_fix_propagation(self): + """Ensures that a fix gets propagated, in the case of a merge""" + repo = TestRepository("test_introduced_fixed_fix_propagation") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.FIXED) + third = repo.add_empty_commit(parents=[first, second]) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.FIXED) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + ######## 2nd : tests with "introduced" and "limit" + + def test_introduced_limit_linear(self): + """Ensures the basic behavior of limit + (the limit commit is considered unaffected).""" + repo = TestRepository("test_intoduced_limit_linear") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.LIMIT) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_branch(self): + """Ensures that a limit commit does limit the vulnerability to a branch.""" + repo = TestRepository("test_intoduced_limit_branch") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.LIMIT) + repo.add_empty_commit(parents=[second]) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([ + first.hex, + second.hex, + ]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_merge(self): + """Ensures that a merge without a fix does + not affect the propagation of a vulnerability.""" + repo = TestRepository("test_intoduced_limit_merge", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit() + third = repo.add_empty_commit(parents=[first, second]) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.LIMIT) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_two_linear(self): + """Ensures that multiple introduced commit in + the same branch are correctly handled, wrt limit.""" + repo = TestRepository("test_introduced_limit_two_linear", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.LIMIT) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + repo.add_empty_commit( + parents=[third], vulnerability=TestRepository.VulnerabilityType.LIMIT) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + ######## 2nd : tests with "introduced" and "last-affected" + + def test_introduced_last_affected_linear(self): + """Ensures the basic behavior of last_affected + commits (the las_affected commit is considered affected).""" + repo = TestRepository("test_introduced_last_affected_linear") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex, third.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_last_affected_branch_propagation(self): + """Ensures that vulnerabilities are propagated to branches""" + repo = TestRepository( + "test_introduced_last_affected_branch_propagation", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit(parents=[first]) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + fourth = repo.add_empty_commit(parents=[second]) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex, third.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_last_affected_merge(self): + """Ensures that a merge without a fix does + not affect the propagation of a vulnerability.""" + repo = TestRepository("test_introduced_last_affected_merge", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit() + third = repo.add_empty_commit(parents=[first, second]) + fourth = repo.add_empty_commit( + parents=[third], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, third.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_last_affected_two_linear(self): + """Ensures that multiple introduced commit in + the same branch are correctly handled, wrt last_affected.""" + repo = TestRepository( + "test_introduced_last_affected_two_linear", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + third = repo.add_empty_commit( + parents=[second], + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + fourth = repo.add_empty_commit( + parents=[third], + vulnerability=TestRepository.VulnerabilityType.LAST_AFFECTED, + ) + + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex, second.hex, third.hex, fourth.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + ######## 3nd : tests with "introduced", "limit", and "fixed" + + def test_introduced_limit_fixed_linear_lf(self): + """Ensures the behaviors of limit and fixed commits are not conflicting.""" + repo = TestRepository("test_introduced_limit_fixed_linear_lf") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.LIMIT) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) + + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_fixed_linear_fl(self): + """Ensures the behaviors of limit and fixed commits are not conflicting""" + repo = TestRepository("test_introduced_limit_fixed_linear_lf") + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.FIXED) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.LIMIT) + + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) + + def test_introduced_limit_branch_limit(self): + """Ensures the behaviors of limit and fixed + commits are not conflicting, in the case of a branch created.""" + repo = TestRepository("test_introduced_limit_fixed_linear_lf", debug=False) + + first = repo.add_empty_commit( + vulnerability=TestRepository.VulnerabilityType.INTRODUCED) + second = repo.add_empty_commit( + parents=[first], vulnerability=TestRepository.VulnerabilityType.LIMIT) + repo.add_empty_commit(parents=[first]) + repo.add_empty_commit( + parents=[second], vulnerability=TestRepository.VulnerabilityType.FIXED) + + (all_introduced, all_fixed, all_last_affected, + all_limit) = repo.get_ranges() + result = self.__repo_analyzer.get_affected(repo.repo, all_introduced, + all_fixed, all_limit, + all_last_affected) + + expected = set([first.hex]) + repo.remove() + self.assertEqual( + result.commits, + expected, + "Expected: %s, got: %s" % (expected, result.commits), + ) \ No newline at end of file diff --git a/osv/test_tools/test_repository.py b/osv/test_tools/test_repository.py new file mode 100644 index 00000000000..d12647ba41a --- /dev/null +++ b/osv/test_tools/test_repository.py @@ -0,0 +1,158 @@ +"""test_repository""" +import pygit2 +import json +from datetime import datetime +from enum import Enum +import os +import shutil +import uuid + + +class TestRepository: + """ Utilitary class to create a test repository for the git tests + """ + + class VulnerabilityType(Enum): + INTRODUCED = 1 + FIXED = 2 + LAST_AFFECTED = 3 + LIMIT = 4 + NONE = 5 + + _author = pygit2.Signature('John Smith', 'johnSmith@example.com') + _commiter = pygit2.Signature('John Smith', 'johnSmith@example.com') + + _initial_commit = None + + def __init__(self, name: str, debug: bool = False): + self.debug = debug + self.name = name + self.introduced = [] + self.fixed = [] + self.last_affected = [] + self.limit = [] + + if os.path.exists(f"osv/testdata/test_repositories/{name}"): + shutil.rmtree(f"osv/testdata/test_repositories/{name}") + self.repo = pygit2.init_repository( + f"osv/testdata/test_repositories/{name}", bare=False) + #empty initial commit usefull for the creation of the repository + tree = self.repo.TreeBuilder().write() + self._initial_commit = self.repo.create_commit('refs/heads/main', + self._author, self._commiter, + "message", tree, []) + self.create_branch(f"branch_{self._initial_commit.hex}", + self._initial_commit) + self.repo.references.create("refs/remotes/origin/main", + self._initial_commit) + + def create_branch(self, name: str, commit: pygit2.Oid): + self.repo.references.create(f'refs/heads/{name}', commit) + self.repo.references.create(f'refs/remotes/origin/{name}', commit) + + def add_empty_commit( + self, + parents: list[pygit2.Oid] = None, + vulnerability: VulnerabilityType = VulnerabilityType.NONE, + message: str = "Empty") -> pygit2.Oid: + """ + Adds a empty commit to the repository, tags it with the vulnerability + type and adds it to the vulnerability list if specified + """ + + tree = self.repo.TreeBuilder().write() + self._author = pygit2.Signature( + str(uuid.uuid1()), 'johnSmith@example.com' + ) #using a random uuid to avoid commits being the same + commit = None + + if not parents or len(parents) == 0: + self.repo.create_branch( + 'branch_temp', self.repo.revparse_single(self._initial_commit.hex)) + commit = self.repo.create_commit('refs/heads/branch_temp', self._author, + self._commiter, message, tree, + [self._initial_commit]) + + self.repo.branches.delete('branch_temp') + self.create_branch(f'branch_{commit.hex}', commit) + + else: + self.repo.create_branch('branch_temp', + self.repo.revparse_single(parents[0].hex)) + commit = self.repo.create_commit('refs/heads/branch_temp', self._author, + self._commiter, message, tree, parents) + self.repo.branches.delete('branch_temp') + self.create_branch(commit=commit, name=f'branch_{commit.hex}') + + self.repo.references.get('refs/remotes/{0}/{1}'.format( + "origin", "main")).set_target(commit) + self.repo.references.get('refs/heads/main').set_target(commit) + + if self.debug: + os.system("echo -------------------------------" + + "-----------------------------------") + os.system(f"git -C osv/testdata/test_repositories/{self.name}" + + " log --all --graph --decorate") + + #self.repo.branches.delete(created_branch.branch_name) + + match vulnerability: + case self.VulnerabilityType.INTRODUCED: + self.introduced.append(commit.hex) + case self.VulnerabilityType.FIXED: + self.fixed.append(commit.hex) + case self.VulnerabilityType.LAST_AFFECTED: + self.last_affected.append(commit.hex) + case self.VulnerabilityType.LIMIT: + self.limit.append(commit.hex) + case self.VulnerabilityType.NONE: + pass + case _: + raise ValueError("Invalid vulnerability type") + return commit + + def remove(self): + shutil.rmtree(f"osv/testdata/test_repositories/{self.name}/") + while os.path.exists( + f"osv/testdata/test_repositories/{self.name}/"): # check if it exists + pass + ##cleanup + self.introduced = [] + self.fixed = [] + self.last_affected = [] + self.limit = [] + + def get_ranges(self): + """ + return the ranges of the repository + """ + return (self.introduced, self.fixed, self.last_affected, self.limit) + + def print_commits(self): + """ prints the commits of the repository + """ + print(self.name) + commits = [] + for ref in self.repo.listall_reference_objects(): + print(ref.target) + for commit in self.repo.walk(ref.target, pygit2.GIT_SORT_TIME): + + current_commit = { + 'hash': + commit.hex, + 'message': + commit.message, + 'commit_date': + datetime.utcfromtimestamp(commit.commit_time + ).strftime('%Y-%m-%dT%H:%M:%SZ'), + 'author_name': + commit.author.name, + 'author_email': + commit.author.email, + 'parents': [c.hex for c in commit.parents], + } + if current_commit in commits: + break + commits.append(current_commit) + + print(json.dumps(commits, indent=2)) \ No newline at end of file diff --git a/osv/testdata/.gitignore b/osv/testdata/.gitignore index f7bc0fe8b7c..67746de30ce 100644 --- a/osv/testdata/.gitignore +++ b/osv/testdata/.gitignore @@ -1 +1,2 @@ -version_enum \ No newline at end of file +version_enum +test_repositories/** \ No newline at end of file diff --git a/run_tests.sh b/run_tests.sh index dd8af7aa902..dc8be557c4e 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -6,6 +6,7 @@ python3 -m pipenv run python -m unittest osv.bug_test python3 -m pipenv run python -m unittest osv.purl_helpers_test python3 -m pipenv run python -m unittest osv.request_helper_test python3 -m pipenv run python -m unittest osv.semver_index_test +python3 -m pipenv run python -m unittest osv.impact_git_test python3 -m pipenv run python -m unittest osv.impact_test # Run all osv.ecosystems tests