From f88d86ae6bc4241c94bfb3fbcfa694fba5c5a532 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Mon, 25 Nov 2024 14:31:39 +0100 Subject: [PATCH 1/8] Prepare issue branch. --- pom.xml | 2 +- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index c3245aad49..83290a2d49 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 4.5.0-SNAPSHOT + 4.5.x-GH-4838-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index 58c63dfc97..5373fa0ec5 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -15,7 +15,7 @@ org.springframework.data spring-data-mongodb-parent - 4.5.0-SNAPSHOT + 4.5.x-GH-4838-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index 98516a5ba9..681527c04a 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -13,7 +13,7 @@ org.springframework.data spring-data-mongodb-parent - 4.5.0-SNAPSHOT + 4.5.x-GH-4838-SNAPSHOT ../pom.xml From daf8240df3b223593c3229110a194f66ae5ebf17 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Mon, 25 Nov 2024 16:09:18 +0100 Subject: [PATCH 2/8] hacking --- .../support/SimpleReactiveMongoRepository.java | 17 +++++++++++++++-- .../SimpleReactiveMongoRepositoryTests.java | 16 ++++++++++++++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index cfcc5cb88b..39b93bb789 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -113,8 +113,7 @@ public Flux saveAll(Iterable entities) { Streamable source = Streamable.of(entities); return source.stream().allMatch(entityInformation::isNew) ? // - insert(entities) : - Flux.fromIterable(entities).concatMap(this::save); + insert(entities) : doItSomewhatSequentially(source, this::save); } @Override @@ -127,6 +126,20 @@ public Flux saveAll(Publisher entityStream) { mongoOperations.save(entity, entityInformation.getCollectionName())); } + static Flux doItSomewhatSequentially/* how should we actually call this? */(Streamable ts, Function> mapper) { + + List list = ts.toList(); + if (list.size() == 1) { + return Flux.just(list.iterator().next()).flatMap(mapper); + } else if (list.size() == 2) { + return Flux.fromIterable(list).concatMap(mapper); + } + + Flux first = Flux.just(list.get(0)).flatMap(mapper); + Flux theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper); + return first.concatWith(theRest); + } + @Override public Mono findById(ID id) { diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java index f8ab0f1563..ee13dc7ea9 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java @@ -18,6 +18,10 @@ import static org.assertj.core.api.Assertions.*; import static org.springframework.data.domain.ExampleMatcher.*; +import org.junit.jupiter.api.RepeatedTest; +import org.springframework.data.mongodb.ReactiveMongoTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -333,6 +337,18 @@ void savePublisherOfEntitiesShouldInsertEntity() { assertThat(boyd.getId()).isNotNull(); } + @RepeatedTest(10) + void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() { + + ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory()); + TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> { + return repository.saveAll(Arrays.asList(oliver, dave, carter, boyd, stefan, leroi, alicia)); + }) + .as(StepVerifier::create) // + .expectNext(oliver, dave, carter, boyd, stefan, leroi, alicia) + .verifyComplete(); + } + @Test // GH-3609 void savePublisherOfImmutableEntitiesShouldInsertEntity() { From f62fb20a127828d502691d92d3b2298dba34325d Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Tue, 26 Nov 2024 09:13:56 +0100 Subject: [PATCH 3/8] hacking II - is this any better? --- .../SimpleReactiveMongoRepository.java | 59 ++++++++++++++----- 1 file changed, 44 insertions(+), 15 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index 39b93bb789..cc347e03b1 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -17,6 +17,7 @@ import static org.springframework.data.mongodb.core.query.Criteria.*; +import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -113,7 +114,7 @@ public Flux saveAll(Iterable entities) { Streamable source = Streamable.of(entities); return source.stream().allMatch(entityInformation::isNew) ? // - insert(entities) : doItSomewhatSequentially(source, this::save); + insert(entities) : new AeonFlux<>(source).combatMap(this::save); } @Override @@ -126,20 +127,6 @@ public Flux saveAll(Publisher entityStream) { mongoOperations.save(entity, entityInformation.getCollectionName())); } - static Flux doItSomewhatSequentially/* how should we actually call this? */(Streamable ts, Function> mapper) { - - List list = ts.toList(); - if (list.size() == 1) { - return Flux.just(list.iterator().next()).flatMap(mapper); - } else if (list.size() == 2) { - return Flux.fromIterable(list).concatMap(mapper); - } - - Flux first = Flux.just(list.get(0)).flatMap(mapper); - Flux theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper); - return first.concatWith(theRest); - } - @Override public Mono findById(ID id) { @@ -579,4 +566,46 @@ private ReactiveFindOperation.TerminatingFind createQuery(UnaryOperator extends Flux { + + private final Streamable source; + private final Flux delegate; + + AeonFlux(Streamable source) { + this(source, Flux.fromIterable(source)); + } + + private AeonFlux(Streamable source, Flux delegate) { + this.source = source; + this.delegate = delegate; + } + + @Override + public void subscribe(CoreSubscriber actual) { + delegate.subscribe(actual); + } + + Flux combatMap(Function> mapper) { + return new AeonFlux<>(source, combatMapList(source.toList(), mapper)); + } + + private static Flux combatMapList(List list, + Function> mapper) { + + if (list.isEmpty()) { + return Flux.empty(); + } + if (list.size() == 1) { + return Flux.just(list.iterator().next()).flatMap(mapper); + } + if (list.size() == 2) { + return Flux.fromIterable(list).concatMap(mapper); + } + + Flux first = Flux.just(list.get(0)).flatMap(mapper); + Flux theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper); + return first.concatWith(theRest); + } + } } From 23fd571ed99c497f48fa3fd0e136f8bd1ce36a10 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Wed, 27 Nov 2024 15:36:40 +0100 Subject: [PATCH 4/8] Revert "hacking II - is this any better?" This reverts commit f62fb20a127828d502691d92d3b2298dba34325d. --- .../SimpleReactiveMongoRepository.java | 59 +++++-------------- 1 file changed, 15 insertions(+), 44 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index cc347e03b1..39b93bb789 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -17,7 +17,6 @@ import static org.springframework.data.mongodb.core.query.Criteria.*; -import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -114,7 +113,7 @@ public Flux saveAll(Iterable entities) { Streamable source = Streamable.of(entities); return source.stream().allMatch(entityInformation::isNew) ? // - insert(entities) : new AeonFlux<>(source).combatMap(this::save); + insert(entities) : doItSomewhatSequentially(source, this::save); } @Override @@ -127,6 +126,20 @@ public Flux saveAll(Publisher entityStream) { mongoOperations.save(entity, entityInformation.getCollectionName())); } + static Flux doItSomewhatSequentially/* how should we actually call this? */(Streamable ts, Function> mapper) { + + List list = ts.toList(); + if (list.size() == 1) { + return Flux.just(list.iterator().next()).flatMap(mapper); + } else if (list.size() == 2) { + return Flux.fromIterable(list).concatMap(mapper); + } + + Flux first = Flux.just(list.get(0)).flatMap(mapper); + Flux theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper); + return first.concatWith(theRest); + } + @Override public Mono findById(ID id) { @@ -566,46 +579,4 @@ private ReactiveFindOperation.TerminatingFind createQuery(UnaryOperator extends Flux { - - private final Streamable source; - private final Flux delegate; - - AeonFlux(Streamable source) { - this(source, Flux.fromIterable(source)); - } - - private AeonFlux(Streamable source, Flux delegate) { - this.source = source; - this.delegate = delegate; - } - - @Override - public void subscribe(CoreSubscriber actual) { - delegate.subscribe(actual); - } - - Flux combatMap(Function> mapper) { - return new AeonFlux<>(source, combatMapList(source.toList(), mapper)); - } - - private static Flux combatMapList(List list, - Function> mapper) { - - if (list.isEmpty()) { - return Flux.empty(); - } - if (list.size() == 1) { - return Flux.just(list.iterator().next()).flatMap(mapper); - } - if (list.size() == 2) { - return Flux.fromIterable(list).concatMap(mapper); - } - - Flux first = Flux.just(list.get(0)).flatMap(mapper); - Flux theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper); - return first.concatWith(theRest); - } - } } From 18e9640de955e129ed1dae45b1b65339877e3d16 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 28 Nov 2024 09:29:52 +0100 Subject: [PATCH 5/8] Polish things up a bit --- .../SimpleReactiveMongoRepository.java | 78 ++++++++++++------- 1 file changed, 52 insertions(+), 26 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index 39b93bb789..ed13d8db49 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -21,6 +21,7 @@ import reactor.core.publisher.Mono; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -47,7 +48,6 @@ import org.springframework.data.mongodb.repository.query.MongoEntityInformation; import org.springframework.data.repository.query.FluentQuery; import org.springframework.data.util.StreamUtils; -import org.springframework.data.util.Streamable; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -110,10 +110,9 @@ public Flux saveAll(Iterable entities) { Assert.notNull(entities, "The given Iterable of entities must not be null"); - Streamable source = Streamable.of(entities); - + List source = toList(entities); return source.stream().allMatch(entityInformation::isNew) ? // - insert(entities) : doItSomewhatSequentially(source, this::save); + insert(source) : concatMapSequentially(source, this::save); } @Override @@ -126,20 +125,6 @@ public Flux saveAll(Publisher entityStream) { mongoOperations.save(entity, entityInformation.getCollectionName())); } - static Flux doItSomewhatSequentially/* how should we actually call this? */(Streamable ts, Function> mapper) { - - List list = ts.toList(); - if (list.size() == 1) { - return Flux.just(list.iterator().next()).flatMap(mapper); - } else if (list.size() == 2) { - return Flux.fromIterable(list).concatMap(mapper); - } - - Flux first = Flux.just(list.get(0)).flatMap(mapper); - Flux theRest = Flux.fromIterable(list.subList(1, list.size())).flatMapSequential(mapper); - return first.concatWith(theRest); - } - @Override public Mono findById(ID id) { @@ -349,8 +334,11 @@ public Flux insert(Iterable entities) { Assert.notNull(entities, "The given Iterable of entities must not be null"); - Collection source = toCollection(entities); - return source.isEmpty() ? Flux.empty() : mongoOperations.insert(source, entityInformation.getCollectionName()); + return insert(toCollection(entities)); + } + + private Flux insert(Collection entities) { + return entities.isEmpty() ? Flux.empty() : mongoOperations.insert(entities, entityInformation.getCollectionName()); } @Override @@ -453,6 +441,12 @@ void setRepositoryMethodMetadata(CrudMethodMetadata crudMethodMetadata) { this.crudMethodMetadata = crudMethodMetadata; } + private Flux findAll(Query query) { + + getReadPreference().ifPresent(query::withReadPreference); + return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName()); + } + private Optional getReadPreference() { if (crudMethodMetadata == null) { @@ -474,15 +468,47 @@ private Query getIdQuery(Iterable ids) { return new Query(where(entityInformation.getIdAttribute()).in(toCollection(ids))); } - private static Collection toCollection(Iterable ids) { - return ids instanceof Collection collection ? collection - : StreamUtils.createStreamFromIterator(ids.iterator()).collect(Collectors.toList()); + /** + * Transform the elements emitted by this Flux into Publishers, then flatten these inner publishers into a single + * Flux. The operation does not allow interleave between performing the map operation for the first and second source + * element guaranteeing the mapping operation completed before subscribing to its following inners, that will then be + * subscribed to eagerly emitting elements in order of their source. + * + *
+	 * Flux.just(first-element).flatMap(...)
+	 *     .concatWith(Flux.fromIterable(remaining-elements).flatMapSequential(...))
+	 * 
+ * + * @param source the collection of elements to transform. + * @param mapper the transformation {@link Function}. Must not be {@literal null}. + * @return never {@literal null}. + * @param source type + */ + static Flux concatMapSequentially(List source, + Function> mapper) { + + if (source.isEmpty()) { + return Flux.empty(); + } + if (source.size() == 1) { + return Flux.just(source.iterator().next()).flatMap(mapper); + } + if (source.size() == 2) { + return Flux.fromIterable(source).concatMap(mapper); + } + + Flux first = Flux.just(source.get(0)).flatMap(mapper); + Flux theRest = Flux.fromIterable(source.subList(1, source.size())).flatMapSequential(mapper); + return first.concatWith(theRest); } - private Flux findAll(Query query) { + private static List toList(Iterable source) { + return source instanceof List list ? list : new ArrayList<>(toCollection(source)); + } - getReadPreference().ifPresent(query::withReadPreference); - return mongoOperations.find(query, entityInformation.getJavaType(), entityInformation.getCollectionName()); + private static Collection toCollection(Iterable source) { + return source instanceof Collection collection ? collection + : StreamUtils.createStreamFromIterator(source.iterator()).collect(Collectors.toList()); } /** From ab1479c4cd05bece56b8e4ab65f85a9484c7df09 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 28 Nov 2024 09:30:36 +0100 Subject: [PATCH 6/8] Remove duplicate code for delete all calls. --- .../support/SimpleReactiveMongoRepository.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index ed13d8db49..dc456a5570 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -276,14 +276,10 @@ public Mono deleteAll(Iterable entities) { Assert.notNull(entities, "The given Iterable of entities must not be null"); - Collection idCollection = StreamUtils.createStreamFromIterator(entities.iterator()).map(entityInformation::getId) - .collect(Collectors.toList()); + Collection ids = StreamUtils.createStreamFromIterator(entities.iterator()) + .map(entityInformation::getId).collect(Collectors.toList()); - Criteria idsInCriteria = where(entityInformation.getIdAttribute()).in(idCollection); - - Query query = new Query(idsInCriteria); - getReadPreference().ifPresent(query::withReadPreference); - return mongoOperations.remove(query, entityInformation.getJavaType(), entityInformation.getCollectionName()).then(); + return deleteAllById(ids); } @Override From e085561a86b233ac5620b76a11bc7b87e82adf55 Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 28 Nov 2024 11:52:33 +0100 Subject: [PATCH 7/8] refactor saveAll flow for publisher --- .../SimpleReactiveMongoRepository.java | 22 ++++++++++--- .../SimpleReactiveMongoRepositoryTests.java | 31 +++++++++++++------ 2 files changed, 38 insertions(+), 15 deletions(-) diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java index dc456a5570..a134498f35 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/repository/support/SimpleReactiveMongoRepository.java @@ -116,13 +116,11 @@ public Flux saveAll(Iterable entities) { } @Override - public Flux saveAll(Publisher entityStream) { + public Flux saveAll(Publisher publisher) { - Assert.notNull(entityStream, "The given Publisher of entities must not be null"); + Assert.notNull(publisher, "The given Publisher of entities must not be null"); - return Flux.from(entityStream).concatMap(entity -> entityInformation.isNew(entity) ? // - mongoOperations.insert(entity, entityInformation.getCollectionName()) : // - mongoOperations.save(entity, entityInformation.getCollectionName())); + return concatMapSequentially(publisher, this::save); } @Override @@ -498,6 +496,20 @@ static Flux concatMapSequentially(List source, return first.concatWith(theRest); } + static Flux concatMapSequentially(Publisher publisher, + Function> mapper) { + + return Flux.from(publisher).switchOnFirst(((signal, source) -> { + + if (!signal.hasValue()) { + return source.concatMap(mapper); + } + + Mono firstCall = Mono.from(mapper.apply(signal.get())); + return firstCall.concatWith(source.skip(1).flatMapSequential(mapper)); + })); + } + private static List toList(Iterable source) { return source instanceof List list ? list : new ArrayList<>(toCollection(source)); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java index ee13dc7ea9..c4a8c58e4b 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/repository/SimpleReactiveMongoRepositoryTests.java @@ -18,18 +18,16 @@ import static org.assertj.core.api.Assertions.*; import static org.springframework.data.domain.ExampleMatcher.*; -import org.junit.jupiter.api.RepeatedTest; -import org.springframework.data.mongodb.ReactiveMongoTransactionManager; -import org.springframework.transaction.TransactionDefinition; -import org.springframework.transaction.reactive.TransactionalOperator; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import java.util.Arrays; import java.util.Objects; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.springframework.beans.BeansException; @@ -44,6 +42,7 @@ import org.springframework.data.domain.Sort; import org.springframework.data.domain.Sort.Direction; import org.springframework.data.domain.Sort.Order; +import org.springframework.data.mongodb.ReactiveMongoTransactionManager; import org.springframework.data.mongodb.core.ReactiveMongoTemplate; import org.springframework.data.mongodb.repository.support.ReactiveMongoRepositoryFactory; import org.springframework.data.mongodb.repository.support.SimpleReactiveMongoRepository; @@ -52,6 +51,8 @@ import org.springframework.lang.Nullable; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.reactive.TransactionalOperator; import org.springframework.util.ClassUtils; /** @@ -337,16 +338,26 @@ void savePublisherOfEntitiesShouldInsertEntity() { assertThat(boyd.getId()).isNotNull(); } - @RepeatedTest(10) + @RepeatedTest(10) // GH-4838 void transactionalSaveAllForStuffThatIsConsideredAnUpdateOfExistingData() { ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory()); TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> { return repository.saveAll(Arrays.asList(oliver, dave, carter, boyd, stefan, leroi, alicia)); - }) - .as(StepVerifier::create) // - .expectNext(oliver, dave, carter, boyd, stefan, leroi, alicia) - .verifyComplete(); + }).as(StepVerifier::create) // + .expectNext(oliver, dave, carter, boyd, stefan, leroi, alicia).verifyComplete(); + } + + @RepeatedTest(10) // GH-4838 + void transactionalSaveAllWithPublisherForStuffThatIsConsideredAnUpdateOfExistingData() { + + ReactiveMongoTransactionManager txmgr = new ReactiveMongoTransactionManager(template.getMongoDatabaseFactory()); + Flux personFlux = Flux.fromStream(Stream.of(oliver, dave, carter, boyd, stefan, leroi, alicia)); + + TransactionalOperator.create(txmgr, TransactionDefinition.withDefaults()).execute(callback -> { + return repository.saveAll(personFlux); + }).as(StepVerifier::create) // + .expectNextCount(7).verifyComplete(); } @Test // GH-3609 @@ -358,7 +369,7 @@ void savePublisherOfImmutableEntitiesShouldInsertEntity() { .consumeNextWith(actual -> { assertThat(actual.id).isNotNull(); }) // - .verifyComplete(); + .verifyComplete(); } @Test // DATAMONGO-1444 From 73ceb08549f3f11a273fbe3117df4dbe4e0808ae Mon Sep 17 00:00:00 2001 From: Christoph Strobl Date: Thu, 28 Nov 2024 11:53:08 +0100 Subject: [PATCH 8/8] Fix flakey test by asserting execution order. --- .../data/mongodb/ReactiveTransactionIntegrationTests.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java index 88157024ba..4981c3480b 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/ReactiveTransactionIntegrationTests.java @@ -467,7 +467,7 @@ public Flux saveWithErrorLogs(Person person) { TransactionalOperator transactionalOperator = TransactionalOperator.create(manager, new DefaultTransactionDefinition()); - return Flux.merge(operations.save(new EventLog(new ObjectId(), "beforeConvert")), // + return Flux.concat(operations.save(new EventLog(new ObjectId(), "beforeConvert")), // operations.save(new EventLog(new ObjectId(), "afterConvert")), // operations.save(new EventLog(new ObjectId(), "beforeInsert")), // operations.save(person), //