Skip to content

Add timestamp() to librdkafka messages #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 9, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions confluent_kafka/src/confluent_kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,13 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {
}


static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be better to have a single timestamp() method that returns a tuple: tstype, timestamp

return Py_BuildValue("iL",
self->tstype,
self->timestamp);
}


static PyMethodDef Message_methods[] = {
{ "error", (PyCFunction)Message_error, METH_NOARGS,
" The message object is also used to propagate errors and events, "
Expand Down Expand Up @@ -362,6 +369,11 @@ static PyMethodDef Message_methods[] = {
" :rtype: int or None\n"
"\n"
},
{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It needs to return the timestamp type as well (none, append, create).
Im not sure whether it should be some class level constants or strings, probably the former. @ewencp?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add the timestamp type as a class level constant, e.g.:
TIMESTAMP_NOT_AVAILABLE, TIMESTAMP_CREATE_TIME, TIMESTAMP_APPEND_TIME
Map it directly to the librdkafka counterparts.

" :returns: tuple of message tstype, and timestamp.\n"
" :rtype: (int, int)\n"
"\n"
},
{ NULL }
};

Expand Down Expand Up @@ -495,6 +507,8 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
self->partition = rkm->partition;
self->offset = rkm->offset;

self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);

return (PyObject *)self;
}

Expand Down Expand Up @@ -1419,6 +1433,10 @@ static PyObject *_init_cimpl (void) {
Py_INCREF(KafkaException);
PyModule_AddObject(m, "KafkaException", KafkaException);

PyModule_AddIntConstant(m, "TIMESTAMP_NOT_AVAILABLE", RD_KAFKA_TIMESTAMP_NOT_AVAILABLE);
PyModule_AddIntConstant(m, "TIMESTAMP_CREATE_TIME", RD_KAFKA_TIMESTAMP_CREATE_TIME);
PyModule_AddIntConstant(m, "TIMESTAMP_LOG_APPEND_TIME", RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME);

return m;
}

Expand Down
2 changes: 2 additions & 0 deletions confluent_kafka/src/confluent_kafka.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ typedef struct {
PyObject *error;
int32_t partition;
int64_t offset;
int64_t timestamp;
rd_kafka_timestamp_type_t tstype;
} Message;

extern PyTypeObject MessageType;
Expand Down
6 changes: 4 additions & 2 deletions examples/integration_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ def verify_consumer():
'group.id': 'test.py',
'session.timeout.ms': 6000,
'enable.auto.commit': False,
'api.version.request': True,
'on_commit': print_commit_result,
'error_cb': error_cb,
'default.topic.config': {
Expand Down Expand Up @@ -243,9 +244,10 @@ def verify_consumer():
break

if False:
print('%s[%d]@%d: key=%s, value=%s' % \
tstype, timestamp = msg.timestamp()
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' % \
(msg.topic(), msg.partition(), msg.offset(),
msg.key(), msg.value()))
msg.key(), msg.value(), tstype, timestamp))

if (msg.offset() % 5) == 0:
# Async commit
Expand Down
5 changes: 4 additions & 1 deletion tests/test_Consumer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#!/usr/bin/env python

from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException
from confluent_kafka import Consumer, TopicPartition, KafkaError, KafkaException, TIMESTAMP_NOT_AVAILABLE


def test_basic_api():
Expand Down Expand Up @@ -36,6 +36,9 @@ def dummy_assign_revoke (consumer, partitions):
else:
print('OK: consumed message')

if msg is not None:
assert msg.timestamp() == (TIMESTAMP_NOT_AVAILABLE, -1)

partitions = list(map(lambda p: TopicPartition("test", p), range(0,100,3)))
kc.assign(partitions)

Expand Down
6 changes: 6 additions & 0 deletions tests/test_enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ def test_enums():
KafkaError class without an instantiated object. """
print(confluent_kafka.KafkaError._NO_OFFSET)
print(confluent_kafka.KafkaError.REBALANCE_IN_PROGRESS)

def test_tstype_enums():
""" Make sure librdkafka tstype enums are available. """
assert confluent_kafka.TIMESTAMP_NOT_AVAILABLE == 0
assert confluent_kafka.TIMESTAMP_CREATE_TIME == 1
assert confluent_kafka.TIMESTAMP_LOG_APPEND_TIME == 2