Skip to content

Commit 0a7ebe3

Browse files
Serpentianolegrok
authored andcommitted
Use x* memory functions to panic on OOM
This patch replaces all occurrences of memory related functions with xmalloc, xcalloc and xrealloc, making them to panic, when we fail to allocate memory for them. Closes #3
1 parent cfe6682 commit 0a7ebe3

File tree

7 files changed

+28
-80
lines changed

7 files changed

+28
-80
lines changed

kafka/callbacks.c

Lines changed: 10 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,11 @@
2525

2626
log_msg_t *
2727
new_log_msg(int level, const char *fac, const char *buf) {
28-
log_msg_t *msg = malloc(sizeof(log_msg_t));
29-
if (msg == NULL) {
30-
return NULL;
31-
}
28+
log_msg_t *msg = xmalloc(sizeof(log_msg_t));
3229
msg->level = level;
33-
msg->fac = malloc(sizeof(char) * strlen(fac) + 1);
30+
msg->fac = xmalloc(sizeof(char) * strlen(fac) + 1);
3431
strcpy(msg->fac, fac);
35-
msg->buf = malloc(sizeof(char) * strlen(buf) + 1);
32+
msg->buf = xmalloc(sizeof(char) * strlen(buf) + 1);
3633
strcpy(msg->buf, buf);
3734
return msg;
3835
}
@@ -78,11 +75,9 @@ stats_callback(rd_kafka_t *rd_kafka, char *json, size_t json_len, void *opaque)
7875

