Skip to content

Commit

Permalink
feat(general): Update range includes to handle range values (#6867)
Browse files Browse the repository at this point in the history
* Update to include ranges

* Fix mypy

* Fix tests

* Barak fixes
  • Loading branch information
tsmithv11 authored Nov 27, 2024
1 parent 1cc332f commit 7da9582
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ def __init__(
self, resource_types: List[str], attribute: Optional[str], value: Union[Any, List[Any]],
is_jsonpath_check: bool = False
) -> None:
# Convert value to a list if it's not already one to unify handling
value = [force_int(v) if isinstance(v, (str, int)) else v for v in
(value if isinstance(value, list) else [value])]
super().__init__(resource_types, attribute, value, is_jsonpath_check)

def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bool:
Expand All @@ -22,10 +19,32 @@ def _get_operation(self, vertex: Dict[str, Any], attribute: Optional[str]) -> bo
if attr is None:
return False

processed_value = self._handle_range_values(self.value)

if isinstance(attr, list):
return any(self._check_value(value, attr_val) for attr_val in attr for value in self.value)
return any(self._check_value(value, attr_val) for attr_val in attr for value in processed_value)

return any(self._check_value(value, attr) for value in processed_value)

def _handle_range_values(self, value: Union[Any, List[Any]]) -> List[Any]:
# Convert value to a list if it's not already one to unify handling
value_list = value if isinstance(value, list) else [value]

# Process each item in the value list
processed_value: List[Any] = []
for v in value_list:
if isinstance(v, str) and '-' in v:
# Handle range strings
start_str, end_str = v.split('-')
start = force_int(start_str)
end = force_int(end_str)
if start is not None and end is not None:
processed_value.extend(range(start, end + 1))
else:
# Handle single values
processed_value.append(force_int(v) if isinstance(v, (str, int)) else v)

return any(self._check_value(value, attr) for value in self.value)
return processed_value

def _check_value(self, value: Any, attr: Any) -> bool:
# expects one of the following values:
Expand Down
80 changes: 40 additions & 40 deletions docs/3.Custom Policies/YAML Custom Policies.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
metadata:
name: "example"
category: "GENERAL_SECURITY"
id: "JsonPathRangeIncludesListWRange"
scope:
provider: "AWS"
definition:
cond_type: "attribute"
resource_types:
- "test"
attribute: "range"
operator: "jsonpath_range_includes"
value:
- 200
- 3000-4000
- "400-500"
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
metadata:
name: "example"
category: "GENERAL_SECURITY"
id: "RangeIncludesListWRange"
scope:
provider: "AWS"
definition:
cond_type: "attribute"
resource_types:
- "test"
attribute: "range"
operator: "range_includes"
value:
- 200
- 3000-4000
- "400-500"
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,26 @@ def test_range_includes_list_jsonpath_solver(self):
'test.fail8', 'test.fail9']
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}}

self.run_test(root_folder=root_folder, expected_results=expected_results, check_id=check_id)

def test_range_includes_list_w_list_solver(self):
root_folder = 'resources'
check_id = "RangeIncludesListWRange"
should_pass = ['test.pass1', 'test.pass2', 'test.pass3', 'test.pass4', 'test.pass5', 'test.pass6', 'test.pass7',
'test.fail4']
should_fail = ['test.fail1', 'test.fail2', 'test.fail3', 'test.fail5', 'test.fail6', 'test.fail7', 'test.fail8',
'test.fail9']
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}}

self.run_test(root_folder=root_folder, expected_results=expected_results, check_id=check_id)

def test_range_includes_list_w_list_jsonpath_solver(self):
root_folder = 'resources'
check_id = "JsonPathRangeIncludesListWRange"
should_pass = ['test.pass1', 'test.pass2', 'test.pass3', 'test.pass4', 'test.pass5', 'test.pass6', 'test.pass7',
'test.fail4']
should_fail = ['test.fail1', 'test.fail2', 'test.fail3', 'test.fail5', 'test.fail6', 'test.fail7', 'test.fail8',
'test.fail9']
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}}

self.run_test(root_folder=root_folder, expected_results=expected_results, check_id=check_id)
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
metadata:
name: "example"
category: "GENERAL_SECURITY"
id: "JsonPathRangeNotIncludesListWRange"
scope:
provider: "AWS"
definition:
cond_type: "attribute"
resource_types:
- "test"
attribute: "range"
operator: "jsonpath_range_not_includes"
value:
- 200
- 3000-4000
- "400-500"
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
metadata:
name: "example"
category: "GENERAL_SECURITY"
id: "RangeNotIncludesListWRange"
scope:
provider: "AWS"
definition:
cond_type: "attribute"
resource_types:
- "test"
attribute: "range"
operator: "range_not_includes"
value:
- 200
- 3000-4000
- "400-500"
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,25 @@ def test_range_not_includes_list_jsonpath_solver(self):
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}}

self.run_test(root_folder=root_folder, expected_results=expected_results, check_id=check_id)

def test_range_not_includes_list_w_list_solver(self):
root_folder = 'resources'
check_id = "RangeNotIncludesListWRange"
should_fail = ['test.pass1', 'test.pass2', 'test.pass3', 'test.pass4', 'test.pass5', 'test.pass6', 'test.pass7',
'test.fail4']
should_pass = ['test.fail1', 'test.fail2', 'test.fail3', 'test.fail5', 'test.fail6', 'test.fail7', 'test.fail8',
'test.fail9']
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}}

self.run_test(root_folder=root_folder, expected_results=expected_results, check_id=check_id)

def test_range_not_includes_list_w_list_jsonpath_solver(self):
root_folder = 'resources'
check_id = "JsonPathRangeNotIncludesListWRange"
should_fail = ['test.pass1', 'test.pass2', 'test.pass3', 'test.pass4', 'test.pass5', 'test.pass6', 'test.pass7',
'test.fail4']
should_pass = ['test.fail1', 'test.fail2', 'test.fail3', 'test.fail5', 'test.fail6', 'test.fail7', 'test.fail8',
'test.fail9']
expected_results = {check_id: {"should_pass": should_pass, "should_fail": should_fail}}

self.run_test(root_folder=root_folder, expected_results=expected_results, check_id=check_id)

0 comments on commit 7da9582

Please sign in to comment.