17 changes: 16 additions & 1 deletion README.md
@@ -458,7 +458,7 @@ However, it's now far easier to use Route 53-based DNS authentication, which wil

## Executing in Response to AWS Events

-Similarly, you can have your functions execute in response to events that happen in the AWS ecosystem, such as S3 uploads, DynamoDB entries, Kinesis streams, and SNS messages.
+Similarly, you can have your functions execute in response to events that happen in the AWS ecosystem, such as S3 uploads, DynamoDB entries, Kinesis streams, SNS messages, and SQS queues.
**Collaborator:** Do we need to update the default IAM stuff to include SQS perms?

**Collaborator (Author):** No, it's a push event.

In your *zappa_settings.json* file, define your [event sources](http://docs.aws.amazon.com/lambda/latest/dg/invoking-lambda-function.html) and the function you wish to execute. For instance, this will execute `your_module.process_upload_function` in response to new objects in your `my-bucket` S3 bucket. Note that `process_upload_function` must accept `event` and `context` parameters.
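The settings block itself is collapsed in this diff; as a rough sketch, it takes this shape:

```javascript
"events": [
    {
        "function": "your_module.process_upload_function",
        "event_source": {
            "arn": "arn:aws:s3:::my-bucket",
            "events": [
                "s3:ObjectCreated:*"
            ]
        }
    }
]
```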

@@ -556,6 +556,21 @@ Optionally you can add [SNS message filters](http://docs.aws.amazon.com/sns/late
]
```

[SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) also works by pulling messages, in this case from a queue. At this time, [only "Standard" queues can trigger Lambda events, not "FIFO" queues](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html). Read the AWS documentation carefully, since Lambda calls the SQS DeleteMessage API on your behalf once your function completes successfully.

```javascript
"events": [
    {
        "function": "your_module.process_messages",
        "event_source": {
            "arn": "arn:aws:sqs:us-east-1:12341234:your-queue-name-arn",
            "batch_size": 10, // Max: 10. Use 1 to trigger immediate processing
            "enabled": true // Default is false
        }
    }
]
```

**Collaborator:** Why is this needed? We don't have an `enabled` field for other event sources. If they are present, they are enabled.

**Collaborator (Author):** Probably isn't needed; going to test without it and clean up if it works.

**Collaborator:** Strangely, none of the event sources have the 'enabled' knob built into the AWS configuration. See the 'Enabled' parameter at the end of the documentation: https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html

I suspect it's probably some sort of throttling mechanism that typical SQS event sinks have control over. Agree that it's a little wonky, but I would recommend leaving it.

**Collaborator (Author):** @mcrowson `enabled` is false by default on push events; we also have the same behaviour for the other push events (dynamodb and kinesis). Without `enabled` you need to turn it on in the console, which makes some sense to me if you want to control/test when a consumer is "consuming" information from a queue.
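For illustration only (this sketch is not part of the PR, and the JSON body handling is an assumption), a minimal `process_messages` might look like:

```python
import json


def process_messages(event, context):
    # Each invocation receives up to `batch_size` SQS messages in `Records`.
    for record in event["Records"]:
        payload = json.loads(record["body"])  # the body arrives as a string
        print("Got message {}: {}".format(record["messageId"], payload))
    # Returning normally lets Lambda delete the batch from the queue;
    # raising an exception makes the messages visible again for retry.
```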

For configuring Lex Bot's intent-triggered events:
```javascript
"bot_events": [
    ...
]
```
8 changes: 7 additions & 1 deletion test_settings.py
@@ -19,7 +19,8 @@
    'arn:aws:s3:1': 'test_settings.aws_s3_event',
    'arn:aws:sns:1': 'test_settings.aws_sns_event',
    'arn:aws:dynamodb:1': 'test_settings.aws_dynamodb_event',
-    'arn:aws:kinesis:1': 'test_settings.aws_kinesis_event'
+    'arn:aws:kinesis:1': 'test_settings.aws_kinesis_event',
+    'arn:aws:sqs:1': 'test_settings.aws_sqs_event'
}

ENVIRONMENT_VARIABLES={'testenv': 'envtest'}
@@ -54,9 +55,14 @@ def aws_kinesis_event(event, content):
return "AWS KINESIS EVENT"


def aws_sqs_event(event, content):
return "AWS SQS EVENT"


def authorizer_event(event, content):
return "AUTHORIZER_EVENT"


def command():
print("command")

23 changes: 23 additions & 0 deletions tests/tests_placebo.py
@@ -376,6 +376,29 @@ def test_handler(self, session):
        }
        self.assertEqual("AWS KINESIS EVENT", lh.handler(event, None))

        # Test AWS SQS event
        event = {
            u"Records": [
                {
                    u"messageId": u"c80e8021-a70a-42c7-a470-796e1186f753",
                    u"receiptHandle": u"AQEBJQ+/u6NsnT5t8Q/VbVxgdUl4TMKZ5FqhksRdIQvLBhwNvADoBxYSOVeCBXdnS9P+erlTtwEALHsnBXynkfPLH3BOUqmgzP25U8kl8eHzq6RAlzrSOfTO8ox9dcp6GLmW33YjO3zkq5VRYyQlJgLCiAZUpY2D4UQcE5D1Vm8RoKfbE+xtVaOctYeINjaQJ1u3mWx9T7tork3uAlOe1uyFjCWU5aPX/1OHhWCGi2EPPZj6vchNqDOJC/Y2k1gkivqCjz1CZl6FlZ7UVPOx3AMoszPuOYZ+Nuqpx2uCE2MHTtMHD8PVjlsWirt56oUr6JPp9aRGo6bitPIOmi4dX0FmuMKD6u/JnuZCp+AXtJVTmSHS8IXt/twsKU7A+fiMK01NtD5msNgVPoe9JbFtlGwvTQ==",
                    u"body": u"{\"foo\":\"bar\"}",
                    u"attributes": {
                        u"ApproximateReceiveCount": u"3",
                        u"SentTimestamp": u"1529104986221",
                        u"SenderId": u"594035263019",
                        u"ApproximateFirstReceiveTimestamp": u"1529104986230"
                    },
                    u"messageAttributes": {},
                    u"md5OfBody": u"9bb58f26192e4ba00f01e2e7b136bbd8",
                    u"eventSource": u"aws:sqs",
                    u"eventSourceARN": u"arn:aws:sqs:1",
                    u"awsRegion": u"us-east-1"
                }
            ]
        }
        self.assertEqual("AWS SQS EVENT", lh.handler(event, None))

        # Test Authorizer event
        event = {u'authorizationToken': u'hubtoken1', u'methodArn': u'arn:aws:execute-api:us-west-2:1234:xxxxx/dev/GET/v1/endpoint/param', u'type': u'TOKEN'}
        self.assertEqual("AUTHORIZER_EVENT", lh.handler(event, None))
9 changes: 6 additions & 3 deletions zappa/core.py
@@ -2364,10 +2364,10 @@ def schedule_events(self, lambda_arn, lambda_name, events, default=True):
        http://docs.aws.amazon.com/lambda/latest/dg/tutorial-scheduled-events-schedule-expressions.html
        """

-        # The two stream sources - DynamoDB and Kinesis - are working differently than the other services (pull vs push)
+        # The stream sources - DynamoDB, Kinesis and SQS - are working differently than the other services (pull vs push)
        # and do not require event permissions. They do require additional permissions on the Lambda roles though.
        # http://docs.aws.amazon.com/lambda/latest/dg/lambda-api-permissions-ref.html
-        pull_services = ['dynamodb', 'kinesis']
+        pull_services = ['dynamodb', 'kinesis', 'sqs']

        # XXX: Not available in Lambda yet.
        # We probably want to execute the latest code.
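To make the "additional permissions" concrete: a sketch of the extra statement the Lambda execution role needs for an SQS source, mirroring the AWS managed policy `AWSLambdaSQSQueueExecutionRole` (the queue ARN is a placeholder):

```javascript
{
    "Effect": "Allow",
    "Action": [
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes"
    ],
    "Resource": "arn:aws:sqs:us-east-1:12341234:your-queue-name"
}
```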
@@ -2612,7 +2612,10 @@ def unschedule_events(self, events, lambda_arn=None, lambda_name=None, excluded_
                function,
                self.boto_session
            )
-            print("Removed event " + name + " (" + str(event_source['events']) + ").")
+            print("Removed event {}{}.".format(
+                name,
+                " ({})".format(str(event_source['events'])) if 'events' in event_source else '')
+            )

###
# Async / SNS
4 changes: 3 additions & 1 deletion zappa/handler.py
@@ -286,7 +286,7 @@ def get_function_for_aws_event(self, record):
"""
Get the associated function to execute for a triggered AWS event

Support S3, SNS, DynamoDB and kinesis events
Support S3, SNS, DynamoDB, kinesis and SQS events
"""
if 's3' in record:
if ':' in record['s3']['configurationId']:
@@ -303,6 +303,8 @@
            arn = record['Sns'].get('TopicArn')
        elif 'dynamodb' in record or 'kinesis' in record:
            arn = record.get('eventSourceARN')
+        elif 'eventSource' in record and record.get('eventSource') == 'aws:sqs':
+            arn = record.get('eventSourceARN')
        elif 's3' in record:
            arn = record['s3']['bucket']['arn']

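As a worked sketch of the dispatch above (not part of the PR; the mapping is borrowed from test_settings.py earlier in this diff):

```python
# How an SQS record resolves to a handler function: the handler reads the
# record's eventSourceARN and looks it up in the project's event mapping.
AWS_EVENT_MAPPING = {'arn:aws:sqs:1': 'test_settings.aws_sqs_event'}

record = {'eventSource': 'aws:sqs', 'eventSourceARN': 'arn:aws:sqs:1'}

arn = None
if record.get('eventSource') == 'aws:sqs':
    arn = record.get('eventSourceARN')

print(AWS_EVENT_MAPPING.get(arn))  # -> 'test_settings.aws_sqs_event'
```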
104 changes: 103 additions & 1 deletion zappa/utilities.py
@@ -4,6 +4,7 @@
import fnmatch
import io
import json
+import logging
import os
import re
import shutil
@@ -17,6 +18,8 @@
else:
    from urllib.parse import urlparse

+LOG = logging.getLogger(__name__)

##
# Settings / Packaging
##
@@ -199,6 +202,7 @@ def get_event_source(event_source, lambda_arn, target_function, boto_session, dr
"""
import kappa.function
import kappa.restapi
import kappa.event_source.base
import kappa.event_source.dynamodb_stream
import kappa.event_source.kinesis
import kappa.event_source.s3
@@ -216,6 +220,103 @@ class PseudoFunction(object):
        def __init__(self):
            return

    # Mostly adapted from kappa - will probably be replaced by kappa support
    class SqsEventSource(kappa.event_source.base.EventSource):

        def __init__(self, context, config):
            super(SqsEventSource, self).__init__(context, config)
            self._lambda = kappa.awsclient.create_client(
                'lambda', context.session)

        def _get_uuid(self, function):
            # Look up the UUID of the existing event source mapping, if any.
            uuid = None
            response = self._lambda.call(
                'list_event_source_mappings',
                FunctionName=function.name,
                EventSourceArn=self.arn)
            LOG.debug(response)
            if len(response['EventSourceMappings']) > 0:
                uuid = response['EventSourceMappings'][0]['UUID']
            return uuid

        def add(self, function):
            try:
                response = self._lambda.call(
                    'create_event_source_mapping',
                    FunctionName=function.name,
                    EventSourceArn=self.arn,
                    BatchSize=self.batch_size,
                    Enabled=self.enabled
                )
                LOG.debug(response)
            except Exception:
                LOG.exception('Unable to add event source')

        def enable(self, function):
            self._config['enabled'] = True
            try:
                response = self._lambda.call(
                    'update_event_source_mapping',
                    UUID=self._get_uuid(function),
                    Enabled=self.enabled
                )
                LOG.debug(response)
            except Exception:
                LOG.exception('Unable to enable event source')

        def disable(self, function):
            self._config['enabled'] = False
            try:
                # update_event_source_mapping is keyed by the mapping UUID.
                response = self._lambda.call(
                    'update_event_source_mapping',
                    UUID=self._get_uuid(function),
                    Enabled=self.enabled
                )
                LOG.debug(response)
            except Exception:
                LOG.exception('Unable to disable event source')

        def update(self, function):
            response = None
            uuid = self._get_uuid(function)
            if uuid:
                try:
                    response = self._lambda.call(
                        'update_event_source_mapping',
                        UUID=uuid,
                        BatchSize=self.batch_size,
                        Enabled=self.enabled,
                        FunctionName=function.arn)
                    LOG.debug(response)
                except Exception:
                    LOG.exception('Unable to update event source')

        def remove(self, function):
            response = None
            uuid = self._get_uuid(function)
            if uuid:
                response = self._lambda.call(
                    'delete_event_source_mapping',
                    UUID=uuid)
                LOG.debug(response)
            return response

        def status(self, function):
            response = None
            LOG.debug('getting status for event source %s', self.arn)
            uuid = self._get_uuid(function)
            if uuid:
                try:
                    response = self._lambda.call(
                        'get_event_source_mapping',
                        UUID=uuid)
                    LOG.debug(response)
                except botocore.exceptions.ClientError:
                    LOG.debug('event source %s does not exist', self.arn)
                    response = None
            else:
                LOG.debug('No UUID for event source %s', self.arn)
            return response

    class ExtendedSnsEventSource(kappa.event_source.sns.SNSEventSource):
        @property
        def filters(self):
@@ -245,6 +346,7 @@ def add(self, function):
        'kinesis': kappa.event_source.kinesis.KinesisEventSource,
        's3': kappa.event_source.s3.S3EventSource,
        'sns': ExtendedSnsEventSource,
+        'sqs': SqsEventSource,
        'events': kappa.event_source.cloudwatch.CloudWatchEventSource
    }
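For reference, a simplified sketch (not part of the PR) of how `get_event_source` picks the class registered above; the service name is the third `:`-separated field of the ARN:

```python
# The event source dict comes straight from zappa_settings.json.
event_source = {
    "arn": "arn:aws:sqs:us-east-1:12341234:your-queue-name-arn",
    "batch_size": 10,
    "enabled": True,
}

svc = event_source["arn"].split(":")[2]  # -> "sqs"
# event_source_map[svc] -> SqsEventSource, which then creates, updates or
# removes the Lambda event source mapping for that queue.
print(svc)
```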

@@ -421,4 +523,4 @@ def titlecase_keys(d):
"""
Takes a dict with keys of type str and returns a new dict with all keys titlecased.
"""
return {k.title(): v for k, v in d.items()}
return {k.title(): v for k, v in d.items()}