From 5e6adc2ccb08e86e0f8ea78c1d053b1e0acb2d7c Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 6 Jun 2025 15:44:11 +0800 Subject: [PATCH 1/2] Support deserializing a message id from bytes and topic --- pulsar/__init__.py | 7 +++++-- src/message.cc | 4 ++++ tests/pulsar_test.py | 24 ++++++++++++++++++++++++ 3 files changed, 33 insertions(+), 2 deletions(-) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index de8787f..370f4ac 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -131,12 +131,15 @@ def __gt__(self, other) -> bool: return self._msg_id > other._msg_id @staticmethod - def deserialize(message_id_bytes): + def deserialize(message_id_bytes, topic: Optional[str] = None) -> _pulsar.MessageId: """ Deserialize a message id object from a previously serialized bytes sequence. """ - return _pulsar.MessageId.deserialize(message_id_bytes) + msg_id = _pulsar.MessageId.deserialize(message_id_bytes) + if topic is not None: + msg_id.topic_name(topic) + return msg_id @classmethod def wrap(cls, msg_id: _pulsar.MessageId): diff --git a/src/message.cc b/src/message.cc index dec6f05..dd263b6 100644 --- a/src/message.cc +++ b/src/message.cc @@ -72,6 +72,10 @@ void export_message(py::module_& m) { .def("entry_id", &MessageId::entryId) .def("batch_index", &MessageId::batchIndex) .def("partition", &MessageId::partition) + .def( + "topic_name", + [](MessageId& msgId, const std::string& topicName) { msgId.setTopicName(topicName); }, + return_value_policy::copy) .def_property_readonly_static("earliest", [](object) { return MessageId::earliest(); }) .def_property_readonly_static("latest", [](object) { return MessageId::latest(); }) .def("serialize", diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index 6c0c3f2..4e1c5fb 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -1994,6 +1994,30 @@ def test_consumer_name(self): self.assertEqual(consumer.consumer_name(), name) client.close() + def test_deserialize_msg_id_with_topic(self): + client = Client(self.serviceUrl) + topic1 = "deserialize-msg-id-with-topic1-" + str(time.time()) + topic2 = "deserialize-msg-id-with-topic2-" + str(time.time()) + consumer = client.subscribe([topic1, topic2], 'sub') + producer1 = client.create_producer(topic1) + producer2 = client.create_producer(topic2) + producer1.send(b"msg-1") + producer2.send(b"msg-2") + + serialized_msg_ids = dict() + for _ in range(2): + msg = consumer.receive(TM) + serialized_msg_ids[msg.topic_name()] = msg.message_id().serialize() + for topic, serialized_msg_id in serialized_msg_ids.items(): + deserialized_msg_id = MessageId.deserialize(serialized_msg_id, topic=topic) + consumer.acknowledge_cumulative(deserialized_msg_id) + consumer.close() + + consumer = client.subscribe([topic1, topic2], 'sub') + producer1.send(b'msg-3') + msg = consumer.receive(TM) + self.assertEqual(msg.value(), b'msg-3') + client.close() if __name__ == "__main__": main() From 01a3a47384773360a10310d132382c6b62219e69 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Fri, 6 Jun 2025 16:02:52 +0800 Subject: [PATCH 2/2] Add API documents --- pulsar/__init__.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 370f4ac..8802493 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -135,6 +135,21 @@ def deserialize(message_id_bytes, topic: Optional[str] = None) -> _pulsar.Messag """ Deserialize a message id object from a previously serialized bytes sequence. + + Parameters + ---------- + topic: str, optional + For multi-topics consumers, the topic name is required to deserialize the message id. + + .. code-block:: python + + msg = consumer.receive() + topic = msg.topic_name() + msg_id_bytes = msg.message_id().serialize() + # Store topic and msg_id_bytes somewhere + # Later, deserialize the message id + msg_id = MessageId.deserialize(msg_id_bytes, topic=topic) + """ msg_id = _pulsar.MessageId.deserialize(message_id_bytes) if topic is not None: