Skip to content

Commit

Permalink
Merge pull request #513 from Nextdoor/feature/s3_notification_configu…
Browse files Browse the repository at this point in the history
…ration

Adding event configuration for S3 bucket
  • Loading branch information
kavinnd authored Jul 15, 2021
2 parents 36912c5 + 0195efe commit a3ce328
Show file tree
Hide file tree
Showing 2 changed files with 321 additions and 0 deletions.
128 changes: 128 additions & 0 deletions kingpin/actors/aws/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,47 @@ class LifecycleConfig(SchemaCompareBase):
}


class NotificationConfiguration(SchemaCompareBase):

"""Provides JSON-Schema based validation of the supplied Notification Config.
.. code-block:: json
{
"queue_configurations": [
{
"queue_arn": "ARN of the SQS queue",
"events": ["s3:ObjectCreated:*"],
}
]
}
"""

SCHEMA = {
'type': ['object', 'null'],
'required': ['queue_configurations'],
'properties': {
'queue_configurations': {
'type': ['array'],
'items': {
'type': 'object',
'additionalProperties': False,
'required': ['queue_arn', 'events'],
'properties': {
'queue_arn': {
'type': 'string'
},
'events': {
'type': 'array',
'items': {'type': 'string'}
}
}
}
}
}
}


class TaggingConfig(SchemaCompareBase):

