diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 6a6ee6c..8e0907e 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -48,7 +48,8 @@ import _pulsar from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ - LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode # noqa: F401 + LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \ + DeadLetterPolicyBuilder # noqa: F401 from pulsar.__about__ import __version__ @@ -374,6 +375,65 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str _check_type(str, method, 'method') self.auth = _pulsar.AuthenticationBasic.create(username, password, method) +class ConsumerDeadLetterPolicy: + """ + Configuration for the "dead letter queue" feature in consumer. + """ + def __init__(self, + max_redeliver_count: int, + dead_letter_topic: str = None, + initial_subscription_name: str = None): + """ + Wrapper DeadLetterPolicy. + + Parameters + ---------- + max_redeliver_count: Maximum number of times that a message is redelivered before being sent to the dead letter queue. + - The maxRedeliverCount must be greater than 0. + dead_letter_topic: Name of the dead topic where the failing messages are sent. + The default value is: sourceTopicName + "-" + subscriptionName + "-DLQ" + initial_subscription_name: Name of the initial subscription name of the dead letter topic. + If this field is not set, the initial subscription for the dead letter topic is not created. + If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer + fails to be created. + """ + builder = DeadLetterPolicyBuilder() + if max_redeliver_count is None or max_redeliver_count < 1: + raise ValueError("max_redeliver_count must be greater than 0") + builder.maxRedeliverCount(max_redeliver_count) + if dead_letter_topic is not None: + builder.deadLetterTopic(dead_letter_topic) + if initial_subscription_name is not None: + builder.initialSubscriptionName(initial_subscription_name) + self._policy = builder.build() + + @property + def dead_letter_topic(self) -> str: + """ + Return the dead letter topic for dead letter policy. + """ + return self._policy.getDeadLetterTopic() + + @property + def max_redeliver_count(self) -> int: + """ + Return the max redeliver count for dead letter policy. + """ + return self._policy.getMaxRedeliverCount() + + @property + def initial_subscription_name(self) -> str: + """ + Return the initial subscription name for dead letter policy. + """ + return self._policy.getInitialSubscriptionName() + + def policy(self): + """ + Returns the actual one DeadLetterPolicy. + """ + return self._policy + class Client: """ The Pulsar client. A single client instance can be used to create producers @@ -708,6 +768,7 @@ def subscribe(self, topic, subscription_name, key_shared_policy=None, batch_index_ack_enabled=False, regex_subscription_mode=RegexSubscriptionMode.PersistentOnly, + dead_letter_policy: ConsumerDeadLetterPolicy = None, ): """ Subscribe to the given topic and subscription combination. @@ -805,6 +866,12 @@ def my_listener(consumer, message): * PersistentOnly: By default only subscribe to persistent topics. * NonPersistentOnly: Only subscribe to non-persistent topics. * AllTopics: Subscribe to both persistent and non-persistent topics. + dead_letter_policy: class ConsumerDeadLetterPolicy + Set dead letter policy for consumer. + By default, some messages are redelivered many times, even to the extent that they can never be + stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're + exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged + automatically. """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -864,6 +931,8 @@ def my_listener(consumer, message): if key_shared_policy: conf.key_shared_policy(key_shared_policy.policy()) conf.batch_index_ack_enabled(batch_index_ack_enabled) + if dead_letter_policy: + conf.dead_letter_policy(dead_letter_policy.policy()) c = Consumer() if isinstance(topic, str): diff --git a/src/config.cc b/src/config.cc index 23a5b80..ac643b7 100644 --- a/src/config.cc +++ b/src/config.cc @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include @@ -231,6 +232,20 @@ void export_config(py::module_& m) { .def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages) .def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes); + class_(m, "DeadLetterPolicy") + .def(init<>()) + .def("getDeadLetterTopic", &DeadLetterPolicy::getDeadLetterTopic) + .def("getMaxRedeliverCount", &DeadLetterPolicy::getMaxRedeliverCount) + .def("getInitialSubscriptionName", &DeadLetterPolicy::getInitialSubscriptionName); + + class_(m, "DeadLetterPolicyBuilder") + .def(init<>()) + .def("deadLetterTopic", &DeadLetterPolicyBuilder::deadLetterTopic, return_value_policy::reference) + .def("maxRedeliverCount", &DeadLetterPolicyBuilder::maxRedeliverCount, return_value_policy::reference) + .def("initialSubscriptionName", &DeadLetterPolicyBuilder::initialSubscriptionName, return_value_policy::reference) + .def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference) + .def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference); + class_>(m, "ConsumerConfiguration") .def(init<>()) .def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy) @@ -285,7 +300,9 @@ void export_config(py::module_& m) { return_value_policy::reference) .def("batch_index_ack_enabled", &ConsumerConfiguration::isBatchIndexAckEnabled) .def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled, - return_value_policy::reference); + return_value_policy::reference) + .def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy) + .def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy); class_>(m, "ReaderConfiguration") .def(init<>()) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 80c98f5..ae6aa3b 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -44,6 +44,7 @@ CryptoKeyReader, ConsumerBatchReceivePolicy, ProducerAccessMode, + ConsumerDeadLetterPolicy, ) from pulsar.schema import JsonSchema, Record, Integer @@ -1714,6 +1715,49 @@ def test_batch_index_ack(self): # assert no more msgs. with self.assertRaises(pulsar.Timeout): consumer.receive(timeout_millis=1000) + client.close() + + def test_dead_letter_policy_config(self): + with self.assertRaises(ValueError): + ConsumerDeadLetterPolicy(-1) + + policy = ConsumerDeadLetterPolicy(10) + self.assertEqual(10, policy.max_redeliver_count) + self.assertEqual("", policy.dead_letter_topic) + self.assertEqual("", policy.initial_subscription_name) + + def test_dead_letter_policy(self): + client = Client(self.serviceUrl) + topic = "my-python-topic-test-dlq" + str(time.time()) + dlq_topic = 'dlq-' + topic + max_redeliver_count = 5 + consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared, + dead_letter_policy=ConsumerDeadLetterPolicy(max_redeliver_count, dlq_topic, 'init-sub')) + dlq_consumer = client.subscribe(dlq_topic, "my-sub", consumer_type=ConsumerType.Shared) + + # Sen num msgs. + producer = client.create_producer(topic) + num = 10 + for i in range(num): + producer.send(b"hello-%d" % i) + producer.flush() + + # Redelivery all messages maxRedeliverCountNum time. + for i in range(1, num * max_redeliver_count + num + 1): + msg = consumer.receive() + if i % num == 0: + consumer.redeliver_unacknowledged_messages() + print(f"Start redeliver msgs '{i}'") + with self.assertRaises(pulsar.Timeout): + consumer.receive(100) + + for i in range(num): + msg = dlq_consumer.receive() + self.assertTrue(msg) + self.assertEqual(msg.data(), b"hello-%d" % i) + dlq_consumer.acknowledge(msg) + with self.assertRaises(pulsar.Timeout): + dlq_consumer.receive(100) client.close()