Producer compression not enabled #969

Closed
lafrinte opened this issue Oct 16, 2020 · 2 comments
lafrinte commented Oct 16, 2020

Description

It looks like producer compression might not be working, judging by the advice from @edenhill in #480:

It will only compress data if it actually becomes smaller than the uncompressed data.

I tried gzip (the Python standard-library module) to compress the source JSON bytes and confirm that the compressed data is indeed smaller than the original. How can I verify that compression.type is actually in effect, and what form should the produced data take?

In [6]: import gzip

In [7]: gdata = gzip.compress(jdata)

In [8]: len(gdata)
Out[8]: 1967

In [9]: len(jdata)
Out[9]: 9904
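
For contrast: when the application gzips the payload itself, as above, Kafka treats it as opaque bytes and the consumer has to decompress explicitly. A minimal sketch of that manual round trip (broker address and topic are taken from the reproduction below; the group id and the 'auto.offset.reset' setting are assumptions):

import gzip
import json

from confluent_kafka import Consumer, Producer

BROKER = '172.18.234.75:9092'
TOPIC = 'opsapi'

data = dict(a=list(range(1000)), b=['a'] * 1000)
payload = gzip.compress(json.dumps(data).encode())

# Application-level compression: no 'compression.type' is involved,
# the broker just stores the gzipped bytes as-is.
p = Producer({'bootstrap.servers': BROKER})
p.produce(TOPIC, payload)
p.flush()

c = Consumer({'bootstrap.servers': BROKER,
              'group.id': 'manual-gzip-check',   # hypothetical group id
              'auto.offset.reset': 'earliest'})
c.subscribe([TOPIC])
msg = c.poll(30)

# The consumer must reverse the compression itself.
assert msg is not None and not msg.error()
assert json.loads(gzip.decompress(msg.value())) == data
c.close()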

How to reproduce

  • producer
In [1]: import json; from confluent_kafka import Producer

In [2]: p = Producer({'bootstrap.servers': '172.18.234.75:9092', 'compression.type': 'gzip'})

In [3]: data = dict(a=list(range(1000)), b=['a'] * 1000)

In [4]: jdata = json.dumps(data).encode()

In [5]: p.produce('opsapi', jdata)

  • consumer
In [1]: from confluent_kafka import Consumer

In [2]: c = Consumer({'bootstrap.servers': '172.18.234.75:9092', 'compression.type': 'gzip', 'group.id': 'opsapi'})

In [3]: c.subscribe(['opsapi'])

In [4]: output = c.consume()

In [5]: output[0].value()
Out[5]: b'{"a": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, ...}'
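
The reproduction above relies on librdkafka's background thread to transmit the queued message; a self-contained variant with an explicit delivery report and flush() makes the round trip deterministic. A sketch, reusing the broker address and topic from above ('auto.offset.reset' and the delivery callback are additions):

import json

from confluent_kafka import Consumer, Producer

BROKER = '172.18.234.75:9092'
TOPIC = 'opsapi'

jdata = json.dumps(dict(a=list(range(1000)), b=['a'] * 1000)).encode()

def on_delivery(err, msg):
    # Invoked from flush()/poll(); err is None on successful delivery.
    if err is not None:
        raise SystemExit(f'delivery failed: {err}')

p = Producer({'bootstrap.servers': BROKER, 'compression.type': 'gzip'})
p.produce(TOPIC, jdata, on_delivery=on_delivery)
p.flush()  # block until the delivery report has fired

# No 'compression.type' on the consumer: fetched batches are
# decompressed automatically based on the codec recorded in each one.
c = Consumer({'bootstrap.servers': BROKER,
              'group.id': 'opsapi',
              'auto.offset.reset': 'earliest'})
c.subscribe([TOPIC])
msg = c.poll(30)
assert msg is not None and not msg.error()
assert msg.value() == jdata  # byte-identical to what was produced
c.close()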

Checklist

Please provide the following information:

  • confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()):
In [1]: import confluent_kafka

In [2]: confluent_kafka.version()
Out[2]: ('1.5.0', 17104896)

In [3]: confluent_kafka.libversion()
Out[3]: ('1.5.0', 17105151)
  • Apache Kafka broker version:
2.3.0
  • Client configuration: {...}
p = Producer({'bootstrap.servers': '172.18.234.75:9092', 'compression.type': 'gzip'})
  • Operating system:
Linux host-172-18-234-75 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
CentOS Linux release 7.4.1708 (Core)
  • Provide client logs (with 'debug': '..' as necessary)
  • Provide broker log excerpts
  • Critical issue
@edenhill (Contributor) commented:

Compression is performed on Kafka message batches and is not visible to the application: the producer will automatically compress (if compression.type is set) and the consumer will automatically decompress as needed.
So how are you verifying that compression is not being used?
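
One way to check from outside the application: compression happens per message batch on the wire, so it shows up in librdkafka's debug output and in the on-disk batches on the broker (for example, bin/kafka-dump-log.sh --files <segment>.log on the broker prints a compresscodec field per batch, which should read GZIP for batches produced this way). A sketch of the client-side check, with the 'debug' setting as the only addition:

from confluent_kafka import Producer

# 'debug': 'msg' is a standard librdkafka property; it writes
# message/MessageSet-level producer activity to stderr.
p = Producer({'bootstrap.servers': '172.18.234.75:9092',
              'compression.type': 'gzip',
              'debug': 'msg'})
p.produce('opsapi', b'x' * 10000)  # a highly compressible payload
p.flush()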

@lafrinte (Author) commented:

It seems I made a mistake: I assumed that a consumer without compression.type set would receive the compressed data. I will close this issue. Thank you for the reply.
