diff --git a/pulsar/__init__.py b/pulsar/__init__.py index d3b081d..92cf10f 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -440,6 +440,26 @@ def policy(self): """ return self._policy +class CryptoKeyReader: + """ + Default crypto key reader implementation + """ + def __init__(self, public_key_path, private_key_path): + """ + Create crypto key reader. + + Parameters + ---------- + + public_key_path: str + Path to the public key + private_key_path: str + Path to private key + """ + _check_type(str, public_key_path, 'public_key_path') + _check_type(str, private_key_path, 'private_key_path') + self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) + class Client: """ The Pulsar client. A single client instance can be used to create producers @@ -574,7 +594,7 @@ def create_producer(self, topic, schema=schema.BytesSchema(), initial_sequence_id=None, send_timeout_millis=30000, - compression_type=CompressionType.NONE, + compression_type: CompressionType = CompressionType.NONE, max_pending_messages=1000, max_pending_messages_across_partitions=50000, block_if_queue_full=False, @@ -583,13 +603,13 @@ def create_producer(self, topic, batching_max_allowed_size_in_bytes=128*1024, batching_max_publish_delay_ms=10, chunking_enabled=False, - message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution, + message_routing_mode: PartitionsRoutingMode = PartitionsRoutingMode.RoundRobinDistribution, lazy_start_partitioned_producers=False, properties=None, - batching_type=BatchingType.Default, + batching_type: BatchingType = BatchingType.Default, encryption_key=None, - crypto_key_reader=None, - access_mode=ProducerAccessMode.Shared, + crypto_key_reader: CryptoKeyReader = None, + access_mode: ProducerAccessMode = ProducerAccessMode.Shared, ): """ Create a new producer on a given topic. @@ -752,7 +772,7 @@ def create_producer(self, topic, return p def subscribe(self, topic, subscription_name, - consumer_type=ConsumerType.Exclusive, + consumer_type: ConsumerType = ConsumerType.Exclusive, schema=schema.BytesSchema(), message_listener=None, receiver_queue_size=1000, @@ -764,8 +784,8 @@ def subscribe(self, topic, subscription_name, is_read_compacted=False, properties=None, pattern_auto_discovery_period=60, - initial_position=InitialPosition.Latest, - crypto_key_reader=None, + initial_position: InitialPosition = InitialPosition.Latest, + crypto_key_reader: CryptoKeyReader = None, replicate_subscription_state_enabled=False, max_pending_chunked_message=10, auto_ack_oldest_chunked_message_on_queue_full=False, @@ -773,7 +793,7 @@ def subscribe(self, topic, subscription_name, batch_receive_policy=None, key_shared_policy=None, batch_index_ack_enabled=False, - regex_subscription_mode=RegexSubscriptionMode.PersistentOnly, + regex_subscription_mode: RegexSubscriptionMode = RegexSubscriptionMode.PersistentOnly, dead_letter_policy: ConsumerDeadLetterPolicy = None, ): """ @@ -966,7 +986,7 @@ def create_reader(self, topic, start_message_id, reader_name=None, subscription_role_prefix=None, is_read_compacted=False, - crypto_key_reader=None, + crypto_key_reader: CryptoKeyReader = None, start_message_id_inclusive=False ): """ @@ -1713,27 +1733,6 @@ def is_connected(self): return self._reader.is_connected() -class CryptoKeyReader: - """ - Default crypto key reader implementation - """ - def __init__(self, public_key_path, private_key_path): - """ - Create crypto key reader. - - Parameters - ---------- - - public_key_path: str - Path to the public key - private_key_path: str - Path to private key - """ - _check_type(str, public_key_path, 'public_key_path') - _check_type(str, private_key_path, 'private_key_path') - self.cryptoKeyReader = _pulsar.CryptoKeyReader(public_key_path, private_key_path) - - class ConsoleLogger: """ Logger that writes on standard output