Skip to content

Commit

Permalink
fix(sast): fix sast reachability report format (#5686)
Browse files Browse the repository at this point in the history
* fix sast report format

* formated result

* fix

* refactor

* lint

* lint

* mypy

* mypy

* lint
  • Loading branch information
achiar99 authored Oct 26, 2023
1 parent 5fb0bd8 commit 8e8225d
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 8 deletions.
5 changes: 2 additions & 3 deletions checkov/common/bridgecrew/platform_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
REQUEST_RETRIES,
)
from checkov.common.util.type_forcers import convert_prisma_policy_filter_to_dict, convert_str_to_bool
from checkov.sast.consts import SastLanguages
from checkov.version import version as checkov_version

if TYPE_CHECKING:
Expand Down Expand Up @@ -508,11 +507,11 @@ def persist_assets_scan_results(self, assets_report: Optional[Dict[str, Any]]) -
new_report = {'imports': {lang.value: assets}}
persist_assets_results(f'sast_{lang.value}', new_report, self.s3_client, self.bucket, self.repo_path)

def persist_reachability_scan_results(self, reachability_report: Optional[Dict[SastLanguages, Any]]) -> None:
def persist_reachability_scan_results(self, reachability_report: Optional[Dict[str, Any]]) -> None:
if not reachability_report:
return
for lang, report in reachability_report.items():
persist_reachability_results(f'sast_{lang.value}', report, self.s3_client, self.bucket, self.repo_path)
persist_reachability_results(f'sast_{lang}', {lang: report}, self.s3_client, self.bucket, self.repo_path)

def persist_image_scan_results(self, report: dict[str, Any] | None, file_path: str, image_name: str, branch: str) -> None:
if not self.s3_client:
Expand Down
3 changes: 2 additions & 1 deletion checkov/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,8 @@ def save_sast_reachability_data(self, scan_reports: List[Report]) -> None:
if rep.sast_reachability:
result[rep.language] = {**result[rep.language], **serialize_reachability_report(rep.sast_reachability)}

self.sast_data.set_reachability_report(result)
formated_report = SastReport.get_formated_reachability_report(result)
self.sast_data.set_reachability_report(formated_report)

def print_results(
self,
Expand Down
2 changes: 1 addition & 1 deletion checkov/sast/prisma_models/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


class Profiler(BaseModel):
duration: int # noqa: CCE003
duration: Union[str, int] # noqa: CCE003
memory: int # noqa: CCE003


Expand Down
27 changes: 24 additions & 3 deletions checkov/sast/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,35 @@ def get_summary(self) -> Dict[str, Union[int, str]]:

return base_summary

@staticmethod
def get_formated_reachability_report(reachability_report_dict: Dict[SastLanguages, Any]) -> Dict[str, Any]:
formated_report: Dict[str, Any] = {}
for lang, repos_data in reachability_report_dict.items():
formated_report[lang.value] = []
for repo_name, files_data in repos_data.items():
new_repo = {'Name': repo_name, 'Files': []}
for file_path, packages_data in files_data['files'].items():
new_file = {'Path': file_path, 'Packages': []}
for package_name, package_data in packages_data['packages'].items():
new_package = {'Name': package_name, 'Alias': package_data['alias'], 'Functions': []}
for func in package_data['functions']:
new_func = {'Name': func['name'], 'Alias': func['alias'], 'LineNumber': func['line_number'], 'CodeBlock': [func['code_block']]}
new_package['Functions'].append(new_func)
new_file['Packages'].append(new_package)
new_repo['Files'].append(new_file)
formated_report[lang.value].append(new_repo)
return formated_report


class SastData:
def __init__(self) -> None:
self.imports_data: Optional[Dict[str, Any]] = None
self.reachability_report: Optional[Dict[SastLanguages, Any]] = None
self.reachability_report: Optional[Dict[str, Any]] = None

def set_imports_data(self, imports_data: Dict[str, Any]) -> None:
self.imports_data = imports_data

def set_reachability_report(self, reachability_report: Dict[SastLanguages, Any]) -> None:
def set_reachability_report(self, reachability_report: Dict[str, Any]) -> None:
self.reachability_report = reachability_report

@staticmethod
Expand All @@ -53,5 +72,7 @@ def get_sast_import_report(scan_reports: List[SastReport]) -> Dict[str, Any]:
sast_imports_report[report.language] = {}
for report in scan_reports:
for file_name, all_data in report.sast_imports.items():
sast_imports_report[report.language][file_name] = {'all': all_data.get('all', [])}
current_imports = all_data.get('all', [])
if current_imports:
sast_imports_report[report.language][file_name] = {'all': current_imports}
return {"imports": sast_imports_report}

0 comments on commit 8e8225d

Please sign in to comment.