Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(sast): fix sast reachability report format #5686

Merged
merged 11 commits into from
Oct 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions checkov/common/bridgecrew/platform_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
REQUEST_RETRIES,
)
from checkov.common.util.type_forcers import convert_prisma_policy_filter_to_dict, convert_str_to_bool
from checkov.sast.consts import SastLanguages
from checkov.version import version as checkov_version

if TYPE_CHECKING:
Expand Down Expand Up @@ -508,11 +507,11 @@ def persist_assets_scan_results(self, assets_report: Optional[Dict[str, Any]]) -
new_report = {'imports': {lang.value: assets}}
persist_assets_results(f'sast_{lang.value}', new_report, self.s3_client, self.bucket, self.repo_path)

def persist_reachability_scan_results(self, reachability_report: Optional[Dict[SastLanguages, Any]]) -> None:
def persist_reachability_scan_results(self, reachability_report: Optional[Dict[str, Any]]) -> None:
if not reachability_report:
return
for lang, report in reachability_report.items():
persist_reachability_results(f'sast_{lang.value}', report, self.s3_client, self.bucket, self.repo_path)
persist_reachability_results(f'sast_{lang}', {lang: report}, self.s3_client, self.bucket, self.repo_path)

def persist_image_scan_results(self, report: dict[str, Any] | None, file_path: str, image_name: str, branch: str) -> None:
if not self.s3_client:
Expand Down
3 changes: 2 additions & 1 deletion checkov/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -755,7 +755,8 @@ def save_sast_reachability_data(self, scan_reports: List[Report]) -> None:
if rep.sast_reachability:
result[rep.language] = {**result[rep.language], **serialize_reachability_report(rep.sast_reachability)}

self.sast_data.set_reachability_report(result)
formated_report = SastReport.get_formated_reachability_report(result)
self.sast_data.set_reachability_report(formated_report)

def print_results(
self,
Expand Down
2 changes: 1 addition & 1 deletion checkov/sast/prisma_models/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@


class Profiler(BaseModel):
duration: int # noqa: CCE003
duration: Union[str, int] # noqa: CCE003
memory: int # noqa: CCE003


Expand Down
27 changes: 24 additions & 3 deletions checkov/sast/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,35 @@ def get_summary(self) -> Dict[str, Union[int, str]]:

return base_summary

@staticmethod
def get_formated_reachability_report(reachability_report_dict: Dict[SastLanguages, Any]) -> Dict[str, Any]:
formated_report: Dict[str, Any] = {}
for lang, repos_data in reachability_report_dict.items():
formated_report[lang.value] = []
for repo_name, files_data in repos_data.items():
new_repo = {'Name': repo_name, 'Files': []}
for file_path, packages_data in files_data['files'].items():
new_file = {'Path': file_path, 'Packages': []}
for package_name, package_data in packages_data['packages'].items():
new_package = {'Name': package_name, 'Alias': package_data['alias'], 'Functions': []}
for func in package_data['functions']:
new_func = {'Name': func['name'], 'Alias': func['alias'], 'LineNumber': func['line_number'], 'CodeBlock': [func['code_block']]}
new_package['Functions'].append(new_func)
new_file['Packages'].append(new_package)
new_repo['Files'].append(new_file)
formated_report[lang.value].append(new_repo)
return formated_report


class SastData:
def __init__(self) -> None:
self.imports_data: Optional[Dict[str, Any]] = None
self.reachability_report: Optional[Dict[SastLanguages, Any]] = None
self.reachability_report: Optional[Dict[str, Any]] = None

def set_imports_data(self, imports_data: Dict[str, Any]) -> None:
self.imports_data = imports_data

def set_reachability_report(self, reachability_report: Dict[SastLanguages, Any]) -> None:
def set_reachability_report(self, reachability_report: Dict[str, Any]) -> None:
self.reachability_report = reachability_report

@staticmethod
Expand All @@ -53,5 +72,7 @@ def get_sast_import_report(scan_reports: List[SastReport]) -> Dict[str, Any]:
sast_imports_report[report.language] = {}
for report in scan_reports:
for file_name, all_data in report.sast_imports.items():
sast_imports_report[report.language][file_name] = {'all': all_data.get('all', [])}
current_imports = all_data.get('all', [])
if current_imports:
sast_imports_report[report.language][file_name] = {'all': current_imports}
return {"imports": sast_imports_report}
Loading