Skip to content

Commit 5255e7a

Browse files
committed
Support CompletableFuture in @MessageMapping handler methods
Issue: SPR-12207
1 parent d3db99c commit 5255e7a

File tree

4 files changed

+212
-0
lines changed

4 files changed

+212
-0
lines changed
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright 2002-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.util.concurrent;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
23+
import java.util.function.BiFunction;
24+
25+
import org.springframework.lang.UsesJava8;
26+
27+
28+
/**
29+
* Adapts a {@link CompletableFuture} into a {@link ListenableFuture}.
30+
*
31+
* @author Sebastien Deleuze
32+
* @since 4.2
33+
*/
34+
@UsesJava8
35+
public class CompletableToListenableFutureAdapter<T> implements ListenableFuture<T> {
36+
37+
private final CompletableFuture<T> completableFuture;
38+
39+
private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry<T>();
40+
41+
public CompletableToListenableFutureAdapter(CompletableFuture<T> completableFuture) {
42+
this.completableFuture = completableFuture;
43+
this.completableFuture.handle(new BiFunction<T, Throwable, Object>() {
44+
@Override
45+
public Object apply(T result, Throwable ex) {
46+
if (ex != null) {
47+
callbacks.failure(ex);
48+
}
49+
else {
50+
callbacks.success(result);
51+
}
52+
return null;
53+
}
54+
});
55+
}
56+
57+
@Override
58+
public void addCallback(ListenableFutureCallback<? super T> callback) {
59+
this.callbacks.addCallback(callback);
60+
}
61+
62+
@Override
63+
public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) {
64+
this.callbacks.addSuccessCallback(successCallback);
65+
this.callbacks.addFailureCallback(failureCallback);
66+
}
67+
68+
@Override
69+
public boolean cancel(boolean mayInterruptIfRunning) {
70+
return this.completableFuture.cancel(mayInterruptIfRunning);
71+
}
72+
73+
@Override
74+
public boolean isCancelled() {
75+
return this.completableFuture.isCancelled();
76+
}
77+
78+
@Override
79+
public boolean isDone() {
80+
return this.completableFuture.isDone();
81+
}
82+
83+
@Override
84+
public T get() throws InterruptedException, ExecutionException {
85+
return this.completableFuture.get();
86+
}
87+
88+
@Override
89+
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
90+
return this.completableFuture.get(timeout, unit);
91+
}
92+
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Copyright 2002-2015 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.handler.invocation;
18+
19+
import java.util.concurrent.CompletableFuture;
20+
21+
import org.springframework.core.MethodParameter;
22+
import org.springframework.lang.UsesJava8;
23+
import org.springframework.util.concurrent.CompletableToListenableFutureAdapter;
24+
import org.springframework.util.concurrent.ListenableFuture;
25+
26+
/**
27+
* An {@link AsyncHandlerMethodReturnValueHandler} for {@link CompletableFuture} return type handling.
28+
*
29+
* @author Sebastien Deleuze
30+
* @since 4.2
31+
*/
32+
@UsesJava8
33+
public class CompletableFutureReturnValueHandler extends AbstractAsyncReturnValueHandler {
34+
35+
@Override
36+
public boolean supportsReturnType(MethodParameter returnType) {
37+
return CompletableFuture.class.isAssignableFrom(returnType.getParameterType());
38+
}
39+
40+
@Override
41+
@SuppressWarnings("unchecked")
42+
public ListenableFuture<?> toListenableFuture(Object returnValue, MethodParameter returnType) {
43+
return new CompletableToListenableFutureAdapter<Object>((CompletableFuture<Object>)returnValue);
44+
}
45+
46+
}

