Skip to content

Support ConsumerCryptoFailureAction for consumer and reader #253

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@

from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \
LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \
DeadLetterPolicyBuilder # noqa: F401
DeadLetterPolicyBuilder, ConsumerCryptoFailureAction # noqa: F401

from pulsar.__about__ import __version__

Expand Down Expand Up @@ -846,6 +846,7 @@ def subscribe(self, topic, subscription_name,
batch_index_ack_enabled=False,
regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly,
dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None,
crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
):
"""
Subscribe to the given topic and subscription combination.
Expand Down Expand Up @@ -949,6 +950,19 @@ def my_listener(consumer, message):
stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're
exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged
automatically.
crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
Set the behavior when the decryption fails. The default is to fail the message.

Supported actions:

* ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
* ConsumerCryptoFailureAction.DISCARD:
Message is silently acknowledged and not delivered to the application.
* ConsumerCryptoFailureAction.CONSUME:
Deliver the encrypted message to the application. It's the application's responsibility
to decrypt the message. If message is also compressed, decompression will fail. If the
message contains batch messages, client will not be able to retrieve individual messages
in the batch.
"""
_check_type(str, subscription_name, 'subscription_name')
_check_type(ConsumerType, consumer_type, 'consumer_type')
Expand All @@ -972,6 +986,7 @@ def my_listener(consumer, message):
_check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy')
_check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled')
_check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode')
_check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')

conf = _pulsar.ConsumerConfiguration()
conf.consumer_type(consumer_type)
Expand Down Expand Up @@ -1010,6 +1025,7 @@ def my_listener(consumer, message):
conf.batch_index_ack_enabled(batch_index_ack_enabled)
if dead_letter_policy:
conf.dead_letter_policy(dead_letter_policy.policy())
conf.crypto_failure_action(crypto_failure_action)

c = Consumer()
if isinstance(topic, str):
Expand Down Expand Up @@ -1038,7 +1054,8 @@ def create_reader(self, topic, start_message_id,
subscription_role_prefix=None,
is_read_compacted=False,
crypto_key_reader: Union[None, CryptoKeyReader] = None,
start_message_id_inclusive=False
start_message_id_inclusive=False,
crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL,
):
"""
Create a reader on a particular topic
Expand Down Expand Up @@ -1099,6 +1116,19 @@ def my_listener(reader, message):
and private key decryption messages for the consumer
start_message_id_inclusive: bool, default=False
Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL
Set the behavior when the decryption fails. The default is to fail the message.

Supported actions:

* ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds
* ConsumerCryptoFailureAction.DISCARD:
Message is silently acknowledged and not delivered to the application.
* ConsumerCryptoFailureAction.CONSUME:
Deliver the encrypted message to the application. It's the application's responsibility
to decrypt the message. If message is also compressed, decompression will fail. If the
message contains batch messages, client will not be able to retrieve individual messages
in the batch.
"""

# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
Expand All @@ -1114,6 +1144,7 @@ def my_listener(reader, message):
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')
_check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action')

conf = _pulsar.ReaderConfiguration()
if reader_listener:
Expand All @@ -1128,6 +1159,7 @@ def my_listener(reader, message):
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
conf.start_message_id_inclusive(start_message_id_inclusive)
conf.crypto_failure_action(crypto_failure_action)

c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
Expand Down
12 changes: 10 additions & 2 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,11 @@ void export_config(py::module_& m) {
.def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled,
return_value_policy::reference)
.def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy)
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy);
.def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy)
.def("crypto_failure_action", &ConsumerConfiguration::getCryptoFailureAction,
return_value_policy::copy)
.def("crypto_failure_action", &ConsumerConfiguration::setCryptoFailureAction,
return_value_policy::reference);

class_<ReaderConfiguration, std::shared_ptr<ReaderConfiguration>>(m, "ReaderConfiguration")
.def(init<>())
Expand All @@ -331,5 +335,9 @@ void export_config(py::module_& m) {
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
.def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference)
.def("crypto_failure_action", &ReaderConfiguration::getCryptoFailureAction,
return_value_policy::copy)
.def("crypto_failure_action", &ReaderConfiguration::setCryptoFailureAction,
return_value_policy::reference);
}
6 changes: 6 additions & 0 deletions src/enums.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "utils.h"
#include <pulsar/CompressionType.h>
#include <pulsar/ConsumerConfiguration.h>
#include <pulsar/ConsumerCryptoFailureAction.h>
#include <pulsar/ProducerConfiguration.h>
#include <pulsar/KeySharedPolicy.h>
#include <pybind11/pybind11.h>
Expand Down Expand Up @@ -140,4 +141,9 @@ void export_enums(py::module_& m) {
.value("Info", Logger::LEVEL_INFO)
.value("Warn", Logger::LEVEL_WARN)
.value("Error", Logger::LEVEL_ERROR);

enum_<ConsumerCryptoFailureAction>(m, "ConsumerCryptoFailureAction")
.value("FAIL", ConsumerCryptoFailureAction::FAIL)
.value("DISCARD", ConsumerCryptoFailureAction::DISCARD)
.value("CONSUME", ConsumerCryptoFailureAction::CONSUME);
}
57 changes: 57 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,63 @@ def test_encryption(self):

client.close()

def test_encryption_failure(self):
publicKeyPath = CERTS_DIR + "public-key.client-rsa.pem"
privateKeyPath = CERTS_DIR + "private-key.client-rsa.pem"
crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath)
client = Client(self.serviceUrl)
topic = "my-python-test-end-to-end-encryption-failure-" + str(time.time())
producer = client.create_producer(
topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader
)
producer.send(b"msg-0")

def verify_next_message(value: bytes):
consumer = client.subscribe(topic, subscription,
crypto_key_reader=crypto_key_reader)
msg = consumer.receive(3000)
self.assertEqual(msg.data(), value)
consumer.acknowledge(msg)
consumer.close()

subscription = "my-sub"
consumer = client.subscribe(topic, subscription,
initial_position=InitialPosition.Earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.FAIL)
with self.assertRaises(pulsar.Timeout):
consumer.receive(3000)
consumer.close()
producer.send(b"msg-1")
verify_next_message(b"msg-0") # msg-0 won't be skipped

consumer = client.subscribe(topic, subscription,
initial_position=InitialPosition.Earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.DISCARD)
with self.assertRaises(pulsar.Timeout):
consumer.receive(3000)
consumer.close()

producer.send(b"msg-2")
verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD

# Encrypted messages will be consumed since the crypto failure action is CONSUME
consumer = client.subscribe(topic, 'another-sub',
initial_position=InitialPosition.Earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
for i in range(3):
msg = consumer.receive(3000)
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")

reader = client.create_reader(topic, MessageId.earliest,
crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME)
for i in range(3):
msg = reader.read_next(3000)
self.assertNotEqual(msg.data(), f"msg-{i}".encode())
self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}")

client.close()

def test_tls_auth3(self):
authPlugin = "tls"
authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)
Expand Down