Skip to content

Commit cff12ea

Browse files
authored
Fixed deadlock in producer.send_async (#87)
Fix #84 Release the GIL while calling `producer.sendAsync()` to avoid a deadlock when PyBind is triggering the Python callback. * Main Thread 1. Holds the Python GIL 2. Call `producer.send_async()` 3. Tries to acquire internal `ClientConnetion` lock * Pulsar client internal thread 1. Holds lock on `ClientConnection` 2. Receives ack from the broker 3. Triggers callback 4. PyBind11 acquires GIL <---- Deadlock The problem is the different behavior in PyBind from Boost::Python. We always need to make sure we release the GIL before making any call to C++ that potentially acquires any mutexes
1 parent c88d57e commit cff12ea

File tree

3 files changed

+35
-1
lines changed

3 files changed

+35
-1
lines changed

src/consumer.cc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,23 +59,33 @@ Messages Consumer_batch_receive(Consumer& consumer) {
5959
void Consumer_acknowledge(Consumer& consumer, const Message& msg) { consumer.acknowledgeAsync(msg, nullptr); }
6060

6161
void Consumer_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
62+
Py_BEGIN_ALLOW_THREADS
6263
consumer.acknowledgeAsync(msgId, nullptr);
64+
Py_END_ALLOW_THREADS
6365
}
6466

6567
void Consumer_negative_acknowledge(Consumer& consumer, const Message& msg) {
68+
Py_BEGIN_ALLOW_THREADS
6669
consumer.negativeAcknowledge(msg);
70+
Py_END_ALLOW_THREADS
6771
}
6872

6973
void Consumer_negative_acknowledge_message_id(Consumer& consumer, const MessageId& msgId) {
74+
Py_BEGIN_ALLOW_THREADS
7075
consumer.negativeAcknowledge(msgId);
76+
Py_END_ALLOW_THREADS
7177
}
7278

7379
void Consumer_acknowledge_cumulative(Consumer& consumer, const Message& msg) {
80+
Py_BEGIN_ALLOW_THREADS
7481
consumer.acknowledgeCumulativeAsync(msg, nullptr);
82+
Py_END_ALLOW_THREADS
7583
}
7684

7785
void Consumer_acknowledge_cumulative_message_id(Consumer& consumer, const MessageId& msgId) {
86+
Py_BEGIN_ALLOW_THREADS
7887
consumer.acknowledgeCumulativeAsync(msgId, nullptr);
88+
Py_END_ALLOW_THREADS
7989
}
8090

8191
void Consumer_close(Consumer& consumer) {

src/producer.cc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,16 @@ MessageId Producer_send(Producer& producer, const Message& message) {
3434
return messageId;
3535
}
3636

37+
void Producer_sendAsync(Producer& producer, const Message& msg, SendCallback callback) {
38+
Py_BEGIN_ALLOW_THREADS
39+
producer.sendAsync(msg, callback);
40+
Py_END_ALLOW_THREADS
41+
42+
if (PyErr_CheckSignals() == -1) {
43+
PyErr_SetInterrupt();
44+
}
45+
}
46+
3747
void Producer_flush(Producer& producer) {
3848
waitForAsyncResult([&](ResultCallback callback) { producer.flushAsync(callback); });
3949
}
@@ -67,7 +77,7 @@ void export_producer(py::module_& m) {
6777
"This method is equivalent to asyncSend() and wait until the callback is triggered.\n"
6878
"\n"
6979
"@param msg message to publish\n")
70-
.def("send_async", &Producer::sendAsync)
80+
.def("send_async", &Producer_sendAsync)
7181
.def("flush", &Producer_flush,
7282
"Flush all the messages buffered in the client and wait until all messages have been\n"
7383
"successfully persisted\n")

tests/pulsar_test.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1424,5 +1424,19 @@ def test_invalid_basic_auth(self):
14241424
with self.assertRaises(RuntimeError):
14251425
AuthenticationBasic(auth_params_string='invalid auth params')
14261426

1427+
def test_send_async_no_deadlock(self):
1428+
client = Client(self.serviceUrl)
1429+
producer = client.create_producer('test_send_async_no_deadlock')
1430+
1431+
def send_callback(res, msg):
1432+
print(f"Message '{msg}' published res={res}")
1433+
1434+
for i in range(30):
1435+
producer.send_async(f"Hello-{i}".encode('utf-8'), callback=send_callback)
1436+
1437+
producer.flush()
1438+
client.close()
1439+
1440+
14271441
if __name__ == "__main__":
14281442
main()

0 commit comments

Comments
 (0)