Skip to content

Commit 705da29

Browse files
committed
Harmonize WebSocket message broker to use Executor
This commit harmonizes the configuration of the WebSocket message broker to use Executor rather than TaskExecutor as only the former is enforced. This lets custom configuration to use a wider range of implementations. Closes spring-projectsgh-32129
1 parent 6795865 commit 705da29

File tree

5 files changed

+81
-82
lines changed

5 files changed

+81
-82
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/config/AbstractMessageBrokerConfiguration.java

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
2121
import java.util.HashMap;
2222
import java.util.List;
2323
import java.util.Map;
24+
import java.util.concurrent.Executor;
2425
import java.util.function.Supplier;
2526

2627
import org.springframework.beans.factory.BeanInitializationException;
@@ -29,7 +30,6 @@
2930
import org.springframework.context.ApplicationContextAware;
3031
import org.springframework.context.annotation.Bean;
3132
import org.springframework.context.event.SmartApplicationListener;
32-
import org.springframework.core.task.TaskExecutor;
3333
import org.springframework.lang.Nullable;
3434
import org.springframework.messaging.MessageHandler;
3535
import org.springframework.messaging.converter.ByteArrayMessageConverter;
@@ -153,7 +153,7 @@ public ApplicationContext getApplicationContext() {
153153

154154
@Bean
155155
public AbstractSubscribableChannel clientInboundChannel(
156-
@Qualifier("clientInboundChannelExecutor") TaskExecutor executor) {
156+
@Qualifier("clientInboundChannelExecutor") Executor executor) {
157157

158158
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor);
159159
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
@@ -165,9 +165,9 @@ public AbstractSubscribableChannel clientInboundChannel(
165165
}
166166

167167
@Bean
168-
public TaskExecutor clientInboundChannelExecutor() {
169-
return getTaskExecutor(getClientInboundChannelRegistration(),
170-
"clientInboundChannel-", this::defaultTaskExecutor);
168+
public Executor clientInboundChannelExecutor() {
169+
return getExecutor(getClientInboundChannelRegistration(),
170+
"clientInboundChannel-", this::defaultExecutor);
171171
}
172172

173173
protected final ChannelRegistration getClientInboundChannelRegistration() {
@@ -189,7 +189,7 @@ protected void configureClientInboundChannel(ChannelRegistration registration) {
189189

190190
@Bean
191191
public AbstractSubscribableChannel clientOutboundChannel(
192-
@Qualifier("clientOutboundChannelExecutor") TaskExecutor executor) {
192+
@Qualifier("clientOutboundChannelExecutor") Executor executor) {
193193

194194
ExecutorSubscribableChannel channel = new ExecutorSubscribableChannel(executor);
195195
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
@@ -201,9 +201,9 @@ public AbstractSubscribableChannel clientOutboundChannel(
201201
}
202202

203203
@Bean
204-
public TaskExecutor clientOutboundChannelExecutor() {
205-
return getTaskExecutor(getClientOutboundChannelRegistration(),
206-
"clientOutboundChannel-", this::defaultTaskExecutor);
204+
public Executor clientOutboundChannelExecutor() {
205+
return getExecutor(getClientOutboundChannelRegistration(),
206+
"clientOutboundChannel-", this::defaultExecutor);
207207
}
208208

209209
protected final ChannelRegistration getClientOutboundChannelRegistration() {
@@ -226,11 +226,11 @@ protected void configureClientOutboundChannel(ChannelRegistration registration)
226226
@Bean
227227
public AbstractSubscribableChannel brokerChannel(
228228
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel,
229-
@Qualifier("brokerChannelExecutor") TaskExecutor executor) {
229+
@Qualifier("brokerChannelExecutor") Executor executor) {
230230

231231
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
232232
ChannelRegistration registration = registry.getBrokerChannelRegistration();
233-
ExecutorSubscribableChannel channel = (registration.hasTaskExecutor() ?
233+
ExecutorSubscribableChannel channel = (registration.hasExecutor() ?
234234
new ExecutorSubscribableChannel(executor) : new ExecutorSubscribableChannel());
235235
registration.interceptors(new ImmutableMessageChannelInterceptor());
236236
channel.setLogger(SimpLogging.forLog(channel.getLogger()));
@@ -239,34 +239,34 @@ public AbstractSubscribableChannel brokerChannel(
239239
}
240240

241241
@Bean
242-
public TaskExecutor brokerChannelExecutor(
242+
public Executor brokerChannelExecutor(
243243
AbstractSubscribableChannel clientInboundChannel, AbstractSubscribableChannel clientOutboundChannel) {
244244

245245
MessageBrokerRegistry registry = getBrokerRegistry(clientInboundChannel, clientOutboundChannel);
246246
ChannelRegistration registration = registry.getBrokerChannelRegistration();
247-
return getTaskExecutor(registration, "brokerChannel-", () -> {
247+
return getExecutor(registration, "brokerChannel-", () -> {
248248
// Should never be used
249-
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
250-
threadPoolTaskExecutor.setCorePoolSize(0);
251-
threadPoolTaskExecutor.setMaxPoolSize(1);
252-
threadPoolTaskExecutor.setQueueCapacity(0);
253-
return threadPoolTaskExecutor;
249+
ThreadPoolTaskExecutor fallbackExecutor = new ThreadPoolTaskExecutor();
250+
fallbackExecutor.setCorePoolSize(0);
251+
fallbackExecutor.setMaxPoolSize(1);
252+
fallbackExecutor.setQueueCapacity(0);
253+
return fallbackExecutor;
254254
});
255255
}
256256

257-
private TaskExecutor defaultTaskExecutor() {
257+
private Executor defaultExecutor() {
258258
return new TaskExecutorRegistration().getTaskExecutor();
259259
}
260260

261-
private static TaskExecutor getTaskExecutor(ChannelRegistration registration,
262-
String threadNamePrefix, Supplier<TaskExecutor> fallback) {
261+
private static Executor getExecutor(ChannelRegistration registration,
262+
String threadNamePrefix, Supplier<Executor> fallback) {
263263

264-
return registration.getTaskExecutor(fallback,
264+
return registration.getExecutor(fallback,
265265
executor -> setThreadNamePrefix(executor, threadNamePrefix));
266266
}
267267

268-
private static void setThreadNamePrefix(TaskExecutor taskExecutor, String name) {
269-
if (taskExecutor instanceof CustomizableThreadCreator ctc) {
268+
private static void setThreadNamePrefix(Executor executor, String name) {
269+
if (executor instanceof CustomizableThreadCreator ctc) {
270270
ctc.setThreadNamePrefix(name);
271271
}
272272
}

spring-messaging/src/main/java/org/springframework/messaging/simp/config/ChannelRegistration.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@
1919
import java.util.ArrayList;
2020
import java.util.Arrays;
2121
import java.util.List;
22+
import java.util.concurrent.Executor;
2223
import java.util.function.Consumer;
2324
import java.util.function.Supplier;
2425

25-
import org.springframework.core.task.TaskExecutor;
2626
import org.springframework.lang.Nullable;
2727
import org.springframework.messaging.support.ChannelInterceptor;
2828
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -41,7 +41,7 @@ public class ChannelRegistration {
4141
private TaskExecutorRegistration registration;
4242

4343
@Nullable
44-
private TaskExecutor executor;
44+
private Executor executor;
4545

4646
private final List<ChannelInterceptor> interceptors = new ArrayList<>();
4747

@@ -67,14 +67,14 @@ public TaskExecutorRegistration taskExecutor(@Nullable ThreadPoolTaskExecutor ta
6767
}
6868

6969
/**
70-
* Configure the given {@link TaskExecutor} for this message channel,
70+
* Configure the given {@link Executor} for this message channel,
7171
* taking precedence over a {@linkplain #taskExecutor() task executor
7272
* registration} if any.
73-
* @param taskExecutor the task executor to use
73+
* @param executor the executor to use
7474
* @since 6.1.4
7575
*/
76-
public ChannelRegistration executor(TaskExecutor taskExecutor) {
77-
this.executor = taskExecutor;
76+
public ChannelRegistration executor(Executor executor) {
77+
this.executor = executor;
7878
return this;
7979
}
8080

@@ -89,7 +89,7 @@ public ChannelRegistration interceptors(ChannelInterceptor... interceptors) {
8989
}
9090

9191

92-
protected boolean hasTaskExecutor() {
92+
protected boolean hasExecutor() {
9393
return (this.registration != null || this.executor != null);
9494
}
9595

@@ -98,18 +98,17 @@ protected boolean hasInterceptors() {
9898
}
9999

100100
/**
101-
* Return the {@link TaskExecutor} to use. If no task executor has been
102-
* configured, the {@code fallback} supplier is used to provide a fallback
103-
* instance.
101+
* Return the {@link Executor} to use. If no executor has been configured,
102+
* the {@code fallback} supplier is used to provide a fallback instance.
104103
* <p>
105-
* If the {@link TaskExecutor} to use is suitable for further customizations,
104+
* If the {@link Executor} to use is suitable for further customizations,
106105
* the {@code customizer} consumer is invoked.
107-
* @param fallback a supplier of a fallback task executor in case none is configured
106+
* @param fallback a supplier of a fallback executor in case none is configured
108107
* @param customizer further customizations
109-
* @return the task executor to use
108+
* @return the executor to use
110109
* @since 6.1.4
111110
*/
112-
protected TaskExecutor getTaskExecutor(Supplier<TaskExecutor> fallback, Consumer<TaskExecutor> customizer) {
111+
protected Executor getExecutor(Supplier<Executor> fallback, Consumer<Executor> customizer) {
113112
if (this.executor != null) {
114113
return this.executor;
115114
}
@@ -119,9 +118,9 @@ else if (this.registration != null) {
119118
return registeredTaskExecutor;
120119
}
121120
else {
122-
TaskExecutor taskExecutor = fallback.get();
123-
customizer.accept(taskExecutor);
124-
return taskExecutor;
121+
Executor fallbackExecutor = fallback.get();
122+
customizer.accept(fallbackExecutor);
123+
return fallbackExecutor;
125124
}
126125
}
127126

spring-messaging/src/test/java/org/springframework/messaging/simp/config/ChannelRegistrationTests.java

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,12 @@
1616

1717
package org.springframework.messaging.simp.config;
1818

19+
import java.util.concurrent.Executor;
1920
import java.util.function.Consumer;
2021
import java.util.function.Supplier;
2122

2223
import org.junit.jupiter.api.Test;
2324

24-
import org.springframework.core.task.TaskExecutor;
2525
import org.springframework.messaging.support.ChannelInterceptor;
2626
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2727

@@ -38,20 +38,20 @@
3838
*/
3939
class ChannelRegistrationTests {
4040

41-
private final Supplier<TaskExecutor> fallback = mock();
41+
private final Supplier<Executor> fallback = mock();
4242

43-
private final Consumer<TaskExecutor> customizer = mock();
43+
private final Consumer<Executor> customizer = mock();
4444

4545
@Test
4646
void emptyRegistrationUsesFallback() {
47-
TaskExecutor fallbackTaskExecutor = mock(TaskExecutor.class);
48-
given(this.fallback.get()).willReturn(fallbackTaskExecutor);
47+
Executor fallbackExecutor = mock(Executor.class);
48+
given(this.fallback.get()).willReturn(fallbackExecutor);
4949
ChannelRegistration registration = new ChannelRegistration();
50-
assertThat(registration.hasTaskExecutor()).isFalse();
51-
TaskExecutor actual = registration.getTaskExecutor(this.fallback, this.customizer);
52-
assertThat(actual).isSameAs(fallbackTaskExecutor);
50+
assertThat(registration.hasExecutor()).isFalse();
51+
Executor actual = registration.getExecutor(this.fallback, this.customizer);
52+
assertThat(actual).isSameAs(fallbackExecutor);
5353
verify(this.fallback).get();
54-
verify(this.customizer).accept(fallbackTaskExecutor);
54+
verify(this.customizer).accept(fallbackExecutor);
5555
}
5656

5757
@Test
@@ -65,45 +65,45 @@ void emptyRegistrationDoesNotHaveInterceptors() {
6565
void taskRegistrationCreatesDefaultInstance() {
6666
ChannelRegistration registration = new ChannelRegistration();
6767
registration.taskExecutor();
68-
assertThat(registration.hasTaskExecutor()).isTrue();
69-
TaskExecutor taskExecutor = registration.getTaskExecutor(this.fallback, this.customizer);
70-
assertThat(taskExecutor).isInstanceOf(ThreadPoolTaskExecutor.class);
68+
assertThat(registration.hasExecutor()).isTrue();
69+
Executor executor = registration.getExecutor(this.fallback, this.customizer);
70+
assertThat(executor).isInstanceOf(ThreadPoolTaskExecutor.class);
7171
verifyNoInteractions(this.fallback);
72-
verify(this.customizer).accept(taskExecutor);
72+
verify(this.customizer).accept(executor);
7373
}
7474

7575
@Test
7676
void taskRegistrationWithExistingThreadPoolTaskExecutor() {
77-
ThreadPoolTaskExecutor existingTaskExecutor = mock(ThreadPoolTaskExecutor.class);
77+
ThreadPoolTaskExecutor existingExecutor = mock(ThreadPoolTaskExecutor.class);
7878
ChannelRegistration registration = new ChannelRegistration();
79-
registration.taskExecutor(existingTaskExecutor);
80-
assertThat(registration.hasTaskExecutor()).isTrue();
81-
TaskExecutor taskExecutor = registration.getTaskExecutor(this.fallback, this.customizer);
82-
assertThat(taskExecutor).isSameAs(existingTaskExecutor);
79+
registration.taskExecutor(existingExecutor);
80+
assertThat(registration.hasExecutor()).isTrue();
81+
Executor executor = registration.getExecutor(this.fallback, this.customizer);
82+
assertThat(executor).isSameAs(existingExecutor);
8383
verifyNoInteractions(this.fallback);
84-
verify(this.customizer).accept(taskExecutor);
84+
verify(this.customizer).accept(executor);
8585
}
8686

8787
@Test
8888
void configureExecutor() {
8989
ChannelRegistration registration = new ChannelRegistration();
90-
TaskExecutor taskExecutor = mock(TaskExecutor.class);
91-
registration.executor(taskExecutor);
92-
assertThat(registration.hasTaskExecutor()).isTrue();
93-
TaskExecutor taskExecutor1 = registration.getTaskExecutor(this.fallback, this.customizer);
94-
assertThat(taskExecutor1).isSameAs(taskExecutor);
90+
Executor executor = mock(Executor.class);
91+
registration.executor(executor);
92+
assertThat(registration.hasExecutor()).isTrue();
93+
Executor actualExecutor = registration.getExecutor(this.fallback, this.customizer);
94+
assertThat(actualExecutor).isSameAs(executor);
9595
verifyNoInteractions(this.fallback, this.customizer);
9696
}
9797

9898
@Test
9999
void configureExecutorTakesPrecedenceOverTaskRegistration() {
100100
ChannelRegistration registration = new ChannelRegistration();
101-
TaskExecutor taskExecutor = mock(TaskExecutor.class);
102-
registration.executor(taskExecutor);
101+
Executor executor = mock(Executor.class);
102+
registration.executor(executor);
103103
ThreadPoolTaskExecutor ignored = mock(ThreadPoolTaskExecutor.class);
104104
registration.taskExecutor(ignored);
105-
assertThat(registration.hasTaskExecutor()).isTrue();
106-
assertThat(registration.getTaskExecutor(this.fallback, this.customizer)).isSameAs(taskExecutor);
105+
assertThat(registration.hasExecutor()).isTrue();
106+
assertThat(registration.getExecutor(this.fallback, this.customizer)).isSameAs(executor);
107107
verifyNoInteractions(ignored, this.fallback, this.customizer);
108108

109109
}

spring-messaging/src/test/java/org/springframework/messaging/simp/config/MessageBrokerConfigurationTests.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Set;
2323
import java.util.concurrent.CompletableFuture;
2424
import java.util.concurrent.ConcurrentHashMap;
25+
import java.util.concurrent.Executor;
2526

2627
import org.junit.jupiter.api.Test;
2728

@@ -31,7 +32,6 @@
3132
import org.springframework.context.annotation.Configuration;
3233
import org.springframework.context.support.StaticApplicationContext;
3334
import org.springframework.core.Ordered;
34-
import org.springframework.core.task.TaskExecutor;
3535
import org.springframework.lang.Nullable;
3636
import org.springframework.messaging.Message;
3737
import org.springframework.messaging.MessageChannel;
@@ -599,20 +599,20 @@ public TestController subscriptionController() {
599599

600600
@Override
601601
@Bean
602-
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) {
602+
public AbstractSubscribableChannel clientInboundChannel(Executor clientInboundChannelExecutor) {
603603
return new TestChannel();
604604
}
605605

606606
@Override
607607
@Bean
608-
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) {
608+
public AbstractSubscribableChannel clientOutboundChannel(Executor clientOutboundChannelExecutor) {
609609
return new TestChannel();
610610
}
611611

612612
@Override
613613
@Bean
614614
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel,
615-
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) {
615+
AbstractSubscribableChannel clientOutboundChannel, Executor brokerChannelExecutor) {
616616
return new TestChannel();
617617
}
618618
}
@@ -688,21 +688,21 @@ protected void configureMessageBroker(MessageBrokerRegistry registry) {
688688

689689
@Override
690690
@Bean
691-
public AbstractSubscribableChannel clientInboundChannel(TaskExecutor clientInboundChannelExecutor) {
691+
public AbstractSubscribableChannel clientInboundChannel(Executor clientInboundChannelExecutor) {
692692
// synchronous
693693
return new ExecutorSubscribableChannel(null);
694694
}
695695

696696
@Override
697697
@Bean
698-
public AbstractSubscribableChannel clientOutboundChannel(TaskExecutor clientOutboundChannelExecutor) {
698+
public AbstractSubscribableChannel clientOutboundChannel(Executor clientOutboundChannelExecutor) {
699699
return new TestChannel();
700700
}
701701

702702
@Override
703703
@Bean
704704
public AbstractSubscribableChannel brokerChannel(AbstractSubscribableChannel clientInboundChannel,
705-
AbstractSubscribableChannel clientOutboundChannel, TaskExecutor brokerChannelExecutor) {
705+
AbstractSubscribableChannel clientOutboundChannel, Executor brokerChannelExecutor) {
706706
// synchronous
707707
return new ExecutorSubscribableChannel(null);
708708
}

0 commit comments

Comments
 (0)