Skip to content

Commit 3c29149

Browse files
committed
GH-2707 Add support for function post processing
1 parent 06a86a1 commit 3c29149

File tree

2 files changed

+247
-0
lines changed

2 files changed

+247
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/*
2+
* Copyright 2023-2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.cloud.stream.function;
18+
19+
import java.util.function.Function;
20+
21+
import org.junit.jupiter.api.Test;
22+
23+
import org.springframework.boot.WebApplicationType;
24+
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
25+
import org.springframework.boot.builder.SpringApplicationBuilder;
26+
import org.springframework.cloud.function.context.PostProcessingFunction;
27+
import org.springframework.cloud.stream.binder.test.InputDestination;
28+
import org.springframework.cloud.stream.binder.test.OutputDestination;
29+
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
30+
import org.springframework.context.ConfigurableApplicationContext;
31+
import org.springframework.context.annotation.Bean;
32+
import org.springframework.integration.support.MessageBuilder;
33+
import org.springframework.messaging.Message;
34+
35+
import static org.assertj.core.api.Assertions.assertThat;
36+
37+
/**
38+
*
39+
* @author Oleg Zhurakousky
40+
*
41+
*/
42+
public class FunctionPostProcessingTests {
43+
44+
@Test
45+
void testNothingIsBroken() {
46+
System.clearProperty("spring.cloud.function.definition");
47+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
48+
TestChannelBinderConfiguration.getCompleteConfiguration(PostProcessingTestConfiguration.class))
49+
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=echo")) {
50+
51+
InputDestination inputDestination = context.getBean(InputDestination.class);
52+
Message<byte[]> inputMessage = MessageBuilder.withPayload("hello".getBytes()).build();
53+
inputDestination.send(inputMessage);
54+
55+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
56+
57+
assertThat(outputDestination.receive().getPayload()).isEqualTo("hello".getBytes());
58+
}
59+
}
60+
61+
@Test
62+
void testSuccessfulPostProcessingOfSingleFunction() {
63+
System.clearProperty("spring.cloud.function.definition");
64+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
65+
TestChannelBinderConfiguration.getCompleteConfiguration(PostProcessingTestConfiguration.class))
66+
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=uppercase")) {
67+
68+
InputDestination inputDestination = context.getBean(InputDestination.class);
69+
Message<byte[]> inputMessage = MessageBuilder.withPayload("hello".getBytes()).build();
70+
inputDestination.send(inputMessage);
71+
72+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
73+
74+
assertThat(outputDestination.receive().getPayload()).isEqualTo("HELLO".getBytes());
75+
assertThat(context.getBean(SingleFunctionPostProcessingFunction.class).success).isTrue();
76+
}
77+
}
78+
79+
@Test
80+
void testNoPostProcessingOnError() {
81+
System.clearProperty("spring.cloud.function.definition");
82+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
83+
TestChannelBinderConfiguration.getCompleteConfiguration(PostProcessingTestConfiguration.class))
84+
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=uppercase")) {
85+
86+
InputDestination inputDestination = context.getBean(InputDestination.class);
87+
Message<byte[]> inputMessage = MessageBuilder.withPayload("error".getBytes()).build();
88+
inputDestination.send(inputMessage);
89+
90+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
91+
92+
assertThat(outputDestination.receive()).isNull();
93+
assertThat(context.getBean(SingleFunctionPostProcessingFunction.class).success).isFalse();
94+
}
95+
}
96+
97+
@Test
98+
void testNoFailureOnPostProcessingError() {
99+
System.clearProperty("spring.cloud.function.definition");
100+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
101+
TestChannelBinderConfiguration.getCompleteConfiguration(PostProcessingTestConfiguration.class))
102+
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=uppercase")) {
103+
104+
InputDestination inputDestination = context.getBean(InputDestination.class);
105+
Message<byte[]> inputMessage = MessageBuilder.withPayload("post_processing_error".getBytes()).build();
106+
inputDestination.send(inputMessage);
107+
108+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
109+
110+
assertThat(outputDestination.receive().getPayload()).isEqualTo("POST_PROCESSING_ERROR".getBytes());
111+
assertThat(context.getBean(SingleFunctionPostProcessingFunction.class).success).isFalse();
112+
}
113+
}
114+
115+
116+
@Test
117+
void testWithCompositionLastFunctionIsPostProcessing() {
118+
System.clearProperty("spring.cloud.function.definition");
119+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
120+
TestChannelBinderConfiguration.getCompleteConfiguration(PostProcessingTestConfiguration.class))
121+
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=echo|uppercase")) {
122+
123+
InputDestination inputDestination = context.getBean(InputDestination.class);
124+
Message<byte[]> inputMessage = MessageBuilder.withPayload("hello".getBytes()).build();
125+
inputDestination.send(inputMessage);
126+
127+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
128+
129+
assertThat(outputDestination.receive().getPayload()).isEqualTo("HELLO".getBytes());
130+
assertThat(context.getBean(SingleFunctionPostProcessingFunction.class).success).isTrue();
131+
}
132+
}
133+
134+
@Test
135+
void testWithCompositionFirstFunctionIsPostProcessing() {
136+
System.clearProperty("spring.cloud.function.definition");
137+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
138+
TestChannelBinderConfiguration.getCompleteConfiguration(PostProcessingTestConfiguration.class))
139+
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=uppercase|echo")) {
140+
141+
InputDestination inputDestination = context.getBean(InputDestination.class);
142+
Message<byte[]> inputMessage = MessageBuilder.withPayload("hello".getBytes()).build();
143+
inputDestination.send(inputMessage);
144+
145+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
146+
147+
assertThat(outputDestination.receive().getPayload()).isEqualTo("HELLO".getBytes());
148+
assertThat(context.getBean(SingleFunctionPostProcessingFunction.class).success).isFalse();
149+
}
150+
}
151+
152+
@Test
153+
void testOlnyLastPostProcessorInvoked() {
154+
System.clearProperty("spring.cloud.function.definition");
155+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
156+
TestChannelBinderConfiguration.getCompleteConfiguration(PostProcessingTestConfiguration.class))
157+
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=echo|uppercase|reverse")) {
158+
159+
InputDestination inputDestination = context.getBean(InputDestination.class);
160+
Message<byte[]> inputMessage = MessageBuilder.withPayload("hello".getBytes()).build();
161+
inputDestination.send(inputMessage);
162+
163+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
164+
165+
assertThat(outputDestination.receive().getPayload()).isEqualTo("OLLEH".getBytes());
166+
assertThat(context.getBean(SingleFunctionPostProcessingFunction.class).success).isFalse();
167+
assertThat(context.getBean(SingleFunctionPostProcessingFunction2.class).success).isTrue();
168+
}
169+
}
170+
171+
@Test
172+
void testOlnyLastPostProcessorInvoked2() {
173+
System.clearProperty("spring.cloud.function.definition");
174+
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
175+
TestChannelBinderConfiguration.getCompleteConfiguration(PostProcessingTestConfiguration.class))
176+
.web(WebApplicationType.NONE).run("--spring.jmx.enabled=false", "--spring.cloud.function.definition=uppercase|echo|reverse")) {
177+
178+
InputDestination inputDestination = context.getBean(InputDestination.class);
179+
Message<byte[]> inputMessage = MessageBuilder.withPayload("hello".getBytes()).build();
180+
inputDestination.send(inputMessage);
181+
182+
OutputDestination outputDestination = context.getBean(OutputDestination.class);
183+
184+
assertThat(outputDestination.receive().getPayload()).isEqualTo("OLLEH".getBytes());
185+
assertThat(context.getBean(SingleFunctionPostProcessingFunction.class).success).isFalse();
186+
assertThat(context.getBean(SingleFunctionPostProcessingFunction2.class).success).isTrue();
187+
}
188+
}
189+
190+
191+
@EnableAutoConfiguration
192+
public static class PostProcessingTestConfiguration {
193+
194+
@Bean
195+
public Function<String, String> echo() {
196+
return x -> x;
197+
}
198+
199+
@Bean
200+
public Function<String, String> uppercase() {
201+
return new SingleFunctionPostProcessingFunction();
202+
}
203+
204+
@Bean
205+
public Function<String, String> reverse() {
206+
return new SingleFunctionPostProcessingFunction2();
207+
}
208+
}
209+
210+
private static class SingleFunctionPostProcessingFunction implements PostProcessingFunction<String, String> {
211+
212+
private boolean success;
213+
214+
@Override
215+
public String apply(String input) {
216+
if (input.equals("error")) {
217+
throw new RuntimeException("intentional");
218+
}
219+
return input.toUpperCase();
220+
}
221+
222+
@Override
223+
public void postProcess(Message<String> result) {
224+
if (result.getPayload().equals("POST_PROCESSING_ERROR")) {
225+
throw new RuntimeException("intentional");
226+
}
227+
success = true;
228+
}
229+
}
230+
231+
private static class SingleFunctionPostProcessingFunction2 implements PostProcessingFunction<String, String> {
232+
233+
private boolean success;
234+
235+
@Override
236+
public String apply(String input) {
237+
return new StringBuilder(input).reverse().toString();
238+
}
239+
240+
@Override
241+
public void postProcess(Message<String> result) {
242+
success = true;
243+
}
244+
}
245+
}

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -614,6 +614,7 @@ private AbstractMessageHandler createFunctionHandler(FunctionInvocationWrapper f
614614

615615
MessagingTemplate template = new MessagingTemplate();
616616
template.setBeanFactory(applicationContext.getBeanFactory());
617+
617618
AbstractMessageHandler handler = new AbstractMessageHandler() {
618619
@SuppressWarnings("unchecked")
619620
@Override
@@ -659,6 +660,7 @@ else if (function.isRoutingFunction()) {
659660
}
660661
streamBridge.send(function.getFunctionDefinition() + "-out-0", result);
661662
}
663+
function.postProcess();
662664
}
663665

664666
};

0 commit comments

Comments
 (0)