Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pulsar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -966,7 +966,8 @@ def create_reader(self, topic, start_message_id,
reader_name=None,
subscription_role_prefix=None,
is_read_compacted=False,
crypto_key_reader=None
crypto_key_reader=None,
start_message_id_inclusive=False
):
"""
Create a reader on a particular topic
Expand Down Expand Up @@ -1025,6 +1026,8 @@ def my_listener(reader, message):
crypto_key_reader: CryptoKeyReader, optional
Symmetric encryption class implementation, configuring public key encryption messages for the producer
and private key decryption messages for the consumer
start_message_id_inclusive: bool, default=False
Set the reader to include the startMessageId or given position of any reset operation like Reader.seek
"""

# If a pulsar.MessageId object is passed, access the _pulsar.MessageId object
Expand All @@ -1039,6 +1042,7 @@ def my_listener(reader, message):
_check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
_check_type(bool, is_read_compacted, 'is_read_compacted')
_check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
_check_type(bool, start_message_id_inclusive, 'start_message_id_inclusive')

conf = _pulsar.ReaderConfiguration()
if reader_listener:
Expand All @@ -1052,6 +1056,7 @@ def my_listener(reader, message):
conf.read_compacted(is_read_compacted)
if crypto_key_reader:
conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader)
conf.start_message_id_inclusive(start_message_id_inclusive)

c = Reader()
c._reader = self._client.create_reader(topic, start_message_id, conf)
Expand Down
4 changes: 3 additions & 1 deletion src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -318,5 +318,7 @@ void export_config(py::module_& m) {
.def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix)
.def("read_compacted", &ReaderConfiguration::isReadCompacted)
.def("read_compacted", &ReaderConfiguration::setReadCompacted)
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference);
.def("crypto_key_reader", &ReaderConfiguration::setCryptoKeyReader, return_value_policy::reference)
.def("start_message_id_inclusive", &ReaderConfiguration::isStartMessageIdInclusive)
.def("start_message_id_inclusive", &ReaderConfiguration::setStartMessageIdInclusive, return_value_policy::reference);
}
36 changes: 36 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#


import random
import threading
import logging
from unittest import TestCase, main
Expand Down Expand Up @@ -685,6 +686,41 @@ def test_reader_is_connected(self):
self.assertFalse(reader.is_connected())
client.close()

def test_reader_seek_for_message_id(self):
client = pulsar.Client(self.serviceUrl)

topic = "test-seek-for-message-id-" + str(int(time.time()))

producer = client.create_producer(topic)

readerExclusive = client.create_reader(topic, MessageId.latest)
readerInclusive = client.create_reader(topic, MessageId.latest, start_message_id_inclusive=True)

numMessages = 100
seekMessageId = None

r = random.randint(0, numMessages - 2)
for i in range(numMessages):
msg_content = b"msg-%d" % i
id = producer.send(msg_content)

if i == r:
seekMessageId = id

readerExclusive.seek(seekMessageId)
msg0 = readerExclusive.read_next(timeout_millis=3000)

readerInclusive.seek(seekMessageId)
msg1 = readerInclusive.read_next(timeout_millis=3000)

self.assertEqual(msg0.data(), b"msg-%d" % (r + 1))
self.assertEqual(msg1.data(), b"msg-%d" % r)

readerExclusive.close()
readerInclusive.close()
producer.close()
client.close()

def test_producer_sequence_after_reconnection(self):
# Enable deduplication on namespace
doHttpPost(self.adminUrl + "/admin/v2/namespaces/public/default/deduplication", "true")
Expand Down