Local file reader #54

Open · wants to merge 5 commits into main
16 changes: 13 additions & 3 deletions README.md
@@ -11,7 +11,8 @@ This project contains:
* A Python library for retrieving and working with VPC Flow logs

The tools support reading Flow Logs from both [CloudWatch Logs](https://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs-cwl.html) and [S3](https://docs.aws.amazon.com/AmazonVPC/latest/UserGuide/flow-logs-s3.html).
For S3 destinations, [version 3](https://aws.amazon.com/blogs/aws/learn-from-your-vpc-flow-logs-with-additional-meta-data/) custom log formats are supported.
They can also read from local files pulled from S3.
For S3 and file locations, [version 3](https://aws.amazon.com/blogs/aws/learn-from-your-vpc-flow-logs-with-additional-meta-data/) custom log formats are supported.

The library builds on [boto3](https://github.com/boto/boto3) and should work on the [supported versions](https://devguide.python.org/#status-of-python-branches) of Python 3.

@@ -45,9 +46,18 @@ __Location types__

`flowlogs_reader` has one required argument, `location`. By default that is interpreted as a CloudWatch Logs group.

To use an S3 location, specify `--location-type='s3'`:
To use an S3 location, specify `--location-type="s3"`:

* `flowlogs_reader --location-type="s3" "bucket-name/optional-prefix"`
```
flowlogs_reader --location-type="s3" "bucket-name/optional-prefix"
```

To use a file location, specify `--location-type="file"`. If the location is a directory, all the `.log.gz` files in it will be read.

```
flowlogs_reader --location-type="file" "/home/sherlock/Downloads/aws_flow_logs/sample-file.log.gz"
flowlogs_reader --location-type="file" "/home/sherlock/Downloads/aws_flow_logs/"
```

__Printing flows__

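The file-location examples above assume the `.log.gz` objects have already been pulled down from the S3 flow-log bucket. A minimal boto3 sketch of doing that, with a hypothetical bucket name, prefix, and destination directory:

```
# Hypothetical helper: download .log.gz flow-log objects from S3 so they can
# be read with --location-type="file". Bucket, prefix, and dest are placeholders.
from pathlib import Path

import boto3


def pull_flow_logs(bucket='bucket-name', prefix='optional-prefix/', dest='aws_flow_logs'):
    s3 = boto3.client('s3')
    Path(dest).mkdir(parents=True, exist_ok=True)
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if key.endswith('.log.gz'):
                s3.download_file(bucket, key, str(Path(dest) / Path(key).name))
```
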
2 changes: 2 additions & 0 deletions flowlogs_reader/__init__.py
@@ -16,12 +16,14 @@
from .flowlogs_reader import (
FlowRecord,
FlowLogsReader,
LocalFileReader,
S3FlowLogsReader,
)

__all__ = [
'aggregated_records',
'FlowRecord',
'FlowLogsReader',
'LocalFileReader',
'S3FlowLogsReader',
]
5 changes: 4 additions & 1 deletion flowlogs_reader/__main__.py
@@ -23,6 +23,7 @@
from .aggregation import aggregated_records
from .flowlogs_reader import (
FlowLogsReader,
LocalFileReader,
NODATA,
S3FlowLogsReader,
SKIPDATA,
@@ -104,6 +105,8 @@ def get_reader(args):
elif args.location_type == 's3':
cls = S3FlowLogsReader
client_type = 's3'
elif args.location_type == 'file':
cls = LocalFileReader

if args.region:
kwargs['region_name'] = args.region
@@ -179,7 +182,7 @@ def main(argv=None):
'--location-type',
type=str,
help='location type (CloudWatch Logs or S3), default is cwl',
choices=['cwl', 's3'],
choices=['cwl', 's3', 'file'],
default='cwl',
)
parser.add_argument(
57 changes: 46 additions & 11 deletions flowlogs_reader/flowlogs_reader.py
@@ -18,14 +18,15 @@
from datetime import datetime, timedelta
from gzip import open as gz_open
from os.path import basename
from parquet import DictReader as parquet_dict_reader
from pathlib import Path
from threading import Lock

import boto3
import io

from botocore.exceptions import PaginationError
from dateutil.rrule import rrule, DAILY
from parquet import DictReader as parquet_dict_reader

DEFAULT_FIELDS = (
'version',
@@ -244,20 +245,12 @@ def from_cwl_event(cls, cwl_event, fields=DEFAULT_FIELDS):
class BaseReader:
def __init__(
self,
client_type,
region_name=None,
start_time=None,
end_time=None,
boto_client=None,
raise_on_error=False,
):
self.region_name = region_name
if boto_client is not None:
self.boto_client = boto_client
else:
kwargs = {'region_name': region_name} if region_name else {}
self.boto_client = boto3.client(client_type, **kwargs)

# If no time filters are given use the last hour
now = datetime.utcnow()
self.start_time = start_time or now - timedelta(hours=1)
@@ -276,7 +269,21 @@ def __next__(self):
return next(self.iterator)


class FlowLogsReader(BaseReader):
class AWSReader(BaseReader):
def __init__(
self, client_type, region_name=None, boto_client=None, **kwargs
):
self.region_name = region_name
if boto_client is not None:
self.boto_client = boto_client
else:
client_kwargs = {'region_name': region_name} if region_name else {}
self.boto_client = boto3.client(client_type, **client_kwargs)

super().__init__(**kwargs)


class FlowLogsReader(AWSReader):
"""
Returns an object that will yield VPC Flow Log records as Python objects.
* `log_group_name` is the name of the CloudWatch Logs group that stores
@@ -408,7 +415,7 @@ def _reader(self):
raise


class S3FlowLogsReader(BaseReader):
class S3FlowLogsReader(AWSReader):
def __init__(
self,
location,
@@ -547,3 +554,31 @@ def _reader(self):
self.skipped_records += 1
if self.raise_on_error:
raise


class LocalFileReader(BaseReader):
def __init__(self, location, **kwargs):
self.location = location
super().__init__(**kwargs)

def _read_file(self, file_path):
with gz_open(file_path, mode='rt') as gz_f:
reader = csv_dict_reader(gz_f, delimiter=' ')
reader.fieldnames = [
f.replace('-', '_') for f in reader.fieldnames
]
yield from reader
with THREAD_LOCK:
self.bytes_processed += gz_f.tell()

def _reader(self):
path = Path(self.location)
all_files = path.glob('*.log.gz') if path.is_dir() else [path]
for file_path in all_files:
for event_data in self._read_file(file_path):
try:
yield FlowRecord(event_data)
except Exception:
self.skipped_records += 1
if self.raise_on_error:
raise
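
A short usage sketch of the new reader from Python, mirroring what the CLI does for `--location-type="file"` (the directory path is a placeholder):

```
# Hypothetical usage: parse every .log.gz file in a local directory into
# FlowRecord objects. The path below is a placeholder.
from flowlogs_reader import LocalFileReader

reader = LocalFileReader('/home/sherlock/Downloads/aws_flow_logs/')
for record in reader:
    print(record.to_dict())

# The reader tracks bytes_processed and skipped_records, as the tests below exercise.
print(reader.bytes_processed, reader.skipped_records)
```
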
151 changes: 92 additions & 59 deletions tests/test_flowlogs_reader.py
@@ -13,8 +13,10 @@
# limitations under the License.

from datetime import datetime
from gzip import compress
from gzip import compress, open as gz_open
from io import BytesIO
from os.path import join
from tempfile import TemporaryDirectory
from unittest import TestCase
from unittest.mock import MagicMock, patch

@@ -27,6 +29,7 @@
aggregated_records,
FlowRecord,
FlowLogsReader,
LocalFileReader,
S3FlowLogsReader,
)
from flowlogs_reader.flowlogs_reader import (
@@ -99,6 +102,65 @@
'subnet-0123456789abcdef 7 7 IPv4 5 vpc-04456ab739938ee3f\n'
'bogus line\n'
)

V5_DICTS = [
{
'account_id': '999999999999',
'action': 'ACCEPT',
'az_id': 'use2-az2',
'bytes': 4895,
'dstaddr': '192.0.2.156',
'dstport': 50318,
'end': datetime(2021, 3, 4, 14, 1, 51),
'flow_direction': 'ingress',
'instance_id': 'i-00123456789abcdef',
'interface_id': 'eni-00123456789abcdef',
'log_status': 'OK',
'packets': 15,
'pkt_dstaddr': '192.0.2.156',
'pkt_src_aws_service': 'S3',
'pkt_srcaddr': '198.51.100.6',
'protocol': 6,
'region': 'us-east-2',
'srcaddr': '198.51.100.7',
'srcport': 443,
'start': datetime(2021, 3, 4, 14, 1, 33),
'subnet_id': 'subnet-0123456789abcdef',
'tcp_flags': 19,
'type': 'IPv4',
'version': 5,
'vpc_id': 'vpc-04456ab739938ee3f',
},
{
'account_id': '999999999999',
'action': 'ACCEPT',
'az_id': 'use2-az2',
'bytes': 3015,
'dstaddr': '198.51.100.6',
'dstport': 443,
'end': datetime(2021, 3, 4, 14, 1, 51),
'flow_direction': 'egress',
'instance_id': 'i-00123456789abcdef',
'interface_id': 'eni-00123456789abcdef',
'log_status': 'OK',
'packets': 16,
'pkt_dst_aws_service': 'S3',
'pkt_dstaddr': '198.51.100.7',
'pkt_srcaddr': '192.0.2.156',
'protocol': 6,
'region': 'us-east-2',
'srcaddr': '192.0.2.156',
'srcport': 50318,
'start': datetime(2021, 3, 4, 14, 1, 33),
'subnet_id': 'subnet-0123456789abcdef',
'tcp_flags': 7,
'traffic_path': 7,
'type': 'IPv4',
'version': 5,
'vpc_id': 'vpc-04456ab739938ee3f',
},
]

V6_FILE = (
'resource-type tgw-id tgw-attachment-id tgw-src-vpc-account-id '
'tgw-dst-vpc-account-id tgw-src-vpc-id tgw-dst-vpc-id tgw-src-subnet-id '
@@ -709,64 +771,7 @@ def test_serial_v4(self):
)

def test_serial_v5(self):
expected = [
{
'account_id': '999999999999',
'action': 'ACCEPT',
'az_id': 'use2-az2',
'bytes': 4895,
'dstaddr': '192.0.2.156',
'dstport': 50318,
'end': datetime(2021, 3, 4, 14, 1, 51),
'flow_direction': 'ingress',
'instance_id': 'i-00123456789abcdef',
'interface_id': 'eni-00123456789abcdef',
'log_status': 'OK',
'packets': 15,
'pkt_dstaddr': '192.0.2.156',
'pkt_src_aws_service': 'S3',
'pkt_srcaddr': '198.51.100.6',
'protocol': 6,
'region': 'us-east-2',
'srcaddr': '198.51.100.7',
'srcport': 443,
'start': datetime(2021, 3, 4, 14, 1, 33),
'subnet_id': 'subnet-0123456789abcdef',
'tcp_flags': 19,
'type': 'IPv4',
'version': 5,
'vpc_id': 'vpc-04456ab739938ee3f',
},
{
'account_id': '999999999999',
'action': 'ACCEPT',
'az_id': 'use2-az2',
'bytes': 3015,
'dstaddr': '198.51.100.6',
'dstport': 443,
'end': datetime(2021, 3, 4, 14, 1, 51),
'flow_direction': 'egress',
'instance_id': 'i-00123456789abcdef',
'interface_id': 'eni-00123456789abcdef',
'log_status': 'OK',
'packets': 16,
'pkt_dst_aws_service': 'S3',
'pkt_dstaddr': '198.51.100.7',
'pkt_srcaddr': '192.0.2.156',
'protocol': 6,
'region': 'us-east-2',
'srcaddr': '192.0.2.156',
'srcport': 50318,
'start': datetime(2021, 3, 4, 14, 1, 33),
'subnet_id': 'subnet-0123456789abcdef',
'tcp_flags': 7,
'traffic_path': 7,
'type': 'IPv4',
'version': 5,
'vpc_id': 'vpc-04456ab739938ee3f',
},
]
reader = self._test_iteration(V5_FILE, expected)
reader = self._test_iteration(V5_FILE, V5_DICTS)
self.assertEqual(reader.bytes_processed, len(V5_FILE.encode()))
self.assertEqual(
reader.compressed_bytes_processed, len(compress(V5_FILE.encode()))
@@ -993,6 +998,34 @@ def test_threads(self):
self._test_iteration(V3_FILE, expected)


class LocalFileReaderTests(TestCase):
def setUp(self):
self.temp_dir = TemporaryDirectory()
self.first_file = join(self.temp_dir.name, 'first.log.gz')
self.second_file = join(self.temp_dir.name, 'second.log.gz')
# Ignored: wrong extension
self.third_file = join(self.temp_dir.name, 'third.gz')

for file_path in (self.first_file, self.second_file, self.third_file):
with gz_open(file_path, 'wt') as f:
f.write(V5_FILE)

def tearDown(self):
self.temp_dir.cleanup()

def test_file(self):
reader = LocalFileReader(self.first_file)
actual = [x.to_dict() for x in reader]
self.assertEqual(actual, V5_DICTS)
self.assertEqual(reader.bytes_processed, len(V5_FILE.encode()))

def test_dir(self):
reader = LocalFileReader(self.temp_dir.name)
actual = [x.to_dict() for x in reader]
self.assertEqual(actual, V5_DICTS * 2)
self.assertEqual(reader.bytes_processed, len(V5_FILE.encode()) * 2)


class AggregationTestCase(TestCase):
def test_aggregated_records(self):
# Aggregate by 5-tuple by default
12 changes: 12 additions & 0 deletions tests/test_main.py
@@ -342,3 +342,15 @@ def test_s3_destination(self, mock_out, mock_reader):
__, args, kwargs = call
line = args[0]
self.assertEqual(line, record)

@patch('flowlogs_reader.__main__.LocalFileReader', autospec=True)
@patch('flowlogs_reader.__main__.print', create=True)
def test_file_destination(self, mock_out, mock_reader):
mock_out.stdout = io.BytesIO()
mock_reader.return_value = SAMPLE_RECORDS
main(['--location-type', 'file', '/tmp/test-file.csv.gz'])
mock_reader.assert_called_once_with(location='/tmp/test-file.csv.gz')
for call, record in zip_longest(mock_out.mock_calls, SAMPLE_INPUT):
__, args, kwargs = call
line = args[0]
self.assertEqual(line, record)