diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py
index e23367c90..06c439fc8 100644
--- a/confluent_kafka/avro/__init__.py
+++ b/confluent_kafka/avro/__init__.py
@@ -98,9 +98,11 @@ class AvroConsumer(Consumer):
     Constructor takes below parameters
 
     :param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
-                        and the standard Kafka client configuration (``bootstrap.servers`` et.al).
+                        and the standard Kafka client configuration (``bootstrap.servers`` et.al)
+    :param schema reader_key_schema: a reader schema for the message key
+    :param schema reader_value_schema: a reader schema for the message value
     """
-    def __init__(self, config, schema_registry=None):
+    def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):
 
         schema_registry_url = config.pop("schema.registry.url", None)
         schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
@@ -119,7 +121,7 @@ def __init__(self, config, schema_registry=None):
             raise ValueError("Cannot pass schema_registry along with schema.registry.url config")
 
         super(AvroConsumer, self).__init__(config)
-        self._serializer = MessageSerializer(schema_registry)
+        self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)
 
     def poll(self, timeout=None):
         """
@@ -139,9 +141,9 @@ def poll(self, timeout=None):
             return message
         if not message.error():
             if message.value() is not None:
-                decoded_value = self._serializer.decode_message(message.value())
+                decoded_value = self._serializer.decode_message(message.value(), is_key=False)
                 message.set_value(decoded_value)
             if message.key() is not None:
-                decoded_key = self._serializer.decode_message(message.key())
+                decoded_key = self._serializer.decode_message(message.key(), is_key=True)
                 message.set_key(decoded_key)
         return message
diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index 13cfc6306..085b78fb2 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -68,10 +68,12 @@ class MessageSerializer(object):
     All decode_* methods expect a buffer received from kafka.
""" - def __init__(self, registry_client): + def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=None): self.registry_client = registry_client self.id_to_decoder_func = {} self.id_to_writers = {} + self.reader_key_schema = reader_key_schema + self.reader_value_schema = reader_value_schema ''' @@ -151,25 +153,29 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False): return outf.getvalue() # Decoder support - def _get_decoder_func(self, schema_id, payload): + def _get_decoder_func(self, schema_id, payload, is_key=False): if schema_id in self.id_to_decoder_func: return self.id_to_decoder_func[schema_id] - # fetch from schema reg + # fetch writer schema from schema reg try: - schema = self.registry_client.get_by_id(schema_id) + writer_schema_obj = self.registry_client.get_by_id(schema_id) except ClientError as e: raise SerializerError("unable to fetch schema with id %d: %s" % (schema_id, str(e))) - if schema is None: + if writer_schema_obj is None: raise SerializerError("unable to fetch schema with id %d" % (schema_id)) curr_pos = payload.tell() + + reader_schema_obj = self.reader_key_schema if is_key else self.reader_value_schema + if HAS_FAST: # try to use fast avro try: - schema_dict = schema.to_json() - schemaless_reader(payload, schema_dict) + writer_schema = writer_schema_obj.to_json() + reader_schema = reader_schema_obj.to_json() + schemaless_reader(payload, writer_schema) # If we reach this point, this means we have fastavro and it can # do this deserialization. Rewind since this method just determines @@ -177,7 +183,8 @@ def _get_decoder_func(self, schema_id, payload): # normal path. payload.seek(curr_pos) - self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict) + self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader( + p, writer_schema, reader_schema) return self.id_to_decoder_func[schema_id] except Exception: # Fast avro failed, fall thru to standard avro below. @@ -186,7 +193,13 @@ def _get_decoder_func(self, schema_id, payload): # here means we should just delegate to slow avro # rewind payload.seek(curr_pos) - avro_reader = avro.io.DatumReader(schema) + # Avro DatumReader py2/py3 inconsistency, hence no param keywords + # should be revisited later + # https://github.com/apache/avro/blob/master/lang/py3/avro/io.py#L459 + # https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L423 + # def __init__(self, writers_schema=None, readers_schema=None) + # def __init__(self, writer_schema=None, reader_schema=None) + avro_reader = avro.io.DatumReader(writer_schema_obj, reader_schema_obj) def decoder(p): bin_decoder = avro.io.BinaryDecoder(p) @@ -195,7 +208,7 @@ def decoder(p): self.id_to_decoder_func[schema_id] = decoder return self.id_to_decoder_func[schema_id] - def decode_message(self, message): + def decode_message(self, message, is_key=False): """ Decode a message from kafka that has been encoded for use with the schema registry. 
@@ -212,5 +225,5 @@ def decode_message(self, message):
         magic, schema_id = struct.unpack('>bI', payload.read(5))
         if magic != MAGIC_BYTE:
             raise SerializerError("message does not start with magic byte")
-        decoder_func = self._get_decoder_func(schema_id, payload)
+        decoder_func = self._get_decoder_func(schema_id, payload, is_key)
         return decoder_func(payload)
diff --git a/examples/integration_test.py b/examples/integration_test.py
index 3ed4fe721..f83ada9ca 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -328,7 +328,6 @@ def verify_avro():
     for i, combo in enumerate(combinations):
         combo['topic'] = str(uuid.uuid4())
         p.produce(**combo)
-        p.poll(0)
     p.flush()
 
     # Create consumer
@@ -1202,9 +1201,117 @@ def verify_config(expconfig, configs):
     print("Topic {} marked for deletion".format(our_topic))
 
 
-# Exclude throttle since from default list
+def verify_avro_explicit_read_schema():
+    """Verify that reading Avro with an explicit reader schema works."""
+    from confluent_kafka import avro
+    avsc_dir = os.path.join(os.path.dirname(__file__), os.pardir, 'tests', 'avro')
+
+    # Producer config
+    conf = {'bootstrap.servers': bootstrap_servers,
+            'error_cb': error_cb,
+            'api.version.request': api_version_request,
+            'default.topic.config': {'produce.offset.report': True}}
+
+    # Create producer
+    if schema_registry_url:
+        conf['schema.registry.url'] = schema_registry_url
+        p = avro.AvroProducer(conf)
+    else:
+        p = avro.AvroProducer(conf, schema_registry=InMemorySchemaRegistry())
+
+    key_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
+    schema1 = avro.load(os.path.join(avsc_dir, "user_v1.avsc"))
+    schema2 = avro.load(os.path.join(avsc_dir, "user_v2.avsc"))
+    float_value = 32.
+    val = {
+        "name": "abc",
+        "favorite_number": 42,
+        "favorite_color": "orange"
+    }
+    val1 = {
+        "name": "abc"
+    }
+
+    combinations = [
+        dict(value=val, value_schema=schema2, key=float_value, key_schema=key_schema,
+             reader_value_schema=schema1, reader_key_schema=key_schema),
+        dict(value=val1, value_schema=schema1, key=float_value, key_schema=key_schema,
+             reader_value_schema=schema2, reader_key_schema=key_schema),
+    ]
+
+    # Consumer config
+    cons_conf = {'bootstrap.servers': bootstrap_servers,
+                 'group.id': 'test.py',
+                 'session.timeout.ms': 6000,
+                 'enable.auto.commit': False,
+                 'api.version.request': api_version_request,
+                 'on_commit': print_commit_result,
+                 'error_cb': error_cb,
+                 'default.topic.config': {
+                     'auto.offset.reset': 'earliest'
+                 }}
+
+    for i, combo in enumerate(combinations):
+        reader_key_schema = combo.pop("reader_key_schema")
+        reader_value_schema = combo.pop("reader_value_schema")
+        combo['topic'] = str(uuid.uuid4())
+        p.produce(**combo)
+        p.poll(0)
+        p.flush()
+
+        # Create consumer
+        conf = copy(cons_conf)
+        if schema_registry_url:
+            conf['schema.registry.url'] = schema_registry_url
+            c = avro.AvroConsumer(
+                conf,
+                reader_key_schema=reader_key_schema,
+                reader_value_schema=reader_value_schema)
+        else:
+            c = avro.AvroConsumer(
+                conf,
+                schema_registry=InMemorySchemaRegistry(),
+                reader_key_schema=reader_key_schema,
+                reader_value_schema=reader_value_schema)
+
+        c.subscribe([combo['topic']])
+
+        while True:
+            msg = c.poll(0)
+            if msg is None:
+                continue
+
+            if msg.error():
+                if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+                    break
+                else:
+                    continue
+
+            tstype, timestamp = msg.timestamp()
+            print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
+                  (msg.topic(), msg.partition(), msg.offset(),
+                   msg.key(), msg.value(), tstype, timestamp))
+
+            # omit empty Avro fields from payload for comparison
+            record_key = msg.key()
+            record_value = msg.value()
+            if isinstance(msg.key(), dict):
+                record_key = {k: v for k, v in msg.key().items() if v is not None}
+
+            if isinstance(msg.value(), dict):
+                record_value = {k: v for k, v in msg.value().items() if v is not None}
+
+            assert combo.get('key') == record_key
+            assert combo.get('value')['name'] == record_value['name']
+            c.commit(msg, asynchronous=False)
+
+        # Close consumer
+        c.close()
+    pass
+
+
 default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin']
 all_modes = default_modes + ['throttle', 'avro-https', 'none']
+"""All test modes"""
@@ -1315,6 +1422,9 @@ def resolve_envs(_conf):
         print('=' * 30, 'Verifying AVRO', '=' * 30)
         verify_avro()
 
+        print('=' * 30, 'Verifying AVRO with explicit reader schema', '=' * 30)
+        verify_avro_explicit_read_schema()
+
     if 'avro-https' in modes:
         print('=' * 30, 'Verifying AVRO with HTTPS', '=' * 30)
         verify_avro_https()
diff --git a/tests/avro/user_v1.avsc b/tests/avro/user_v1.avsc
new file mode 100644
index 000000000..e382e1b02
--- /dev/null
+++ b/tests/avro/user_v1.avsc
@@ -0,0 +1,8 @@
+{
+  "type": "record",
+  "name": "UserKey",
+  "aliases": ["User"],
+  "fields": [
+    {"name": "name", "type": "string"}
+  ]
+}
diff --git a/tests/avro/user_v2.avsc b/tests/avro/user_v2.avsc
new file mode 100644
index 000000000..78f2419f8
--- /dev/null
+++ b/tests/avro/user_v2.avsc
@@ -0,0 +1,10 @@
+{
+  "type": "record",
+  "name": "User",
+  "aliases": ["UserKey"],
+  "fields": [
+    {"name": "name", "type": "string"},
+    {"name": "favorite_number", "type": ["null", "int"], "default": null},
+    {"name": "favorite_color", "type": ["null", "string"], "default": null}
+  ]
+}
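
Usage sketch (not part of the patch): the snippet below shows how the reader_key_schema / reader_value_schema
arguments added to AvroConsumer above are intended to be used from application code. The broker address,
schema registry URL, topic name and schema path are hypothetical placeholders; only the keyword arguments
themselves come from this change.

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroConsumer

    # Reader schema the application wants records projected into, regardless of
    # which (compatible) writer schema the producer registered. Path is illustrative.
    reader_schema = avro.load("tests/avro/user_v1.avsc")

    consumer = AvroConsumer({'bootstrap.servers': 'localhost:9092',        # placeholder
                             'schema.registry.url': 'http://localhost:8081',  # placeholder
                             'group.id': 'reader-schema-example'},
                            reader_value_schema=reader_schema)
    consumer.subscribe(['users'])  # placeholder topic

    msg = consumer.poll(10)
    if msg is not None and not msg.error():
        # The value is decoded with the writer schema fetched from the registry and
        # then resolved against the reader schema supplied above (Avro schema resolution).
        print(msg.value())

    consumer.close()

The user_v1.avsc / user_v2.avsc files added above are written so that either can serve as the reader schema
for data written with the other: the aliases map the two record names onto each other, and the nullable
fields in user_v2 carry defaults. This is the schema-evolution scenario the new integration test exercises.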