Merge pull request #36 from dynatrace-oss/feature/APM-314438-restore-sfm-lambda-connectivity-private-vpc

APM-314438: Redesign VPC to allow SFM push from Lambda; restore SFM
mmajcher authored Aug 3, 2021
2 parents 54d01c4 + 4550684 commit 8f1624f
Showing 5 changed files with 119 additions and 55 deletions.
54 changes: 49 additions & 5 deletions dynatrace-aws-log-forwarder-template.yaml
@@ -80,7 +80,7 @@ Resources:
       VpcConfig: !If
         - DeployAGwithVPC
         - SecurityGroupIds: [ !Ref VPCSecurityGroup ]
-          SubnetIds: [ !Ref VPCPublicSubnet1 ]
+          SubnetIds: [ !Ref VPCPrivateSubnet ]
         - !Ref "AWS::NoValue"

   LambdaRole:
@@ -623,7 +623,7 @@ Resources:
             touch /home/ec2-user/userdata-ag-installation-success
           - DynatraceEnvironmentUrl: !Ref DynatraceEnvironmentUrl
             DynatracePaasToken: !Ref DynatracePaasToken
-      SubnetId: !Ref VPCPublicSubnet1
+      SubnetId: !Ref VPCPrivateSubnet

   VPC:
     Condition: DeployAGwithVPC
@@ -633,7 +633,7 @@
       EnableDnsSupport: true
       EnableDnsHostnames: true

-  VPCPublicSubnet1:
+  VPCPublicSubnet:
     Condition: DeployAGwithVPC
     Type: 'AWS::EC2::Subnet'
     Properties:
@@ -642,6 +642,15 @@
       CidrBlock: 172.31.1.0/27
       MapPublicIpOnLaunch: true

+  VPCPrivateSubnet:
+    Condition: DeployAGwithVPC
+    Type: 'AWS::EC2::Subnet'
+    Properties:
+      VpcId: !Ref VPC
+      AvailabilityZone: !Select [ 0, !GetAZs ]
+      CidrBlock: 172.31.2.0/27
+      MapPublicIpOnLaunch: false
+
   VPCInternetGateway:
     Condition: DeployAGwithVPC
     Type: AWS::EC2::InternetGateway
@@ -654,6 +663,20 @@
       VpcId: !Ref VPC
       InternetGatewayId: !Ref VPCInternetGateway

+  VPCNatGatewayElasticIP:
+    Condition: DeployAGwithVPC
+    Type: AWS::EC2::EIP
+    Properties:
+      Domain: VPC
+
+  VPCNatGateway:
+    Condition: DeployAGwithVPC
+    Type: "AWS::EC2::NatGateway"
+    DependsOn: VPCNatGatewayElasticIP
+    Properties:
+      AllocationId: !GetAtt VPCNatGatewayElasticIP.AllocationId
+      SubnetId: !Ref VPCPublicSubnet
+
   VPCPublicRouteTable:
     Condition: DeployAGwithVPC
     Type: AWS::EC2::RouteTable
@@ -669,13 +692,34 @@
       DestinationCidrBlock: 0.0.0.0/0
       GatewayId: !Ref VPCInternetGateway

-  VPCPublicSubnet1RouteTableAssoc:
+  VPCPublicSubnetRouteTableAssoc:
     Condition: DeployAGwithVPC
     Type: AWS::EC2::SubnetRouteTableAssociation
     Properties:
-      SubnetId: !Ref VPCPublicSubnet1
+      SubnetId: !Ref VPCPublicSubnet
       RouteTableId: !Ref VPCPublicRouteTable

+  VPCPrivateRouteTable:
+    Condition: DeployAGwithVPC
+    Type: AWS::EC2::RouteTable
+    Properties:
+      VpcId: !Ref VPC
+
+  VPCPrivateRouteToNat:
+    Condition: DeployAGwithVPC
+    Type: AWS::EC2::Route
+    Properties:
+      RouteTableId: !Ref VPCPrivateRouteTable
+      DestinationCidrBlock: 0.0.0.0/0
+      NatGatewayId: !Ref VPCNatGateway
+
+  VPCPrivateSubnetRouteTableAssoc:
+    Condition: DeployAGwithVPC
+    Type: AWS::EC2::SubnetRouteTableAssociation
+    Properties:
+      SubnetId: !Ref VPCPrivateSubnet
+      RouteTableId: !Ref VPCPrivateRouteTable
+
   VPCSecurityGroup:
     Condition: DeployAGwithVPC
     Type: AWS::EC2::SecurityGroup
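Net effect of the template changes above: the ActiveGate EC2 instance and the Lambda's VpcConfig move from the public subnet into a new private subnet whose 0.0.0.0/0 route goes through a NAT gateway placed in the public subnet, so both keep outbound connectivity (needed for the restored self-monitoring push to CloudWatch) without public IPs. As an illustration only, not part of this commit, a boto3 sketch along these lines could verify the routing of a deployed stack; the helper name and the explicit-association assumption are mine:

import boto3

def default_route_target(subnet_id: str) -> str:
    # Find the route table explicitly associated with the subnet (both
    # subnets in this template have explicit associations) and return the
    # target of its 0.0.0.0/0 route.
    ec2 = boto3.client("ec2")
    tables = ec2.describe_route_tables(
        Filters=[{"Name": "association.subnet-id", "Values": [subnet_id]}]
    )["RouteTables"]
    for table in tables:
        for route in table.get("Routes", []):
            if route.get("DestinationCidrBlock") == "0.0.0.0/0":
                # NAT routes carry NatGatewayId; internet gateway routes carry GatewayId.
                return route.get("NatGatewayId") or route.get("GatewayId", "unknown")
    return "no default route"

For VPCPrivateSubnet this should return the nat-... gateway id, and for VPCPublicSubnet the igw-... id.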
2 changes: 1 addition & 1 deletion dynatrace-aws-logs.sh
@@ -242,7 +242,7 @@ EOF

 set -e

-echo "Deploying stack $STACK_NAME"
+echo "Deploying stack $STACK_NAME. This might take up to 10 minutes."

 aws cloudformation deploy --stack-name "$STACK_NAME" --template-file "$TEMPLATE_FILE" --capabilities CAPABILITY_IAM \
   --parameter-overrides DynatraceEnvironmentUrl="$TARGET_URL" DynatraceApiKey="$TARGET_API_TOKEN" VerifySSLTargetActiveGate="$REQUIRE_VALID_CERTIFICATE" \
8 changes: 4 additions & 4 deletions src/index.py
@@ -55,10 +55,10 @@ def handler(event, lambda_context):
         log_error_with_stacktrace(e, "Exception caught in top-level handler")
         result = TransformationResult.ProcessingFailed

-    # try:
-    #     context.sfm.push_sfm_to_cloudwatch()
-    # except Exception as e:
-    #     log_error_with_stacktrace(e, "SelfMonitoring push to Cloudwatch failed")
+    try:
+        context.sfm.push_sfm_to_cloudwatch()
+    except Exception as e:
+        log_error_with_stacktrace(e, "SelfMonitoring push to Cloudwatch failed")

     return kinesis_data_transformation_response(records, result)
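The restored push sits in its own try/except so that a self-monitoring failure is logged but never changes the transformation result returned to Kinesis.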
102 changes: 60 additions & 42 deletions src/logs/self_monitoring/sfm.py
@@ -102,12 +102,20 @@ def _generate_metrics(self):
             "Value": self._function_name,
         }]

-        metrics.append(_prepare_cloudwatch_metric(
-            "Kinesis record age", self._kinesis_records_age, "Seconds", common_dimensions))
-        metrics.append(_prepare_cloudwatch_metric(
-            "Kinesis record.data compressed size", self._record_data_compressed_size, "Bytes", common_dimensions))
-        metrics.append(_prepare_cloudwatch_metric(
-            "Kinesis record.data decompressed size", self._record_data_decompressed_size, "Bytes", common_dimensions))
+        metrics.append(_prepare_cloudwatch_metric_statistic(
+            "Kinesis record age", "Seconds", common_dimensions,
+            self._kinesis_records_age
+        ))
+
+        metrics.append(_prepare_cloudwatch_metric_statistic(
+            "Kinesis record.data compressed size", "Bytes", common_dimensions,
+            self._record_data_compressed_size
+        ))
+
+        metrics.append(_prepare_cloudwatch_metric_statistic(
+            "Kinesis record.data decompressed size", "Bytes", common_dimensions,
+            self._record_data_decompressed_size
+        ))

         # TO BE RESTORED IN DIFFERENT WAY IN APM-306046
         # please remove this then
@@ -123,50 +131,46 @@ def _generate_metrics(self):
         #         common_dimensions + [{"Name": "log_group", "Value": log_group}]
         #     ))

-        metrics.append(_prepare_cloudwatch_metric(
-            "Batches prepared", self._batches_prepared, "None", common_dimensions))
-        metrics.append(_prepare_cloudwatch_metric(
-            "Log entries prepared", self._log_entries_prepared, "None", common_dimensions))
-        metrics.append(_prepare_cloudwatch_metric(
-            "Data volume prepared", self._data_volume_prepared, "Bytes", common_dimensions))
+        metrics.append(
+            _prepare_cloudwatch_metric("Batches prepared", "None", common_dimensions, self._batches_prepared))
+        metrics.append(
+            _prepare_cloudwatch_metric("Log entries prepared", "None", common_dimensions, self._log_entries_prepared))
+        metrics.append(
+            _prepare_cloudwatch_metric("Data volume prepared", "Bytes", common_dimensions, self._data_volume_prepared))

-        metrics.append(_prepare_cloudwatch_metric(
-            "Batches delivered", self._batches_delivered, "None", common_dimensions))
-        metrics.append(_prepare_cloudwatch_metric(
-            "Log entries delivered", self._log_entries_delivered, "None", common_dimensions))
-        metrics.append(_prepare_cloudwatch_metric(
-            "Data volume delivered", self._data_volume_delivered, "Bytes", common_dimensions))
+        metrics.append(
+            _prepare_cloudwatch_metric("Batches delivered", "None", common_dimensions, self._batches_delivered))
+        metrics.append(
+            _prepare_cloudwatch_metric("Log entries delivered", "None", common_dimensions, self._log_entries_delivered))
+        metrics.append(_prepare_cloudwatch_metric("Data volume delivered", "Bytes", common_dimensions,
+                                                  self._data_volume_delivered))

         for issue, count in self._issue_count_by_type.items():
-            metrics.append(_prepare_cloudwatch_metric(
-                "Issues", count, "None",
-                common_dimensions + [{"Name": "type", "Value": issue}]
-            ))
+            metrics.append(
+                _prepare_cloudwatch_metric("Issues", "None", common_dimensions + [{"Name": "type", "Value": issue}],
+                                           count))

-        metrics.append(_prepare_cloudwatch_metric(
-            "Log content trimmed", self._log_content_trimmed, "None", common_dimensions))
-        metrics.append(_prepare_cloudwatch_metric(
-            "Log attr trimmed", self._log_attr_trimmed, "None", common_dimensions))
+        metrics.append(
+            _prepare_cloudwatch_metric("Log content trimmed", "None", common_dimensions, self._log_content_trimmed))
+        metrics.append(
+            _prepare_cloudwatch_metric("Log attr trimmed", "None", common_dimensions, self._log_attr_trimmed))

         if self._logs_age_min_sec:
-            metrics.append(_prepare_cloudwatch_metric(
-                "Log age min", self._logs_age_min_sec, "Seconds", common_dimensions))
-            metrics.append(_prepare_cloudwatch_metric(
-                "Log age avg", self._logs_age_avg_sec, "Seconds", common_dimensions))
-            metrics.append(_prepare_cloudwatch_metric(
-                "Log age max", self._logs_age_max_sec, "Seconds", common_dimensions))
-
-        metrics.append(_prepare_cloudwatch_metric(
-            "Requests sent", self._requests_sent, "None", common_dimensions))
+            metrics.append(
+                _prepare_cloudwatch_metric("Log age min", "Seconds", common_dimensions, self._logs_age_min_sec))
+            metrics.append(
+                _prepare_cloudwatch_metric("Log age avg", "Seconds", common_dimensions, self._logs_age_avg_sec))
+            metrics.append(
+                _prepare_cloudwatch_metric("Log age max", "Seconds", common_dimensions, self._logs_age_max_sec))

+        metrics.append(_prepare_cloudwatch_metric("Requests sent", "None", common_dimensions, self._requests_sent))
         if self._requests_durations_ms:
-            metrics.append(_prepare_cloudwatch_metric(
-                "Requests duration", self._requests_durations_ms, "Milliseconds", common_dimensions))
+            metrics.append(_prepare_cloudwatch_metric("Requests duration", "Milliseconds", common_dimensions,
+                                                      self._requests_durations_ms))

         for status_code, count in self._requests_count_by_status_code.items():
-            metrics.append(_prepare_cloudwatch_metric(
-                "Requests status code count", count, "None",
-                common_dimensions + [{"Name": "status_code", "Value": str(status_code)}]
-            ))
+            metrics.append(_prepare_cloudwatch_metric("Requests status code count", "None", common_dimensions + [
+                {"Name": "status_code", "Value": str(status_code)}], count))

         return metrics

@@ -182,7 +186,7 @@ def push_sfm_to_cloudwatch(self):
             raise e


-def _prepare_cloudwatch_metric(metric_name, value: Union[int, float, list], unit, dimensions) -> dict:
+def _prepare_cloudwatch_metric(metric_name, unit, dimensions, value: Union[int, float, list]) -> dict:
     cw_metric = {
         'MetricName': metric_name,
         'Dimensions': dimensions,
@@ -195,3 +199,17 @@ def _prepare_cloudwatch_metric(metric_name, value: Union[int, float, list], unit, dimensions) -> dict:
         cw_metric["Value"] = value

     return cw_metric
+
+
+def _prepare_cloudwatch_metric_statistic(metric_name, unit, dimensions, values: list) -> dict:
+    return {
+        'MetricName': metric_name,
+        'Dimensions': dimensions,
+        'Unit': unit,
+        'StatisticValues': {
+            'SampleCount': len(values),
+            'Sum': sum(values),
+            'Minimum': min(values),
+            'Maximum': max(values),
+        }
+    }
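_prepare_cloudwatch_metric keeps the single-observation shape ('Value', or 'Values' for a list, as the old test expectations below show), while the new _prepare_cloudwatch_metric_statistic pre-aggregates a list of samples into CloudWatch's 'StatisticValues' shape (sample count, sum, minimum, maximum). Both return dicts that PutMetricData accepts directly. A rough usage sketch, assuming boto3, an import path matching this repo's src layout, and a placeholder namespace (the forwarder's real namespace is not shown in this diff):

import boto3

# Import path and namespace below are assumptions for illustration.
from logs.self_monitoring.sfm import (
    _prepare_cloudwatch_metric,
    _prepare_cloudwatch_metric_statistic,
)

dimensions = [{"Name": "function_name", "Value": "my-lambda-function"}]
metric_data = [
    # A single observation becomes a 'Value' entry.
    _prepare_cloudwatch_metric("Requests sent", "None", dimensions, 3),
    # A list of samples becomes a pre-aggregated 'StatisticValues' entry.
    _prepare_cloudwatch_metric_statistic("Kinesis record age", "Seconds", dimensions, [5, 10]),
]
boto3.client("cloudwatch").put_metric_data(
    Namespace="LogForwarderSelfMonitoring",  # placeholder namespace
    MetricData=metric_data,
)

Note that callers guard list-valued metrics (e.g. "if self._requests_durations_ms:"), since min() and max() of an empty list would raise and CloudWatch rejects empty value lists.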
8 changes: 5 additions & 3 deletions tests/unit/logs/self_monitoring/test_sfm.py
@@ -19,7 +19,9 @@ def test_self_monitoring_context():
     sfm = SelfMonitoringContext("my-lambda-function")

     sfm.kinesis_record_age(5)
+    sfm.kinesis_record_age(10)
     sfm.kinesis_record_decoded(1000, 2000)
+    sfm.kinesis_record_decoded(500, 4000)

     sfm.single_record_transformed("logGroup1", 100, 1000)
     sfm.single_record_transformed("logGroup1", 100, 1000)
@@ -50,19 +52,19 @@ def test_self_monitoring_context():
             'Dimensions': [{'Name': 'function_name', 'Value': 'my-lambda-function'}],
             'MetricName': 'Kinesis record age',
             'Unit': 'Seconds',
-            'Values': [5]
+            'StatisticValues': {'Maximum': 10, 'Minimum': 5, 'SampleCount': 2, 'Sum': 15},
         },
         {
             'Dimensions': [{'Name': 'function_name', 'Value': 'my-lambda-function'}],
             'MetricName': 'Kinesis record.data compressed size',
             'Unit': 'Bytes',
-            'Values': [1000]
+            'StatisticValues': {'Maximum': 1000, 'Minimum': 500, 'SampleCount': 2, 'Sum': 1500},
         },
         {
             'Dimensions': [{'Name': 'function_name', 'Value': 'my-lambda-function'}],
             'MetricName': 'Kinesis record.data decompressed size',
             'Unit': 'Bytes',
-            'Values': [2000]
+            'StatisticValues': {'Maximum': 4000, 'Minimum': 2000, 'SampleCount': 2, 'Sum': 6000},
         },
         # {
         #     'Dimensions': [{'Name': 'function_name', 'Value': 'my-lambda-function'},
