diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c
index 3539f16f7..657505198 100644
--- a/confluent_kafka/src/confluent_kafka.c
+++ b/confluent_kafka/src/confluent_kafka.c
@@ -825,9 +825,9 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
         result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
         Py_DECREF(eo);
-        if (result) {
+        if (result)
                 Py_DECREF(result);
-        } else {
+        else {
                 CallState_crash(cs);
                 rd_kafka_yield(h->rk);
         }
 
@@ -836,6 +836,32 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
         CallState_resume(cs);
 }
 
+static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
+        Handle *h = opaque;
+        PyObject *eo = NULL, *result = NULL;
+        CallState *cs = NULL;
+
+        cs = CallState_get(h);
+        if (json_len == 0) {
+                /* No data returned */
+                goto done;
+        }
+
+        eo = Py_BuildValue("s", json);
+        result = PyObject_CallFunctionObjArgs(h->stats_cb, eo, NULL);
+        Py_DECREF(eo);
+
+        if (result)
+                Py_DECREF(result);
+        else {
+                CallState_crash(cs);
+                rd_kafka_yield(h->rk);
+        }
+
+done:
+        CallState_resume(cs);
+        return 0;
+}
 
 /****************************************************************************
  *
@@ -853,9 +879,11 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
  * Clear Python object references in Handle
  */
 void Handle_clear (Handle *h) {
-        if (h->error_cb) {
+        if (h->error_cb)
                 Py_DECREF(h->error_cb);
-        }
+
+        if (h->stats_cb)
+                Py_DECREF(h->stats_cb);
 
         PyThread_delete_key(h->tlskey);
 }
@@ -867,6 +895,9 @@
 int Handle_traverse (Handle *h, visitproc visit, void *arg) {
         if (h->error_cb)
                 Py_VISIT(h->error_cb);
 
+        if (h->stats_cb)
+                Py_VISIT(h->stats_cb);
+
         return 0;
 }
@@ -1113,6 +1144,15 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
                         continue;
 
                 } else if (!strcmp(k, "error_cb")) {
+                        if (!PyCallable_Check(vo)) {
+                                PyErr_SetString(PyExc_TypeError,
+                                                "expected error_cb property "
+                                                "as a callable function");
+                                rd_kafka_topic_conf_destroy(tconf);
+                                rd_kafka_conf_destroy(conf);
+                                Py_DECREF(ks);
+                                return NULL;
+                        }
                         if (h->error_cb) {
                                 Py_DECREF(h->error_cb);
                                 h->error_cb = NULL;
@@ -1123,6 +1163,27 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
                         }
                         Py_DECREF(ks);
                         continue;
+                } else if (!strcmp(k, "stats_cb")) {
+                        if (!PyCallable_Check(vo)) {
+                                PyErr_SetString(PyExc_TypeError,
+                                                "expected stats_cb property "
+                                                "as a callable function");
+                                rd_kafka_topic_conf_destroy(tconf);
+                                rd_kafka_conf_destroy(conf);
+                                Py_DECREF(ks);
+                                return NULL;
+                        }
+
+                        if (h->stats_cb) {
+                                Py_DECREF(h->stats_cb);
+                                h->stats_cb = NULL;
+                        }
+                        if (vo != Py_None) {
+                                h->stats_cb = vo;
+                                Py_INCREF(h->stats_cb);
+                        }
+                        Py_DECREF(ks);
+                        continue;
                 }
 
                 /* Special handling for certain config keys.
                  */
@@ -1174,6 +1235,10 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
 
         if (h->error_cb)
                 rd_kafka_conf_set_error_cb(conf, error_cb);
+
+        if (h->stats_cb)
+                rd_kafka_conf_set_stats_cb(conf, stats_cb);
+
         rd_kafka_topic_conf_set_opaque(tconf, h);
         rd_kafka_conf_set_default_topic_conf(conf, tconf);
diff --git a/confluent_kafka/src/confluent_kafka.h b/confluent_kafka/src/confluent_kafka.h
index 5e8eb7989..ad5bd7eb9 100644
--- a/confluent_kafka/src/confluent_kafka.h
+++ b/confluent_kafka/src/confluent_kafka.h
@@ -114,6 +114,7 @@ typedef struct {
         PyObject_HEAD
         rd_kafka_t *rk;
         PyObject *error_cb;
+        PyObject *stats_cb;
         int tlskey;   /* Thread-Local-Storage key */
 
         union {
diff --git a/docs/index.rst b/docs/index.rst
index 229fe9ef3..3ed82a4af 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -81,6 +81,10 @@ The Python bindings also provide some additional configuration properties:
 * ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served by poll().
 
+* ``stats_cb(json_str)``: Callback for statistics data. This callback is served by poll()
+  every ``statistics.interval.ms`` (which must be configured separately).
+  The ``json_str`` argument is a ``str`` instance containing a JSON document with the statistics data.
+
 * ``on_delivery(kafka.KafkaError, kafka.Message)`` (**Producer**): value is a Python function reference
   that is called once for each produced message to indicate the final
   delivery result (success or failure).
diff --git a/examples/consumer.py b/examples/consumer.py
index d9bda0b4e..40b4b2967 100755
--- a/examples/consumer.py
+++ b/examples/consumer.py
@@ -18,24 +18,55 @@
 #
 # Example high-level Kafka 0.9 balanced Consumer
 #
-
 from confluent_kafka import Consumer, KafkaException, KafkaError
 import sys
+import getopt
+import json
+from pprint import pformat
 
-if __name__ == '__main__':
-    if len(sys.argv) < 4:
-        sys.stderr.write('Usage: %s <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % sys.argv[0])
-        sys.exit(1)
+def stats_cb(stats_json_str):
+    stats_json = json.loads(stats_json_str)
+    print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))
 
-    broker = sys.argv[1]
-    group = sys.argv[2]
-    topics = sys.argv[3:]
+def print_usage_and_exit(program_name):
+    sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
+    options = '''
+ Options:
+  -T <intvl>   Enable client statistics at specified interval (ms)
+'''
+    sys.stderr.write(options)
+    sys.exit(1)
+
+if __name__ == '__main__':
+    optlist, argv = getopt.getopt(sys.argv[1:], 'T:')
+    if len(argv) < 3:
+        print_usage_and_exit(sys.argv[0])
+
+    broker = argv[0]
+    group = argv[1]
+    topics = argv[2:]
 
     # Consumer configuration
     # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
     conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
             'default.topic.config': {'auto.offset.reset': 'smallest'}}
 
+    # Check to see if the -T option was given
+    for opt in optlist:
+        if opt[0] != '-T':
+            continue
+        try:
+            intval = int(opt[1])
+        except ValueError:
+            sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
+            sys.exit(1)
+
+        if intval <= 0:
+            sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
+            sys.exit(1)
+
+        conf['stats_cb'] = stats_cb
+        conf['statistics.interval.ms'] = intval
 
     # Create Consumer instance
     c = Consumer(**conf)
diff --git a/examples/integration_test.py b/examples/integration_test.py
index f1f9a6009..49369d899 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -24,6 +24,7 @@
 import time
 import uuid
 import sys
+import json
 
 try:
     from progress.bar import Bar
@@ -35,10 +36,12 @@
 
 bootstrap_servers = 'localhost'
 
-
+# Global variable set by the stats_cb callback function
+good_stats_cb_result = False
 
 def error_cb (err):
     print('Error: %s' % err)
 
+
 class MyTestDr(object):
     """ Producer: Delivery report callback """
@@ -353,6 +356,82 @@ def my_on_revoke (consumer, partitions):
 
     c.close()
 
+
+def verify_stats_cb():
+    """ Verify stats_cb """
+
+    def stats_cb(stats_json_str):
+        global good_stats_cb_result
+        stats_json = json.loads(stats_json_str)
+        if 'test' in stats_json['topics']:
+            app_offset = stats_json['topics']['test']['partitions']['0']['app_offset']
+            if app_offset > 0:
+                print("# app_offset stats for topic test partition 0: %d" % app_offset)
+                good_stats_cb_result = True
+
+    conf = {'bootstrap.servers': bootstrap_servers,
+            'group.id': uuid.uuid1(),
+            'session.timeout.ms': 6000,
+            'error_cb': error_cb,
+            'stats_cb': stats_cb,
+            'statistics.interval.ms': 200,
+            'default.topic.config': {
+                'auto.offset.reset': 'earliest'
+            }}
+
+    c = confluent_kafka.Consumer(**conf)
+    c.subscribe(["test"])
+
+    max_msgcnt = 1000000
+    bytecnt = 0
+    msgcnt = 0
+
+    print('Will now consume %d messages' % max_msgcnt)
+
+    if with_progress:
+        bar = Bar('Consuming', max=max_msgcnt,
+                  suffix='%(index)d/%(max)d [%(eta_td)s]')
+    else:
+        bar = None
+
+    while not good_stats_cb_result:
+        # Consume until EOF or error
+
+        msg = c.poll(timeout=20.0)
+        if msg is None:
+            raise Exception('Stalled at %d/%d message, no new messages for 20s' %
+                            (msgcnt, max_msgcnt))
+
+        if msg.error():
+            if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
+                # Reached EOF for a partition, ignore.
+                continue
+            else:
+                raise confluent_kafka.KafkaException(msg.error())
+
+        bytecnt += len(msg)
+        msgcnt += 1
+
+        if bar is not None and (msgcnt % 10000) == 0:
+            bar.next(n=10000)
+
+        if msgcnt == 1:
+            t_first_msg = time.time()
+        if msgcnt >= max_msgcnt:
+            break
+
+    if bar is not None:
+        bar.finish()
+
+    if msgcnt > 0:
+        t_spent = time.time() - t_first_msg
+        print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' %
+              (msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
+               (bytecnt / t_spent) / (1024*1024)))
+
+    print('closing consumer')
+    c.close()
+
 
 if __name__ == '__main__':
@@ -377,6 +456,9 @@ def my_on_revoke (consumer, partitions):
 
     print('=' * 30, 'Verifying Consumer performance', '=' * 30)
     verify_consumer_performance()
 
+    print('=' * 30, 'Verifying stats_cb', '=' * 30)
+    verify_stats_cb()
+
     print('=' * 30, 'Done', '=' * 30)
diff --git a/tests/test_misc.py b/tests/test_misc.py
index f29eb4853..c7a4f4107 100644
--- a/tests/test_misc.py
+++ b/tests/test_misc.py
@@ -1,7 +1,9 @@
 #!/usr/bin/env python
 
 import confluent_kafka
-
+import json
+import time
+from pprint import pprint
 
 def test_version():
     print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
@@ -14,3 +16,54 @@ def test_version():
     assert len(sver) > 0
     assert iver > 0
 
+# Global variable set by the error_cb callback function
+seen_error_cb = False
+
+def test_error_cb():
+    """ Tests error_cb. """
+
+    def error_cb(error_msg):
+        global seen_error_cb
+        seen_error_cb = True
+        assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT,
+                                    confluent_kafka.KafkaError._ALL_BROKERS_DOWN)
+
+    conf = {'bootstrap.servers': 'localhost:9093',  # Purposely cause connection refused error
+            'group.id': 'test',
+            'socket.timeout.ms': '100',
+            'session.timeout.ms': 1000,  # Avoid close() blocking too long
+            'error_cb': error_cb
+            }
+
+    kc = confluent_kafka.Consumer(**conf)
+    kc.subscribe(["test"])
+    while not seen_error_cb:
+        kc.poll(timeout=1)
+
+    kc.close()
+
+# Global variable set by the stats_cb callback function
+seen_stats_cb = False
+
+def test_stats_cb():
+    """ Tests stats_cb. """
+
+    def stats_cb(stats_json_str):
+        global seen_stats_cb
+        seen_stats_cb = True
+        stats_json = json.loads(stats_json_str)
+        assert len(stats_json['name']) > 0
+
+    conf = {'group.id': 'test',
+            'socket.timeout.ms': '100',
+            'session.timeout.ms': 1000,  # Avoid close() blocking too long
+            'statistics.interval.ms': 200,
+            'stats_cb': stats_cb
+            }
+
+    kc = confluent_kafka.Consumer(**conf)
+
+    kc.subscribe(["test"])
+    while not seen_stats_cb:
+        kc.poll(timeout=1)
+
+    kc.close()
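
For reference, wiring up the new ``stats_cb`` is symmetrical on the Producer side, since the property is handled in the shared common_conf_setup(). A minimal sketch (broker address, topic name, and interval are placeholders; ``name`` and ``msg_cnt`` are standard top-level fields of the librdkafka statistics document):

    import json
    from confluent_kafka import Producer

    def stats_cb(stats_json_str):
        # The statistics JSON document is passed in as a str
        stats = json.loads(stats_json_str)
        print('client %s: %s message(s) in queue' % (stats['name'], stats['msg_cnt']))

    conf = {'bootstrap.servers': 'localhost:9092',  # placeholder broker
            'statistics.interval.ms': 1000,         # emit stats every second
            'stats_cb': stats_cb}

    p = Producer(**conf)
    p.produce('test', value='hello')

    # stats_cb, like error_cb and delivery callbacks, is served from poll()
    for _ in range(5):
        p.poll(1)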