
Finalization kills flush #58

Closed
CorwinTanner opened this issue Nov 3, 2016 · 12 comments

@CorwinTanner

When calling flush() on a producer and then immediately dropping the last reference to the object, the buffer does not actually get flushed.

# send 100 messages with producer

producer.flush()
producer = None

Acknowledgement is set to none. Placing a sleep after producer.flush() allows messages to be flushed. Placing a sleep after producer = None does not allow messages to be flushed.

Note: This may only occur on fast hardware.
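
A minimal reproduction sketch (broker address and topic name below are placeholders, not from the original report):

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'localhost:9092',      # placeholder broker list
    'default.topic.config': {'acks': 0},        # no broker acknowledgement
})

for i in range(100):
    p.produce('test-topic', 'message {}'.format(i))   # hypothetical topic name

p.flush()    # returns reporting nothing outstanding
p = None     # drop the last reference; the messages never reach the broker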

@edenhill
Contributor

edenhill commented Nov 3, 2016

That's weird, flush() is supposed to block until all outstanding messages have been delivered or failed.
Can you print(len(producer)) before and after the flush() call?
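
For instance (assuming the same producer object as in the snippet above):

print(len(producer))   # messages and requests still awaiting delivery
producer.flush()
print(len(producer))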

@CorwinTanner
Author

Before: 120
After: 0

And yet they never actually make it. So it appears that the buffer is being cleared, but they're not actually being sent.

@edenhill
Contributor

edenhill commented Nov 3, 2016

What's your producer config?

@CorwinTanner
Author

CorwinTanner commented Nov 3, 2016

{
  'group.id': None,
  'session.timeout.ms': 6000,
  'enable.auto.commit': False,
  'fetch.min.bytes': 1,
  'fetch.wait.max.ms': 1,
  'max.partition.fetch.bytes': 50 * MiB,
  'api.version.request': True,
  'broker.version.fallback': '0.8.2.1',
  'kafka_ack': 0
}

@edenhill
Contributor

edenhill commented Nov 3, 2016

kafka_ack is not a confluent-kafka-python configuration property:
(most of those properties are consumer properties and thus ignored by the producer)

>>> p = confluent_kafka.Producer({'kafka_ack':None})
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
cimpl.KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "kafka_ack""}

@CorwinTanner
Author

Sorry, I gave the consumer config. This is the producer config:

{
  'api.version.request': True,
  'broker.version.fallback': '0.8.2.1',
  'bootstrap.servers': '', # list of brokers
  'default.topic.config': {
      'acks': 0,
   },
}

@edenhill
Contributor

edenhill commented Nov 3, 2016

And that's your culprit: acks specifies the number of acknowledgements the producer must receive from the broker before deeming a message produced.
When you set it to 0 the producer won't wait for any ack from the broker, so as soon as the producer has sent the messages they are deemed successful.
In your case the immediate destruction of the producer closes the socket right away, purging any messages still waiting in the kernel socket buffer to be sent to the broker.
The faster your hardware (or the slower the network), the greater the risk of this happening.

You should generally not use acks=0 unless you are fine with message loss.
At a minimum go for acks=1 (acked by the leader broker only), or, if you want each message replicated to the in-sync replicas, use acks=all.
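
A minimal sketch of a safer configuration (broker address and topic name are placeholders):

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'localhost:9092',   # placeholder broker list
    'default.topic.config': {'acks': 1},     # leader ack; use 'all' for in-sync replica acks
})

p.produce('test-topic', 'some payload')      # hypothetical topic name
p.flush()                                    # blocks until delivery succeeds or fails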

@CorwinTanner
Author

So the definition of "sent" does not actually include flushing the socket? I understand that not waiting for acknowledgement poses the risk that a message may not be accepted (i.e. failure), but I think that finalization or flushing should at least give a guarantee of the packets being sent at the socket level.

We're not talking about a little message loss either. I'm not seeing any of the 120 messages get delivered in this example.

@ewencp
Contributor

ewencp commented Nov 4, 2016

@CorwinTanner These are the semantics of the Java producer as well -- the docs for acks=0 say "The record will be immediately added to the socket buffer and considered sent.". This makes sense anyway since flushing the socket buffer doesn't make any guarantees that the server will have received any of the messages either. If you want a faster ack but less likelihood of losing data, you should use acks=1.

@CorwinTanner
Author

CorwinTanner commented Nov 4, 2016

I'm not looking for a guarantee of receipt. I understand that's what acks is for. However, without paying the cost of acknowledgement, I would like to see a guarantee that the packets were sent, otherwise flush() has limited value.

This (as well as allowing TCP_NODELAY) would provide better support for applications with near-real-time requirements and move the risk of data loss from the application level primarily to the network.

@edenhill
Contributor

edenhill commented Nov 7, 2016

You could minimize the socket send buffer to achieve the same result as a socket-level flush:
try setting socket.send.buffer.bytes as low as your platform permits;
a single Kafka Produce request is at least 68 bytes.
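
For example (the smallest value the OS actually honours varies by platform; the number below is only illustrative):

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'localhost:9092',   # placeholder broker list
    'socket.send.buffer.bytes': 4096,        # illustrative small value; go as low as your platform allows
    'default.topic.config': {'acks': 0},
})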

@edenhill
Contributor

edenhill commented Nov 7, 2016

Some more context:
http://stackoverflow.com/questions/855544/is-there-a-way-to-flush-a-posix-socket

For reference, there is support in librdkafka master for disabling Nagle's algorithm: socket.nagle.disable=true
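
A sketch of what that would look like from Python, assuming the installed librdkafka build is new enough to know the property:

from confluent_kafka import Producer

p = Producer({
    'bootstrap.servers': 'localhost:9092',   # placeholder broker list
    'socket.nagle.disable': 'true',          # disable Nagle's algorithm (sets TCP_NODELAY)
})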