spring-messaging/src/main/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver;
5151
import org.springframework.messaging.handler.invocation.AbstractExceptionHandlerMethodResolver;
5252
import org.springframework.messaging.handler.invocation.AbstractMethodMessageHandler;
53+
import org.springframework.messaging.handler.invocation.CompletableFutureReturnValueHandler;
5354
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
5455
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
5556
import org.springframework.messaging.handler.invocation.ListenableFutureReturnValueHandler;
@@ -83,6 +84,10 @@
8384
public class SimpAnnotationMethodMessageHandler extends AbstractMethodMessageHandler<SimpMessageMappingInfo>
8485
implements SmartLifecycle {
8586

87+
private static final boolean completableFuturePresent = ClassUtils.isPresent("java.util.concurrent.CompletableFuture",
88+
SimpAnnotationMethodMessageHandler.class.getClassLoader());
89+
90+
8691
private final SubscribableChannel clientInboundChannel;
8792

8893
private final SimpMessageSendingOperations clientMessagingTemplate;
@@ -318,6 +323,10 @@ protected List<? extends HandlerMethodReturnValueHandler> initReturnValueHandler
318323
// Single-purpose return value types
319324
ListenableFutureReturnValueHandler lfh = new ListenableFutureReturnValueHandler();
320325
handlers.add(lfh);
326+
if (completableFuturePresent) {
327+
CompletableFutureReturnValueHandler cfh = new CompletableFutureReturnValueHandler();
328+
handlers.add(cfh);
329+
}
321330

322331
// Annotation-based return value types
323332
SendToMethodReturnValueHandler sth = new SendToMethodReturnValueHandler(this.brokerTemplate, true);

spring-messaging/src/test/java/org/springframework/messaging/simp/annotation/support/SimpAnnotationMethodMessageHandlerTests.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.LinkedHashMap;
2222
import java.util.Map;
2323
import java.util.Optional;
24+
import java.util.concurrent.CompletableFuture;
2425
import java.util.concurrent.ConcurrentHashMap;
2526

2627
import org.junit.Before;
@@ -294,6 +295,51 @@ public void listenableFutureFailure() {
294295
assertTrue(controller.exceptionCatched);
295296
}
296297

298+
@Test
299+
@SuppressWarnings("unchecked")
300+
public void completableFutureSuccess() {
301+
302+
given(this.channel.send(any(Message.class))).willReturn(true);
303+
given(this.converter.toMessage(anyObject(), any(MessageHeaders.class)))
304+
.willReturn((Message) MessageBuilder.withPayload(new byte[0]).build());
305+
306+
CompletableFutureController controller = new CompletableFutureController();
307+
this.messageHandler.registerHandler(controller);
308+
this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/"));
309+
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create();
310+
headers.setSessionId("session1");
311+
headers.setSessionAttributes(new HashMap<>());
312+
headers.setDestination("/app1/completable-future");
313+
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
314+
this.messageHandler.handleMessage(message);
315+
316+
assertNotNull(controller.future);
317+
controller.future.complete("foo");
318+
verify(this.converter).toMessage(this.payloadCaptor.capture(), any(MessageHeaders.class));
319+
assertEquals("foo", this.payloadCaptor.getValue());
320+
}
321+
322+
@Test
323+
@SuppressWarnings("unchecked")
324+
public void completableFutureFailure() {
325+
326+
given(this.channel.send(any(Message.class))).willReturn(true);
327+
given(this.converter.toMessage(anyObject(), any(MessageHeaders.class)))
328+
.willReturn((Message) MessageBuilder.withPayload(new byte[0]).build());
329+
330+
CompletableFutureController controller = new CompletableFutureController();
331+
this.messageHandler.registerHandler(controller);
332+
this.messageHandler.setDestinationPrefixes(Arrays.asList("/app1", "/app2/"));
333+
SimpMessageHeaderAccessor headers = SimpMessageHeaderAccessor.create();
334+
headers.setSessionId("session1");
335+
headers.setSessionAttributes(new HashMap<>());
336+
headers.setDestination("/app1/completable-future");
337+
Message<?> message = MessageBuilder.withPayload(new byte[0]).setHeaders(headers).build();
338+
this.messageHandler.handleMessage(message);
339+
340+
controller.future.completeExceptionally(new IllegalStateException());
341+
assertTrue(controller.exceptionCatched);
342+
}
297343

298344
private static class TestSimpAnnotationMethodMessageHandler extends SimpAnnotationMethodMessageHandler {
299345

@@ -413,6 +459,24 @@ public void handleValidationException() {
413459

414460
}
415461

462+
@Controller
463+
private static class CompletableFutureController {
464+
465+
private CompletableFuture<String> future;
466+
private boolean exceptionCatched = false;
467+
468+
@MessageMapping("completable-future")
469+
public CompletableFuture<String> handleCompletableFuture() {
470+
this.future = new CompletableFuture<>();
471+
return this.future;
472+
}
473+
474+
@MessageExceptionHandler(IllegalStateException.class)
475+
public void handleValidationException() {
476+
this.exceptionCatched = true;
477+
}
478+
479+
}
416480

417481
private static class StringTestValidator implements Validator {
418482

0 commit comments

Comments
 (0)