From 5071c90407e8deb4aa83c99affda0fc884e5c5ce Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Fri, 7 Jul 2023 11:37:24 +0800 Subject: [PATCH] Add partitioned topic unit test for Reader. --- tests/pulsar_test.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 3ec89a7..8db4893 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -572,6 +572,33 @@ def test_reader_on_specific_message_with_batches(self): reader2.close() client.close() + def test_reader_on_partitioned_topic(self): + num_of_msgs = 100 + topic_name = "public/default/my-python-topic-test_reader_on_partitioned_topic" + url1 = self.adminUrl + "/admin/v2/persistent/" + topic_name + "/partitions" + doHttpPut(url1, "4") + + client = Client(self.serviceUrl) + producer = client.create_producer(topic_name) + + send_array = [] + for i in range(num_of_msgs): + data = b"hello-%d" % i + producer.send(data) + send_array.append(data) + + reader = client.create_reader(topic_name, MessageId.earliest) + + read_array = [] + for i in range(num_of_msgs): + msg = reader.read_next(TM) + self.assertTrue(msg) + read_array.append(msg.data()) + + self.assertListEqual(sorted(send_array), sorted(read_array)) + reader.close() + client.close() + def test_reader_is_connected(self): client = Client(self.serviceUrl) topic = "test_reader_is_connected"