Skip to content

Commit 6642180

Browse files
authored
ElectLeaders api(KIP-460) implemented (#1818)
* electLeaders * requested changes * requested changes * style fix * requested changes * whitespace error * changes acc to new format * requested changes * whitespace error * line break * line break * indentation changes
1 parent a356f2a commit 6642180

File tree

8 files changed

+345
-4
lines changed

8 files changed

+345
-4
lines changed

examples/adminapi.py

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
AclOperation, AclPermissionType, AlterConfigOpType,
2727
ScramMechanism, ScramCredentialInfo,
2828
UserScramCredentialUpsertion, UserScramCredentialDeletion,
29-
OffsetSpec)
29+
OffsetSpec, ElectionType)
3030
import sys
3131
import threading
3232
import logging
@@ -878,6 +878,36 @@ def example_delete_records(a, args):
878878
f" before offset {partition.offset}: {e}")
879879

880880

881+
def example_elect_leaders(a, args):
882+
partitions = []
883+
if (len(args) - 1) % 2 != 0:
884+
raise ValueError("Invalid number of arguments for elect_leaders, Expected format: " +
885+
"elect_leaders <election_type> [<topic1> <partition1>" +
886+
" <topic2> <partition2> ..]")
887+
888+
try:
889+
election_type = ElectionType[args[0]]
890+
except KeyError:
891+
raise ValueError(f"Invalid election_type: {args[0]}, expected 'PREFERRED' or 'UNCLEAN'")
892+
893+
for topic, partition in zip(args[1::2], args[2::2]):
894+
partitions.append(TopicPartition(topic, int(partition)))
895+
896+
f = a.elect_leaders(election_type, partitions)
897+
try:
898+
results = f.result()
899+
for partition, exception in results.items():
900+
if exception is None:
901+
print(f"Leader Election Successful for topic: '{partition.topic}'" +
902+
f" partition: '{partition.partition}'")
903+
else:
904+
print(
905+
"Leader Election Failed for topic: " +
906+
f"'{partition.topic}' partition: '{partition.partition}': {exception}")
907+
except KafkaException as e:
908+
print(f"Error electing leaders: {e}")
909+
910+
881911
if __name__ == '__main__':
882912
if len(sys.argv) < 3:
883913
sys.stderr.write('Usage: %s <bootstrap-brokers> <operation> <args..>\n\n' % sys.argv[0])
@@ -917,6 +947,7 @@ def example_delete_records(a, args):
917947
sys.stderr.write(' list_offsets <isolation_level> <topic1> <partition1> <offset_spec1> ' +
918948
'[<topic2> <partition2> <offset_spec2> ..]\n')
919949
sys.stderr.write(' delete_records <topic1> <partition1> <offset1> [<topic2> <partition2> <offset2> ..]\n')
950+
sys.stderr.write(' elect_leaders <election_type> [<topic1> <partition1> <topic2> <partition2> ..]\n')
920951
sys.exit(1)
921952

922953
broker = sys.argv[1]
@@ -947,7 +978,8 @@ def example_delete_records(a, args):
947978
'describe_user_scram_credentials': example_describe_user_scram_credentials,
948979
'alter_user_scram_credentials': example_alter_user_scram_credentials,
949980
'list_offsets': example_list_offsets,
950-
'delete_records': example_delete_records}
981+
'delete_records': example_delete_records,
982+
'elect_leaders': example_elect_leaders}
951983

952984
if operation not in opsmap:
953985
sys.stderr.write('Unknown operation: %s\n' % operation)

src/confluent_kafka/admin/__init__.py

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
from ._listoffsets import (OffsetSpec, # noqa: F401
5555
ListOffsetsResultInfo)
5656

57+
from ._election import (ElectionType) # noqa: F401
58+
5759
from ._records import DeletedRecords # noqa: F401
5860

5961
from .._model import TopicCollection as _TopicCollection
@@ -548,6 +550,22 @@ def _check_delete_records(request):
548550
if req.partition < 0:
549551
raise ValueError("'partition' cannot be negative")
550552

