Skip to content

Commit 8a9b2dc

Browse files
Fix deadlock for negative acknowledgment (#266)
Fixes #265

### Modifications

Make `timer_` const and `enabledForTesting_` atomic in `NegativeAcksTracker` so that `mutex_` is needed only to protect the `nackedMessages_` field. With that change, `mutex_` can be unlocked in `handleTimer` before the redelivery callback runs, which avoids the potential deadlock caused by a user-provided logger or interceptor.

Add `ConsumerTest.testNegativeAckDeadlock` to verify the fix.
1 parent 3e49fe5 commit 8a9b2dc

File tree

3 files changed

+86
-23
lines changed

3 files changed

+86
-23
lines changed

lib/NegativeAcksTracker.cc

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &con
3535
const ConsumerConfiguration &conf)
3636
: consumer_(consumer),
3737
timerInterval_(0),
38-
executor_(client->getIOExecutorProvider()->get()),
39-
enabledForTesting_(true) {
38+
timer_(client->getIOExecutorProvider()->get()->createDeadlineTimer()) {
4039
static const long MIN_NACK_DELAY_MILLIS = 100;
4140

4241
nackDelay_ =
@@ -47,7 +46,9 @@ NegativeAcksTracker::NegativeAcksTracker(ClientImplPtr client, ConsumerImpl &con
4746
}
4847

4948
void NegativeAcksTracker::scheduleTimer() {
50-
timer_ = executor_->createDeadlineTimer();
49+
if (closed_) {
50+
return;
51+
}
5152
timer_->expires_from_now(timerInterval_);
5253
timer_->async_wait(std::bind(&NegativeAcksTracker::handleTimer, this, std::placeholders::_1));
5354
}
@@ -58,8 +59,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
5859
return;
5960
}
6061

61-
std::lock_guard<std::mutex> lock(mutex_);
62-
timer_ = nullptr;
62+
std::unique_lock<std::mutex> lock(mutex_);
6363

6464
if (nackedMessages_.empty() || !enabledForTesting_) {
6565
return;
@@ -78,6 +78,7 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
7878
++it;
7979
}
8080
}
81+
lock.unlock();
8182

