Skip to content

Commit e92ae7c

Browse files
committed
Added CallState to track per-thread C call state (fixes #19)
1 parent fac4543 commit e92ae7c

File tree

5 files changed

+196
-36
lines changed

5 files changed

+196
-36
lines changed

confluent_kafka/src/Consumer.c

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -366,20 +366,17 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
366366
static char *kws[] = { "timeout", NULL };
367367
rd_kafka_message_t *rkm;
368368
PyObject *msgobj;
369+
CallState cs;
369370

370371
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|d", kws, &tmout))
371372
return NULL;
372373

373-
self->callback_crashed = 0;
374-
self->thread_state = PyEval_SaveThread();
374+
CallState_begin(self, &cs);
375375

376376
rkm = rd_kafka_consumer_poll(self->rk, tmout >= 0 ?
377377
(int)(tmout * 1000.0f) : -1);
378378

379-
PyEval_RestoreThread(self->thread_state);
380-
self->thread_state = NULL;
381-
382-
if (self->callback_crashed)
379+
if (!CallState_end(self, &cs))
383380
return NULL;
384381

385382
if (!rkm)
@@ -393,9 +390,15 @@ static PyObject *Consumer_poll (Handle *self, PyObject *args,
393390

394391

395392
static PyObject *Consumer_close (Handle *self, PyObject *ignore) {
396-
self->thread_state = PyEval_SaveThread();
393+
CallState cs;
394+
395+
CallState_begin(self, &cs);
396+
397397
rd_kafka_consumer_close(self->rk);
398-
PyEval_RestoreThread(self->thread_state);
398+
399+
if (!CallState_end(self, &cs))
400+
return NULL;
401+
399402
Py_RETURN_NONE;
400403
}
401404

@@ -593,8 +596,9 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
593596
rd_kafka_topic_partition_list_t *c_parts,
594597
void *opaque) {
595598
Handle *self = opaque;
599+
CallState *cs;
596600

597-
PyEval_RestoreThread(self->thread_state);
601+
cs = CallState_get(self);
598602

599603
self->u.Consumer.rebalance_assigned = 0;
600604

@@ -615,8 +619,8 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
615619
if (!args) {
616620
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
617621
"Unable to build callback args");
618-
self->thread_state = PyEval_SaveThread();
619-
self->callback_crashed++;
622+
CallState_crash(cs);
623+
CallState_resume(cs);
620624
return;
621625
}
622626

@@ -630,7 +634,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
630634
if (result)
631635
Py_DECREF(result);
632636
else {
633-
self->callback_crashed++;
637+
CallState_crash(cs);
634638
rd_kafka_yield(rk);
635639
}
636640
}
@@ -646,7 +650,7 @@ static void Consumer_rebalance_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
646650
rd_kafka_assign(rk, NULL);
647651
}
648652

649-
self->thread_state = PyEval_SaveThread();
653+
CallState_resume(cs);
650654
}
651655

652656

@@ -655,11 +659,12 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
655659
void *opaque) {
656660
Handle *self = opaque;
657661
PyObject *parts, *k_err, *args, *result;
662+
CallState *cs;
658663

659664
if (!self->u.Consumer.on_commit)
660665
return;
661666

662-
PyEval_RestoreThread(self->thread_state);
667+
cs = CallState_get(self);
663668

664669
/* Insantiate error object */
665670
k_err = KafkaError_new_or_None(err, NULL);
@@ -675,8 +680,8 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
675680
if (!args) {
676681
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
677682
"Unable to build callback args");
678-
self->thread_state = PyEval_SaveThread();
679-
self->callback_crashed++;
683+
CallState_crash(cs);
684+
CallState_resume(cs);
680685
return;
681686
}
682687

@@ -687,11 +692,11 @@ static void Consumer_offset_commit_cb (rd_kafka_t *rk, rd_kafka_resp_err_t err,
687692
if (result)
688693
Py_DECREF(result);
689694
else {
690-
self->callback_crashed++;
695+
CallState_crash(cs);
691696
rd_kafka_yield(rk);
692697
}
693698

694-
self->thread_state = PyEval_SaveThread();
699+
CallState_resume(cs);
695700
}
696701

