From 39b489172274746abe974512bc0d21492aec74be Mon Sep 17 00:00:00 2001 From: Andrea Como Date: Fri, 29 Nov 2024 18:38:28 +0100 Subject: [PATCH] Fixing issue 1262: AWSTraceHeader is now propagated --- .../cloud/sqs/operations/SqsTemplate.java | 2 +- .../support/converter/SqsHeaderMapper.java | 5 ++ .../sqs/operations/SqsTemplateTests.java | 52 ++++++++++++------- .../converter/SqsHeaderMapperTests.java | 33 ++++++++++++ 4 files changed, 72 insertions(+), 20 deletions(-) diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java index 4c6ff0a23..758ef9c50 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/operations/SqsTemplate.java @@ -454,7 +454,7 @@ private Integer getDelaySeconds(Message message) { private Map mapMessageSystemAttributes( Message message) { return message.attributes().entrySet().stream().filter(Predicate.not(entry -> isSkipAttribute(entry.getKey()))) - .collect(Collectors.toMap(entry -> MessageSystemAttributeNameForSends.fromValue(entry.getKey().name()), + .collect(Collectors.toMap(entry -> MessageSystemAttributeNameForSends.fromValue(entry.getKey().toString()), entry -> MessageSystemAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING) .stringValue(entry.getValue()).build())); } diff --git a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapper.java b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapper.java index 1e98f8bcf..a67d8034b 100644 --- a/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapper.java +++ b/spring-cloud-aws-sqs/src/main/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapper.java @@ -79,6 +79,10 @@ public Message fromHeaders(MessageHeaders headers) { attributes.put(MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID, headers.get(SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER, String.class)); } + if (headers.containsKey(SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER)) { + attributes.put(MessageSystemAttributeName.AWS_TRACE_HEADER, + headers.get(SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER, String.class)); + } Map messageAttributes = headers.entrySet().stream() .filter(entry -> !isSkipHeader(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, entry -> getMessageAttributeValue(entry.getKey(), entry.getValue()))); @@ -110,6 +114,7 @@ else if (messageHeaderValue instanceof ByteBuffer) { private boolean isSkipHeader(String headerName) { return SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER.equals(headerName) || SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER.equals(headerName) + || SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER.equals(headerName) || SqsHeaders.SQS_DELAY_HEADER.equals(headerName) || MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName); } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/SqsTemplateTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/SqsTemplateTests.java index f9ab5cec7..e9234842c 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/SqsTemplateTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/operations/SqsTemplateTests.java @@ -20,8 +20,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.BDDMockito.given; import static org.mockito.BDDMockito.then; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; +import static org.mockito.Mockito.*; import com.fasterxml.jackson.databind.ObjectMapper; import io.awspring.cloud.sqs.QueueAttributesResolvingException; @@ -47,23 +46,7 @@ import org.springframework.messaging.MessageHeaders; import org.springframework.messaging.support.MessageBuilder; import software.amazon.awssdk.services.sqs.SqsAsyncClient; -import software.amazon.awssdk.services.sqs.model.CreateQueueResponse; -import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; -import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; -import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; -import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse; -import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; -import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; -import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; -import software.amazon.awssdk.services.sqs.model.QueueAttributeName; -import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; -import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; -import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; -import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse; -import software.amazon.awssdk.services.sqs.model.SendMessageRequest; -import software.amazon.awssdk.services.sqs.model.SendMessageResponse; +import software.amazon.awssdk.services.sqs.model.*; /** * @author Tomaz Fernandes @@ -1208,4 +1191,35 @@ void shouldReceiveBatchFifo() { } + @Test + void shouldPropagateTracingAsMessageSystemAttribute() { + String queue = "test-queue"; + GetQueueUrlResponse urlResponse = GetQueueUrlResponse.builder().queueUrl(queue).build(); + given(mockClient.getQueueUrl(any(GetQueueUrlRequest.class))) + .willReturn(CompletableFuture.completedFuture(urlResponse)); + mockQueueAttributes(mockClient, Map.of()); + SendMessageResponse response = SendMessageResponse.builder().messageId(UUID.randomUUID().toString()) + .sequenceNumber("123").build(); + given(mockClient.sendMessage(any(SendMessageRequest.class))) + .willReturn(CompletableFuture.completedFuture(response)); + + SqsOperations sqsOperations = SqsTemplate.newSyncTemplate(mockClient); + SendResult result = sqsOperations.send(options -> options + .queue(queue) + .header(SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER, "abc") + .payload("test") + ); + + assertThat(result).isNotNull(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(SendMessageRequest.class); + then(mockClient).should().sendMessage(captor.capture()); + SendMessageRequest sendMessageRequest = captor.getValue(); + + assertThat(sendMessageRequest.messageSystemAttributes()).hasEntrySatisfying( + MessageSystemAttributeNameForSends.AWS_TRACE_HEADER, + value -> assertThat(value.stringValue()).isEqualTo("abc") + ); + } + } diff --git a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapperTests.java b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapperTests.java index e9a770aa1..7e61df01d 100644 --- a/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapperTests.java +++ b/spring-cloud-aws-sqs/src/test/java/io/awspring/cloud/sqs/support/converter/SqsHeaderMapperTests.java @@ -21,6 +21,8 @@ import java.util.Map; import java.util.UUID; import java.util.stream.Stream; + +import io.awspring.cloud.sqs.listener.SqsHeaders; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -29,6 +31,7 @@ import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; /** * Tests for {@link SqsHeaderMapper}. @@ -139,6 +142,36 @@ void shouldAddNumberMessageAttributes() { assertThat(headers.get(headerName)).isEqualTo(headerValue); } + @Test + void shouldCreateMessageWithSystemAttributesFromHeaders() { + MessageHeaders headers = new MessageHeaders( + Map.of( + SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_GROUP_ID_HEADER, "value1", + SqsHeaders.MessageSystemAttributes.SQS_MESSAGE_DEDUPLICATION_ID_HEADER, "value2", + SqsHeaders.MessageSystemAttributes.SQS_AWS_TRACE_HEADER, "value3", + "customHeaderString", "customValueString", + "customHeaderNumber", 42 + ) + ); + + SqsHeaderMapper mapper = new SqsHeaderMapper(); + Message message = mapper.fromHeaders(headers); + + assertThat(message.attributes()) + .hasSize(3) + .containsExactlyInAnyOrderEntriesOf(Map.of( + MessageSystemAttributeName.MESSAGE_GROUP_ID, "value1", + MessageSystemAttributeName.MESSAGE_DEDUPLICATION_ID, "value2", + MessageSystemAttributeName.AWS_TRACE_HEADER, "value3" + )); + assertThat(message.messageAttributes()) + .hasSize(2) + .containsExactlyInAnyOrderEntriesOf(Map.of( + "customHeaderString", MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.STRING).stringValue("customValueString").build(), + "customHeaderNumber", MessageAttributeValue.builder().dataType(MessageAttributeDataTypes.NUMBER + ".java.lang.Integer").stringValue("42").build() + )); + } + @ParameterizedTest @MethodSource("validArguments") void createsMessageWithNumberHeader(String value, String type, Number expected) {