553+
@staticmethod
554+
def _check_elect_leaders(election_type, partitions):
555+
if not isinstance(election_type, ElectionType):
556+
raise TypeError("Expected 'election_type' to be of type 'ElectionType'")
557+
if partitions is not None:
558+
if not isinstance(partitions, list):
559+
raise TypeError("Expected 'partitions' to be a list, got " +
560+
f"'{type(partitions).__name__}'")
561+
for partition in partitions:
562+
if not isinstance(partition, _TopicPartition):
563+
raise TypeError("Element of the 'partitions' list must be of type 'TopicPartition'" +
564+
f" got '{type(partition).__name__}' ")
565+
if partition.partition < 0:
566+
raise ValueError("Elements of the 'partitions' list must not have negative value" +
567+
" for 'partition' field")
568+
551569
def create_topics(self, new_topics, **kwargs):
552570
"""
553571
Create one or more new topics.
@@ -1258,3 +1276,38 @@ def delete_records(self, topic_partition_offsets, **kwargs):
12581276

12591277
super(AdminClient, self).delete_records(topic_partition_offsets, f, **kwargs)
12601278
return futmap
1279+
1280+
def elect_leaders(self, election_type, partitions=None, **kwargs):
1281+
"""
1282+
Perform Preferred or Unclean leader election for
1283+
all the specified topic partitions.
1284+
1285+
:param ElectionType election_type: The type of election to perform.
1286+
:param List[TopicPartition]|None partitions: The topic partitions to perform
1287+
the election on. Use ``None`` to perform on all the topic partitions.
1288+
:param float request_timeout: The overall request timeout in seconds,
1289+
including broker lookup, request transmission, operation time
1290+
on broker, and response. Default: `socket.timeout.ms*1000.0`
1291+
:param float operation_timeout: The operation timeout in seconds,
1292+
controlling how long the 'elect_leaders' request will block
1293+
on the broker waiting for the election to propagate
1294+
in the cluster. A value of 0 returns immediately.
1295+
Default: `socket.timeout.ms/1000.0`
1296+
1297+
:returns: A future. Method result() of the future returns
1298+
dict[TopicPartition, KafkaException|None].
1299+
1300+
:rtype: future
1301+
1302+
:raises KafkaException: Operation failed locally or on broker.
1303+
:raises TypeError: Invalid input type.
1304+
:raises ValueError: Invalid input value.
1305+
"""
1306+
1307+
AdminClient._check_elect_leaders(election_type, partitions)
1308+
1309+
f = AdminClient._create_future()
1310+
1311+
super(AdminClient, self).elect_leaders(election_type.value, partitions, f, **kwargs)
1312+
1313+
return f
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# Copyright 2024 Confluent Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from enum import Enum
16+
from .. import cimpl as _cimpl
17+
18+
19+
class ElectionType(Enum):
20+
"""
21+
Enumerates the different types of leader elections.
22+
"""
23+
PREFERRED = _cimpl.ELECTION_TYPE_PREFERRED #: Preferred election
24+
UNCLEAN = _cimpl.ELECTION_TYPE_UNCLEAN #: Unclean election
25+
26+
def __lt__(self, other):
27+
if self.__class__ != other.__class__:
28+
return NotImplemented
29+
return self.value < other.value

src/confluent_kafka/src/Admin.c

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3059,6 +3059,104 @@ const char Admin_delete_records_doc[] = PyDoc_STR(
30593059
"\n"
30603060
" This method should not be used directly, use confluent_kafka.AdminClient.delete_records()\n");
30613061

3062+
/**
3063+
* @brief Elect leaders
3064+
*/
3065+
PyObject *Admin_elect_leaders(Handle *self, PyObject *args, PyObject *kwargs) {
3066+
PyObject *election_type = NULL, *partitions = NULL, *future;
3067+
rd_kafka_ElectLeaders_t *c_elect_leaders = NULL;
3068+
rd_kafka_ElectionType_t c_election_type;
3069+
struct Admin_options options = Admin_options_INITIALIZER;
3070+
rd_kafka_AdminOptions_t *c_options = NULL;
3071+
rd_kafka_topic_partition_list_t *c_partitions = NULL;
3072+
CallState cs;
3073+
rd_kafka_queue_t *rkqu;
3074+
3075+
static char *kws[] = {"election_type",
3076+
"partitions"
3077+
"future",
3078+
/* options */
3079+
"request_timeout", "operation_timeout", NULL};
3080+
3081+
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OOO|ff", kws,
3082+
&election_type, &partitions, &future,
3083+
&options.request_timeout,
3084+
&options.operation_timeout)) {
3085+
goto err;
3086+
}
3087+
3088+
c_options = Admin_options_to_c(self, RD_KAFKA_ADMIN_OP_ELECTLEADERS,
3089+
&options, future);
3090+
if (!c_options) {
3091+
goto err; /* Exception raised by options_to_c() */
3092+
}
3093+
3094+
/* options_to_c() sets future as the opaque, which is used in the
3095+
* background_event_cb to set the results on the future as the
3096+
* admin operation is finished, so we need to keep our own refcount. */
3097+
Py_INCREF(future);
3098+
3099+
c_election_type = (rd_kafka_ElectionType_t)cfl_PyInt_AsInt(election_type);
3100+
3101+
if (partitions != Py_None && !PyList_Check(partitions)) {
3102+
PyErr_SetString(PyExc_ValueError, "partitions must be None or a list");
3103+
goto err;
3104+
}
3105+
3106+
if (partitions != Py_None) {
3107+
c_partitions = py_to_c_parts(partitions);
3108+
}
3109+
3110+
c_elect_leaders = rd_kafka_ElectLeaders_new(c_election_type, c_partitions);
3111+
3112+
if(c_partitions) {
3113+
rd_kafka_topic_partition_list_destroy(c_partitions);
3114+
}
3115+
3116+
/* Use librdkafka's background thread queue to automatically dispatch
3117+
* Admin_background_event_cb() when the admin operation is finished. */
3118+
rkqu = rd_kafka_queue_get_background(self->rk);
3119+
3120+
/**
3121+
*
3122+
* Call ElectLeaders
3123+
*
3124+
* We need to set up a CallState and release GIL here since
3125+
* the event_cb may be triggered immediately.
3126+
*
3127+
*/
3128+
CallState_begin(self, &cs);
3129+
rd_kafka_ElectLeaders(self->rk, c_elect_leaders, c_options, rkqu);
3130+
CallState_end(self, &cs);
3131+
3132+
rd_kafka_queue_destroy(rkqu); /* drop reference from get_background */
3133+
3134+
rd_kafka_AdminOptions_destroy(c_options);
3135+
rd_kafka_ElectLeaders_destroy(c_elect_leaders);
3136+
3137+
Py_RETURN_NONE;
3138+
3139+
err:
3140+
if (c_elect_leaders) {
3141+
rd_kafka_ElectLeaders_destroy(c_elect_leaders);
3142+
}
3143+
if (c_options) {
3144+
rd_kafka_AdminOptions_destroy(c_options);
3145+
Py_DECREF(future);
3146+
}
3147+
return NULL;
3148+
}
3149+
3150+
const char Admin_elect_leaders_doc[] = PyDoc_STR(
3151+
".. py:function:: elect_leaders(election_type, partitions, "
3152+
"future, [request_timeout, operation_timeout])\n"
3153+
"\n"
3154+
" Perform Preferred or Unclean election for the specified "
3155+
"Topic Partitions.\n"
3156+
"\n"
3157+
" This method should not be used directly, use "
3158+
"confluent_kafka.AdminClient.elect_leaders()\n");
3159+
30623160
/**
30633161
* @brief Call rd_kafka_poll() and keep track of crashing callbacks.
30643162
* @returns -1 if callback crashed (or poll() failed), else the number
@@ -3225,6 +3323,10 @@ static PyMethodDef Admin_methods[] = {
32253323
Admin_delete_records_doc
32263324
},
32273325

3326+
{ "elect_leaders", (PyCFunction)Admin_elect_leaders, METH_VARARGS | METH_KEYWORDS,
3327+
Admin_elect_leaders_doc
3328+
},
3329+
32283330
{ NULL }
32293331
};
32303332

@@ -4875,6 +4977,23 @@ static void Admin_background_event_cb (rd_kafka_t *rk, rd_kafka_event_t *rkev,
48754977
break;
48764978
}
48774979

4980+
case RD_KAFKA_EVENT_ELECTLEADERS_RESULT:
4981+
{
4982+
size_t c_result_cnt;
4983+
4984+
const rd_kafka_ElectLeaders_result_t
4985+
*c_elect_leaders_res_event =
4986+
rd_kafka_event_ElectLeaders_result(rkev);
4987+
4988+
const rd_kafka_topic_partition_result_t **partition_results =
4989+
rd_kafka_ElectLeaders_result_partitions(
4990+
c_elect_leaders_res_event, &c_result_cnt);
4991+
4992+
result = c_topic_partition_result_to_py_dict(partition_results, c_result_cnt);
4993+
4994+
break;
4995+
}
4996+
48784997
default:
48794998
Py_DECREF(error); /* Py_None */
48804999
error = KafkaError_new0(RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE,

src/confluent_kafka/src/AdminTypes.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -597,6 +597,14 @@ static void AdminTypes_AddObjectsOffsetSpec (PyObject *m) {
597597
PyModule_AddIntConstant(m,"OFFSET_SPEC_LATEST", RD_KAFKA_OFFSET_SPEC_LATEST);
598598
}
599599

600+
static void AdminTypes_AddObjectsElectionType(PyObject *m) {
601+
/* rd_kafka_ElectionType_t */
602+
PyModule_AddIntConstant(m, "ELECTION_TYPE_PREFERRED",
603+
RD_KAFKA_ELECTION_TYPE_PREFERRED);
604+
PyModule_AddIntConstant(m, "ELECTION_TYPE_UNCLEAN",
605+
RD_KAFKA_ELECTION_TYPE_UNCLEAN);
606+
}
607+
600608
/**
601609
* @brief Add Admin types to module
602610
*/
@@ -616,4 +624,5 @@ void AdminTypes_AddObjects (PyObject *m) {
616624
AdminTypes_AddObjectsScramMechanismType(m);
617625
AdminTypes_AddObjectsIsolationLevel(m);
618626
AdminTypes_AddObjectsOffsetSpec(m);
627+
AdminTypes_AddObjectsElectionType(m);
619628
}

0 commit comments

Comments
 (0)