Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sca): Extending reachability post-runner in checkov and enriching cves with ReachableFunction data #5707

Merged
merged 8 commits into from
Nov 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,13 @@ def merge_sca_and_sast_reports(self, merged_reports: list[Report]) -> None:
not bool(convert_str_to_bool(os.getenv('CKV_ENABLE_SCA_INTEGRATE_SAST', False))):
return

# Extract SAST imports report
# Extract SAST imports report and reachability report
sast_reports = [scan_report for scan_report in merged_reports if isinstance(scan_report, SastReport)]
if not len(sast_reports):
return

sast_imports_report = SastData.get_sast_import_report(sast_reports)
sast_reacability_report = SastData.get_sast_reachability_report(sast_reports)

# Extract SCA packages report
sca_packages_report = [scan_report for scan_report in merged_reports if
Expand All @@ -80,18 +81,24 @@ def merge_sca_and_sast_reports(self, merged_reports: list[Report]) -> None:
lang = self.get_sast_lang_by_file_path(sca_file_path)

# Extract Sast data from Sast report filtered by the language
entries = sast_imports_report.get('imports', {}).get(lang, {}).items()
filtered_entries = [(code_file_path, sast_data) for code_file_path, sast_data in entries if
self.is_deeper_or_equal_level(sca_file_path, code_file_path)]
imports_entries = sast_imports_report.get('imports', {}).get(lang, {}).items()
filtered_imports_entries = [(code_file_path, sast_data) for code_file_path, sast_data in imports_entries if
self.is_deeper_or_equal_level(sca_file_path, code_file_path)]

if not len(filtered_entries):
reachability_entries = sast_reacability_report.get('reachability', {}).get(lang, {}).items()
filtered_reachability_entries = [(code_file_path, sast_data) for code_file_path, sast_data in
reachability_entries if self.is_deeper_or_equal_level(sca_file_path,
code_file_path)]

if not len(filtered_imports_entries) and not len(filtered_reachability_entries):
continue

# Create map with the relevant structure for the enrichment step
sast_files_by_packages_map = self.create_file_by_package_map(filtered_entries)
# Create maps with the relevant structure for the enrichment step
sast_files_by_packages_map = self.create_file_by_package_map(filtered_imports_entries)
sast_reachable_data_by_packages_map = self.create_reachable_data_by_package_map(filtered_reachability_entries)

# Enrich the CVEs
self.enrich_cves_with_sast_data(current_cves, sast_files_by_packages_map)
self.enrich_cves_with_sast_data(current_cves, sast_files_by_packages_map, sast_reachable_data_by_packages_map)

'''
Each SCA report check has file_path, we want to getter same file_path so we won't have to calculate SAST language more then once
Expand Down Expand Up @@ -132,19 +139,42 @@ def create_file_by_package_map(self, filtered_entries: List[Tuple[Any, Any]]) ->

return sast_files_by_packages_map

def create_reachable_data_by_package_map(self, filtered_reachability_entries: List[Tuple[Any, Any]]) -> Dict[str, Dict[str, List[str]]]:
reachable_data_by_packages_map: Dict[str, Dict[str, List[str]]] = defaultdict(dict)
for code_file_path, file_data in filtered_reachability_entries:
packages = file_data.packages
for package_name, package_data in packages.items():
reachable_data_by_packages_map[package_name][code_file_path] = package_data.functions
return reachable_data_by_packages_map

#######################################################################################################################
'''
enrich each CVE with the risk factor of IsUsed - which means there is a file the use the package of that CVE
'''

def enrich_cves_with_sast_data(self, current_cves: List[Record], sast_files_by_packages_map: Dict[str, List[str]]) -> None:
def _is_package_used_for_cve(self, cve_vulnerability_details: Dict[str, Any], sast_files_by_packages_map: Dict[str, List[str]]) -> bool:
package_name = cve_vulnerability_details.get('package_name', '')
normalize_package_name = self.normalize_package_name(package_name)
return package_name in sast_files_by_packages_map or normalize_package_name in sast_files_by_packages_map

def _is_reachable_function_for_cve(self, cve_vulnerability_details: Dict[str, Any], sast_reachable_data_by_packages_map: Dict[str, Dict[str, List[str]]]) -> bool:
package_name = cve_vulnerability_details.get('package_name', '')
return package_name in sast_reachable_data_by_packages_map

def enrich_cves_with_sast_data(
self,
current_cves: List[Record],
sast_files_by_packages_map: Dict[str, List[str]],
sast_reachable_data_by_packages_map: Dict[str, Dict[str, List[str]]]
) -> None:
for cve_check in current_cves:
if cve_check.vulnerability_details:
package_name = cve_check.vulnerability_details.get('package_name', '')
normalize_package_name = self.normalize_package_name(package_name)
cve_check.vulnerability_details.get('risk_factors', {})['IsUsed'] = False
is_package_used = self._is_package_used_for_cve(cve_check.vulnerability_details, sast_files_by_packages_map)
cve_check.vulnerability_details.get('risk_factors', {})['IsUsed'] = is_package_used

if package_name in sast_files_by_packages_map or normalize_package_name in sast_files_by_packages_map:
cve_check.vulnerability_details.get('risk_factors', {})['IsUsed'] = True
is_reachable_function = self._is_reachable_function_for_cve(cve_check.vulnerability_details, sast_reachable_data_by_packages_map)
cve_check.vulnerability_details.get('risk_factors', {})['ReachableFunction'] = is_reachable_function
#######################################################################################################################

'''
we want to consider sast info only on files that are on the same level of the SCA file or deeper.
Expand Down
23 changes: 23 additions & 0 deletions checkov/sast/report.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
from typing import Any, Dict, Union, List, Optional

from checkov.common.output.report import Report
Expand Down Expand Up @@ -76,3 +77,25 @@ def get_sast_import_report(scan_reports: List[SastReport]) -> Dict[str, Any]:
if current_imports:
sast_imports_report[report.language][file_name] = {'all': current_imports}
return {"imports": sast_imports_report}

@staticmethod
def get_sast_reachability_report(scan_reports: List[SastReport]) -> Dict[str, Any]:
first_found_repo_name = None
sast_reachability_report: Dict[SastLanguages, Any] = {}
for report in scan_reports:
sast_reachability_report[report.language] = {}
for report in scan_reports:
for repo_name, repo_data in report.sast_reachability.items():

# validating we are dealing only with one repo, as it happens for imports report
if first_found_repo_name:
if repo_name != first_found_repo_name:
logging.error(f'[get_sast_reachability_report] - found more than one repository in '
f'the scan reports. {scan_reports}')
return {"reachability": {}}
else:
first_found_repo_name = repo_name

for file_name, file_data in repo_data.files.items():
sast_reachability_report[report.language][file_name] = file_data
return {"reachability": sast_reachability_report}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from checkov.common.output.report import Report
from checkov.sast.consts import SastLanguages
from checkov.sast.report import SastReport
from checkov.sast.prisma_models.report import Package, File, Function


class TestVulnerabilitiesIntegration(unittest.TestCase):
Expand Down Expand Up @@ -193,6 +194,25 @@ def test_normalized_package_name_case_simple(self):
result = vul_integration.normalize_package_name(original)
self.assertTrue(result, expected)

def test_create_reachable_data_by_package_map(self):
filtered_reachability_entries = [('/index.js', File(packages={'axios': Package(alias='ax', functions=[Function(name='trim', alias='hopa', line_number=4, code_block='hopa()')]), 'lodash': Package(alias='', functions=[Function(name='template', alias='', line_number=1, code_block='template()'), Function(name='toNumber', alias='', line_number=4, code_block='hopa()')])}))]
instance = BcPlatformIntegration()
vul_integration = VulnerabilitiesIntegration(instance)
reachable_data_by_package_map = vul_integration.create_reachable_data_by_package_map(filtered_reachability_entries)
assert reachable_data_by_package_map == {
'axios': {
'/index.js': [
Function(name='trim', alias='hopa', line_number=4, code_block='hopa()')
]
},
'lodash': {
'/index.js': [
Function(name='template', alias='', line_number=1, code_block='template()'),
Function(name='toNumber', alias='', line_number=4, code_block='hopa()')
]
}
}


if __name__ == '__main__':
unittest.main()
54 changes: 54 additions & 0 deletions tests/sast/test_report.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from __future__ import annotations

from checkov.sast.report import SastData, SastReport
from checkov.sast.consts import SastLanguages
from checkov.sast.prisma_models.report import Function, Repositories, File, Package


def _create_sast_reports_for_test_get_sast_reachability_report_with_one_report() -> list[SastReport]:
# we don't care about the init's params, except for the sast-language
report1 = SastReport('', {}, SastLanguages.JAVASCRIPT)
report1.sast_reachability = {
'repo_1': Repositories(files={
'/index.js': File(packages={
'axios': Package(alias='ax', functions=[
Function(name='trim', alias='hopa', line_number=4, code_block='hopa()')
]),
'lodash': Package(alias='', functions=[
Function(name='template', alias='', line_number=1, code_block='template()'),
Function(name='toNumber', alias='', line_number=4, code_block='hopa()')
])
}),
'/main.js': File(packages={
'axios': Package(alias='ax', functions=[
Function(name='trim', alias='hi', line_number=4, code_block='hi()')
])
})
})
}
return [report1]


def test_get_sast_reachability_report_with_one_report():
scan_reports: list[SastReport] = _create_sast_reports_for_test_get_sast_reachability_report_with_one_report()
sast_reachability_report = SastData.get_sast_reachability_report(scan_reports)
assert sast_reachability_report == {
'reachability': {
SastLanguages.JAVASCRIPT: {
'/index.js': File(packages={
'axios': Package(alias='ax', functions=[
Function(name='trim', alias='hopa', line_number=4, code_block='hopa()')
]),
'lodash': Package(alias='', functions=[
Function(name='template', alias='', line_number=1, code_block='template()'),
Function(name='toNumber', alias='', line_number=4, code_block='hopa()')
])
}),
'/main.js': File(packages={
'axios': Package(alias='ax', functions=[
Function(name='trim', alias='hi', line_number=4, code_block='hi()')
])
})
}
}
}
Loading