Skip to content

Commit c749a14

Browse files
committed
Guard against multiple body subscriptions
Before this commit, the JDK and Jetty connectors do not have any safeguards against multiple body subscriptions. Such as check has now been added. See gh-32100 Closes gh-32102
1 parent 3817936 commit c749a14

File tree

5 files changed

+140
-124
lines changed

5 files changed

+140
-124
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2002-2024 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.client.reactive;
18+
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
21+
import reactor.core.publisher.Flux;
22+
23+
import org.springframework.core.io.buffer.DataBuffer;
24+
import org.springframework.http.HttpHeaders;
25+
import org.springframework.http.HttpStatusCode;
26+
import org.springframework.http.ResponseCookie;
27+
import org.springframework.util.Assert;
28+
import org.springframework.util.MultiValueMap;
29+
30+
/**
31+
* Base class for {@link ClientHttpResponse} implementations.
32+
*
33+
* @author Arjen Poutsma
34+
* @since 5.3.32
35+
*/
36+
public abstract class AbstractClientHttpResponse implements ClientHttpResponse {
37+
38+
private final HttpStatusCode statusCode;
39+
40+
private final HttpHeaders headers;
41+
42+
private final MultiValueMap<String, ResponseCookie> cookies;
43+
44+
private final Flux<DataBuffer> body;
45+
46+
47+
48+
protected AbstractClientHttpResponse(HttpStatusCode statusCode, HttpHeaders headers,
49+
MultiValueMap<String, ResponseCookie> cookies, Flux<DataBuffer> body) {
50+
51+
Assert.notNull(statusCode, "StatusCode must not be null");
52+
Assert.notNull(headers, "Headers must not be null");
53+
Assert.notNull(body, "Body must not be null");
54+
55+
this.statusCode = statusCode;
56+
this.headers = headers;
57+
this.cookies = cookies;
58+
this.body = singleSubscription(body);
59+
}
60+
61+
private static Flux<DataBuffer> singleSubscription(Flux<DataBuffer> body) {
62+
AtomicBoolean subscribed = new AtomicBoolean();
63+
return body.doOnSubscribe(s -> {
64+
if (!subscribed.compareAndSet(false, true)) {
65+
throw new IllegalStateException("The client response body can only be consumed once");
66+
}
67+
});
68+
}
69+
70+
71+
@Override
72+
public HttpStatusCode getStatusCode() {
73+
return this.statusCode;
74+
}
75+
76+
@Override
77+
public HttpHeaders getHeaders() {
78+
return this.headers;
79+
}
80+
81+
@Override
82+
public MultiValueMap<String, ResponseCookie> getCookies() {
83+
return this.cookies;
84+
}
85+
86+
@Override
87+
public Flux<DataBuffer> getBody() {
88+
return this.body;
89+
}
90+
}
Lines changed: 10 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,7 +17,6 @@
1717
package org.springframework.http.client.reactive;
1818

1919
import java.nio.ByteBuffer;
20-
import java.util.concurrent.atomic.AtomicBoolean;
2120

2221
import org.apache.hc.client5.http.cookie.Cookie;
2322
import org.apache.hc.client5.http.protocol.HttpClientContext;
@@ -26,7 +25,6 @@
2625
import org.reactivestreams.Publisher;
2726
import reactor.core.publisher.Flux;
2827