"""Provides JSON-Schema based validation of the supplied tagging config.
Expand Down Expand Up @@ -423,6 +464,7 @@ class Bucket(base.EnsurableAWSBaseActor):
* Enable or Suspend Bucket Versioning.
Note: It is impossible to actually _disable_ bucket versioning -- once
it is enabled, you can only suspend it, or re-enable it.
* Enable Event Notification. (limited to SQS for now)
**Note about Buckets with Files**
Expand Down Expand Up @@ -506,6 +548,19 @@ class Bucket(base.EnsurableAWSBaseActor):
(bool, None): Whether or not to enable Versioning on the bucket. If
"None", then we don't manage versioning either way. Default: None
:notification_configuration:
(:py:class:`NotificationConfiguration`, None)
If a dictionary is supplised, then it must conform to
:py:class:`NotificationConfiguration`, type and include mapping
queuearn & events
If an empty dictionary is supplied, then Kingpin will explicitly remove
any Notification Configuration from the bucket.
Finally, If None is supplies, Kingoin will ignore the checks entire on
this portion of the bucket configuration
**Examples**
.. code-block:: json
Expand Down Expand Up @@ -534,6 +589,17 @@ class Bucket(base.EnsurableAWSBaseActor):
{"key": "my_key", "value": "billing-grp-1"},
],
"versioning": true,
"notification_configuration": {
"queue_configurations": [
{
"queue_arn": "arn:aws:sqs:us-east-1:1234567:some_sqs",
"events": [
"s3:ObjectCreated:*",
"s3:ObjectRemoved*"
]
}
]
}
}
}
Expand Down Expand Up @@ -570,6 +636,7 @@ class Bucket(base.EnsurableAWSBaseActor):
'versioning': ((bool, None), None,
('Desired state of versioning on the bucket: '
'true/false')),
'notification_configuration': (NotificationConfiguration, None, '')
}

unmanaged_options = ['name', 'region']
Expand All @@ -595,6 +662,14 @@ def __init__(self, *args, **kwargs):
if self.access_block is not None:
self.access_block = self._snake_to_camel(self.access_block)

# If the NotificationConfiguration is anything but None, we parse
# it and pre-build the rules.
self.notification_configuration = \
self.option('notification_configuration')
if self.notification_configuration is not None:
self.notification_configuration = \
self._snake_to_camel(self.notification_configuration)

# Start out assuming the bucket doesn't exist. The _precache() method
# will populate this with True if the bucket does exist.
self._bucket_exists = False
Expand Down Expand Up @@ -1133,3 +1208,56 @@ def _set_tags(self):
self.s3_conn.put_bucket_tagging,
Bucket=self.option('name'),
Tagging={'TagSet': tagset})

@gen.coroutine
def _get_notification_configuration(self):
if self.notification_configuration is None:
raise gen.Return(None)

if not self._bucket_exists:
raise gen.Return(None)

raw = yield self.api_call(
self.s3_conn.get_bucket_notification_configuration,
Bucket=self.option('name'))

existing_configurations = {}
for configuration in ['TopicConfigurations',
'QueueConfigurations',
'LambdaFunctionConfigurations']:
if configuration in raw:
existing_configurations[configuration] = raw[configuration]
raise gen.Return(existing_configurations)

@gen.coroutine
def _compare_notification_configuration(self):
new = self.notification_configuration
if new is None:
self.log.debug('No Notification Configuration')
raise gen.Return(True)

exist = yield self._get_notification_configuration()
diff = utils.diff_dicts(exist, new)

if not diff:
self.log.debug('Notification Configurations match')
raise gen.Return(True)

self.log.info('Bucket Notification Configuration differs:')
for line in diff.split('\n'):
self.log.info('Diff: %s' % line)

raise gen.Return(False)

@gen.coroutine
@dry('Would have added notification configurations')
def _set_notification_configuration(self):
if self.notification_configuration is None:
self.log.debug('No Notification Configurations')
raise gen.Return(None)

self.log.info('Updating Bucket Notification Configuration')
yield self.api_call(
self.s3_conn.put_bucket_notification_configuration,
Bucket=self.option('name'),
NotificationConfiguration=self.notification_configuration)
193 changes: 193 additions & 0 deletions kingpin/actors/aws/test/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ def setUp(self):
},
'versioning': False,
'tags': [],
'notification_configuration': {
'queue_configurations': []
}
})
self.actor.s3_conn = mock.MagicMock()

Expand Down Expand Up @@ -643,3 +646,193 @@ def test_set_tags(self):
Tagging={'TagSet': [
{'Key': 'tag1', 'Value': 'v1'}
]})])

@testing.gen_test
def test_snake_to_camelcase_for_notification_configuration(self):
notification_configuration_snake_case = {
"queue_configurations": [{
"queue_arn":
"arn:aws:sqs:us-east-1:1234567:test_sqs",
"events": ["s3:ObjectCreated:*"]
}]
}

notification_configuration_camel_case = \
self.actor._snake_to_camel(notification_configuration_snake_case)

self.assertEqual(notification_configuration_camel_case,
{"QueueConfigurations": [{
"QueueArn":
"arn:aws:sqs:us-east-1:1234567:test_sqs",
"Events": ["s3:ObjectCreated:*"]
}]})

@testing.gen_test
def test_set_notification_configurations_none(self):
self.actor.notification_configuration = None
yield self.actor._set_notification_configuration()
self.assertFalse(
self.actor
.s3_conn
.put_bucket_notification_configuration.called
)

@testing.gen_test
def test_set_notification_configurations_with_valid_configs(self):
self.actor.notification_configuration = {
"QueueConfigurations": [{
"QueueArn": "arn:aws:sqs:us-east-1:1234567:test_sqs",
"Events": ["s3:ObjectCreated:*"]
}]
}
yield self.actor._set_notification_configuration()
self.actor\
.s3_conn\
.put_bucket_notification_configuration\
.assert_has_calls([
mock.call(
Bucket='test',
NotificationConfiguration={
"QueueConfigurations": [{
"QueueArn":
"arn:aws:sqs:us-east-1:1234567:test_sqs",
"Events": ["s3:ObjectCreated:*"]
}]
}
)
])

@testing.gen_test
def test_set_notification_configurations_with_multiple_configs(self):
self.actor.notification_configuration = {
"QueueConfigurations": [
{
"QueueArn": "arn:aws:sqs:us-east-1:1:test1_sqs",
"Events": ["s3:ObjectCreated:*"]
},
{
"QueueArn": "arn:aws:sqs:us-east-1:1:test2_sqs",
"Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"]
}
]
}
yield self.actor._set_notification_configuration()
self.actor \
.s3_conn \
.put_bucket_notification_configuration \
.assert_has_calls([mock.call(
Bucket='test',
NotificationConfiguration={
"QueueConfigurations": [
{
"QueueArn": "arn:aws:sqs:us-east-1:1:test1_sqs",
"Events": ["s3:ObjectCreated:*"]
},
{
"QueueArn": "arn:aws:sqs:us-east-1:1:test2_sqs",
"Events": ["s3:ObjectCreated:*",
"s3:ObjectRemoved:*"]
}
]
}
)])

@testing.gen_test
def test_set_notification_configurations_no_configs(self):
self.actor.notification_configuration = {
"QueueConfigurations": []
}
yield self.actor._set_notification_configuration()
self.actor.s3_conn\
.put_bucket_notification_configuration\
.assert_has_calls([])

@testing.gen_test
def test_set_notification_configurations_empty_queue_configs(self):
self.actor.notification_configuration = {}
yield self.actor._set_notification_configuration()
self.actor\
.s3_conn\
.put_bucket_notification_configuration\
.assert_has_calls([])

@testing.gen_test
def test_get_notification_configuration_with_existing_queueconfigs(self):
self.actor._bucket_exists = True
self.actor.s3_conn.get_bucket_notification_configuration\
.return_value = {
"QueueConfigurations": [{
"QueueArn": "arn:aws:sqs",
"Events": ["s3:ObjectCreated:*"]
}]
}
ret = yield self.actor._get_notification_configuration()
self.assertEqual(type(ret), dict)
self.assertEqual(ret,
{'QueueConfigurations': [
{
"QueueArn": "arn:aws:sqs",
"Events": ["s3:ObjectCreated:*"]
}
]})

@testing.gen_test
def test_get_notification_configuration_no_bucket(self):
self.actor._bucket_exists = False
ret = yield self.actor._get_notification_configuration()
self.assertEqual(ret, None)

@testing.gen_test
def test_get_notification_configuration_with_no_config(self):
self.actor._bucket_exists = True
self.actor.notification_configuration = None
ret = yield self.actor._get_notification_configuration()
self.assertEqual(ret, None)

@testing.gen_test
def test_compare_notification_configuration_with_new_config(self):
self.actor.notification_configuration = {
"QueueConfigurations": [{
"QueueArn":
"arn:aws:sqs:us-east-1:1234567:test_sqs",
"Events": ["s3:ObjectCreated:*"]
}]
}
self.actor\
.s3_conn\
.get_bucket_notification_configuration\
.return_value = {}
ret = yield self.actor._compare_notification_configuration()
self.assertFalse(ret)

@testing.gen_test
def test_compare_notification_configuration_with_no_updated_config(self):
self.actor._bucket_exists = True
self.actor.notification_configuration = {
"QueueConfigurations": [{
"QueueArn":
"arn:aws:sqs:us-east-1:1234567:test_sqs",
"Events": ["s3:ObjectCreated:*"]
}]
}
self.actor\
.s3_conn\
.get_bucket_notification_configuration\
.return_value = {
"QueueConfigurations": [{
"QueueArn": "arn:aws:sqs:us-east-1:1234567:test_sqs",
"Events": ["s3:ObjectCreated:*"]
}]
}
ret = yield self.actor._compare_notification_configuration()
self.assertTrue(ret)

@testing.gen_test
def test_compare_notification_configuration_with_no_config(self):
self.actor.notification_configuration = None
self.actor\
.s3_conn\
.get_bucket_notification_configuration\
.return_value = {}
ret = yield self.actor._compare_notification_configuration()
self.assertTrue(ret)

0 comments on commit a3ce328

Please sign in to comment.