diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 5d7cedb..a1b18c1 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -692,7 +692,8 @@ def subscribe(self, topic, subscription_name, auto_ack_oldest_chunked_message_on_queue_full=False, start_message_id_inclusive=False, batch_receive_policy=None, - key_shared_policy=None + key_shared_policy=None, + batch_index_ack_enabled=False, ): """ Subscribe to the given topic and subscription combination. @@ -779,6 +780,9 @@ def my_listener(consumer, message): Set the batch collection policy for batch receiving. key_shared_policy: class ConsumerKeySharedPolicy Set the key shared policy for use when the ConsumerType is KeyShared. + batch_index_ack_enabled: Enable the batch index acknowledgement. + It should be noted that this option can only work when the broker side also enables the batch index + acknowledgement. See the `acknowledgmentAtBatchIndexLevelEnabled` config in `broker.conf`. """ _check_type(str, subscription_name, 'subscription_name') _check_type(ConsumerType, consumer_type, 'consumer_type') @@ -800,6 +804,7 @@ def my_listener(consumer, message): _check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive') _check_type_or_none(ConsumerBatchReceivePolicy, batch_receive_policy, 'batch_receive_policy') _check_type_or_none(ConsumerKeySharedPolicy, key_shared_policy, 'key_shared_policy') + _check_type(bool, batch_index_ack_enabled, 'batch_index_ack_enabled') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -834,6 +839,7 @@ def my_listener(consumer, message): if key_shared_policy: conf.key_shared_policy(key_shared_policy.policy()) + conf.batch_index_ack_enabled(batch_index_ack_enabled) c = Consumer() if isinstance(topic, str): diff --git a/src/config.cc b/src/config.cc index 7e2d38d..c71d5b0 100644 --- a/src/config.cc +++ b/src/config.cc @@ -280,6 +280,9 @@ void export_config(py::module_& m) { return_value_policy::reference) .def("start_message_id_inclusive", &ConsumerConfiguration::isStartMessageIdInclusive) .def("start_message_id_inclusive", &ConsumerConfiguration::setStartMessageIdInclusive, + return_value_policy::reference) + .def("batch_index_ack_enabled", &ConsumerConfiguration::isBatchIndexAckEnabled) + .def("batch_index_ack_enabled", &ConsumerConfiguration::setBatchIndexAckEnabled, return_value_policy::reference); class_>(m, "ReaderConfiguration") diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 3ec89a7..0f44ae1 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -1590,6 +1590,48 @@ def test_acknowledge_failed(self): consumer.acknowledge(msg_id) client.close() + def test_batch_index_ack(self): + topic_name = 'test-batch-index-ack-3' + client = pulsar.Client('pulsar://localhost:6650') + producer = client.create_producer(topic_name, + batching_enabled=True, + batching_max_messages=100, + batching_max_publish_delay_ms=10000) + consumer = client.subscribe(topic_name, + subscription_name='test-batch-index-ack', + batch_index_ack_enabled=True) + + # Make sure send 0~5 is a batch msg. + for i in range(5): + producer.send_async(b"hello-%d" % i, callback=None) + producer.flush() + + # Receive msgs and just ack 0, 1 msgs + results = [] + for i in range(5): + msg = consumer.receive() + print("receive from {}".format(msg.message_id())) + results.append(msg) + assert len(results) == 5 + for i in range(2): + consumer.acknowledge(results[i]) + time.sleep(0.2) + + # Restart consumer after, just receive 2~5 msg. + consumer.close() + consumer = client.subscribe(topic_name, + subscription_name='test-batch-index-ack', + batch_index_ack_enabled=True) + results2 = [] + for i in range(2, 5): + msg = consumer.receive() + results2.append(msg) + assert len(results2) == 3 + # assert no more msgs. + with self.assertRaises(pulsar.Timeout): + consumer.receive(timeout_millis=1000) + + client.close() if __name__ == "__main__": diff --git a/tests/test-conf/standalone-ssl.conf b/tests/test-conf/standalone-ssl.conf index 2ee4432..beed278 100644 --- a/tests/test-conf/standalone-ssl.conf +++ b/tests/test-conf/standalone-ssl.conf @@ -113,6 +113,9 @@ superUserRoles=localhost,superUser,admin brokerClientAuthenticationPlugin= brokerClientAuthenticationParameters= +# Enable batch index ACK +acknowledgmentAtBatchIndexLevelEnabled=true + ### --- BookKeeper Client --- ### # Authentication plugin to use when connecting to bookies diff --git a/tests/test-conf/standalone.conf b/tests/test-conf/standalone.conf index faa1277..0225e0d 100644 --- a/tests/test-conf/standalone.conf +++ b/tests/test-conf/standalone.conf @@ -100,6 +100,9 @@ superUserRoles= brokerClientAuthenticationPlugin= brokerClientAuthenticationParameters= +# Enable batch index ACK +acknowledgmentAtBatchIndexLevelEnabled=true + ### --- BookKeeper Client --- ###