Open
Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
4.0.0
Describe the bug
MessageListenerContainer when stopped abnormally it sends ConsumerStoppedEvent with Reason as Normal
To Reproduce
Please run the following Junit
@Test
public void testContainerStartStop() throws Exception {
this.logger.info("Start containerStartStop");
Map<String, Object> props = KafkaTestUtils.consumerProps("test1", "true",
embeddedKafka);
AtomicReference<Properties> overrides = new AtomicReference<>();
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props) {
@Override
protected Consumer<Integer, String> createKafkaConsumer(String groupId, String clientIdPrefix,
String clientIdSuffixArg, Properties properties) {
overrides.set(properties);
return super.createKafkaConsumer(groupId, clientIdPrefix, clientIdSuffixArg, properties);
}
};
ContainerProperties containerProps = new ContainerProperties(topic1);
containerProps.setLogContainerConfig(true);
containerProps.setClientId("client");
containerProps.setAckMode(ContainerProperties.AckMode.RECORD);
final List<String> payloads = new ArrayList<>();
containerProps.setMessageListener((MessageListener<Integer, String>) message -> {
payloads.add(message.value());
});
ConcurrentMessageListenerContainer<Integer, String> container =
new ConcurrentMessageListenerContainer<>(cf, containerProps);
container.setConcurrency(1);
container.setBeanName("testAuto");
container.setChangeConsumerThreadName(true);
BlockingQueue<KafkaEvent> events = new LinkedBlockingQueue<>();
container.setCommonErrorHandler(null);
container.start();
KafkaMessageListenerContainer<Integer, String> childContainer0 = container.getContainers().get(0);
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic());
assertThat(container.getAssignedPartitions()).hasSize(2);
Map<String, Collection<TopicPartition>> assignments = container.getAssignmentsByClientId();
assertThat(assignments).hasSize(1);
assertThat(assignments.get("client-0")).isNotNull();
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(topic1);
template.sendDefault(0, 0, "foo");
template.sendDefault(1, 2, "bar");
template.flush();
assertThat(container.metrics()).isNotNull();
assertThat(container.isInExpectedState()).isTrue();
assertThat(childContainer0.isRunning()).isTrue();
assertThat(container.isChildRunning()).isTrue();
assertThat(container.isChildRunning()).isTrue();
childContainer0.stopAbnormally(() -> {
});
assertThat(container.getContainers()).isNotEmpty();
container.stop();
events.stream().forEach(event -> {
if (event.getContainer(MessageListenerContainer.class).equals(childContainer0)
&& event instanceof ConsumerStoppedEvent) {
assertThat(((ConsumerStoppedEvent) event).getReason()).isEqualTo(ConsumerStoppedEvent.Reason.NORMAL);
}
});
}
Expected behavior
Consider providing reason as Abnormal when container stopped abnormally.