Skip to content

Commit ce58c57

Browse files
committed
feat: Support dead letter topic.
1 parent 766db9e commit ce58c57

File tree

3 files changed

+125
-4
lines changed

3 files changed

+125
-4
lines changed

pulsar/__init__.py

+72-3
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848
import _pulsar
4949

5050
from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
51-
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode # noqa: F401
51+
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, DeadLetterPolicy, DeadLetterPolicyBuilder # noqa: F401
5252

5353
from pulsar.__about__ import __version__
5454

@@ -374,6 +374,64 @@ def __init__(self, username=None, password=None, method='basic', auth_params_str
374374
_check_type(str, method, 'method')
375375
self.auth = _pulsar.AuthenticationBasic.create(username, password, method)
376376

377+
class ConsumerDeadLetterPolicy:
378+
"""
379+
Configuration for the "dead letter queue" feature in consumer.
380+
"""
381+
def __init__(self, dead_letter_topic: str = None,
382+
max_redeliver_count: int = None,
383+
initial_subscription_name: str = None):
384+
"""
385+
Wrapper DeadLetterPolicy.
386+
387+
Parameters
388+
----------
389+
dead_letter_topic: Name of the dead topic where the failing messages are sent.
390+
The default value is: sourceTopicName + "-" + subscriptionName + "-DLQ"
391+
max_redeliver_count: Maximum number of times that a message is redelivered before being sent to the dead letter queue.
392+
- The maxRedeliverCount must be greater than 0.
393+
- The default value is None (DLQ is not enabled)
394+
initial_subscription_name: Name of the initial subscription name of the dead letter topic.
395+
If this field is not set, the initial subscription for the dead letter topic is not created.
396+
If this field is set but the broker's `allowAutoSubscriptionCreation` is disabled, the DLQ producer
397+
fails to be created.
398+
"""
399+
builder = DeadLetterPolicyBuilder()
400+
if dead_letter_topic is not None:
401+
builder.deadLetterTopic(dead_letter_topic)
402+
if max_redeliver_count is not None:
403+
builder.maxRedeliverCount(max_redeliver_count)
404+
if initial_subscription_name is not None:
405+
builder.initialSubscriptionName(initial_subscription_name)
406+
self._policy = builder.build()
407+
408+
@property
409+
def dead_letter_topic(self) -> str:
410+
"""
411+
Return the dead letter topic for dead letter policy.
412+
"""
413+
return self._policy.getDeadLetterTopic()
414+
415+
@property
416+
def max_redeliver_count(self) -> int:
417+
"""
418+
Return the max redeliver count for dead letter policy.
419+
"""
420+
return self._policy.getMaxRedeliverCount()
421+
422+
@property
423+
def initial_subscription_name(self) -> str:
424+
"""
425+
Return the initial subscription name for dead letter policy.
426+
"""
427+
return self._policy.getInitialSubscriptionName()
428+
429+
def policy(self):
430+
"""
431+
Returns the actual one DeadLetterPolicy.
432+
"""
433+
return self._policy
434+
377435
class Client:
378436
"""
379437
The Pulsar client. A single client instance can be used to create producers
@@ -692,7 +750,8 @@ def subscribe(self, topic, subscription_name,
692750
auto_ack_oldest_chunked_message_on_queue_full=False,
693751
start_message_id_inclusive=False,
694752
batch_receive_policy=None,
695-
key_shared_policy=None
753+
key_shared_policy=None,
754+
dead_letter_policy: ConsumerDeadLetterPolicy = None,
696755
):
697756
"""
698757
Subscribe to the given topic and subscription combination.
@@ -778,7 +837,13 @@ def my_listener(consumer, message):
778837
batch_receive_policy: class ConsumerBatchReceivePolicy
779838
Set the batch collection policy for batch receiving.
780839
key_shared_policy: class ConsumerKeySharedPolicy
781-
Set the key shared policy for use when the ConsumerType is KeyShared.
840+
Set the key shared policy for use when the ConsumerType is KeyShared.
841+
dead_letter_policy: class ConsumerDeadLetterPolicy
842+
Set dead letter policy for consumer.
843+
By default, some messages are redelivered many times, even to the extent that they can never be
844+
stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
845+
exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
846+
automatically.
782847
"""
783848
_check_type(str, subscription_name, 'subscription_name')
784849
_check_type(ConsumerType, consumer_type, 'consumer_type')
@@ -834,6 +899,8 @@ def my_listener(consumer, message):
834899

835900
if key_shared_policy:
836901
conf.key_shared_policy(key_shared_policy.policy())
902+
if dead_letter_policy:
903+
conf.dead_letter_policy(dead_letter_policy.policy())
837904

838905
c = Consumer()
839906
if isinstance(topic, str):
@@ -1457,6 +1524,8 @@ def policy(self):
14571524
"""
14581525
return self._policy
14591526

1527+
1528+
14601529
class ConsumerKeySharedPolicy:
14611530
"""
14621531
Consumer key shared policy is used to configure the consumer behaviour when the ConsumerType is KeyShared.

src/config.cc

+18-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/ConsumerConfiguration.h>
2323
#include <pulsar/ProducerConfiguration.h>
2424
#include <pulsar/KeySharedPolicy.h>
25+
#include <pulsar/DeadLetterPolicyBuilder.h>
2526
#include <pybind11/functional.h>
2627
#include <pybind11/pybind11.h>
2728
#include <pybind11/stl.h>
@@ -231,6 +232,20 @@ void export_config(py::module_& m) {
231232
.def("getMaxNumMessages", &BatchReceivePolicy::getMaxNumMessages)
232233
.def("getMaxNumBytes", &BatchReceivePolicy::getMaxNumBytes);
233234

235+
class_<DeadLetterPolicy>(m, "DeadLetterPolicy")
236+
.def(init<>())
237+
.def("getDeadLetterTopic", &DeadLetterPolicy::getDeadLetterTopic)
238+
.def("getMaxRedeliverCount", &DeadLetterPolicy::getMaxRedeliverCount)
239+
.def("getInitialSubscriptionName", &DeadLetterPolicy::getInitialSubscriptionName);
240+
241+
class_<DeadLetterPolicyBuilder>(m, "DeadLetterPolicyBuilder")
242+
.def(init<>())
243+
.def("deadLetterTopic", &DeadLetterPolicyBuilder::deadLetterTopic, return_value_policy::reference)
244+
.def("maxRedeliverCount", &DeadLetterPolicyBuilder::maxRedeliverCount, return_value_policy::reference)
245+
.def("initialSubscriptionName", &DeadLetterPolicyBuilder::initialSubscriptionName, return_value_policy::reference)
246+
.def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference)
247+
.def("build", &DeadLetterPolicyBuilder::build, return_value_policy::reference);
248+
234249
class_<ConsumerConfiguration, std::shared_ptr<ConsumerConfiguration>>(m, "ConsumerConfiguration")
235250
.def(init<>())
236251
.def("key_shared_policy", &ConsumerConfiguration::getKeySharedPolicy)
@@ -280,7 +295,9 @@ void export_config(py::module_& m) {
280295
return_value_policy::reference)
281296
.def("start_message_id_inclusive", &ConsumerConfiguration::isStartMessageIdInclusive)
282297
.def("start_message_id_inclusive", &ConsumerConfiguration::setStartMessageIdInclusive,
283-
return_value_policy::reference);
298+
return_value_policy::reference)
299+
.def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
300+
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
284301

285302
class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
286303
.def(init<>())

tests/pulsar_test.py

+35
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
InitialPosition,
4343
CryptoKeyReader,
4444
ConsumerBatchReceivePolicy,
45+
ConsumerDeadLetterPolicy,
4546
)
4647
from pulsar.schema import JsonSchema, Record, Integer
4748

@@ -1590,6 +1591,40 @@ def test_acknowledge_failed(self):
15901591
consumer.acknowledge(msg_id)
15911592
client.close()
15921593

1594+
def test_dead_letter_policy(self):
1595+
client = Client(self.serviceUrl)
1596+
topic = "my-python-topic-test-dlq" + str(time.time())
1597+
dlq_topic = 'dlq-' + topic
1598+
max_redeliver_count = 5
1599+
consumer = client.subscribe(topic, "my-sub", consumer_type=ConsumerType.Shared,
1600+
dead_letter_policy=ConsumerDeadLetterPolicy(dlq_topic, max_redeliver_count, 'init-sub'))
1601+
dlq_consumer = client.subscribe(dlq_topic, "my-sub", consumer_type=ConsumerType.Shared)
1602+
1603+
# Sen num msgs.
1604+
producer = client.create_producer(topic)
1605+
num = 10
1606+
for i in range(num):
1607+
producer.send(b"hello-%d" % i)
1608+
producer.flush()
1609+
1610+
# Redelivery all messages maxRedeliverCountNum time.
1611+
for i in range(1, num * max_redeliver_count + num + 1):
1612+
msg = consumer.receive()
1613+
if i % num == 0:
1614+
consumer.redeliver_unacknowledged_messages()
1615+
print(f"Start redeliver msgs '{i}'")
1616+
with self.assertRaises(pulsar.Timeout):
1617+
consumer.receive(100)
1618+
1619+
for i in range(num):
1620+
msg = dlq_consumer.receive()
1621+
self.assertTrue(msg)
1622+
self.assertEqual(msg.data(), b"hello-%d" % i)
1623+
dlq_consumer.acknowledge(msg)
1624+
with self.assertRaises(pulsar.Timeout):
1625+
dlq_consumer.receive(100)
1626+
1627+
client.close()
15931628

15941629

15951630
if __name__ == "__main__":

0 commit comments

Comments
 (0)