Description
The docs state:
def negative_acknowledge(self, message):
    """
    Acknowledge the failure to process a single message.

    When a message is "negatively acked" it will be marked for redelivery after
    some fixed delay. The delay is configurable when constructing the consumer
    with {@link ConsumerConfiguration#setNegativeAckRedeliveryDelayMs}.

    This call is not blocking.

    Parameters
    ----------
    message:
        The received message or message id.
    """
Specifically, the docstring says you can pass either the message or the message id into the function. However, I don't believe this is correct: passing the message id does not work. To reproduce this issue, first run a local Pulsar standalone instance via
docker run -it -p 6650:6650 -p 8080:8080 --tmpfs /pulsar/data apachepulsar/pulsar:3.1.0 bin/pulsar standalone
And then run the following code:
import pulsar

client = pulsar.Client("pulsar://localhost:6650")
producer = client.create_producer("persistent://public/default/tmp_example")
consumer = client.subscribe("persistent://public/default/tmp_example", "sub", negative_ack_redelivery_delay_ms=10)
producer.send(b"hello")

while True:
    msg = consumer.receive()
    message_id = msg.message_id()
    print(msg, message_id)
    # consumer.negative_acknowledge(msg)  # This works and causes redelivery
    consumer.negative_acknowledge(msg.message_id())  # This does not cause redelivery
When you pass in the message_id, no redeliveries occur. When you comment out the last line and pass the whole message instead, redeliveries occur.
Why using the message id matters: Our services consume the Pulsar messages and convert them into non-Pulsar structures (like pydantic objects or other Python classes). We attach the message id to those objects so we can ack/nack them as needed without storing the entire original Pulsar message.
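To make that usage pattern concrete, here is a rough sketch of what our code looks like. It is only an illustration: `Order`, `RecordingConsumer` and `handle` are hypothetical names, a plain dataclass stands in for our pydantic models, and the stand-in consumer just records the call instead of talking to a broker.

```python
from dataclasses import dataclass
from typing import Any, List


@dataclass
class Order:
    """Hypothetical domain object built from a message payload."""
    payload: str
    message_id: Any  # in real code, the value returned by msg.message_id()


class RecordingConsumer:
    """Stand-in for a consumer; it only records what was nacked."""

    def __init__(self) -> None:
        self.nacked: List[Any] = []

    def negative_acknowledge(self, message: Any) -> None:
        self.nacked.append(message)


def handle(order: Order, consumer: Any) -> None:
    """Process an order; on failure, nack using only the stored id."""
    try:
        if not order.payload:
            raise ValueError("empty payload")
    except ValueError:
        # The original message is long gone here; only its id is available.
        consumer.negative_acknowledge(order.message_id)


consumer = RecordingConsumer()
handle(Order(payload="", message_id="id-1"), consumer)       # fails, gets nacked
handle(Order(payload="hello", message_id="id-2"), consumer)  # succeeds
print(consumer.nacked)
```

With the real consumer, the `negative_acknowledge(order.message_id)` call is the one that silently does nothing in the reproduction above.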
I suspect the general acknowledge method has the same bug in it.