expose stats_cb #55

Merged · 7 commits · Nov 2, 2016
73 changes: 69 additions & 4 deletions confluent_kafka/src/confluent_kafka.c
@@ -825,9 +825,9 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
result = PyObject_CallFunctionObjArgs(h->error_cb, eo, NULL);
Py_DECREF(eo);

if (result)
Py_DECREF(result);
else {
CallState_crash(cs);
rd_kafka_yield(h->rk);
}
@@ -836,6 +836,32 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
CallState_resume(cs);
}

static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) {
Handle *h = opaque;
PyObject *eo = NULL, *result = NULL;
CallState *cs = NULL;

cs = CallState_get(h);
if (json_len == 0) {
/* No data returned */
goto done;
}

eo = Py_BuildValue("s", json);
result = PyObject_CallFunctionObjArgs(h->stats_cb, eo, NULL);
Py_DECREF(eo);

if (result)
Py_DECREF(result);
else {
CallState_crash(cs);
rd_kafka_yield(h->rk);
}

done:
CallState_resume(cs);
return 0;
}
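
Note: returning 0 follows librdkafka's stats_cb contract; a 0 return tells librdkafka to free the json buffer itself after the callback returns, which is safe here because Py_BuildValue("s", json) has already copied the data into a Python str.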

/****************************************************************************
*
@@ -853,9 +879,11 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
* Clear Python object references in Handle
*/
void Handle_clear (Handle *h) {
if (h->error_cb)
Py_DECREF(h->error_cb);

if (h->stats_cb)
Py_DECREF(h->stats_cb);

PyThread_delete_key(h->tlskey);
}
@@ -867,6 +895,9 @@ int Handle_traverse (Handle *h, visitproc visit, void *arg) {
if (h->error_cb)
Py_VISIT(h->error_cb);

if (h->stats_cb)
Py_VISIT(h->stats_cb);

return 0;
}

@@ -1113,6 +1144,15 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
continue;

} else if (!strcmp(k, "error_cb")) {
if (!PyCallable_Check(vo)) {
PyErr_SetString(PyExc_TypeError,
"expected error_cb property "
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(ks);
return NULL;
}
if (h->error_cb) {
Py_DECREF(h->error_cb);
h->error_cb = NULL;
@@ -1123,6 +1163,27 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
}
Py_DECREF(ks);
continue;
} else if (!strcmp(k, "stats_cb")) {
Contributor: We might want to check here, and for error_cb, whether the object is callable (PyCallable_Check(vo)); Py_None is okay too.
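
A minimal sketch of such a check that would also admit Py_None, reusing the identifiers from the surrounding code (illustrative only, not part of the committed diff):

if (vo != Py_None && !PyCallable_Check(vo)) {
        PyErr_SetString(PyExc_TypeError,
                        "expected stats_cb property "
                        "as a callable function or None");
        rd_kafka_topic_conf_destroy(tconf);
        rd_kafka_conf_destroy(conf);
        Py_DECREF(ks);
        return NULL;
}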

if (!PyCallable_Check(vo)) {
PyErr_SetString(PyExc_TypeError,
"expected stats_cb property "
"as a callable function");
rd_kafka_topic_conf_destroy(tconf);
rd_kafka_conf_destroy(conf);
Py_DECREF(ks);
return NULL;
}

if (h->stats_cb) {
Py_DECREF(h->stats_cb);
h->stats_cb = NULL;
}
if (vo != Py_None) {
h->stats_cb = vo;
Py_INCREF(h->stats_cb);
}
Py_DECREF(ks);
continue;
}

/* Special handling for certain config keys. */
@@ -1174,6 +1235,10 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,

if (h->error_cb)
rd_kafka_conf_set_error_cb(conf, error_cb);

if (h->stats_cb)
rd_kafka_conf_set_stats_cb(conf, stats_cb);

rd_kafka_topic_conf_set_opaque(tconf, h);
rd_kafka_conf_set_default_topic_conf(conf, tconf);

1 change: 1 addition & 0 deletions confluent_kafka/src/confluent_kafka.h
@@ -114,6 +114,7 @@ typedef struct {
PyObject_HEAD
rd_kafka_t *rk;
PyObject *error_cb;
PyObject *stats_cb;
int tlskey; /* Thread-Local-Storage key */

union {
4 changes: 4 additions & 0 deletions docs/index.rst
@@ -81,6 +81,10 @@ The Python bindings also provide some additional configuration properties:
* ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served by
poll().

* ``stats_cb(json_str)``: Callback for statistics data. This callback is triggered by poll()
every ``statistics.interval.ms`` (which must be configured separately).
The ``json_str`` argument is a str instance of a JSON document containing statistics data.

* ``on_delivery(kafka.KafkaError, kafka.Message)`` (**Producer**): value is a Python function reference
that is called once for each produced message to indicate the final
delivery result (success or failure).
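For illustration, a minimal sketch wiring the two properties together (broker address, group id and interval are placeholder values):

import json
from confluent_kafka import Consumer

def my_stats_cb(json_str):
    # json_str is a JSON document serialized as a str; decode it before use
    stats = json.loads(json_str)
    print('Stats for client instance %s' % stats['name'])

conf = {'bootstrap.servers': 'localhost:9092',   # placeholder broker
        'group.id': 'demo',                      # placeholder group id
        'statistics.interval.ms': 1000,          # emit stats once per second
        'stats_cb': my_stats_cb}

c = Consumer(**conf)
# The callback is served from poll(), so the consumer must be polled regularly.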
47 changes: 39 additions & 8 deletions examples/consumer.py
@@ -18,24 +18,55 @@
#
# Example high-level Kafka 0.9 balanced Consumer
#

from confluent_kafka import Consumer, KafkaException, KafkaError
import sys
import getopt
import json
from pprint import pformat

def stats_cb(stats_json_str):
stats_json = json.loads(stats_json_str)
print('\nKAFKA Stats: {}\n'.format(pformat(stats_json)))

def print_usage_and_exit(program_name):
sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name)
options='''
Options:
-T <intvl> Enable client statistics at specified interval (ms)
'''
sys.stderr.write(options)
sys.exit(1)


if __name__ == '__main__':
optlist, argv = getopt.getopt(sys.argv[1:], 'T:')

if len(argv) < 3:
print_usage_and_exit(sys.argv[0])

broker = argv[0]
group = argv[1]
topics = argv[2:]
# Consumer configuration
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
conf = {'bootstrap.servers': broker, 'group.id': group, 'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}}

# Check to see if -T option exists
for opt in optlist:
if opt[0] != '-T':
continue
try:
intval = int(opt[1])
except:
sys.stderr.write("Invalid option value for -T: %s\n" % opt[1])
sys.exit(1)

if intval <= 0:
sys.stderr.write("-T option value needs to be larger than zero: %s\n" % opt[1])
sys.exit(1)

conf['stats_cb'] = stats_cb
conf['statistics.interval.ms'] = int(opt[1])

# Create Consumer instance
c = Consumer(**conf)
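For reference, a hypothetical invocation of the updated example with client statistics emitted every 1000 ms (broker, group and topic names are placeholders):

python examples/consumer.py -T 1000 localhost:9092 mygroup mytopic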
84 changes: 83 additions & 1 deletion examples/integration_test.py
@@ -24,6 +24,7 @@
import time
import uuid
import sys
import json

try:
from progress.bar import Bar
@@ -35,10 +36,12 @@
bootstrap_servers = 'localhost'



# global variable to be set by the stats_cb callback function
good_stats_cb_result = False

def error_cb (err):
print('Error: %s' % err)


class MyTestDr(object):
""" Producer: Delivery report callback """
@@ -353,6 +356,82 @@ def my_on_revoke (consumer, partitions):
c.close()


def verify_stats_cb():
""" Verify stats_cb """

def stats_cb(stats_json_str):
global good_stats_cb_result
stats_json = json.loads(stats_json_str)
if 'test' in stats_json['topics']:
app_offset = stats_json['topics']['test']['partitions']['0']['app_offset']
Contributor: You probably want to wrap this in a try/except in case any of the sub-dict keys are missing, and just print the JSON blob on failure.
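
A sketch of the suggested guard, reusing the names from the test above (illustrative only):

try:
    app_offset = stats_json['topics']['test']['partitions']['0']['app_offset']
except (KeyError, TypeError):
    # A sub-dict key is missing: dump the whole blob to aid debugging
    print('Unexpected stats blob: %s' % json.dumps(stats_json))
    return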

if app_offset > 0:
print("# app_offset stats for topic test partition 0: %d" % app_offset)
good_stats_cb_result = True

conf = {'bootstrap.servers': bootstrap_servers,
'group.id': uuid.uuid1(),
'session.timeout.ms': 6000,
'error_cb': error_cb,
'stats_cb': stats_cb,
'statistics.interval.ms': 200,
'default.topic.config': {
'auto.offset.reset': 'earliest'
}}

c = confluent_kafka.Consumer(**conf)
c.subscribe(["test"])

max_msgcnt = 1000000
bytecnt = 0
msgcnt = 0

print('Will now consume %d messages' % max_msgcnt)

if with_progress:
bar = Bar('Consuming', max=max_msgcnt,
suffix='%(index)d/%(max)d [%(eta_td)s]')
else:
bar = None

while not good_stats_cb_result:
# Consume until EOF or error

msg = c.poll(timeout=20.0)
if msg is None:
raise Exception('Stalled at %d/%d message, no new messages for 20s' %
(msgcnt, max_msgcnt))

if msg.error():
if msg.error().code() == confluent_kafka.KafkaError._PARTITION_EOF:
# Reached EOF for a partition, ignore.
continue
else:
raise confluent_kafka.KafkaException(msg.error())


bytecnt += len(msg)
msgcnt += 1

if bar is not None and (msgcnt % 10000) == 0:
bar.next(n=10000)

if msgcnt == 1:
t_first_msg = time.time()
if msgcnt >= max_msgcnt:
break

if bar is not None:
bar.finish()

if msgcnt > 0:
t_spent = time.time() - t_first_msg
print('%d messages (%.2fMb) consumed in %.3fs: %d msgs/s, %.2f Mb/s' % \
(msgcnt, bytecnt / (1024*1024), t_spent, msgcnt / t_spent,
(bytecnt / t_spent) / (1024*1024)))

print('closing consumer')
c.close()


if __name__ == '__main__':

@@ -377,6 +456,9 @@ def my_on_revoke (consumer, partitions):
print('=' * 30, 'Verifying Consumer performance', '=' * 30)
verify_consumer_performance()

print('=' * 30, 'Verifying stats_cb', '=' * 30)
verify_stats_cb()

print('=' * 30, 'Done', '=' * 30)


55 changes: 54 additions & 1 deletion tests/test_misc.py
@@ -1,7 +1,9 @@
#!/usr/bin/env python

import confluent_kafka

import json
import time
from pprint import pprint

def test_version():
print('Using confluent_kafka module version %s (0x%x)' % confluent_kafka.version())
@@ -14,3 +16,54 @@ def test_version():
assert len(sver) > 0
assert iver > 0

# global variable for the error_cb callback function
seen_error_cb = False

def test_error_cb():
""" Tests error_cb. """

def error_cb(error_msg):
global seen_error_cb
seen_error_cb = True
assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN)

conf = {'bootstrap.servers': 'localhost:9093', # Purposely cause connection refused error
Contributor: This is actually probably the second most likely port to find a broker on localhost, so maybe you want to connect to localhost:22 or somesuch that will definitely not be a Kafka broker.

'group.id':'test',
'socket.timeout.ms':'100',
'session.timeout.ms': 1000, # Avoid close() blocking too long
'error_cb': error_cb
}

kc = confluent_kafka.Consumer(**conf)
kc.subscribe(["test"])
while not seen_error_cb:
kc.poll(timeout=1)

kc.close()

# global variable for the stats_cb callback function
seen_stats_cb = False

def test_stats_cb():
""" Tests stats_cb. """

def stats_cb(stats_json_str):
global seen_stats_cb
seen_stats_cb = True
stats_json = json.loads(stats_json_str)
assert len(stats_json['name']) > 0

conf = {'group.id':'test',
'socket.timeout.ms':'100',
'session.timeout.ms': 1000, # Avoid close() blocking too long
'statistics.interval.ms': 200,
'stats_cb': stats_cb
}

kc = confluent_kafka.Consumer(**conf)

kc.subscribe(["test"])
Contributor: Same thing as suggested for error_cb.

while not seen_stats_cb:
kc.poll(timeout=1)
kc.close()