Skip to content

Commit be57d92

Browse files
committed
fixed bug in test, added more tests and better verification
1 parent 12cecd4 commit be57d92

File tree

2 files changed

+74
-27
lines changed
  • parallel-consumer-examples/parallel-consumer-example-core/src

2 files changed

+74
-27
lines changed

parallel-consumer-examples/parallel-consumer-example-core/src/main/java/io/confluent/parallelconsumer/examples/core/CoreApp.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
import org.apache.kafka.clients.producer.ProducerRecord;
1818
import pl.tlinkowski.unij.api.UniLists;
1919

20+
import java.util.ArrayList;
21+
import java.util.Collections;
22+
import java.util.List;
2023
import java.util.Properties;
2124
import java.util.concurrent.atomic.AtomicInteger;
2225

@@ -28,6 +31,7 @@
2831
@Slf4j
2932
public class CoreApp {
3033

34+
3135
String inputTopic = "input-topic-" + RandomUtils.nextInt();
3236
String outputTopic = "output-topic-" + RandomUtils.nextInt();
3337

@@ -41,8 +45,9 @@ Producer<String, String> getKafkaProducer() {
4145

4246
ParallelStreamProcessor<String, String> parallelConsumer;
4347

44-
public AtomicInteger messagesProcessed = new AtomicInteger(0);
45-
public AtomicInteger messagesProduced = new AtomicInteger(0);
48+
public List<String> processedAndProducedKeys = Collections.synchronizedList(new ArrayList<>());
49+
public AtomicInteger processedCount = new AtomicInteger(0);
50+
public AtomicInteger producedCount = new AtomicInteger(0);
4651

4752
@SuppressWarnings("UnqualifiedFieldAccess")
4853
void run() {
@@ -93,12 +98,13 @@ void runPollAndProduce() {
9398
this.parallelConsumer.pollAndProduce(record -> {
9499
var result = processBrokerRecord(record);
95100
ProducerRecord<String, String> produceRecord =
96-
new ProducerRecord<>(outputTopic, "a-key", result.payload);
101+
new ProducerRecord<>(outputTopic, record.key(), result.payload);
97102

98-
messagesProcessed.incrementAndGet();
103+
processedCount.incrementAndGet();
99104
return UniLists.of(produceRecord);
100105
}, consumeProduceResult -> {
101-
messagesProduced.incrementAndGet();
106+
producedCount.incrementAndGet();
107+
processedAndProducedKeys.add(consumeProduceResult.getIn().key());
102108
log.info("Message {} saved to broker at offset {}",
103109
consumeProduceResult.getOut(),
104110
consumeProduceResult.getMeta().offset());

parallel-consumer-examples/parallel-consumer-example-core/src/test/java/io/confluent/parallelconsumer/examples/core/Bug25AppTest.java

Lines changed: 63 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,77 +7,119 @@
77
import io.confluent.parallelconsumer.ParallelConsumerOptions;
88
import io.confluent.parallelconsumer.integrationTests.KafkaTest;
99
import lombok.AllArgsConstructor;
10-
import lombok.RequiredArgsConstructor;
1110
import lombok.SneakyThrows;
1211
import lombok.extern.slf4j.Slf4j;
1312
import org.apache.kafka.clients.consumer.Consumer;
1413
import org.apache.kafka.clients.consumer.ConsumerConfig;
1514
import org.apache.kafka.clients.consumer.KafkaConsumer;
1615
import org.apache.kafka.clients.producer.Producer;
1716
import org.apache.kafka.clients.producer.ProducerRecord;
18-
import org.assertj.core.api.Assertions;
19-
import org.awaitility.Awaitility;
17+
import org.awaitility.core.ConditionTimeoutException;
18+
import org.junit.jupiter.api.RepeatedTest;
2019
import org.junit.jupiter.api.Test;
2120

2221
import java.time.Duration;
22+
import java.util.ArrayList;
23+
import java.util.List;
2324
import java.util.Properties;
2425

2526
import static io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder.KEY;
2627
import static org.assertj.core.api.Assertions.assertThat;
28+
import static org.assertj.core.api.Assertions.fail;
2729
import static org.awaitility.Awaitility.waitAtMost;
2830

2931
@Slf4j
3032
public class Bug25AppTest extends KafkaTest<String, String> {
3133

32-
int DEAFULT_MAX_POLL_RECORDS_CONFIG = 500;
34+
int LOW_MAX_POLL_RECORDS_CONFIG = 1;
35+
int DEFAULT_MAX_POLL_RECORDS_CONFIG = 500;
36+
int HIGH_MAX_POLL_RECORDS_CONFIG = 10_000;
3337

34-
@Test
38+
@RepeatedTest(5)
3539
public void testTransactionalDefaultMaxPoll() {
3640
boolean tx = true;
37-
runTest(tx, DEAFULT_MAX_POLL_RECORDS_CONFIG);
41+
runTest(tx, DEFAULT_MAX_POLL_RECORDS_CONFIG);
3842
}
3943

4044
@Test
4145
public void testNonTransactionalDefaultMaxPoll() {
4246
boolean tx = false;
43-
runTest(tx, DEAFULT_MAX_POLL_RECORDS_CONFIG);
47+
runTest(tx, DEFAULT_MAX_POLL_RECORDS_CONFIG);
4448
}
4549

4650
@Test
47-
public void testTransactional() {
51+
public void testTransactionalLowMaxPoll() {
4852
boolean tx = true;
49-
runTest(tx, 1); // Sometimes causes test to fail (default 500)
53+
runTest(tx, LOW_MAX_POLL_RECORDS_CONFIG);
5054
}
5155

5256
@Test
53-
public void testNonTransactional() {
57+
public void testNonTransactionalLowMaxPoll() {
5458
boolean tx = false;
55-
runTest(tx, 1); // Sometimes causes test to fail (default 500)
59+
runTest(tx, LOW_MAX_POLL_RECORDS_CONFIG);
60+
}
61+
62+
@Test
63+
public void testTransactionalHighMaxPoll() {
64+
boolean tx = true;
65+
runTest(tx, HIGH_MAX_POLL_RECORDS_CONFIG);
66+
}
67+
68+
@Test
69+
public void testNonTransactionalHighMaxPoll() {
70+
boolean tx = false;
71+
runTest(tx, HIGH_MAX_POLL_RECORDS_CONFIG);
5672
}
5773

5874
@SneakyThrows
5975
private void runTest(boolean tx, int maxPoll) {
60-
AppUnderTest coreApp = new AppUnderTest(tx, ParallelConsumerOptions.builder().ordering(KEY).usingTransactionalProducer(tx).build(), maxPoll);
76+
AppUnderTest coreApp = new AppUnderTest(tx, ParallelConsumerOptions.builder()
77+
.ordering(KEY)
78+
.usingTransactionalProducer(tx)
79+
.build(),
80+
maxPoll);
6181

6282
ensureTopic(coreApp.inputTopic, 1);
6383
ensureTopic(coreApp.outputTopic, 1);
6484

65-
log.info("Producing 1000 messages before starting application");
85+
// pre-produce messages to input-topic
86+
List<String> expectedKeys = new ArrayList<>();
87+
int expectedMessageCount = 1000;
88+
log.info("Producing {} messages before starting application", expectedMessageCount);
6689
try (Producer<String, String> kafkaProducer = kcu.createNewProducer(false)) {
67-
for (int i = 0; i < 1000; i++) {
68-
kafkaProducer.send(new ProducerRecord<>(coreApp.inputTopic, "key-" + i, "value-" + i));
90+
for (int i = 0; i < expectedMessageCount; i++) {
91+
String key = "key-" + i;
92+
kafkaProducer.send(new ProducerRecord<>(coreApp.inputTopic, key, "value-" + i));
93+
expectedKeys.add(key);
6994
}
7095
}
7196

97+
// run parallel-consumer
7298
log.info("Starting application...");
7399
coreApp.runPollAndProduce();
74100

75-
waitAtMost(Duration.ofSeconds(30)).untilAsserted(() -> {
76-
log.info("Processed-count: " + coreApp.messagesProcessed.get());
77-
log.info("Produced-count: " + coreApp.messagesProduced.get());
78-
assertThat(coreApp.messagesProcessed.get()).isEqualTo(1000);
79-
assertThat(coreApp.messagesProduced.get()).isEqualTo(1000);
80-
});
101+
// wait for all pre-produced messages to be processed and produced
102+
try {
103+
waitAtMost(Duration.ofSeconds(30)).untilAsserted(() -> {
104+
log.debug("Processed-count: " + coreApp.processedCount.get());
105+
log.debug("Produced-count: " + coreApp.producedCount.get());
106+
List<String> processedAndProducedKeys = new ArrayList<>(coreApp.processedAndProducedKeys); // avoid concurrent-modification in assert
107+
assertThat(processedAndProducedKeys).contains(expectedKeys.toArray(new String[0]));
108+
});
109+
} catch (ConditionTimeoutException e) {
110+
String failureMessage = "All keys sent to input-topic should be processed and produced";
111+
log.warn(failureMessage);
112+
log.debug("Expected keys=" + expectedKeys + "");
113+
log.debug("Processed and produced keys=" + coreApp.processedAndProducedKeys + "");
114+
log.debug("Missing keys=" + expectedKeys.removeAll(coreApp.processedAndProducedKeys));
115+
fail(failureMessage);
116+
}
117+
118+
119+
assertThat(coreApp.processedCount.get())
120+
.as("messages processed and produced by parallel-consumer should be equal")
121+
.isEqualTo(coreApp.producedCount.get());
122+
81123

82124
coreApp.close();
83125
}
@@ -93,7 +135,6 @@ class AppUnderTest extends CoreApp {
93135
Consumer<String, String> getKafkaConsumer() {
94136
Properties props = kcu.props;
95137
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, MAX_POLL_RECORDS_CONFIG);
96-
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
97138
return new KafkaConsumer<>(props);
98139
}
99140

0 commit comments

Comments
 (0)