Skip to content

Commit 04aa84f

Browse files
authored
Merge pull request confluentinc#36 from confluentinc/subscription_assignment
Add Consumer.Subscription() and Assignment() APIs
2 parents 3d69321 + db80386 commit 04aa84f

File tree

3 files changed

+138
-1
lines changed

3 files changed

+138
-1
lines changed

kafka/consumer.go

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ import (
2424
/*
2525
#include <stdlib.h>
2626
#include <librdkafka/rdkafka.h>
27+
28+
29+
static rd_kafka_topic_partition_t *_c_rdkafka_topic_partition_list_entry(rd_kafka_topic_partition_list_t *rktparlist, int idx) {
30+
return idx < rktparlist->cnt ? &rktparlist->elems[idx] : NULL;
31+
}
2732
*/
2833
import "C"
2934

@@ -201,7 +206,7 @@ func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, er
201206
// (through Assign() or implicitly through a self-rebalanced Subscribe()).
202207
// To set the starting offset it is preferred to use Assign() and provide
203208
// a starting offset for each partition.
204-
//
209+
//
205210
// Returns an error on failure or nil otherwise.
206211
func (c *Consumer) Seek(partition TopicPartition, timeoutMs int) error {
207212
rkt := c.handle.getRkt(*partition.Topic)
@@ -389,3 +394,39 @@ func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*M
389394
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error) {
390395
return queryWatermarkOffsets(c, topic, partition, timeoutMs)
391396
}
397+
398+
// Subscription returns the current subscription as set by Subscribe()
399+
func (c *Consumer) Subscription() (topics []string, err error) {
400+
var cTopics *C.rd_kafka_topic_partition_list_t
401+
402+
cErr := C.rd_kafka_subscription(c.handle.rk, &cTopics)
403+
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
404+
return nil, newError(cErr)
405+
}
406+
defer C.rd_kafka_topic_partition_list_destroy(cTopics)
407+
408+
topicCnt := int(cTopics.cnt)
409+
topics = make([]string, topicCnt)
410+
for i := 0; i < topicCnt; i++ {
411+
crktpar := C._c_rdkafka_topic_partition_list_entry(cTopics,
412+
C.int(i))
413+
topics[i] = C.GoString(crktpar.topic)
414+
}
415+
416+
return topics, nil
417+
}
418+
419+
// Assignment returns the current partition assignments
420+
func (c *Consumer) Assignment() (partitions []TopicPartition, err error) {
421+
var cParts *C.rd_kafka_topic_partition_list_t
422+
423+
cErr := C.rd_kafka_assignment(c.handle.rk, &cParts)
424+
if cErr != C.RD_KAFKA_RESP_ERR_NO_ERROR {
425+
return nil, newError(cErr)
426+
}
427+
defer C.rd_kafka_topic_partition_list_destroy(cParts)
428+
429+
partitions = newTopicPartitionsFromCparts(cParts)
430+
431+
return partitions, nil
432+
}

kafka/consumer_test.go

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
package kafka
1818

1919
import (
20+
"fmt"
21+
"reflect"
22+
"sort"
2023
"testing"
2124
)
2225

@@ -85,3 +88,75 @@ func TestConsumerAPIs(t *testing.T) {
8588
t.Errorf("Close failed: %s", err)
8689
}
8790
}
91+
92+
func TestConsumerSubscription(t *testing.T) {
93+
c, err := NewConsumer(&ConfigMap{"group.id": "gotest"})
94+
if err != nil {
95+
t.Fatalf("%s", err)
96+
}
97+
98+
topics := []string{"gotest1", "gotest2", "gotest3"}
99+
sort.Strings(topics)
100+
101+
err = c.SubscribeTopics(topics, nil)
102+
if err != nil {
103+
t.Fatalf("SubscribeTopics failed: %s", err)
104+
}
105+
106+
subscription, err := c.Subscription()
107+
if err != nil {
108+
t.Fatalf("Subscription() failed: %s", err)
109+
}
110+
111+
sort.Strings(subscription)
112+
113+
t.Logf("Compare Subscription %v to original list of topics %v\n",
114+
subscription, topics)
115+
116+
r := reflect.DeepEqual(topics, subscription)
117+
if r != true {
118+
t.Fatalf("Subscription() %v does not match original topics %v",
119+
subscription, topics)
120+
}
121+
c.Close()
122+
}
123+
124+
func TestConsumerAssignment(t *testing.T) {
125+
c, err := NewConsumer(&ConfigMap{"group.id": "gotest"})
126+
if err != nil {
127+
t.Fatalf("%s", err)
128+
}
129+
130+
topic0 := "topic0"
131+
topic1 := "topic1"
132+
partitions := TopicPartitions{
133+
{Topic: &topic1, Partition: 1},
134+
{Topic: &topic1, Partition: 3},
135+
{Topic: &topic0, Partition: 2}}
136+
sort.Sort(partitions)
137+
138+
err = c.Assign(partitions)
139+
if err != nil {
140+
t.Fatalf("Assign failed: %s", err)
141+
}
142+
143+
assignment, err := c.Assignment()
144+
if err != nil {
145+
t.Fatalf("Assignment() failed: %s", err)
146+
}
147+
148+
sort.Sort(TopicPartitions(assignment))
149+
150+
t.Logf("Compare Assignment %v to original list of partitions %v\n",
151+
assignment, partitions)
152+
153+
// reflect.DeepEqual() can't be used since TopicPartition.Topic
154+
// is a pointer to a string rather than a string and the pointer
155+
// will differ between partitions and assignment.
156+
// Instead do a simple stringification + string compare.
157+
if fmt.Sprintf("%v", assignment) != fmt.Sprintf("%v", partitions) {
158+
t.Fatalf("Assignment() %v does not match original partitions %v",
159+
assignment, partitions)
160+
}
161+
c.Close()
162+
}

kafka/kafka.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,27 @@ func (p TopicPartition) String() string {
264264
topic, p.Partition, p.Offset)
265265
}
266266

267+
// TopicPartitions is a slice of TopicPartitions that also implements
268+
// the sort interface
269+
type TopicPartitions []TopicPartition
270+
271+
func (tps TopicPartitions) Len() int {
272+
return len(tps)
273+
}
274+
275+
func (tps TopicPartitions) Less(i, j int) bool {
276+
if *tps[i].Topic < *tps[j].Topic {
277+
return true
278+
} else if *tps[i].Topic > *tps[j].Topic {
279+
return false
280+
}
281+
return tps[i].Partition < tps[j].Partition
282+
}
283+
284+
func (tps TopicPartitions) Swap(i, j int) {
285+
tps[i], tps[j] = tps[j], tps[i]
286+
}
287+
267288
// new_cparts_from_TopicPartitions creates a new C rd_kafka_topic_partition_list_t
268289
// from a TopicPartition array.
269290
func newCPartsFromTopicPartitions(partitions []TopicPartition) (cparts *C.rd_kafka_topic_partition_list_t) {

0 commit comments

Comments
 (0)