Skip to content

Commit f697f39

Browse files
committed
feature: confluentinc#65 Custom retry delay provider
1 parent 253f61f commit f697f39

File tree

2 files changed

+192
-170
lines changed

2 files changed

+192
-170
lines changed
Lines changed: 178 additions & 165 deletions
Original file line numberDiff line numberDiff line change
@@ -1,165 +1,178 @@
1-
package io.confluent.parallelconsumer;
2-
3-
/*-
4-
* Copyright (C) 2020-2021 Confluent, Inc.
5-
*/
6-
import io.confluent.parallelconsumer.state.WorkContainer;
7-
import lombok.Builder;
8-
import lombok.Getter;
9-
import lombok.ToString;
10-
import org.apache.kafka.clients.consumer.Consumer;
11-
import org.apache.kafka.clients.producer.Producer;
12-
13-
import java.time.Duration;
14-
import java.util.Objects;
15-
16-
import static io.confluent.csid.utils.StringUtils.msg;
17-
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;
18-
19-
/**
20-
* The options for the {@link ParallelEoSStreamProcessor} system.
21-
*
22-
* @see #builder()
23-
* @see ParallelConsumerOptions.ParallelConsumerOptionsBuilder
24-
*/
25-
@Getter
26-
@Builder(toBuilder = true)
27-
@ToString
28-
public class ParallelConsumerOptions<K, V> {
29-
30-
/**
31-
* Required parameter for all use.
32-
*/
33-
private final Consumer<K, V> consumer;
34-
35-
/**
36-
* Supplying a producer is only needed if using the produce flows.
37-
*
38-
* @see ParallelStreamProcessor
39-
*/
40-
private final Producer<K, V> producer;
41-
42-
/**
43-
* Path to Managed executor service for Java EE
44-
*/
45-
@Builder.Default
46-
private final String managedExecutorService = "java:comp/DefaultManagedExecutorService";
47-
48-
/**
49-
* Path to Managed thread factory for Java EE
50-
*/
51-
@Builder.Default
52-
private final String managedThreadFactory = "java:comp/DefaultManagedThreadFactory";
53-
54-
/**
55-
* The ordering guarantee to use.
56-
*/
57-
public enum ProcessingOrder {
58-
59-
/**
60-
* No ordering is guaranteed, not even partition order. Fastest. Concurrency is at most the max number of
61-
* concurrency or max number of uncommitted messages, limited by the max concurrency or uncommitted settings.
62-
*/
63-
UNORDERED,
64-
65-
/**
66-
* Process messages within a partition in order, but process multiple partitions in parallel. Similar to running
67-
* more consumer for a topic. Concurrency is at most the number of partitions.
68-
*/
69-
PARTITION,
70-
71-
/**
72-
* Process messages in key order. Concurrency is at most the number of unique keys in a topic, limited by the
73-
* max concurrency or uncommitted settings.
74-
*/
75-
KEY
76-
}
77-
78-
/**
79-
* The type of commit to be made, with either a transactions configured Producer where messages produced are
80-
* committed back to the Broker along with the offsets they originated from, or with the faster simpler Consumer
81-
* offset system either synchronously or asynchronously
82-
*/
83-
public enum CommitMode {
84-
85-
/**
86-
* Periodically commits through the Producer using transactions. Slowest of the options, but no duplicates in
87-
* Kafka guaranteed (message replay may cause duplicates in external systems which is unavoidable with Kafka).
88-
* <p>
89-
* This is separate from using an IDEMPOTENT Producer, which can be used, along with {@link
90-
* CommitMode#PERIODIC_CONSUMER_SYNC} or {@link CommitMode#PERIODIC_CONSUMER_ASYNCHRONOUS}.
91-
*/
92-
PERIODIC_TRANSACTIONAL_PRODUCER,
93-
94-
/**
95-
* Periodically synchronous commits with the Consumer. Much faster than {@link
96-
* #PERIODIC_TRANSACTIONAL_PRODUCER}. Slower but potentially less duplicates than {@link
97-
* #PERIODIC_CONSUMER_ASYNCHRONOUS} upon replay.
98-
*/
99-
PERIODIC_CONSUMER_SYNC,
100-
101-
/**
102-
* Periodically commits offsets asynchronously. The fastest option, under normal conditions will have few or no
103-
* duplicates. Under failure recovery may have more duplicates than {@link #PERIODIC_CONSUMER_SYNC}.
104-
*/
105-
PERIODIC_CONSUMER_ASYNCHRONOUS
106-
107-
}
108-
109-
/**
110-
* The {@link ProcessingOrder} type to use
111-
*/
112-
@Builder.Default
113-
private final ProcessingOrder ordering = ProcessingOrder.KEY;
114-
115-
/**
116-
* The {@link CommitMode} to be used
117-
*/
118-
@Builder.Default
119-
private final CommitMode commitMode = CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS;
120-
121-
/**
122-
* Controls the maximum degree of concurrency to occur. Used to limit concurrent calls to external systems to a
123-
* maximum to prevent overloading them or to a degree, using up quotas.
124-
* <p>
125-
* A note on quotas - if your quota is expressed as maximum concurrent calls, this works well. If it's limited in
126-
* total requests / sec, this may still overload the system. See towards the distributed rate limiting feature for
127-
* this to be properly addressed: https://github.com/confluentinc/parallel-consumer/issues/24 Add distributed rate
128-
* limiting support #24.
129-
* <p>
130-
* In the core module, this sets the number of threads to use in the core's thread pool.
131-
* <p>
132-
* It's recommended to set this quite high, much higher than core count, as it's expected that these threads will
133-
* spend most of their time blocked waiting for IO. For automatic setting of this variable, look out for issue
134-
* https://github.com/confluentinc/parallel-consumer/issues/21 Dynamic concurrency control with flow control or tcp
135-
* congestion control theory #21.
136-
*/
137-
@Builder.Default
138-
private final int maxConcurrency = 16;
139-
140-
/**
141-
* When a message fails, how long the system should wait before trying that message again.
142-
*/
143-
@Builder.Default
144-
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);
145-
146-
public void validate() {
147-
Objects.requireNonNull(consumer, "A consumer must be supplied");
148-
149-
if (isUsingTransactionalProducer() && producer == null) {
150-
throw new IllegalArgumentException(msg("Wanting to use Transaction Producer mode ({}) without supplying a Producer instance",
151-
commitMode));
152-
}
153-
154-
//
155-
WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());
156-
}
157-
158-
public boolean isUsingTransactionalProducer() {
159-
return commitMode.equals(PERIODIC_TRANSACTIONAL_PRODUCER);
160-
}
161-
162-
public boolean isProducerSupplied() {
163-
return getProducer() != null;
164-
}
165-
}
1+
package io.confluent.parallelconsumer;
2+
3+
/*-
4+
* Copyright (C) 2020-2021 Confluent, Inc.
5+
*/
6+
7+
import io.confluent.parallelconsumer.state.WorkContainer;
8+
import lombok.Builder;
9+
import lombok.Getter;
10+
import lombok.ToString;
11+
import org.apache.kafka.clients.consumer.Consumer;
12+
import org.apache.kafka.clients.producer.Producer;
13+
14+
import java.time.Duration;
15+
import java.util.Objects;
16+
import java.util.function.Function;
17+
18+
import static io.confluent.csid.utils.StringUtils.msg;
19+
import static io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode.PERIODIC_TRANSACTIONAL_PRODUCER;
20+
21+
/**
22+
* The options for the {@link ParallelEoSStreamProcessor} system.
23+
*
24+
* @see #builder()
25+
* @see ParallelConsumerOptions.ParallelConsumerOptionsBuilder
26+
*/
27+
@Getter
28+
@Builder(toBuilder = true)
29+
@ToString
30+
public class ParallelConsumerOptions<K, V> {
31+
32+
/**
33+
* Required parameter for all use.
34+
*/
35+
private final Consumer<K, V> consumer;
36+
37+
/**
38+
* Supplying a producer is only needed if using the produce flows.
39+
*
40+
* @see ParallelStreamProcessor
41+
*/
42+
private final Producer<K, V> producer;
43+
44+
/**
45+
* Path to Managed executor service for Java EE
46+
*/
47+
@Builder.Default
48+
private final String managedExecutorService = "java:comp/DefaultManagedExecutorService";
49+
50+
/**
51+
* Path to Managed thread factory for Java EE
52+
*/
53+
@Builder.Default
54+
private final String managedThreadFactory = "java:comp/DefaultManagedThreadFactory";
55+
56+
/**
57+
* The ordering guarantee to use.
58+
*/
59+
public enum ProcessingOrder {
60+
61+
/**
62+
* No ordering is guaranteed, not even partition order. Fastest. Concurrency is at most the max number of
63+
* concurrency or max number of uncommitted messages, limited by the max concurrency or uncommitted settings.
64+
*/
65+
UNORDERED,
66+
67+
/**
68+
* Process messages within a partition in order, but process multiple partitions in parallel. Similar to running
69+
* more consumer for a topic. Concurrency is at most the number of partitions.
70+
*/
71+
PARTITION,
72+
73+
/**
74+
* Process messages in key order. Concurrency is at most the number of unique keys in a topic, limited by the
75+
* max concurrency or uncommitted settings.
76+
*/
77+
KEY
78+
}
79+
80+
/**
81+
* The type of commit to be made, with either a transactions configured Producer where messages produced are
82+
* committed back to the Broker along with the offsets they originated from, or with the faster simpler Consumer
83+
* offset system either synchronously or asynchronously
84+
*/
85+
public enum CommitMode {
86+
87+
/**
88+
* Periodically commits through the Producer using transactions. Slowest of the options, but no duplicates in
89+
* Kafka guaranteed (message replay may cause duplicates in external systems which is unavoidable with Kafka).
90+
* <p>
91+
* This is separate from using an IDEMPOTENT Producer, which can be used, along with {@link
92+
* CommitMode#PERIODIC_CONSUMER_SYNC} or {@link CommitMode#PERIODIC_CONSUMER_ASYNCHRONOUS}.
93+
*/
94+
PERIODIC_TRANSACTIONAL_PRODUCER,
95+
96+
/**
97+
* Periodically synchronous commits with the Consumer. Much faster than {@link
98+
* #PERIODIC_TRANSACTIONAL_PRODUCER}. Slower but potentially less duplicates than {@link
99+
* #PERIODIC_CONSUMER_ASYNCHRONOUS} upon replay.
100+
*/
101+
PERIODIC_CONSUMER_SYNC,
102+
103+
/**
104+
* Periodically commits offsets asynchronously. The fastest option, under normal conditions will have few or no
105+
* duplicates. Under failure recovery may have more duplicates than {@link #PERIODIC_CONSUMER_SYNC}.
106+
*/
107+
PERIODIC_CONSUMER_ASYNCHRONOUS
108+
109+
}
110+
111+
/**
112+
* The {@link ProcessingOrder} type to use
113+
*/
114+
@Builder.Default
115+
private final ProcessingOrder ordering = ProcessingOrder.KEY;
116+
117+
/**
118+
* The {@link CommitMode} to be used
119+
*/
120+
@Builder.Default
121+
private final CommitMode commitMode = CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS;
122+
123+
/**
124+
* Controls the maximum degree of concurrency to occur. Used to limit concurrent calls to external systems to a
125+
* maximum to prevent overloading them or to a degree, using up quotas.
126+
* <p>
127+
* A note on quotas - if your quota is expressed as maximum concurrent calls, this works well. If it's limited in
128+
* total requests / sec, this may still overload the system. See towards the distributed rate limiting feature for
129+
* this to be properly addressed: https://github.com/confluentinc/parallel-consumer/issues/24 Add distributed rate
130+
* limiting support #24.
131+
* <p>
132+
* In the core module, this sets the number of threads to use in the core's thread pool.
133+
* <p>
134+
* It's recommended to set this quite high, much higher than core count, as it's expected that these threads will
135+
* spend most of their time blocked waiting for IO. For automatic setting of this variable, look out for issue
136+
* https://github.com/confluentinc/parallel-consumer/issues/21 Dynamic concurrency control with flow control or tcp
137+
* congestion control theory #21.
138+
*/
139+
@Builder.Default
140+
private final int maxConcurrency = 16;
141+
142+
/**
143+
* When a message fails, how long the system should wait before trying that message again.
144+
*/
145+
@Builder.Default
146+
private final Duration defaultMessageRetryDelay = Duration.ofSeconds(1);
147+
148+
/**
149+
* When present, use this to generate the retry delay, instad of {@link #getDefaultMessageRetryDelay()}.
150+
* <p>
151+
* Overrides {@link #defaultMessageRetryDelay}, even if it's set.
152+
*/
153+
@Builder.Default
154+
private final Function<WorkContainer, Duration> retryDelayProvider;
155+
156+
public static Function<WorkContainer, Duration> retryDelayProviderStatic;
157+
158+
public void validate() {
159+
Objects.requireNonNull(consumer, "A consumer must be supplied");
160+
161+
if (isUsingTransactionalProducer() && producer == null) {
162+
throw new IllegalArgumentException(msg("Wanting to use Transaction Producer mode ({}) without supplying a Producer instance",
163+
commitMode));
164+
}
165+
166+
//
167+
WorkContainer.setDefaultRetryDelay(getDefaultMessageRetryDelay());
168+
ParallelConsumerOptions.retryDelayProviderStatic = getRetryDelayProvider();
169+
}
170+
171+
public boolean isUsingTransactionalProducer() {
172+
return commitMode.equals(PERIODIC_TRANSACTIONAL_PRODUCER);
173+
}
174+
175+
public boolean isProducerSupplied() {
176+
return getProducer() != null;
177+
}
178+
}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/WorkContainer.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
*/
66

