From bcd15789f8e6319ff9745b44b0ebf79e7cb30bb9 Mon Sep 17 00:00:00 2001 From: Greg Furman Date: Tue, 17 Sep 2024 16:48:35 +0200 Subject: [PATCH] ESM-v2: Add SNS as DLQ (Kinesis, DynamoDB) --- .../pollers/stream_poller.py | 4 +- .../localstack/testing/aws/lambda_utils.py | 1 + ...test_lambda_integration_dynamodbstreams.py | 124 +++++++++++++++ ..._integration_dynamodbstreams.snapshot.json | 145 ++++++++++++++++++ ...ntegration_dynamodbstreams.validation.json | 3 + .../test_lambda_integration_kinesis.py | 121 +++++++++++++++ ...t_lambda_integration_kinesis.snapshot.json | 66 ++++++++ ...lambda_integration_kinesis.validation.json | 3 + 8 files changed, 466 insertions(+), 1 deletion(-) diff --git a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py index 2bbbc0fb8b92f..867a8a151658c 100644 --- a/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py +++ b/localstack-core/localstack/services/lambda_/event_source_mapping/pollers/stream_poller.py @@ -287,8 +287,10 @@ def send_events_to_dlq(self, shard_id, events, context) -> None: dlq_url = get_queue_url(dlq_arn) # TODO: validate no FIFO queue because they are unsupported sqs_client.send_message(QueueUrl=dlq_url, MessageBody=json.dumps(dlq_event)) + elif service == "sns": + sns_client = get_internal_client(dlq_arn) + sns_client.publish(TopicArn=dlq_arn, Message=json.dumps(dlq_event)) else: - # TODO: implement sns DLQ LOG.warning("Unsupported DLQ service %s", service) def create_dlq_event(self, shard_id: str, events: list[dict], context: dict) -> dict: diff --git a/localstack-core/localstack/testing/aws/lambda_utils.py b/localstack-core/localstack/testing/aws/lambda_utils.py index 2108bbc46baab..0100c4149cba8 100644 --- a/localstack-core/localstack/testing/aws/lambda_utils.py +++ b/localstack-core/localstack/testing/aws/lambda_utils.py @@ -283,6 +283,7 @@ def get_invoke_init_type( "Effect": "Allow", "Action": [ "sqs:*", + "sns:*", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py index c632a2eac2c45..95fd1aa899b5e 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py @@ -330,6 +330,130 @@ def test_deletion_event_source_mapping_with_dynamodb( list_esm = aws_client.lambda_.list_event_source_mappings(EventSourceArn=latest_stream_arn) snapshot.match("list_event_source_mapping_result", list_esm) + # FIXME UpdateTable is not returning a TableID + @markers.snapshot.skip_snapshot_verify( + paths=[ + "$..TableDescription.TableId", + ], + ) + @markers.snapshot.skip_snapshot_verify( + condition=is_old_esm, + paths=[ + "$..Message.DDBStreamBatchInfo.approximateArrivalOfFirstRecord", # Incorrect timestamp formatting + "$..Message.DDBStreamBatchInfo.approximateArrivalOfLastRecord", + "$..Message.requestContext.approximateInvokeCount", + "$..Message.responseContext.statusCode", + ], + ) + @markers.aws.validated + def test_dynamodb_event_source_mapping_with_sns_on_failure_destination_config( + self, + create_lambda_function, + sqs_get_queue_arn, + sqs_create_queue, + sns_create_topic, + sns_allow_topic_sqs_queue, + create_iam_role_with_policy, + dynamodb_create_table, + snapshot, + cleanups, + aws_client, + ): + snapshot.add_transformer(snapshot.transform.sns_api()) + + snapshot.add_transformer(snapshot.transform.key_value("startSequenceNumber")) + snapshot.add_transformer(snapshot.transform.key_value("endSequenceNumber")) + + function_name = f"lambda_func-{short_uid()}" + role = f"test-lambda-role-{short_uid()}" + policy_name = f"test-lambda-policy-{short_uid()}" + table_name = f"test-table-{short_uid()}" + partition_key = "my_partition_key" + item = {partition_key: {"S": "hello world"}} + + # create topic and queue + queue_url = sqs_create_queue() + topic_info = sns_create_topic() + topic_arn = topic_info["TopicArn"] + + # subscribe SQS to SNS + queue_arn = sqs_get_queue_arn(queue_url) + subscription = aws_client.sns.subscribe( + TopicArn=topic_arn, + Protocol="sqs", + Endpoint=queue_arn, + ) + cleanups.append( + lambda: aws_client.sns.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"]) + ) + + sns_allow_topic_sqs_queue( + sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn + ) + + role_arn = create_iam_role_with_policy( + RoleName=role, + PolicyName=policy_name, + RoleDefinition=lambda_role, + PolicyDefinition=s3_lambda_permission, + ) + + create_lambda_function( + handler_file=TEST_LAMBDA_PYTHON_UNHANDLED_ERROR, + func_name=function_name, + runtime=Runtime.python3_12, + role=role_arn, + ) + create_table_response = dynamodb_create_table( + table_name=table_name, partition_key=partition_key + ) + _await_dynamodb_table_active(aws_client.dynamodb, table_name) + snapshot.match("create_table_response", create_table_response) + + update_table_response = aws_client.dynamodb.update_table( + TableName=table_name, + StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"}, + ) + snapshot.match("update_table_response", update_table_response) + stream_arn = update_table_response["TableDescription"]["LatestStreamArn"] + + destination_config = {"OnFailure": {"Destination": topic_arn}} + create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping( + FunctionName=function_name, + BatchSize=1, + StartingPosition="TRIM_HORIZON", + EventSourceArn=stream_arn, + MaximumBatchingWindowInSeconds=1, + MaximumRetryAttempts=1, + DestinationConfig=destination_config, + ) + snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response) + event_source_mapping_uuid = create_event_source_mapping_response["UUID"] + cleanups.append( + lambda: aws_client.lambda_.delete_event_source_mapping(UUID=event_source_mapping_uuid) + ) + + _await_event_source_mapping_enabled(aws_client.lambda_, event_source_mapping_uuid) + + aws_client.dynamodb.put_item(TableName=table_name, Item=item) + + def verify_failure_received(): + res = aws_client.sqs.receive_message(QueueUrl=queue_url) + assert len(res.get("Messages", [])) == 1 + return res + + # It can take ~3 min against AWS until the message is received + sleep = 15 if is_aws_cloud() else 5 + messages = retry(verify_failure_received, retries=15, sleep=sleep, sleep_before=5) + + # The failure context payload of the SQS response is in JSON-string format. + # Rather extract, parse, and snapshot it since the SQS information is irrelevant. + failure_sns_payload = messages.get("Messages", []).pop(0) + failure_sns_body_json = failure_sns_payload.get("Body", {}) + failure_sns_message = json.loads(failure_sns_body_json) + + snapshot.match("failure_sns_message", failure_sns_message) + # FIXME UpdateTable is not returning a TableID @markers.snapshot.skip_snapshot_verify( paths=[ diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json index a7d444e1e83e9..f3b62186d89a4 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.snapshot.json @@ -1959,5 +1959,150 @@ } ] } + }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping_with_sns_on_failure_destination_config": { + "recorded-date": "16-09-2024, 15:49:14", + "recorded-content": { + "create_table_response": { + "TableDescription": { + "AttributeDefinitions": [ + { + "AttributeName": "my_partition_key", + "AttributeType": "S" + } + ], + "BillingModeSummary": { + "BillingMode": "PAY_PER_REQUEST" + }, + "CreationDateTime": "", + "DeletionProtectionEnabled": false, + "ItemCount": 0, + "KeySchema": [ + { + "AttributeName": "my_partition_key", + "KeyType": "HASH" + } + ], + "ProvisionedThroughput": { + "NumberOfDecreasesToday": 0, + "ReadCapacityUnits": 0, + "WriteCapacityUnits": 0 + }, + "TableArn": "arn::dynamodb::111111111111:table/", + "TableId": "", + "TableName": "", + "TableSizeBytes": 0, + "TableStatus": "CREATING" + }, + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "update_table_response": { + "TableDescription": { + "AttributeDefinitions": [ + { + "AttributeName": "my_partition_key", + "AttributeType": "S" + } + ], + "BillingModeSummary": { + "BillingMode": "PAY_PER_REQUEST", + "LastUpdateToPayPerRequestDateTime": "" + }, + "CreationDateTime": "", + "DeletionProtectionEnabled": false, + "ItemCount": 0, + "KeySchema": [ + { + "AttributeName": "my_partition_key", + "KeyType": "HASH" + } + ], + "LatestStreamArn": "arn::dynamodb::111111111111:table//stream/", + "LatestStreamLabel": "", + "ProvisionedThroughput": { + "NumberOfDecreasesToday": 0, + "ReadCapacityUnits": 0, + "WriteCapacityUnits": 0 + }, + "StreamSpecification": { + "StreamEnabled": true, + "StreamViewType": "NEW_IMAGE" + }, + "TableArn": "arn::dynamodb::111111111111:table/", + "TableId": "", + "TableName": "", + "TableSizeBytes": 0, + "TableStatus": "UPDATING" + }, + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 200 + } + }, + "create_event_source_mapping_response": { + "BatchSize": 1, + "BisectBatchOnFunctionError": false, + "DestinationConfig": { + "OnFailure": { + "Destination": "arn::sns::111111111111:" + } + }, + "EventSourceArn": "arn::dynamodb::111111111111:table//stream/", + "FunctionArn": "arn::lambda::111111111111:function:", + "FunctionResponseTypes": [], + "LastModified": "", + "LastProcessingResult": "No records processed", + "MaximumBatchingWindowInSeconds": 1, + "MaximumRecordAgeInSeconds": -1, + "MaximumRetryAttempts": 1, + "ParallelizationFactor": 1, + "StartingPosition": "TRIM_HORIZON", + "State": "Creating", + "StateTransitionReason": "User action", + "TumblingWindowInSeconds": 0, + "UUID": "", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 202 + } + }, + "failure_sns_message": { + "Message": { + "requestContext": { + "requestId": "", + "functionArn": "arn::lambda::111111111111:function:", + "condition": "RetryAttemptsExhausted", + "approximateInvokeCount": 2 + }, + "responseContext": { + "statusCode": 200, + "executedVersion": "$LATEST", + "functionError": "Unhandled" + }, + "version": "1.0", + "timestamp": "", + "DDBStreamBatchInfo": { + "shardId": "", + "startSequenceNumber": "", + "endSequenceNumber": "", + "approximateArrivalOfFirstRecord": "", + "approximateArrivalOfLastRecord": "", + "batchSize": 1, + "streamArn": "arn::dynamodb::111111111111:table//stream/" + } + }, + "MessageId": "", + "Signature": "", + "SignatureVersion": "1", + "SigningCertURL": "/SimpleNotificationService-", + "Timestamp": "", + "TopicArn": "arn::sns::111111111111:", + "Type": "Notification", + "UnsubscribeURL": "/?Action=Unsubscribe&SubscriptionArn=arn::sns::111111111111::" + } + } } } diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json index 31e62ab7e64d4..1960fc6cd7cce 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.validation.json @@ -41,6 +41,9 @@ "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping_with_on_failure_destination_config": { "last_validated_date": "2024-09-03T14:58:05+00:00" }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_event_source_mapping_with_sns_on_failure_destination_config": { + "last_validated_date": "2024-09-16T15:49:08+00:00" + }, "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_dynamodbstreams.py::TestDynamoDBEventSourceMapping::test_dynamodb_invalid_event_filter[[{\"eventName\": [\"INSERT\"=123}]]": { "last_validated_date": "2024-09-03T15:10:35+00:00" }, diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py index 3a8b66dead314..e331d2a3155aa 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py @@ -543,6 +543,127 @@ def verify_failure_received(): sqs_payload = retry(verify_failure_received, retries=50, sleep=5, sleep_before=5) snapshot.match("sqs_payload", sqs_payload) + @markers.snapshot.skip_snapshot_verify( + paths=[ + "$..Message.KinesisBatchInfo.shardId", + "$..Message.KinesisBatchInfo.streamArn", + ], + ) + @markers.snapshot.skip_snapshot_verify( + condition=is_old_esm, + paths=[ + "$..Message.KinesisBatchInfo.approximateArrivalOfFirstRecord", + "$..Message.KinesisBatchInfo.approximateArrivalOfLastRecord", + "$..Message.requestContext.approximateInvokeCount", + "$..Message.responseContext.statusCode", + ], + ) + @markers.aws.validated + def test_kinesis_event_source_mapping_with_sns_on_failure_destination_config( + self, + create_lambda_function, + sqs_get_queue_arn, + sqs_create_queue, + sns_create_topic, + sns_allow_topic_sqs_queue, + create_iam_role_with_policy, + wait_for_stream_ready, + cleanups, + snapshot, + aws_client, + ): + # snapshot setup + snapshot.add_transformer(snapshot.transform.sns_api()) + snapshot.add_transformer(snapshot.transform.key_value("startSequenceNumber")) + + function_name = f"lambda_func-{short_uid()}" + role = f"test-lambda-role-{short_uid()}" + policy_name = f"test-lambda-policy-{short_uid()}" + kinesis_name = f"test-kinesis-{short_uid()}" + role_arn = create_iam_role_with_policy( + RoleName=role, + PolicyName=policy_name, + RoleDefinition=lambda_role, + PolicyDefinition=s3_lambda_permission, + ) + + # create topic and queue + queue_url = sqs_create_queue() + topic_info = sns_create_topic() + topic_arn = topic_info["TopicArn"] + + # subscribe SQS to SNS + queue_arn = sqs_get_queue_arn(queue_url) + subscription = aws_client.sns.subscribe( + TopicArn=topic_arn, + Protocol="sqs", + Endpoint=queue_arn, + ) + cleanups.append( + lambda: aws_client.sns.unsubscribe(SubscriptionArn=subscription["SubscriptionArn"]) + ) + + sns_allow_topic_sqs_queue( + sqs_queue_url=queue_url, sqs_queue_arn=queue_arn, sns_topic_arn=topic_arn + ) + create_lambda_function( + handler_file=TEST_LAMBDA_PYTHON, + func_name=function_name, + runtime=Runtime.python3_12, + role=role_arn, + ) + aws_client.kinesis.create_stream(StreamName=kinesis_name, ShardCount=1) + cleanups.append( + lambda: aws_client.kinesis.delete_stream( + StreamName=kinesis_name, EnforceConsumerDeletion=True + ) + ) + result = aws_client.kinesis.describe_stream(StreamName=kinesis_name)["StreamDescription"] + kinesis_arn = result["StreamARN"] + wait_for_stream_ready(stream_name=kinesis_name) + + destination_config = {"OnFailure": {"Destination": topic_arn}} + message = { + "input": "hello", + "value": "world", + lambda_integration.MSG_BODY_RAISE_ERROR_FLAG: 1, + } + + create_event_source_mapping_response = aws_client.lambda_.create_event_source_mapping( + FunctionName=function_name, + BatchSize=1, + StartingPosition="TRIM_HORIZON", + EventSourceArn=kinesis_arn, + MaximumBatchingWindowInSeconds=1, + MaximumRetryAttempts=1, + DestinationConfig=destination_config, + ) + cleanups.append( + lambda: aws_client.lambda_.delete_event_source_mapping(UUID=event_source_mapping_uuid) + ) + + snapshot.match("create_event_source_mapping_response", create_event_source_mapping_response) + event_source_mapping_uuid = create_event_source_mapping_response["UUID"] + _await_event_source_mapping_enabled(aws_client.lambda_, event_source_mapping_uuid) + aws_client.kinesis.put_record( + StreamName=kinesis_name, Data=to_bytes(json.dumps(message)), PartitionKey="custom" + ) + + def verify_failure_received(): + result = aws_client.sqs.receive_message(QueueUrl=queue_url) + assert result["Messages"] + return result + + messages = retry(verify_failure_received, retries=50, sleep=5, sleep_before=5) + + # The failure context payload of the SQS response is in JSON-string format. + # Rather extract, parse, and snapshot it since the SQS information is irrelevant. + failure_sns_payload = messages.get("Messages", []).pop(0) + failure_sns_body_json = failure_sns_payload.get("Body", {}) + failure_sns_message = json.loads(failure_sns_body_json) + + snapshot.match("failure_sns_message", failure_sns_message) + # TODO: add tests for different edge cases in filtering (e.g. message isn't json => needs to be dropped) # https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html#filtering-kinesis diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json index 49f68077edef3..1b69788d5b8e2 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.snapshot.json @@ -1575,5 +1575,71 @@ ] } } + }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_event_source_mapping_with_sns_on_failure_destination_config": { + "recorded-date": "16-09-2024, 16:08:03", + "recorded-content": { + "create_event_source_mapping_response": { + "BatchSize": 1, + "BisectBatchOnFunctionError": false, + "DestinationConfig": { + "OnFailure": { + "Destination": "arn::sns::111111111111:" + } + }, + "EventSourceArn": "arn::kinesis::111111111111:stream/", + "FunctionArn": "arn::lambda::111111111111:function:", + "FunctionResponseTypes": [], + "LastModified": "", + "LastProcessingResult": "No records processed", + "MaximumBatchingWindowInSeconds": 1, + "MaximumRecordAgeInSeconds": -1, + "MaximumRetryAttempts": 1, + "ParallelizationFactor": 1, + "StartingPosition": "TRIM_HORIZON", + "State": "Creating", + "StateTransitionReason": "User action", + "TumblingWindowInSeconds": 0, + "UUID": "", + "ResponseMetadata": { + "HTTPHeaders": {}, + "HTTPStatusCode": 202 + } + }, + "failure_sns_message": { + "Message": { + "requestContext": { + "requestId": "", + "functionArn": "arn::lambda::111111111111:function:", + "condition": "RetryAttemptsExhausted", + "approximateInvokeCount": 2 + }, + "responseContext": { + "statusCode": 200, + "executedVersion": "$LATEST", + "functionError": "Unhandled" + }, + "version": "1.0", + "timestamp": "", + "KinesisBatchInfo": { + "shardId": "shardId-000000000000", + "startSequenceNumber": "", + "endSequenceNumber": "", + "approximateArrivalOfFirstRecord": "", + "approximateArrivalOfLastRecord": "", + "batchSize": 1, + "streamArn": "arn::kinesis::111111111111:stream/" + } + }, + "MessageId": "", + "Signature": "", + "SignatureVersion": "1", + "SigningCertURL": "/SimpleNotificationService-", + "Timestamp": "", + "TopicArn": "arn::sns::111111111111:", + "Type": "Notification", + "UnsubscribeURL": "/?Action=Unsubscribe&SubscriptionArn=arn::sns::111111111111::" + } + } } } diff --git a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json index b7ab3f456797e..c99a781e949fa 100644 --- a/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json +++ b/tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.validation.json @@ -23,6 +23,9 @@ "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_event_source_mapping_with_on_failure_destination_config": { "last_validated_date": "2024-09-03T15:41:37+00:00" }, + "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_event_source_mapping_with_sns_on_failure_destination_config": { + "last_validated_date": "2024-09-16T16:07:56+00:00" + }, "tests/aws/services/lambda_/event_source_mapping/test_lambda_integration_kinesis.py::TestKinesisSource::test_kinesis_event_source_trim_horizon": { "last_validated_date": "2023-02-27T15:56:17+00:00" }