Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,33 @@ Messages Consumer_batch_receive(Consumer& consumer) {
void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }

void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
Py_BEGIN_ALLOW_THREADS
consumer.acknowledgeAsync(msgId, nullptr);
Py_END_ALLOW_THREADS
}

void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
Py_BEGIN_ALLOW_THREADS
consumer.negativeAcknowledge(msg);
Py_END_ALLOW_THREADS
}

void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
Py_BEGIN_ALLOW_THREADS
consumer.negativeAcknowledge(msgId);
Py_END_ALLOW_THREADS
}

void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
Py_BEGIN_ALLOW_THREADS
consumer.acknowledgeCumulativeAsync(msg, nullptr);
Py_END_ALLOW_THREADS
}

void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
Py_BEGIN_ALLOW_THREADS
consumer.acknowledgeCumulativeAsync(msgId, nullptr);
Py_END_ALLOW_THREADS
}

void Consumer_close(Consumer& consumer) {
Expand Down
12 changes: 11 additions & 1 deletion src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,16 @@ MessageId Producer_send(Producer& producer, const Message& message) {
return messageId;
}

void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) {
Py_BEGIN_ALLOW_THREADS
producer.sendAsync(msg, callback);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is something inside this line handling getting the GIL around callback? Since callback is python code, running it outside of the GIL can break all sorts of stuff.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zbentley PyBind is already acquiring the GIL before it enters the Python callback code.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All set per conversation on #84

Py_END_ALLOW_THREADS

if (PyErr_CheckSignals() == -1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if an exception was already set in the interpreter global before the callback fired?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is in the main thread, irrespective of the callback.

send_async() is a potentially blocking operation (when block_if_queue_full=True)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure. Something I've seen before is issues around an error already being "pre-raise" in a Python thread when some external code (the pulsar client C++ in this case) decides to invoke an unrelated Python callback. Does pybind handle situations like this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All set per conversation on #84

PyErr_SetInterrupt();
}
}

void Producer_flush(Producer& producer) {
waitForAsyncResult([&](ResultCallback callback) { producer.flushAsync(callback); });
}
Expand Down Expand Up @@ -67,7 +77,7 @@ void export_producer(py::module_& m) {
"This method is equivalent to asyncSend() and wait until the callback is triggered.\n"
"\n"
"@param msg message to publish\n")
.def("send_async", &Producer::sendAsync)
.def("send_async", &Producer_sendAsync)
.def("flush", &Producer_flush,
"Flush all the messages buffered in the client and wait until all messages have been\n"
"successfully persisted\n")
Expand Down
14 changes: 14 additions & 0 deletions tests/pulsar_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1424,5 +1424,19 @@ def test_invalid_basic_auth(self):
with self.assertRaises(RuntimeError):
AuthenticationBasic(auth_params_string='invalid auth params')

def test_send_async_no_deadlock(self):
client = Client(self.serviceUrl)
producer = client.create_producer('test_send_async_no_deadlock')

def send_callback(res, msg):
print(f"Message '{msg}' published res={res}")

for i in range(30):
producer.send_async(f"Hello-{i}".encode('utf-8'), callback=send_callback)

producer.flush()
client.close()


if __name__ == "__main__":
main()