diff --git a/confluent_kafka/src/Consumer.c b/confluent_kafka/src/Consumer.c
index 80e136a9e..7e9df2b1f 100644
--- a/confluent_kafka/src/Consumer.c
+++ b/confluent_kafka/src/Consumer.c
@@ -41,6 +41,10 @@ static int Consumer_clear (Consumer *self) {
 		Py_DECREF(self->on_commit);
 		self->on_commit = NULL;
 	}
+	if (self->on_stats) {
+		Py_DECREF(self->on_stats);
+		self->on_stats = NULL;
+	}
 
 	return 0;
 }
@@ -579,6 +583,50 @@ PyTypeObject ConsumerType = {
 	Consumer_new /* tp_new */
 };
 
+/**
+ * @brief librdkafka statistics callback: parses the emitted JSON string
+ *        into a Python dict and invokes the user's on_stats callable.
+ *        Any pending Python error (import, parse or callback failure) is
+ *        propagated through the callback_crashed mechanism so poll()
+ *        re-raises it, mirroring the rebalance/commit callbacks.
+ */
+static int Consumer_stats_cb (rd_kafka_t *rk, char *json,
+			      size_t json_len, void *opaque) {
+	Consumer *self = opaque;
+	PyObject *json_module, *loads, *stats_dict, *args, *result;
+
+	/* No callback registered: skip the GIL and JSON round-trip. */
+	if (!self->on_stats)
+		return 0;
+
+	PyEval_RestoreThread(self->thread_state);
+
+	/* Convert the statistics JSON string to a Python dict. */
+	json_module = PyImport_ImportModule("json");
+	if (!json_module)
+		goto err;
+	loads = PyObject_GetAttrString(json_module, "loads");
+	Py_DECREF(json_module);
+	if (!loads)
+		goto err;
+	stats_dict = PyObject_CallFunction(loads, "s", json);
+	Py_DECREF(loads);
+	if (!stats_dict)
+		goto err;
+
+	args = Py_BuildValue("(O)", stats_dict);
+	Py_DECREF(stats_dict);
+	if (!args)
+		goto err;
+
+	result = PyObject_CallObject(self->on_stats, args);
+	Py_DECREF(args);
+	if (!result)
+		goto err;
+	Py_DECREF(result);
+
+	self->thread_state = PyEval_SaveThread();
+	return 0;
+
+err:	/* Python error pending: flag crash and make poll() return. */
+	self->callback_crashed++;
+	rd_kafka_yield(rk);
+	self->thread_state = PyEval_SaveThread();
+	return 0;
+}
 
 static void Consumer_rebalance_cb (rd_kafka_t *rk,
 				   rd_kafka_resp_err_t err,
@@ -703,6 +751,7 @@ static PyObject *Consumer_new (PyTypeObject *type, PyObject *args,
 
 	rd_kafka_conf_set_rebalance_cb(conf, Consumer_rebalance_cb);
 	rd_kafka_conf_set_offset_commit_cb(conf, Consumer_offset_commit_cb);
+	rd_kafka_conf_set_stats_cb(conf, Consumer_stats_cb);
 
 	self->rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr,
 				sizeof(errstr));
diff --git a/confluent_kafka/src/confluent_kafka.c b/confluent_kafka/src/confluent_kafka.c
index e85ef2025..ea216673a 100644
--- a/confluent_kafka/src/confluent_kafka.c
+++ b/confluent_kafka/src/confluent_kafka.c
@@ -989,6 +989,21 @@ static int consumer_conf_set_special (Consumer *self, rd_kafka_conf_t *conf,
 
 		return 1;
 	}
+
+	if (!strcasecmp(name, "on_stats")) {
+		if (!PyCallable_Check(valobj)) {
+			cfl_PyErr_Format(
+				RD_KAFKA_RESP_ERR__INVALID_ARG,
+				"%s requires a callable "
+				"object", name);
+			return -1;
+		}
+
+		self->on_stats = valobj;
+		Py_INCREF(self->on_stats);
+
+		return 1;
+	}
 
 	return 0;
 }
diff --git a/confluent_kafka/src/confluent_kafka.h b/confluent_kafka/src/confluent_kafka.h
index b96e88129..b4805c8de 100644
--- a/confluent_kafka/src/confluent_kafka.h
+++ b/confluent_kafka/src/confluent_kafka.h
@@ -200,6 +200,7 @@ typedef struct {
 	PyObject *on_assign;   /* Rebalance: on_assign callback */
 	PyObject *on_revoke;   /* Rebalance: on_revoke callback */
 	PyObject *on_commit;   /* Commit callback */
+	PyObject *on_stats;    /* Stat callback */
 	int callback_crashed;
 	PyThreadState *thread_state;
 } Consumer;
diff --git a/examples/integration_test.py b/examples/integration_test.py
index f2c5d43bf..9c08d85da 100755
--- a/examples/integration_test.py
+++ b/examples/integration_test.py
@@ -35,8 +35,6 @@
 
 bootstrap_servers = 'localhost'
 
-
-
 class MyTestDr(object):
     """ Producer: Delivery report callback """
 
diff --git a/tests/test_Consumer.py b/tests/test_Consumer.py
index 8d538123e..752d062ee 100644
--- a/tests/test_Consumer.py
+++ b/tests/test_Consumer.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+import pytest
 
 from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException
 
@@ -60,3 +60,24 @@ def dummy_assign_revoke (consumer, partitions):
     kc.close()
 
 
+def test_stats_callback_consumer():
+
+    class TestCallbackException(Exception):
+        """
+        Custom test exception to throw from stats callback
+        """
+        pass
+
+    def stats_callback(stats_dict):
+        raise TestCallbackException()
+
+    kc = Consumer({'group.id':'test', 'socket.timeout.ms':'100',
+                   'session.timeout.ms': 1000, # Avoid close() blocking too long
+                   'on_stats': stats_callback, 'statistics.interval.ms': 100})
+
+    kc.subscribe(['test'])
+    kc.unsubscribe()
+    with pytest.raises(TestCallbackException):
+        msg = kc.poll(0.1)
+
+    kc.close()
diff --git a/tests/test_Producer.py b/tests/test_Producer.py
index 68ce03032..1b509d467 100644
--- a/tests/test_Producer.py
+++ b/tests/test_Producer.py
@@ -1,5 +1,3 @@
-#!/usr/bin/env python
-
 from confluent_kafka import Producer, KafkaError, KafkaException
 
 
@@ -30,5 +28,3 @@ def on_delivery(err,msg):
 
     p.poll(0.001)
     p.flush()
-
-