
Low sendTimeout when producing to kafka can cause reprocessing and duplicates #120

Closed
@JorgenRingen

Description


Noticed this timeout from time to time:

io.confluent.parallelconsumer.InternalRuntimeError: java.util.concurrent.TimeoutException: Timeout after waiting for 2000 ms.
	at io.confluent.parallelconsumer.ProducerManager.produceMessage(ProducerManager.java:145) ~[parallel-consumer-core-0.3.0.2.jar:?]
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$pollAndProduceMany$6(ParallelEoSStreamProcessor.java:403) ~[parallel-consumer-core-0.3.0.2.jar:?]
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.userFunctionRunner(ParallelEoSStreamProcessor.java:997) ~[parallel-consumer-core-0.3.0.2.jar:?]
	at io.confluent.parallelconsumer.ParallelEoSStreamProcessor.lambda$submitWorkToPool$14(ParallelEoSStreamProcessor.java:968) ~[parallel-consumer-core-0.3.0.2.jar:?]
	at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
	at java.lang.Thread.run(Unknown Source) [?:?]
	Caused by: java.util.concurrent.TimeoutException: Timeout after waiting for 2000 ms.

This happens especially often during Kafka broker restarts and network issues, because Producer#send can take a while to return.

The timeout in ProducerManager#sendTimeoutSeconds is hardcoded to 2 seconds (https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ProducerManager.java#L45), so a timeout is raised whenever Producer#send takes more than 2 seconds (https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-co[…]rc/main/java/io/confluent/parallelconsumer/ProducerManager.java).
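To make the failure mode concrete, here is a minimal, self-contained simulation. It is not parallel-consumer's actual code: a CompletableFuture with a delayed completion stands in for the future returned by Producer#send, and the caller waits on it with a fixed, short timeout, roughly the pattern the stack trace above suggests. The class and method names (SendTimeoutDemo, simulatedSend) are hypothetical, and the timings are scaled down from seconds to milliseconds.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendTimeoutDemo {

    // Hypothetical stand-in for Producer#send: the "broker ack" arrives after ackDelayMs.
    static CompletableFuture<String> simulatedSend(String record, long ackDelayMs) {
        return CompletableFuture.supplyAsync(
                () -> "acked:" + record,
                CompletableFuture.delayedExecutor(ackDelayMs, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) throws Exception {
        // The send takes 300 ms, but the caller only waits 100 ms.
        CompletableFuture<String> future = simulatedSend("order-42", 300);
        try {
            future.get(100, TimeUnit.MILLISECONDS);
            System.out.println("send confirmed within the timeout");
        } catch (TimeoutException e) {
            System.out.println("caller gave up waiting");
        }
        // Timing out the wait does not cancel the send, so it still completes afterwards.
        System.out.println(future.get());
    }
}
```

The point of the sketch is that a TimeoutException on the wait says nothing about whether the send itself failed; the underlying write keeps going and, in this simulation, succeeds.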

This has some side effects:

  • records will be re-processed by the consumer function if the timeout occurs
  • the output records from the consumer function can be sent to Kafka multiple times, because the 2-second timeout isn't aligned with the timeout(s) in KafkaProducer (for example, delivery.timeout.ms defaults to 2 minutes). KafkaProducer#send will likely still succeed even after the timeout occurs.
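The duplicate-output side effect can be sketched the same way. This is a hypothetical simulation, not the library's retry logic: an in-memory list plays the output topic, the first attempt is slow (as during a broker restart) and times out, the "user function" is run again, and both sends eventually land.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DuplicateOnTimeoutDemo {

    // Hypothetical in-memory "output topic": a value lands here when its simulated send completes.
    static final List<String> outputTopic = new CopyOnWriteArrayList<>();

    // Simulated send that succeeds after ackDelayMs (e.g. slow during a broker restart).
    static CompletableFuture<Void> simulatedSend(String value, long ackDelayMs) {
        return CompletableFuture.runAsync(
                () -> outputTopic.add(value),
                CompletableFuture.delayedExecutor(ackDelayMs, TimeUnit.MILLISECONDS));
    }

    // Re-runs the "user function" for one input record after each timed-out wait.
    // ackDelaysMs[i] is the simulated latency of attempt i; returns every send issued.
    static List<CompletableFuture<Void>> processWithRetry(String input, long[] ackDelaysMs, long waitTimeoutMs)
            throws Exception {
        List<CompletableFuture<Void>> issued = new ArrayList<>();
        for (long ackDelayMs : ackDelaysMs) {
            CompletableFuture<Void> send = simulatedSend("out-" + input, ackDelayMs);
            issued.add(send);
            try {
                send.get(waitTimeoutMs, TimeUnit.MILLISECONDS);
                break; // confirmed in time: stop retrying
            } catch (TimeoutException e) {
                // Give up waiting and retry; the earlier send is NOT cancelled.
            }
        }
        return issued;
    }

    public static void main(String[] args) throws Exception {
        // Attempt 1 is slow (300 ms > 100 ms wait), attempt 2 is fast.
        List<CompletableFuture<Void>> sends = processWithRetry("a", new long[] {300, 10}, 100);
        // Wait for every send issued, including the one the caller gave up on.
        CompletableFuture.allOf(sends.toArray(new CompletableFuture<?>[0])).get();
        System.out.println(outputTopic); // [out-a, out-a]
    }
}
```

One input record produces two output records: the retry succeeds quickly, and the "timed-out" first send completes later anyway.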

Some suggestions:

  • increase the timeout or make it configurable
  • rely on the Kafka producer's own timeout settings (probably delivery.timeout.ms) instead of a separate timeout.
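Both suggestions could be combined. The sketch below assumes a hypothetical helper, resolveSendTimeout, which is not part of parallel-consumer: an explicit user-supplied timeout wins, and otherwise the wait defaults to the producer's delivery.timeout.ms (Kafka's default is 120000 ms), so the caller does not give up before the producer has had a chance to report success or failure.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.Properties;

public class SendTimeoutResolver {

    // Kafka's documented default for delivery.timeout.ms (2 minutes).
    static final long DEFAULT_DELIVERY_TIMEOUT_MS = 120_000L;

    /**
     * Hypothetical helper: decide how long to wait on a send future.
     * An explicit override wins; otherwise use the producer's delivery.timeout.ms,
     * falling back to Kafka's default when it is not set.
     */
    static Duration resolveSendTimeout(Properties producerProps, Optional<Duration> userOverride) {
        if (userOverride.isPresent()) {
            return userOverride.get();
        }
        // get() rather than getProperty(), so Integer values put into the Properties also work.
        Object configured = producerProps.get("delivery.timeout.ms");
        long deliveryTimeoutMs = configured == null
                ? DEFAULT_DELIVERY_TIMEOUT_MS
                : Long.parseLong(configured.toString().trim());
        return Duration.ofMillis(deliveryTimeoutMs);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        System.out.println(resolveSendTimeout(props, Optional.empty())); // PT2M
        props.setProperty("delivery.timeout.ms", "30000");
        System.out.println(resolveSendTimeout(props, Optional.empty())); // PT30S
        System.out.println(resolveSendTimeout(props, Optional.of(Duration.ofSeconds(5)))); // PT5S
    }
}
```

Tying the default wait to delivery.timeout.ms keeps the two timeouts aligned: a timeout on the wait then roughly coincides with the producer itself giving up, rather than firing while the send may still succeed.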

Related epic: #65
