diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 39c3cee..6a6ee6c 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -48,7 +48,7 @@ import _pulsar from _pulsar import Result, CompressionType, ConsumerType, InitialPosition, PartitionsRoutingMode, BatchingType, \ - LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode # noqa: F401 + LoggerLevel, BatchReceivePolicy, KeySharedPolicy, KeySharedMode, ProducerAccessMode, RegexSubscriptionMode # noqa: F401 from pulsar.__about__ import __version__ @@ -707,6 +707,7 @@ def subscribe(self, topic, subscription_name, batch_receive_policy=None, key_shared_policy=None, batch_index_ack_enabled=False, + regex_subscription_mode=RegexSubscriptionMode.PersistentOnly, ): """ Subscribe to the given topic and subscription combination. @@ -796,6 +797,14 @@ def my_listener(consumer, message): batch_index_ack_enabled: Enable the batch index acknowledgement. It should be noted that this option can only work when the broker side also enables the batch index acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled` config in `broker.conf`. + regex_subscription_mode: RegexSubscriptionMode, optional + Set the regex subscription mode for use when the topic is a regex pattern. + + Supported modes: + + * PersistentOnly: By default only subscribe to persistent topics. + * NonPersistentOnly: Only subscribe to non-persistent topics. + * AllTopics: Subscribe to both persistent and non-persistent topics. """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -818,9 +827,11 @@ def my_listener(consumer, message): _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy') _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy') _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled') + _check_type(RegexSubscriptionMode, regex_subscription_mode, 'regex_subscription_mode') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) + conf.regex_subscription_mode(regex_subscription_mode) conf.read_compacted(is_read_compacted) if message_listener: conf.message_listener(_listener_wrapper(message_listener, schema)) diff --git a/src/config.cc b/src/config.cc index 4b661a9..23a5b80 100644 --- a/src/config.cc +++ b/src/config.cc @@ -265,6 +265,8 @@ void export_config(py::module_& m) { .def("property", &ConsumerConfiguration::setProperty, return_value_policy::reference) .def("subscription_initial_position", &ConsumerConfiguration::getSubscriptionInitialPosition) .def("subscription_initial_position", &ConsumerConfiguration::setSubscriptionInitialPosition) + .def("regex_subscription_mode", &ConsumerConfiguration::setRegexSubscriptionMode) + .def("regex_subscription_mode", &ConsumerConfiguration::getRegexSubscriptionMode, return_value_policy::reference) .def("crypto_key_reader", &ConsumerConfiguration::setCryptoKeyReader, return_value_policy::reference) .def("replicate_subscription_state_enabled", &ConsumerConfiguration::setReplicateSubscriptionStateEnabled) diff --git a/src/enums.cc b/src/enums.cc index 33affd0..198edfa 100644 --- a/src/enums.cc +++ b/src/enums.cc @@ -120,6 +120,11 @@ void export_enums(py::module_& m) { .value("Latest", InitialPositionLatest) .value("Earliest", InitialPositionEarliest); + enum_(m, "RegexSubscriptionMode", "Regex subscription mode") + .value("PersistentOnly", PersistentOnly) + .value("NonPersistentOnly", NonPersistentOnly) + .value("AllTopics", AllTopics); + enum_(m, "BatchingType", "Supported batching types") .value("Default", ProducerConfiguration::DefaultBatching) .value("KeyBased", ProducerConfiguration::KeyBasedBatching); diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 903f143..80c98f5 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -24,6 +24,7 @@ from unittest import TestCase, main import time import os +import re import pulsar import uuid from datetime import timedelta @@ -46,7 +47,7 @@ ) from pulsar.schema import JsonSchema, Record, Integer -from _pulsar import ProducerConfiguration, ConsumerConfiguration +from _pulsar import ProducerConfiguration, ConsumerConfiguration, RegexSubscriptionMode from schema_test import * @@ -1100,7 +1101,6 @@ def test_topics_consumer(self): client.close() def test_topics_pattern_consumer(self): - import re client = Client(self.serviceUrl) @@ -1717,6 +1717,77 @@ def test_batch_index_ack(self): client.close() + def test_regex_subscription(self): + client = Client(self.serviceUrl) + topic1 = "persistent://public/default/test-regex-sub-1" + topic2 = "persistent://public/default/test-regex-sub-2" + topic3 = "non-persistent://public/default/test-regex-sub-3" + topic4 = "persistent://public/default/no-match-test-regex-sub-3" # no match pattern rule topic. + + producer1 = client.create_producer(topic1) + producer2 = client.create_producer(topic2) + producer3 = client.create_producer(topic3) + producer4 = client.create_producer(topic4) + + consumer_all = client.subscribe( + re.compile('public/default/test-regex-sub-.*'), "regex-sub-all", + consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.AllTopics + ) + + consumer_persistent = client.subscribe( + re.compile('public/default/test-regex-sub-.*'), "regex-sub-persistent", + consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.PersistentOnly + ) + + consumer_non_persistent = client.subscribe( + re.compile('public/default/test-regex-sub-.*'), "regex-sub-non-persistent", + consumer_type=ConsumerType.Shared, regex_subscription_mode=RegexSubscriptionMode.NonPersistentOnly + ) + + num = 10 + for i in range(num): + producer1.send(b"hello-1-%d" % i) + producer2.send(b"hello-2-%d" % i) + producer3.send(b"hello-3-%d" % i) + producer4.send(b"hello-4-%d" % i) + + # Assert consumer_all. + received_topics = set() + for i in range(3 * num): + msg = consumer_all.receive(TM) + topic_name = msg.topic_name() + self.assertIn(topic_name, [topic1, topic2, topic3]) + received_topics.add(topic_name) + consumer_all.acknowledge(msg) + self.assertEqual(received_topics, {topic1, topic2, topic3}) + with self.assertRaises(pulsar.Timeout): + consumer_all.receive(100) + + # Assert consumer_persistent. + received_topics.clear() + for i in range(2 * num): + msg = consumer_persistent.receive(TM) + topic_name = msg.topic_name() + self.assertIn(topic_name, [topic1, topic2]) + received_topics.add(topic_name) + consumer_persistent.acknowledge(msg) + self.assertEqual(received_topics, {topic1, topic2}) + with self.assertRaises(pulsar.Timeout): + consumer_persistent.receive(100) + + # Assert consumer_non_persistent. + received_topics.clear() + for i in range(num): + msg = consumer_non_persistent.receive(TM) + topic_name = msg.topic_name() + self.assertIn(topic_name, [topic3]) + received_topics.add(topic_name) + consumer_non_persistent.acknowledge(msg) + self.assertEqual(received_topics, {topic3}) + with self.assertRaises(pulsar.Timeout): + consumer_non_persistent.receive(100) + + client.close() if __name__ == "__main__": main()