
Consumer fails to consume on broker restart #173


Closed
tejas3190 opened this issue Apr 8, 2017 · 20 comments

Comments

@tejas3190

tejas3190 commented Apr 8, 2017

Hi all, @edenhill

I am facing an issue where, after the broker is restarted, the consumer is no longer able to receive newly produced messages.
These are the steps I am following:

  1. A single consumer subscribed to 6 topics, with only 1 Kafka broker.
  2. At startup everything seems fine: messages are consumed until the end of the partition (EOP) is reached.
  3. The consumer is subscribed to 6 topics, but in this scenario messages are produced to only one of them. The consumer polls with a 1 s timeout, with auto commit and keep-alive enabled.
  4. After some time, bring down the broker (in a controlled manner).
  5. After some more time, bring the broker back up.
  6. Produce more messages to the same topic.
  7. Problem: the consumer fails to consume the newly produced messages.

I am also attaching the broker and consumer logs. Can anyone help me with this? Thanks.
Kafka version -> kafka_2.11-0.10.0.0
librdkafka -> master
confluent-kafka-python -> confluent-kafka (0.9.4)

brokerlog.txt

consumer.txt
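For reference, the setup in step 3 can be sketched roughly as below. This is a minimal sketch, not the reporter's actual code: it assumes the confluent-kafka-python Consumer interface (subscribe(), poll(timeout=...)), and `make_consumer_config`, `consume` and `max_polls` are hypothetical names. `enable.auto.commit` and `socket.keepalive.enable` are librdkafka configuration properties; `debug: cgrp` is an added assumption so that consumer-group state changes appear in the log.

```python
def make_consumer_config(bootstrap_servers, group_id):
    """Config dict for confluent_kafka.Consumer mirroring step 3 above."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "group.id": group_id,
        "enable.auto.commit": "true",       # auto commit enabled
        "socket.keepalive.enable": "true",  # TCP keep-alive enabled
        "debug": "cgrp",                    # assumption: log consumer-group state changes
    }


def consume(consumer, topics, handle, max_polls=None):
    """Subscribe once, then poll with a 1 s timeout.

    `consumer` is anything with confluent_kafka.Consumer's subscribe()/poll();
    `handle` receives each real (non-error) message. `max_polls` is a
    hypothetical knob so the loop can be bounded; None means poll forever.
    """
    consumer.subscribe(topics)
    polls = 0
    while max_polls is None or polls < max_polls:
        msg = consumer.poll(timeout=1.0)
        polls += 1
        if msg is None:
            continue  # nothing arrived within 1 s
        if msg.error():
            continue  # non-message event (e.g. end of partition): skipped here
        handle(msg)
    return polls
```

Against a real broker this would be used roughly as `consume(Consumer(make_consumer_config("localhost:9092", "CHECK_PXX")), topics, print)` after `from confluent_kafka import Consumer`, with `topics` holding the six topic names.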

@tejas3190
Author

I am facing an issue similar to this one, where the consumer hangs:
https://www.bountysource.com/issues/43051772-hang-on-rd_kafka_destroy-when-disable-auto-commit-offset

@tejas3190
Author

tejas3190 commented Apr 9, 2017

I was able to attach gdb while the consumer was hung; here is the backtrace:

(gdb) bt
#0 pthread_cond_timedwait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_timedwait.S:238
#1 0x00007fa852a07a55 in cnd_timedwait_ms (cnd=cnd@entry=0x23d3528, mtx=mtx@entry=0x23d3500, timeout_ms=timeout_ms@entry=2147483647) at tinycthread.c:501
#2 0x00007fa8529e61be in rd_kafka_q_pop_serve (rkq=rkq@entry=0x23d3500, timeout_ms=2147483647, timeout_ms@entry=-1, version=version@entry=0, cb_type=cb_type@entry=2,
callback=callback@entry=0x0, opaque=opaque@entry=0x0) at rdkafka_queue.c:321
#3 0x00007fa8529e6300 in rd_kafka_q_pop (rkq=rkq@entry=0x23d3500, timeout_ms=timeout_ms@entry=-1, version=version@entry=0) at rdkafka_queue.c:354
#4 0x00007fa8529c5887 in rd_kafka_consumer_close (rk=0x23cd730) at rdkafka.c:1908
#5 0x00007fa852c3d1ba in Consumer_close (self=0x7fa852377350, ignore=) at confluent_kafka/src/Consumer.c:478
#6 0x000000000049a3b5 in PyEval_EvalFrameEx ()
#7 0x00000000004a090c in PyEval_EvalCodeEx ()
#8 0x0000000000499a52 in PyEval_EvalFrameEx ()
#9 0x0000000000499ef2 in PyEval_EvalFrameEx ()
#10 0x0000000000499ef2 in PyEval_EvalFrameEx ()
#11 0x00000000004a1634 in ?? ()
#12 0x000000000044e4a5 in PyRun_FileExFlags ()
#13 0x000000000044ec9f in PyRun_SimpleFileExFlags ()
#14 0x000000000044f904 in Py_Main ()
#15 0x00007fa85431aec5 in __libc_start_main (main=0x44f9c2 <main>, argc=2, argv=0x7fff7c4024e8, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>,
stack_end=0x7fff7c4024d8) at libc-start.c:287
#16 0x0000000000578c4e in _start ()
(gdb)

@tejas3190
Author

tejas3190 commented Apr 10, 2017

I see the following logs whenever I hit this issue. I am not sure whether this is normal, as I am still learning Kafka.

On Consumer:
%4|1491775846.502|METADATA|rdkafka#consumer-1| [thrd:main]: localhost:9092/0: Metadata request failed: Local: Timed out in queue (60791ms)

On Broker restart:
[2017-04-08 16:54:05,032] INFO [GroupCoordinator 0]: Preparing to restabilize group CHECK_PXX with old generation 1 (kafka.coordinator.GroupCoordinator)
[2017-04-08 16:54:05,097] INFO [GroupCoordinator 0]: Group CHECK_PXX generation 1 is dead and removed (kafka.coordinator.GroupCoordinator

@tejas3190
Author

@edenhill Can you please help me out here? Thanks.

@edenhill
Contributor

In your initial description you don't mention closing down the consumer, but in later comments you mention a hang on destroy and provide a backtrace from consumer_close.

Can you clarify what is not working: the consumption or the closing?

@tejas3190
Author

tejas3190 commented Apr 13, 2017

Hi @edenhill, sorry if I was not clear earlier. I have been observing issues in both cases:
- When the broker is restarted while the consumer is polling, the newly produced messages are not received by the consumer; poll() keeps returning None. (I will try to provide a backtrace for this too.)
- I also have a config where, if the consumer keeps timing out for about 3 minutes, it calls close(). This is where I see the hang, which is why the backtrace above is from consumer_close.

So I am observing two issues:

  1. The consumer is unable to consume newly produced messages after a broker restart.
  2. The consumer hangs on close.
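The close-after-timeout setup in the second bullet can be sketched as follows. This is a minimal sketch, assuming a confluent_kafka.Consumer-like object with poll() and close(); `poll_until_idle`, `idle_timeout` and the injectable `clock` are hypothetical names, and 180 s stands in for "about 3 minutes".

```python
import time


def poll_until_idle(consumer, handle, idle_timeout=180.0, poll_timeout=1.0,
                    clock=time.monotonic):
    """Poll until no real message has arrived for `idle_timeout` seconds,
    then close the consumer. Returns the number of messages handled."""
    handled = 0
    last_msg = clock()
    try:
        while clock() - last_msg < idle_timeout:
            msg = consumer.poll(timeout=poll_timeout)
            if msg is None or msg.error():
                continue          # timeout or non-message event: keep waiting
            last_msg = clock()    # a real message resets the idle timer
            handle(msg)
            handled += 1
    finally:
        consumer.close()          # the call that blocked in the consumer_close backtrace
    return handled
```

Putting close() in a finally block means it also runs if handle() raises; it does nothing to stop close() itself from blocking, which is the second issue reported here.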

@tejas3190
Author

Hi @edenhill, any update on this?

@edenhill
Contributor

LOGGER_CHECK_PXX" GroupCoordinator response error: Broker: Group coordinator not available

Do these log messages keep repeating after the broker has been restarted?

@edenhill
Contributor

[2017-04-08 16:53:57,635] ERROR [KafkaApi-0] Error when handling request {group_id=dn_test-DN-LOGGER_CHECK_PXX} (kafka.server.KafkaApis)
kafka.admin.AdminOperationException: replication factor: 3 larger than available brokers: 0

When restarting the brokers, do they recover the correct number of replicas to start being operational again?

If you restart the client when this happens, does it recover?

@tejas3190
Author

tejas3190 commented Apr 24, 2017

  1. LOGGER_CHECK_PXX" GroupCoordinator response error: Broker: Group coordinator not available. Do these log messages keep repeating after the broker has been restarted?
    A. No, they don't. I see the following consumer logs on broker restart:
%7|1491650636.028|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit internal error: Local: No offset stored
%7|1491650636.028|COMMIT|rdkafka#consumer-1| [thrd:main]: OffsetCommit for 6 partition(s): cgrp auto commit timer: returned: Local: No offset stored
%7|1491650636.028|UNASSIGN|rdkafka#consumer-1| [thrd:main]: Unassign not done yet (0 wait_unassign, 6 assigned, 0 wait commit): OffsetCommit done (__NO_OFFSET)
%7|1491650637.027|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: kafka.dn-services.internal:9092/bootstrap: Group "dn_test-DN-LOGGER_CHECK_PXX": querying for coordinator: intervaled in state query-coord
%7|1491650637.027|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "dn_test-DN-LOGGER_CHECK_PXX" changed state query-coord -> wait-coord (v4, join-state started)
%7|1491650637.636|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: kafka.dn-services.internal:9092/bootstrap: Group "dn_test-DN-LOGGER_CHECK_PXX" GroupCoordinator response error: Broker: Group coordinator not available
%7|1491650637.636|CGRPCOORD|rdkafka#consumer-1| [thrd:main]: Group "dn_test-DN-LOGGER_CHECK_PXX" changing coordinator 0 -> -1
%7|1491650637.636|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "dn_test-DN-LOGGER_CHECK_PXX" changed state wait-coord -> wait-broker (v4, join-state started)
%7|1491650638.028|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: localhost:9092/0: Group "dn_test-DN-LOGGER_CHECK_PXX": querying for coordinator: intervaled in state wait-broker

There are no further logs after this point.

@tejas3190
Author

tejas3190 commented Apr 24, 2017

  1. When restarting the brokers, do they recover the correct number of replicas to start being operational again?
    => I don't know how to verify this.
  2. If you restart the client when this happens, does it recover?
    => Yes. After restarting the consumer process everything seems fine, and the consumer is able to consume the newly produced messages. Thanks.

@edenhill
Contributor

edenhill commented May 2, 2017

If you reproduce the issue again where it hangs during normal operation (not during close), can you run thread apply all bt in gdb and send me the output, please?
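One way to collect that output without an interactive session is to run gdb in batch mode against the consumer's PID. This is a sketch assuming gdb is installed and the user is allowed to ptrace the process; `gdb_all_threads_cmd` and `dump_all_thread_backtraces` are hypothetical helper names. `-p` attaches to a running process, `-batch` makes gdb exit after running the commands given with `-ex`, and gdb detaches from the process when it exits.

```python
import subprocess


def gdb_all_threads_cmd(pid):
    """argv for a non-interactive gdb run that attaches to `pid`,
    prints every thread's backtrace and then exits (detaching)."""
    return ["gdb", "-p", str(pid), "-batch", "-ex", "thread apply all bt"]


def dump_all_thread_backtraces(pid):
    """Run gdb and return its combined stdout/stderr as text."""
    result = subprocess.run(gdb_all_threads_cmd(pid),
                            stdout=subprocess.PIPE,
                            stderr=subprocess.STDOUT,
                            universal_newlines=True)
    return result.stdout
```

For example, `print(dump_all_thread_backtraces(30235))` with the hung consumer's PID. Note that the process is paused while gdb is attached.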

@tejas3190
Author

tejas3190 commented May 10, 2017

Thread 5 (Thread 0x7f402b9de700 (LWP 30236)):
#0  pthread_cond_timedwait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_timedwait.S:238
#1  0x00007f402c100a45 in cnd_timedwait_ms (cnd=cnd@entry=0x25cccd8, mtx=mtx@entry=0x25cccb0, timeout_ms=<optimized out>) at tinycthread.c:501
#2  0x00007f402c0ce335 in rd_kafka_q_serve (rkq=0x25cccb0, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=1, callback=callback@entry=0x0, 
    opaque=opaque@entry=0x0) at rdkafka_queue.c:406
#3  0x00007f402c0a160c in rd_kafka_thread_main (arg=arg@entry=0x26d8b60) at rdkafka.c:1160
#4  0x00007f402c1007c7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#5  0x00007f402ddc4182 in start_thread (arg=0x7f402b9de700) at pthread_create.c:312
#6  0x00007f402daf147d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 4 (Thread 0x7f402b1dd700 (LWP 30237)):
#0  pthread_cond_timedwait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_timedwait.S:238
#1  0x00007f402c100a45 in cnd_timedwait_ms (cnd=cnd@entry=0x26a4528, mtx=mtx@entry=0x26a4500, timeout_ms=timeout_ms@entry=999) at tinycthread.c:501
#2  0x00007f402c0ce0ce in rd_kafka_q_pop_serve (rkq=0x26a4500, timeout_ms=999, version=version@entry=0, cb_type=cb_type@entry=2, callback=callback@entry=0x0, 
    opaque=opaque@entry=0x0) at rdkafka_queue.c:329
#3  0x00007f402c0ce210 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:360
#4  0x00007f402c0b5f04 in rd_kafka_broker_serve (rkb=rkb@entry=0x26d9240, abs_timeout=abs_timeout@entry=3628636159751) at rdkafka_broker.c:3254
#5  0x00007f402c0b63ef in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x26d9240, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:3330
#6  0x00007f402c0b6b55 in rd_kafka_broker_thread_main (arg=arg@entry=0x26d9240) at rdkafka_broker.c:4752
#7  0x00007f402c1007c7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#8  0x00007f402ddc4182 in start_thread (arg=0x7f402b1dd700) at pthread_create.c:312
#9  0x00007f402daf147d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 3 (Thread 0x7f402a9dc700 (LWP 30238)):
#0  0x00007f402dae412d in poll () at ../sysdeps/unix/syscall-template.S:81
#1  0x00007f402db01bfe in __poll_chk (fds=<optimized out>, nfds=<optimized out>, timeout=<optimized out>, fdslen=<optimized out>) at poll_chk.c:27
#2  0x00007f402c0c7695 in poll (__timeout=999, __nfds=<optimized out>, __fds=<optimized out>) at /usr/include/x86_64-linux-gnu/bits/poll2.h:41
#3  rd_kafka_transport_poll (rktrans=rktrans@entry=0x7f40180019b0, tmout=tmout@entry=999) at rdkafka_transport.c:1264
#4  0x00007f402c0c772b in rd_kafka_transport_io_serve (rktrans=0x7f40180019b0, timeout_ms=timeout_ms@entry=999) at rdkafka_transport.c:1123
#5  0x00007f402c0b5f28 in rd_kafka_broker_serve (rkb=rkb@entry=0x26d9920, abs_timeout=abs_timeout@entry=3628636338268) at rdkafka_broker.c:3278
#6  0x00007f402c0b63ef in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x26d9920, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:3330
#7  0x00007f402c0b6b55 in rd_kafka_broker_thread_main (arg=arg@entry=0x26d9920) at rdkafka_broker.c:4752
#8  0x00007f402c1007c7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#9  0x00007f402ddc4182 in start_thread (arg=0x7f402a9dc700) at pthread_create.c:312
#10 0x00007f402daf147d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 2 (Thread 0x7f402a1db700 (LWP 30239)):
#0  0x00007f402dae412d in poll () at ../sysdeps/unix/syscall-template.S:81
#1  0x00007f402db01bfe in __poll_chk (fds=<optimized out>, nfds=<optimized out>, timeout=<optimized out>, fdslen=<optimized out>) at poll_chk.c:27
#2  0x00007f402c0c7695 in poll (__timeout=999, __nfds=<optimized out>, __fds=<optimized out>) at /usr/include/x86_64-linux-gnu/bits/poll2.h:41
#3  rd_kafka_transport_poll (rktrans=rktrans@entry=0x7f4010001ae0, tmout=tmout@entry=999) at rdkafka_transport.c:1264
#4  0x00007f402c0c772b in rd_kafka_transport_io_serve (rktrans=0x7f4010001ae0, timeout_ms=timeout_ms@entry=999) at rdkafka_transport.c:1123
#5  0x00007f402c0b5f28 in rd_kafka_broker_serve (rkb=rkb@entry=0x7f4024003350, abs_timeout=3628636543184) at rdkafka_broker.c:3278
#6  0x00007f402c0b6d7f in rd_kafka_broker_consumer_serve (rkb=0x7f4024003350) at rdkafka_broker.c:4644
#7  rd_kafka_broker_thread_main (arg=arg@entry=0x7f4024003350) at rdkafka_broker.c:4756
#8  0x00007f402c1007c7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#9  0x00007f402ddc4182 in start_thread (arg=0x7f402a1db700) at pthread_create.c:312
#10 0x00007f402daf147d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 1 (Thread 0x7f402e1e9740 (LWP 30235)):
#0  pthread_cond_timedwait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_timedwait.S:238
#1  0x00007f402c100a45 in cnd_timedwait_ms (cnd=cnd@entry=0x26991e8, mtx=mtx@entry=0x26991c0, timeout_ms=timeout_ms@entry=999) at tinycthread.c:501
#2  0x00007f402c0ce0ce in rd_kafka_q_pop_serve (rkq=rkq@entry=0x26991c0, timeout_ms=999, version=version@entry=0, cb_type=cb_type@entry=2, callback=callback@entry=0x0, 
    opaque=opaque@entry=0x0) at rdkafka_queue.c:329
#3  0x00007f402c0ce210 in rd_kafka_q_pop (rkq=rkq@entry=0x26991c0, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:360
#4  0x00007f402c0a5c5c in rd_kafka_consume0 (rk=0x26d8b60, rkq=0x26991c0, timeout_ms=<optimized out>) at rdkafka.c:1829
#5  0x00007f402c33bc47 in Consumer_poll (self=0x7f402ba59160, args=<optimized out>, kwargs=<optimized out>) at confluent_kafka/src/Consumer.c:457
#6  0x000000000049ec76 in PyEval_EvalFrameEx ()
#7  0x0000000000499ef2 in PyEval_EvalFrameEx ()
#8  0x0000000000499ef2 in PyEval_EvalFrameEx ()
#9  0x00000000004a1634 in ?? ()
#10 0x000000000044e4a5 in PyRun_FileExFlags ()
#11 0x000000000044ec9f in PyRun_SimpleFileExFlags ()
#12 0x000000000044f904 in Py_Main ()
#13 0x00007f402da18ec5 in __libc_start_main (main=0x44f9c2 <main>, argc=2, argv=0x7fff9423f848, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, 
    stack_end=0x7fff9423f838) at libc-start.c:287
#14 0x0000000000578c4e in _start ()

@tejas3190
Author

tejas3190 commented May 10, 2017

I also observe that after detaching gdb, the consumer starts working again.

@edenhill
Contributor

The gdb backtraces look fine.

@edenhill
Contributor

edenhill commented May 11, 2017


%7|1491650637.636|CGRPSTATE|rdkafka#consumer-1| [thrd:main]: Group "dn_test-DN-LOGGER_CHECK_PXX" changed state wait-coord -> wait-broker (v4, join-state started)
%7|1491650638.028|CGRPQUERY|rdkafka#consumer-1| [thrd:main]: localhost:9092/0: Group "dn_test-DN-LOGGER_CHECK_PXX": querying for coordinator: intervaled in state wait-broker

After this log there should be coordinator queries showing up every 1000 ms. It is very weird that they are not appearing, and according to gdb nothing is deadlocked internally either.
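To check a debug log for the expected once-per-second coordinator queries, the CGRPQUERY lines can be extracted and their timestamps compared. A minimal sketch, assuming the `%level|timestamp|FACILITY|client|message` layout of the consumer logs quoted in this thread; the function name and the 1.5 s tolerance are assumptions, not librdkafka behaviour.

```python
def coordinator_query_gaps(lines, facility="CGRPQUERY", max_gap=1.5, now=None):
    """Return (earlier_ts, later_ts) pairs where consecutive `facility`
    log lines are more than `max_gap` seconds apart.

    If `now` is given, a trailing silence after the last matching line
    is reported as (last_ts, now).
    """
    stamps = []
    for line in lines:
        parts = line.strip().split("|", 4)
        if len(parts) < 5 or not parts[0].startswith("%"):
            continue  # not a librdkafka log line
        if parts[2] != facility:
            continue
        stamps.append(float(parts[1]))
    gaps = [(a, b) for a, b in zip(stamps, stamps[1:]) if b - a > max_gap]
    if now is not None and stamps and now - stamps[-1] > max_gap:
        gaps.append((stamps[-1], now))
    return gaps
```

Passing `now` (for example time.time()) also flags the situation described here, where the log simply stops after the last query.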

In this case your consumer is just sitting there, waiting for more messages, right? You have made no attempt to close the consumer at this point, right?

Is this reproducible with the example consumer?

@tejas3190
Author

Yes, I am just polling with timeout=1 and have made no attempt to close the consumer. After trying multiple times I observed the following backtrace. In this one, the backtrace for thread 3 differs from the previous ones (it is sleeping in rd_usleep); I am not sure whether this is OK. I will try the examples. Thanks.

Thread 5 (Thread 0x7fcdae496700 (LWP 922)):
#0  pthread_cond_timedwait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_timedwait.S:238
#1  0x00007fcdaebb8a45 in cnd_timedwait_ms (cnd=cnd@entry=0x1500cd8, mtx=mtx@entry=0x1500cb0, timeout_ms=<optimized out>) at tinycthread.c:501
#2  0x00007fcdaeb86335 in rd_kafka_q_serve (rkq=0x1500cb0, timeout_ms=<optimized out>, max_cnt=max_cnt@entry=0, cb_type=cb_type@entry=1, callback=callback@entry=0x0,
    opaque=opaque@entry=0x0) at rdkafka_queue.c:406
#3  0x00007fcdaeb5960c in rd_kafka_thread_main (arg=arg@entry=0x160cb60) at rdkafka.c:1160
#4  0x00007fcdaebb87c7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#5  0x00007fcdb087c182 in start_thread (arg=0x7fcdae496700) at pthread_create.c:312
#6  0x00007fcdb05a947d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 4 (Thread 0x7fcdadc95700 (LWP 923)):
#0  pthread_cond_timedwait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_timedwait.S:238
#1  0x00007fcdaebb8a45 in cnd_timedwait_ms (cnd=cnd@entry=0x15d8528, mtx=mtx@entry=0x15d8500, timeout_ms=timeout_ms@entry=999) at tinycthread.c:501
#2  0x00007fcdaeb860ce in rd_kafka_q_pop_serve (rkq=0x15d8500, timeout_ms=999, version=version@entry=0, cb_type=cb_type@entry=2, callback=callback@entry=0x0,
    opaque=opaque@entry=0x0) at rdkafka_queue.c:329
#3  0x00007fcdaeb86210 in rd_kafka_q_pop (rkq=<optimized out>, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:360
#4  0x00007fcdaeb6df04 in rd_kafka_broker_serve (rkb=rkb@entry=0x160d240, abs_timeout=abs_timeout@entry=3637169542818) at rdkafka_broker.c:3254
#5  0x00007fcdaeb6e3ef in rd_kafka_broker_ua_idle (rkb=rkb@entry=0x160d240, timeout_ms=<optimized out>, timeout_ms@entry=-1) at rdkafka_broker.c:3330
#6  0x00007fcdaeb6eb55 in rd_kafka_broker_thread_main (arg=arg@entry=0x160d240) at rdkafka_broker.c:4752
#7  0x00007fcdaebb87c7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#8  0x00007fcdb087c182 in start_thread (arg=0x7fcdadc95700) at pthread_create.c:312
#9  0x00007fcdb05a947d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 3 (Thread 0x7fcda7fff700 (LWP 924)):
#0  0x00007fcdb0883b9d in nanosleep () at ../sysdeps/unix/syscall-template.S:81
#1  0x00007fcdaeb6e7ea in rd_usleep (usec=100000, terminate=0x160cf6c) at rdposix.h:114
#2  rd_kafka_broker_thread_main (arg=arg@entry=0x160d920) at rdkafka_broker.c:4765
#3  0x00007fcdaebb87c7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#4  0x00007fcdb087c182 in start_thread (arg=0x7fcda7fff700) at pthread_create.c:312
#5  0x00007fcdb05a947d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 2 (Thread 0x7fcdad494700 (LWP 925)):
#0  0x00007fcdb059c12d in poll () at ../sysdeps/unix/syscall-template.S:81
#1  0x00007fcdb05b9bfe in __poll_chk (fds=<optimized out>, nfds=<optimized out>, timeout=<optimized out>, fdslen=<optimized out>) at poll_chk.c:27
#2  0x00007fcdaeb7f695 in poll (__timeout=999, __nfds=<optimized out>, __fds=<optimized out>) at /usr/include/x86_64-linux-gnu/bits/poll2.h:41
#3  rd_kafka_transport_poll (rktrans=rktrans@entry=0x7fcd9c001ae0, tmout=tmout@entry=999) at rdkafka_transport.c:1264
#4  0x00007fcdaeb7f72b in rd_kafka_transport_io_serve (rktrans=0x7fcd9c001ae0, timeout_ms=timeout_ms@entry=999) at rdkafka_transport.c:1123
#5  0x00007fcdaeb6df28 in rd_kafka_broker_serve (rkb=rkb@entry=0x7fcda8003350, abs_timeout=3637170215536) at rdkafka_broker.c:3278
#6  0x00007fcdaeb6ed7f in rd_kafka_broker_consumer_serve (rkb=0x7fcda8003350) at rdkafka_broker.c:4644
#7  rd_kafka_broker_thread_main (arg=arg@entry=0x7fcda8003350) at rdkafka_broker.c:4756
#8  0x00007fcdaebb87c7 in _thrd_wrapper_function (aArg=<optimized out>) at tinycthread.c:624
#9  0x00007fcdb087c182 in start_thread (arg=0x7fcdad494700) at pthread_create.c:312
#10 0x00007fcdb05a947d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:111

Thread 1 (Thread 0x7fcdb0ca1740 (LWP 921)):
#0  pthread_cond_timedwait@@GLIBC_2.3.2 () at ../nptl/sysdeps/unix/sysv/linux/x86_64/pthread_cond_timedwait.S:238
#1  0x00007fcdaebb8a45 in cnd_timedwait_ms (cnd=cnd@entry=0x15cd1e8, mtx=mtx@entry=0x15cd1c0, timeout_ms=timeout_ms@entry=999) at tinycthread.c:501
#2  0x00007fcdaeb860ce in rd_kafka_q_pop_serve (rkq=rkq@entry=0x15cd1c0, timeout_ms=999, version=version@entry=0, cb_type=cb_type@entry=2, callback=callback@entry=0x0, 
    opaque=opaque@entry=0x0) at rdkafka_queue.c:329
#3  0x00007fcdaeb86210 in rd_kafka_q_pop (rkq=rkq@entry=0x15cd1c0, timeout_ms=<optimized out>, version=version@entry=0) at rdkafka_queue.c:360
#4  0x00007fcdaeb5dc5c in rd_kafka_consume0 (rk=0x160cb60, rkq=0x15cd1c0, timeout_ms=<optimized out>) at rdkafka.c:1829
#5  0x00007fcdaedf3c47 in Consumer_poll (self=0x7fcdae511160, args=<optimized out>, kwargs=<optimized out>) at confluent_kafka/src/Consumer.c:457
#6  0x000000000049ec76 in PyEval_EvalFrameEx ()
#7  0x0000000000499ef2 in PyEval_EvalFrameEx ()
#8  0x0000000000499ef2 in PyEval_EvalFrameEx ()
#9  0x00000000004a1634 in ?? ()
#10 0x000000000044e4a5 in PyRun_FileExFlags ()
#11 0x000000000044ec9f in PyRun_SimpleFileExFlags ()
#12 0x000000000044f904 in Py_Main ()
#13 0x00007fcdb04d0ec5 in __libc_start_main (main=0x44f9c2 <main>, argc=2, argv=0x7fff07035a38, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, 
    stack_end=0x7fff07035a28) at libc-start.c:287
#14 0x0000000000578c4e in _start () 

@tejas3190
Author

Closing this as it's not reproducible with librdkafka 0.9.4 & confluent-kafka 0.9.4. Thanks @edenhill

@tejas3190
Author

tejas3190 commented May 16, 2017

Hi @edenhill, is it normal for consumer.poll() to return a Message object with topic=None? I check the topic before consuming each message, and on some very rare occasions I have observed that the topic is None.

@edenhill
Contributor

Proper messages always have the topic set, but since the Message object also doubles as a consumer (error) event, what you are seeing may be non-topic-specific consumer events.
When you get a Message object back from poll(), start by checking whether an error code is set.
If no error code is set, it is a proper message.

See here:
https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/consumer.py#L85
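The check-the-error-first pattern described above can be sketched like this, assuming confluent-kafka-python's Message interface (error(), topic(), value()); `handle_polled` and its callbacks are hypothetical names. Real code may also want to inspect `err.code()` to tell end-of-partition events apart from genuine errors.

```python
def handle_polled(msg, on_message, on_error=None):
    """Dispatch one result of Consumer.poll().

    poll() returns None on timeout; otherwise a Message that is either a
    proper message or an error/event. topic() and value() are only read
    once error() has been confirmed to be None.
    """
    if msg is None:
        return "timeout"
    err = msg.error()
    if err is not None:
        if on_error is not None:
            on_error(err)  # e.g. partition EOF or a non-topic-specific event
        return "event"
    on_message(msg.topic(), msg.value())  # proper message: topic is set
    return "message"
```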
