[Bug] Deadlock for negative acknowledgment #265

Closed · BewareMyPower opened this issue May 6, 2023 · 0 comments · Fixed by #266

BewareMyPower commented May 6, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Version

  • Pulsar: 2.11.0
  • Client: master (4338d45)

Minimal reproduction steps

#include <pulsar/Client.h>

#include <atomic>
#include <chrono>
#include <iostream>  // std::cout, std::endl
#include <mutex>
#include <set>
#include <sstream>
#include <thread>

using namespace pulsar;

static std::mutex gMutex;
static std::atomic_bool gDuringIntercept{false};

class MyConsumerIntercepter : public ConsumerInterceptor {
   public:
    Message beforeConsume(const Consumer& consumer, const Message& message) override { return message; }

    void onAcknowledge(const Consumer& consumer, Result result, const MessageId& messageID) override {}

    void onAcknowledgeCumulative(const Consumer& consumer, Result result,
                                 const MessageId& messageID) override {}

    void onNegativeAcksSend(const Consumer& consumer, const std::set<MessageId>& messageIds) override {
        auto id = std::this_thread::get_id();
        std::ostringstream oss;
        oss << id << " onNegativeAcksSend for ";
        for (auto&& msgId : messageIds) {
            oss << " " << msgId;
        }
        std::cout << oss.str() << std::endl;
        gDuringIntercept = true;
        // Give the main thread time to acquire gMutex and call `consumer.negativeAcknowledge`
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::lock_guard<std::mutex> lock{gMutex};
        std::cout << id << " onNegativeAcksSend acquired gMutex" << std::endl;
    }
};

int main() {
    Client client("pulsar://localhost:6650");

    const auto topic = "my-topic";

    auto interceptor = std::make_shared<MyConsumerIntercepter>();
    ConsumerConfiguration conf;
    conf.setNegativeAckRedeliveryDelayMs(1000);
    conf.intercept({interceptor});
    Consumer consumer;
    client.subscribe(topic, "sub", conf, consumer);

    Producer producer;
    client.createProducer(topic, producer);

    producer.send(MessageBuilder().setContent("content").build());
    Message msg;
    consumer.receive(msg);
    consumer.negativeAcknowledge(msg);

    // Wait until the negative acknowledgment timer has invoked the interceptor
    while (!gDuringIntercept) {
        std::this_thread::sleep_for(std::chrono::milliseconds(200));
    }

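    // Deadlock happens here: this thread holds gMutex while negativeAcknowledge() waits for the
    // NegativeAcksTracker mutex, and the timer thread waits for gMutex inside the interceptor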
    {
        std::lock_guard<std::mutex> lock{gMutex};
        std::cout << std::this_thread::get_id() << " Acquired gMutex, call negativeAcknowledge" << std::endl;
        consumer.negativeAcknowledge(msg);
    }

    client.close();
}

What did you expect to see?

The application should stop after a few seconds.

What did you see instead?

The application hangs forever. The key stack traces that show the deadlock are:

Thread 2 (Thread 0x7fb597771700 (LWP 16231)):
#0  __lll_lock_wait (futex=futex@entry=0x559b17124180 <gMutex>, private=0) at lowlevellock.c:52
#1  0x00007fb598f910a3 in __GI___pthread_mutex_lock (mutex=0x559b17124180 <gMutex>) at ../nptl/pthread_mutex_lock.c:80
#2  0x0000559b1711cefc in __gthread_mutex_lock (__mutex=0x559b17124180 <gMutex>) at /usr/include/x86_64-linux-gnu/c++/9/bits/gthr-default.h:749
#3  0x0000559b1711d184 in std::mutex::lock (this=0x559b17124180 <gMutex>) at /usr/include/c++/9/bits/std_mutex.h:100
#4  0x0000559b1711dda0 in std::lock_guard<std::mutex>::lock_guard (this=0x7fb59776fd88, __m=...) at /usr/include/c++/9/bits/std_mutex.h:159
#5  0x0000559b1711d5b2 in MyConsumerIntercepter::onNegativeAcksSend (this=0x559b17f1daa0, consumer=..., messageIds=std::set with 1 element) at /home/xyz/pulsar-client-cpp/examples/SampleProducer.cc:52

Thread 1 (Thread 0x7fb59777cc00 (LWP 16230)):
#0  __lll_lock_wait (futex=futex@entry=0x7fb590003df0, private=0) at lowlevellock.c:52
#1  0x00007fb598f910a3 in __GI___pthread_mutex_lock (mutex=0x7fb590003df0) at ../nptl/pthread_mutex_lock.c:80
#2  0x0000559b1711cefc in __gthread_mutex_lock (__mutex=0x7fb590003df0) at /usr/include/x86_64-linux-gnu/c++/9/bits/gthr-default.h:749
#3  0x0000559b1711d184 in std::mutex::lock (this=0x7fb590003df0) at /usr/include/c++/9/bits/std_mutex.h:100
#4  0x0000559b1711dda0 in std::lock_guard<std::mutex>::lock_guard (this=0x7ffe098062d0, __m=...) at /usr/include/c++/9/bits/std_mutex.h:159
#5  0x00007fb5997ac01f in pulsar::NegativeAcksTracker::add (this=0x7fb590003de8, m=...) at /home/xyz/pulsar-client-cpp/lib/NegativeAcksTracker.cc:90
#6  0x00007fb5996ef3e4 in pulsar::ConsumerImpl::negativeAcknowledge (this=0x7fb590002f20, messageId=...) at /home/xyz/pulsar-client-cpp/lib/ConsumerImpl.cc:1198
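The two stacks form a lock-order inversion. Thread 1 (`main`) holds `gMutex` and waits in `NegativeAcksTracker::add` for the tracker's mutex (its address, 0x7fb590003df0, lies inside the tracker object at 0x7fb590003de8). Thread 2, the negative-ack timer, is inside the interceptor waiting for `gMutex` and, judging by the fix, still holds that tracker mutex. Below is a standalone model of the interleaving with no Pulsar code; `lockOrderInversionDetected`, `trackerMutex` and `userMutex` are illustrative names only. It uses `std::timed_mutex` with timeouts so the inversion is reported instead of hanging, and sequences the threads with promises so the outcome does not depend on scheduling.

```cpp
#include <chrono>
#include <future>
#include <mutex>

// Standalone model of the two stacks (hypothetical names, no Pulsar code):
//   trackerMutex stands in for the NegativeAcksTracker's internal mutex,
//   userMutex stands in for gMutex (or, in the Python case, the GIL).
// Returns true when neither thread can obtain its second lock.
bool lockOrderInversionDetected() {
    std::timed_mutex trackerMutex;
    std::timed_mutex userMutex;
    std::promise<void> trackerHeld;
    std::promise<void> mainAttempted;
    std::future<void> trackerHeldFuture = trackerHeld.get_future();
    std::future<void> mainAttemptedFuture = mainAttempted.get_future();

    // "Timer thread": holds the tracker lock while calling user code (the interceptor).
    std::future<bool> timerGotUserLock = std::async(std::launch::async, [&] {
        std::lock_guard<std::timed_mutex> tracker(trackerMutex);
        trackerHeld.set_value();
        mainAttemptedFuture.wait();  // keep holding trackerMutex during main's attempt
        std::unique_lock<std::timed_mutex> user(userMutex, std::chrono::milliseconds(100));
        return user.owns_lock();
    });

    // "Main thread": holds the user lock while calling into the tracker.
    trackerHeldFuture.wait();
    std::lock_guard<std::timed_mutex> user(userMutex);
    bool mainGotTrackerLock = false;
    {
        // Times out: the timer thread is still holding trackerMutex.
        std::unique_lock<std::timed_mutex> tracker(trackerMutex, std::chrono::milliseconds(100));
        mainGotTrackerLock = tracker.owns_lock();
    }
    mainAttempted.set_value();
    // userMutex is still held here, so the timer thread's attempt times out as well.
    const bool timerGotUser = timerGotUserLock.get();
    return !mainGotTrackerLock && !timerGotUser;
}
```

`lockOrderInversionDetected()` returns `true` because each thread times out waiting for the lock the other one holds. With plain blocking `lock()` calls, as in the reproducer, both threads would wait forever.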

Anything else?

Although the reproduction code uses the interceptor feature, which is not included in any release yet, this bug also affects the Python client, which depends on the existing C++ client releases. See the detailed analysis in apache/pulsar-client-python#116 (comment).

The same deadlock can also be reproduced with a custom logger, which is why apache/pulsar-client-python#116 suffers from this bug.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
BewareMyPower added a commit to BewareMyPower/pulsar-client-cpp that referenced this issue May 6, 2023
Fixes apache#265

### Modifications

Make `timer_` const and `enabledForTesting_` atomic in
`NegativeAcksTracker` so that `mutex_` only needs to guard the
`nackedMessages_` field. After that, `mutex_` can be unlocked in
`handleTimer` to avoid a potential deadlock caused by a user-provided
logger or interceptor.

Add `ConsumerTest.testNegativeAckDeadlock` to verify the fix.
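A minimal sketch of the locking pattern described in this commit message, assuming a heavily simplified tracker. `SimpleNackTracker`, `Callback`, `setEnabled` and `pending` are hypothetical names, not the real `NegativeAcksTracker` API. Here `mutex_` protects only the map of nacked messages, the enabled flag is a `std::atomic<bool>`, the callback is `const`, and `handleTimer` copies out the expired IDs, releases the lock, and only then invokes the user-provided callback.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <map>
#include <mutex>
#include <set>
#include <utility>

// Hypothetical, simplified tracker used only to illustrate the locking pattern;
// it is not the actual pulsar::NegativeAcksTracker implementation.
class SimpleNackTracker {
   public:
    using Clock = std::chrono::steady_clock;
    // Stands in for user-provided code such as an interceptor or a logger.
    using Callback = std::function<void(const std::set<long>&)>;

    explicit SimpleNackTracker(Callback onRedeliver) : onRedeliver_(std::move(onRedeliver)) {}

    void add(long messageId, Clock::time_point redeliverAt) {
        std::lock_guard<std::mutex> lock(mutex_);
        nackedMessages_[messageId] = redeliverAt;
    }

    // Atomic flag: read and written without taking mutex_.
    void setEnabled(bool enabled) { enabled_ = enabled; }

    // Would be driven by a periodic timer in a real client.
    void handleTimer(Clock::time_point now) {
        if (!enabled_) {
            return;
        }
        std::set<long> expired;
        {
            // mutex_ guards nackedMessages_ and nothing else.
            std::lock_guard<std::mutex> lock(mutex_);
            for (auto it = nackedMessages_.begin(); it != nackedMessages_.end();) {
                if (it->second <= now) {
                    expired.insert(it->first);
                    it = nackedMessages_.erase(it);
                } else {
                    ++it;
                }
            }
        }  // mutex_ is released before any user code runs
        if (!expired.empty()) {
            onRedeliver_(expired);
        }
    }

    std::size_t pending() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return nackedMessages_.size();
    }

   private:
    const Callback onRedeliver_;  // never reassigned, so safe to call without mutex_
    std::atomic<bool> enabled_{true};
    mutable std::mutex mutex_;
    std::map<long, Clock::time_point> nackedMessages_;
};
```

Because `mutex_` is no longer held when the callback runs, the callback may take its own locks, or even call `add()` on the same tracker, without blocking against `handleTimer`. If the lock were held across the callback, a re-entrant `add()` would relock a non-recursive `std::mutex` already owned by the same thread (undefined behavior, typically a hang), and a call from another thread holding a lock the callback needs would reproduce this issue.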
BewareMyPower added a commit that referenced this issue May 6, 2023
BewareMyPower added a commit that referenced this issue May 6, 2023
(cherry picked from commit 8a9b2dc)