Skip to content

Commit

Permalink
Avoid duplicate findings from multiple sarif files (#927)
Browse files Browse the repository at this point in the history
* Add example CodeQL SARIF file

* Prevent duplicate findings from multiple sarif files

* Account for frozen datatypes when updating finding metadata

* Avoid warnings when running test pipeline
  • Loading branch information
drdavella authored Nov 22, 2024
1 parent 39776ac commit 7e5a741
Show file tree
Hide file tree
Showing 11 changed files with 448 additions and 316 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ readme = "README.md"
license = {file = "LICENSE"}
description = "A pluggable framework for building codemods in Python"
dependencies = [
"boltons~=21.0.0",
"GitPython<4",
"isort>=5.12,<5.14",
"libcst>=1.1,<1.6",
Expand Down Expand Up @@ -118,6 +119,7 @@ version_file = "src/codemodder/_version.py"
[tool.pytest.ini_options]
# Ignore integration tests and ci tests by default
testpaths = ["tests"]
asyncio_default_fixture_loop_scope = "function"

[tool.black]
extend-exclude = '''
Expand Down
32 changes: 32 additions & 0 deletions src/codemodder/codetf.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,16 @@ def validate_description(self):
raise ValueError("description must not be empty")
return self

def with_findings(self, findings: list[Finding] | None) -> Change:
    """Return a new ``Change`` identical to this one but with ``findings`` replaced.

    Builds a fresh instance instead of mutating this one, carrying over
    every other field unchanged.
    """
    carried = dict(
        lineNumber=self.lineNumber,
        description=self.description,
        diffSide=self.diffSide,
        properties=self.properties,
        packageActions=self.packageActions,
    )
    return Change(findings=findings, **carried)


class AIMetadata(BaseModel):
provider: Optional[str] = None
Expand All @@ -99,6 +109,16 @@ class ChangeSet(BaseModel):
strategy: Optional[Strategy] = None
provisional: Optional[bool] = False

def with_changes(self, changes: list[Change]) -> ChangeSet:
    """Return a new ``ChangeSet`` identical to this one but with ``changes`` replaced.

    Constructs a fresh instance rather than mutating in place; all other
    fields are copied over as-is.
    """
    carried = dict(
        path=self.path,
        diff=self.diff,
        ai=self.ai,
        strategy=self.strategy,
        provisional=self.provisional,
    )
    return ChangeSet(changes=changes, **carried)


class Reference(BaseModel):
url: str
Expand All @@ -115,11 +135,17 @@ class Rule(BaseModel):
name: str
url: Optional[str] = None

class Config:
    # Frozen pydantic models are immutable and hashable, which lets Rule
    # instances be compared/deduplicated via sets and dicts.
    frozen = True


class Finding(BaseModel):
    # Tool-assigned identifier for this individual finding.
    id: str
    # Rule metadata (id/name/url) under which the finding was reported.
    rule: Rule

    class Config:
        # Immutable (and hashable) so findings can be deduplicated; use
        # copy-style helpers (e.g. with_rule) instead of mutation.
        frozen = True

def to_unfixed_finding(
self,
*,
Expand All @@ -135,6 +161,12 @@ def to_unfixed_finding(
reason=reason,
)

def with_rule(self, name: str, url: Optional[str]) -> Finding:
    """Return a copy of this finding with updated rule name and url.

    The rule's id and this finding's id are preserved; only the rule's
    human-readable metadata changes.
    """
    updated_rule = Rule(id=self.rule.id, name=name, url=url)
    return Finding(id=self.id, rule=updated_rule)


class UnfixedFinding(Finding):
path: str
Expand Down
77 changes: 46 additions & 31 deletions src/codemodder/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@
from abc import abstractmethod
from dataclasses import dataclass, field
from pathlib import Path
from typing import TYPE_CHECKING, Any, ClassVar, Type
from typing import TYPE_CHECKING, Any, ClassVar, Sequence, Type

import libcst as cst
from boltons.setutils import IndexedSet
from libcst._position import CodeRange
from typing_extensions import Self

Expand All @@ -18,39 +19,40 @@
from codemodder.context import CodemodExecutionContext


@dataclass
@dataclass(frozen=True)
class LineInfo:
    """A single source position reported by an analysis tool."""

    # Line number of the position. NOTE(review): assumed 1-based per the
    # usual SARIF convention — not verifiable from this view.
    line: int
    # Column within the line; -1 is the sentinel default when the tool
    # supplies no column.
    column: int = -1
    # Raw source snippet associated with the position, when available.
    snippet: str | None = None


@dataclass
@dataclass(frozen=True)
class Location(ABCDataclass):
    """A file region flagged by a result: a file plus start/end positions."""

    # File the result points at.
    file: Path
    # Inclusive start and end positions of the flagged region.
    start: LineInfo
    end: LineInfo


@dataclass(frozen=True)
class SarifLocation(Location):
    @classmethod
    @abstractmethod
    def from_sarif(cls, sarif_location) -> Self:
        """Build a concrete Location from a raw SARIF location object.

        Abstract hook: subclasses implement the tool-specific parsing.
        """
        pass


@dataclass
@dataclass(frozen=True)
class LocationWithMessage:
    """A location paired with the message text the tool attached to it."""

    location: Location
    message: str


@dataclass(kw_only=True)
@dataclass(frozen=True, kw_only=True)
class Result(ABCDataclass):
    """A single analysis result (one rule violation) from a tool."""

    # Tool-reported rule identifier this result was raised under.
    rule_id: str
    # Primary flagged locations. Typed as Sequence (populated with tuples
    # by callers) so frozen instances hold immutable data.
    locations: Sequence[Location]
    # Optional dataflow traces: each inner sequence is one flow of steps.
    codeflows: Sequence[Sequence[Location]] = field(default_factory=tuple)
    # Locations the tool marked as related, each with its message.
    related_locations: Sequence[LocationWithMessage] = field(default_factory=tuple)
    # Associated CodeTF finding metadata, when available.
    finding: Finding | None = None

def match_location(self, pos: CodeRange, node: cst.CSTNode) -> bool:
Expand All @@ -67,13 +69,16 @@ def match_location(self, pos: CodeRange, node: cst.CSTNode) -> bool:
for location in self.locations
)

def __hash__(self):
    # Hash on rule_id alone: the dataclass-generated __eq__ still compares
    # full contents, so equal results always share a hash (required), while
    # unequal results with the same rule merely collide into one bucket.
    # NOTE(review): presumably chosen to make frozen results usable in
    # sets/dicts for deduplication — confirm against callers.
    return hash(self.rule_id)

@dataclass(kw_only=True)

@dataclass(frozen=True, kw_only=True)
class SASTResult(Result):
    """A Result from a SAST tool, which also carries a per-finding id."""

    # Identifier of the specific finding instance, distinct from the
    # rule_id inherited from Result.
    finding_id: str


@dataclass(kw_only=True)
@dataclass(frozen=True, kw_only=True)
class SarifResult(SASTResult, ABCDataclass):
    # Concrete subclasses set this to the SarifLocation subclass used to
    # parse location objects from their tool's SARIF output.
    location_type: ClassVar[Type[SarifLocation]]

Expand All @@ -84,32 +89,40 @@ def from_sarif(
raise NotImplementedError

@classmethod
def extract_locations(cls, sarif_result) -> list[Location]:
return [
cls.location_type.from_sarif(location)
for location in sarif_result["locations"]
]
def extract_locations(cls, sarif_result) -> Sequence[Location]:
    """Convert each entry of the result's ``locations`` array into a Location.

    Uses the subclass-specific ``location_type`` parser and returns an
    immutable tuple. Raises KeyError if the result has no "locations" key.
    """
    # A generator feeds tuple() directly; the intermediate list literal in
    # the original was redundant (ruff C409).
    return tuple(
        cls.location_type.from_sarif(location)
        for location in sarif_result["locations"]
    )

@classmethod
def extract_related_locations(cls, sarif_result) -> list[LocationWithMessage]:
return [
LocationWithMessage(
message=rel_location.get("message", {}).get("text", ""),
location=cls.location_type.from_sarif(rel_location),
)
for rel_location in sarif_result.get("relatedLocations", [])
]
def extract_related_locations(cls, sarif_result) -> Sequence[LocationWithMessage]:
    """Convert SARIF ``relatedLocations`` into LocationWithMessage objects.

    A missing message defaults to the empty string; a missing
    ``relatedLocations`` key yields an empty tuple.
    """
    # Feed tuple() from a generator; the intermediate list literal in the
    # original was redundant (ruff C409).
    return tuple(
        LocationWithMessage(
            message=rel_location.get("message", {}).get("text", ""),
            location=cls.location_type.from_sarif(rel_location),
        )
        for rel_location in sarif_result.get("relatedLocations", [])
    )

@classmethod
def extract_code_flows(cls, sarif_result) -> list[list[Location]]:
return [
def extract_code_flows(cls, sarif_result) -> Sequence[Sequence[Location]]:
    """Flatten SARIF ``codeFlows``/``threadFlows`` into tuples of Locations.

    Each threadFlow becomes one inner tuple of parsed Locations; all
    threadFlows across all codeFlows are concatenated into the outer
    tuple. Missing keys yield empty tuples.
    """
    # Generators feed tuple() directly at both levels; the intermediate
    # list literals in the original were redundant (ruff C409).
    return tuple(
        tuple(
            cls.location_type.from_sarif(locations.get("location"))
            for locations in threadflow.get("locations", {})
        )
        for codeflow in sarif_result.get("codeFlows", {})
        for threadflow in codeflow.get("threadFlows", {})
    )

@classmethod
def extract_rule_id(cls, result, sarif_run, truncate_rule_id: bool = False) -> str:
Expand Down Expand Up @@ -199,5 +212,7 @@ def list_dict_or(
) -> dict[Any, list[Any]]:
result_dict = {}
for k in other.keys() | dictionary.keys():
result_dict[k] = dictionary.get(k, []) + other.get(k, [])
result_dict[k] = list(
IndexedSet(dictionary.get(k, [])) | (IndexedSet(other.get(k, [])))
)
return result_dict
2 changes: 1 addition & 1 deletion src/codemodder/utils/abc_dataclass.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from dataclasses import dataclass


@dataclass
@dataclass(frozen=True)
class ABCDataclass(ABC):
"""Inspired by https://stackoverflow.com/a/60669138"""

Expand Down
25 changes: 18 additions & 7 deletions src/codemodder/utils/update_finding_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
if typing.TYPE_CHECKING:
from codemodder.codemods.base_codemod import ToolRule

from codemodder.codetf import ChangeSet
from codemodder.codetf import Change, ChangeSet


def update_finding_metadata(
Expand All @@ -15,12 +15,23 @@ def update_finding_metadata(
if not (tool_rule_map := {rule.id: (rule.name, rule.url) for rule in tool_rules}):
return changesets

new_changesets: list[ChangeSet] = []
for changeset in changesets:
new_changes: list[Change] = []
for change in changeset.changes:
for finding in change.findings or []:
if finding.id in tool_rule_map:
finding.rule.name = tool_rule_map[finding.id][0]
finding.rule.url = tool_rule_map[finding.id][1]
new_changes.append(
change.with_findings(
[
(
finding.with_rule(*tool_rule_map[finding.rule.id])
if finding.rule.id in tool_rule_map
else finding
)
for finding in change.findings or []
]
or None
)
)
new_changesets.append(changeset.with_changes(new_changes))

# TODO: eventually make this functional and return a new list
return changesets
return new_changesets
2 changes: 1 addition & 1 deletion src/core_codemods/defectdojo/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def from_result(cls, result: dict) -> Self:
return cls(
finding_id=result["id"],
rule_id=result["title"],
locations=[DefectDojoLocation.from_result(result)],
locations=tuple([DefectDojoLocation.from_result(result)]),
finding=Finding(
id=str(result["id"]),
rule=Rule(
Expand Down
17 changes: 11 additions & 6 deletions src/core_codemods/sonar/results.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dataclasses import replace
from functools import cache
from pathlib import Path
from typing import Sequence

import libcst as cst
from typing_extensions import Self
Expand Down Expand Up @@ -40,18 +41,22 @@ def from_result(cls, result: dict) -> Self:
if not (rule_id := result.get("rule", None) or result.get("ruleKey", None)):
raise ValueError("Could not extract rule id from sarif result.")

locations: list[Location] = (
locations: Sequence[Location] = tuple(
[SonarLocation.from_json_location(result)]
if result.get("textRange")
else []
)
all_flows: list[list[Location]] = [
all_flows: Sequence[Sequence[Location]] = tuple(
[
SonarLocation.from_json_location(json_location)
for json_location in flow.get("locations", {})
tuple(
[
SonarLocation.from_json_location(json_location)
for json_location in flow.get("locations", {})
]
)
for flow in result.get("flows", [])
]
for flow in result.get("flows", [])
]
)

finding_id = result.get("key", rule_id)

Expand Down
1 change: 1 addition & 0 deletions tests/samples/codeql/python/vulnerable-code-snippets.json

Large diffs are not rendered by default.

Loading

0 comments on commit 7e5a741

Please sign in to comment.