697702

confluent_kafka/src/Producer.c

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -131,14 +131,15 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
131131
void *opaque) {
132132
struct Producer_msgstate *msgstate = rkm->_private;
133133
Handle *self = opaque;
134+
CallState *cs;
134135
PyObject *args;
135136
PyObject *result;
136137
PyObject *msgobj;
137138

138139
if (!msgstate)
139140
return;
140141

141-
PyEval_RestoreThread(self->thread_state);
142+
cs = CallState_get(self);
142143

143144
if (!msgstate->dr_cb) {
144145
/* No callback defined */
@@ -156,7 +157,7 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
156157
if (!args) {
157158
cfl_PyErr_Format(RD_KAFKA_RESP_ERR__FAIL,
158159
"Unable to build callback args");
159-
self->callback_crashed++;
160+
CallState_crash(cs);
160161
goto done;
161162
}
162163

@@ -166,13 +167,13 @@ static void dr_msg_cb (rd_kafka_t *rk, const rd_kafka_message_t *rkm,
166167
if (result)
167168
Py_DECREF(result);
168169
else {
169-
self->callback_crashed++;
170+
CallState_crash(cs);
170171
rd_kafka_yield(rk);
171172
}
172173

173174
done:
174175
Producer_msgstate_destroy(msgstate);
175-
self->thread_state = PyEval_SaveThread();
176+
CallState_resume(cs);
176177
}
177178

178179

@@ -321,20 +322,15 @@ static PyObject *Producer_produce (Handle *self, PyObject *args,
321322
*/
322323
static int Producer_poll0 (Handle *self, int tmout) {
323324
int r;
325+
CallState cs;
324326

325-
self->callback_crashed = 0;
326-
self->thread_state = PyEval_SaveThread();
327+
CallState_begin(self, &cs);
327328

328329
r = rd_kafka_poll(self->rk, tmout);
329330

330-
PyEval_RestoreThread(self->thread_state);
331-
self->thread_state = NULL;
332-
333-
if (PyErr_CheckSignals() == -1)
334-
return -1;
335-
336-
if (self->callback_crashed)
331+
if (!CallState_end(self, &cs)) {
337332
return -1;
333+
}
338334

339335
return r;
340336
}

confluent_kafka/src/confluent_kafka.c

Lines changed: 61 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -813,8 +813,9 @@ rd_kafka_topic_partition_list_t *py_to_c_parts (PyObject *plist) {
813813
static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque) {
814814
Handle *h = opaque;
815815
PyObject *eo, *result;
816+
CallState *cs;
816817

817-
PyEval_RestoreThread(h->thread_state);
818+
cs = CallState_get(h);
818819
if (!h->error_cb) {
819820
/* No callback defined */
820821
goto done;
@@ -827,12 +828,12 @@ static void error_cb (rd_kafka_t *rk, int err, const char *reason, void *opaque)
827828
if (result) {
828829
Py_DECREF(result);
829830
} else {
830-
h->callback_crashed++;
831+
CallState_crash(cs);
831832
rd_kafka_yield(h->rk);
832833
}
833834

834835
done:
835-
h->thread_state = PyEval_SaveThread();
836+
CallState_resume(cs);
836837
}
837838

838839

@@ -855,6 +856,8 @@ void Handle_clear (Handle *h) {
855856
if (h->error_cb) {
856857
Py_DECREF(h->error_cb);
857858
}
859+
860+
PyThread_delete_key(h->tlskey);
858861
}
859862

860863
/**
@@ -1172,12 +1175,67 @@ rd_kafka_conf_t *common_conf_setup (rd_kafka_type_t ktype,
11721175

11731176
rd_kafka_conf_set_opaque(conf, h);
11741177

1178+
h->tlskey = PyThread_create_key();
1179+
11751180
return conf;
11761181
}
11771182

11781183

11791184

11801185

1186+
/**
1187+
* @brief Initialiase a CallState and unlock the GIL prior to a
1188+
* possibly blocking external call.
1189+
*/
1190+
void CallState_begin (Handle *h, CallState *cs) {
1191+
cs->thread_state = PyEval_SaveThread();
1192+
cs->crashed = 0;
1193+
PyThread_set_key_value(h->tlskey, cs);
1194+
}
1195+
1196+
/**
1197+
* @brief Relock the GIL after external call is done.
1198+
* @returns 0 if a Python signal was raised or a callback crashed, else 1.
1199+
*/
1200+
int CallState_end (Handle *h, CallState *cs) {
1201+
PyThread_delete_key_value(h->tlskey);
1202+
1203+
PyEval_RestoreThread(cs->thread_state);
1204+
1205+
if (PyErr_CheckSignals() == -1 || cs->crashed)
1206+
return 0;
1207+
1208+
return 1;
1209+
}
1210+
1211+
1212+
/**
1213+
* @brief Get the current thread's CallState and re-locks the GIL.
1214+
*/
1215+
CallState *CallState_get (Handle *h) {
1216+
CallState *cs = PyThread_get_key_value(h->tlskey);
1217+
assert(cs != NULL);
1218+
PyEval_RestoreThread(cs->thread_state);
1219+
cs->thread_state = NULL;
1220+
return cs;
1221+
}
1222+
1223+
/**
1224+
* @brief Un-locks the GIL to resume blocking external call.
1225+
*/
1226+
void CallState_resume (CallState *cs) {
1227+
assert(cs->thread_state == NULL);
1228+
cs->thread_state = PyEval_SaveThread();
1229+
}
1230+
1231+
/**
1232+
* @brief Indicate that call crashed.
1233+
*/
1234+
void CallState_crash (CallState *cs) {
1235+
cs->crashed++;
1236+
}
1237+
1238+
11811239

11821240
/****************************************************************************
11831241
*

confluent_kafka/src/confluent_kafka.h

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#include <Python.h>
1818
#include <structmember.h>
19+
#include <pythread.h>
1920

2021
#include <librdkafka/rdkafka.h>
2122

@@ -112,9 +113,8 @@ PyObject *KafkaError_new_or_None (rd_kafka_resp_err_t err, const char *str);
112113
typedef struct {
113114
PyObject_HEAD
114115
rd_kafka_t *rk;
115-
int callback_crashed;
116-
PyThreadState *thread_state;
117116
PyObject *error_cb;
117+
int tlskey; /* Thread-Local-Storage key */
118118

119119
union {
120120
/**
@@ -147,6 +147,40 @@ void Handle_clear (Handle *h);
147147
int Handle_traverse (Handle *h, visitproc visit, void *arg);
148148

149149

150+
/**
151+
* @brief Current thread's state for "blocking" calls to librdkafka.
152+
*/
153+
typedef struct {
154+
PyThreadState *thread_state;
155+
int crashed; /* Callback crashed */
156+
} CallState;
157+
158+
/**
159+
* @brief Initialiase a CallState and unlock the GIL prior to a
160+
* possibly blocking external call.
161+
*/
162+
void CallState_begin (Handle *h, CallState *cs);
163+
/**
164+
* @brief Relock the GIL after external call is done, remove TLS state.
165+
* @returns 0 if a Python signal was raised or a callback crashed, else 1.
166+
*/
167+
int CallState_end (Handle *h, CallState *cs);
168+
169+
/**
170+
* @brief Get the current thread's CallState and re-locks the GIL.
171+
*/
172+
CallState *CallState_get (Handle *h);
173+
/**
174+
* @brief Un-locks the GIL to resume blocking external call.
175+
*/
176+
void CallState_resume (CallState *cs);
177+
178+
/**
179+
* @brief Indicate that call crashed.
180+
*/
181+
void CallState_crash (CallState *cs);
182+
183+
150184
/****************************************************************************
151185
*
152186
*

0 commit comments

Comments
 (0)