From 389c911f2068cabaea6e690fb9c6f66a9020f39a Mon Sep 17 00:00:00 2001
From: Bo Bayles
Date: Thu, 27 May 2021 10:11:44 -0500
Subject: [PATCH 1/4] Separate AWSReader from BaseReader

---
 flowlogs_reader/flowlogs_reader.py | 26 ++++++++++++++++----------
 1 file changed, 16 insertions(+), 10 deletions(-)

diff --git a/flowlogs_reader/flowlogs_reader.py b/flowlogs_reader/flowlogs_reader.py
index abaedc3..0672f13 100644
--- a/flowlogs_reader/flowlogs_reader.py
+++ b/flowlogs_reader/flowlogs_reader.py
@@ -202,19 +202,11 @@ def from_cwl_event(cls, cwl_event, fields=DEFAULT_FIELDS):
 class BaseReader:
     def __init__(
         self,
-        client_type,
         region_name=None,
         start_time=None,
         end_time=None,
         boto_client=None,
     ):
-        self.region_name = region_name
-        if boto_client is not None:
-            self.boto_client = boto_client
-        else:
-            kwargs = {'region_name': region_name} if region_name else {}
-            self.boto_client = boto3.client(client_type, **kwargs)
-
         # If no time filters are given use the last hour
         now = datetime.utcnow()
         self.start_time = start_time or now - timedelta(hours=1)
@@ -230,7 +222,21 @@ def __next__(self):
         return next(self.iterator)
 
 
