Skip to content

Commit f00ec40

Browse files
committed
Add timestamp() to librdkafka messages
1 parent b85afd5 commit f00ec40

File tree

2 files changed

+23
-3
lines changed

2 files changed

+23
-3
lines changed

confluent_kafka/src/confluent_kafka.c

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ static PyObject* KafkaError_richcompare (KafkaError *self, PyObject *o2,
164164
Py_INCREF(result);
165165
return result;
166166
}
167-
167+
168168

169169
static PyTypeObject KafkaErrorType = {
170170
PyVarObject_HEAD_INIT(NULL, 0)
@@ -326,6 +326,14 @@ static PyObject *Message_offset (Message *self, PyObject *ignore) {
326326
}
327327

328328

329+
static PyObject *Message_timestamp (Message *self, PyObject *ignore) {
330+
if (self->timestamp > 0)
331+
return PyLong_FromLong(self->timestamp);
332+
else
333+
Py_RETURN_NONE;
334+
}
335+
336+
329337
static PyMethodDef Message_methods[] = {
330338
{ "error", (PyCFunction)Message_error, METH_NOARGS,
331339
" The message object is also used to propagate errors and events, "
@@ -362,6 +370,11 @@ static PyMethodDef Message_methods[] = {
362370
" :rtype: int or None\n"
363371
"\n"
364372
},
373+
{ "timestamp", (PyCFunction)Message_timestamp, METH_NOARGS,
374+
" :returns: message timestamp or None if not available.\n"
375+
" :rtype: int or None\n"
376+
"\n"
377+
},
365378
{ NULL }
366379
};
367380

@@ -495,6 +508,12 @@ PyObject *Message_new0 (const rd_kafka_message_t *rkm) {
495508
self->partition = rkm->partition;
496509
self->offset = rkm->offset;
497510

511+
rd_kafka_timestamp_type_t tstype;
512+
self->timestamp = rd_kafka_message_timestamp(rkm, &tstype);
513+
if (tstype != RD_KAFKA_TIMESTAMP_NOT_AVAILABLE) {
514+
// todo: make tstype available to python api
515+
}
516+
498517
return (PyObject *)self;
499518
}
500519

@@ -560,7 +579,7 @@ static PyObject *TopicPartition_new (PyTypeObject *type, PyObject *args,
560579

561580
return TopicPartition_new0(topic, partition, offset, 0);
562581
}
563-
582+
564583

565584

566585
static int TopicPartition_traverse (TopicPartition *self,
@@ -758,7 +777,7 @@ PyObject *c_parts_to_py (const rd_kafka_topic_partition_list_t *c_parts) {
758777
}
759778

760779
return parts;
761-
780+
762781
}
763782

764783
/**

confluent_kafka/src/confluent_kafka.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ typedef struct {
219219
PyObject *error;
220220
int32_t partition;
221221
int64_t offset;
222+
int64_t timestamp;
222223
} Message;
223224

224225
extern PyTypeObject MessageType;

0 commit comments

Comments
 (0)