From bc00e6e225b38d0cdba26cdc64d0060640b0291e Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Fri, 12 Oct 2018 21:55:19 +0200
Subject: [PATCH 01/16] avro consumer with explicitly specified read schema

---
 confluent_kafka/avro/__init__.py             |   9 +-
 .../avro/serializer/message_serializer.py    |   9 +-
 examples/integration_test.py                 | 104 +++++++++++++++++-
 tests/avro/incremented_read_test_schema.avsc |   9 ++
 tests/avro/read_test_schema.avsc             |   7 ++
 5 files changed, 129 insertions(+), 9 deletions(-)
 create mode 100644 tests/avro/incremented_read_test_schema.avsc
 create mode 100644 tests/avro/read_test_schema.avsc

diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py
index e23367c90..dd250c9f3 100644
--- a/confluent_kafka/avro/__init__.py
+++ b/confluent_kafka/avro/__init__.py
@@ -98,10 +98,11 @@ class AvroConsumer(Consumer):
     Constructor takes below parameters

     :param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
-                        and the standard Kafka client configuration (``bootstrap.servers`` et.al).
+                        and the standard Kafka client configuration (``bootstrap.servers`` et al.)
+    :param schema read_schema: An optional reader schema to decode messages with
     """
-    def __init__(self, config, schema_registry=None):
-
+    def __init__(self, config, schema_registry=None, read_schema=None):
+
         schema_registry_url = config.pop("schema.registry.url", None)
         schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
         schema_registry_certificate_location = config.pop("schema.registry.ssl.certificate.location", None)
@@ -119,7 +120,7 @@ def __init__(self, config, schema_registry=None):
             raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

         super(AvroConsumer, self).__init__(config)
-        self._serializer = MessageSerializer(schema_registry)
+        self._serializer = MessageSerializer(schema_registry, read_schema)

     def poll(self, timeout=None):
         """
diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index 13cfc6306..7c44ae9b6 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -68,10 +68,11 @@ class MessageSerializer(object):
     All decode_* methods expect a buffer received from kafka.
     """

-    def __init__(self, registry_client):
+    def __init__(self, registry_client, read_schema=None):
         self.registry_client = registry_client
         self.id_to_decoder_func = {}
         self.id_to_writers = {}
+        self.read_schema = read_schema

     '''
@@ -169,6 +170,7 @@ def _get_decoder_func(self, schema_id, payload):
             # try to use fast avro
             try:
                 schema_dict = schema.to_json()
+                reader_schema_dict = schema.to_json()
                 schemaless_reader(payload, schema_dict)

                 # If we reach this point, this means we have fastavro and it can
@@ -177,7 +179,8 @@ def _get_decoder_func(self, schema_id, payload):
                 # normal path.
                 payload.seek(curr_pos)

-                self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict)
+                self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict,
+                                                                                 reader_schema=reader_schema_dict)
                 return self.id_to_decoder_func[schema_id]
             except Exception:
                 # Fast avro failed, fall thru to standard avro below.
@@ -186,7 +189,7 @@ def _get_decoder_func(self, schema_id, payload):
         # here means we should just delegate to slow avro
         # rewind
         payload.seek(curr_pos)
-        avro_reader = avro.io.DatumReader(schema)
+        avro_reader = avro.io.DatumReader(writers_schema=schema, readers_schema=self.read_schema)

         def decoder(p):
             bin_decoder = avro.io.BinaryDecoder(p)
diff --git a/examples/integration_test.py b/examples/integration_test.py
index 3ed4fe721..1b129209c 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -1202,9 +1202,105 @@ def verify_config(expconfig, configs):
     print("Topic {} marked for deletion".format(our_topic))


-# Exclude throttle since from default list
-default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin']
+def verify_explicit_read():
+    """ verify that the explicit reading schema works"""
+    from confluent_kafka import avro
+    avsc_dir = os.path.join(os.path.dirname(__file__), os.pardir, 'tests', 'avro')
+
+    # Producer config
+    conf = {'bootstrap.servers': bootstrap_servers,
+            'error_cb': error_cb,
+            'api.version.request': api_version_request,
+            'default.topic.config': {'produce.offset.report': True}}
+
+    # Create producer
+    if schema_registry_url:
+        conf['schema.registry.url'] = schema_registry_url
+        p = avro.AvroProducer(conf)
+    else:
+        p = avro.AvroProducer(conf, schema_registry=InMemorySchemaRegistry())
+
+    key_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
+    schema1 = avro.load(os.path.join(avsc_dir, "read_test_schema.avsc"))
+    schema2 = avro.load(os.path.join(avsc_dir, "incremented_read_test_schema.avsc"))
+    float_value = 32.
+    val = {
+        "name": "abc",
+        "favorite_number": 42,
+        "favorite_color": "orange"
+    }
+    val1 = {
+        "name": "abc"
+    }
+
+    combinations = [
+        dict(value=val, value_schema=schema2, key=float_value, key_schema=key_schema,
+             read_schema=schema1),
+        dict(value=val1, value_schema=schema1, key=float_value, key_schema=key_schema,
+             read_schema=schema2),
+    ]
+
+    # Consumer config
+    cons_conf = {'bootstrap.servers': bootstrap_servers,
+                 'group.id': 'test.py',
+                 'session.timeout.ms': 6000,
+                 'enable.auto.commit': False,
+                 'api.version.request': api_version_request,
+                 'on_commit': print_commit_result,
+                 'error_cb': error_cb,
+                 'default.topic.config': {
+                     'auto.offset.reset': 'earliest'
+                 }}
+
+        combo['topic'] = str(uuid.uuid4())
+        p.produce(**combo)
+        p.poll(0)
+        p.flush()
+
+        # Create consumer
+        conf = copy(cons_conf)
+        if schema_registry_url:
+            conf['schema.registry.url'] = schema_registry_url
+            c = avro.AvroConsumer(conf, read_schema=read_schema)
+        else:
+            c = avro.AvroConsumer(conf, schema_registry=InMemorySchemaRegistry(), read_schema=read_schema)
+
+        c.subscribe([combo['topic']])
+
+        while True:
+            msg = c.poll(0)
+            if msg is None:
+                continue
+
+            if msg.error():
+                if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+                    break
+                else:
+                    continue
+
+            tstype, timestamp = msg.timestamp()
+            print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' %
+                  (msg.topic(), msg.partition(), msg.offset(),
+                   msg.key(), msg.value(), tstype, timestamp))
+
+            # omit empty Avro fields from payload for comparison
+            record_key = msg.key()
+            record_value = msg.value()
+            if isinstance(msg.key(), dict):
+                record_key = {k: v for k, v in msg.key().items() if v is not None}
+
+            if isinstance(msg.value(), dict):
+                record_value = {k: v for k, v in msg.value().items() if v is not None}
+
+            assert combo.get('key') == record_key
+            c.commit(msg, asynchronous=False)
+        # Close consumer
+        c.close()
+    pass
+
+default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin', 'explicit-read']
 all_modes = default_modes + ['throttle', 'avro-https', 'none']

 """All test modes"""
@@ -1323,6 +1419,10 @@ def resolve_envs(_conf):
         print('=' * 30, 'Verifying Admin API', '=' * 30)
         verify_admin()

+    if 'explicit-read' in modes:
+        print('=' * 30, 'Verifying Explicit Reading Schema', '=' * 30)
+        verify_explicit_read()
+
     print('=' * 30, 'Done', '=' * 30)

     if with_pympler:
diff --git a/tests/avro/incremented_read_test_schema.avsc b/tests/avro/incremented_read_test_schema.avsc
new file mode 100644
index 000000000..b68e6fd52
--- /dev/null
+++ b/tests/avro/incremented_read_test_schema.avsc
@@ -0,0 +1,9 @@
+{
+  "type": "record",
+  "name": "User",
+  "fields": [
+    {"name": "name", "type": "string"},
+    {"name": "favorite_number", "type": ["null", "int"], "default": null},
+    {"name": "favorite_color", "type": ["null", "string"], "default": null}
+  ]
+}
\ No newline at end of file
diff --git a/tests/avro/read_test_schema.avsc b/tests/avro/read_test_schema.avsc
new file mode 100644
index 000000000..67c939cbe
--- /dev/null
+++ b/tests/avro/read_test_schema.avsc
@@ -0,0 +1,7 @@
+{
+  "type": "record",
+  "name": "UserKey",
+  "fields": [
+    {"name": "name", "type": "string"}
+  ]
+}
\ No newline at end of file

From e8036317c78b2e5dd0c11da1dbe78f6832ce299a Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Fri, 12 Oct 2018 22:16:22 +0200
Subject: [PATCH 02/16] unexpected indentation/whitespace removed

---
 examples/integration_test.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/examples/integration_test.py b/examples/integration_test.py
index 1b129209c..7916e43a4 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -1298,6 +1298,7 @@ def verify_explicit_read():
     c.close()
     pass

+
 default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin', 'explicit-read']
 all_modes = default_modes + ['throttle', 'avro-https', 'none']

From 432a936e1b4bc076ed321cec3ac1ee8a2cc5b748 Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Fri, 12 Oct 2018 22:16:22 +0200
Subject: [PATCH 03/16] unexpected indentation/whitespace removed

---
 confluent_kafka/avro/__init__.py | 2 +-
 examples/integration_test.py     | 1 +
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py
index dd250c9f3..5579f2a30 100644
--- a/confluent_kafka/avro/__init__.py
+++ b/confluent_kafka/avro/__init__.py
@@ -102,7 +102,7 @@ class AvroConsumer(Consumer):
     :param schema read_schema: An optional reader schema to decode messages with
     """
     def __init__(self, config, schema_registry=None, read_schema=None):
-
+
         schema_registry_url = config.pop("schema.registry.url", None)
         schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
         schema_registry_certificate_location = config.pop("schema.registry.ssl.certificate.location", None)
diff --git a/examples/integration_test.py b/examples/integration_test.py
index 1b129209c..7916e43a4 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -1298,6 +1298,7 @@ def verify_explicit_read():
     c.close()
     pass

+
 default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin', 'explicit-read']
 all_modes = default_modes + ['throttle', 'avro-https', 'none']

From 74c7ad9f8a650fc01109e233f29b0dca0c9b341d Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Fri, 12 Oct 2018 22:34:18 +0200
Subject: [PATCH 04/16] fixed accidentally deleted lines

---
 examples/integration_test.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/examples/integration_test.py b/examples/integration_test.py
index 7916e43a4..d8ec4c1a7 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -1252,6 +1252,8 @@ def verify_explicit_read():
                      'auto.offset.reset': 'earliest'
                  }}

+    for i, combo in enumerate(combinations):
+        read_schema = combo.pop("read_schema")
         combo['topic'] = str(uuid.uuid4())
         p.produce(**combo)
         p.poll(0)
@@ -1293,6 +1295,7 @@ def verify_explicit_read():
             record_value = {k: v for k, v in msg.value().items() if v is not None}

             assert combo.get('key') == record_key
+            assert combo.get('value')['name'] == record_value['name']
             c.commit(msg, asynchronous=False)
         # Close consumer
         c.close()

From 10637a99b8a7f0f802fea65960eae725c0afeb4d Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Thu, 18 Oct 2018 21:36:34 +0200
Subject: [PATCH 05/16] fix fastavro reader schema bound to writer schema

---
 confluent_kafka/avro/serializer/message_serializer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index 7c44ae9b6..b76257808 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -170,7 +170,7 @@ def _get_decoder_func(self, schema_id, payload):
             # try to use fast avro
             try:
                 schema_dict = schema.to_json()
-                reader_schema_dict = schema.to_json()
+                reader_schema_dict = self.read_schema.to_json()
                 schemaless_reader(payload, schema_dict)

From 1f170fb2229b078568de5337c6d16fbbdc995459 Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Thu, 18 Oct 2018 21:50:35 +0200
Subject: [PATCH 06/16] decoder_func writer_schema reader_schema naming
 consistency

---
 confluent_kafka/avro/serializer/message_serializer.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index b76257808..5805360bb 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -169,9 +169,9 @@ def _get_decoder_func(self, schema_id, payload):
         if HAS_FAST:
             # try to use fast avro
             try:
-                schema_dict = schema.to_json()
-                reader_schema_dict = self.read_schema.to_json()
-                schemaless_reader(payload, schema_dict)
+                writer_schema = schema.to_json()
+                reader_schema = self.read_schema.to_json()
+                schemaless_reader(payload, writer_schema)

                 # If we reach this point, this means we have fastavro and it can
@@ -179,8 +179,8 @@ def _get_decoder_func(self, schema_id, payload):
                 # normal path.
                 payload.seek(curr_pos)

-                self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(p, schema_dict,
-                                                                                 reader_schema=reader_schema_dict)
+                self.id_to_decoder_func[schema_id] = lambda p: schemaless_reader(
+                    p, writer_schema, reader_schema)
                 return self.id_to_decoder_func[schema_id]
             except Exception:
                 # Fast avro failed, fall thru to standard avro below.
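Up to this point in the series (patches 01 through 06), a single read_schema is applied to everything the consumer decodes, message keys included; patch 07 below splits it into separate key and value reader schemas. For orientation, a minimal usage sketch of the API as it stands here. This is a hedged example, not part of the patch: the broker address, registry URL, group id, and topic name are placeholder assumptions.

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroConsumer

    # Reader (projection) schema: the shape records should be decoded into,
    # regardless of the (possibly newer) writer schema they were produced with.
    read_schema = avro.loads('''
    {
        "type": "record",
        "name": "User",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "favorite_color", "type": ["null", "string"], "default": null}
        ]
    }
    ''')

    c = AvroConsumer({
        'bootstrap.servers': 'localhost:9092',           # assumed broker
        'schema.registry.url': 'http://localhost:8081',  # assumed registry
        'group.id': 'read-schema-example'                # assumed group id
    }, read_schema=read_schema)

    c.subscribe(['users'])  # hypothetical topic
    msg = c.poll(10)
    if msg is not None and not msg.error():
        # Avro schema resolution: writer fields absent from the reader schema
        # are dropped, and reader fields missing from the writer are filled
        # from their defaults.
        print(msg.value())
    c.close()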
From 2acbf4d6a4d8ac83439e8306f9dd07d351d45614 Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Thu, 18 Oct 2018 22:32:06 +0200
Subject: [PATCH 07/16] allow differentiating key/value Avro reader schemas

---
 confluent_kafka/avro/__init__.py           |  7 ++++---
 .../avro/serializer/message_serializer.py  | 19 ++++++++++++-------
 examples/integration_test.py               | 13 +++++++++----
 3 files changed, 25 insertions(+), 14 deletions(-)

diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py
index 5579f2a30..76336584c 100644
--- a/confluent_kafka/avro/__init__.py
+++ b/confluent_kafka/avro/__init__.py
@@ -99,9 +99,10 @@ class AvroConsumer(Consumer):
     :param dict config: Config parameters containing url for schema registry (``schema.registry.url``)
                         and the standard Kafka client configuration (``bootstrap.servers`` et al.)
-    :param schema read_schema: An optional reader schema to decode messages with
+    :param schema reader_key_schema: An optional reader schema to decode message keys with
+    :param schema reader_value_schema: An optional reader schema to decode message values with
     """
-    def __init__(self, config, schema_registry=None, read_schema=None):
+    def __init__(self, config, schema_registry=None, reader_key_schema=None, reader_value_schema=None):

         schema_registry_url = config.pop("schema.registry.url", None)
         schema_registry_ca_location = config.pop("schema.registry.ssl.ca.location", None)
@@ -120,7 +121,7 @@ def __init__(self, config, schema_registry=None, read_schema=None):
             raise ValueError("Cannot pass schema_registry along with schema.registry.url config")

         super(AvroConsumer, self).__init__(config)
-        self._serializer = MessageSerializer(schema_registry, read_schema)
+        self._serializer = MessageSerializer(schema_registry, reader_key_schema, reader_value_schema)

     def poll(self, timeout=None):
         """
diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index 5805360bb..c62671876 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -68,11 +68,12 @@ class MessageSerializer(object):
     All decode_* methods expect a buffer received from kafka.
     """

-    def __init__(self, registry_client, read_schema=None):
+    def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=None):
         self.registry_client = registry_client
         self.id_to_decoder_func = {}
         self.id_to_writers = {}
-        self.read_schema = read_schema
+        self.reader_key_schema = reader_key_schema
+        self.reader_value_schema = reader_value_schema

     '''
@@ -152,7 +153,7 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
         return outf.getvalue()

     # Decoder support
-    def _get_decoder_func(self, schema_id, payload):
+    def _get_decoder_func(self, schema_id, payload, is_key=False):
         if schema_id in self.id_to_decoder_func:
             return self.id_to_decoder_func[schema_id]

@@ -166,11 +167,15 @@ def _get_decoder_func(self, schema_id, payload):
             raise SerializerError("unable to fetch schema with id %d" % (schema_id))

         curr_pos = payload.tell()
+
+        reader_schema_obj = self.reader_key_schema if is_key else self.reader_value_schema
+
+
         if HAS_FAST:
             # try to use fast avro
             try:
                 writer_schema = schema.to_json()
-                reader_schema = self.read_schema.to_json()
+                reader_schema = reader_schema_obj.to_json()
                 schemaless_reader(payload, writer_schema)

                 # If we reach this point, this means we have fastavro and it can
@@ -189,7 +194,7 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
         # here means we should just delegate to slow avro
         # rewind
         payload.seek(curr_pos)
-        avro_reader = avro.io.DatumReader(writers_schema=schema, readers_schema=self.read_schema)
+        avro_reader = avro.io.DatumReader(writers_schema=schema, readers_schema=reader_schema_obj)

         def decoder(p):
             bin_decoder = avro.io.BinaryDecoder(p)
@@ -198,7 +203,7 @@ def decoder(p):
         self.id_to_decoder_func[schema_id] = decoder
         return self.id_to_decoder_func[schema_id]

-    def decode_message(self, message):
+    def decode_message(self, message, is_key=False):
         """
         Decode a message from kafka that has been encoded for use with
         the schema registry.
@@ -215,5 +220,5 @@ def decode_message(self, message):
         magic, schema_id = struct.unpack('>bI', payload.read(5))
         if magic != MAGIC_BYTE:
             raise SerializerError("message does not start with magic byte")
-        decoder_func = self._get_decoder_func(schema_id, payload)
+        decoder_func = self._get_decoder_func(schema_id, payload, is_key)
         return decoder_func(payload)
diff --git a/examples/integration_test.py b/examples/integration_test.py
index d8ec4c1a7..de2484fd2 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -1235,9 +1235,9 @@ def verify_explicit_read():

     combinations = [
         dict(value=val, value_schema=schema2, key=float_value, key_schema=key_schema,
-             read_schema=schema1),
+             reader_value_schema=schema1, reader_key_schema=key_schema),
         dict(value=val1, value_schema=schema1, key=float_value, key_schema=key_schema,
-             read_schema=schema2),
+             reader_value_schema=schema2, reader_key_schema=key_schema),
     ]

     # Consumer config
@@ -1263,9 +1263,14 @@ def verify_explicit_read():
         conf = copy(cons_conf)
         if schema_registry_url:
             conf['schema.registry.url'] = schema_registry_url
-            c = avro.AvroConsumer(conf, read_schema=read_schema)
+            c = avro.AvroConsumer(conf,
+                                  reader_key_schema=reader_key_schema,
+                                  reader_value_schema=reader_value_schema)
         else:
-            c = avro.AvroConsumer(conf, schema_registry=InMemorySchemaRegistry(), read_schema=read_schema)
+            c = avro.AvroConsumer(conf,
+                schema_registry=InMemorySchemaRegistry(),
+                reader_key_schema=reader_key_schema,
+                reader_value_schema=reader_value_schema)

         c.subscribe([combo['topic']])

From e902f2fb6ba0812d25ca9adede5bfef0e7cd8cae Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Thu, 18 Oct 2018 22:50:06 +0200
Subject: [PATCH 08/16] fixed explicit reader schema test (using aliases)

---
 tests/avro/incremented_read_test_schema.avsc | 3 ++-
 tests/avro/read_test_schema.avsc             | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/tests/avro/incremented_read_test_schema.avsc b/tests/avro/incremented_read_test_schema.avsc
index b68e6fd52..78f2419f8 100644
--- a/tests/avro/incremented_read_test_schema.avsc
+++ b/tests/avro/incremented_read_test_schema.avsc
@@ -1,9 +1,10 @@
 {
   "type": "record",
   "name": "User",
+  "aliases": ["UserKey"],
   "fields": [
     {"name": "name", "type": "string"},
     {"name": "favorite_number", "type": ["null", "int"], "default": null},
     {"name": "favorite_color", "type": ["null", "string"], "default": null}
   ]
-}
\ No newline at end of file
+}
diff --git a/tests/avro/read_test_schema.avsc b/tests/avro/read_test_schema.avsc
index 67c939cbe..e382e1b02 100644
--- a/tests/avro/read_test_schema.avsc
+++ b/tests/avro/read_test_schema.avsc
@@ -1,7 +1,8 @@
 {
   "type": "record",
   "name": "UserKey",
+  "aliases": ["User"],
   "fields": [
     {"name": "name", "type": "string"}
   ]
-}
\ No newline at end of file
+}

From cb3fca43777677eb1df5b05d12da3d0081080c01 Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Thu, 18 Oct 2018 22:52:45 +0200
Subject: [PATCH 09/16] renamed avro schemas for explicit read tests

---
 examples/integration_test.py                          | 4 ++--
 tests/avro/{read_test_schema.avsc => user_v1.avsc}    | 0
 .../avro/{incremented_read_test_schema.avsc => user_v2.avsc} | 0
 3 files changed, 2 insertions(+), 2 deletions(-)
 rename tests/avro/{read_test_schema.avsc => user_v1.avsc} (100%)
 rename tests/avro/{incremented_read_test_schema.avsc => user_v2.avsc} (100%)

diff --git a/examples/integration_test.py b/examples/integration_test.py
index de2484fd2..87ea7a3b7 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -1221,8 +1221,8 @@ def verify_explicit_read():
         p = avro.AvroProducer(conf, schema_registry=InMemorySchemaRegistry())

     key_schema = avro.load(os.path.join(avsc_dir, "primitive_float.avsc"))
-    schema1 = avro.load(os.path.join(avsc_dir, "read_test_schema.avsc"))
-    schema2 = avro.load(os.path.join(avsc_dir, "incremented_read_test_schema.avsc"))
+    schema1 = avro.load(os.path.join(avsc_dir, "user_v1.avsc"))
+    schema2 = avro.load(os.path.join(avsc_dir, "user_v2.avsc"))
     float_value = 32.
     val = {
         "name": "abc",
diff --git a/tests/avro/read_test_schema.avsc b/tests/avro/user_v1.avsc
similarity index 100%
rename from tests/avro/read_test_schema.avsc
rename to tests/avro/user_v1.avsc
diff --git a/tests/avro/incremented_read_test_schema.avsc b/tests/avro/user_v2.avsc
similarity index 100%
rename from tests/avro/incremented_read_test_schema.avsc
rename to tests/avro/user_v2.avsc

From 529218f537039ad3f922ff1f4fd9c79f7257b853 Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Thu, 18 Oct 2018 23:08:08 +0200
Subject: [PATCH 10/16] removed blank lines

---
 confluent_kafka/avro/serializer/message_serializer.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index c62671876..bdfeff35d 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -170,7 +170,6 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):

         reader_schema_obj = self.reader_key_schema if is_key else self.reader_value_schema

-
         if HAS_FAST:
             # try to use fast avro
             try:

From 65c7ca25aefb101e1073d9f9fd66ccdbb514115a Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Thu, 18 Oct 2018 23:11:46 +0200
Subject: [PATCH 11/16] fix reader key/value schema test config

---
 examples/integration_test.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/examples/integration_test.py b/examples/integration_test.py
index 87ea7a3b7..46cd0adfd 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -1253,7 +1253,8 @@ def verify_explicit_read():
                  }}

     for i, combo in enumerate(combinations):
-        read_schema = combo.pop("read_schema")
+        reader_key_schema = combo.pop("reader_key_schema")
+        reader_value_schema = combo.pop("reader_value_schema")
         combo['topic'] = str(uuid.uuid4())
         p.produce(**combo)
         p.poll(0)
@@ -1263,11 +1264,13 @@ def verify_explicit_read():
         conf = copy(cons_conf)
         if schema_registry_url:
             conf['schema.registry.url'] = schema_registry_url
-            c = avro.AvroConsumer(conf,
-                                  reader_key_schema=reader_key_schema,
-                                  reader_value_schema=reader_value_schema)
+            c = avro.AvroConsumer(
+                conf,
+                reader_key_schema=reader_key_schema,
+                reader_value_schema=reader_value_schema)
         else:
-            c = avro.AvroConsumer(conf,
+            c = avro.AvroConsumer(
+                conf,
                 schema_registry=InMemorySchemaRegistry(),
                 reader_key_schema=reader_key_schema,
                 reader_value_schema=reader_value_schema)

From 33d598070af6688c9cb37a5cc670c5938d0099d3 Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Fri, 19 Oct 2018 00:05:19 +0200
Subject: [PATCH 12/16] fix decode_message - added is_key param

---
 confluent_kafka/avro/__init__.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/confluent_kafka/avro/__init__.py b/confluent_kafka/avro/__init__.py
index 76336584c..06c439fc8 100644
--- a/confluent_kafka/avro/__init__.py
+++ b/confluent_kafka/avro/__init__.py
@@ -141,9 +141,9 @@ def poll(self, timeout=None):
             return message
         if not message.error():
             if message.value() is not None:
-                decoded_value = self._serializer.decode_message(message.value())
+                decoded_value = self._serializer.decode_message(message.value(), is_key=False)
                 message.set_value(decoded_value)
             if message.key() is not None:
-                decoded_key = self._serializer.decode_message(message.key())
+                decoded_key = self._serializer.decode_message(message.key(), is_key=True)
                 message.set_key(decoded_key)
         return message

From 3b69697a7500e3dcf888c801dfa7d4edf8ae6c0a Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Fri, 19 Oct 2018 13:50:04 +0200
Subject: [PATCH 13/16] fixed slow avro DatumReader py2/py3 API inconsistency

---
 confluent_kafka/avro/serializer/message_serializer.py | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index bdfeff35d..7c6afbc13 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -193,7 +193,13 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
         # here means we should just delegate to slow avro
         # rewind
         payload.seek(curr_pos)
-        avro_reader = avro.io.DatumReader(writers_schema=schema, readers_schema=reader_schema_obj)
+        # Avro DatumReader py2/py3 inconsistency, hence no param keywords
+        # should be revisited later
+        # https://github.com/apache/avro/blob/master/lang/py3/avro/io.py#L459
+        # https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L423
+        # def __init__(self, writers_schema=None, readers_schema=None)
+        # def __init__(self, writer_schema=None, reader_schema=None)
+        avro_reader = avro.io.DatumReader(schema, reader_schema_obj)

         def decoder(p):
             bin_decoder = avro.io.BinaryDecoder(p)

From 70453e8f1fdc5c2bd5e4f33db18c8b985a38b007 Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Fri, 26 Oct 2018 19:52:48 +0200
Subject: [PATCH 14/16] writer_schema_obj reader_schema_obj naming consistency

---
 confluent_kafka/avro/serializer/message_serializer.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index 7c6afbc13..085b78fb2 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -157,13 +157,13 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
         if schema_id in self.id_to_decoder_func:
             return self.id_to_decoder_func[schema_id]

-        # fetch from schema reg
+        # fetch writer schema from schema reg
         try:
-            schema = self.registry_client.get_by_id(schema_id)
+            writer_schema_obj = self.registry_client.get_by_id(schema_id)
         except ClientError as e:
             raise SerializerError("unable to fetch schema with id %d: %s" % (schema_id, str(e)))

-        if schema is None:
+        if writer_schema_obj is None:
             raise SerializerError("unable to fetch schema with id %d" % (schema_id))

         curr_pos = payload.tell()
@@ -173,7 +173,7 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
         if HAS_FAST:
             # try to use fast avro
             try:
-                writer_schema = schema.to_json()
+                writer_schema = writer_schema_obj.to_json()
                 reader_schema = reader_schema_obj.to_json()
                 schemaless_reader(payload, writer_schema)

@@ -199,7 +199,7 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
         # https://github.com/apache/avro/blob/master/lang/py/src/avro/io.py#L423
         # def __init__(self, writers_schema=None, readers_schema=None)
         # def __init__(self, writer_schema=None, reader_schema=None)
-        avro_reader = avro.io.DatumReader(schema, reader_schema_obj)
+        avro_reader = avro.io.DatumReader(writer_schema_obj, reader_schema_obj)

         def decoder(p):
             bin_decoder = avro.io.BinaryDecoder(p)

From fcf903d35508d7e55e713b6fefa929d6b65a39a8 Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Thu, 15 Nov 2018 20:20:57 +0100
Subject: [PATCH 15/16] removed poll before flush

---
 examples/integration_test.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/examples/integration_test.py b/examples/integration_test.py
index 46cd0adfd..f4597bb07 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -328,7 +328,6 @@ def verify_avro():
     for i, combo in enumerate(combinations):
         combo['topic'] = str(uuid.uuid4())
         p.produce(**combo)
-        p.poll(0)
         p.flush()

     # Create consumer

From c4ff376cdbfba41b08806df8e4a68d68f953b593 Mon Sep 17 00:00:00 2001
From: Frank Kaufer
Date: Thu, 15 Nov 2018 20:35:21 +0100
Subject: [PATCH 16/16] verify avro explicit reader schema integration test
 minor changes

---
 examples/integration_test.py | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/examples/integration_test.py b/examples/integration_test.py
index f4597bb07..f83ada9ca 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -1201,8 +1201,8 @@ def verify_config(expconfig, configs):
     print("Topic {} marked for deletion".format(our_topic))


-def verify_explicit_read():
-    """ verify that the explicit reading schema works"""
+def verify_avro_explicit_read_schema():
+    """ verify that reading Avro with explicit reader schema works"""
     from confluent_kafka import avro
     avsc_dir = os.path.join(os.path.dirname(__file__), os.pardir, 'tests', 'avro')

@@ -1309,7 +1309,7 @@ def verify_explicit_read():
     pass


-default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin', 'explicit-read']
+default_modes = ['consumer', 'producer', 'avro', 'performance', 'admin']
 all_modes = default_modes + ['throttle', 'avro-https', 'none']

 """All test modes"""
@@ -1422,6 +1422,9 @@ def resolve_envs(_conf):
         print('=' * 30, 'Verifying AVRO', '=' * 30)
         verify_avro()

+        print('=' * 30, 'Verifying AVRO with explicit reader schema', '=' * 30)
+        verify_avro_explicit_read_schema()
+
     if 'avro-https' in modes:
         print('=' * 30, 'Verifying AVRO with HTTPS', '=' * 30)
         verify_avro_https()
@@ -1430,10 +1433,6 @@ def resolve_envs(_conf):
         print('=' * 30, 'Verifying Admin API', '=' * 30)
         verify_admin()

-    if 'explicit-read' in modes:
-        print('=' * 30, 'Verifying Explicit Reading Schema', '=' * 30)
-        verify_explicit_read()
-
     print('=' * 30, 'Done', '=' * 30)

     if with_pympler:
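With the full series applied, the consumer takes distinct reader schemas for keys and values; each incoming message's writer schema is fetched from the registry by id and resolved against the corresponding reader schema. Below is a hedged usage sketch mirroring the integration test above; the connection settings and topic name are assumptions, and the .avsc paths presume the repository's tests/avro directory is available.

    import os

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroConsumer

    avsc_dir = 'tests/avro'  # assumed location of the test schemas
    reader_key_schema = avro.load(os.path.join(avsc_dir, 'primitive_float.avsc'))
    reader_value_schema = avro.load(os.path.join(avsc_dir, 'user_v1.avsc'))

    c = AvroConsumer({
        'bootstrap.servers': 'localhost:9092',           # assumed broker
        'schema.registry.url': 'http://localhost:8081',  # assumed registry
        'group.id': 'explicit-read-example',             # assumed group id
        'default.topic.config': {'auto.offset.reset': 'earliest'}
    }, reader_key_schema=reader_key_schema, reader_value_schema=reader_value_schema)

    c.subscribe(['users'])  # hypothetical topic
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            break
        # Values written with user_v2.avsc are projected onto user_v1.avsc:
        # the "aliases" entry maps the record names, and writer-only fields
        # such as favorite_number are dropped.
        print(msg.key(), msg.value())
    c.close()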