diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index aaae6aebc..53031ecaa 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -138,12 +138,20 @@ blocks: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' jobs: - - name: Build + - name: Build and Tests with 'classic' group protocol + commands: + - sem-version python 3.8 + # use a virtualenv + - python3 -m venv _venv && source _venv/bin/activate + - chmod u+r+x tools/source-package-verification.sh + - tools/source-package-verification.sh + - name: Build and Tests with 'consumer' group protocol commands: - sem-version python 3.8 # use a virtualenv - python3 -m venv _venv && source _venv/bin/activate - chmod u+r+x tools/source-package-verification.sh + - export TEST_CONSUMER_GROUP_PROTOCOL=consumer - tools/source-package-verification.sh - name: "Source package verification with Python 3 (Linux arm64)" dependencies: [] diff --git a/examples/adminapi.py b/examples/adminapi.py index 390aba030..afea5581d 100755 --- a/examples/adminapi.py +++ b/examples/adminapi.py @@ -18,8 +18,8 @@ # Example use of AdminClient operations. from confluent_kafka import (KafkaException, ConsumerGroupTopicPartitions, - TopicPartition, ConsumerGroupState, TopicCollection, - IsolationLevel) + TopicPartition, ConsumerGroupState, + TopicCollection, IsolationLevel) from confluent_kafka.admin import (AdminClient, NewTopic, NewPartitions, ConfigResource, ConfigEntry, ConfigSource, AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, @@ -27,6 +27,8 @@ ScramMechanism, ScramCredentialInfo, UserScramCredentialUpsertion, UserScramCredentialDeletion, OffsetSpec) +from confluent_kafka._model import ConsumerGroupType + import sys import threading import logging @@ -471,18 +473,64 @@ def example_list(a, args): print("id {} client_id: {} client_host: {}".format(m.id, m.client_id, m.client_host)) +def getConsumerGroupState(state_string): + if state_string == "STABLE": + return ConsumerGroupState.STABLE + elif state_string == "DEAD": + return ConsumerGroupState.DEAD + elif state_string == "PREPARING_REBALANCING": + return ConsumerGroupState.PREPARING_REBALANCING + elif state_string == "COMPLETING_REBALANCING": + return ConsumerGroupState.COMPLETING_REBALANCING + elif state_string == "EMPTY": + return ConsumerGroupState.EMPTY + return ConsumerGroupState.UNKNOWN + + +def getConsumerGroupType(type_string): + if type_string == "CONSUMER": + return ConsumerGroupType.CONSUMER + elif type_string == "CLASSIC": + return ConsumerGroupType.CLASSIC + return ConsumerGroupType.UNKNOWN + + def example_list_consumer_groups(a, args): """ List Consumer Groups """ - states = {ConsumerGroupState[state] for state in args} - future = a.list_consumer_groups(request_timeout=10, states=states) + states = set() + group_types = set() + if len(args) > 0: + isType = False + isState = False + for i in range(0, len(args)): + if (args[i] == "-states"): + if (isState): + raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states ..] " + + "[-types ..]") + isState = True + elif (args[i] == "-types"): + if (isType): + raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states ..] " + + "[-types ..]") + isType = True + else: + if (isType): + group_types.add(getConsumerGroupType(args[i])) + elif (isState): + states.add(getConsumerGroupState(args[i])) + else: + raise Exception("Invalid Arguments\n Usage: list_consumer_groups [-states ..] 
" + + "[-types ..]") + + future = a.list_consumer_groups(request_timeout=10, states=states, group_types=group_types) try: list_consumer_groups_result = future.result() print("{} consumer groups".format(len(list_consumer_groups_result.valid))) for valid in list_consumer_groups_result.valid: - print(" id: {} is_simple: {} state: {}".format( - valid.group_id, valid.is_simple_consumer_group, valid.state)) + print(" id: {} is_simple: {} state: {} group_type: {}".format( + valid.group_id, valid.is_simple_consumer_group, valid.state, valid.group_type)) print("{} errors".format(len(list_consumer_groups_result.errors))) for error in list_consumer_groups_result.errors: print(" error: {}".format(error)) @@ -867,7 +915,8 @@ def example_list_offsets(a, args): sys.stderr.write(' delete_acls ' + ' ..\n') sys.stderr.write(' list []\n') - sys.stderr.write(' list_consumer_groups [ ..]\n') + sys.stderr.write(' list_consumer_groups [-states ..] ' + + '[-types ..]\n') sys.stderr.write(' describe_consumer_groups ..\n') sys.stderr.write(' describe_topics ..\n') sys.stderr.write(' describe_cluster \n') diff --git a/src/confluent_kafka/__init__.py b/src/confluent_kafka/__init__.py index bab73a2c6..0a483dd59 100644 --- a/src/confluent_kafka/__init__.py +++ b/src/confluent_kafka/__init__.py @@ -48,7 +48,7 @@ 'Producer', 'DeserializingConsumer', 'SerializingProducer', 'TIMESTAMP_CREATE_TIME', 'TIMESTAMP_LOG_APPEND_TIME', 'TIMESTAMP_NOT_AVAILABLE', 'TopicPartition', 'Node', - 'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'Uuid', + 'ConsumerGroupTopicPartitions', 'ConsumerGroupState', 'ConsumerGroupType', 'Uuid', 'IsolationLevel'] __version__ = version()[0] diff --git a/src/confluent_kafka/_model/__init__.py b/src/confluent_kafka/_model/__init__.py index 1c2ec89f0..792033622 100644 --- a/src/confluent_kafka/_model/__init__.py +++ b/src/confluent_kafka/_model/__init__.py @@ -95,6 +95,24 @@ def __lt__(self, other): return self.value < other.value +class ConsumerGroupType(Enum): + """ + Enumerates the different types of Consumer Group Type. + + """ + #: Type is not known or not set + UNKNOWN = cimpl.CONSUMER_GROUP_TYPE_UNKNOWN + #: Consumer Type + CONSUMER = cimpl.CONSUMER_GROUP_TYPE_CONSUMER + #: Classic Type + CLASSIC = cimpl.CONSUMER_GROUP_TYPE_CLASSIC + + def __lt__(self, other): + if self.__class__ != other.__class__: + return NotImplemented + return self.value < other.value + + class TopicCollection: """ Represents collection of topics in the form of different identifiers diff --git a/src/confluent_kafka/admin/__init__.py b/src/confluent_kafka/admin/__init__.py index 924361f2e..cb5220168 100644 --- a/src/confluent_kafka/admin/__init__.py +++ b/src/confluent_kafka/admin/__init__.py @@ -54,7 +54,7 @@ from ._listoffsets import (OffsetSpec, # noqa: F401 ListOffsetsResultInfo) -from .._model import TopicCollection as _TopicCollection +from .._model import TopicCollection as _TopicCollection, ConsumerGroupType as _ConsumerGroupType from ..cimpl import (KafkaException, # noqa: F401 KafkaError, @@ -864,6 +864,8 @@ def list_consumer_groups(self, **kwargs): on broker, and response. Default: `socket.timeout.ms*1000.0` :param set(ConsumerGroupState) states: only list consumer groups which are currently in these states. + :param set(ConsumerGroupType) group_types: only list consumer groups which are currently of + these types. :returns: a future. Result method of the future returns :class:`ListConsumerGroupsResult`. 
@@ -883,6 +885,16 @@ def list_consumer_groups(self, **kwargs): raise TypeError("All elements of states must be of type ConsumerGroupState") kwargs["states_int"] = [state.value for state in states] kwargs.pop("states") + if "group_types" in kwargs: + group_types = kwargs["group_types"] + if group_types is not None: + if not isinstance(group_types, set): + raise TypeError("'group_types' must be a set") + for group_type in group_types: + if not isinstance(group_type, _ConsumerGroupType): + raise TypeError("All elements of group_types must be of type ConsumerGroupType") + kwargs["group_types_int"] = [group_type.value for group_type in group_types] + kwargs.pop("group_types") f, _ = AdminClient._make_futures([], None, AdminClient._make_list_consumer_groups_result) diff --git a/src/confluent_kafka/admin/_group.py b/src/confluent_kafka/admin/_group.py index 82ab98f1d..06a97c91a 100644 --- a/src/confluent_kafka/admin/_group.py +++ b/src/confluent_kafka/admin/_group.py @@ -14,7 +14,7 @@ from .._util import ConversionUtil -from .._model import ConsumerGroupState +from .._model import ConsumerGroupState, ConsumerGroupType from ._acl import AclOperation @@ -31,13 +31,17 @@ class ConsumerGroupListing: Whether a consumer group is simple or not. state : ConsumerGroupState Current state of the consumer group. + group_type : ConsumerGroupType + Current type of the consumer group. """ - def __init__(self, group_id, is_simple_consumer_group, state=None): + def __init__(self, group_id, is_simple_consumer_group, state=None, group_type=None): self.group_id = group_id self.is_simple_consumer_group = is_simple_consumer_group if state is not None: self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState) + if group_type is not None: + self.group_type = ConversionUtil.convert_to_enum(group_type, ConsumerGroupType) class ListConsumerGroupsResult: @@ -119,6 +123,8 @@ class ConsumerGroupDescription: Partition assignor. state : ConsumerGroupState Current state of the consumer group. + group_type : ConsumerGroupType + Current type of the consumer group. coordinator: Node Consumer group coordinator. 
authorized_operations: list(AclOperation) @@ -126,7 +132,7 @@ class ConsumerGroupDescription: """ def __init__(self, group_id, is_simple_consumer_group, members, partition_assignor, state, - coordinator, authorized_operations=None): + coordinator, authorized_operations=None, group_type=None): self.group_id = group_id self.is_simple_consumer_group = is_simple_consumer_group self.members = members @@ -139,4 +145,7 @@ def __init__(self, group_id, is_simple_consumer_group, members, partition_assign self.partition_assignor = partition_assignor if state is not None: self.state = ConversionUtil.convert_to_enum(state, ConsumerGroupState) + if group_type is not None: + self.group_type = ConversionUtil.convert_to_enum(group_type, ConsumerGroupType) + self.coordinator = coordinator diff --git a/src/confluent_kafka/src/Admin.c b/src/confluent_kafka/src/Admin.c index 5d59d71d2..18d64a17d 100644 --- a/src/confluent_kafka/src/Admin.c +++ b/src/confluent_kafka/src/Admin.c @@ -82,6 +82,8 @@ struct Admin_options { rd_kafka_IsolationLevel_t isolation_level; rd_kafka_consumer_group_state_t* states; int states_cnt; + rd_kafka_consumer_group_type_t* group_types; + int group_types_cnt; }; /**@brief "unset" value initializers for Admin_options @@ -185,6 +187,13 @@ Admin_options_to_c (Handle *self, rd_kafka_admin_op_t for_api, goto err; } + if (Admin_options_is_set_ptr(options->group_types) && + (err_obj = rd_kafka_AdminOptions_set_match_consumer_group_types( + c_options, options->group_types, options->group_types_cnt))) { + snprintf(errstr, sizeof(errstr), "%s", rd_kafka_error_string(err_obj)); + goto err; + } + return c_options; err: @@ -1698,24 +1707,28 @@ static const char Admin_delete_acls_doc[] = PyDoc_STR( * @brief List consumer groups */ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kwargs) { - PyObject *future, *states_int = NULL; + PyObject *future, *states_int, *group_types_int = NULL; struct Admin_options options = Admin_options_INITIALIZER; rd_kafka_AdminOptions_t *c_options = NULL; CallState cs; rd_kafka_queue_t *rkqu; rd_kafka_consumer_group_state_t *c_states = NULL; + rd_kafka_consumer_group_type_t *c_group_types = NULL; int states_cnt = 0; + int group_types_cnt = 0; int i = 0; static char *kws[] = {"future", /* options */ "states_int", + "group_types_int", "request_timeout", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|Of", kws, + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|OOf", kws, &future, &states_int, + &group_types_int, &options.request_timeout)) { goto err; } @@ -1746,6 +1759,32 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw } } + if(group_types_int != NULL && group_types_int != Py_None) { + if(!PyList_Check(group_types_int)) { + PyErr_SetString(PyExc_ValueError, + "group_types must of type list"); + goto err; + } + + group_types_cnt = (int)PyList_Size(group_types_int); + + if(group_types_cnt > 0) { + c_group_types = (rd_kafka_consumer_group_type_t *) + malloc(group_types_cnt*sizeof(rd_kafka_consumer_group_type_t)); + for(i = 0 ; i < group_types_cnt ; i++) { + PyObject *group_type = PyList_GET_ITEM(group_types_int, i); + if(!cfl_PyInt_Check(group_type)) { + PyErr_SetString(PyExc_ValueError, + "Element of group_types must be a valid group type"); + goto err; + } + c_group_types[i] = (rd_kafka_consumer_group_type_t) cfl_PyInt_AsInt(group_type); + } + options.group_types = c_group_types; + options.group_types_cnt = group_types_cnt; + } + } + c_options = Admin_options_to_c(self, 
RD_KAFKA_ADMIN_OP_LISTCONSUMERGROUPS, &options, future); if (!c_options) { @@ -1760,7 +1799,6 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw /* Use librdkafka's background thread queue to automatically dispatch * Admin_background_event_cb() when the admin operation is finished. */ rkqu = rd_kafka_queue_get_background(self->rk); - /* * Call ListConsumerGroupOffsets * @@ -1774,14 +1812,19 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw if(c_states) { free(c_states); } + if(c_group_types) { + free(c_group_types); + } rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */ rd_kafka_AdminOptions_destroy(c_options); - Py_RETURN_NONE; err: if(c_states) { free(c_states); } + if(c_group_types) { + free(c_group_types); + } if (c_options) { rd_kafka_AdminOptions_destroy(c_options); Py_DECREF(future); @@ -1789,7 +1832,7 @@ PyObject *Admin_list_consumer_groups (Handle *self, PyObject *args, PyObject *kw return NULL; } const char Admin_list_consumer_groups_doc[] = PyDoc_STR( - ".. py:function:: list_consumer_groups(future, [states_int], [request_timeout])\n" + ".. py:function:: list_consumer_groups(future, [states_int], [group_types_int], [request_timeout])\n" "\n" " List all the consumer groups.\n" "\n" @@ -3466,7 +3509,6 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py( size_t valid_cnt, const rd_kafka_error_t **c_errors_responses, size_t errors_cnt) { - PyObject *result = NULL; PyObject *ListConsumerGroupsResult_type = NULL; PyObject *ConsumerGroupListing_type = NULL; @@ -3509,6 +3551,8 @@ static PyObject *Admin_c_ListConsumerGroupsResults_to_py( cfl_PyDict_SetInt(kwargs, "state", rd_kafka_ConsumerGroupListing_state(c_valid_responses[i])); + cfl_PyDict_SetInt(kwargs, "group_type", rd_kafka_ConsumerGroupListing_type(c_valid_responses[i])); + args = PyTuple_New(0); valid_result = PyObject_Call(ConsumerGroupListing_type, args, kwargs); diff --git a/src/confluent_kafka/src/AdminTypes.c b/src/confluent_kafka/src/AdminTypes.c index 705ce36d9..e56da3b88 100644 --- a/src/confluent_kafka/src/AdminTypes.c +++ b/src/confluent_kafka/src/AdminTypes.c @@ -570,8 +570,14 @@ static void AdminTypes_AddObjectsConsumerGroupStates (PyObject *m) { PyModule_AddIntConstant(m, "CONSUMER_GROUP_STATE_EMPTY", RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY); } +static void AdminTypes_AddObjectsConsumerGroupTypes (PyObject *m) { + /* rd_kafka_consumer_group_type_t */ + PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_UNKNOWN", RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN); + PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CONSUMER", RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER); + PyModule_AddIntConstant(m, "CONSUMER_GROUP_TYPE_CLASSIC", RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC); +} + static void AdminTypes_AddObjectsAlterConfigOpType (PyObject *m) { - /* rd_kafka_consumer_group_state_t */ PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_SET", RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET); PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_DELETE", RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE); PyModule_AddIntConstant(m, "ALTER_CONFIG_OP_TYPE_APPEND", RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND); @@ -612,6 +618,7 @@ void AdminTypes_AddObjects (PyObject *m) { AdminTypes_AddObjectsAclOperation(m); AdminTypes_AddObjectsAclPermissionType(m); AdminTypes_AddObjectsConsumerGroupStates(m); + AdminTypes_AddObjectsConsumerGroupTypes(m); AdminTypes_AddObjectsAlterConfigOpType(m); AdminTypes_AddObjectsScramMechanismType(m); AdminTypes_AddObjectsIsolationLevel(m); diff --git a/tests/common/__init__.py 
b/tests/common/__init__.py new file mode 100644 index 000000000..a0d54934f --- /dev/null +++ b/tests/common/__init__.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright 2024 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import os +from confluent_kafka import Consumer, DeserializingConsumer +from confluent_kafka.avro import AvroConsumer + +_GROUP_PROTOCOL_ENV = 'TEST_CONSUMER_GROUP_PROTOCOL' +_TRIVUP_CLUSTER_TYPE_ENV = 'TEST_TRIVUP_CLUSTER_TYPE' + + +def _update_conf_group_protocol(conf=None): + if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer(): + conf['group.protocol'] = 'consumer' + + +def _trivup_cluster_type_kraft(): + return _TRIVUP_CLUSTER_TYPE_ENV in os.environ and os.environ[_TRIVUP_CLUSTER_TYPE_ENV] == 'kraft' + + +class TestUtils: + @staticmethod + def use_kraft(): + return TestUtils.use_group_protocol_consumer() or _trivup_cluster_type_kraft() + + @staticmethod + def use_group_protocol_consumer(): + return _GROUP_PROTOCOL_ENV in os.environ and os.environ[_GROUP_PROTOCOL_ENV] == 'consumer' + + +class TestConsumer(Consumer): + def __init__(self, conf=None, **kwargs): + _update_conf_group_protocol(conf) + super(TestConsumer, self).__init__(conf, **kwargs) + + +class TestDeserializingConsumer(DeserializingConsumer): + def __init__(self, conf=None, **kwargs): + _update_conf_group_protocol(conf) + super(TestDeserializingConsumer, self).__init__(conf, **kwargs) + + +class TestAvroConsumer(AvroConsumer): + def __init__(self, conf=None, **kwargs): + _update_conf_group_protocol(conf) + super(TestAvroConsumer, self).__init__(conf, **kwargs) diff --git a/tests/integration/admin/test_basic_operations.py b/tests/integration/admin/test_basic_operations.py index 820fd5228..92827aa39 100644 --- a/tests/integration/admin/test_basic_operations.py +++ b/tests/integration/admin/test_basic_operations.py @@ -13,15 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import confluent_kafka import struct import time -from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState + +from confluent_kafka import ConsumerGroupTopicPartitions, TopicPartition, ConsumerGroupState, KafkaError +from confluent_kafka._model import ConsumerGroupType from confluent_kafka.admin import (NewPartitions, ConfigResource, AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, AclOperation, AclPermissionType) from confluent_kafka.error import ConsumeError - +from tests.common import TestUtils topic_prefix = "test-topic" @@ -55,6 +56,7 @@ def verify_admin_acls(admin_client, "User:test-user-2", "*", AclOperation.ALL, AclPermissionType.ALLOW) fs = admin_client.create_acls([acl_binding_1, acl_binding_2, acl_binding_3]) + time.sleep(1) for acl_binding, f in fs.items(): f.result() # trigger exception if there was an error @@ -78,6 +80,7 @@ def verify_admin_acls(admin_client, # expected_acl_bindings = [acl_binding_2, acl_binding_3] fs = admin_client.delete_acls([acl_binding_filter2]) + time.sleep(1) deleted_acl_bindings = sorted(fs[acl_binding_filter2].result()) assert deleted_acl_bindings == expected_acl_bindings, \ "Deleted ACL bindings don't match, actual {} expected {}".format(deleted_acl_bindings, @@ -89,6 +92,7 @@ def verify_admin_acls(admin_client, expected_acl_bindings = [[acl_binding_1], []] delete_acl_binding_filters = [acl_binding_filter3, acl_binding_filter4] fs = admin_client.delete_acls(delete_acl_binding_filters) + time.sleep(1) for acl_binding, expected in zip(delete_acl_binding_filters, expected_acl_bindings): deleted_acl_bindings = sorted(fs[acl_binding].result()) assert deleted_acl_bindings == expected, \ @@ -209,6 +213,7 @@ def test_basic_operations(kafka_cluster): }, validate_only=validate ) + time.sleep(1) admin_client = kafka_cluster.admin() @@ -270,7 +275,7 @@ def consume_messages(group_id, num_messages=None): print('Read all the required messages: exiting') break except ConsumeError as e: - if msg is not None and e.code == confluent_kafka.KafkaError._PARTITION_EOF: + if msg is not None and e.code == KafkaError._PARTITION_EOF: print('Reached end of %s [%d] at offset %d' % ( msg.topic(), msg.partition(), msg.offset())) eof_reached[(msg.topic(), msg.partition())] = True @@ -311,6 +316,17 @@ def consume_messages(group_id, num_messages=None): assert isinstance(result.valid, list) assert not result.valid + # List Consumer Groups with Group Type Option Test + if TestUtils.use_group_protocol_consumer(): + future = admin_client.list_consumer_groups(request_timeout=10, types={ConsumerGroupType.CLASSIC}) + result = future.result() + group_ids = [group.group_id for group in result.valid] + assert group1 not in group_ids, "Consumer group {} was found despite passing Classic Group Type".format(group1) + assert group2 not in group_ids, "Consumer group {} was found despite passing Classic Group Type".format(group2) + for group in group_ids: + assert group.group_type == ConsumerGroupType.CLASSIC + + def verify_config(expconfig, configs): """ Verify that the config key,values in expconfig are found @@ -343,6 +359,7 @@ def verify_config(expconfig, configs): resource.set_config(key, value) fs = admin_client.alter_configs([resource]) + time.sleep(1) fs[resource].result() # will raise exception on failure # diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index ef5d94987..dd69924e2 100644 --- a/tests/integration/admin/test_describe_operations.py +++ 
b/tests/integration/admin/test_describe_operations.py @@ -13,12 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time import pytest + from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, AclOperation, AclPermissionType) from confluent_kafka.error import ConsumeError from confluent_kafka import ConsumerGroupState, TopicCollection +from tests.common import TestUtils + topic_prefix = "test-topic" @@ -82,10 +86,12 @@ def perform_admin_operation_sync(operation, *arg, **kwargs): def create_acls(admin_client, acl_bindings): perform_admin_operation_sync(admin_client.create_acls, acl_bindings) + time.sleep(1) def delete_acls(admin_client, acl_binding_filters): perform_admin_operation_sync(admin_client.delete_acls, acl_binding_filters) + time.sleep(1) def verify_provided_describe_for_authorized_operations( @@ -115,6 +121,7 @@ def verify_provided_describe_for_authorized_operations( acl_binding = AclBinding(restype, resname, ResourcePatternType.LITERAL, "User:sasl_user", "*", operation_to_allow, AclPermissionType.ALLOW) create_acls(admin_client, [acl_binding]) + time.sleep(1) # Check with updated authorized operations desc = perform_admin_operation_sync(describe_fn, *arg, **kwargs) @@ -126,6 +133,7 @@ def verify_provided_describe_for_authorized_operations( acl_binding_filter = AclBindingFilter(restype, resname, ResourcePatternType.ANY, None, None, AclOperation.ANY, AclPermissionType.ANY) delete_acls(admin_client, [acl_binding_filter]) + time.sleep(1) return desc @@ -204,12 +212,17 @@ def test_describe_operations(sasl_cluster): }, validate_only=False ) + time.sleep(1) # Verify Authorized Operations in Describe Topics verify_describe_topics(admin_client, our_topic) # Verify Authorized Operations in Describe Groups - verify_describe_groups(sasl_cluster, admin_client, our_topic) + # Skip this test if using group protocol `consumer` + # as there is new RPC for describe_groups() in + # group protocol `consumer` case. + if not TestUtils.use_group_protocol_consumer(): + verify_describe_groups(sasl_cluster, admin_client, our_topic) # Delete Topic perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10) diff --git a/tests/integration/admin/test_incremental_alter_configs.py b/tests/integration/admin/test_incremental_alter_configs.py index ad19a1164..58a96d84e 100644 --- a/tests/integration/admin/test_incremental_alter_configs.py +++ b/tests/integration/admin/test_incremental_alter_configs.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import time + from confluent_kafka.admin import ConfigResource, \ ConfigEntry, ResourceType, \ AlterConfigOpType @@ -58,12 +60,14 @@ def test_incremental_alter_configs(kafka_cluster): "config": topic_config, "replication_factor": 1, }) + time.sleep(1) our_topic2 = kafka_cluster.create_topic(topic_prefix2, { "num_partitions": num_partitions, "config": topic_config, "replication_factor": 1, }) + time.sleep(1) admin_client = kafka_cluster.admin() @@ -100,6 +104,7 @@ def test_incremental_alter_configs(kafka_cluster): # Incrementally alter some configuration values # fs = admin_client.incremental_alter_configs([res1, res2]) + time.sleep(1) assert_operation_succeeded(fs, 2) @@ -131,6 +136,7 @@ def test_incremental_alter_configs(kafka_cluster): # Incrementally alter some configuration values # fs = admin_client.incremental_alter_configs([res2]) + time.sleep(1) assert_operation_succeeded(fs, 1) diff --git a/tests/integration/admin/test_list_offsets.py b/tests/integration/admin/test_list_offsets.py index 6a2e0a46a..7d5bee91a 100644 --- a/tests/integration/admin/test_list_offsets.py +++ b/tests/integration/admin/test_list_offsets.py @@ -13,8 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. -from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec +import time + from confluent_kafka import TopicPartition, IsolationLevel +from confluent_kafka.admin import ListOffsetsResultInfo, OffsetSpec +from tests.common import TestUtils def test_list_offsets(kafka_cluster): @@ -32,6 +35,7 @@ def test_list_offsets(kafka_cluster): "num_partitions": 1, "replication_factor": 1, }) + time.sleep(1) # Create Producer instance p = kafka_cluster.producer() @@ -62,12 +66,13 @@ def test_list_offsets(kafka_cluster): assert isinstance(result, ListOffsetsResultInfo) assert (result.offset == 3) - requests = {topic_partition: OffsetSpec.max_timestamp()} - futmap = admin_client.list_offsets(requests, **kwargs) - for _, fut in futmap.items(): - result = fut.result() - assert isinstance(result, ListOffsetsResultInfo) - assert (result.offset == 1) + if TestUtils.use_kraft(): + requests = {topic_partition: OffsetSpec.max_timestamp()} + futmap = admin_client.list_offsets(requests, **kwargs) + for _, fut in futmap.items(): + result = fut.result() + assert isinstance(result, ListOffsetsResultInfo) + assert (result.offset == 1) requests = {topic_partition: OffsetSpec.for_timestamp(base_timestamp + 150)} futmap = admin_client.list_offsets(requests, **kwargs) diff --git a/tests/integration/admin/test_user_scram_credentials.py b/tests/integration/admin/test_user_scram_credentials.py index 21c15bc07..b9c50ff67 100644 --- a/tests/integration/admin/test_user_scram_credentials.py +++ b/tests/integration/admin/test_user_scram_credentials.py @@ -13,13 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import pytest +import time import concurrent +import pytest + from confluent_kafka.admin import UserScramCredentialsDescription, UserScramCredentialUpsertion, \ UserScramCredentialDeletion, ScramCredentialInfo, \ ScramMechanism from confluent_kafka.error import KafkaException, KafkaError +from tests.common import TestUtils + def test_user_scram_credentials(kafka_cluster): """ @@ -47,22 +51,28 @@ def test_user_scram_credentials(kafka_cluster): futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion(newuser, ScramCredentialInfo(mechanism, iterations), password, salt)]) + time.sleep(1) fut = futmap[newuser] result = fut.result() assert result is None # Try upsertion for newuser,SCRAM_SHA_256 and add newuser,SCRAM_SHA_512 - futmap = admin_client.alter_user_scram_credentials([UserScramCredentialUpsertion( - newuser, - ScramCredentialInfo( - mechanism, iterations), - password, salt), - UserScramCredentialUpsertion( - newuser, - ScramCredentialInfo( - ScramMechanism.SCRAM_SHA_512, 10000), - password) - ]) + request = [UserScramCredentialUpsertion(newuser, + ScramCredentialInfo( + mechanism, iterations), + password, salt), + UserScramCredentialUpsertion(newuser, + ScramCredentialInfo( + ScramMechanism.SCRAM_SHA_512, 10000), + password)] + + if TestUtils.use_kraft(): + futmap = admin_client.alter_user_scram_credentials([request[0]]) + time.sleep(1) + futmap = admin_client.alter_user_scram_credentials([request[1]]) + else: + futmap = admin_client.alter_user_scram_credentials(request) + time.sleep(1) fut = futmap[newuser] result = fut.result() assert result is None @@ -72,6 +82,7 @@ def test_user_scram_credentials(kafka_cluster): newuser, ScramMechanism.SCRAM_SHA_512) ]) + time.sleep(1) fut = futmap[newuser] result = fut.result() assert result is None @@ -101,6 +112,7 @@ def test_user_scram_credentials(kafka_cluster): # Delete newuser futmap = admin_client.alter_user_scram_credentials([UserScramCredentialDeletion(newuser, mechanism)]) + time.sleep(1) assert isinstance(futmap, dict) assert len(futmap) == 1 assert newuser in futmap diff --git a/tests/integration/cluster_fixture.py b/tests/integration/cluster_fixture.py index e23a0547e..4c5a31038 100644 --- a/tests/integration/cluster_fixture.py +++ b/tests/integration/cluster_fixture.py @@ -20,11 +20,12 @@ from trivup.clusters.KafkaCluster import KafkaCluster -from confluent_kafka import Consumer, Producer, DeserializingConsumer, \ - SerializingProducer +from confluent_kafka import Producer, SerializingProducer from confluent_kafka.admin import AdminClient, NewTopic from confluent_kafka.schema_registry.schema_registry_client import SchemaRegistryClient +from tests.common import TestDeserializingConsumer, TestConsumer + class KafkaClusterFixture(object): __slots__ = ['_cluster', '_admin', '_producer'] @@ -105,7 +106,7 @@ def cimpl_consumer(self, conf=None): if conf is not None: consumer_conf.update(conf) - return Consumer(consumer_conf) + return TestConsumer(consumer_conf) def consumer(self, conf=None, key_deserializer=None, value_deserializer=None): """ @@ -138,7 +139,7 @@ def consumer(self, conf=None, key_deserializer=None, value_deserializer=None): if value_deserializer is not None: consumer_conf['value.deserializer'] = value_deserializer - return DeserializingConsumer(consumer_conf) + return TestDeserializingConsumer(consumer_conf) def admin(self, conf=None): if conf: diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index 3945a2954..8a6c712f2 100644 --- a/tests/integration/conftest.py +++ 
b/tests/integration/conftest.py @@ -17,35 +17,47 @@ # import os - import pytest +from tests.common import TestUtils from tests.integration.cluster_fixture import TrivupFixture from tests.integration.cluster_fixture import ByoFixture work_dir = os.path.dirname(os.path.realpath(__file__)) +def _broker_conf(): + broker_conf = ['transaction.state.log.replication.factor=1', + 'transaction.state.log.min.isr=1'] + if TestUtils.use_group_protocol_consumer(): + broker_conf.append('group.coordinator.rebalance.protocols=classic,consumer') + return broker_conf + + +def _broker_version(): + return 'trunk@f6c9feea76d01a46319b0ca602d70aa855057b07' if TestUtils.use_group_protocol_consumer() else '3.7.0' + + def create_trivup_cluster(conf={}): trivup_fixture_conf = {'with_sr': True, 'debug': True, - 'cp_version': '7.4.0', - 'version': '3.4.0', - 'broker_conf': ['transaction.state.log.replication.factor=1', - 'transaction.state.log.min.isr=1']} + 'cp_version': '7.6.0', + 'kraft': TestUtils.use_kraft(), + 'version': _broker_version(), + 'broker_conf': _broker_conf()} trivup_fixture_conf.update(conf) return TrivupFixture(trivup_fixture_conf) def create_sasl_cluster(conf={}): trivup_fixture_conf = {'with_sr': False, - 'version': '3.4.0', + 'version': _broker_version(), 'sasl_mechanism': "PLAIN", + 'kraft': TestUtils.use_kraft(), 'sasl_users': 'sasl_user=sasl_user', 'debug': True, 'cp_version': 'latest', - 'broker_conf': ['transaction.state.log.replication.factor=1', - 'transaction.state.log.min.isr=1']} + 'broker_conf': _broker_conf()} trivup_fixture_conf.update(conf) return TrivupFixture(trivup_fixture_conf) @@ -112,6 +124,13 @@ def kafka_cluster(): yield fixture +@pytest.fixture(scope="package") +def kafka_single_broker_cluster(): + for fixture in kafka_cluster_fixture( + trivup_cluster_conf={'broker_cnt': 1}): + yield fixture + + @pytest.fixture(scope="package") def sasl_cluster(request): for fixture in sasl_cluster_fixture(request.param): diff --git a/tests/integration/consumer/test_consumer_topicpartition_metadata.py b/tests/integration/consumer/test_consumer_topicpartition_metadata.py index 4c01c1df7..486231c26 100644 --- a/tests/integration/consumer/test_consumer_topicpartition_metadata.py +++ b/tests/integration/consumer/test_consumer_topicpartition_metadata.py @@ -29,11 +29,11 @@ def commit_and_check(consumer, topic, metadata): assert offsets[0].metadata == metadata -def test_consumer_topicpartition_metadata(kafka_cluster): - topic = kafka_cluster.create_topic("test_topicpartition") +def test_consumer_topicpartition_metadata(kafka_single_broker_cluster): + topic = kafka_single_broker_cluster.create_topic("test_topicpartition") consumer_conf = {'group.id': 'pytest'} - c = kafka_cluster.consumer(consumer_conf) + c = kafka_single_broker_cluster.consumer(consumer_conf) # Commit without any metadata. 
metadata = None diff --git a/tests/integration/consumer/test_cooperative_rebalance_1.py b/tests/integration/consumer/test_cooperative_rebalance_1.py index d9a94a85f..7d7fa8b2f 100644 --- a/tests/integration/consumer/test_cooperative_rebalance_1.py +++ b/tests/integration/consumer/test_cooperative_rebalance_1.py @@ -44,7 +44,13 @@ def __init__(self): def on_assign(self, consumer, partitions): self.assign_count += 1 - assert 1 == len(partitions) + if self.assign_count == 3: + # Assigning both partitions again after assignment lost + # due to max poll interval timeout exceeded + assert 2 == len(partitions) + else: + # Incremental assign cases + assert 1 == len(partitions) def on_revoke(self, consumer, partitions): self.revoke_count += 1 @@ -97,10 +103,11 @@ def on_lost(self, consumer, partitions): assert e.value.args[0].code() == KafkaError._MAX_POLL_EXCEEDED # And poll again to trigger rebalance callbacks - msg4 = consumer.poll(1) - assert msg4 is None + # It will trigger on_lost and then on_assign during rejoin + msg4 = consumer.poll(10) + assert msg4 is not None # Reading messages again after rejoin - assert 2 == reb.assign_count + assert 3 == reb.assign_count assert 1 == reb.lost_count assert 0 == reb.revoke_count diff --git a/tests/integration/integration_test.py b/tests/integration/integration_test.py index e4ae6deca..bc0efa1a7 100755 --- a/tests/integration/integration_test.py +++ b/tests/integration/integration_test.py @@ -20,7 +20,6 @@ """ Test script for confluent_kafka module """ -import confluent_kafka import os import time import uuid @@ -30,6 +29,10 @@ import struct import re +import confluent_kafka + +from tests.common import TestConsumer, TestAvroConsumer + try: # Memory tracker from pympler import tracker @@ -373,7 +376,7 @@ def verify_consumer(): 'enable.partition.eof': True} # Create consumer - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) def print_wmark(consumer, topic_parts): # Verify #294: get_watermark_offsets() should not fail on the first call @@ -483,7 +486,7 @@ def print_wmark(consumer, topic_parts): c.close() # Start a new client and get the committed offsets - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p), range(0, 3)))) for tp in offsets: print(tp) @@ -500,7 +503,7 @@ def verify_consumer_performance(): 'error_cb': error_cb, 'auto.offset.reset': 'earliest'} - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) def my_on_assign(consumer, partitions): print('on_assign:', len(partitions), 'partitions:') @@ -608,7 +611,7 @@ def verify_batch_consumer(): 'auto.offset.reset': 'earliest'} # Create consumer - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) # Subscribe to a list of topics c.subscribe([topic]) @@ -665,7 +668,7 @@ def verify_batch_consumer(): c.close() # Start a new client and get the committed offsets - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) offsets = c.committed(list(map(lambda p: confluent_kafka.TopicPartition(topic, p), range(0, 3)))) for tp in offsets: print(tp) @@ -682,7 +685,7 @@ def verify_batch_consumer_performance(): 'error_cb': error_cb, 'auto.offset.reset': 'earliest'} - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) def my_on_assign(consumer, partitions): print('on_assign:', len(partitions), 'partitions:') @@ -877,7 +880,7 @@ def run_avro_loop(producer_conf, consumer_conf): p.produce(**combo) p.flush() - c = avro.AvroConsumer(consumer_conf) + c = TestAvroConsumer(consumer_conf) 
c.subscribe([(t['topic']) for t in combinations]) msgcount = 0 @@ -989,7 +992,7 @@ def stats_cb(stats_json_str): 'statistics.interval.ms': 200, 'auto.offset.reset': 'earliest'} - c = confluent_kafka.Consumer(conf) + c = TestConsumer(conf) c.subscribe([topic]) max_msgcnt = 1000000 @@ -1116,7 +1119,7 @@ def verify_avro_explicit_read_schema(): p.produce(topic=avro_topic, **combo) p.flush() - c = avro.AvroConsumer(consumer_conf, reader_key_schema=reader_schema, reader_value_schema=reader_schema) + c = TestAvroConsumer(consumer_conf, reader_key_schema=reader_schema, reader_value_schema=reader_schema) c.subscribe([avro_topic]) msgcount = 0 diff --git a/tests/integration/producer/__init__.py b/tests/integration/producer/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/integration/producer/test_transactions.py b/tests/integration/producer/test_transactions.py index b43f81437..32d1275bb 100644 --- a/tests/integration/producer/test_transactions.py +++ b/tests/integration/producer/test_transactions.py @@ -19,7 +19,9 @@ import sys from uuid import uuid1 -from confluent_kafka import Consumer, KafkaError +from confluent_kafka import KafkaError + +from tests.common import TestConsumer def called_by(): @@ -114,7 +116,7 @@ def test_send_offsets_committed_transaction(kafka_cluster): 'error_cb': error_cb } consumer_conf.update(kafka_cluster.client_conf()) - consumer = Consumer(consumer_conf) + consumer = TestConsumer(consumer_conf) kafka_cluster.seed_topic(input_topic) consumer.subscribe([input_topic]) @@ -204,7 +206,7 @@ def consume_committed(conf, topic): 'error_cb': prefixed_error_cb(called_by()), } consumer_conf.update(conf) - consumer = Consumer(consumer_conf) + consumer = TestConsumer(consumer_conf) consumer.subscribe([topic]) msg_cnt = read_all_msgs(consumer) diff --git a/tests/integration/schema_registry/test_proto_serializers.py b/tests/integration/schema_registry/test_proto_serializers.py index 621beac43..26f213ee2 100644 --- a/tests/integration/schema_registry/test_proto_serializers.py +++ b/tests/integration/schema_registry/test_proto_serializers.py @@ -47,19 +47,19 @@ one_id='oneof_str', is_active=False)}) ]) -def test_protobuf_message_serialization(kafka_cluster, pb2, data): +def test_protobuf_message_serialization(kafka_single_broker_cluster, pb2, data): """ Validates that we get the same message back that we put in. 
""" - topic = kafka_cluster.create_topic("serialization-proto") - sr = kafka_cluster.schema_registry() + topic = kafka_single_broker_cluster.create_topic("serialization-proto") + sr = kafka_single_broker_cluster.schema_registry() value_serializer = ProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) value_deserializer = ProtobufDeserializer(pb2, {'use.deprecated.format': False}) - producer = kafka_cluster.producer(value_serializer=value_serializer) - consumer = kafka_cluster.consumer(value_deserializer=value_deserializer) + producer = kafka_single_broker_cluster.producer(value_serializer=value_serializer) + consumer = kafka_single_broker_cluster.consumer(value_deserializer=value_deserializer) consumer.assign([TopicPartition(topic, 0)]) expect = pb2(**data) @@ -78,16 +78,16 @@ def test_protobuf_message_serialization(kafka_cluster, pb2, data): (DependencyMessage, ['NestedTestProto.proto', 'PublicTestProto.proto']), (ClickCas, ['metadata_proto.proto', 'common_proto.proto']) ]) -def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): +def test_protobuf_reference_registration(kafka_single_broker_cluster, pb2, expected_refs): """ Registers multiple messages with dependencies then queries the Schema Registry to ensure the references match up. """ - sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic("serialization-proto-refs") + sr = kafka_single_broker_cluster.schema_registry() + topic = kafka_single_broker_cluster.create_topic("serialization-proto-refs") serializer = ProtobufSerializer(pb2, sr, {'use.deprecated.format': False}) - producer = kafka_cluster.producer(key_serializer=serializer) + producer = kafka_single_broker_cluster.producer(key_serializer=serializer) producer.produce(topic, key=pb2(), partition=0) producer.flush() @@ -97,7 +97,7 @@ def test_protobuf_reference_registration(kafka_cluster, pb2, expected_refs): assert expected_refs.sort() == [ref.name for ref in registered_refs].sort() -def test_protobuf_serializer_type_mismatch(kafka_cluster): +def test_protobuf_serializer_type_mismatch(kafka_single_broker_cluster): """ Ensures an Exception is raised when deserializing an unexpected type. 
@@ -105,11 +105,11 @@ def test_protobuf_serializer_type_mismatch(kafka_cluster): pb2_1 = TestProto_pb2.TestMessage pb2_2 = NestedTestProto_pb2.NestedMessage - sr = kafka_cluster.schema_registry() - topic = kafka_cluster.create_topic("serialization-proto-refs") + sr = kafka_single_broker_cluster.schema_registry() + topic = kafka_single_broker_cluster.create_topic("serialization-proto-refs") serializer = ProtobufSerializer(pb2_1, sr, {'use.deprecated.format': False}) - producer = kafka_cluster.producer(key_serializer=serializer) + producer = kafka_single_broker_cluster.producer(key_serializer=serializer) with pytest.raises(KafkaException, match=r"message must be of type =6.0.0;python_version>="3.0" pytest-timeout requests-mock -trivup>=0.8.3 +tests/trivup/trivup-0.12.6.tar.gz fastavro<1.8.0;python_version=="3.7" fastavro>=1.8.4;python_version>"3.7" fastavro diff --git a/tests/soak/soakclient.py b/tests/soak/soakclient.py index e7e914cee..978257448 100755 --- a/tests/soak/soakclient.py +++ b/tests/soak/soakclient.py @@ -26,10 +26,11 @@ # from confluent_kafka import KafkaError, KafkaException, version -from confluent_kafka import Producer, Consumer +from confluent_kafka import Producer from confluent_kafka.admin import AdminClient, NewTopic from collections import defaultdict from builtins import int +from common import TestConsumer import argparse import threading import time @@ -444,7 +445,7 @@ def filter_config(conf, filter_out, strip_prefix): cconf['error_cb'] = self.consumer_error_cb cconf['on_commit'] = self.consumer_commit_cb self.logger.info("consumer: using group.id {}".format(cconf['group.id'])) - self.consumer = Consumer(cconf) + self.consumer = TestConsumer(cconf) # Create and start producer thread self.producer_thread = threading.Thread(target=self.producer_thread_main) diff --git a/tests/test_Admin.py b/tests/test_Admin.py index b59cefb68..84788dd25 100644 --- a/tests/test_Admin.py +++ b/tests/test_Admin.py @@ -6,7 +6,7 @@ ResourcePatternType, AclOperation, AclPermissionType, AlterConfigOpType, \ ScramCredentialInfo, ScramMechanism, \ UserScramCredentialAlteration, UserScramCredentialDeletion, \ - UserScramCredentialUpsertion, OffsetSpec + UserScramCredentialUpsertion, OffsetSpec, _ConsumerGroupType from confluent_kafka import KafkaException, KafkaError, libversion, \ TopicPartition, ConsumerGroupTopicPartitions, ConsumerGroupState, \ IsolationLevel, TopicCollection @@ -616,6 +616,18 @@ def test_list_consumer_groups_api(): with pytest.raises(TypeError): a.list_consumer_groups(states=["EMPTY"]) + with pytest.raises(TypeError): + a.list_consumer_groups(group_types=["UNKNOWN"]) + + with pytest.raises(TypeError): + a.list_consumer_groups(group_types="UNKNOWN") + + with pytest.raises(TypeError): + a.list_consumer_groups(group_types=[_ConsumerGroupType.UNKNOWN]) + + with pytest.raises(TypeError): + a.list_consumer_groups(group_types=[_ConsumerGroupType.CLASSIC, _ConsumerGroupType.CLASSIC]) + with pytest.raises(TypeError): a.list_consumer_groups(states=[ConsumerGroupState.EMPTY, ConsumerGroupState.STABLE]) diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py index 48a8d3f8e..73cc999e3 100644 --- a/tests/test_Consumer.py +++ b/tests/test_Consumer.py @@ -1,9 +1,12 @@ #!/usr/bin/env python +import pytest + from confluent_kafka import (Consumer, TopicPartition, KafkaError, KafkaException, TIMESTAMP_NOT_AVAILABLE, OFFSET_INVALID, libversion) -import pytest + +from tests.common import TestConsumer def test_basic_api(): @@ -11,15 +14,15 @@ def test_basic_api(): broker configured. 
""" with pytest.raises(TypeError) as ex: - kc = Consumer() + kc = TestConsumer() assert ex.match('expected configuration dict') def dummy_commit_cb(err, partitions): pass - kc = Consumer({'group.id': 'test', 'socket.timeout.ms': '100', - 'session.timeout.ms': 1000, # Avoid close() blocking too long - 'on_commit': dummy_commit_cb}) + kc = TestConsumer({'group.id': 'test', 'socket.timeout.ms': '100', + 'session.timeout.ms': 1000, # Avoid close() blocking too long + 'on_commit': dummy_commit_cb}) kc.subscribe(["test"]) kc.unsubscribe() @@ -119,11 +122,11 @@ def dummy_assign_revoke(consumer, partitions): def test_store_offsets(): """ Basic store_offsets() tests """ - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) c.subscribe(["test"]) @@ -161,10 +164,10 @@ def commit_cb(cs, err, ps): cs = CommitState('test', 2) - c = Consumer({'group.id': 'x', - 'enable.auto.commit': False, 'socket.timeout.ms': 50, - 'session.timeout.ms': 100, - 'on_commit': lambda err, ps: commit_cb(cs, err, ps)}) + c = TestConsumer({'group.id': 'x', + 'enable.auto.commit': False, 'socket.timeout.ms': 50, + 'session.timeout.ms': 100, + 'on_commit': lambda err, ps: commit_cb(cs, err, ps)}) c.assign([TopicPartition(cs.topic, cs.partition)]) @@ -196,11 +199,11 @@ def poll(self, somearg): @pytest.mark.skipif(libversion()[1] < 0x000b0000, reason="requires librdkafka >=0.11.0") def test_offsets_for_times(): - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) # Query broker for timestamps for partition try: test_topic_partition = TopicPartition("test", 0, 100) @@ -216,11 +219,11 @@ def test_offsets_for_times(): def test_multiple_close_does_not_throw_exception(): """ Calling Consumer.close() multiple times should not throw Runtime Exception """ - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) c.subscribe(["test"]) @@ -232,11 +235,11 @@ def test_multiple_close_does_not_throw_exception(): def test_any_method_after_close_throws_exception(): """ Calling any consumer method after close should throw a RuntimeError """ - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) c.subscribe(["test"]) c.unsubscribe() @@ -296,11 +299,11 @@ def test_any_method_after_close_throws_exception(): def test_calling_store_offsets_after_close_throws_erro(): """ calling store_offset after close should throw RuntimeError """ - c = Consumer({'group.id': 'test', - 'enable.auto.commit': True, - 'enable.auto.offset.store': False, - 'socket.timeout.ms': 50, - 'session.timeout.ms': 100}) + c = 
TestConsumer({'group.id': 'test', + 'enable.auto.commit': True, + 'enable.auto.offset.store': False, + 'socket.timeout.ms': 50, + 'session.timeout.ms': 100}) c.subscribe(["test"]) c.unsubscribe() @@ -319,5 +322,5 @@ def test_consumer_without_groupid(): """ Consumer should raise exception if group.id is not set """ with pytest.raises(ValueError) as ex: - Consumer({'bootstrap.servers': "mybroker:9092"}) + TestConsumer({'bootstrap.servers': "mybroker:9092"}) assert ex.match('group.id must be set') diff --git a/tests/test_Producer.py b/tests/test_Producer.py index 7b81d2125..0f0d69e1d 100644 --- a/tests/test_Producer.py +++ b/tests/test_Producer.py @@ -1,10 +1,12 @@ #!/usr/bin/env python # -*- coding: utf-8 -*- import pytest +from struct import pack -from confluent_kafka import Producer, Consumer, KafkaError, KafkaException, \ +from confluent_kafka import Producer, KafkaError, KafkaException, \ TopicPartition, libversion -from struct import pack + +from tests.common import TestConsumer def error_cb(err): @@ -211,7 +213,7 @@ def test_transaction_api(): assert ex.value.args[0].fatal() is False assert ex.value.args[0].txn_requires_abort() is False - consumer = Consumer({"group.id": "testgroup"}) + consumer = TestConsumer({"group.id": "testgroup"}) group_metadata = consumer.consumer_group_metadata() consumer.close() diff --git a/tests/test_log.py b/tests/test_log.py index 6cd510819..dece976a7 100644 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -1,10 +1,13 @@ #!/usr/bin/env python from io import StringIO +import logging + import confluent_kafka import confluent_kafka.avro -import logging +from tests.common import TestConsumer, TestAvroConsumer +import confluent_kafka.admin class CountingFilter(logging.Filter): def __init__(self, name): @@ -24,10 +27,9 @@ def test_logging_consumer(): logger.setLevel(logging.DEBUG) f = CountingFilter('consumer') logger.addFilter(f) - - kc = confluent_kafka.Consumer({'group.id': 'test', - 'debug': 'all'}, - logger=logger) + kc = TestConsumer({'group.id': 'test', + 'debug': 'all'}, + logger=logger) while f.cnt == 0: kc.poll(timeout=0.5) @@ -44,10 +46,10 @@ def test_logging_avro_consumer(): f = CountingFilter('avroconsumer') logger.addFilter(f) - kc = confluent_kafka.avro.AvroConsumer({'schema.registry.url': 'http://example.com', - 'group.id': 'test', - 'debug': 'all'}, - logger=logger) + kc = TestAvroConsumer({'schema.registry.url': 'http://example.com', + 'group.id': 'test', + 'debug': 'all'}, + logger=logger) while f.cnt == 0: kc.poll(timeout=0.5) @@ -149,7 +151,7 @@ def test_consumer_logger_logging_in_given_format(): handler.setFormatter(logging.Formatter('%(name)s Logger | %(message)s')) logger.addHandler(handler) - c = confluent_kafka.Consumer( + c = TestConsumer( {"bootstrap.servers": "test", "group.id": "test", "logger": logger, "debug": "msg"}) c.poll(0) diff --git a/tests/test_misc.py b/tests/test_misc.py index aca7b5a4f..4db5cff5a 100644 --- a/tests/test_misc.py +++ b/tests/test_misc.py @@ -1,14 +1,17 @@ #!/usr/bin/env python -import confluent_kafka -from confluent_kafka import Consumer, Producer -from confluent_kafka.admin import AdminClient import json import pytest import os import time import sys +import confluent_kafka +from confluent_kafka import Consumer, Producer +from confluent_kafka.admin import AdminClient + +from tests.common import TestConsumer + def test_version(): print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version()) @@ -40,7 +43,7 @@ def error_cb(error_msg): 'error_cb': error_cb } - kc = 
confluent_kafka.Consumer(**conf) + kc = TestConsumer(conf) kc.subscribe(["test"]) while not seen_error_cb: kc.poll(timeout=0.1) @@ -64,7 +67,7 @@ def stats_cb(stats_json_str): 'stats_cb': stats_cb } - kc = confluent_kafka.Consumer(**conf) + kc = TestConsumer(conf) kc.subscribe(["test"]) while not seen_stats_cb: @@ -137,7 +140,7 @@ def oauth_cb(oauth_config): 'oauth_cb': oauth_cb } - kc = confluent_kafka.Consumer(**conf) + kc = TestConsumer(conf) while not seen_oauth_cb: kc.poll(timeout=0.1) @@ -162,7 +165,7 @@ def oauth_cb(oauth_config): 'oauth_cb': oauth_cb } - kc = confluent_kafka.Consumer(**conf) + kc = TestConsumer(conf) while not seen_oauth_cb: kc.poll(timeout=0.1) @@ -189,7 +192,7 @@ def oauth_cb(oauth_config): 'oauth_cb': oauth_cb } - kc = confluent_kafka.Consumer(**conf) + kc = TestConsumer(conf) while oauth_cb_count < 2: kc.poll(timeout=0.1) @@ -267,7 +270,7 @@ def on_delivery(err, msg): def test_set_sasl_credentials_api(): clients = [ AdminClient({}), - confluent_kafka.Consumer({"group.id": "dummy"}), + TestConsumer({"group.id": "dummy"}), confluent_kafka.Producer({})] for c in clients: diff --git a/tests/trivup/trivup-0.12.6.tar.gz b/tests/trivup/trivup-0.12.6.tar.gz new file mode 100644 index 000000000..5417120a6 Binary files /dev/null and b/tests/trivup/trivup-0.12.6.tar.gz differ diff --git a/tools/source-package-verification.sh b/tools/source-package-verification.sh index eb7506061..58ccefddc 100755 --- a/tools/source-package-verification.sh +++ b/tools/source-package-verification.sh @@ -18,8 +18,10 @@ export DYLD_LIBRARY_PATH="$DYLD_LIBRARY_PATH:$PWD/$lib_dir" python setup.py build && python setup.py install if [[ $OS_NAME == linux && $ARCH == x64 ]]; then - flake8 --exclude ./_venv,*_pb2.py - make docs + if [[ -z $TEST_CONSUMER_GROUP_PROTOCOL ]]; then + flake8 --exclude ./_venv,*_pb2.py + make docs + fi python -m pytest --timeout 1200 --ignore=dest else python -m pytest --timeout 1200 --ignore=dest --ignore=tests/integration
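
For context, the two Semaphore jobs at the top of this patch run the same verification script; the 'consumer' job only differs in exporting TEST_CONSUMER_GROUP_PROTOCOL=consumer before invoking tools/source-package-verification.sh. A rough sketch of how the new tests/common wrappers are intended to react to that variable (broker address, topic and group names below are placeholders, not part of the patch):

import os

# In CI this variable is exported by the Semaphore job before pytest starts.
os.environ["TEST_CONSUMER_GROUP_PROTOCOL"] = "consumer"

from tests.common import TestConsumer, TestUtils

assert TestUtils.use_group_protocol_consumer()
assert TestUtils.use_kraft()  # the 'consumer' protocol implies a KRaft cluster

# Because the config carries a 'group.id', TestConsumer injects
# 'group.protocol': 'consumer'; without the env var it behaves like a
# plain confluent_kafka.Consumer.
c = TestConsumer({
    "bootstrap.servers": "localhost:9092",  # placeholder broker
    "group.id": "pytest",                   # placeholder group
    "auto.offset.reset": "earliest",
})
c.subscribe(["test-topic"])  # placeholder topic
c.poll(1.0)
c.close()
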