diff --git a/confluent_kafka/avro/serializer/message_serializer.py b/confluent_kafka/avro/serializer/message_serializer.py
index 085b78fb2..90b6bcb03 100644
--- a/confluent_kafka/avro/serializer/message_serializer.py
+++ b/confluent_kafka/avro/serializer/message_serializer.py
@@ -39,7 +39,7 @@
 HAS_FAST = False
 try:
-    from fastavro import schemaless_reader
+    from fastavro import schemaless_reader, schemaless_writer
 
     HAS_FAST = True
 except ImportError:
     pass
@@ -75,9 +75,13 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
         self.reader_key_schema = reader_key_schema
         self.reader_value_schema = reader_value_schema
 
-    '''
-
-    '''
+    # Encoder support
+    def _get_encoder_func(self, writer_schema):
+        if HAS_FAST:
+            schema = writer_schema.to_json()
+            return lambda record, fp: schemaless_writer(fp, schema, record)
+        writer = avro.io.DatumWriter(writer_schema)
+        return lambda record, fp: writer.write(record, avro.io.BinaryEncoder(fp))
 
     def encode_record_with_schema(self, topic, schema, record, is_key=False):
         """
@@ -103,7 +107,7 @@ def encode_record_with_schema(self, topic, schema, record, is_key=False):
             raise serialize_err(message)
 
         # cache writer
-        self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+        self.id_to_writers[schema_id] = self._get_encoder_func(schema)
 
         return self.encode_record_with_schema_id(schema_id, record, is_key=is_key)
 
@@ -126,7 +130,7 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
                 schema = self.registry_client.get_by_id(schema_id)
                 if not schema:
                     raise serialize_err("Schema does not exist")
-                self.id_to_writers[schema_id] = avro.io.DatumWriter(schema)
+                self.id_to_writers[schema_id] = self._get_encoder_func(schema)
             except ClientError:
                 exc_type, exc_value, exc_traceback = sys.exc_info()
                 raise serialize_err(repr(traceback.format_exception(exc_type, exc_value, exc_traceback)))
@@ -134,21 +138,11 @@ def encode_record_with_schema_id(self, schema_id, record, is_key=False):
         # get the writer
         writer = self.id_to_writers[schema_id]
         with ContextStringIO() as outf:
-            # write the header
-            # magic byte
-
-            outf.write(struct.pack('b', MAGIC_BYTE))
-
-            # write the schema ID in network byte order (big end)
-            outf.write(struct.pack('>I', schema_id))
-
-            # write the record to the rest of it
-            # Create an encoder that we'll write to
-            encoder = avro.io.BinaryEncoder(outf)
-            # write the magic byte
-
-            # write the object in 'obj' as Avro to the fake file...
-            writer.write(record, encoder)
+            # Write the magic byte and schema ID in network byte order (big endian)
+            outf.write(struct.pack('>bI', MAGIC_BYTE, schema_id))
+
+            # write the record to the rest of the buffer
+            writer(record, outf)
 
             return outf.getvalue()
 
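
For reviewers, a minimal sketch of the wire format the last hunk now emits: one signed magic byte followed by a 4-byte big-endian schema ID (the `'>bI'` pack), then the Avro-encoded body. The `split_wire_format` helper and the schema id 42 are illustrative only, not part of this patch, and `MAGIC_BYTE = 0` is assumed to match the module's constant.

```python
import struct

MAGIC_BYTE = 0  # assumed to match the serializer module's constant


def split_wire_format(message_bytes):
    """Split an encoded message into (schema_id, avro_payload).

    Hypothetical helper for illustration; it simply reverses the
    struct.pack('>bI', MAGIC_BYTE, schema_id) header written above.
    """
    if len(message_bytes) < 5:
        raise ValueError("message too short to carry the 5-byte header")
    magic, schema_id = struct.unpack('>bI', message_bytes[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("unexpected magic byte: %d" % magic)
    return schema_id, message_bytes[5:]


# Round-trip the same header layout the patch writes:
header = struct.pack('>bI', MAGIC_BYTE, 42)
schema_id, payload = split_wire_format(header + b'avro-encoded-body')
assert schema_id == 42 and payload == b'avro-encoded-body'
```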