From 24c3bdedef9d129bc3933172ebbd393e5caac825 Mon Sep 17 00:00:00 2001 From: Kipp Corman Date: Wed, 8 May 2024 18:43:11 -0500 Subject: [PATCH 1/3] add more caching to SR calls --- .../schema_registry/schema_registry_client.py | 91 +++++++++++++++++-- .../schema_registry/test_api_client.py | 2 +- 2 files changed, 86 insertions(+), 7 deletions(-) diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index c414c7ee1..c996c3da8 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -207,9 +207,6 @@ def set(self, schema_id, schema, subject_name=None): schema (Schema): Schema instance subject_name(str): Optional, subject schema is registered under - - Returns: - int: The schema_id """ with self.lock: @@ -229,7 +226,8 @@ def get_schema(self, schema_id): Schema: The schema if known; else None """ - return self.schema_id_index.get(schema_id, None) + with self.lock: + return self.schema_id_index.get(schema_id, None) def get_schema_id_by_subject(self, subject, schema): """ @@ -248,6 +246,72 @@ def get_schema_id_by_subject(self, subject, schema): if schema in self.subject_schemas[subject]: return self.schema_index.get(schema, None) +class _RegisteredSchemaCache(object): + """ + Thread-safe cache for use with the Schema Registry Client. + + This cache may be used to retrieve registered schemas based on subject_name/version/schema + - Get registered schema based on subject name + version + - Get registered schema based on subject name + schema + """ + + def __init__(self): + self.lock = Lock() + self.schema_version_index = defaultdict(dict) + self.schema_index = defaultdict(dict) + + def set(self, subject_name, schema, version, registered_schema): + """ + Add a Schema identified by schema_id to the cache. + + Args: + subject_name (str): The subject name this registered schema is associated with + + schema (Schema): The schema this registered schema is associated with + + version (int): The version this registered schema is associated with + + registered_schema (RegisteredSchema): The registered schema instance + """ + + with self.lock: + if schema is not None: + self.schema_index[subject_name][schema] = registered_schema + elif version is not None: + self.schema_version_index[subject_name][version] = registered_schema + + def get_registered_schema_by_version(self, subject_name, version): + """ + Get the registered schema instance associated with version from the cache. + + Args: + subject_name (str): The subject name this registered schema is associated with + + version (int): The version this registered schema is associated with + + Returns: + RegisteredSchema: The registered schema if known; else None + """ + + with self.lock: + return self.schema_version_index.get(subject_name, {}).get(version, None) + + def get_registered_schema_by_schema(self, subject_name, schema): + """ + Get the registered schema instance associated with schema from the cache. + + Args: + subject_name (str): The subject name this registered schema is associated with + + schema (Schema): The schema this registered schema is associated with + + Returns: + RegisteredSchema: The registered schema if known; else None + """ + + with self.lock: + return self.schema_index.get(subject_name, {}).get(schema, None) + class SchemaRegistryClient(object): """ @@ -292,6 +356,7 @@ class SchemaRegistryClient(object): def __init__(self, conf): self._rest_client = _RestClient(conf) self._cache = _SchemaCache() + self._metadata_cache = _RegisteredSchemaCache() def __enter__(self): return self @@ -398,6 +463,10 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False): `POST Subject API Reference `_ """ # noqa: E501 + registered_schema = self._metadata_cache.get_registered_schema_by_schema(subject_name, schema) + if registered_schema is not None: + return registered_schema + request = {'schema': schema.schema_str} # CP 5.5 adds new fields (for JSON and Protobuf). @@ -414,7 +483,7 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False): schema_type = response.get('schemaType', 'AVRO') - return RegisteredSchema(schema_id=response['id'], + registered_schema = RegisteredSchema(schema_id=response['id'], schema=Schema(response['schema'], schema_type, [ @@ -425,6 +494,9 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False): ]), subject=response['subject'], version=response['version']) + self._metadata_cache.set(subject_name, schema, None, registered_schema) + + return registered_schema def get_subjects(self): """ @@ -524,12 +596,16 @@ def get_version(self, subject_name, version): `GET Subject Version API Reference `_ """ # noqa: E501 + registered_schema = self._metadata_cache.get_registered_schema_by_version(subject_name, version) + if registered_schema is not None: + return registered_schema + response = self._rest_client.get('subjects/{}/versions/{}' .format(_urlencode(subject_name), version)) schema_type = response.get('schemaType', 'AVRO') - return RegisteredSchema(schema_id=response['id'], + registered_schema = RegisteredSchema(schema_id=response['id'], schema=Schema(response['schema'], schema_type, [ @@ -540,6 +616,9 @@ def get_version(self, subject_name, version): ]), subject=response['subject'], version=response['version']) + self._metadata_cache.set(subject_name, None, version, registered_schema) + + return registered_schema def get_versions(self, subject_name): """ diff --git a/tests/integration/schema_registry/test_api_client.py b/tests/integration/schema_registry/test_api_client.py index 6b75e3b52..a51167593 100644 --- a/tests/integration/schema_registry/test_api_client.py +++ b/tests/integration/schema_registry/test_api_client.py @@ -21,7 +21,7 @@ from confluent_kafka.schema_registry import Schema from confluent_kafka.schema_registry.error import SchemaRegistryError -from tests.integration.conftest import kafka_cluster_fixture +from ..conftest import kafka_cluster_fixture @pytest.fixture(scope="module") From a0282d837e1c7ca861945893299e0b5220942e73 Mon Sep 17 00:00:00 2001 From: Kipp Corman Date: Wed, 8 May 2024 18:44:29 -0500 Subject: [PATCH 2/3] oops --- tests/integration/schema_registry/test_api_client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/schema_registry/test_api_client.py b/tests/integration/schema_registry/test_api_client.py index a51167593..6b75e3b52 100644 --- a/tests/integration/schema_registry/test_api_client.py +++ b/tests/integration/schema_registry/test_api_client.py @@ -21,7 +21,7 @@ from confluent_kafka.schema_registry import Schema from confluent_kafka.schema_registry.error import SchemaRegistryError -from ..conftest import kafka_cluster_fixture +from tests.integration.conftest import kafka_cluster_fixture @pytest.fixture(scope="module") From 24e2e3be195194e9b22fb46ddff0993f15c03fb0 Mon Sep 17 00:00:00 2001 From: Kipp Corman Date: Thu, 9 May 2024 11:39:26 -0500 Subject: [PATCH 3/3] fix flake8 --- .../schema_registry/schema_registry_client.py | 55 +++++++++++-------- tools/source-package-verification.sh | 0 2 files changed, 33 insertions(+), 22 deletions(-) mode change 100644 => 100755 tools/source-package-verification.sh diff --git a/src/confluent_kafka/schema_registry/schema_registry_client.py b/src/confluent_kafka/schema_registry/schema_registry_client.py index c996c3da8..06ca30d0e 100644 --- a/src/confluent_kafka/schema_registry/schema_registry_client.py +++ b/src/confluent_kafka/schema_registry/schema_registry_client.py @@ -246,6 +246,7 @@ def get_schema_id_by_subject(self, subject, schema): if schema in self.subject_schemas[subject]: return self.schema_index.get(schema, None) + class _RegisteredSchemaCache(object): """ Thread-safe cache for use with the Schema Registry Client. @@ -483,17 +484,22 @@ def lookup_schema(self, subject_name, schema, normalize_schemas=False): schema_type = response.get('schemaType', 'AVRO') - registered_schema = RegisteredSchema(schema_id=response['id'], - schema=Schema(response['schema'], - schema_type, - [ - SchemaReference(name=ref['name'], - subject=ref['subject'], - version=ref['version']) - for ref in response.get('references', []) - ]), - subject=response['subject'], - version=response['version']) + registered_schema = RegisteredSchema( + schema_id=response['id'], + schema=Schema( + response['schema'], + schema_type, + [ + SchemaReference( + name=ref['name'], + subject=ref['subject'], + version=ref['version'] + ) for ref in response.get('references', []) + ] + ), + subject=response['subject'], + version=response['version'] + ) self._metadata_cache.set(subject_name, schema, None, registered_schema) return registered_schema @@ -605,17 +611,22 @@ def get_version(self, subject_name, version): version)) schema_type = response.get('schemaType', 'AVRO') - registered_schema = RegisteredSchema(schema_id=response['id'], - schema=Schema(response['schema'], - schema_type, - [ - SchemaReference(name=ref['name'], - subject=ref['subject'], - version=ref['version']) - for ref in response.get('references', []) - ]), - subject=response['subject'], - version=response['version']) + registered_schema = RegisteredSchema( + schema_id=response['id'], + schema=Schema( + response['schema'], + schema_type, + [ + SchemaReference( + name=ref['name'], + subject=ref['subject'], + version=ref['version'] + ) for ref in response.get('references', []) + ] + ), + subject=response['subject'], + version=response['version'] + ) self._metadata_cache.set(subject_name, None, version, registered_schema) return registered_schema diff --git a/tools/source-package-verification.sh b/tools/source-package-verification.sh old mode 100644 new mode 100755