
How to Investigate and Re‐Ingest Firehose Ingestion Processing Failures



Follow this runbook to investigate and re-ingest processing failures from Amazon Kinesis Data Firehose Delivery Stream ingestion. As of writing, this refers specifically to the bfd-insights-bfd-ENV-firehose-ingester Firehose Stream(s) (and, correspondingly, the associated transformation Lambda(s): bfd-insights-bfd-ENV-cw-to-flattened-json).

Glossary

Term: Definition
Amazon Kinesis Data Firehose: An ETL (extract, transform, load) service for capturing data and delivering it to various stores
Firehose Stream: The "underlying entity" of Kinesis Data Firehose; the component of Firehose that data is sent to
log event: A single log message from a CloudWatch Log Stream. For example, a single JSON log from BFD Server's access.json
record, data record: A collection of log events that is ingested by the Firehose Stream as a single unit. It is gzip'd and then base64-encoded before being submitted to the Firehose Stream
batch, record batch: A group of records that can be submitted to a Firehose Stream via PutRecordBatch. Also refers to the file containing ProcessingFailed records that is written to S3 upon ingestion failure

FAQ

What happens when a data record is marked as ProcessingFailed?

The individual record will be written, as JSON with some additional metadata, to S3 under the Firehose Stream's error output prefix. If there was more than one ProcessingFailed record in the batch, the additional records from that batch are appended to the same file, delimited by newlines (so the resulting ProcessingFailed batch file is not valid JSON).

Note that the record's data (denoted by the rawData JSON property) is a base64-encoded, gzip'd blob. A Python helper script, firehose_helper.py, is provided below on this wiki page; it can be used to analyze these ProcessingFailed batch files and to re-submit them to the Firehose Stream.
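
For reference, the following minimal sketch (not part of the helper script) shows how a single record's rawData blob could be decoded by hand. The input file name is illustrative; the top-level keys shown match what the helper script below expects:

    import base64
    import gzip
    import json

    # Each line of a ProcessingFailed batch file is a standalone JSON object holding
    # metadata (errorCode, errorMessage, etc.) plus the "rawData" property, which is the
    # original record: base64-encoded, gzip-compressed JSON of CloudWatch log events
    with open("processing-failed-batch", encoding="utf-8") as batch_file:  # illustrative file name
        first_record = json.loads(batch_file.readline())

    decoded = json.loads(gzip.decompress(base64.b64decode(first_record["rawData"])))
    print(decoded["logGroup"], len(decoded["logEvents"]))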

How would "processing failures" occur?

There are a few known cases:

  • If a single log event (log message) in a given data record exceeds 6 MB, the bfd-insights-bfd-ENV-cw-to-flattened-json Lambda(s) cannot split the record any further to reduce its size, so it will mark the corresponding record as ProcessingFailed
    • This is probably impossible in practice, as the maximum size of a single log event in CloudWatch Logs is 256 KB and the Firehose Stream receives log events directly from CloudWatch Logs
  • If Firehose is unable to call the transformation Lambda(s) (bfd-insights-bfd-ENV-cw-to-flattened-json) for any reason (Lambda is down, the Lambda doesn't exist, etc.)
    • This occurred on June 13, 2023, and resulted in over 4.5 million log events being marked as ProcessingFailed because Firehose was unable to transform the incoming record batches
  • If the bfd-insights-bfd-ENV-cw-to-flattened-json Lambda(s) raise any unhandled runtime error upon invocation, the corresponding batch of records will be marked as ProcessingFailed
    • This has occurred on multiple occasions, with 2 distinct cases identified:
      • Very rarely, two or more log events will be "merged" into a single log event (e.g. {"key1": "valu{"key1": "value"}), resulting in a log event that cannot be interpreted as valid JSON. This causes the Lambda(s) to raise a JSONDecodeError at runtime and results in the corresponding batch being marked as ProcessingFailed
        • This case corresponds to BFD-3490
      • Also very rarely, the BFD Server will emit a log containing an exceptionally long SQL query that causes some external mechanism, most likely the CloudWatch Agent that submits logs to CloudWatch, to truncate the log message (e.g. {..."sql_query": "SELECT ...<long select statement>... [Truncated]) in such a way that the truncated log event cannot be interpreted as valid JSON. This causes the Lambda(s) to raise a JSONDecodeError at runtime and results in the corresponding batch being marked as ProcessingFailed
        • This case corresponds to BFD-3491
        • This could happen for any log message that is exceptionally long, although this case has only been encountered when the log includes a SQL query, and it has happened only once since BFD Insights has been operational. Both broken-message cases surface as JSONDecodeErrors; see the sketch following this list
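
Both of the broken-message cases above fail in the same way: the affected log event's message is not valid JSON, so the transformation Lambda raises a JSONDecodeError when it attempts to parse the message. A minimal sketch using contrived example messages:

    import json

    # Contrived examples of the two known broken-message shapes
    merged = '{"key1": "valu{"key1": "value"}'                         # two log events merged together
    truncated = '{"sql_query": "SELECT ...<long query>... [Truncated'  # log message cut off mid-value

    for message in (merged, truncated):
        try:
            json.loads(message)
        except json.JSONDecodeError as exc:
            print(f"broken log event: {exc}")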

Have there ever been processing failures?

Yes, as of writing we have had processing failures occur multiple times since the creation of the Firehose Stream. The first such batch of processing failures occurred on 09-29-2022 (MM-DD-YYYY) due to the transformation Lambda lacking the resources necessary to process incoming log events. This particular case has since been resolved by increasing the memory limit on the Lambda(s) and by basing the Lambda(s) on a more up-to-date version of the kinesis-firehose-cloudwatch-logs-processor-python Lambda Blueprint.

Subsequent ProcessingFailed events have all been caused by one of the aforementioned failure cases.

Note that all ProcessingFailed batches from September 2022 through June 2024 have been successfully re-ingested using the firehose_helper.py script.

Prerequisites

  • Access to the BFD/CMS AWS account

  • Permissions to access and download files from the Delivery Stream's S3 Bucket destination

  • Familiarity with using command-line interface (CLI) tools

  • jq installed (brew install jq)

  • An installation of Python 3.11 or greater

  • boto3 installed (pip install boto3), either globally or in an activated Python 3.11+ virtual environment

  • The following Python helper script downloaded locally as firehose_helper.py:

    firehose_helper.py
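    """Helper for counting, extracting, fixing, and re-ingesting Firehose ProcessingFailed record batches."""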
    import argparse
    import base64
    import gzip
    import json
    import subprocess
    import traceback
    from dataclasses import asdict, dataclass, fields
    from enum import StrEnum, auto
    from pathlib import Path
    from typing import Any, Mapping
    
    import boto3
    
    
    class _Op(StrEnum):
        COUNT = auto()
        FIX = auto()
        BROKEN = auto()
        EXTRACT = auto()
        REINGEST = auto()
    
    
    @dataclass
    class _Args:
        op: _Op
        path: Path
        delivery_stream: str | None
    
        @classmethod
        def parse_args(cls: type["_Args"]) -> "_Args":
            parser = argparse.ArgumentParser()
            parser.add_argument("op", type=_Op, choices=list(_Op))
            parser.add_argument("path", type=Path)
            parser.add_argument(
                "-D", "--delivery-stream", type=str, required=False, dest="delivery_stream"
            )
    
            parsed_args = parser.parse_args()
    
            opts_as_dict = vars(parsed_args)
            common_keys = opts_as_dict.keys() & set(field.name for field in fields(_Args))
            common_params: dict[str, Any] = {k: v for k, v in opts_as_dict.items() if k in common_keys}
            try:
                args = _Args(**common_params)
            except Exception as exc:
                raise ValueError(
                    f"Unable to create instance of {_Args.__name__} from given arguments"
                ) from exc
    
            return args
    
    
    @dataclass
    class _LogEvent:
        id: str
        timestamp: int
        message: str
    
    
    @dataclass
    class _LogDataCollection:
        messageType: str
        owner: str
        logGroup: str
        logStream: str
        subscriptionFilters: list[str]
        logEvents: list[_LogEvent]
    
        def __post_init__(self: "_LogDataCollection") -> None:
            # if the first element is a dict, assume all elements are
            if isinstance(self.logEvents[0], Mapping):
                self.logEvents = [_LogEvent(**x) for x in self.logEvents]  # type: ignore
    
    
    @dataclass
    class _ExpandedLogRecord:
        attemptsMade: int
        arrivalTimestamp: int
        errorCode: str
        errorMessage: str
        attemptEndingTimestamp: int
        data: _LogDataCollection
        lambdaArn: str
    
    
    @dataclass
    class _FixedRecord:
        compressed_data: bytes
    
        @property
        def b64_compressed_data(self: "_FixedRecord") -> str:
            return base64.b64encode(self.compressed_data).decode()
    
    
    def _fix_batch_records(
        expanded_records: list[_ExpandedLogRecord],
    ) -> tuple[int, list[list[_FixedRecord]]]:
        total_events = 0
        fixed_records: list[_FixedRecord] = []
        for record in expanded_records:
            valid_log_events: list[_LogEvent] = []
            for event in record.data.logEvents:
                try:
                    json.loads(event.message)
                    valid_log_events.append(event)
                except json.JSONDecodeError as exc:  # noqa: PERF203
                    print(f"Skipped log event {event.id} due to JSON decode error: {exc!s}")
    
            if valid_log_events:
                fixed_records.append(
                    _FixedRecord(
                        compressed_data=gzip.compress(
                            json.dumps(
                                asdict(
                                    _LogDataCollection(
                                        messageType=record.data.messageType,
                                        owner=record.data.owner,
                                        logGroup=record.data.logGroup,
                                        logStream=record.data.logStream,
                                        subscriptionFilters=record.data.subscriptionFilters,
                                        logEvents=valid_log_events,
                                    )
                                )
                            ).encode()
                        )
                    )
                )
                total_events += len(valid_log_events)
    
        for x in (x for x in fixed_records if len(x.compressed_data) >= 1000000):  # 1000 KB max
            raise RuntimeError(
                f"Found record exceeding max size (size of {len(x.compressed_data)}); data"
                " might not be able to be ingested"
            )
    
        chunked_batches: list[list[_FixedRecord]] = []
        current_batch: list[_FixedRecord] = []
        current_length = 0
        for record in fixed_records:
            # Chunk a fixed record batch into partitions upto approximately 3 MB in size (to avoid
            # hitting Firehose PutRecordBatch API quotas of 4 MB) and ensure that each chunk/partition
            # of a batch does not exceed 500 elements in length (another API limit)
            if (
                current_length + len(record.b64_compressed_data) <= 3 * 1000 * 1000
                and len(current_batch) + 1 < 500
            ):
                current_batch.append(record)
                current_length += len(record.b64_compressed_data)
            else:
                chunked_batches.append(current_batch)
                current_batch = [record]
                current_length = len(record.b64_compressed_data)
    
        if current_batch:
            chunked_batches.append(current_batch)
    
        return total_events, chunked_batches
    
    
    def main() -> None:
        args = _Args.parse_args()
    
        files_to_process = (
            [args.path] if args.path.is_file() else list(args.path.rglob(f"*{args.delivery_stream}*"))
        )
    
        num_files = len(files_to_process)
        if num_files > 1:
            print(f"Found {num_files} failed batch file(s) to process")
    
        total_log_messages = 0
        for f_idx, file in enumerate(files_to_process):
            cur_file_num = f_idx + 1
    
            print(f'{cur_file_num}/{num_files}: Executing "{args.op.value}" on {file}')
            # Each failing "record" from a batch of failed records is saved to S3 in a single file as
            # newline-separated JSON objects, NOT in an array. This is obviously invalid JSON, so we use
            # jq's "slurp" function to automatically convert the distinct objects into a collection of
            # records in an array
            try:
                slurped_data_json = subprocess.run(
                    args=["jq", "-s"],
                    input=file.read_text(encoding="utf-8"),
                    text=True,
                    capture_output=True,
                    check=True,
                ).stdout
                record_batch: list[dict[str, Any]] = json.loads(slurped_data_json)
    
                expanded_records = [
                    _ExpandedLogRecord(
                        data=_LogDataCollection(
                            **json.loads(
                                gzip.decompress(base64.b64decode(x["rawData"])).decode(encoding="utf-8")
                            )
                        ),
                        **{k: v for k, v in x.items() if k != "rawData"},
                    )
                    for x in record_batch
                ]
    
                cur_batch_log_msg_count = sum(len(x.data.logEvents) for x in expanded_records)
                total_log_messages += cur_batch_log_msg_count
    
                print(f"\t{file} is a valid firehose record batch")
            except Exception as e:
                print(f"\t{file} is not a valid firehose record batch: {e!s}")
                continue
    
            match args.op:
                case _Op.COUNT:
                    # Count is basically a no-op because we're already summing the count above
                    print(f"\tHas {cur_batch_log_msg_count} log message(s)")
    
                case _Op.EXTRACT:
                    out_file = str(file.stem) + ".extracted.json"
                    Path(out_file).write_text(
                        json.dumps([asdict(x) for x in expanded_records], indent=2)
                    )
                    print(f"\tExtracted/expanded {file!s} to {out_file!s}")
    
                case _Op.FIX:
                    total_events, partitioned_batches = _fix_batch_records(expanded_records)
    
                    for part_idx, batch_part in enumerate(partitioned_batches):
                        out_file = str(file.stem) + f".part{part_idx+1}.fixed.json"
    
                        Path(out_file).write_text(
                            json.dumps([{"Data": x.b64_compressed_data} for x in batch_part])
                        )
                        print(
                            f"\tWrote {len(batch_part)} fixed record(s) to {out_file!s}. The result"
                            " can be re-ingested by the Ingestion Lambda"
                        )
    
                    print(
                        f"\tWrote {total_events} total log event(s) to {len(partitioned_batches)}"
                        " file(s)"
                    )
    
                case _Op.REINGEST:
                    if not args.delivery_stream:
                        print(
                            "A valid delivery stream name must be specified to re-ingest a failed batch"
                        )
                        return
    
                    total_events, partitioned_batches = _fix_batch_records(expanded_records)
                    total_records = sum(len(x) for x in partitioned_batches)
                    total_parts = len(partitioned_batches)
    
                    firehose_client = boto3.client("firehose")  # type: ignore
    
                    print(
                        f"\tRe-ingesting {total_events} total log message(s)/event(s) from"
                        f" {total_records} total record(s) in {total_parts} part(s)"
                    )
                    for part_idx, batch_part in enumerate(partitioned_batches):
                        current_part_num = part_idx + 1
                        print(
                            f"\t{current_part_num}/{total_parts}: Attempting to put {len(batch_part)}"
                            f" records to {args.delivery_stream}"
                        )
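                        # Attempt each PutRecordBatch call up to 3 times, retrying only when an exception is raised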
                        for i in range(3):
                            try:
                                firehose_resp = firehose_client.put_record_batch(
                                    DeliveryStreamName=args.delivery_stream,
                                    Records=[{"Data": x.compressed_data} for x in batch_part],
                                )
                            except Exception:
                                print(
                                    f"\t{current_part_num}/{total_parts}: PutRecordBatch failed"
                                    f" (try #{i + 1}/3), retrying..."
                                )
                                print("\n\t".join(traceback.format_exc().splitlines()))
    
                                continue
    
                            if not firehose_resp:
                                print(
                                    f"\t{current_part_num}/{total_parts}: Unable to re-ingest batch"
                                    f" partition #{current_part_num} from {file}"
                                )
                                print(
                                    f"\t{current_part_num}/{total_parts}: Writing failing batch"
                                    " partition to disk for manual processing"
                                )
    
                                failed_batch_part_file = (
                                    str(file.stem) + f".part{current_part_num}.failed.json"
                                )
                                Path(failed_batch_part_file).write_text(
                                    json.dumps([{"Data": x.b64_compressed_data} for x in batch_part])
                                )
                                print(
                                    f"\t{current_part_num}/{total_parts}: Wrote failing batch partition"
                                    f" to {failed_batch_part_file!s}. Continuing..."
                                )
                                break
    
                            print(
                                f"\t{current_part_num}/{total_parts}: Firehose received"
                                f" {len(firehose_resp['RequestResponses'])} record(s)."
                                f" {int(firehose_resp['FailedPutCount'])} record(s) failed to"
                                " process."
                            )
    
                            if len(firehose_resp["RequestResponses"]) != len(batch_part):
                                print(
                                    f"\tFirehose did not report receiving the number of records"
                                    " expected. Firehose received"
                                    f" {len(firehose_resp['RequestResponses'])} record(s), expected"
                                    f" {len(batch_part)}."
                                )
    
                            out_file = str(file.stem) + f".part{current_part_num}.resp.json"
                            Path(out_file).write_text(json.dumps(firehose_resp, indent=2))
                            print(
                                f"\t{current_part_num}/{total_parts}: Wrote Firehose response to"
                                f" {out_file}"
                            )
    
                            break
    
                    print(f"\tFinished re-ingesting {total_parts} batch part(s)")
    
                case _Op.BROKEN:
                    broken_msgs: list[str] = []
                    event_count = 0
                    for record in expanded_records:
                        for event in record.data.logEvents:
                            event_count = event_count + 1
                            try:
                                json.loads(event.message)
                            except json.JSONDecodeError as exc:
                                broken_msgs.append(f"{event.message} => {exc!s}")
    
                    print(f"\tProcessed {event_count} event(s)")
    
                    if not broken_msgs:
                        print("\tFound 0 broken messages")
                        # Move on to the next file instead of ending the run early
                        continue
    
                    out_file = str(file.stem) + ".broken"
    
                    Path(out_file).write_text("\n".join(broken_msgs))
                    print(f"\tWrote {len(broken_msgs)} broken message(s) to {out_file!s}")
    
        print(f"Finished processing {num_files} file(s); {total_log_messages} total log message(s)")
    
    
    if __name__ == "__main__":
        main()

Instructions

  1. Finding the S3 bucket and error output path:

    1. In any browser, navigate to https://aws.amazon.com and sign in. The AWS starting page should load, and a search bar should be visible at the top of the page
    2. In the search bar, search for "kinesis" and select the Kinesis service when it appears. A new page should load with three cards listing "Data Streams", "Data Firehose", and "Data Analytics"
    3. Under "Data Firehose", click the number under "Total delivery streams". This will take you to a new page listing all of the Delivery Streams under our AWS account
    4. In the list of Delivery Streams, click on the matching Delivery Stream that needs to be investigated. A new page should load showing information about the Delivery Stream
      1. For example, the Delivery Stream that corresponds with the bfd-insights-bfd-prod-cw-to-flattened-json Lambda is the bfd-insights-bfd-prod-firehose-ingester Delivery Stream
    5. Click on "Configuration"
    6. Under "Destination settings", note down the S3 bucket and the S3 bucket error output prefix. This is the destination where processing failures are written
      1. Note that, for this case, !{firehose:error-output-type} will become processing-failed in the resulting error prefix
  2. Now that you have the S3 bucket and path where the processing failures are stored, you can download the failed batch files from that bucket and path. This runbook will not go into detail about how to do this, so consult AWS's documentation if you are unsure how to download files from an S3 bucket. Subsequent steps assume you have downloaded these files to your local machine

  3. Ensure you have the above firehose_helper.py script downloaded locally and have an active Python 3.11 environment with boto3 installed

  4. Using firehose_helper.py, you can execute multiple different operations on ProcessingFailed batch(es) following the command format python firehose_helper.py <OPERATION> <FILE OR DIRECTORY> [-D <DELIVERY STREAM NAME (optional)>] (note that supplying a directory requires the delivery stream to be specified, and all files within the directory whose names contain the delivery stream name will be processed recursively):

    1. If you want to look at the data within the batch(es), you can use the extract operation to generate an "extracted"/"expanded" ProcessingFailed batch with all rawData records replaced with the uncompressed, unencoded data:

      python firehose_helper.py extract failed_batch_file
      # Will emit a file named failed_batch_file.extracted.json
    2. If you want to just know how many log messages were lost, you can use count to count the number of log messages within the batch(es):

      python firehose_helper.py count failed_batch_file
      # Will print the number of log messages and records
    3. If you want to know whether there are any broken log messages (those that cannot be JSON-decoded), you can use broken to emit a file containing the broken log messages along with the decoding errors:

      python firehose_helper.py broken failed_batch_file
      # Will emit a file named failed_batch_file.broken
    4. If you want to remove any broken log messages and emit JSON batch(es) that can be submitted to Firehose using the AWS CLI (aws firehose put-record-batch), you can use fix to emit one-or-more JSON record batch(es) per ProcessingFailed batch (see the example command after this list):

      python firehose_helper.py fix failed_batch_file
      # Will emit one-or-more JSON batch(es) that can be used with the AWS CLI to re-submit the failing records to Firehose
    5. If you want to simply re-ingest a failing batch of records, omitting any broken log messages, you can use reingest to process a ProcessingFailed batch and automatically re-submit all valid log events to the Firehose Stream specified by -D/--delivery-stream:

      python firehose_helper.py reingest failed_batch_file -D firehose_stream
      # Will attempt to re-ingest the records within failed_batch_file to firehose_stream, omitting any broken log events. The response of each PutRecordBatch operation will be written to the working directory as JSON
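
As referenced in the fix operation above, a batch part emitted by fix could be re-submitted with the AWS CLI roughly as follows. This is a sketch: the file and stream names are illustrative (a single .fixed.json part and the prod Delivery Stream), and it assumes the AWS CLI v2 default of accepting base64-encoded blob values for each record's Data field:

    aws firehose put-record-batch \
      --delivery-stream-name bfd-insights-bfd-prod-firehose-ingester \
      --records file://failed_batch_file.part1.fixed.json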