diff --git a/README.md b/README.md
index e5c5e962c..f51b979b3 100644
--- a/README.md
+++ b/README.md
@@ -458,7 +458,7 @@ However, it's now far easier to use Route 53-based DNS authentication, which wil
 
 ## Executing in Response to AWS Events
 
-Similarly, you can have your functions execute in response to events that happen in the AWS ecosystem, such as S3 uploads, DynamoDB entries, Kinesis streams, and SNS messages.
+Similarly, you can have your functions execute in response to events that happen in the AWS ecosystem, such as S3 uploads, DynamoDB entries, Kinesis streams, SNS messages, and SQS queues.
 
 In your *zappa_settings.json* file, define your [event sources](http://docs.aws.amazon.com/lambda/latest/dg/invoking-lambda-function.html) and the function you wish to execute. For instance, this will execute `your_module.process_upload_function` in response to new objects in your `my-bucket` S3 bucket. Note that `process_upload_function` must accept `event` and `context` parameters.
 
@@ -556,6 +556,21 @@ Optionally you can add [SNS message filters](http://docs.aws.amazon.com/sns/late
     ]
 ```
 
+With [SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html), messages are likewise pulled from the queue rather than pushed to your function. At this time, [only "Standard" queues can trigger Lambda events, not "FIFO" queues](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html). Read the AWS documentation carefully, since Lambda calls the SQS DeleteMessage API on your behalf once your function completes successfully.
+
+```javascript
+   "events": [
+    {
+        "function": "your_module.process_messages",
+        "event_source": {
+            "arn": "arn:aws:sqs:us-east-1:12341234:your-queue-name",
+            "batch_size": 10, // Max: 10. Use 1 to trigger immediate processing
+            "enabled": true // Default is false
+        }
+    }
+   ]
+```
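+
+The named function receives a standard Lambda SQS event. As a minimal sketch (assuming, purely for illustration, that your producers put JSON bodies on the queue):
+
+```python
+import json
+
+def process_messages(event, context):
+    # Lambda delivers up to batch_size messages per invocation in event["Records"].
+    for record in event["Records"]:
+        payload = json.loads(record["body"])  # illustrative: the body format is up to your producers
+        print("Got message {}: {}".format(record["messageId"], payload))
+    # Return normally and Lambda deletes the batch for you; raise an exception
+    # and the messages become visible on the queue again for retry.
+```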
u"1529104986221", + u"SenderId": u"594035263019", + u"ApproximateFirstReceiveTimestamp": u"1529104986230" + }, + u"messageAttributes": {}, + u"md5OfBody": u"9bb58f26192e4ba00f01e2e7b136bbd8", + u"eventSource": u"aws:sqs", + u"eventSourceARN": u"arn:aws:sqs:1", + u"awsRegion": u"us-east-1" + } + ] + } + self.assertEqual("AWS SQS EVENT", lh.handler(event, None)) + # Test Authorizer event event = {u'authorizationToken': u'hubtoken1', u'methodArn': u'arn:aws:execute-api:us-west-2:1234:xxxxx/dev/GET/v1/endpoint/param', u'type': u'TOKEN'} self.assertEqual("AUTHORIZER_EVENT", lh.handler(event, None)) diff --git a/zappa/core.py b/zappa/core.py index 2fdb556b6..c0c92d6b9 100644 --- a/zappa/core.py +++ b/zappa/core.py @@ -2364,10 +2364,10 @@ def schedule_events(self, lambda_arn, lambda_name, events, default=True): http://docs.aws.amazon.com/lambda/latest/dg/tutorial-scheduled-events-schedule-expressions.html """ - # The two stream sources - DynamoDB and Kinesis - are working differently than the other services (pull vs push) + # The stream sources - DynamoDB, Kinesis and SQS - are working differently than the other services (pull vs push) # and do not require event permissions. They do require additional permissions on the Lambda roles though. # http://docs.aws.amazon.com/lambda/latest/dg/lambda-api-permissions-ref.html - pull_services = ['dynamodb', 'kinesis'] + pull_services = ['dynamodb', 'kinesis', 'sqs'] # XXX: Not available in Lambda yet. # We probably want to execute the latest code. @@ -2612,7 +2612,10 @@ def unschedule_events(self, events, lambda_arn=None, lambda_name=None, excluded_ function, self.boto_session ) - print("Removed event " + name + " (" + str(event_source['events']) + ").") + print("Removed event {}{}.".format( + name, + " ({})".format(str(event_source['events'])) if 'events' in event_source else '') + ) ### # Async / SNS diff --git a/zappa/handler.py b/zappa/handler.py index bcf6a588a..c2e9febd5 100644 --- a/zappa/handler.py +++ b/zappa/handler.py @@ -286,7 +286,7 @@ def get_function_for_aws_event(self, record): """ Get the associated function to execute for a triggered AWS event - Support S3, SNS, DynamoDB and kinesis events + Support S3, SNS, DynamoDB, kinesis and SQS events """ if 's3' in record: if ':' in record['s3']['configurationId']: @@ -303,6 +303,8 @@ def get_function_for_aws_event(self, record): arn = record['Sns'].get('TopicArn') elif 'dynamodb' in record or 'kinesis' in record: arn = record.get('eventSourceARN') + elif 'eventSource' in record and record.get('eventSource') == 'aws:sqs': + arn = record.get('eventSourceARN') elif 's3' in record: arn = record['s3']['bucket']['arn'] diff --git a/zappa/utilities.py b/zappa/utilities.py index e13188c42..6e416bbcf 100644 --- a/zappa/utilities.py +++ b/zappa/utilities.py @@ -4,6 +4,7 @@ import fnmatch import io import json +import logging import os import re import shutil @@ -17,6 +18,8 @@ else: from urllib.parse import urlparse +LOG = logging.getLogger(__name__) + ## # Settings / Packaging ## @@ -199,6 +202,7 @@ def get_event_source(event_source, lambda_arn, target_function, boto_session, dr """ import kappa.function import kappa.restapi + import kappa.event_source.base import kappa.event_source.dynamodb_stream import kappa.event_source.kinesis import kappa.event_source.s3 @@ -216,6 +220,103 @@ class PseudoFunction(object): def __init__(self): return + # Mostly adapted from kappa - will probably be replaced by kappa support + class SqsEventSource(kappa.event_source.base.EventSource): + + def __init__(self, context, 
+    class SqsEventSource(kappa.event_source.base.EventSource):
+
+        def __init__(self, context, config):
+            super(SqsEventSource, self).__init__(context, config)
+            self._lambda = kappa.awsclient.create_client(
+                'lambda', context.session)
+
+        def _get_uuid(self, function):
+            uuid = None
+            response = self._lambda.call(
+                'list_event_source_mappings',
+                FunctionName=function.name,
+                EventSourceArn=self.arn)
+            LOG.debug(response)
+            if len(response['EventSourceMappings']) > 0:
+                uuid = response['EventSourceMappings'][0]['UUID']
+            return uuid
+
+        def add(self, function):
+            try:
+                response = self._lambda.call(
+                    'create_event_source_mapping',
+                    FunctionName=function.name,
+                    EventSourceArn=self.arn,
+                    BatchSize=self.batch_size,
+                    Enabled=self.enabled
+                )
+                LOG.debug(response)
+            except Exception:
+                LOG.exception('Unable to add event source')
+
+        def enable(self, function):
+            self._config['enabled'] = True
+            try:
+                response = self._lambda.call(
+                    'update_event_source_mapping',
+                    UUID=self._get_uuid(function),
+                    Enabled=self.enabled
+                )
+                LOG.debug(response)
+            except Exception:
+                LOG.exception('Unable to enable event source')
+
+        def disable(self, function):
+            self._config['enabled'] = False
+            try:
+                response = self._lambda.call(
+                    'update_event_source_mapping',
+                    UUID=self._get_uuid(function),
+                    Enabled=self.enabled
+                )
+                LOG.debug(response)
+            except Exception:
+                LOG.exception('Unable to disable event source')
+
+        def update(self, function):
+            response = None
+            uuid = self._get_uuid(function)
+            if uuid:
+                try:
+                    response = self._lambda.call(
+                        'update_event_source_mapping',
+                        UUID=uuid,
+                        BatchSize=self.batch_size,
+                        Enabled=self.enabled,
+                        FunctionName=function.arn)
+                    LOG.debug(response)
+                except Exception:
+                    LOG.exception('Unable to update event source')
+
+        def remove(self, function):
+            response = None
+            uuid = self._get_uuid(function)
+            if uuid:
+                response = self._lambda.call(
+                    'delete_event_source_mapping',
+                    UUID=uuid)
+                LOG.debug(response)
+            return response
+
+        def status(self, function):
+            response = None
+            LOG.debug('getting status for event source %s', self.arn)
+            uuid = self._get_uuid(function)
+            if uuid:
+                try:
+                    response = self._lambda.call(
+                        'get_event_source_mapping',
+                        UUID=uuid)
+                    LOG.debug(response)
+                except botocore.exceptions.ClientError:
+                    LOG.debug('event source %s does not exist', self.arn)
+                    response = None
+            else:
+                LOG.debug('No UUID for event source %s', self.arn)
+            return response
+
     class ExtendedSnsEventSource(kappa.event_source.sns.SNSEventSource):
         @property
         def filters(self):
@@ -245,6 +346,7 @@ def add(self, function):
         'kinesis': kappa.event_source.kinesis.KinesisEventSource,
         's3': kappa.event_source.s3.S3EventSource,
         'sns': ExtendedSnsEventSource,
+        'sqs': SqsEventSource,
         'events': kappa.event_source.cloudwatch.CloudWatchEventSource
     }
 
@@ -421,4 +523,4 @@ def titlecase_keys(d):
     """
     Takes a dict with keys of type str and returns a new dict with all keys titlecased.
     """
-    return {k.title(): v for k, v in d.items()}
\ No newline at end of file
+    return {k.title(): v for k, v in d.items()}