Skip to content

Commit c50ada7

Browse files
authored
feat: Support dead letter topic. (#135)
* feat: Support dead letter topic. * Fix non space * Let maxRedeliverCount param required.
1 parent 0693c2d commit c50ada7

File tree

3 files changed

+132
-2
lines changed

3 files changed

+132
-2
lines changed

pulsar/__init__.py

+70-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@
4848
import _pulsar
4949

5050
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
51-
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode # noqa: F401
51+
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \
52+
DeadLetterPolicyBuilder # noqa: F401
5253

5354
from pulsar.__about__ import __version__
5455

@@ -374,6 +375,65 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str
374375
_check_type(str, method, 'method')
375376
self.auth = _pulsar.AuthenticationBasic.create(username, password, method)
376377

378+
class ConsumerDeadLetterPolicy:
379+
"""
380+
Configuration for the "dead letter queue" feature in consumer.
381+
"""
382+
def __init__(self,
383+
max_redeliver_count: int,
384+
dead_letter_topic: str = None,
385+
initial_subscription_name: str = None):
386+
"""
387+
Wrapper DeadLetterPolicy.
388+
389+
Parameters
390+
----------
391+
max_redeliver_count: Maximum number of times that a message is redelivered before being sent to the dead letter queue.
392+
- The maxRedeliverCount must be greater than 0.
393+
dead_letter_topic: Name of the dead topic where the failing messages are sent.
394+
The default value is: sourceTopicName + "-" + subscriptionName + "-DLQ"
395+
initial_subscription_name: Name of the initial subscription name of the dead letter topic.
396+
If this field is not set, the initial subscription for the dead letter topic is not created.
397+
If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
398+
fails to be created.
399+
"""
400+
builder = DeadLetterPolicyBuilder()
401+
if max_redeliver_count is None or max_redeliver_count < 1:
402+
raise ValueError("max_redeliver_count must be greater than 0")
403+
builder.maxRedeliverCount(max_redeliver_count)
404+
if dead_letter_topic is not None:
405+
builder.deadLetterTopic(dead_letter_topic)
406+
if initial_subscription_name is not None:
407+
builder.initialSubscriptionName(initial_subscription_name)
408+
self._policy = builder.build()
409+
410+
@property
411+
def dead_letter_topic(self) -> str:
412+
"""
413+
Return the dead letter topic for dead letter policy.
414+
"""
415+
return self._policy.getDeadLetterTopic()
416+
417+
@property
418+
def max_redeliver_count(self) -> int:
419+
"""
420+
Return the max redeliver count for dead letter policy.
421+
"""
422+
return self._policy.getMaxRedeliverCount()
423+
424+
@property
425+
def initial_subscription_name(self) -> str:
426+
"""
427+
Return the initial subscription name for dead letter policy.
428+
"""
429+
return self._policy.getInitialSubscriptionName()
430+
431+
def policy(self):
432+
"""
433+
Returns the actual one DeadLetterPolicy.
434+
"""
435+
return self._policy
436+
377437
class Client:
378438
"""
379439
The Pulsar client. A single client instance can be used to create producers
@@ -708,6 +768,7 @@ def subscribe(self, topic, subscription_name,
708768
key_shared_policy=None,
709769
batch_index_ack_enabled=False,
710770
regex_subscription_mode=RegexSubscriptionMode.PersistentOnly,
771+
dead_letter_policy: ConsumerDeadLetterPolicy = None,
711772
):
712773
"""
713774
Subscribe to the given topic and subscription combination.
@@ -805,6 +866,12 @@ def my_listener(consumer, message):
805866
* PersistentOnly: By default only subscribe to persistent topics.
806867
* NonPersistentOnly: Only subscribe to non-persistent topics.
807868
* AllTopics: Subscribe to both persistent and non-persistent topics.
869+
dead_letter_policy: class ConsumerDeadLetterPolicy
870+
Set dead letter policy for consumer.
871+
By default, some messages are redelivered many times, even to the extent that they can never be
872+
stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
873+
exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
874+
automatically.
808875
"""
809876
_check_type(str, subscription_name, 'subscription_name')
810877
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -864,6 +931,8 @@ def my_listener(consumer, message):
864931
if key_shared_policy:
865932
conf.key_shared_policy(key_shared_policy.policy())
866933
conf.batch_index_ack_enabled(batch_index_ack_enabled)
934+
if dead_letter_policy:
935+
conf.dead_letter_policy(dead_letter_policy.policy())
867936

868937
c = Consumer()
869938
if isinstance(topic, str):

src/config.cc

+18-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/ConsumerConfiguration.h>
2323
#include <pulsar/ProducerConfiguration.h>
2424
#include <pulsar/KeySharedPolicy.h>
25+
#include <pulsar/DeadLetterPolicyBuilder.h>
2526
#include <pybind11/functional.h>
2627
#include <pybind11/pybind11.h>
2728
#include <pybind11/stl.h>
@@ -231,6 +232,20 @@ void export_config(py::module_& m) {
231232
.def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages)
232233
.def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes);
233234

235+
class_<DeadLetterPolicy>(m, "DeadLetterPolicy")
236+
.def(init<>())
237+
.def("getDeadLetterTopic", &DeadLetterPolicy::getDeadLetterTopic)
238+
.def("getMaxRedeliverCount", &DeadLetterPolicy::getMaxRedeliverCount)
239+
.def("getInitialSubscriptionName", &DeadLetterPolicy::getInitialSubscriptionName);
240+
241+
class_<DeadLetterPolicyBuilder>(m, "DeadLetterPolicyBuilder")
242+
.def(init<>())
243+
.def("deadLetterTopic", &DeadLetterPolicyBuilder::deadLetterTopic, return_value_policy::reference)
244+
.def("maxRedeliverCount", &DeadLetterPolicyBuilder::maxRedeliverCount, return_value_policy::reference)
245+
.def("initialSubscriptionName", &DeadLetterPolicyBuilder::initialSubscriptionName, return_value_policy::reference)
246+
.def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference)
247+
.def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference);
248+
234249
class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m, "ConsumerConfiguration")
235250
.def(init<>())
236251
.def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
@@ -285,7 +300,9 @@ void export_config(py::module_& m) {
285300
return_value_policy::reference)
286301
.def("batch_index_ack_enabled", &ConsumerConfiguration::isBatchIndexAckEnabled)
287302
.def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
288-
return_value_policy::reference);
303+
return_value_policy::reference)
304+
.def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
305+
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
289306

290307
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
291308
.def(init<>())

tests/pulsar_test.py

+44
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
CryptoKeyReader,
4545
ConsumerBatchReceivePolicy,
4646
ProducerAccessMode,
47+
ConsumerDeadLetterPolicy,
4748
)
4849
from pulsar.schema import JsonSchema, Record, Integer
4950

@@ -1714,6 +1715,49 @@ def test_batch_index_ack(self):
17141715
# assert no more msgs.
17151716
with self.assertRaises(pulsar.Timeout):
17161717
consumer.receive(timeout_millis=1000)
1718+
client.close()
1719+
1720+
def test_dead_letter_policy_config(self):
1721+
with self.assertRaises(ValueError):
1722+
ConsumerDeadLetterPolicy(-1)
1723+
1724+
policy = ConsumerDeadLetterPolicy(10)
1725+
self.assertEqual(10, policy.max_redeliver_count)
1726+
self.assertEqual("", policy.dead_letter_topic)
1727+
self.assertEqual("", policy.initial_subscription_name)
1728+
1729+
def test_dead_letter_policy(self):
1730+
client = Client(self.serviceUrl)
1731+
topic = "my-python-topic-test-dlq" + str(time.time())
1732+
dlq_topic = 'dlq-' + topic
1733+
max_redeliver_count = 5
1734+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
1735+
dead_letter_policy=ConsumerDeadLetterPolicy(max_redeliver_count, dlq_topic, 'init-sub'))
1736+
dlq_consumer = client.subscribe(dlq_topic, "my-sub", consumer_type=ConsumerType.Shared)
1737+
1738+
# Sen num msgs.
1739+
producer = client.create_producer(topic)
1740+
num = 10
1741+
for i in range(num):
1742+
producer.send(b"hello-%d" % i)
1743+
producer.flush()
1744+
1745+
# Redelivery all messages maxRedeliverCountNum time.
1746+
for i in range(1, num * max_redeliver_count + num + 1):
1747+
msg = consumer.receive()
1748+
if i % num == 0:
1749+
consumer.redeliver_unacknowledged_messages()
1750+
print(f"Start redeliver msgs '{i}'")
1751+
with self.assertRaises(pulsar.Timeout):
1752+
consumer.receive(100)
1753+
1754+
for i in range(num):
1755+
msg = dlq_consumer.receive()
1756+
self.assertTrue(msg)
1757+
self.assertEqual(msg.data(), b"hello-%d" % i)
1758+
dlq_consumer.acknowledge(msg)
1759+
with self.assertRaises(pulsar.Timeout):
1760+
dlq_consumer.receive(100)
17171761

17181762
client.close()
17191763

0 commit comments

Comments
 (0)