-class FlowLogsReader(BaseReader):
+class AWSReader(BaseReader):
+    def __init__(
+        self, client_type, region_name=None, boto_client=None, **kwargs
+    ):
+        self.region_name = region_name
+        if boto_client is not None:
+            self.boto_client = boto_client
+        else:
+            client_kwargs = {'region_name': region_name} if region_name else {}
+            self.boto_client = boto3.client(client_type, **client_kwargs)
+
+        super().__init__(**kwargs)
+
+
+class FlowLogsReader(AWSReader):
     """
     Returns an object that will yield VPC Flow Log records as Python objects.
     * `log_group_name` is the name of the CloudWatch Logs group that stores
@@ -351,7 +357,7 @@ def _reader(self):
             yield FlowRecord.from_cwl_event(event, self.fields)
 
 
-class S3FlowLogsReader(BaseReader):
+class S3FlowLogsReader(AWSReader):
     def __init__(
         self,
         location,

From 821690451022861c8ee2619281ee9ed57ed1359c Mon Sep 17 00:00:00 2001
From: Bo Bayles
Date: Thu, 27 May 2021 17:42:28 -0500
Subject: [PATCH 2/4] Add LocalFileReader

---
 README.md                          |  16 +++-
 flowlogs_reader/__init__.py        |   2 +
 flowlogs_reader/__main__.py        |   7 +++++--
 flowlogs_reader/flowlogs_reader.py |  24 +++++
 tests/test_flowlogs_reader.py      | 149 +++++++++++++++++------
 tests/test_main.py                 |  12 +++
 6 files changed, 146 insertions(+), 64 deletions(-)

diff --git a/README.md b/README.md
index 8009f4b..5e27b0f 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,8 @@ This project contains:
 * A Python library for retrieving and working with VPC Flow logs
 
 The tools support reading Flow Logs from both [CloudWatch Logs](https://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs-cwl.html) and [S3](https://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs-s3.html).
-For S3 destinations, [version 3](https://aws.amazon.com/blogs/aws/learn-from-your-vpc-flow-logs-with-additional-meta-data/) custom log formats are supported.
+They can also read from local files pulled from S3.
+For S3 and file locations, [version 3](https://aws.amazon.com/blogs/aws/learn-from-your-vpc-flow-logs-with-additional-meta-data/) custom log formats are supported.
 
 The library builds on [boto3](https://github.com/boto/boto3) and should work on the [supported versions](https://devguide.python.org/#status-of-python-branches) of Python 3.
 
@@ -46,9 +47,18 @@ __Location types__
 
 `flowlogs_reader` has one required argument, `location`. By default that is interpreted as a CloudWatch Logs group.
-To use an S3 location, specify `--location-type='s3'`:
+To use an S3 location, specify `--location-type="s3"`:
 
-* `flowlogs_reader --location-type="s3" "bucket-name/optional-prefix"`
+```
+flowlogs_reader --location-type="s3" "bucket-name/optional-prefix"
+```
+
+To use a file location, specify `--location-type="file"`. If the location is a directory, all the `.log.gz` files in it will be read.
+
+```
+flowlogs_reader --location-type="file" "/home/sherlock/Downloads/aws_flow_logs/sample-file.log.gz"
+flowlogs_reader --location-type="file" "/home/sherlock/Downloads/aws_flow_logs/"
+```
 
 __Printing flows__
 
diff --git a/flowlogs_reader/__init__.py b/flowlogs_reader/__init__.py
index 0b6bb75..fe7b3c7 100644
--- a/flowlogs_reader/__init__.py
+++ b/flowlogs_reader/__init__.py
@@ -16,6 +16,7 @@
 from .flowlogs_reader import (
     FlowRecord,
     FlowLogsReader,
+    LocalFileReader,
     S3FlowLogsReader,
 )
 
@@ -23,5 +24,6 @@
     'aggregated_records',
     'FlowRecord',
     'FlowLogsReader',
+    'LocalFileReader',
     'S3FlowLogsReader',
 ]
diff --git a/flowlogs_reader/__main__.py b/flowlogs_reader/__main__.py
index 5392a2f..86fcb02 100644
--- a/flowlogs_reader/__main__.py
+++ b/flowlogs_reader/__main__.py
@@ -23,6 +23,7 @@
 from .aggregation import aggregated_records
 from .flowlogs_reader import (
     FlowLogsReader,
+    LocalFileReader,
     NODATA,
     S3FlowLogsReader,
     SKIPDATA,
@@ -104,6 +105,8 @@ def get_reader(args):
     elif args.location_type == 's3':
         cls = S3FlowLogsReader
         client_type = 's3'
+    elif args.location_type == 'file':
+        cls = LocalFileReader
 
     if args.region:
         kwargs['region_name'] = args.region
@@ -179,7 +182,7 @@ def main(argv=None):
         '--location-type',
         type=str,
-        help='location type (CloudWatch Logs or S3), default is cwl',
-        choices=['cwl', 's3'],
+        help='location type (CloudWatch Logs, S3, or file), default is cwl',
+        choices=['cwl', 's3', 'file'],
         default='cwl',
     )
     parser.add_argument(
diff --git a/flowlogs_reader/flowlogs_reader.py b/flowlogs_reader/flowlogs_reader.py
index 0672f13..5e3fff1 100644
--- a/flowlogs_reader/flowlogs_reader.py
+++ b/flowlogs_reader/flowlogs_reader.py
@@ -18,6 +18,7 @@
 from datetime import datetime, timedelta
 from gzip import open as gz_open
 from os.path import basename
+from pathlib import Path
 from threading import Lock
 
 import boto3
@@ -480,3 +481,26 @@ def _read_streams(self):
     def _reader(self):
         for event_data in self._read_streams():
             yield FlowRecord(event_data)
+
+
+class LocalFileReader(BaseReader):
+    def __init__(self, location, **kwargs):
+        self.location = location
+        super().__init__(**kwargs)
+
+    def _read_file(self, file_path):
+        with gz_open(file_path, mode='rt') as gz_f:
+            reader = DictReader(gz_f, delimiter=' ')
+            reader.fieldnames = [
+                f.replace('-', '_') for f in reader.fieldnames
+            ]
+            yield from reader
+            with THREAD_LOCK:
+                self.bytes_processed += gz_f.tell()
+
+    def _reader(self):
+        path = Path(self.location)
+        all_files = path.glob('*.log.gz') if path.is_dir() else [path]
+        for file_path in all_files:
+            for event_data in self._read_file(file_path):
+                yield FlowRecord(event_data)
diff --git a/tests/test_flowlogs_reader.py b/tests/test_flowlogs_reader.py
index 9835c5c..851e3b8 100644
--- a/tests/test_flowlogs_reader.py
+++ b/tests/test_flowlogs_reader.py
@@ -13,8 +13,10 @@
 # limitations under the License.
from datetime import datetime -from gzip import compress +from gzip import compress, open as gz_open from io import BytesIO +from os.path import join +from tempfile import TemporaryDirectory from unittest import TestCase from unittest.mock import MagicMock, patch @@ -27,6 +29,7 @@ aggregated_records, FlowRecord, FlowLogsReader, + LocalFileReader, S3FlowLogsReader, ) from flowlogs_reader.flowlogs_reader import ( @@ -98,6 +101,63 @@ '- 192.0.2.156 6 us-east-2 192.0.2.156 50318 1614866493 - - ' 'subnet-0123456789abcdef 7 7 IPv4 5 vpc-04456ab739938ee3f\n' ) +V5_DICTS = [ + { + 'account_id': '999999999999', + 'action': 'ACCEPT', + 'az_id': 'use2-az2', + 'bytes': 4895, + 'dstaddr': '192.0.2.156', + 'dstport': 50318, + 'end': datetime(2021, 3, 4, 14, 1, 51), + 'flow_direction': 'ingress', + 'instance_id': 'i-00123456789abcdef', + 'interface_id': 'eni-00123456789abcdef', + 'log_status': 'OK', + 'packets': 15, + 'pkt_dstaddr': '192.0.2.156', + 'pkt_src_aws_service': 'S3', + 'pkt_srcaddr': '198.51.100.6', + 'protocol': 6, + 'region': 'us-east-2', + 'srcaddr': '198.51.100.7', + 'srcport': 443, + 'start': datetime(2021, 3, 4, 14, 1, 33), + 'subnet_id': 'subnet-0123456789abcdef', + 'tcp_flags': 19, + 'type': 'IPv4', + 'version': 5, + 'vpc_id': 'vpc-04456ab739938ee3f', + }, + { + 'account_id': '999999999999', + 'action': 'ACCEPT', + 'az_id': 'use2-az2', + 'bytes': 3015, + 'dstaddr': '198.51.100.6', + 'dstport': 443, + 'end': datetime(2021, 3, 4, 14, 1, 51), + 'flow_direction': 'egress', + 'instance_id': 'i-00123456789abcdef', + 'interface_id': 'eni-00123456789abcdef', + 'log_status': 'OK', + 'packets': 16, + 'pkt_dst_aws_service': 'S3', + 'pkt_dstaddr': '198.51.100.7', + 'pkt_srcaddr': '192.0.2.156', + 'protocol': 6, + 'region': 'us-east-2', + 'srcaddr': '192.0.2.156', + 'srcport': 50318, + 'start': datetime(2021, 3, 4, 14, 1, 33), + 'subnet_id': 'subnet-0123456789abcdef', + 'tcp_flags': 7, + 'traffic_path': 7, + 'type': 'IPv4', + 'version': 5, + 'vpc_id': 'vpc-04456ab739938ee3f', + }, +] class FlowRecordTestCase(TestCase): @@ -688,64 +748,7 @@ def test_serial_v4(self): self.assertEqual(reader.bytes_processed, len(V4_FILE.encode())) def test_serial_v5(self): - expected = [ - { - 'account_id': '999999999999', - 'action': 'ACCEPT', - 'az_id': 'use2-az2', - 'bytes': 4895, - 'dstaddr': '192.0.2.156', - 'dstport': 50318, - 'end': datetime(2021, 3, 4, 14, 1, 51), - 'flow_direction': 'ingress', - 'instance_id': 'i-00123456789abcdef', - 'interface_id': 'eni-00123456789abcdef', - 'log_status': 'OK', - 'packets': 15, - 'pkt_dstaddr': '192.0.2.156', - 'pkt_src_aws_service': 'S3', - 'pkt_srcaddr': '198.51.100.6', - 'protocol': 6, - 'region': 'us-east-2', - 'srcaddr': '198.51.100.7', - 'srcport': 443, - 'start': datetime(2021, 3, 4, 14, 1, 33), - 'subnet_id': 'subnet-0123456789abcdef', - 'tcp_flags': 19, - 'type': 'IPv4', - 'version': 5, - 'vpc_id': 'vpc-04456ab739938ee3f', - }, - { - 'account_id': '999999999999', - 'action': 'ACCEPT', - 'az_id': 'use2-az2', - 'bytes': 3015, - 'dstaddr': '198.51.100.6', - 'dstport': 443, - 'end': datetime(2021, 3, 4, 14, 1, 51), - 'flow_direction': 'egress', - 'instance_id': 'i-00123456789abcdef', - 'interface_id': 'eni-00123456789abcdef', - 'log_status': 'OK', - 'packets': 16, - 'pkt_dst_aws_service': 'S3', - 'pkt_dstaddr': '198.51.100.7', - 'pkt_srcaddr': '192.0.2.156', - 'protocol': 6, - 'region': 'us-east-2', - 'srcaddr': '192.0.2.156', - 'srcport': 50318, - 'start': datetime(2021, 3, 4, 14, 1, 33), - 'subnet_id': 'subnet-0123456789abcdef', - 'tcp_flags': 7, - 'traffic_path': 7, 
-                'type': 'IPv4',
-                'version': 5,
-                'vpc_id': 'vpc-04456ab739938ee3f',
-            },
-        ]
-        reader = self._test_iteration(V5_FILE, expected)
+        reader = self._test_iteration(V5_FILE, V5_DICTS)
         self.assertEqual(reader.bytes_processed, len(V5_FILE.encode()))
 
     def test_threads(self):
@@ -795,6 +798,34 @@ def test_threads(self):
         self._test_iteration(V3_FILE, expected)
 
 
+class LocalFileReaderTests(TestCase):
+    def setUp(self):
+        self.temp_dir = TemporaryDirectory()
+        self.first_file = join(self.temp_dir.name, 'first.log.gz')
+        self.second_file = join(self.temp_dir.name, 'second.log.gz')
+        # Ignored: wrong extension
+        self.third_file = join(self.temp_dir.name, 'third.gz')
+
+        for file_path in (self.first_file, self.second_file, self.third_file):
+            with gz_open(file_path, 'wt') as f:
+                f.write(V5_FILE)
+
+    def tearDown(self):
+        self.temp_dir.cleanup()
+
+    def test_file(self):
+        reader = LocalFileReader(self.first_file)
+        actual = [x.to_dict() for x in reader]
+        self.assertEqual(actual, V5_DICTS)
+        self.assertEqual(reader.bytes_processed, len(V5_FILE.encode()))
+
+    def test_dir(self):
+        reader = LocalFileReader(self.temp_dir.name)
+        actual = [x.to_dict() for x in reader]
+        self.assertEqual(actual, V5_DICTS * 2)
+        self.assertEqual(reader.bytes_processed, len(V5_FILE.encode()) * 2)
+
+
 class AggregationTestCase(TestCase):
     def test_aggregated_records(self):
         # Aggregate by 5-tuple by default
diff --git a/tests/test_main.py b/tests/test_main.py
index 7df3320..406ad0e 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -342,3 +342,15 @@ def test_s3_destination(self, mock_out, mock_reader):
         __, args, kwargs = call
         line = args[0]
         self.assertEqual(line, record)
+
+    @patch('flowlogs_reader.__main__.LocalFileReader', autospec=True)
+    @patch('flowlogs_reader.__main__.print', create=True)
+    def test_file_destination(self, mock_out, mock_reader):
+        mock_out.stdout = io.BytesIO()
+        mock_reader.return_value = SAMPLE_RECORDS
+        main(['--location-type', 'file', '/tmp/test-file.csv.gz'])
+        mock_reader.assert_called_once_with(location='/tmp/test-file.csv.gz')
+        for call, record in zip_longest(mock_out.mock_calls, SAMPLE_INPUT):
+            __, args, kwargs = call
+            line = args[0]
+            self.assertEqual(line, record)

From b58173384e04de195c0bd1330fe840895ad1940d Mon Sep 17 00:00:00 2001
From: "Michael J. Schultz"
Date: Fri, 13 Jan 2023 18:42:57 -0600
Subject: [PATCH 3/4] indent emoji

---
 flowlogs_reader/flowlogs_reader.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/flowlogs_reader/flowlogs_reader.py b/flowlogs_reader/flowlogs_reader.py
index d4c07f5..3571232 100644
--- a/flowlogs_reader/flowlogs_reader.py
+++ b/flowlogs_reader/flowlogs_reader.py
@@ -576,9 +576,9 @@ def _reader(self):
         all_files = path.glob('*.log.gz') if path.is_dir() else [path]
         for file_path in all_files:
             for event_data in self._read_file(file_path):
-            try:
-                yield FlowRecord(event_data)
-            except Exception:
-                self.skipped_records += 1
-                if self.raise_on_error:
-                    raise
+                try:
+                    yield FlowRecord(event_data)
+                except Exception:
+                    self.skipped_records += 1
+                    if self.raise_on_error:
+                        raise

From da73e6bf9e6e4a8e5f2aaaaa74b37091440e3876 Mon Sep 17 00:00:00 2001
From: "Michael J. Schultz"
Date: Fri, 13 Jan 2023 18:45:49 -0600
Subject: [PATCH 4/4] csv_dict_reader

---
 flowlogs_reader/flowlogs_reader.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flowlogs_reader/flowlogs_reader.py b/flowlogs_reader/flowlogs_reader.py
index 3571232..ea54f13 100644
--- a/flowlogs_reader/flowlogs_reader.py
+++ b/flowlogs_reader/flowlogs_reader.py
@@ -563,7 +563,7 @@ def __init__(self, location, **kwargs):
 
     def _read_file(self, file_path):
         with gz_open(file_path, mode='rt') as gz_f:
-            reader = DictReader(gz_f, delimiter=' ')
+            reader = csv_dict_reader(gz_f, delimiter=' ')
             reader.fieldnames = [
                 f.replace('-', '_') for f in reader.fieldnames
             ]