Integrate client with Confluent schema registry and Avro #40


Merged: 1 commit merged into confluentinc:master on Nov 21, 2016

Conversation

@roopahc (Contributor) commented Sep 7, 2016

Fixes: #36

@ghost commented Sep 7, 2016

It looks like @roopahc hasn't signed our Contributor License Agreement yet.

Appreciation of efforts,

clabot

@criccomini

@edenhill @ewencp Would appreciate a review when you've got the cycles. :)

@roopahc (author) commented Sep 8, 2016

@ConfluentCLABot I have signed the CLA.

pass


class ContextStingIO(io.BytesIO):

nit: should be String

@matthayes

FYI I tried testing this and was able to consume messages using kafka-avro-console-consumer. Thanks for working on this 👍

@criccomini

Hi Matt! :)

@matthayes

Hey! :)

self.log.error("Schema required for key serialization")
raise SerializerError("Avro schema required for key")

self.producer.produce(topic, value, key)


kwargs is being ignored, so if you pass callback/on_delivery or partition they aren't used.

@criccomini

@edenhill @ewencp ping

@ewencp (Contributor) left a comment

@roopahc This looks like a great start. I took a pass on all the main code (didn't go through tests thoroughly). I think the main thing that would need to be decided is the composition/delegation approach to the API vs inheriting from the Producer class. There were some other comments, but they are mostly details.

schemaRegistryClient = CachedSchemaRegistryClient(url=schemaRegistryUrl)
serializer = MessageSerializer(schemaRegistryClient)

avroProducer = AvroProducer(producer, serializer)

Interesting -- I was imagining just using a subclass of Producer. In the case of Producer the subclass probably doesn't save too much, but it also means that you only have to override the one method. With the delegation approach used here, you need to re-implement the API to do delegation (e.g. you are currently missing poll() and flush() methods in this implementation).

You also separate out the serializer class. Are you thinking of reusing this elsewhere or could it just be integrated directly into the AvroProducer class? I'm imagining that it would be a lot simpler and more concise for users if they could just do something like:

from confluent_kafka.avro import AvroProducer

producer = AvroProducer({'bootstrap.servers': 'mybroker,mybroker2'}, schema_registry_url='https://<host>:<port>')
producer.produce(...)

Are there drawbacks to this? Even if you wanted some of the helper classes to still be available separately (CachedSchemaRegistryClient in particular seems useful to have separate), they could be instantiated by the producer itself since it is already specific to the serialization format.
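For context, a minimal sketch of the delegation approach under discussion, with the missing poll() and flush() pass-throughs added (class and attribute names are assumptions based on the snippets in this thread):

    # Sketch only: a delegating wrapper has to forward every Producer method
    # it wants to expose; poll() and flush() are the ones noted as missing.
    class AvroProducer(object):
        def __init__(self, producer, serializer):
            self._producer = producer      # a plain confluent_kafka.Producer
            self._serializer = serializer  # MessageSerializer over the registry client

        def produce(self, topic, value, value_schema, key=None, key_schema=None, **kwargs):
            value = self._serializer.encode_record_with_schema(topic, value_schema, value)
            if key is not None:
                key = self._serializer.encode_record_with_schema(topic, key_schema, key, is_key=True)
            self._producer.produce(topic, value, key, **kwargs)

        def poll(self, timeout):
            return self._producer.poll(timeout)

        def flush(self, *args, **kwargs):
            return self._producer.flush(*args, **kwargs)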

@roopahc (author):

@ewencp I started with inheritance, but Python did not let me subclass Producer:

Traceback (most recent call last):
  File "test.py", line 4, in <module>
    class AvroProducer(Producer):
TypeError: Error when calling the metaclass bases
    type 'cimpl.Producer' is not an acceptable base type


Hmm, interesting. I didn't realize that making C classes subclassable requires extra work. Some brief Googling suggests it is definitely possible. @edenhill Any idea what's involved and whether it would make sense to get a cleaner design?


I will look into the sub-classing problem, and I agree with @ewencp that sub-classing would be preferable to wrapping.

Sends message to kafka by encoding with specified avro schema
@:param: topic: topic name
@:param: value: A dictionary object
@:param: value_avro_schema : Avro schema for value

I'd shorten these names to value_schema and key_schema -- we're in an Avro-specific class, so they couldn't really be anything else.

I don't know if people might also find it useful to be able to set the schema as a default in the constructor? In languages like Java where the schema is attached anyway and is handled automatically if you use code generation, this isn't as annoying. It might be a bit more annoying to have to pass the same schema (the normal case) into every produce call. Not sure how valuable this will be since usually you only have a small number of call sites to produce() anyway, but seems annoying to have to carry the schema around separately everywhere.


Do you think a topic->schema mapping in the constructor would suffice?


We'd have to do two: one for keys and one for values


@criccomini I like that, but there are definitely use cases where people intentionally publish data with different schemas to the same topic. So I guess it kind of depends on how opinionated we want to be...


but there are definitely use cases where people intentionally publish data with different schemas to the same topic

Won't the schema registry reject that (if compatibility is turned on)?


I guess I agree it's possible. It's just inconvenient, lol. :)


Another annoying option would be to provide a wrapper class for key/value to emulate Java. This would allow us to exactly mirror the parent's produce() method. The wrapper would have payload and schema fields. AvroProducer would take the schema field out and use it, and pass the serialized payload into the parent producer.


Yeah, people turn off compatibility for those cases. I find the pattern questionable, but people do it...

The most convenient way for users might be to just use produce(*args, **kwargs) and sort out the details in the header of the method (i.e. 1 positional arg = value, 2 positional args = (key, value), kwargs just work by name). The obvious drawback is the lack of documentation in the method signature.
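A rough sketch of that header-parsing heuristic (purely illustrative; not what the PR ends up doing):

    # Illustrative sketch: one positional argument is the value, two are
    # (key, value), and everything else must be passed by keyword.
    def produce(topic, *args, **kwargs):
        if len(args) == 1:
            value = args[0]
            key = kwargs.pop('key', None)
        elif len(args) == 2:
            key, value = args
        else:
            value = kwargs.pop('value', None)
            key = kwargs.pop('key', None)
        return topic, key, value, kwargs  # remaining kwargs pass through untouched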

value = self.serializer.encode_record_with_schema(topic, value_avro_schema, value)
else:
self.log.error("Schema required for value serialization")
raise SerializerError("Avro schema required for value")

In the Java serializer we auto-generate schemas for primitive types. Should we consider doing that here to make it easier to use? I think this is especially common for keys.


Would it be OK to leave this as a separate GitHub issue/PR? I agree it's useful, but I don't think this PR needs to be exhaustive, and you should be able to bolt it on without backwards-incompatible changes, right?


Yeah, sure that's fine. Just a gap I saw.


Personally, I think this PR should go ahead with supplying schemas for primitive types. Right now, this code only accepts records.
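A hedged sketch of what primitive-schema generation could look like (the type mapping follows the Avro spec; the helper name is made up and this is not part of the PR):

    import json
    from avro import schema

    # Assumed mapping from Python primitives to Avro primitive type names.
    _PRIMITIVES = {
        type(None): 'null',
        bool: 'boolean',
        int: 'long',
        float: 'double',
        str: 'string',
        bytes: 'bytes',
    }

    def primitive_schema(value):
        """Return a parsed Avro schema for a primitive value, or None for records."""
        name = _PRIMITIVES.get(type(value))
        if name is None:
            return None
        # avro.schema.parse() is spelled Parse() on some avro releases
        return schema.parse(json.dumps(name))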

self.serializer = message_serializer
self.log = logging.getLogger(__name__)

def produce(self, topic, value=None, value_avro_schema=None, key=None, key_avro_schema=None, *args, **kwargs):

Hmm, this argument order is kind of interesting. With only one signature, it's annoying to support everything cleanly. The main thing that's unusual here is that key comes after value. I get the reason for this is so you can produce values without keyword arguments, but would it make sense to try parsing a *args list so you don't have to use keyword arguments to get the natural ordering when using both keys and values? Or does that get too messy (or maybe not possible to always distinguish the right mapping)?


Personally I prefer required keyword arguments in this case (i.e. def produce(self, topic, *, value=None, value_avro_schema=None, etc.). It's hard to remember the argument order and easy to get it wrong (e.g. people may differ on whether they think key or value first is more natural).


@roopahc just pointed out that the parent (wrapped) class's produce() method has a nearly identical order:

http://docs.confluent.io/3.0.1/clients/confluent-kafka-python/index.html#confluent_kafka.Producer.produce

Seems like keeping it as-is actually matches best with what's already there...


I still think keyword args would be clearer. The order is similar except with schema params inserted. It's also not obvious whether the callback/on_delivery and partitioner args from producer will work as positional arguments after key_avro_schema and in what order they need to be or if they need to be keyword args. It's probably best to have these explicitly declared as keyword args to reduce confusion and mistakes.


Hah, it's possible we had this exact same discussion when reviewing the original Python API proposal and I have just completely forgotten about it...

@roopahc (author) commented Sep 20, 2016

I tried to align the argument order with the confluent_kafka.Producer.produce() method documentation:
http://docs.confluent.io/3.0.1/clients/confluent-kafka-python/index.html. It might be misleading if we change the order in the AvroProducer class. Let me know your views.


At this point, I yield. :) I don't have a super strong opinion. Can you guys give an exact method header for what you think is best? My Python foo is weak. Is this it?

def produce(self, *args, **kwargs):


@matthayes mentioned required keyword args. It looks like those are only Python 3 compatible (PEP 3102). Based on @ewencp's comment below, I still think:

produce(*args, **kwargs)

is the best we can do right now.
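Reconstructed from the final implementation quoted later in this thread (attribute names are assumptions), that ends up looking roughly like:

    class AvroProducer(object):
        def produce(self, *args, **kwargs):
            # Everything arrives by keyword; schemas fall back to constructor defaults.
            key_schema = kwargs.pop('key_schema', self._key_schema)
            value_schema = kwargs.pop('value_schema', self._value_schema)
            topic = kwargs.pop('topic', None)
            if not topic:
                raise ValueError("Topic name not specified.")  # the PR raises ClientError
            value = kwargs.pop('value', None)
            key = kwargs.pop('key', None)
            # ...serialize value/key with the schemas, then delegate:
            self._producer.produce(topic, value, key, **kwargs)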

self.subject_to_schema_versions = {}

# below classes were not hashable, hence defining __hash__ explicitly as a quick fix
schema.RecordSchema.__hash__ = self.hash_func

Do this once at the module level so it only happens on import?
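What the suggested module-level version might look like (the hash implementation is an assumption; the diff only shows that __hash__ is assigned):

    from avro import schema

    def _hash_func(self):
        # avro's schema classes aren't hashable out of the box; hash the string form.
        return hash(str(self))

    # Patch once at import time instead of inside every constructor call.
    schema.RecordSchema.__hash__ = _hash_func
    schema.PrimitiveSchema.__hash__ = _hash_func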

url += '/' + subject

result, code = self._send_request(url)
if (code == 200):

Other status codes should be valid here too
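e.g. treating the whole 2xx range as success rather than only 200 (a sketch):

    def _is_success(http_code):
        # Any 2xx response indicates success, not just 200.
        return 200 <= http_code < 300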

return self.encode_record_with_schema_id(schema_id, record)

# subject = topic + suffix
def encode_record_for_topic(self, topic, record, is_key=False):

This seems like an odd use case -- it's basically assuming that the latest schema will be compatible with the data you pass in? Are you using this in practice?

@roopahc (author):

This method is not being used anywhere. We can take it out.


HAS_FAST = False
try:
from fastavro.reader import read_data

Should we also get the fast writer?

@roopahc (author):

@ewencp Maybe this can be considered a performance improvement and be part of a separate PR?


Sure, that's fine. Was just surprised to see reader but not writer :)

@@ -0,0 +1,42 @@
#!/usr/bin/env python
@ewencp (Contributor) commented Sep 15, 2016

Module names shouldn't be capitalized (i.e. the file name should just be util.py, not Util.py). In fact, this is true for all the filenames under this module (CachedSchemaRegistryClient, AvroProducer, etc.).


Indeed, trying to use from confluent_kafka.avro.serializer import util leads to ImportError: cannot import name 'util'.

@roopahc (author):

@blbradley Fixed this.


producer = Producer({'bootstrap.servers': 'mybroker,mybroker2'})

schemaRegistryUrl = 'https://<host>:<port>'

nit: these variables should use underscores instead of camel case

@criccomini commented Sep 15, 2016

I think the main thing that would need to be decided is the composition/delegation approach to the API vs inheriting from the Producer class

@roopahc initially had the class try to extend Producer. This didn't work because it's a C class, not a Python class underneath. I'm open to suggestions on how to fix this, but Google hasn't yielded anything. It appears you can't extend it. It gives an error (which I no longer have, sigh).

Edit: I see that @roopahc has pasted the proper stack trace above.

raise SerializerError("Avro schema required for key")

self._producer.produce(topic, value, key, *args, **kwargs)
def poll(self, timeout):


Nit: newline above def poll


schema_registry_url = 'https://<host>:<port>'

avroProducer = AvroProducer(producer, schema_registry_url, value_schema=value_avro_schema, key_schema=key_avro_schema)
@criccomini commented Sep 21, 2016

Either make the names default_value_schema and default_key_schema or switch to maps from topic to schema:

default_key_schemas=None, default_value_schemas=None

If you go with the latter approach, you'd get the default schema based on the topic name.
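Under the map-based alternative, produce() would look the default schemas up by topic; a sketch (parameter names taken from the comment above):

    class AvroProducer(object):
        def __init__(self, producer, schema_registry_url,
                     default_key_schemas=None, default_value_schemas=None):
            self._producer = producer
            self._default_key_schemas = default_key_schemas or {}      # topic -> schema
            self._default_value_schemas = default_value_schemas or {}  # topic -> schema

        def _default_schemas_for(self, topic):
            return (self._default_key_schemas.get(topic),
                    self._default_value_schemas.get(topic))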

self._producer = producer
self._serializer = message_serializer
_cash_client= CachedSchemaRegistryClient(url=schema_registry_url)


nit: _cache_client

'''
def __init__(self, producer, schema_registry_url, key_schema = None, value_schema = None): # real signature unknown; restored from __doc__
self._producer = producer
_cash_client= CachedSchemaRegistryClient(url=schema_registry_url)


nit: _cache_client

@criccomini commented Sep 21, 2016

Also add a space before the =

# get schemas from kwargs if defined
key_schema = kwargs.pop('key_schema', None)
value_schema = kwargs.pop('value_schema', None)
topic= kwargs.pop('topic', None)


nit: space

@criccomini commented Sep 21, 2016

Giving up on commenting on this, but you have a bunch of different space styles:

style= one
style = two
style=three

Please just use style = two everywhere except params. For params, use style=three. Please update everywhere.

@criccomini left a comment

Overall, LGTM. Left a bunch of nits.

key_schema=self.key_schema

# if value_schema is not initialized, fall back on default value_schema passed as construction param.
if value_schema is None:
@criccomini commented Sep 21, 2016

if not value_schema:

if value_schema is None:
value_schema=self.value_schema

if value is not None:


if value:

value_schema=self.value_schema

if value is not None:
if value_schema is not None:


if value_schema:

log.error("Schema required for value serialization")
raise SerializerError("Avro schema required for value")

if key is not None:
@criccomini commented Sep 21, 2016

if key:

raise SerializerError("Avro schema required for value")

if key is not None:
if key_schema is not None:
@criccomini commented Sep 21, 2016

if key_schema:

key_schema = kwargs.pop('key_schema', None)
value_schema = kwargs.pop('value_schema', None)
topic= kwargs.pop('topic', None)
if topic is None:


if not topic:

key= kwargs.pop('key', None)

# if key_schema is not initialized, fall back on default key_schema passed as construction param.
if key_schema is None:


if not key_schema:

@@ -85,6 +106,9 @@ Install
**Install from PyPi:**

$ pip install confluent-kafka

#for Avroproducer


Nit: space between # and f

value = {"name": "Value"}
key = {"name": "Key"}

producer = Producer({'bootstrap.servers': 'mybroker,mybroker2'})

schema_registry_url = 'https://<host>:<port>'

avroProducer = AvroProducer(producer, schema_registry_url, value_schema=value_avro_schema, key_schema=key_avro_schema)
avroProducer = AvroProducer(producer, schema_registry_url, value_schema=default_value_schema, key_schema=default_key_schema)


Sorry, I meant name the param default_value_schema and default_key_schema, not the values you pass in.

avroProducer = AvroProducer(producer, schema_registry_url, default_value_schema=value_schema, default_key_schema=key_schema)

@criccomini

@ewencp @matthayes look good?

@matthayes

LGTM. I also tested the new code and it's working in my codebase, including parameters passed to produce.

@criccomini

Awesome, thanks Matt! @ewencp we've also moved forward internally with this code. Could you or @edenhill please merge? When that's done, we can open follow-up issues and start on the consumer.

@criccomini

ping

@blbradley

Thoughts:

  • Schema generation for primitives should be included in the first patch. It would reduce the design changes required to implement it.
  • AvroProducer.produce requires keyword arguments and Producer.produce does not. These should be consistent.

Anything we can do to get this moving? I'd gladly do the work.

@criccomini

@blbradley no one from Confluent is responding. We've been going back and forth on the produce() method for over a week. At the end of the day, we need to make forward progress.

Anything we can do to get this moving? I'd gladly do the work.

Go for it! :) We're starting to look at the consumer side. If you want to forge ahead with the producer, I say 👍 .

@criccomini

@edenhill what work remains? Are you just referring to eliminating the need for a wrap vs extend?

@ewencp (Contributor) commented Nov 9, 2016

@criccomini Mainly the wrap vs extend -- the concern is that folks pick this stuff up from master and, whether we think it is reasonable or not given an unreleased version, start depending on it and complain loudly when you change things. The only other concern I would have is if anything pops up in how we have to handle the deserialization for consumers (though I don't really envision that being an issue).

@criccomini

I guess my counter-concern is just that this goes into a branch, diverges from master, and ends up in the dust bin after two months of work. We also have a consumer PR incoming today.

Do you or @edenhill have a suggestion about how to fix the wrap/extend issue?

@edenhill (Contributor) commented Nov 9, 2016

Oh, that certainly won't happen; despite the fact that we've been extremely slow on this, we really want your Avro clients to be part of this project.

I'm currently working on making the C classes sub-classable and will have a branch ready within a day that I'd like you to try out.

@criccomini

👍

@edenhill (Contributor) commented Nov 9, 2016

PR #63 provides sub-classable Producer and Consumer classes, please try it out.
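With #63 merged, the subclassing approach that failed earlier becomes possible; roughly (a sketch, not the final code):

    from confluent_kafka import Producer

    class AvroProducer(Producer):
        # Once cimpl.Producer is sub-classable, only produce() needs overriding.
        def __init__(self, config):
            super(AvroProducer, self).__init__(config)
            # set up the MessageSerializer / CachedSchemaRegistryClient here

        def produce(self, topic, value=None, key=None, **kwargs):
            # Avro-serialize value/key first, then hand off to the C producer.
            super(AvroProducer, self).produce(topic, value, key, **kwargs)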

"""
Kafka Producer client which does avro schema encoding to messages.
Handles schema registration, Message serialization.

Constructor takes below parameters

@:param: producer: confluent_kafka.Producer object
@:param: schema_registry_url: Schema registry URL
@:param: config: dict object with config parameters.

Mention schema.registry.url and that it is required.

@@ -1,6 +1,8 @@
import logging
import sys

from confluent_kafka.cimpl import Producer

Are you sure it should import from .cimpl directly and not from confluent_kafka?


self._producer = producer
self._serializer = MessageSerializer(CachedSchemaRegistryClient(url=schema_registry_url))
if ('schema.registry.url' not in config.keys()):
raise ClientError("Missing parameter: schema.registry.url")

Might want to use a standard ValueError

@roopahc (author) commented Nov 14, 2016

@edenhill I am getting ImportError: dlopen(/Users/roopac/development/confluent-kafka-python/confluent_kafka/cimpl.so, 2): Symbol not found: __Py_ZeroStruct. Possibly not related to my code change.

@ewencp (Contributor) commented Nov 14, 2016

@roopahc Are you possibly building against a different python devel package than you are running it with? Py_ZeroStruct exists in 2.7 but not in python 3+.

@roopahc (author) commented Nov 14, 2016

@ewencp Thanks. Tests pass on my system; the errors seem unrelated.

@edenhill (Contributor) commented Nov 15, 2016

Since the tests on master are okay, I think this failure is related to your code changes.
I see that avro is included unconditionally in __init__.py; is this correct?

The Travis installer does 'pip install .' to install the module; will that include or miss the avro extension?

@edenhill (Contributor)

There's a bunch of errors like these:
E IOError: [Errno 2] No such file or directory: 'basic_schema.avsc'

The py.test (or tox) run starts from the top directory, so you'll need to reference the .avsc either with a tests/ prefix or, preferably, by dirname()ing the script to get its directory.

I also feel it would be cleaner if the avro tests were in their own sub-directory, e.g., tests/avro or avro/tests.
py.test et al. are pretty good (too good) at finding tests, so this shouldn't be a problem.
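e.g. resolving the schema file relative to the test module instead of the working directory:

    import os

    # Resolve the schema relative to this test file, not the directory py.test runs from.
    avsc_dir = os.path.dirname(os.path.realpath(__file__))
    schema_path = os.path.join(avsc_dir, 'basic_schema.avsc')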

@edenhill (Contributor)

There's no docstring for the avro module

@edenhill (Contributor)

I updated docstring verification tests on master, so please rebase on latest master again as you fix the other issues. Thanks

@edenhill (Contributor)

@roopahc Check pip install --help; pip install just takes a single module, so you need to split that up into multiple lines.

@edenhill (Contributor)

Also, please fixup/squash some commits; e.g., the review-fix commits should be joined with their parent commits:

 $ git rebase -i master
 # Mark "fixes-of-fixes" as "f" and move them to the line below their logical parent commit,
 # repeat as necessary

 # When happy with the results (no wip/fix/temporary-tests commit), do a force push:
 $ git push --dry-run --force <remote> <branch>

  # Looks okay? Drop --dry-run

@@ -7,6 +7,7 @@ before_install:
- pip install pytest-timeout
install:
- pip install -v --global-option=build_ext --global-option="-Itmp-build/include/" --global-option="-Ltmp-build/lib" .
- pip install --upgrade pip

Is the .avro install requiring a newer pip version?
If so, we probably need to document what version is required.
And also move this to before_install prior to any other pip commands.


I've found that a lot of older pips don't properly support extras_require.


(We're just shooting in the dark)

@criccomini commented Nov 18, 2016

Tests are passing now! Please have a look.

@edenhill (Contributor) left a comment

LGTM, just two questions

@@ -1,2 +1,2 @@
__all__ = ['cimpl','kafkatest']
__all__ = ['cimpl', 'avro', 'kafkatest']

Should this be conditional based on install type?
What happens if people don't install the avro submodule?

@roopahc (author):

@edenhill This should not affect using the client normally with just the basic producer and consumer. The only catch is that you would not be able to use AvroProducer unless the dependencies are installed.
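A common guard for this kind of optional submodule (a sketch, not necessarily what the PR does):

    # Degrade gracefully when the avro extras aren't installed.
    try:
        from confluent_kafka.avro import AvroProducer
        HAS_AVRO = True
    except ImportError:
        AvroProducer = None
        HAS_AVRO = False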

value_schema = kwargs.pop('value_schema', self._value_schema)
topic = kwargs.pop('topic', None)
if not topic:
log.error("Topic name not specified.")

Why both log and exception?

@roopahc (author) commented Nov 21, 2016

@edenhill @ewencp is it good for merging? :)

raise ClientError("Topic name not specified.")
value = kwargs.pop('value', None)
key = kwargs.pop('key', None)
if value:
if value_schema:
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
log.debug("Value encoded successfully!")

I don't think there is much value (pun!) to these log messages; they don't include any actual information, and the exception already covers the same ground. So I think you should remove all the just-before-exception logs, and logs like this.


I just commented on one of these log occurrences in the previous review, but all of them should be removed.

@roopahc (author):

Done!

@edenhill (Contributor)

Yeah, I think we should just get rid of the excessive/redundant log messages.
After that we're good to go!

@roopahc (author) commented Nov 21, 2016

@edenhill done :)

@edenhill (Contributor)

LGTM!

@edenhill merged commit 406a2bd into confluentinc:master on Nov 21, 2016
@edenhill (Contributor)

Finally!