7976
error_msg_t *
8077
new_error_msg(int err, const char *reason) {
81-
error_msg_t *msg = malloc(sizeof(error_msg_t));
82-
if (msg == NULL)
83-
return NULL;
78+
error_msg_t *msg = xmalloc(sizeof(error_msg_t));
8479
msg->err = err;
85-
msg->reason = malloc(sizeof(char) * strlen(reason) + 1);
80+
msg->reason = xmalloc(sizeof(char) * strlen(reason) + 1);
8681
strcpy(msg->reason, reason);
8782
return msg;
8883
}
@@ -134,7 +129,7 @@ push_errors_cb_args(struct lua_State *L, const error_msg_t *msg)
134129
dr_msg_t *
135130
new_dr_msg(int dr_callback, int err) {
136131
dr_msg_t *dr_msg;
137-
dr_msg = malloc(sizeof(dr_msg_t));
132+
dr_msg = xmalloc(sizeof(dr_msg_t));
138133
dr_msg->dr_callback = dr_callback;
139134
dr_msg->err = err;
140135
return dr_msg;
@@ -165,11 +160,7 @@ msg_delivery_callback(rd_kafka_t *UNUSED(producer), const rd_kafka_message_t *ms
165160

166161
rebalance_msg_t *
167162
new_rebalance_revoke_msg(rd_kafka_topic_partition_list_t *revoked) {
168-
rebalance_msg_t *msg = malloc(sizeof(rebalance_msg_t));
169-
if (msg == NULL) {
170-
return NULL;
171-
}
172-
163+
rebalance_msg_t *msg = xmalloc(sizeof(rebalance_msg_t));
173164
pthread_mutex_t lock;
174165
if (pthread_mutex_init(&lock, NULL) != 0) {
175166
free(msg);
@@ -193,11 +184,7 @@ new_rebalance_revoke_msg(rd_kafka_topic_partition_list_t *revoked) {
193184

194185
rebalance_msg_t *
195186
new_rebalance_assign_msg(rd_kafka_topic_partition_list_t *assigned) {
196-
rebalance_msg_t *msg = malloc(sizeof(rebalance_msg_t));
197-
if (msg == NULL) {
198-
return NULL;
199-
}
200-
187+
rebalance_msg_t *msg = xmalloc(sizeof(rebalance_msg_t));
201188
pthread_mutex_t lock;
202189
if (pthread_mutex_init(&lock, NULL) != 0) {
203190
free(msg);
@@ -221,11 +208,7 @@ new_rebalance_assign_msg(rd_kafka_topic_partition_list_t *assigned) {
221208

222209
rebalance_msg_t *
223210
new_rebalance_error_msg(rd_kafka_resp_err_t err) {
224-
rebalance_msg_t *msg = malloc(sizeof(rebalance_msg_t));
225-
if (msg == NULL) {
226-
return NULL;
227-
}
228-
211+
rebalance_msg_t *msg = xmalloc(sizeof(rebalance_msg_t));
229212
pthread_mutex_t lock;
230213
if (pthread_mutex_init(&lock, NULL) != 0) {
231214
free(msg);
@@ -326,7 +309,7 @@ rebalance_callback(rd_kafka_t *consumer, rd_kafka_resp_err_t err, rd_kafka_topic
326309

327310
event_queues_t *
328311
new_event_queues() {
329-
event_queues_t *event_queues = calloc(1, sizeof(event_queues_t));
312+
event_queues_t *event_queues = xcalloc(1, sizeof(event_queues_t));
330313
for (int i = 0; i < MAX_QUEUE; i++)
331314
event_queues->cb_refs[i] = LUA_REFNIL;
332315
return event_queues;

kafka/consumer.c

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,10 +90,7 @@ consumer_poll_loop(void *arg) {
9090

9191
static consumer_poller_t *
9292
new_consumer_poller(rd_kafka_t *rd_consumer) {
93-
consumer_poller_t *poller = malloc(sizeof(consumer_poller_t));
94-
if (poller == NULL)
95-
return NULL;
96-
93+
consumer_poller_t *poller = xmalloc(sizeof(consumer_poller_t));
9794
poller->rd_consumer = rd_consumer;
9895
poller->should_stop = 0;
9996

@@ -719,7 +716,7 @@ lua_create_consumer(struct lua_State *L) {
719716
consumer_poller_t *poller = new_consumer_poller(rd_consumer);
720717

721718
consumer_t *consumer;
722-
consumer = malloc(sizeof(consumer_t));
719+
consumer = xmalloc(sizeof(consumer_t));
723720
consumer->rd_consumer = rd_consumer;
724721
consumer->topics = NULL;
725722
consumer->event_queues = event_queues;

kafka/consumer_msg.c

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,10 +141,7 @@ lua_consumer_msg_gc(struct lua_State *L) {
141141
msg_t *
142142
new_consumer_msg(rd_kafka_message_t *rd_message) {
143143
size_t message_size = sizeof(msg_t) + rd_message->len + rd_message->key_len;
144-
msg_t *msg = calloc(message_size, 1);
145-
if (msg == NULL)
146-
return NULL;
147-
144+
msg_t *msg = xcalloc(message_size, 1);
148145
msg->topic = rd_message->rkt;
149146
msg->partition = rd_message->partition;
150147
msg->value = (char*)msg + sizeof(msg_t);

kafka/producer.c

Lines changed: 7 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,7 @@ producer_poll_loop(void *arg) {
5353

5454
static producer_poller_t *
5555
new_producer_poller(rd_kafka_t *rd_producer) {
56-
producer_poller_t *poller = malloc(sizeof(producer_poller_t));
57-
if (poller == NULL)
58-
return NULL;
59-
56+
producer_poller_t *poller = xmalloc(sizeof(producer_poller_t));
6057
poller->rd_producer = rd_producer;
6158
poller->should_stop = 0;
6259

@@ -104,32 +101,24 @@ destroy_producer_poller(producer_poller_t *poller) {
104101
producer_topics_t *
105102
new_producer_topics(int32_t capacity) {
106103
rd_kafka_topic_t **elements;
107-
elements = malloc(sizeof(rd_kafka_topic_t *) * capacity);
108-
if (elements == NULL)
109-
return NULL;
110-
104+
elements = xmalloc(sizeof(rd_kafka_topic_t *) * capacity);
111105
producer_topics_t *topics;
112-
topics = malloc(sizeof(producer_topics_t));
106+
topics = xmalloc(sizeof(producer_topics_t));
113107
topics->capacity = capacity;
114108
topics->count = 0;
115109
topics->elements = elements;
116110

117111
return topics;
118112
}
119113

120-
int
114+
void
121115
add_producer_topics(producer_topics_t *topics, rd_kafka_topic_t *element) {
122116
if (topics->count >= topics->capacity) {
123-
rd_kafka_topic_t **new_elements = realloc(topics->elements, sizeof(rd_kafka_topic_t *) * topics->capacity * 2);
124-
if (new_elements == NULL) {
125-
printf("realloc failed to relloc rd_kafka_topic_t array.");
126-
return 1;
127-
}
117+
rd_kafka_topic_t **new_elements = xrealloc(topics->elements, sizeof(rd_kafka_topic_t *) * topics->capacity * 2);
128118
topics->elements = new_elements;
129119
topics->capacity *= 2;
130120
}
131121
topics->elements[topics->count++] = element;
132-
return 0;
133122
}
134123

135124
static rd_kafka_topic_t *
@@ -312,10 +301,7 @@ lua_producer_produce(struct lua_State *L) {
312301
lua_pushstring(L, rd_kafka_err2str(rd_kafka_last_error()));
313302
goto error;
314303
}
315-
if (add_producer_topics(producer->topics, rd_topic) != 0) {
316-
lua_pushstring(L, "Unexpected error: failed to add new topic to topic list!");
317-
goto error;
318-
}
304+
add_producer_topics(producer->topics, rd_topic);
319305
}
320306

321307
rd_kafka_resp_err_t err = RD_KAFKA_RESP_ERR_NO_ERROR;
@@ -572,7 +558,7 @@ lua_create_producer(struct lua_State *L) {
572558
producer_poller_t *poller = new_producer_poller(rd_producer);
573559

574560
producer_t *producer;
575-
producer = malloc(sizeof(producer_t));
561+
producer = xmalloc(sizeof(producer_t));
576562
producer->rd_producer = rd_producer;
577563
producer->topics = new_producer_topics(256);
578564
producer->event_queues = event_queues;

kafka/producer.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ typedef struct {
3030

3131
producer_topics_t *new_producer_topics(int32_t capacity);
3232

33-
int
33+
void
3434
add_producer_topics(producer_topics_t *topics, rd_kafka_topic_t *element);
3535

3636
void

kafka/queue.c

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include <pthread.h>
33
#include <unistd.h>
44

5+
#include <common.h>
56
#include <queue.h>
67

78
////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -54,18 +55,10 @@ queue_pop(queue_t *queue) {
5455
* @param value
5556
* @return
5657
*/
57-
int
58+
void
5859
queue_lockfree_push(queue_t *queue, void *value) {
59-
if (value == NULL || queue == NULL) {
60-
return -1;
61-
}
62-
6360
queue_node_t *new_node;
64-
new_node = malloc(sizeof(queue_node_t));
65-
if (new_node == NULL) {
66-
return -1;
67-
}
68-
61+
new_node = xmalloc(sizeof(queue_node_t));
6962
new_node->value = value;
7063
new_node->next = NULL;
7164

@@ -79,8 +72,6 @@ queue_lockfree_push(queue_t *queue, void *value) {
7972
}
8073

8174
queue->count += 1;
82-
83-
return 0;
8475
}
8576

8677
int
@@ -90,21 +81,15 @@ queue_push(queue_t *queue, void *value) {
9081
}
9182

9283
pthread_mutex_lock(&queue->lock);
93-
94-
int output = queue_lockfree_push(queue, value);
95-
84+
queue_lockfree_push(queue, value);
9685
pthread_mutex_unlock(&queue->lock);
9786

98-
return output;
87+
return 0;
9988
}
10089

10190
queue_t *
10291
new_queue() {
103-
queue_t *queue = malloc(sizeof(queue_t));
104-
if (queue == NULL) {
105-
return NULL;
106-
}
107-
92+
queue_t *queue = xmalloc(sizeof(queue_t));
10893
pthread_mutex_t lock;
10994
if (pthread_mutex_init(&lock, NULL) != 0) {
11095
free(queue);

kafka/queue.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ queue_pop(queue_t *queue);
4141
* @param value
4242
* @return
4343
*/
44-
int
44+
void
4545
queue_lockfree_push(queue_t *queue, void *value);
4646

4747
int

0 commit comments

Comments
 (0)