Skip to content

Commit 339cb33

Browse files
committed
Updates for requested changes
1 parent 2e6c56d commit 339cb33

File tree

3 files changed

+16
-8
lines changed

3 files changed

+16
-8
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -327,13 +327,18 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {
327327

328328

329329
static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
330-
if (self->timestamp > 0)
330+
if (self->tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE)
331331
return PyLong_FromLong(self->timestamp);
332332
else
333333
Py_RETURN_NONE;
334334
}
335335

336336

337+
static PyObject *Message_tstype (Message *self, PyObject *ignore) {
338+
return PyLong_FromLong(self->tstype);
339+
}
340+
341+
337342
static PyMethodDef Message_methods[] = {
338343
{ "error", (PyCFunction)Message_error, METH_NOARGS,
339344
" The message object is also used to propagate errors and events, "
@@ -375,6 +380,11 @@ static PyMethodDef Message_methods[] = {
375380
" :rtype: int or None\n"
376381
"\n"
377382
},
383+
{ "tstype", (PyCFunction)Message_tstype, METH_NOARGS,
384+
" :returns: message timestamp type.\n"
385+
" :rtype: int\n"
386+
"\n"
387+
},
378388
{ NULL }
379389
};
380390

@@ -508,11 +518,7 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
508518
self->partition = rkm->partition;
509519
self->offset = rkm->offset;
510520

511-
rd_kafka_timestamp_type_t tstype;
512-
self->timestamp = rd_kafka_message_timestamp(rkm, &tstype);
513-
if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
514-
// todo: make tstype available to python api
515-
}
521+
self->timestamp = rd_kafka_message_timestamp(rkm, &self->tstype);
516522

517523
return (PyObject *)self;
518524
}

confluent_kafka/src/confluent_kafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ typedef struct {
220220
int32_t partition;
221221
int64_t offset;
222222
int64_t timestamp;
223+
rd_kafka_timestamp_type_t tstype;
223224
} Message;
224225

225226
extern PyTypeObject MessageType;

examples/integration_test.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ def verify_consumer():
210210
'group.id': 'test.py',
211211
'session.timeout.ms': 6000,
212212
'enable.auto.commit': False,
213+
'api.version.request': True,
213214
'on_commit': print_commit_result,
214215
'error_cb': error_cb,
215216
'default.topic.config': {
@@ -243,9 +244,9 @@ def verify_consumer():
243244
break
244245

245246
if False:
246-
print('%s[%d]@%d: key=%s, value=%s' % \
247+
print('%s[%d]@%d: key=%s, value=%s, tstype=%d, timestamp=%s' % \
247248
(msg.topic(), msg.partition(), msg.offset(),
248-
msg.key(), msg.value()))
249+
msg.key(), msg.value(), msg.tstype(), repr(msg.timestamp())))
249250

250251
if (msg.offset() % 5) == 0:
251252
# Async commit

0 commit comments

Comments
 (0)