-
Notifications
You must be signed in to change notification settings - Fork 915
expose stats_cb #55
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
expose stats_cb #55
Conversation
It looks like @hqin hasn't signed our Contributor License Agreement, yet. Appreciation of efforts, clabot |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code looks good but needs some styling.
Some minor points that should be addressed as well.
I'm missing unit-tests (e.g. tests/test_misc.py
) that should verify the callback is called and the JSON is parsable.
Also want to see support for stats_cb in the integration test (examples/integration_test.py
), it could extract some interesting value (such as avg rtt for brokers) and present that.
It also needs to be added to the documentation in docs/index.rst
CallState *cs=NULL; | ||
|
||
cs = CallState_get(h); | ||
if (json_len== 0 || !h->stats_cb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We dont need to enable the C stats_cb if no Python stats_cb has been set, which remove the !h->stats_cb
check
static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) | ||
{ | ||
Handle *h = opaque; | ||
PyObject *eo=NULL, *result=NULL; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Style is inconsistent, let's try to maintain same style as in existing file to ease readability.
Functions: Open-brace on same line as function decl.
Variable declarations: type SPACE variablename SPACE = SPACE value;
If statements: if SPACE (EXPR) SPACE {
EXPR: lhs SPACE operator SPACE rhs
(e.g., json_len == 0
)
@@ -857,6 +884,10 @@ void Handle_clear (Handle *h) { | |||
Py_DECREF(h->error_cb); | |||
} | |||
|
|||
if (h->stats_cb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: remove braces for single-line clauses (you can remove the ones for error_cb above too, shouldnt be there)
@@ -1123,6 +1157,17 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, | |||
} | |||
Py_DECREF(ks); | |||
continue; | |||
} else if (!strcmp(k, "stats_cb")) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We might want to check here and for error_cb if the object is callable (PyCallable_Check(vo)
) (Py_None is okay too)
@@ -1174,6 +1219,11 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, | |||
|
|||
if (h->error_cb) | |||
rd_kafka_conf_set_error_cb(conf, error_cb); | |||
|
|||
if (h->stats_cb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
style: remove braces
sys.stderr.write('Usage: %s [options..] <bootstrap-brokers> <group> <topic1> <topic2> ..\n' % program_name) | ||
options=''' | ||
Options: | ||
-T <intvl> Enable statistics from Kafka at specified interval (ms) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the stats are from the client, not Kafka, so might want to rephrase that to something like:
Enable client statistics at specified interval (ms)
|
||
|
||
if __name__ == '__main__': | ||
optlist, argv = getopt.getopt(sys.argv[1:], 'T:') |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
from confluent_kafka import Producer | ||
import sys | ||
import getopt |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we only the stats_cb example in of the examples (to keep the examples small).
Remove either this one or the other one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost there!
@@ -1146,7 +1143,7 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype, | |||
Py_DECREF(ks); | |||
continue; | |||
|
|||
} else if (!strcmp(k, "error_cb")) { | |||
} else if (!strcmp(k, "error_cb") && PyCallable_Check(vo)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will need to check PyCallable inside error_cb if-clause.
E.g.:
if (k=="error_cb") {
if (!callable) {
error_out("\"error_cb\" must be a callable");
return..
}
h->error_cb = ..;
}.
Same thing for stats_cb
@@ -880,13 +879,11 @@ static int stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) | |||
* Clear Python object references in Handle | |||
*/ | |||
void Handle_clear (Handle *h) { | |||
if (h->error_cb) { | |||
if (h->error_cb) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nitpick: There's a trailing whitespace after ). This might seem silly but it will show up as a red blob in editors and git gui :)
def stats_cb(stats_json_str): | ||
stats_json = json.loads(stats_json_str) | ||
if 'test' in stats_json['topics']: | ||
print("# app_offset stats for topic test partition 0: %d" % stats_json['topics']['test']['partitions']['0']['app_offset']) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe put this in a try
block to give a meaningful error message if those dict keys are not available
else: | ||
bar = None | ||
|
||
while True: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good, but this loop doesnt actually verify that stats were received.
I think it could probably finish on the first complete stats seen to speed things up.
kc = confluent_kafka.Consumer(**conf) | ||
|
||
kc.subscribe(["test"]) | ||
kc.poll(timeout=0.001) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not verify that error_cb was actually called.
Maybe construct it like so, without sleep:
in test_error_cb:
seen_error_cb = True
....
while !seen_error_cb:
kc.poll(timeout=1)
kc.close()
Test timeouts will handle the case where no callback was seen
# print(stats_json_str) | ||
try: | ||
stats_json = json.loads(stats_json_str) | ||
if 'type' in stats_json: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldnt this be an assertation for checking that the json doc actually contains something we know?
|
||
kc = confluent_kafka.Consumer(**conf) | ||
|
||
kc.subscribe(["test"]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same thing as suggested for error_cb
@@ -749,7 +749,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) { | |||
|
|||
parts = PyList_New(c_parts->cnt); | |||
|
|||
for (i = 0 ; i < c_parts->cnt ; i++) { | |||
for (i = 0 ; i < (size_t)c_parts->cnt ; i++) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make these size_t changes a separate commit, for tracking.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Almost perfect, just some small stuff now
stats_json = json.loads(stats_json_str) | ||
if 'test' in stats_json['topics']: | ||
print("# app_offset stats for topic test partition 0: %d" % stats_json['topics']['test']['partitions']['0']['app_offset']) | ||
app_offset = stats_json['topics']['test']['partitions']['0']['app_offset'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably want to wrap this in a try: if any of the sub-dict keys are missing and just print the json blob on failure.
def test_error_cb(): | ||
""" Tests error_cb. """ | ||
|
||
def error_cb(error_msg): | ||
print('OK: error_cb() called') | ||
global seen_error_cb | ||
seen_error_cb = True | ||
assert error_msg.code() in (confluent_kafka.KafkaError._TRANSPORT, confluent_kafka.KafkaError._ALL_BROKERS_DOWN) | ||
|
||
conf = {'bootstrap.servers': 'localhost:9093', # Purposely cause connection refused error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually probably the second most likely port to find a broker on localhost, so maybe you want to connect to localhost:22 or somesuch that will definately not a Kafka broker.
Also need to add documentation for stats_cb in docs/index.rst to (do a |
@@ -81,6 +81,8 @@ The Python bindings also provide some additional configuration properties: | |||
* ``error_cb(kafka.KafkaError)``: Callback for generic/global error events. This callback is served by | |||
poll(). | |||
|
|||
* ``stats_cb(json_str)``: Callback for statistics data. This callback is triggered by poll() every ``statistics.interval.ms`` (needs to be configured separately). Function argument ``json_str`` is a str instance of a JSON document containing statistics data. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
docs are good, but split up the lines at about 80 columns so it is readable in the terminal too.
Awesome! |
Resolves issue #43
@edenhill @ewencp Please review the changes