diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 45b7a96..8112206 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -49,7 +49,7 @@ from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode, \ - DeadLetterPolicyBuilder # noqa: F401 + DeadLetterPolicyBuilder, ConsumerCryptoFailureAction # noqa: F401 from pulsar.__about__ import __version__ @@ -846,6 +846,7 @@ def subscribe(self, topic, subscription_name, batch_index_ack_enabled=False, regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, dead_letter_policy: Union[None, ConsumerDeadLetterPolicy] = None, + crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, ): """ Subscribe to the given topic and subscription combination. @@ -949,6 +950,19 @@ def my_listener(consumer, message): stopped. By using the dead letter mechanism, messages have the max redelivery count, when they're exceeding the maximum number of redeliveries. Messages are sent to dead letter topics and acknowledged automatically. + crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL + Set the behavior when the decryption fails. The default is to fail the message. + + Supported actions: + + * ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds + * ConsumerCryptoFailureAction.DISCARD: + Message is silently acknowledged and not delivered to the application. + * ConsumerCryptoFailureAction.CONSUME: + Deliver the encrypted message to the application. It's the application's responsibility + to decrypt the message. If message is also compressed, decompression will fail. If the + message contains batch messages, client will not be able to retrieve individual messages + in the batch. """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -972,6 +986,7 @@ def my_listener(consumer, message): _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy') _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled') _check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode') + _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -1010,6 +1025,7 @@ def my_listener(consumer, message): conf.batch_index_ack_enabled(batch_index_ack_enabled) if dead_letter_policy: conf.dead_letter_policy(dead_letter_policy.policy()) + conf.crypto_failure_action(crypto_failure_action) c = Consumer() if isinstance(topic, str): @@ -1038,7 +1054,8 @@ def create_reader(self, topic, start_message_id, subscription_role_prefix=None, is_read_compacted=False, crypto_key_reader: Union[None, CryptoKeyReader] = None, - start_message_id_inclusive=False + start_message_id_inclusive=False, + crypto_failure_action: ConsumerCryptoFailureAction = ConsumerCryptoFailureAction.FAIL, ): """ Create a reader on a particular topic @@ -1099,6 +1116,19 @@ def my_listener(reader, message): and private key decryption messages for the consumer start_message_id_inclusive: bool, default=False Set the reader to include the startMessageId or given position of any reset operation like Reader.seek + crypto_failure_action: ConsumerCryptoFailureAction, default=ConsumerCryptoFailureAction.FAIL + Set the behavior when the decryption fails. The default is to fail the message. + + Supported actions: + + * ConsumerCryptoFailureAction.FAIL: Fail consume until crypto succeeds + * ConsumerCryptoFailureAction.DISCARD: + Message is silently acknowledged and not delivered to the application. + * ConsumerCryptoFailureAction.CONSUME: + Deliver the encrypted message to the application. It's the application's responsibility + to decrypt the message. If message is also compressed, decompression will fail. If the + message contains batch messages, client will not be able to retrieve individual messages + in the batch. """ # If a pulsar.MessageId object is passed, access the _pulsar.MessageId object @@ -1114,6 +1144,7 @@ def my_listener(reader, message): _check_type(bool, is_read_compacted, 'is_read_compacted') _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader') _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') + _check_type(ConsumerCryptoFailureAction, crypto_failure_action, 'crypto_failure_action') conf = _pulsar.ReaderConfiguration() if reader_listener: @@ -1128,6 +1159,7 @@ def my_listener(reader, message): if crypto_key_reader: conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) conf.start_message_id_inclusive(start_message_id_inclusive) + conf.crypto_failure_action(crypto_failure_action) c = Reader() c._reader = self._client.create_reader(topic, start_message_id, conf) diff --git a/src/config.cc b/src/config.cc index 7221b07..06822b4 100644 --- a/src/config.cc +++ b/src/config.cc @@ -313,7 +313,11 @@ void export_config(py::module_& m) { .def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled, return_value_policy::reference) .def("dead_letter_policy", &ConsumerConfiguration::setDeadLetterPolicy) - .def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy); + .def("dead_letter_policy", &ConsumerConfiguration::getDeadLetterPolicy, return_value_policy::copy) + .def("crypto_failure_action", &ConsumerConfiguration::getCryptoFailureAction, + return_value_policy::copy) + .def("crypto_failure_action", &ConsumerConfiguration::setCryptoFailureAction, + return_value_policy::reference); class_>(m, "ReaderConfiguration") .def(init<>()) @@ -331,5 +335,9 @@ void export_config(py::module_& m) { .def("read_compacted", &ReaderConfiguration::setReadCompacted) .def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference) .def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive) - .def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference); + .def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference) + .def("crypto_failure_action", &ReaderConfiguration::getCryptoFailureAction, + return_value_policy::copy) + .def("crypto_failure_action", &ReaderConfiguration::setCryptoFailureAction, + return_value_policy::reference); } diff --git a/src/enums.cc b/src/enums.cc index 198edfa..447d013 100644 --- a/src/enums.cc +++ b/src/enums.cc @@ -19,6 +19,7 @@ #include "utils.h" #include #include +#include #include #include #include @@ -140,4 +141,9 @@ void export_enums(py::module_& m) { .value("Info", Logger::LEVEL_INFO) .value("Warn", Logger::LEVEL_WARN) .value("Error", Logger::LEVEL_ERROR); + + enum_(m, "ConsumerCryptoFailureAction") + .value("FAIL", ConsumerCryptoFailureAction::FAIL) + .value("DISCARD", ConsumerCryptoFailureAction::DISCARD) + .value("CONSUME", ConsumerCryptoFailureAction::CONSUME); } diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 5f9c259..86a5ef5 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -482,6 +482,63 @@ def test_encryption(self): client.close() + def test_encryption_failure(self): + publicKeyPath = CERTS_DIR + "public-key.client-rsa.pem" + privateKeyPath = CERTS_DIR + "private-key.client-rsa.pem" + crypto_key_reader = CryptoKeyReader(publicKeyPath, privateKeyPath) + client = Client(self.serviceUrl) + topic = "my-python-test-end-to-end-encryption-failure-" + str(time.time()) + producer = client.create_producer( + topic=topic, encryption_key="client-rsa.pem", crypto_key_reader=crypto_key_reader + ) + producer.send(b"msg-0") + + def verify_next_message(value: bytes): + consumer = client.subscribe(topic, subscription, + crypto_key_reader=crypto_key_reader) + msg = consumer.receive(3000) + self.assertEqual(msg.data(), value) + consumer.acknowledge(msg) + consumer.close() + + subscription = "my-sub" + consumer = client.subscribe(topic, subscription, + initial_position=InitialPosition.Earliest, + crypto_failure_action=pulsar.ConsumerCryptoFailureAction.FAIL) + with self.assertRaises(pulsar.Timeout): + consumer.receive(3000) + consumer.close() + producer.send(b"msg-1") + verify_next_message(b"msg-0") # msg-0 won't be skipped + + consumer = client.subscribe(topic, subscription, + initial_position=InitialPosition.Earliest, + crypto_failure_action=pulsar.ConsumerCryptoFailureAction.DISCARD) + with self.assertRaises(pulsar.Timeout): + consumer.receive(3000) + consumer.close() + + producer.send(b"msg-2") + verify_next_message(b"msg-2") # msg-1 is skipped since the crypto failure action is DISCARD + + # Encrypted messages will be consumed since the crypto failure action is CONSUME + consumer = client.subscribe(topic, 'another-sub', + initial_position=InitialPosition.Earliest, + crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME) + for i in range(3): + msg = consumer.receive(3000) + self.assertNotEqual(msg.data(), f"msg-{i}".encode()) + self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}") + + reader = client.create_reader(topic, MessageId.earliest, + crypto_failure_action=pulsar.ConsumerCryptoFailureAction.CONSUME) + for i in range(3): + msg = reader.read_next(3000) + self.assertNotEqual(msg.data(), f"msg-{i}".encode()) + self.assertTrue(len(msg.data()) > 5, f"msg.data() is {msg.data()}") + + client.close() + def test_tls_auth3(self): authPlugin = "tls" authParams = "tlsCertFile:%s/client-cert.pem,tlsKeyFile:%s/client-key.pem" % (CERTS_DIR, CERTS_DIR)