-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(terraform): add CKV_AWS_364 to ensure that AWS Lambda Permission…
…s are not global when granting permission to an AWS Service. (#4375) * Added lambda permission check against services defined in principal * Added LambaServicePermission tests * fixed imports and style-guide compatibility * Updated to fix tests and add ID * Added terraform test to get a CKV id for my Cloudformation test * Fixed styling errors * Flipped CFN logic to match terraform logic * Updated to add newline at end of file to pass flake8 check * Updated to harden service principal checks * Added additional tests examples to demonstrate alternative service principals * Fixed the syntax to pass * bump CKV check version to 287 * Updated CKV to 293 * Fixed className and added missing comma from test set * Update checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel <[email protected]> * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Keeping descriptions identical Co-authored-by: Anton Grübel <[email protected]> * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel <[email protected]> * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel <[email protected]> * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel <[email protected]> * Update checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel <[email protected]> * Update checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel <[email protected]> * Update checkov/terraform/checks/resource/aws/LambdaServicePermission.py Co-authored-by: Anton Grübel <[email protected]> * Updated rule id * Applying self.get_evaluated_keys to terraform * Updated tests to align terraform and cloudformation. Moved two examples to unknown, rather than pass/fail * Updated cfn to ensure passed cfn-lint PR checks * Updated terraform checkid * Fixed potentially calling split on non-string * adjust logic to fix test --------- Co-authored-by: Anton Grübel <[email protected]>
- Loading branch information
1 parent
c469868
commit b0bef88
Showing
7 changed files
with
254 additions
and
0 deletions.
There are no files selected for viewing
39 changes: 39 additions & 0 deletions
39
checkov/cloudformation/checks/resource/aws/LambdaServicePermission.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from __future__ import annotations | ||
|
||
from typing import List, Any | ||
|
||
from checkov.cloudformation.checks.resource.base_resource_check import BaseResourceCheck | ||
from checkov.common.models.enums import CheckCategories, CheckResult | ||
|
||
|
||
class LambdaServicePermission(BaseResourceCheck): | ||
def __init__(self) -> None: | ||
name = "Ensure that AWS Lambda function permissions delegated to AWS services are limited by SourceArn or SourceAccount" | ||
id = "CKV_AWS_364" | ||
supported_resources = ("AWS::Lambda::Permission",) | ||
categories = (CheckCategories.IAM,) | ||
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources) | ||
|
||
def scan_resource_conf(self, conf: dict[str, Any]) -> CheckResult: | ||
properties = conf.get('Properties') | ||
if properties and isinstance(properties, dict): | ||
principal = properties.get('Principal') | ||
if principal and isinstance(principal, str): | ||
principal_parts = principal.split('.') | ||
try: | ||
if principal_parts[1] == 'amazonaws' and principal_parts[2] == 'com': | ||
if properties.get('SourceArn') or properties.get('SourceAccount'): | ||
return CheckResult.PASSED | ||
else: | ||
return CheckResult.FAILED | ||
except IndexError: | ||
print("Not a service principal") | ||
# Not a service principal, so pass. | ||
return CheckResult.UNKNOWN | ||
return CheckResult.UNKNOWN | ||
|
||
def get_evaluated_keys(self) -> List[str]: | ||
return ['Properties/Principal', 'Properties/SourceArn', 'Properties/SourceAccount'] | ||
|
||
|
||
check = LambdaServicePermission() |
38 changes: 38 additions & 0 deletions
38
checkov/terraform/checks/resource/aws/LambdaServicePermission.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
from __future__ import annotations | ||
|
||
from typing import List, Any | ||
|
||
from checkov.common.models.enums import CheckResult, CheckCategories | ||
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck | ||
|
||
|
||
class LambdaServicePermission(BaseResourceCheck): | ||
def __init__(self) -> None: | ||
description = "Ensure that AWS Lambda function permissions delegated to AWS services are limited by SourceArn or SourceAccount" | ||
id = "CKV_AWS_364" | ||
supported_resources = ('aws_lambda_permission',) | ||
categories = (CheckCategories.IAM,) | ||
super().__init__(name=description, id=id, categories=categories, supported_resources=supported_resources) | ||
|
||
def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult: | ||
# Replace this with the custom logic for your check | ||
principal = conf.get("principal") | ||
if principal and isinstance(principal, list) and isinstance(principal[0], str): | ||
principal_parts = principal[0].split('.') | ||
try: | ||
if principal_parts[1] == 'amazonaws' and principal_parts[2] == 'com': # This confirms that the principal is set as a service principal. | ||
if 'source_arn' in conf or 'source_account' in conf: # If either of these are set, we're good and the check should pass. | ||
self.evaluated_keys = self.get_evaluated_keys() | ||
return CheckResult.PASSED | ||
else: | ||
self.evaluated_keys = self.get_evaluated_keys() | ||
return CheckResult.FAILED | ||
except IndexError: | ||
return CheckResult.UNKNOWN | ||
return CheckResult.UNKNOWN | ||
|
||
def get_evaluated_keys(self) -> List[str]: | ||
return ["principal", "source_arn", "source_account"] | ||
|
||
|
||
check = LambdaServicePermission() |
9 changes: 9 additions & 0 deletions
9
...tion/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Fail.yml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
AWSTemplateFormatVersion: "2010-09-09" | ||
Resources: | ||
FunctionFailPermission: | ||
Type: AWS::Lambda::Permission | ||
Properties: | ||
Action: lambda:InvokeFunction | ||
FunctionName: TestFunction | ||
Principal: apigateway.amazonaws.com | ||
|
42 changes: 42 additions & 0 deletions
42
...tion/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission_Pass.yml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
AWSTemplateFormatVersion: "2010-09-09" | ||
Resources: | ||
FunctionPassingArnPermission: | ||
Type: AWS::Lambda::Permission | ||
Properties: | ||
Action: lambda:InvokeFunction | ||
FunctionName: TestFunction | ||
Principal: apigateway.amazonaws.com | ||
SourceArn: arn:aws:apigateway:us-east-1::/apis/qwerty | ||
|
||
FunctionPassingAccountPermission: | ||
Type: AWS::Lambda::Permission | ||
Properties: | ||
Action: lambda:InvokeFunction | ||
FunctionName: TestFunction | ||
Principal: apigateway.amazonaws.com | ||
SourceAccount: 901234567812 | ||
|
||
FunctionStringPrincipallPermission: | ||
Type: AWS::Lambda::Permission | ||
Properties: | ||
Action: lambda:InvokeFunction | ||
FunctionName: TestFunction | ||
Principal: 901234567812 | ||
|
||
ExampleS3ServicePermission: | ||
Type: AWS::Lambda::Permission | ||
Properties: | ||
FunctionName: TestFunction | ||
Action: lambda:InvokeFunction | ||
Principal: s3.amazonaws.com | ||
SourceAccount: 901234567812 | ||
SourceArn: arn:aws:apigateway:us-east-1::/apis/qwerty | ||
|
||
ExampleEventsServicePermission: | ||
Type: AWS::Lambda::Permission | ||
Properties: | ||
FunctionName: TestFunction | ||
Action: lambda:InvokeFunction | ||
Principal: events.amazonaws.com | ||
SourceAccount: 901234567812 | ||
SourceArn: arn:aws:apigateway:us-east-1::/apis/qwerty |
43 changes: 43 additions & 0 deletions
43
tests/cloudformation/checks/resource/aws/test_LambdaServicePermission.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
import os | ||
import unittest | ||
|
||
from checkov.cloudformation.checks.resource.aws.LambdaServicePermission import check | ||
from checkov.cloudformation.runner import Runner | ||
from checkov.runner_filter import RunnerFilter | ||
|
||
|
||
class TestLambdaServicePermission(unittest.TestCase): | ||
def test_summary(self): | ||
runner = Runner() | ||
current_dir = os.path.dirname(os.path.realpath(__file__)) | ||
|
||
test_files_dir = current_dir + "/example_LambdaServicePermission" | ||
report = runner.run(root_folder=test_files_dir, runner_filter=RunnerFilter(checks=[check.id])) | ||
summary = report.get_summary() | ||
|
||
passing_resources = { | ||
"AWS::Lambda::Permission.FunctionPassingArnPermission", | ||
"AWS::Lambda::Permission.FunctionPassingAccountPermission", | ||
"AWS::Lambda::Permission.ExampleS3ServicePermission", | ||
"AWS::Lambda::Permission.ExampleEventsServicePermission" | ||
} | ||
failing_resources = { | ||
"AWS::Lambda::Permission.FunctionFailPermission", | ||
} | ||
unknown_resources = { | ||
"AWS::Lambda::Permission.FunctionStringPrincipallPermission", | ||
} | ||
|
||
passed_check_resources = {c.resource for c in report.passed_checks} | ||
failed_check_resources = {c.resource for c in report.failed_checks} | ||
|
||
self.assertEqual(summary['passed'], 4) | ||
self.assertEqual(summary['failed'], 1) | ||
self.assertEqual(summary['skipped'], 0) | ||
self.assertEqual(summary['parsing_errors'], 0) | ||
self.assertEqual(passing_resources, passed_check_resources) | ||
self.assertEqual(failing_resources, failed_check_resources) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |
37 changes: 37 additions & 0 deletions
37
.../terraform/checks/resource/aws/example_LambdaServicePermission/LambdaServicePermission.tf
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
## SHOULD PASS: This permission specifies a source_arn, therefore is not globally available. | ||
resource "aws_lambda_permission" "ckv_unittest_pass_source_arn" { | ||
statement_id = "AllowMyDemoAPIInvoke" | ||
action = "lambda:InvokeFunction" | ||
function_name = "MyDemoFunction" | ||
principal = "apigateway.amazonaws.com" | ||
|
||
# The /*/*/* part allows invocation from any stage, method and resource path | ||
# within API Gateway REST API. | ||
source_arn = "${aws_api_gateway_rest_api.MyDemoAPI.execution_arn}/*/*/*" | ||
} | ||
|
||
## SHOULD PASS: This permission specifies a source_account, therefore is not globally available. | ||
resource "aws_lambda_permission" "ckv_unittest_pass_source_account" { | ||
statement_id = "AllowMyDemoAPIInvoke" | ||
action = "lambda:InvokeFunction" | ||
function_name = "MyDemoFunction" | ||
principal = "apigateway.amazonaws.com" | ||
|
||
source_account = "901234678" | ||
} | ||
|
||
## SHOULD UNKNOWN: This permission specifies a principal as an account ID. | ||
resource "aws_lambda_permission" "ckv_unittest_unknown_principal" { | ||
statement_id = "AllowMyDemoAPIInvoke" | ||
action = "lambda:InvokeFunction" | ||
function_name = "MyDemoFunction" | ||
principal = "901234678" | ||
} | ||
|
||
## SHOULD FAIL: This allows any serviceprincpal across all accounts to access | ||
resource "aws_lambda_permission" "ckv_unittest_fail" { | ||
statement_id = "AllowMyDemoAPIInvoke" | ||
action = "lambda:InvokeFunction" | ||
function_name = "MyDemoFunction" | ||
principal = "apigateway.amazonaws.com" | ||
} |
46 changes: 46 additions & 0 deletions
46
tests/terraform/checks/resource/aws/test_LambdaServicePermission.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
import os | ||
import unittest | ||
|
||
from checkov.runner_filter import RunnerFilter | ||
from checkov.terraform.runner import Runner | ||
from checkov.terraform.checks.resource.aws.LambdaServicePermission import check | ||
|
||
|
||
class TestLambdaServicePermission(unittest.TestCase): | ||
|
||
def test(self): | ||
runner = Runner() | ||
current_dir = os.path.dirname(os.path.realpath(__file__)) | ||
|
||
test_files_dir = os.path.join(current_dir, "example_LambdaServicePermission") | ||
report = runner.run(root_folder=test_files_dir, | ||
runner_filter=RunnerFilter(checks=[check.id])) | ||
summary = report.get_summary() | ||
|
||
passing_resources = { | ||
'aws_lambda_permission.ckv_unittest_pass_source_arn', | ||
'aws_lambda_permission.ckv_unittest_pass_source_account' | ||
} | ||
failing_resources = { | ||
'aws_lambda_permission.ckv_unittest_fail', | ||
} | ||
unknown_resources = { | ||
'aws_lambda_permission.ckv_unittest_pass_principal', | ||
} | ||
|
||
skipped_resources = {} | ||
|
||
passed_check_resources = set([c.resource for c in report.passed_checks]) | ||
failed_check_resources = set([c.resource for c in report.failed_checks]) | ||
|
||
self.assertEqual(summary['passed'], len(passing_resources)) | ||
self.assertEqual(summary['failed'], len(failing_resources)) | ||
self.assertEqual(summary['skipped'], len(skipped_resources)) | ||
self.assertEqual(summary['parsing_errors'], 0) | ||
|
||
self.assertEqual(passing_resources, passed_check_resources) | ||
self.assertEqual(failing_resources, failed_check_resources) | ||
|
||
|
||
if __name__ == '__main__': | ||
unittest.main() |