From 04a6819f68ed43314c4213af81439eb1acf1177d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Wed, 19 Sep 2018 02:55:35 +0100 Subject: [PATCH 01/13] sqs support --- requirements.txt | 2 +- zappa/core.py | 9 ++++++--- zappa/utilities.py | 2 ++ 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index 811575d72..beafd60ca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ future==0.16.0 futures==3.2.0; python_version < '3' hjson==3.0.1 jmespath==0.9.3 -kappa==0.6.0 +kappa==0.7.0 lambda-packages==0.20.0 pip>=9.0.1, <=10.1.0 python-dateutil>=2.6.1, <2.7.0 diff --git a/zappa/core.py b/zappa/core.py index 2fdb556b6..c0c92d6b9 100644 --- a/zappa/core.py +++ b/zappa/core.py @@ -2364,10 +2364,10 @@ def schedule_events(self, lambda_arn, lambda_name, events, default=True): http://docs.aws.amazon.com/lambda/latest/dg/tutorial-scheduled-events-schedule-expressions.html """ - # The two stream sources - DynamoDB and Kinesis - are working differently than the other services (pull vs push) + # The stream sources - DynamoDB, Kinesis and SQS - are working differently than the other services (pull vs push) # and do not require event permissions. They do require additional permissions on the Lambda roles though. # http://docs.aws.amazon.com/lambda/latest/dg/lambda-api-permissions-ref.html - pull_services = ['dynamodb', 'kinesis'] + pull_services = ['dynamodb', 'kinesis', 'sqs'] # XXX: Not available in Lambda yet. # We probably want to execute the latest code. @@ -2612,7 +2612,10 @@ def unschedule_events(self, events, lambda_arn=None, lambda_name=None, excluded_ function, self.boto_session ) - print("Removed event " + name + " (" + str(event_source['events']) + ").") + print("Removed event {}{}.".format( + name, + " ({})".format(str(event_source['events'])) if 'events' in event_source else '') + ) ### # Async / SNS diff --git a/zappa/utilities.py b/zappa/utilities.py index e13188c42..8cdd2bf9e 100644 --- a/zappa/utilities.py +++ b/zappa/utilities.py @@ -203,6 +203,7 @@ def get_event_source(event_source, lambda_arn, target_function, boto_session, dr import kappa.event_source.kinesis import kappa.event_source.s3 import kappa.event_source.sns + import kappa.event_source.sqs import kappa.event_source.cloudwatch import kappa.policy import kappa.role @@ -245,6 +246,7 @@ def add(self, function): 'kinesis': kappa.event_source.kinesis.KinesisEventSource, 's3': kappa.event_source.s3.S3EventSource, 'sns': ExtendedSnsEventSource, + 'sqs': kappa.event_source.sqs.SqsEventSource, 'events': kappa.event_source.cloudwatch.CloudWatchEventSource } From ae370724f0478389d8f3bd1677594e04bac3fbbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Wed, 19 Sep 2018 02:59:44 +0100 Subject: [PATCH 02/13] change kappa to current branch with SQS event --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index beafd60ca..60df5b4d3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ future==0.16.0 futures==3.2.0; python_version < '3' hjson==3.0.1 jmespath==0.9.3 -kappa==0.7.0 +-e git+https://github.com/jneves/kappa@9156d35127b0d09b8f7c99f4d7f7f3733f168514#egg=kappa lambda-packages==0.20.0 pip>=9.0.1, <=10.1.0 python-dateutil>=2.6.1, <2.7.0 From 8f23a206498236a18a0ae353bb392e9208cd4d83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Thu, 20 Sep 2018 22:27:39 +0100 Subject: [PATCH 03/13] add placebo test --- tests/tests_placebo.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/tests/tests_placebo.py b/tests/tests_placebo.py index 1bea096e7..425acefd3 100644 --- a/tests/tests_placebo.py +++ b/tests/tests_placebo.py @@ -376,6 +376,29 @@ def test_handler(self, session): } self.assertEqual("AWS KINESIS EVENT", lh.handler(event, None)) + # Test AWS SQS event + event = { + u"Records": [ + { + u"messageId": u"c80e8021-a70a-42c7-a470-796e1186f753", + u"receiptHandle": u"AQEBJQ+/u6NsnT5t8Q/VbVxgdUl4TMKZ5FqhksRdIQvLBhwNvADoBxYSOVeCBXdnS9P+erlTtwEALHsnBXynkfPLH3BOUqmgzP25U8kl8eHzq6RAlzrSOfTO8ox9dcp6GLmW33YjO3zkq5VRYyQlJgLCiAZUpY2D4UQcE5D1Vm8RoKfbE+xtVaOctYeINjaQJ1u3mWx9T7tork3uAlOe1uyFjCWU5aPX/1OHhWCGi2EPPZj6vchNqDOJC/Y2k1gkivqCjz1CZl6FlZ7UVPOx3AMoszPuOYZ+Nuqpx2uCE2MHTtMHD8PVjlsWirt56oUr6JPp9aRGo6bitPIOmi4dX0FmuMKD6u/JnuZCp+AXtJVTmSHS8IXt/twsKU7A+fiMK01NtD5msNgVPoe9JbFtlGwvTQ==", + u"body": u"{\"foo\":\"bar\"}", + u"attributes": { + u"ApproximateReceiveCount": u"3", + u"SentTimestamp": u"1529104986221", + u"SenderId": u"594035263019", + u"ApproximateFirstReceiveTimestamp": u"1529104986230" + }, + u"messageAttributes": {}, + u"md5OfBody": u"9bb58f26192e4ba00f01e2e7b136bbd8", + u"eventSource": u"aws:sqs", + u"eventSourceARN": u"arn:aws:sqs:us-west-2:594035263019:NOTFIFOQUEUE", + u"awsRegion": u"us-west-2" + } + ] + } + self.assertEqual("AWS SQS EVENT", lh.handler(event, None)) + # Test Authorizer event event = {u'authorizationToken': u'hubtoken1', u'methodArn': u'arn:aws:execute-api:us-west-2:1234:xxxxx/dev/GET/v1/endpoint/param', u'type': u'TOKEN'} self.assertEqual("AUTHORIZER_EVENT", lh.handler(event, None)) From 8e54cb5ea9500d577367a3441725659eee543f82 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Thu, 20 Sep 2018 22:28:01 +0100 Subject: [PATCH 04/13] remove dependence from a kappa update --- requirements.txt | 2 +- zappa/utilities.py | 101 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 100 insertions(+), 3 deletions(-) diff --git a/requirements.txt b/requirements.txt index 60df5b4d3..60beb4388 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ future==0.16.0 futures==3.2.0; python_version < '3' hjson==3.0.1 jmespath==0.9.3 --e git+https://github.com/jneves/kappa@9156d35127b0d09b8f7c99f4d7f7f3733f168514#egg=kappa +kappa==0.6 lambda-packages==0.20.0 pip>=9.0.1, <=10.1.0 python-dateutil>=2.6.1, <2.7.0 diff --git a/zappa/utilities.py b/zappa/utilities.py index 8cdd2bf9e..23021cdf3 100644 --- a/zappa/utilities.py +++ b/zappa/utilities.py @@ -217,6 +217,103 @@ class PseudoFunction(object): def __init__(self): return + # Mostly adapted from kappa - will probably be replaced by kappa support + class SqsEventSource(kappa.event_source.base.EventSource): + + def __init__(self, context, config): + super(SqsEventSource, self).__init__(context, config) + self._lambda = kappa.awsclient.create_client( + 'lambda', context.session) + + def _get_uuid(self, function): + uuid = None + response = self._lambda.call( + 'list_event_source_mappings', + FunctionName=function.name, + EventSourceArn=self.arn) + LOG.debug(response) + if len(response['EventSourceMappings']) > 0: + uuid = response['EventSourceMappings'][0]['UUID'] + return uuid + + def add(self, function): + try: + response = self._lambda.call( + 'create_event_source_mapping', + FunctionName=function.name, + EventSourceArn=self.arn, + BatchSize=self.batch_size, + Enabled=self.enabled + ) + LOG.debug(response) + except Exception: + LOG.exception('Unable to add event source') + + def enable(self, function): + self._config['enabled'] = True + try: + response = self._lambda.call( + 'update_event_source_mapping', + UUID=self._get_uuid(function), + Enabled=self.enabled + ) + LOG.debug(response) + except Exception: + LOG.exception('Unable to enable event source') + + def disable(self, function): + self._config['enabled'] = False + try: + response = self._lambda.call( + 'update_event_source_mapping', + FunctionName=function.name, + Enabled=self.enabled + ) + LOG.debug(response) + except Exception: + LOG.exception('Unable to disable event source') + + def update(self, function): + response = None + uuid = self._get_uuid(function) + if uuid: + try: + response = self._lambda.call( + 'update_event_source_mapping', + BatchSize=self.batch_size, + Enabled=self.enabled, + FunctionName=function.arn) + LOG.debug(response) + except Exception: + LOG.exception('Unable to update event source') + + def remove(self, function): + response = None + uuid = self._get_uuid(function) + if uuid: + response = self._lambda.call( + 'delete_event_source_mapping', + UUID=uuid) + LOG.debug(response) + return response + + def status(self, function): + response = None + LOG.debug('getting status for event source %s', self.arn) + uuid = self._get_uuid(function) + if uuid: + try: + response = self._lambda.call( + 'get_event_source_mapping', + UUID=self._get_uuid(function)) + LOG.debug(response) + except botocore.exceptions.ClientError: + LOG.debug('event source %s does not exist', self.arn) + response = None + else: + LOG.debug('No UUID for event source %s', self.arn) + return response + class ExtendedSnsEventSource(kappa.event_source.sns.SNSEventSource): @property def filters(self): @@ -246,7 +343,7 @@ def add(self, function): 'kinesis': kappa.event_source.kinesis.KinesisEventSource, 's3': kappa.event_source.s3.S3EventSource, 'sns': ExtendedSnsEventSource, - 'sqs': kappa.event_source.sqs.SqsEventSource, + 'sqs': SqsEventSource, 'events': kappa.event_source.cloudwatch.CloudWatchEventSource } @@ -423,4 +520,4 @@ def titlecase_keys(d): """ Takes a dict with keys of type str and returns a new dict with all keys titlecased. """ - return {k.title(): v for k, v in d.items()} \ No newline at end of file + return {k.title(): v for k, v in d.items()} From c3873058f479a9c5798b46d9e5e3aaf33322eca2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Thu, 20 Sep 2018 22:31:10 +0100 Subject: [PATCH 05/13] Update requirements.txt --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 60beb4388..811575d72 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,7 +8,7 @@ future==0.16.0 futures==3.2.0; python_version < '3' hjson==3.0.1 jmespath==0.9.3 -kappa==0.6 +kappa==0.6.0 lambda-packages==0.20.0 pip>=9.0.1, <=10.1.0 python-dateutil>=2.6.1, <2.7.0 From e9f1e30488994d318d6689c6b783296560d527b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Thu, 20 Sep 2018 22:41:29 +0100 Subject: [PATCH 06/13] fix import --- zappa/utilities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zappa/utilities.py b/zappa/utilities.py index 23021cdf3..f9c0477d2 100644 --- a/zappa/utilities.py +++ b/zappa/utilities.py @@ -199,11 +199,11 @@ def get_event_source(event_source, lambda_arn, target_function, boto_session, dr """ import kappa.function import kappa.restapi + import kappa.event_source.base import kappa.event_source.dynamodb_stream import kappa.event_source.kinesis import kappa.event_source.s3 import kappa.event_source.sns - import kappa.event_source.sqs import kappa.event_source.cloudwatch import kappa.policy import kappa.role From a028da43a94566c65faa3d128d5d2880ebb623c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Thu, 20 Sep 2018 22:49:29 +0100 Subject: [PATCH 07/13] fix tests --- test_settings.py | 7 ++++++- tests/tests_placebo.py | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/test_settings.py b/test_settings.py index eb9c68915..f9650ff34 100644 --- a/test_settings.py +++ b/test_settings.py @@ -19,7 +19,8 @@ 'arn:aws:s3:1': 'test_settings.aws_s3_event', 'arn:aws:sns:1': 'test_settings.aws_sns_event', 'arn:aws:dynamodb:1': 'test_settings.aws_dynamodb_event', - 'arn:aws:kinesis:1': 'test_settings.aws_kinesis_event' + 'arn:aws:kinesis:1': 'test_settings.aws_kinesis_event', + 'arn:aws:sqs:1': 'test_settings.aws_sqs_event' } ENVIRONMENT_VARIABLES={'testenv': 'envtest'} @@ -54,6 +55,10 @@ def aws_kinesis_event(event, content): return "AWS KINESIS EVENT" +def aws_sqs_event(event, content): + return "AWS SQS EVENT" + + def authorizer_event(event, content): return "AUTHORIZER_EVENT" diff --git a/tests/tests_placebo.py b/tests/tests_placebo.py index 425acefd3..7a95c8ef7 100644 --- a/tests/tests_placebo.py +++ b/tests/tests_placebo.py @@ -392,7 +392,7 @@ def test_handler(self, session): u"messageAttributes": {}, u"md5OfBody": u"9bb58f26192e4ba00f01e2e7b136bbd8", u"eventSource": u"aws:sqs", - u"eventSourceARN": u"arn:aws:sqs:us-west-2:594035263019:NOTFIFOQUEUE", + u"eventSourceARN": u"arn:aws:sqs:1", u"awsRegion": u"us-west-2" } ] From 09687a4b01402f23546fef5af217a5406ca14bc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Thu, 20 Sep 2018 23:18:52 +0100 Subject: [PATCH 08/13] fix test --- tests/tests_placebo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/tests_placebo.py b/tests/tests_placebo.py index 7a95c8ef7..6ac4c5425 100644 --- a/tests/tests_placebo.py +++ b/tests/tests_placebo.py @@ -393,7 +393,7 @@ def test_handler(self, session): u"md5OfBody": u"9bb58f26192e4ba00f01e2e7b136bbd8", u"eventSource": u"aws:sqs", u"eventSourceARN": u"arn:aws:sqs:1", - u"awsRegion": u"us-west-2" + u"awsRegion": u"us-east-1" } ] } From 2ac20b83ab996e64d261ee175359fdaf29128677 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Fri, 21 Sep 2018 08:33:50 +0100 Subject: [PATCH 09/13] oops, the handler part was missing --- zappa/handler.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/zappa/handler.py b/zappa/handler.py index bcf6a588a..95d3eedb8 100644 --- a/zappa/handler.py +++ b/zappa/handler.py @@ -286,7 +286,7 @@ def get_function_for_aws_event(self, record): """ Get the associated function to execute for a triggered AWS event - Support S3, SNS, DynamoDB and kinesis events + Support S3, SNS, DynamoDB, kinesis and SQS events """ if 's3' in record: if ':' in record['s3']['configurationId']: @@ -301,7 +301,7 @@ def get_function_for_aws_event(self, record): except ValueError: pass arn = record['Sns'].get('TopicArn') - elif 'dynamodb' in record or 'kinesis' in record: + elif 'dynamodb' in record or 'kinesis' in record or 'sqs' in record: arn = record.get('eventSourceARN') elif 's3' in record: arn = record['s3']['bucket']['arn'] From 14c1c8ac77763136580cab0c5a7f84f161ba5744 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Fri, 21 Sep 2018 08:44:32 +0100 Subject: [PATCH 10/13] add logger --- zappa/utilities.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/zappa/utilities.py b/zappa/utilities.py index f9c0477d2..6e416bbcf 100644 --- a/zappa/utilities.py +++ b/zappa/utilities.py @@ -4,6 +4,7 @@ import fnmatch import io import json +import logging import os import re import shutil @@ -17,6 +18,8 @@ else: from urllib.parse import urlparse +LOG = logging.getLogger(__name__) + ## # Settings / Packaging ## From 4892dbe6b0be62e8c1f0ac4ece5adbf1cda380b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Fri, 21 Sep 2018 09:28:13 +0100 Subject: [PATCH 11/13] correct handler --- zappa/handler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/zappa/handler.py b/zappa/handler.py index 95d3eedb8..c2e9febd5 100644 --- a/zappa/handler.py +++ b/zappa/handler.py @@ -301,7 +301,9 @@ def get_function_for_aws_event(self, record): except ValueError: pass arn = record['Sns'].get('TopicArn') - elif 'dynamodb' in record or 'kinesis' in record or 'sqs' in record: + elif 'dynamodb' in record or 'kinesis' in record: + arn = record.get('eventSourceARN') + elif 'eventSource' in record and record.get('eventSource') == 'aws:sqs': arn = record.get('eventSourceARN') elif 's3' in record: arn = record['s3']['bucket']['arn'] From 6d8d0c7d942ff7342e39ef526923929ec7d2ca1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Miguel=20Neves?= Date: Fri, 21 Sep 2018 14:19:47 +0100 Subject: [PATCH 12/13] Fake commit to trigger a travis rebuild --- test_settings.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test_settings.py b/test_settings.py index f9650ff34..8aa104943 100644 --- a/test_settings.py +++ b/test_settings.py @@ -65,3 +65,4 @@ def authorizer_event(event, content): def command(): print("command") + From 8c0d4655e5a627bd5aedcc018d7d5bf4cd32e399 Mon Sep 17 00:00:00 2001 From: Edgar Roman Date: Thu, 27 Sep 2018 15:00:03 -0400 Subject: [PATCH 13/13] Added SQS Event handler docs to main page --- README.md | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e5c5e962c..f51b979b3 100644 --- a/README.md +++ b/README.md @@ -458,7 +458,7 @@ However, it's now far easier to use Route 53-based DNS authentication, which wil ## Executing in Response to AWS Events -Similarly, you can have your functions execute in response to events that happen in the AWS ecosystem, such as S3 uploads, DynamoDB entries, Kinesis streams, and SNS messages. +Similarly, you can have your functions execute in response to events that happen in the AWS ecosystem, such as S3 uploads, DynamoDB entries, Kinesis streams, SNS messages, and SQS queues. In your *zappa_settings.json* file, define your [event sources](http://docs.aws.amazon.com/lambda/latest/dg/invoking-lambda-function.html) and the function you wish to execute. For instance, this will execute `your_module.process_upload_function` in response to new objects in your `my-bucket` S3 bucket. Note that `process_upload_function` must accept `event` and `context` parameters. @@ -556,6 +556,21 @@ Optionally you can add [SNS message filters](http://docs.aws.amazon.com/sns/late ] ``` +[SQS](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) is also pulling messages from a stream. At this time, [only "Standard" queues can trigger lambda events, not "FIFO" queues](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html). Read the AWS Documentation carefully since Lambda calls the SQS DeleteMessage API on your behalf once your function completes successfully. + +```javascript + "events": [ + { + "function": "your_module.process_messages", + "event_source": { + "arn": "arn:aws:sqs:us-east-1:12341234:your-queue-name-arn", + "batch_size": 10, // Max: 10. Use 1 to trigger immediate processing + "enabled": true // Default is false + } + } + ] +``` + For configuring Lex Bot's intent triggered events: ```javascript "bot_events": [