Skip to content

Commit

Permalink
feat(serverless): Serverless graph integration (#6911)
Browse files Browse the repository at this point in the history
* add definitions functions for platform
  • Loading branch information
omriyoffe-panw authored Dec 18, 2024
1 parent c20a031 commit aa6e248
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 1 deletion.
2 changes: 1 addition & 1 deletion checkov/serverless/graph_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@


class ServerlessGraphManager(GraphManager[ServerlessLocalGraph, "dict[str, dict[str, Any]]"]):
def __init__(self, db_connector: LibraryGraphConnector, source: str = GraphSource.ARM) -> None:
def __init__(self, db_connector: LibraryGraphConnector, source: str = GraphSource.SERVERLESS) -> None:
super().__init__(db_connector=db_connector, parser=None, source=source)

def build_graph_from_source_directory(
Expand Down
3 changes: 3 additions & 0 deletions checkov/serverless/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,3 +355,6 @@ def cfn_resources_checks(self,

def extract_file_path_from_abs_path(self, path: Path) -> str:
return f"{os.path.sep}{os.path.relpath(path, self.root_folder)}"

def set_definitions_raw(self, definitions_raw: dict[str, list[tuple[int, str]]]) -> None:
self.definitions_raw = definitions_raw
19 changes: 19 additions & 0 deletions checkov/serverless/utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from __future__ import annotations

import os
from collections.abc import Collection
from enum import Enum
from typing import Callable, Any
from pathlib import Path

from checkov.common.parallelizer.parallel_runner import parallel_runner
from checkov.runner_filter import RunnerFilter
from checkov.serverless.parsers.parser import parse
from checkov.common.runners.base_runner import filter_ignored_paths

Expand All @@ -28,6 +31,22 @@ def __str__(self) -> str:
return self.value


def create_definitions(
root_folder: str,
files: Collection[Path] | None = None,
runner_filter: RunnerFilter | None = None,
) -> tuple[dict[str, dict[str, Any]], dict[str, list[tuple[int, str]]]]:
definitions: dict[str, dict[str, Any]] = {}
definitions_raw: dict[str, list[tuple[int, str]]] = {}
runner_filter = runner_filter or RunnerFilter()

if root_folder:
file_paths = get_scannable_file_paths(root_folder, runner_filter.excluded_paths)
definitions, definitions_raw = get_files_definitions(files=file_paths)

return definitions, definitions_raw


def get_scannable_file_paths(root_folder: str | None = None, excluded_paths: list[str] | None = None) -> list[str]:
files_list: list[str] = []

Expand Down

0 comments on commit aa6e248

Please sign in to comment.