8283
if (!messagesToRedeliver.empty()) {
8384
consumer_.onNegativeAcksSend(messagesToRedeliver);
@@ -87,34 +88,30 @@ void NegativeAcksTracker::handleTimer(const boost::system::error_code &ec) {
8788
}
8889

8990
void NegativeAcksTracker::add(const MessageId &m) {
90-
std::lock_guard<std::mutex> lock(mutex_);
91-
91+
auto msgId = discardBatch(m);
9292
auto now = Clock::now();
9393

94-
// Erase batch id to group all nacks from same batch
95-
nackedMessages_[discardBatch(m)] = now + nackDelay_;
96-
97-
if (!timer_) {
98-
scheduleTimer();
94+
{
95+
std::lock_guard<std::mutex> lock{mutex_};
96+
// Erase batch id to group all nacks from same batch
97+
nackedMessages_[msgId] = now + nackDelay_;
9998
}
99+
100+
scheduleTimer();
100101
}
101102

102103
void NegativeAcksTracker::close() {
104+
closed_ = true;
105+
boost::system::error_code ec;
106+
timer_->cancel(ec);
103107
std::lock_guard<std::mutex> lock(mutex_);
104-
105-
if (timer_) {
106-
boost::system::error_code ec;
107-
timer_->cancel(ec);
108-
}
109-
timer_ = nullptr;
110108
nackedMessages_.clear();
111109
}
112110

113111
void NegativeAcksTracker::setEnabledForTesting(bool enabled) {
114-
std::lock_guard<std::mutex> lock(mutex_);
115112
enabledForTesting_ = enabled;
116113

117-
if (enabledForTesting_ && !timer_) {
114+
if (enabledForTesting_) {
118115
scheduleTimer();
119116
}
120117
}

lib/NegativeAcksTracker.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <pulsar/ConsumerConfiguration.h>
2323
#include <pulsar/MessageId.h>
2424

25+
#include <atomic>
2526
#include <boost/asio/deadline_timer.hpp>
2627
#include <chrono>
2728
#include <map>
@@ -65,9 +66,9 @@ class NegativeAcksTracker {
6566
typedef typename std::chrono::steady_clock Clock;
6667
std::map<MessageId, Clock::time_point> nackedMessages_;
6768

68-
ExecutorServicePtr executor_;
69-
DeadlineTimerPtr timer_;
70-
bool enabledForTesting_; // to be able to test deterministically
69+
const DeadlineTimerPtr timer_;
70+
std::atomic_bool closed_{false};
71+
std::atomic_bool enabledForTesting_{true}; // to be able to test deterministically
7172

7273
FRIEND_TEST(ConsumerTest, testNegativeAcksTrackerClose);
7374
};

tests/ConsumerTest.cc

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
#include <pulsar/Client.h>
2121

2222
#include <array>
23+
#include <atomic>
2324
#include <chrono>
2425
#include <ctime>
2526
#include <map>
27+
#include <mutex>
2628
#include <set>
2729
#include <thread>
2830
#include <vector>
@@ -1240,4 +1242,67 @@ TEST(ConsumerTest, testAckNotPersistentTopic) {
12401242

12411243
INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false));
12421244

1245+
class InterceptorForNegAckDeadlock : public ConsumerInterceptor {
1246+
public:
1247+
Message beforeConsume(const Consumer& consumer, const Message& message) override { return message; }
1248+
1249+
void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) override {}
1250+
1251+
void onAcknowledgeCumulative(const Consumer& consumer, Result result,
1252+
const MessageId& messageID) override {}
1253+
1254+
void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) override {
1255+
duringNegativeAck_ = true;
1256+
// Wait for the next time Consumer::negativeAcknowledge is called
1257+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
1258+
std::lock_guard<std::mutex> lock{mutex_};
1259+
LOG_INFO("onNegativeAcksSend is called for " << consumer.getTopic());
1260+
duringNegativeAck_ = false;
1261+
}
1262+
1263+
static std::mutex mutex_;
1264+
static std::atomic_bool duringNegativeAck_;
1265+
};
1266+
1267+
std::mutex InterceptorForNegAckDeadlock::mutex_;
1268+
std::atomic_bool InterceptorForNegAckDeadlock::duringNegativeAck_{false};
1269+
1270+
// For https://github.com/apache/pulsar-client-cpp/issues/265
1271+
TEST(ConsumerTest, testNegativeAckDeadlock) {
1272+
const std::string topic = "test-negative-ack-deadlock";
1273+
Client client{lookupUrl};
1274+
ConsumerConfiguration conf;
1275+
conf.setNegativeAckRedeliveryDelayMs(500);
1276+
conf.intercept({std::make_shared<InterceptorForNegAckDeadlock>()});
1277+
Consumer consumer;
1278+
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer));
1279+
1280+
Producer producer;
1281+
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
1282+
producer.send(MessageBuilder().setContent("msg").build());
1283+
1284+
Message msg;
1285+
ASSERT_EQ(ResultOk, consumer.receive(msg));
1286+
1287+
auto& duringNegativeAck = InterceptorForNegAckDeadlock::duringNegativeAck_;
1288+
duringNegativeAck = false;
1289+
consumer.negativeAcknowledge(msg); // schedule the negative ack timer
1290+
// Wait until the negative ack timer is triggered and onNegativeAcksSend will be called
1291+
for (int i = 0; !duringNegativeAck && i < 100; i++) {
1292+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
1293+
}
1294+
ASSERT_TRUE(duringNegativeAck);
1295+
1296+
{
1297+
std::lock_guard<std::mutex> lock{InterceptorForNegAckDeadlock::mutex_};
1298+
consumer.negativeAcknowledge(msg);
1299+
}
1300+
for (int i = 0; duringNegativeAck && i < 100; i++) {
1301+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
1302+
}
1303+
ASSERT_FALSE(duringNegativeAck);
1304+
1305+
client.close();
1306+
}
1307+
12431308
} // namespace pulsar

0 commit comments

Comments
 (0)