Skip to content

Commit

Permalink
feat(lambda): add isOffsetLag prop and add support for generating an …
Browse files Browse the repository at this point in the history
…alarm and widget for the Lambda OffsetLag metric

Lambda has added support for connecting to Kafka streams via Event Source Mappings (see: https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html). This comes with the addition of a new CW metric under the AWS/Lambda namespace, `OffsetLag` (see: https://aws.amazon.com/blogs/compute/offset-lag-metric-for-amazon-msk-as-an-event-source-for-lambda/). This metric is different than the existing IteratorAge metric used for Kinesis streams. OffsetLag represents the difference between the last record written to a Kafka topic and the last record that the function's consumer group has processed (note the metric is in # of records, it's not a time metric). See here: https://docs.aws.amazon.com/lambda/latest/dg/monitoring-metrics.html.

Defaulted the monitor to `false` since it's a new prop and is not widely used.
  • Loading branch information
Connor Koch authored and Connor Koch committed Sep 17, 2024
1 parent 5e5b18b commit 32a88fd
Show file tree
Hide file tree
Showing 8 changed files with 695 additions and 242 deletions.
493 changes: 493 additions & 0 deletions API.md

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions lib/common/monitoring/alarms/AgeAlarmFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ export interface MaxAgeThreshold extends CustomAlarmThreshold {
readonly maxAgeInMillis: number;
}

export interface MaxOffsetLagThreshold extends CustomAlarmThreshold {
readonly maxOffsetLag: number;
}

export interface DaysSinceUpdateThreshold extends CustomAlarmThreshold {
readonly maxDaysSinceUpdate: number;
}
Expand Down Expand Up @@ -65,6 +69,27 @@ export class AgeAlarmFactory {
});
}

addMaxOffsetLagAlarm(
metric: MetricWithAlarmSupport,
props: MaxOffsetLagThreshold,
disambiguator?: string,
) {
return this.alarmFactory.addAlarm(metric, {
treatMissingData:
props.treatMissingDataOverride ?? TreatMissingData.MISSING,
comparisonOperator:
props.comparisonOperatorOverride ??
ComparisonOperator.GREATER_THAN_THRESHOLD,
...props,
disambiguator,
threshold: props.maxOffsetLag,
alarmNameSuffix: "Offset-Lag-Max",
alarmDescription: "Max Offset Lag is too high.",
// Dedupe all iterator max age to the same ticket
alarmDedupeStringSuffix: "AnyMaxOffsetLag",
});
}

addDaysSinceUpdateAlarm(
metric: MetricWithAlarmSupport,
props: DaysSinceUpdateThreshold,
Expand Down
11 changes: 11 additions & 0 deletions lib/monitoring/aws-lambda/LambdaFunctionMetricFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,4 +194,15 @@ export class LambdaFunctionMetricFactory extends BaseMetricFactory<LambdaFunctio
}),
);
}