29-
import org.springframework.core.io.buffer.DataBuffer;
3028
import org.springframework.core.io.buffer.DataBufferFactory;
3129
import org.springframework.http.HttpHeaders;
3230
import org.springframework.http.HttpStatusCode;
@@ -42,40 +40,22 @@
4240
* @since 5.3
4341
* @see <a href="https://hc.apache.org/index.html">Apache HttpComponents</a>
4442
*/
45-
class HttpComponentsClientHttpResponse implements ClientHttpResponse {
46-
47-
private final DataBufferFactory dataBufferFactory;
48-
49-
private final Message<HttpResponse, Publisher<ByteBuffer>> message;
50-
51-
private final HttpHeaders headers;
52-
53-
private final HttpClientContext context;
54-
55-
private final AtomicBoolean rejectSubscribers = new AtomicBoolean();
43+
class HttpComponentsClientHttpResponse extends AbstractClientHttpResponse {
5644

5745

5846
public HttpComponentsClientHttpResponse(DataBufferFactory dataBufferFactory,
5947
Message<HttpResponse, Publisher<ByteBuffer>> message, HttpClientContext context) {
6048

61-
this.dataBufferFactory = dataBufferFactory;
62-
this.message = message;
63-
this.context = context;
64-
65-
MultiValueMap<String, String> adapter = new HttpComponentsHeadersAdapter(message.getHead());
66-
this.headers = HttpHeaders.readOnlyHttpHeaders(adapter);
49+
super(HttpStatusCode.valueOf(message.getHead().getCode()),
50+
HttpHeaders.readOnlyHttpHeaders(new HttpComponentsHeadersAdapter(message.getHead())),
51+
adaptCookies(context),
52+
Flux.from(message.getBody()).map(dataBufferFactory::wrap)
53+
);
6754
}
6855

69-
70-
@Override
71-
public HttpStatusCode getStatusCode() {
72-
return HttpStatusCode.valueOf(this.message.getHead().getCode());
73-
}
74-
75-
@Override
76-
public MultiValueMap<String, ResponseCookie> getCookies() {
56+
private static MultiValueMap<String, ResponseCookie> adaptCookies(HttpClientContext context) {
7757
LinkedMultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
78-
this.context.getCookieStore().getCookies().forEach(cookie ->
58+
context.getCookieStore().getCookies().forEach(cookie ->
7959
result.add(cookie.getName(),
8060
ResponseCookie.fromClientResponse(cookie.getName(), cookie.getValue())
8161
.domain(cookie.getDomain())
@@ -88,25 +68,10 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
8868
return result;
8969
}
9070

91-
private long getMaxAgeSeconds(Cookie cookie) {
71+
private static long getMaxAgeSeconds(Cookie cookie) {
9272
String maxAgeAttribute = cookie.getAttribute(Cookie.MAX_AGE_ATTR);
9373
return (maxAgeAttribute != null ? Long.parseLong(maxAgeAttribute) : -1);
9474
}
9575

96-
@Override
97-
public Flux<DataBuffer> getBody() {
98-
return Flux.from(this.message.getBody())
99-
.doOnSubscribe(s -> {
100-
if (!this.rejectSubscribers.compareAndSet(false, true)) {
101-
throw new IllegalStateException("The client response body can only be consumed once.");
102-
}
103-
})
104-
.map(this.dataBufferFactory::wrap);
105-
}
106-
107-
@Override
108-
public HttpHeaders getHeaders() {
109-
return this.headers;
110-
}
11176

11277
}

spring-web/src/main/java/org/springframework/http/client/reactive/JdkClientHttpResponse.java

Lines changed: 17 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -50,24 +50,20 @@
5050
* @author Rossen Stoyanchev
5151
* @since 6.0
5252
*/
53-
class JdkClientHttpResponse implements ClientHttpResponse {
53+
class JdkClientHttpResponse extends AbstractClientHttpResponse {
5454

5555
private static final Pattern SAME_SITE_PATTERN = Pattern.compile("(?i).*SameSite=(Strict|Lax|None).*");
5656

5757

58-
private final HttpResponse<Flow.Publisher<List<ByteBuffer>>> response;
5958

60-
private final DataBufferFactory bufferFactory;
59+
public JdkClientHttpResponse(HttpResponse<Flow.Publisher<List<ByteBuffer>>> response,
60+
DataBufferFactory bufferFactory) {
6161

62-
private final HttpHeaders headers;
63-
64-
65-
public JdkClientHttpResponse(
66-
HttpResponse<Flow.Publisher<List<ByteBuffer>>> response, DataBufferFactory bufferFactory) {
67-
68-
this.response = response;
69-
this.bufferFactory = bufferFactory;
70-
this.headers = adaptHeaders(response);
62+
super(HttpStatusCode.valueOf(response.statusCode()),
63+
adaptHeaders(response),
64+
adaptCookies(response),
65+
adaptBody(response, bufferFactory)
66+
);
7167
}
7268

7369
private static HttpHeaders adaptHeaders(HttpResponse<Flow.Publisher<List<ByteBuffer>>> response) {
@@ -78,20 +74,8 @@ private static HttpHeaders adaptHeaders(HttpResponse<Flow.Publisher<List<ByteBuf
7874
return HttpHeaders.readOnlyHttpHeaders(multiValueMap);
7975
}
8076

81-
82-
@Override
83-
public HttpStatusCode getStatusCode() {
84-
return HttpStatusCode.valueOf(this.response.statusCode());
85-
}
86-
87-
@Override
88-
public HttpHeaders getHeaders() {
89-
return this.headers;
90-
}
91-
92-
@Override
93-
public MultiValueMap<String, ResponseCookie> getCookies() {
94-
return this.response.headers().allValues(HttpHeaders.SET_COOKIE).stream()
77+
private static MultiValueMap<String, ResponseCookie> adaptCookies(HttpResponse<Flow.Publisher<List<ByteBuffer>>> response) {
78+
return response.headers().allValues(HttpHeaders.SET_COOKIE).stream()
9579
.flatMap(header -> {
9680
Matcher matcher = SAME_SITE_PATTERN.matcher(header);
9781
String sameSite = (matcher.matches() ? matcher.group(1) : null);
@@ -102,7 +86,7 @@ public MultiValueMap<String, ResponseCookie> getCookies() {
10286
LinkedMultiValueMap::addAll);
10387
}
10488

105-
private ResponseCookie toResponseCookie(HttpCookie cookie, @Nullable String sameSite) {
89+
private static ResponseCookie toResponseCookie(HttpCookie cookie, @Nullable String sameSite) {
10690
return ResponseCookie.from(cookie.getName(), cookie.getValue())
10791
.domain(cookie.getDomain())
10892
.httpOnly(cookie.isHttpOnly())
@@ -113,12 +97,12 @@ private ResponseCookie toResponseCookie(HttpCookie cookie, @Nullable String same
11397
.build();
11498
}
11599

116-
@Override
117-
public Flux<DataBuffer> getBody() {
118-
return JdkFlowAdapter.flowPublisherToFlux(this.response.body())
100+
private static Flux<DataBuffer> adaptBody(HttpResponse<Flow.Publisher<List<ByteBuffer>>> response, DataBufferFactory bufferFactory) {
101+
return JdkFlowAdapter.flowPublisherToFlux(response.body())
119102
.flatMapIterable(Function.identity())
120-
.map(this.bufferFactory::wrap)
121-
.doOnDiscard(DataBuffer.class, DataBufferUtils::release);
103+
.map(bufferFactory::wrap)
104+
.doOnDiscard(DataBuffer.class, DataBufferUtils::release)
105+
.cache(0);
122106
}
123107

124108
}
Lines changed: 16 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -21,8 +21,8 @@
2121
import java.util.regex.Matcher;
2222
import java.util.regex.Pattern;
2323

24+
import org.eclipse.jetty.http.HttpField;
2425
import org.eclipse.jetty.reactive.client.ReactiveResponse;
25-
import org.reactivestreams.Publisher;
2626
import reactor.core.publisher.Flux;
2727

2828
import org.springframework.core.io.buffer.DataBuffer;
@@ -42,49 +42,37 @@
4242
* @see <a href="https://github.com/jetty-project/jetty-reactive-httpclient">
4343
* Jetty ReactiveStreams HttpClient</a>
4444
*/
45-
class JettyClientHttpResponse implements ClientHttpResponse {
45+
class JettyClientHttpResponse extends AbstractClientHttpResponse {
4646

4747
private static final Pattern SAME_SITE_PATTERN = Pattern.compile("(?i).*SameSite=(Strict|Lax|None).*");
4848

4949

50-
private final ReactiveResponse reactiveResponse;
50+
public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Flux<DataBuffer> content) {
5151

52-
private final Flux<DataBuffer> content;
53-
54-
private final HttpHeaders headers;
55-
56-
57-
public JettyClientHttpResponse(ReactiveResponse reactiveResponse, Publisher<DataBuffer> content) {
58-
this.reactiveResponse = reactiveResponse;
59-
this.content = Flux.from(content);
60-
61-
MultiValueMap<String, String> headers = new JettyHeadersAdapter(reactiveResponse.getHeaders());
62-
this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
52+
super(HttpStatusCode.valueOf(reactiveResponse.getStatus()),
53+
adaptHeaders(reactiveResponse),
54+
adaptCookies(reactiveResponse),
55+
content);
6356
}
6457

65-
66-
@Override
67-
public HttpStatusCode getStatusCode() {
68-
return HttpStatusCode.valueOf(this.reactiveResponse.getStatus());
58+
private static HttpHeaders adaptHeaders(ReactiveResponse response) {
59+
MultiValueMap<String, String> headers = new JettyHeadersAdapter(response.getHeaders());
60+
return HttpHeaders.readOnlyHttpHeaders(headers);
6961
}
70-
71-
@Override
72-
public MultiValueMap<String, ResponseCookie> getCookies() {
62+
private static MultiValueMap<String, ResponseCookie> adaptCookies(ReactiveResponse response) {
7363
MultiValueMap<String, ResponseCookie> result = new LinkedMultiValueMap<>();
74-
List<String> cookieHeader = getHeaders().get(HttpHeaders.SET_COOKIE);
75-
if (cookieHeader != null) {
76-
cookieHeader.forEach(header ->
77-
HttpCookie.parse(header).forEach(cookie -> result.add(cookie.getName(),
64+
List<HttpField> cookieHeaders = response.getHeaders().getFields(HttpHeaders.SET_COOKIE);
65+
cookieHeaders.forEach(header ->
66+
HttpCookie.parse(header.getValue()).forEach(cookie -> result.add(cookie.getName(),
7867
ResponseCookie.fromClientResponse(cookie.getName(), cookie.getValue())
7968
.domain(cookie.getDomain())
8069
.path(cookie.getPath())
8170
.maxAge(cookie.getMaxAge())
8271
.secure(cookie.getSecure())
8372
.httpOnly(cookie.isHttpOnly())
84-
.sameSite(parseSameSite(header))
73+
.sameSite(parseSameSite(header.getValue()))
8574
.build()))
8675
);
87-
}
8876
return CollectionUtils.unmodifiableMultiValueMap(result);
8977
}
9078

@@ -94,15 +82,4 @@ private static String parseSameSite(String headerValue) {
9482
return (matcher.matches() ? matcher.group(1) : null);
9583
}
9684

97-
98-
@Override
99-
public Flux<DataBuffer> getBody() {
100-
return this.content;
101-
}
102-
103-
@Override
104-
public HttpHeaders getHeaders() {
105-
return this.headers;
106-
}
107-
10885
}

0 commit comments

Comments
 (0)