
Commit 5ca0b22

Browse files
JorgenRingen authored and astubbs committed
WIP! feature: Choose between Consumer commit or Producer transactional commits
Squashed WIP history:

- Reproducing bug 25: confluentinc#25
- WIP! Feature: Makes transactions optional
  - Optional tx test exposes tx state error - non-tx works
- ReentrantReadWriteLock protects the non-thread-safe transactional producer from incorrect multithreaded use
- Refactor; fix onClose, remove TransactionState monitoring attempt - not needed
- WIP! docs: Adds transaction system architecture
- Wider lock to prevent transactions containing produced messages that they shouldn't
- Implement non-transactional synchronous commit (commitSync) properly
- Volume tests adapted to non-transactional mode, passes
- Test fix - must start tx in MockProducer as well
- Fixed bug in test, added more tests and better verification
- Fixes double lock causing deadlock; stops interruption of transaction commits
- Fixes wakeup issues with non-transactional commits; adds supervision to poller
- bug: Fix example app tests
  - Fixes a performance issue with the async committer not being woken up
  - Enhances tests to run under multiple commit modes
- WIP! Update tests to run in all 3 commit modes
- Fixes example app tests - were incorrectly testing the wrong thing, and MockProducer was not configured to auto-complete
- Use iteration over stream, as can't work out a concurrent modification here
- Remove Optional#ifPresentOrElse - it's only @since 9
- Fixes Vert.x app test, adds WireMock
- Fix concurrent modification exception
- bug: Stabilise test and make committer thread revoke partitions and commit
  - CloseAndOpenOffset test was flaky due to a race condition
  - Make sure the correct thread owner of the committer is the one to close the consumer and thus commit the offsets
- Fix concurrency and ProducerFencedExceptions
- Enhancement: make the thread responsible for committing the thread which runs the commit upon close
- Have onPartitionsRevoked be responsible for committing on close, instead of an explicit call to commit by controller
- Make sure Broker Poller now drains properly, committing any waiting work
- Add missing revoke flow to MockConsumer wrapper
- Add missing latch timeout check
1 parent 0b96d5e commit 5ca0b22
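
The commit message's locking notes ("ReentrantReadWriteLock protects the non-thread-safe transactional producer", "wider lock to prevent transactions containing produced messages that they shouldn't") describe a read/write-lock pattern. Below is a minimal sketch of that idea - illustrative only, not the commit's actual code: worker threads share the read lock while sending, and the committer takes the exclusive write lock, so no record can join a transaction mid-commit.

[source,java]
----
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative wrapper (hypothetical class): serialises transaction boundaries
// against concurrent sends, since the transactional producer API itself is not
// safe for this kind of multithreaded use.
class TransactionGuard<K, V> {

    private final Producer<K, V> producer;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true); // fair, to avoid starving the committer

    TransactionGuard(Producer<K, V> producer) {
        this.producer = producer;
        producer.initTransactions();
        producer.beginTransaction();
    }

    /** Worker threads share the read lock, so sends proceed concurrently. */
    void send(ProducerRecord<K, V> record) {
        lock.readLock().lock();
        try {
            producer.send(record);
        } finally {
            lock.readLock().unlock();
        }
    }

    /**
     * The committer takes the exclusive write lock: while the transaction is
     * committed and the next one opened, no send() can slip records into it.
     */
    void commit() {
        lock.writeLock().lock();
        try {
            producer.commitTransaction();
            producer.beginTransaction();
        } finally {
            lock.writeLock().unlock();
        }
    }
}
----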

File tree

42 files changed, +2079 −518 lines changed


.gitignore

Lines changed: 4 additions & 1 deletion
@@ -29,4 +29,7 @@ hs_err_pid*
 *.iml
 target/
 .DS_Store
-*.versionsBackup
+*.versionsBackup
+
+# JENV
+.java-version

.idea/runConfigurations/All_examples.xml

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default.

README.adoc

Lines changed: 29 additions & 14 deletions
@@ -313,18 +313,17 @@ Where `${project.version}` is the version to be used:
 .Setup the client
 [source,java,indent=0]
 ----
-var options = ParallelConsumerOptions.builder()
-        .ordering(KEY) // <1>
-        .maxConcurrency(1000) // <2>
-        .maxUncommittedMessagesToHandlePerPartition(10000) // <3>
-        .build();
+ParallelConsumerOptions options = getOptions();

 Consumer<String, String> kafkaConsumer = getKafkaConsumer(); // <4>
+Producer<String, String> kafkaProducer = getKafkaProducer();
+
+ParallelStreamProcessor<String, String> eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(kafkaConsumer, kafkaProducer, options);

 if (!(kafkaConsumer instanceof MockConsumer)) {
-    kafkaConsumer.subscribe(UniLists.of(inputTopic)); // <5>
+    eosStreamProcessor.subscribe(UniLists.of(inputTopic)); // <5>
 }

-return ParallelStreamProcessor.createEosStreamProcessor(kafkaConsumer, getKafkaProducer(), options);
+return eosStreamProcessor;
 ----
 <1> Choose your ordering type, `KEY` in this case.
 This ensures maximum concurrency, while ensuring messages are processed and committed in `KEY` order, making sure no offset is committed unless all offsets before it in it's partition, are completed also.
@@ -385,12 +384,17 @@ You can also optionally provide a callback function to be run after the message(
 this.parallelConsumer.pollAndProduce(record -> {
     var result = processBrokerRecord(record);
     ProducerRecord<String, String> produceRecord =
-            new ProducerRecord<>(outputTopic, "a-key", result.payload);
+            new ProducerRecord<>(outputTopic, record.key(), result.payload);
+
+    processedCount.incrementAndGet();
     return UniLists.of(produceRecord);
-}, consumeProduceResult ->
-        log.info("Message {} saved to broker at offset {}",
-                consumeProduceResult.getOut(),
-                consumeProduceResult.getMeta().offset())
+}, consumeProduceResult -> {
+    producedCount.incrementAndGet();
+    processedAndProducedKeys.add(consumeProduceResult.getIn().key());
+    log.debug("Message {} saved to broker at offset {}",
+            consumeProduceResult.getOut(),
+            consumeProduceResult.getMeta().offset());
+}
 );

@@ -412,7 +416,7 @@ In future versions, we plan to look at supporting other streaming systems like h
 var resultStream = parallelConsumer.vertxHttpReqInfoStream(record -> {
     log.info("Concurrently constructing and returning RequestInfo from record: {}", record);
     Map<String, String> params = UniMaps.of("recordKey", record.key(), "payload", record.value());
-    return new RequestInfo("localhost", "/api", params); // <1>
+    return new RequestInfo("localhost", port, "/api", params); // <1>
 });
 ----
 <1> Simply return an object representing the request, the Vert.x HTTP engine will handle the rest, using it's non-blocking engine
@@ -453,7 +457,7 @@ image::https://lucid.app/publicSegments/view/43f2740c-2a7f-4b7f-909e-434a5bbe3fb
 }

 void concurrentProcess() {
-    setupConsumer();
+    setupParallelConsumer();

     parallelConsumer.poll(record -> {
         log.info("Concurrently processing a record: {}", record);
@@ -562,6 +566,13 @@ your processing, you create side effects in other systems, this pertains to the

 CAUTION: This cannot be true for any externally integrated third party system, unless that system is __idempotent__.

+You can choose to use the Transactional system, or not.
+
+Pros:
+Cons:
+
+For implementations details, see the <<Transactional System Architecture>> section.
+
 [[streams-usage]]
 == Using with Kafka Streams
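
The option introduced above ("You can choose to use the Transactional system, or not") suggests the commit strategy becomes a configuration choice rather than being hard-wired. A rough sketch of what that might look like - the `usingTransactionalProducer` flag is hypothetical, as this WIP commit does not show the final option name:

[source,java]
----
// Hypothetical sketch only - the real option name is not shown in this commit.
var options = ParallelConsumerOptions.builder()
        .ordering(KEY)
        .usingTransactionalProducer(true) // false would fall back to plain consumer offset commits
        .build();
----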

@@ -748,6 +759,10 @@ Instead of the work thread pool count being the degree of concurrency, it is con
 .Vert.x Architecture
 image::https://lucid.app/publicSegments/view/509df410-5997-46be-98e7-ac7f241780b4/image.png[Vert.x Architecture, align="center"]

+=== Transactional System Architecture
+
+image::https://lucid.app/publicSegments/view/7480d948-ed7d-4370-a308-8ec12e6b453b/image.png[]
+
 === Offset Map

 ==== Storage

parallel-consumer-core/src/main/java/io/confluent/csid/utils/StringUtils.java

Lines changed: 6 additions & 2 deletions
@@ -9,7 +9,11 @@
 public class StringUtils {

     public static String msg(String s, Object... args) {
-        String message = MessageFormatter.basicArrayFormat(s, args);
-        return message;
+        return MessageFormatter.basicArrayFormat(s, args);
+    }
+
+    public static boolean isBlank(final String property) {
+        if (property == null) return true;
+        else return property.trim().isEmpty(); // isBlank @since 11
     }
 }
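
The new `isBlank` is a backport of `String#isBlank` (only available since JDK 11, as the inline comment notes), keeping the project on an older Java baseline - consistent with the commit message's removal of `Optional#ifPresentOrElse` for being `@since 9`. For illustration, its expected behaviour:

[source,java]
----
StringUtils.isBlank(null);   // true  - null counts as blank
StringUtils.isBlank("   ");  // true  - whitespace only
StringUtils.isBlank(" x ");  // false - contains non-whitespace
----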
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/AbstractOffsetCommitter.java

Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
+package io.confluent.parallelconsumer;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Map;
+
+@Slf4j
+@RequiredArgsConstructor
+public abstract class AbstractOffsetCommitter<K, V> implements OffsetCommitter {
+
+    protected final ConsumerManager<K, V> consumerMgr;
+    protected final WorkManager<K, V> wm;
+
+    /**
+     * Get offsets from {@link WorkManager} that are ready to commit
+     */
+    @Override
+    public void retrieveOffsetsAndCommit() {
+        log.debug("Commit starting - find completed work to commit offsets");
+        // todo shouldn't be removed until commit succeeds (there's no harm in committing the same offset twice)
+        preAcquireWork();
+        try {
+            Map<TopicPartition, OffsetAndMetadata> offsetsToSend = wm.findCompletedEligibleOffsetsAndRemove();
+            if (offsetsToSend.isEmpty()) {
+                log.trace("No offsets ready");
+            } else {
+                log.debug("Will commit offsets for {} partition(s): {}", offsetsToSend.size(), offsetsToSend);
+                ConsumerGroupMetadata groupMetadata = consumerMgr.groupMetadata();
+
+                log.debug("Begin commit");
+                commitOffsets(offsetsToSend, groupMetadata);
+
+                log.debug("On commit success");
+                onOffsetCommitSuccess(offsetsToSend);
+            }
+        } finally {
+            postCommit();
+        }
+    }
+
+    protected void postCommit() {
+        // default noop
+    }
+
+    protected void preAcquireWork() {
+        // default noop
+    }
+
+    private void onOffsetCommitSuccess(final Map<TopicPartition, OffsetAndMetadata> offsetsToSend) {
+        wm.onOffsetCommitSuccess(offsetsToSend);
+    }
+
+    protected abstract void commitOffsets(final Map<TopicPartition, OffsetAndMetadata> offsetsToSend, final ConsumerGroupMetadata groupMetadata);
+
+}
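
`AbstractOffsetCommitter` is a template: subclasses supply only the commit transport via `commitOffsets`, while offset retrieval, success bookkeeping, and the `preAcquireWork`/`postCommit` hooks stay in the base class. A sketch of what a consumer-based implementation might look like - the class name and the `ConsumerManager.commitSync` call are assumptions, not shown in this commit:

[source,java]
----
package io.confluent.parallelconsumer;

import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;

// Hypothetical subclass for the non-transactional (consumer commit) mode.
public class ConsumerOffsetCommitter<K, V> extends AbstractOffsetCommitter<K, V> {

    public ConsumerOffsetCommitter(ConsumerManager<K, V> consumerMgr, WorkManager<K, V> wm) {
        super(consumerMgr, wm); // constructor generated by @RequiredArgsConstructor on the base class
    }

    @Override
    protected void commitOffsets(Map<TopicPartition, OffsetAndMetadata> offsetsToSend,
                                 ConsumerGroupMetadata groupMetadata) {
        // Non-transactional path: commit straight through the wrapped consumer.
        // A transactional committer would instead call
        // producer.sendOffsetsToTransaction(offsetsToSend, groupMetadata).
        consumerMgr.commitSync(offsetsToSend); // assumed ConsumerManager method
    }
}
----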
