Skip to content

Commit c73cff8

Browse files
committed
Use RxJava 2/3 fromPublisher() when possible in ReactiveAdapterRegistry
Closes gh-26051
1 parent e592634 commit c73cff8

File tree

1 file changed

+7
-14
lines changed

1 file changed

+7
-14
lines changed

spring-core/src/main/java/org/springframework/core/ReactiveAdapterRegistry.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -264,14 +264,12 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
264264
registry.registerReactiveType(
265265
ReactiveTypeDescriptor.multiValue(io.reactivex.Observable.class, io.reactivex.Observable::empty),
266266
source -> ((io.reactivex.Observable<?>) source).toFlowable(io.reactivex.BackpressureStrategy.BUFFER),
267-
source -> io.reactivex.Flowable.fromPublisher(source)
268-
.toObservable()
267+
io.reactivex.Observable::fromPublisher
269268
);
270269
registry.registerReactiveType(
271270
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.Single.class),
272271
source -> ((io.reactivex.Single<?>) source).toFlowable(),
273-
source -> io.reactivex.Flowable.fromPublisher(source)
274-
.toObservable().singleElement().toSingle()
272+
io.reactivex.Single::fromPublisher
275273
);
276274
registry.registerReactiveType(
277275
ReactiveTypeDescriptor.singleOptionalValue(io.reactivex.Maybe.class, io.reactivex.Maybe::empty),
@@ -282,8 +280,7 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
282280
registry.registerReactiveType(
283281
ReactiveTypeDescriptor.noValue(io.reactivex.Completable.class, io.reactivex.Completable::complete),
284282
source -> ((io.reactivex.Completable) source).toFlowable(),
285-
source -> io.reactivex.Flowable.fromPublisher(source)
286-
.toObservable().ignoreElements()
283+
io.reactivex.Completable::fromPublisher
287284
);
288285
}
289286
}
@@ -304,30 +301,26 @@ void registerAdapters(ReactiveAdapterRegistry registry) {
304301
io.reactivex.rxjava3.core.Observable::empty),
305302
source -> ((io.reactivex.rxjava3.core.Observable<?>) source).toFlowable(
306303
io.reactivex.rxjava3.core.BackpressureStrategy.BUFFER),
307-
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
308-
.toObservable()
304+
io.reactivex.rxjava3.core.Observable::fromPublisher
309305
);
310306
registry.registerReactiveType(
311307
ReactiveTypeDescriptor.singleRequiredValue(io.reactivex.rxjava3.core.Single.class),
312308
source -> ((io.reactivex.rxjava3.core.Single<?>) source).toFlowable(),
313-
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
314-
.toObservable().singleElement().toSingle()
309+
io.reactivex.rxjava3.core.Single::fromPublisher
315310
);
316311
registry.registerReactiveType(
317312
ReactiveTypeDescriptor.singleOptionalValue(
318313
io.reactivex.rxjava3.core.Maybe.class,
319314
io.reactivex.rxjava3.core.Maybe::empty),
320315
source -> ((io.reactivex.rxjava3.core.Maybe<?>) source).toFlowable(),
321-
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
322-
.toObservable().singleElement()
316+
io.reactivex.rxjava3.core.Maybe::fromPublisher
323317
);
324318
registry.registerReactiveType(
325319
ReactiveTypeDescriptor.noValue(
326320
io.reactivex.rxjava3.core.Completable.class,
327321
io.reactivex.rxjava3.core.Completable::complete),
328322
source -> ((io.reactivex.rxjava3.core.Completable) source).toFlowable(),
329-
source -> io.reactivex.rxjava3.core.Flowable.fromPublisher(source)
330-
.toObservable().ignoreElements()
323+
io.reactivex.rxjava3.core.Completable::fromPublisher
331324
);
332325
}
333326
}

0 commit comments

Comments
 (0)