metricMaxOffsetLagInNumberOfRecords() {
return this.metricFactory.adaptMetric(
this.lambdaFunction.metric("OffsetLag", {
statistic: MetricStatistic.MAX,
label: "Offset Lag",
region: this.region,
account: this.account,
}),
);
}
}
82 changes: 72 additions & 10 deletions lib/monitoring/aws-lambda/LambdaFunctionMonitoring.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
LatencyType,
LowTpsThreshold,
MaxAgeThreshold,
MaxOffsetLagThreshold,
MegabyteMillisecondAxisFromZero,
MetricWithAlarmSupport,
MinUsageCountThreshold,
Expand Down Expand Up @@ -61,6 +62,13 @@ export interface LambdaFunctionMonitoringOptions extends BaseMonitoringProps {
* @default - true
*/
readonly isIterator?: boolean;
/**
* Indicates that the Lambda function handles an event source which uses offsets for records (e.g. Kafka streams).
* This impacts what widgets are shown, as well as validates the ability to use addMaxOffsetLagAlarm.
*
* @default - false
*/
readonly isOffsetLag?: boolean;

readonly addLatencyP50Alarm?: Record<string, LatencyThreshold>;
readonly addLatencyP90Alarm?: Record<string, LatencyThreshold>;
Expand Down Expand Up @@ -92,6 +100,8 @@ export interface LambdaFunctionMonitoringOptions extends BaseMonitoringProps {
>;
readonly addMaxIteratorAgeAlarm?: Record<string, MaxAgeThreshold>;

readonly addMaxOffsetLagAlarm?: Record<string, MaxOffsetLagThreshold>;

// Enhanced CPU metrics that are all time-based and not percent based
readonly addEnhancedMonitoringMaxCpuTotalTimeAlarm?: Record<
string,
Expand Down Expand Up @@ -148,6 +158,7 @@ export class LambdaFunctionMonitoring extends Monitoring {
readonly cpuTotalTimeAnnotations: HorizontalAnnotation[];
readonly memoryUsageAnnotations: HorizontalAnnotation[];
readonly maxIteratorAgeAnnotations: HorizontalAnnotation[];
readonly maxOffsetLagAnnotations: HorizontalAnnotation[];

readonly tpsMetric: MetricWithAlarmSupport;
readonly p50LatencyMetric: MetricWithAlarmSupport;
Expand All @@ -165,6 +176,8 @@ export class LambdaFunctionMonitoring extends Monitoring {

readonly isIterator: boolean;
readonly maxIteratorAgeMetric: MetricWithAlarmSupport;
readonly isOffsetLag: boolean;
readonly maxOffsetLagMetric: MetricWithAlarmSupport;

readonly lambdaInsightsEnabled: boolean;
readonly enhancedMetricFactory?: LambdaFunctionEnhancedMetricFactory;
Expand Down Expand Up @@ -209,6 +222,7 @@ export class LambdaFunctionMonitoring extends Monitoring {
this.cpuTotalTimeAnnotations = [];
this.memoryUsageAnnotations = [];
this.maxIteratorAgeAnnotations = [];
this.maxOffsetLagAnnotations = [];

this.metricFactory = new LambdaFunctionMetricFactory(
scope.createMetricFactory(),
Expand Down Expand Up @@ -242,6 +256,9 @@ export class LambdaFunctionMonitoring extends Monitoring {
this.isIterator = props.isIterator ?? true;
this.maxIteratorAgeMetric =
this.metricFactory.metricMaxIteratorAgeInMillis();
this.isOffsetLag = props.isOffsetLag ?? false;
this.maxOffsetLagMetric =
this.metricFactory.metricMaxOffsetLagInNumberOfRecords();

this.lambdaInsightsEnabled = props.lambdaInsightsEnabled ?? false;
if (props.lambdaInsightsEnabled) {
Expand Down Expand Up @@ -521,6 +538,22 @@ export class LambdaFunctionMonitoring extends Monitoring {
this.maxIteratorAgeAnnotations.push(createdAlarm.annotation);
this.addAlarm(createdAlarm);
}
for (const disambiguator in props.addMaxOffsetLagAlarm) {
if (!this.isOffsetLag) {
throw new Error(
"addMaxOffsetLagAlarm is not applicable if isOffsetLag is not true",
);
}

const alarmProps = props.addMaxOffsetLagAlarm[disambiguator];
const createdAlarm = this.ageAlarmFactory.addMaxOffsetLagAlarm(
this.maxOffsetLagMetric,
alarmProps,
disambiguator,
);
this.maxOffsetLagAnnotations.push(createdAlarm.annotation);
this.addAlarm(createdAlarm);
}

props.useCreatedAlarms?.consume(this.createdAlarms());
}
Expand All @@ -545,19 +578,37 @@ export class LambdaFunctionMonitoring extends Monitoring {
),
];

let secondRowWidgetWidth: number;
if (this.isIterator && this.isOffsetLag) {
secondRowWidgetWidth = QuarterWidth;
} else if (this.isIterator || this.isOffsetLag) {
secondRowWidgetWidth = ThirdWidth;
} else {
secondRowWidgetWidth = HalfWidth;
}
const secondRow: Row = new Row(
this.createInvocationWidget(
secondRowWidgetWidth,
DefaultGraphWidgetHeight,
),
this.createErrorCountWidget(
secondRowWidgetWidth,
DefaultGraphWidgetHeight,
),
);
if (this.isIterator) {
widgets.push(
new Row(
this.createInvocationWidget(ThirdWidth, DefaultGraphWidgetHeight),
this.createIteratorAgeWidget(ThirdWidth, DefaultGraphWidgetHeight),
this.createErrorCountWidget(ThirdWidth, DefaultGraphWidgetHeight),
secondRow.addWidget(
this.createIteratorAgeWidget(
secondRowWidgetWidth,
DefaultGraphWidgetHeight,
),
);
} else {
widgets.push(
new Row(
this.createInvocationWidget(HalfWidth, DefaultGraphWidgetHeight),
this.createErrorCountWidget(HalfWidth, DefaultGraphWidgetHeight),
}
if (this.isOffsetLag) {
secondRow.addWidget(
this.createOffsetLagWidget(
secondRowWidgetWidth,
DefaultGraphWidgetHeight,
),
);
}
Expand Down Expand Up @@ -681,6 +732,17 @@ export class LambdaFunctionMonitoring extends Monitoring {
});
}

createOffsetLagWidget(width: number, height: number) {
return new GraphWidget({
width,
height,
title: "OffsetLag",
left: [this.maxOffsetLagMetric],
leftYAxis: CountAxisFromZero,
leftAnnotations: this.maxOffsetLagAnnotations,
});
}

createLambdaInsightsCpuWidget(width: number, height: number) {
return new GraphWidget({
width,
Expand Down
38 changes: 1 addition & 37 deletions test/facade/__snapshots__/MonitoringAspect.test.ts.snap

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 1 addition & 13 deletions test/facade/__snapshots__/MonitoringFacade.test.ts.snap

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 36 additions & 1 deletion test/monitoring/aws-lambda/LambdaFunctionMonitoring.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ test("snapshot test: all alarms", () => {
lambdaFunction,
humanReadableName: "Dummy Lambda for testing",
alarmFriendlyName: "DummyLambda",
isOffsetLag: true,
addFaultRateAlarm: {
Warning: {
maxErrorRate: 1,
Expand Down Expand Up @@ -165,6 +166,11 @@ test("snapshot test: all alarms", () => {
maxAgeInMillis: 1_000_000,
},
},
addMaxOffsetLagAlarm: {
Warning: {
maxOffsetLag: 100,
},
},
useCreatedAlarms: {
consume(alarms: AlarmWithAnnotation[]) {
numAlarmsCreated = alarms.length;
Expand All @@ -173,7 +179,7 @@ test("snapshot test: all alarms", () => {
});

addMonitoringDashboardsToStack(stack, monitoring);
expect(numAlarmsCreated).toStrictEqual(14);
expect(numAlarmsCreated).toStrictEqual(15);
expect(Template.fromStack(stack)).toMatchSnapshot();
});

Expand Down Expand Up @@ -536,6 +542,35 @@ test("throws error if attempting to create iterator age alarm if not an iterator
);
});

test("throws error if attempting to create offsetLag alarm if not an offsetLag Lambda", () => {
const stack = new Stack();

const scope = new TestMonitoringScope(stack, "Scope");

const lambdaFunction = new Function(stack, "Function", {
functionName: "DummyLambda",
runtime: Runtime.NODEJS_18_X,
code: InlineCode.fromInline("{}"),
handler: "Dummy::handler",
});

expect(
() =>
new LambdaFunctionMonitoring(scope, {
lambdaFunction,
humanReadableName: "Dummy Lambda for testing",
alarmFriendlyName: "DummyLambda",
addMaxOffsetLagAlarm: {
Warning: {
maxOffsetLag: 100,
},
},
}),
).toThrow(
"addMaxOffsetLagAlarm is not applicable if isOffsetLag is not true",
);
});

test("doesn't create alarms for enhanced Lambda Insights metrics if not enabled", () => {
const stack = new Stack();

Expand Down
Loading

0 comments on commit 32a88fd

Please sign in to comment.