diff --git a/src/consumer.cc b/src/consumer.cc index a77bb50..972bd0b 100644 --- a/src/consumer.cc +++ b/src/consumer.cc @@ -59,23 +59,33 @@ Messages Consumer_batch_receive(Consumer& consumer) { void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); } void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { + Py_BEGIN_ALLOW_THREADS consumer.acknowledgeAsync(msgId, nullptr); + Py_END_ALLOW_THREADS } void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) { + Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msg); + Py_END_ALLOW_THREADS } void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) { + Py_BEGIN_ALLOW_THREADS consumer.negativeAcknowledge(msgId); + Py_END_ALLOW_THREADS } void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) { + Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msg, nullptr); + Py_END_ALLOW_THREADS } void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) { + Py_BEGIN_ALLOW_THREADS consumer.acknowledgeCumulativeAsync(msgId, nullptr); + Py_END_ALLOW_THREADS } void Consumer_close(Consumer& consumer) { diff --git a/src/producer.cc b/src/producer.cc index bba262a..1dd5a76 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -34,6 +34,16 @@ MessageId Producer_send(Producer& producer, const Message& message) { return messageId; } +void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) { + Py_BEGIN_ALLOW_THREADS + producer.sendAsync(msg, callback); + Py_END_ALLOW_THREADS + + if (PyErr_CheckSignals() == -1) { + PyErr_SetInterrupt(); + } +} + void Producer_flush(Producer& producer) { waitForAsyncResult([&](ResultCallback callback) { producer.flushAsync(callback); }); } @@ -67,7 +77,7 @@ void export_producer(py::module_& m) { "This method is equivalent to asyncSend() and wait until the callback is triggered.\n" "\n" "@param msg message to publish\n") - .def("send_async", &Producer::sendAsync) + .def("send_async", &Producer_sendAsync) .def("flush", &Producer_flush, "Flush all the messages buffered in the client and wait until all messages have been\n" "successfully persisted\n") diff --git a/tests/pulsar_test.py b/tests/pulsar_test.py index feba877..00e2466 100755 --- a/tests/pulsar_test.py +++ b/tests/pulsar_test.py @@ -1424,5 +1424,19 @@ def test_invalid_basic_auth(self): with self.assertRaises(RuntimeError): AuthenticationBasic(auth_params_string='invalid auth params') + def test_send_async_no_deadlock(self): + client = Client(self.serviceUrl) + producer = client.create_producer('test_send_async_no_deadlock') + + def send_callback(res, msg): + print(f"Message '{msg}' published res={res}") + + for i in range(30): + producer.send_async(f"Hello-{i}".encode('utf-8'), callback=send_callback) + + producer.flush() + client.close() + + if __name__ == "__main__": main()