Description
Describe the bug
The example code for the Python client's `send_async` method does not work for processes that exit. The example needs a `producer.flush()` call before shutdown, but adding that call introduces a deadlock.
To Reproduce
Run this modified version of the example code against a Pulsar cluster. Compared with the original example, it prints less output, replaces the `while True` loop with a bounded `for` loop, and adds a `producer.flush()` call. The `flush()` call is necessary to produce all messages correctly: if it is omitted, some messages are skipped.
```python
import itertools
import pulsar

client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
    # Topic must have at least 2 partitions.
    # bin/pulsar-admin topics create-partitioned-topic persistent://public/default/python-client-122 --partitions 2
    topic='persistent://public/default/python-client-122',
    block_if_queue_full=True,
    batching_enabled=True,
    batching_max_publish_delay_ms=10
)

counter = itertools.count(start=1)

def callback(res, msg_id):
    value = next(counter)
    if value % 100 == 0:
        print(f'Published {value} records')

for i in range(50000):
    producer.send_async(('Hello-%d' % i).encode('utf-8'), callback)

producer.flush()
client.close()
```
Desktop
pulsar-client==3.1.0
Python 3.9.13
Ubuntu 22.04.2 LTS
Pulsar 2.10.2 running in a Kubernetes cluster; client connected via pulsar-proxy running as a LoadBalancer Service
Additional context / Fix
Adding `time.sleep(1)` before the `producer.flush()` call lets the process exit cleanly in every run I have tested. This suggests that the deadlock occurs somewhere between clearing the active batch and flushing the producer.
Possibly related: apache/pulsar#5666
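A fixed `time.sleep(1)` only works if every batch happens to be acknowledged within one second. A more deterministic variant of the same workaround is to wait, with a timeout, until every `send_async` callback has fired before calling `producer.flush()`. The sketch below is an untested illustration under that assumption: `PendingSends` is a hypothetical helper, not part of pulsar-client, and it does not fix the underlying deadlock. It only replaces the guessed sleep with an explicit wait. The demo uses `threading.Timer` as a stand-in for `send_async` so it runs without a cluster.

```python
import threading


class PendingSends:
    """Track in-flight asynchronous sends so the caller can wait for their callbacks.

    Hypothetical helper for illustration only; it is not part of pulsar-client.
    """

    def __init__(self):
        self._pending = 0
        self._cond = threading.Condition()

    def wrap(self, callback):
        """Count one new in-flight send and return a callback that marks it complete.

        Call this immediately before send_async so the count is incremented
        before the callback can possibly run.
        """
        with self._cond:
            self._pending += 1

        def done(res, msg_id):
            try:
                callback(res, msg_id)
            finally:
                # Decrement even if the user callback raises.
                with self._cond:
                    self._pending -= 1
                    if self._pending == 0:
                        self._cond.notify_all()

        return done

    def wait(self, timeout=None):
        """Block until every wrapped callback has run.

        Returns True if all callbacks completed, False if the timeout expired first.
        """
        with self._cond:
            return self._cond.wait_for(lambda: self._pending == 0, timeout=timeout)


if __name__ == '__main__':
    import random

    received = []

    def callback(res, msg_id):
        received.append(msg_id)

    pending = PendingSends()
    for i in range(100):
        # Stand-in for producer.send_async: the callback fires later on another thread.
        delay = random.uniform(0, 0.05)
        threading.Timer(delay, pending.wrap(callback), args=('Ok', i)).start()

    print(pending.wait(timeout=5), len(received))
```

Against a real cluster, the loop in the reproduction would pass `pending.wrap(callback)` instead of `callback` to `producer.send_async(...)`, then call `pending.wait(timeout=30)` before `producer.flush()` and `client.close()`. The 30-second timeout is an arbitrary choice; it turns a stuck send into a `False` return instead of a hang. Whether this reliably avoids the deadlock has not been verified.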