diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java index a1e753500de7..f31a74290462 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperator.java @@ -38,6 +38,10 @@ * application services utilizing this class, making calls to the low-level * services via an inner-class callback object. * + *

Transactional Publishers should avoid Subscription cancellation. + * Cancelling initiates asynchronous transaction cleanup that does not allow for + * synchronization on completion. + * * @author Mark Paluch * @author Juergen Hoeller * @since 5.2 @@ -64,9 +68,7 @@ default Flux transactional(Flux flux) { * @throws TransactionException in case of initialization, rollback, or system errors * @throws RuntimeException if thrown by the TransactionCallback */ - default Mono transactional(Mono mono) { - return execute(it -> mono).next(); - } + Mono transactional(Mono mono); /** * Execute the action specified by the given callback object within a transaction. diff --git a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java index 8929c2e36943..cba0f485cc75 100644 --- a/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java +++ b/spring-tx/src/main/java/org/springframework/transaction/reactive/TransactionalOperatorImpl.java @@ -71,6 +71,21 @@ public ReactiveTransactionManager getTransactionManager() { return this.transactionManager; } + @Override + public Mono transactional(Mono mono) { + return TransactionContextManager.currentContext().flatMap(context -> { + Mono status = this.transactionManager.getReactiveTransaction(this.transactionDefinition); + // This is an around advice: Invoke the next interceptor in the chain. + // This will normally result in a target object being invoked. + // Need re-wrapping of ReactiveTransaction until we get hold of the exception + // through usingWhen. + return status.flatMap(it -> Mono.usingWhen(Mono.just(it), ignore -> mono, + this.transactionManager::commit, s -> Mono.empty()) + .onErrorResume(ex -> rollbackOnException(it, ex).then(Mono.error(ex)))); + }) + .subscriberContext(TransactionContextManager.getOrCreateContext()) + .subscriberContext(TransactionContextManager.getOrCreateContextHolder()); + } @Override public Flux execute(TransactionCallback action) throws TransactionException { diff --git a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java index 12de331a8b2f..bc19adb58bb6 100644 --- a/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java +++ b/spring-tx/src/test/java/org/springframework/transaction/reactive/TransactionalOperatorTests.java @@ -16,6 +16,8 @@ package org.springframework.transaction.reactive; +import java.util.concurrent.atomic.AtomicBoolean; + import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -46,6 +48,19 @@ public void commitWithMono() { assertThat(tm.rollback).isFalse(); } + @Test + public void monoSubscriptionNotCancelled() { + AtomicBoolean cancelled = new AtomicBoolean(); + TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition()); + Mono.just(true).doOnCancel(() -> cancelled.set(true)).as(operator::transactional) + .as(StepVerifier::create) + .expectNext(true) + .verifyComplete(); + assertThat(tm.commit).isTrue(); + assertThat(tm.rollback).isFalse(); + assertThat(cancelled).isFalse(); + } + @Test public void rollbackWithMono() { TransactionalOperator operator = TransactionalOperator.create(tm, new DefaultTransactionDefinition());