77
import io.confluent.csid.utils.WallClock;
8+
import io.confluent.parallelconsumer.ParallelConsumerOptions;
89
import lombok.AccessLevel;
910
import lombok.EqualsAndHashCode;
1011
import lombok.Getter;
@@ -61,6 +62,9 @@ public class WorkContainer<K, V> implements Comparable<WorkContainer> {
6162
*/
6263
private Duration retryDelay;
6364

65+
/**
66+
* @see ParallelConsumerOptions#getDefaultMessageRetryDelay()
67+
*/
6468
@Setter
6569
static Duration defaultRetryDelay = Duration.ofSeconds(1);
6670

@@ -123,10 +127,15 @@ private Temporal tryAgainAt(WallClock clock) {
123127
}
124128

125129
public Duration getRetryDelay() {
126-
if (retryDelay == null)
127-
return defaultRetryDelay;
128-
else
129-
return retryDelay;
130+
var retryDelayProvider = ParallelConsumerOptions.retryDelayProviderStatic;
131+
if (retryDelayProvider != null) {
132+
return retryDelayProvider.apply(this);
133+
} else {
134+
if (retryDelay == null)
135+
return defaultRetryDelay;
136+
else
137+
return retryDelay;
138+
}
130139
}
131140

132141
@Override
@@ -193,4 +202,4 @@ public long offset() {
193202
public boolean hasPreviouslyFailed() {
194203
return getNumberOfFailedAttempts() > 0;
195204
}
196-
}
205+
}

0 commit comments

Comments
 (0)