Commit 8841933

fix: Gracefully skip any encoding that can't be used; fix the offset map payload being dropped incorrectly when too big
The test now exercises the metadata-stripping flow, and captures an issue with run-length encoding's short cast.
1 parent 64d2c1b commit 8841933

19 files changed: +150, -73 lines

CHANGELOG.adoc

Lines changed: 4 additions & 1 deletion
@@ -3,7 +3,10 @@
 == v0.2.0.3
 
 * Fixes
-** https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)] - gracefully drop BitSet encoding as an option if offset different too large
+** https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)] - gracefully drop BitSet or Runlength encoding as an option if offset difference too large (short overflow)
+*** A new serialisation format will be added in next version - see https://github.com/confluentinc/parallel-consumer/issues/37[Support BitSet encoding lengths longer than Short.MAX_VALUE #37]
+** Gracefully drops encoding attempts if they can't be run
+** Fixes a bug in the offset drop if it can't fit in the offset metadata payload
 
 == v0.2.0.2
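
For context on the "short overflow" above: run lengths and BitSet lengths are serialised as Java short values, so anything past Short.MAX_VALUE (32767) wraps on the cast. A minimal standalone sketch (values are hypothetical, not project code):

public class ShortOverflowDemo {
    public static void main(String[] args) {
        int runLength = 40_000;          // e.g. 40k contiguous completed offsets
        short cast = (short) runLength;  // wraps: 40_000 - 65_536 = -25_536
        // A wrapped value written to the wire would corrupt the offset map,
        // hence the new graceful drop of the encoder instead
        System.out.println(runLength + " cast to short is " + cast);
    }
}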

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BitSetEncodingNotSupportedException.java

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@
 package io.confluent.parallelconsumer;
 
-public class BitSetEncodingNotSupportedException extends Exception {
+public class BitSetEncodingNotSupportedException extends EncodingNotSupportedException {
 
     public BitSetEncodingNotSupportedException(String msg) {
         super(msg);
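
Retyping the parent from Exception to the new EncodingNotSupportedException lets callers treat all codec-specific failures uniformly with one catch. A standalone sketch, assuming these classes are on the classpath:

public class HierarchyDemo {
    public static void main(String[] args) {
        try {
            throw new BitSetEncodingNotSupportedException("offset range too big for BitSet");
        } catch (EncodingNotSupportedException e) {
            // BitSet- and Runlength-specific subtypes both land here, so an
            // unsupported codec can be skipped with a single catch clause
            System.out.println("skipping encoder: " + e.getMessage());
        }
    }
}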

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BrokerPollSystem.java

Lines changed: 1 addition & 2 deletions
@@ -19,7 +19,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeoutException;
-import java.util.function.Consumer;
 
 import static io.confluent.csid.utils.BackportUtils.toSeconds;
 import static io.confluent.parallelconsumer.ParallelEoSStreamProcessor.State.closed;
@@ -82,7 +81,7 @@ public void supervise() {
         try {
             booleanFuture.get();
         } catch (Exception e) {
-            throw new InternalError("Error in " + BrokerPollSystem.class.getSimpleName() + " system.", e);
+            throw new InternalRuntimeError("Error in " + BrokerPollSystem.class.getSimpleName() + " system.", e);
         }
     }
 }
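
A note on the rename: the deleted InternalError class (below) shared its simple name with java.lang.InternalError, a JVM Error that ordinary catch (Exception) blocks never see. A standalone sketch of why extending RuntimeException is the safer contract (the nested class here is a stand-in, not the project's):

public class ErrorVsRuntimeDemo {
    static class InternalRuntimeError extends RuntimeException { // stand-in
        InternalRuntimeError(String m) { super(m); }
    }

    public static void main(String[] args) {
        try {
            throw new InternalRuntimeError("supervision failure");
        } catch (Exception e) {
            // reached: RuntimeException is an Exception, so normal
            // error-handling paths (like the supervise() catch) work
            System.out.println("caught: " + e.getMessage());
        }
        // By contrast, `throw new java.lang.InternalError(...)` would fly
        // straight past `catch (Exception e)`, because Error != Exception.
    }
}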

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/EncodingNotSupportedException.java

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+package io.confluent.parallelconsumer;
+
+public class EncodingNotSupportedException extends Exception {
+    public EncodingNotSupportedException(final String message) {
+        super(message);
+    }
+}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/InternalError.java

Lines changed: 0 additions & 20 deletions
This file was deleted.

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/InternalRuntimeError.java

Lines changed: 20 additions & 0 deletions
@@ -0,0 +1,20 @@
+package io.confluent.parallelconsumer;
+
+public class InternalRuntimeError extends RuntimeException {
+
+    public InternalRuntimeError(final String message) {
+        super(message);
+    }
+
+    public InternalRuntimeError(final String message, final Throwable cause) {
+        super(message, cause);
+    }
+
+    public InternalRuntimeError(final Throwable cause) {
+        super(cause);
+    }
+
+    public InternalRuntimeError(final String message, final Throwable cause, final boolean enableSuppression, final boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetDecodingError.java

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+package io.confluent.parallelconsumer;
+
+public class OffsetDecodingError extends Exception {
+    public OffsetDecodingError(final String s, final IllegalArgumentException a) {
+        super(s, a);
+    }
+}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetEncoder.java

Lines changed: 2 additions & 2 deletions
@@ -21,7 +21,7 @@ public OffsetEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
 
     abstract void encodeCompletedOffset(final int rangeIndex);
 
-    abstract byte[] serialise();
+    abstract byte[] serialise() throws EncodingNotSupportedException;
 
     abstract int getEncodedSize();
 
@@ -33,7 +33,7 @@ byte[] compress() throws IOException {
         return OffsetSimpleSerialisation.compressZstd(this.getEncodedBytes());
     }
 
-    final void register() {
+    void register() throws EncodingNotSupportedException {
        final byte[] bytes = this.serialise();
        final OffsetEncoding encodingType = this.getEncodingType();
        this.register(encodingType, bytes);
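
serialise() previously surfaced failures via Lombok's @SneakyThrows further up the call chain; declaring the checked exception makes failure part of the API, so call sites must decide what to do. A small stand-in sketch of the difference (method names hypothetical, assumes Lombok and the same package):

import lombok.SneakyThrows;

public class CheckedVsSneaky {
    // Before: @SneakyThrows hides the failure from the signature - callers
    // compile without handling it, and it surfaces unchecked at runtime
    @SneakyThrows
    byte[] serialiseSneaky() {
        throw new EncodingNotSupportedException("invisible to callers");
    }

    // After: the failure is declared, so callers like registerEncodings()
    // are forced to catch it and can drop the encoder gracefully
    byte[] serialiseChecked() throws EncodingNotSupportedException {
        throw new EncodingNotSupportedException("declared in the contract");
    }
}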

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetMapCodecManager.java

Lines changed: 23 additions & 10 deletions
@@ -12,9 +12,11 @@
 
 import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
 import java.util.*;
 
 import static io.confluent.csid.utils.Range.range;
+import static io.confluent.csid.utils.StringUtils.msg;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 /**
@@ -44,6 +46,7 @@ public class OffsetMapCodecManager<K, V> {
     public static final int DefaultMaxMetadataSize = 4096;
 
     public static final Charset CHARSET_TO_USE = UTF_8;
+
     private final WorkManager<K, V> wm;
 
     org.apache.kafka.clients.consumer.Consumer<K, V> consumer;
@@ -75,31 +78,40 @@ void loadOffsetMapForPartition(final Set<TopicPartition> assignment) {
             if (offsetAndMeta != null) {
                 long offset = offsetAndMeta.offset();
                 String metadata = offsetAndMeta.metadata();
-                loadOffsetMetadataPayload(offset, tp, metadata);
+                try {
+                    loadOffsetMetadataPayload(offset, tp, metadata);
+                } catch (OffsetDecodingError offsetDecodingError) {
+                    log.error("Error decoding offsets from assigned partition, dropping offset map (will replay previously completed messages - partition: {}, data: {})",
+                            tp, offsetAndMeta, offsetDecodingError);
+                }
             }
         });
     }
 
-    static ParallelConsumer.Tuple<Long, TreeSet<Long>> deserialiseIncompleteOffsetMapFromBase64(long finalBaseComittedOffsetForPartition, String incompleteOffsetMap) {
-        byte[] decode = Base64.getDecoder().decode(incompleteOffsetMap);
+    static ParallelConsumer.Tuple<Long, TreeSet<Long>> deserialiseIncompleteOffsetMapFromBase64(long finalBaseComittedOffsetForPartition, String incompleteOffsetMap) throws OffsetDecodingError {
+        byte[] decode;
+        try {
+            decode = OffsetSimpleSerialisation.decodeBase64(incompleteOffsetMap);
+        } catch (IllegalArgumentException a) {
+            throw new OffsetDecodingError(msg("Error decoding offset metadata, input was: {}", incompleteOffsetMap), a);
+        }
         ParallelConsumer.Tuple<Long, Set<Long>> incompleteOffsets = decodeCompressedOffsets(finalBaseComittedOffsetForPartition, decode);
         TreeSet<Long> longs = new TreeSet<>(incompleteOffsets.getRight());
         return ParallelConsumer.Tuple.pairOf(incompleteOffsets.getLeft(), longs);
     }
 
-    void loadOffsetMetadataPayload(long startOffset, TopicPartition tp, String offsetMetadataPayload) {
+    void loadOffsetMetadataPayload(long startOffset, TopicPartition tp, String offsetMetadataPayload) throws OffsetDecodingError {
         ParallelConsumer.Tuple<Long, TreeSet<Long>> incompletes = deserialiseIncompleteOffsetMapFromBase64(startOffset, offsetMetadataPayload);
         wm.raisePartitionHighWaterMark(incompletes.getLeft(), tp);
         wm.partitionIncompleteOffsets.put(tp, incompletes.getRight());
     }
 
-    String makeOffsetMetadataPayload(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) {
+    String makeOffsetMetadataPayload(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) throws EncodingNotSupportedException {
         String offsetMap = serialiseIncompleteOffsetMapToBase64(finalOffsetForPartition, tp, incompleteOffsets);
         return offsetMap;
     }
 
-    @SneakyThrows
-    String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) {
+    String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) throws EncodingNotSupportedException {
         byte[] compressedEncoding = encodeOffsetsCompressed(finalOffsetForPartition, tp, incompleteOffsets);
         String b64 = OffsetSimpleSerialisation.base64(compressedEncoding);
         return b64;
@@ -113,15 +125,16 @@ String serialiseIncompleteOffsetMapToBase64(long finalOffsetForPartition, TopicP
      * <p>
      * Can remove string encoding in favour of the boolean array for the `BitSet` if that's how things settle.
      */
-    @SneakyThrows
-    byte[] encodeOffsetsCompressed(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) {
+    byte[] encodeOffsetsCompressed(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) throws EncodingNotSupportedException {
        Long nextExpectedOffset = wm.partitionOffsetHighWaterMarks.get(tp) + 1;
        OffsetSimultaneousEncoder simultaneousEncoder = new OffsetSimultaneousEncoder(finalOffsetForPartition, nextExpectedOffset, incompleteOffsets).invoke();
        if (forcedCodec.isPresent()) {
            OffsetEncoding forcedOffsetEncoding = forcedCodec.get();
-           log.warn("Forcing use of {}", forcedOffsetEncoding);
+           log.warn("Forcing use of {}, for testing", forcedOffsetEncoding);
            Map<OffsetEncoding, byte[]> encodingMap = simultaneousEncoder.getEncodingMap();
            byte[] bytes = encodingMap.get(forcedOffsetEncoding);
+           if (bytes == null)
+               throw new EncodingNotSupportedException(msg("Can't force an encoding that hasn't been run: {}", forcedOffsetEncoding));
            return simultaneousEncoder.packEncoding(new EncodedOffsetPair(forcedOffsetEncoding, ByteBuffer.wrap(bytes)));
        } else {
            return simultaneousEncoder.packSmallest();
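
The decode path now converts the unchecked IllegalArgumentException - which the JDK's Base64 decoder throws for malformed input, and which presumably backs OffsetSimpleSerialisation.decodeBase64 - into the checked OffsetDecodingError, so a corrupt payload drops the offset map instead of killing the consumer. The underlying JDK behaviour:

import java.util.Base64;

public class Base64FailureDemo {
    public static void main(String[] args) {
        try {
            Base64.getDecoder().decode("not valid base64!!"); // ' ' and '!' are illegal
        } catch (IllegalArgumentException e) {
            // This is the unchecked exception the commit wraps into the
            // checked OffsetDecodingError, which loadOffsetMapForPartition
            // then logs before falling back to replaying messages
            System.out.println("decode failed: " + e.getMessage());
        }
    }
}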

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetSimultaneousEncoder.java

Lines changed: 15 additions & 3 deletions
@@ -67,7 +67,7 @@ class OffsetSimultaneousEncoder {
      * <p>
      * Visible for testing.
      */
-    boolean compressionForced = false;
+    static boolean compressionForced = false;
 
     /**
      * The encoders to run
@@ -169,7 +169,16 @@ public OffsetSimultaneousEncoder invoke() {
     }
 
     private void registerEncodings(final Set<? extends OffsetEncoder> encoders) {
-        encoders.forEach(OffsetEncoder::register);
+        List<OffsetEncoder> toRemove = new ArrayList<>();
+        for (OffsetEncoder encoder : encoders) {
+            try {
+                encoder.register();
+            } catch (EncodingNotSupportedException e) {
+                log.warn("Removing {} encoder, not supported", encoder.getEncodingType().name(), e);
+                toRemove.add(encoder);
+            }
+        }
+        encoders.removeAll(toRemove);
 
         // compressed versions
         // sizes over LARGE_INPUT_MAP_SIZE_THRESHOLD bytes seem to benefit from compression
@@ -184,7 +193,10 @@ private void registerEncodings(final Set<? extends OffsetEncoder> encoders) {
      *
      * @see #packEncoding(EncodedOffsetPair)
      */
-    public byte[] packSmallest() {
+    public byte[] packSmallest() throws EncodingNotSupportedException {
+        if (sortedEncodings.isEmpty()) {
+            throw new EncodingNotSupportedException("No encodings could be used");
+        }
        final EncodedOffsetPair best = this.sortedEncodings.first();
        log.debug("Compression chosen is: {}", best.encoding.name());
        return packEncoding(best);
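
Worth noting in registerEncodings(): removals are collected into toRemove and applied after the loop, because calling encoders.remove(...) mid-iteration would trip the Set's fail-fast iterator. A standalone illustration:

import java.util.*;

public class RemoveWhileIteratingDemo {
    public static void main(String[] args) {
        Set<String> encoders = new HashSet<>(List.of("BitSet", "RunLength", "ByteArray"));

        // Removing directly inside the for-each throws once the iterator
        // advances past the structural modification:
        try {
            for (String e : encoders) {
                encoders.remove(e);
            }
        } catch (ConcurrentModificationException cme) {
            System.out.println("fail-fast iterator tripped");
        }

        // The commit's pattern: collect casualties, remove after the loop.
        encoders = new HashSet<>(List.of("BitSet", "RunLength", "ByteArray"));
        List<String> toRemove = new ArrayList<>();
        for (String e : encoders) {
            if (!e.equals("ByteArray")) toRemove.add(e); // e.g. unsupported codecs
        }
        encoders.removeAll(toRemove);
        System.out.println(encoders); // [ByteArray]
    }
}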

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelConsumerOptions.java

Lines changed: 3 additions & 0 deletions
@@ -106,6 +106,9 @@ public enum CommitMode {
      * <p>
      * At the moment this is a sort of sanity check, and was chosen rather arbitriarly. However, one should consider
      * that this is per client, and is a total across all assigned partitions.
+     * <p>
+     * It's important that this is small enough, that you're not at risk of the broker expiring log segments where the
+     * oldest offset resides.
      */
     @Builder.Default
     private final int maxNumberMessagesBeyondBaseCommitOffset = 1000;
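
The new javadoc warning is about retention: if the base committed offset lags far behind, the broker may expire the log segment holding it. A snippet of how tuning presumably looks, given the Lombok @Builder.Default on the field (value illustrative):

// Illustrative only: the builder setter name follows the Lombok convention
// for the @Builder.Default field shown above. Keep the window small enough
// that the oldest uncommitted offset can't outlive the topic's retention.
ParallelConsumerOptions options = ParallelConsumerOptions.builder()
        .maxNumberMessagesBeyondBaseCommitOffset(1000) // the default per this diff
        .build();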

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java

Lines changed: 1 addition & 1 deletion
@@ -234,7 +234,7 @@ public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
         wm.onPartitionsRevoked(partitions);
         usersConsumerRebalanceListener.ifPresent(x -> x.onPartitionsRevoked(partitions));
     } catch (Exception e) {
-        throw new InternalError("onPartitionsRevoked event error", e);
+        throw new InternalRuntimeError("onPartitionsRevoked event error", e);
     }
 }

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RunLengthEncoder.java

Lines changed: 7 additions & 4 deletions
@@ -7,6 +7,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static io.confluent.csid.utils.StringUtils.msg;
 import static io.confluent.parallelconsumer.OffsetEncoding.RunLength;
 import static io.confluent.parallelconsumer.OffsetEncoding.RunLengthCompressed;
 
@@ -47,13 +48,15 @@ public void encodeCompletedOffset(final int rangeIndex) {
     }
 
     @Override
-    public byte[] serialise() {
+    public byte[] serialise() throws EncodingNotSupportedException {
         runLengthEncodingIntegers.add(currentRunLengthCount.get()); // add tail
 
         ByteBuffer runLengthEncodedByteBuffer = ByteBuffer.allocate(runLengthEncodingIntegers.size() * Short.BYTES);
-        for (final Integer i : runLengthEncodingIntegers) {
-            final short value = i.shortValue();
-            runLengthEncodedByteBuffer.putShort(value);
+        for (final Integer runlength : runLengthEncodingIntegers) {
+            final short shortCastRunlength = runlength.shortValue();
+            if (runlength != shortCastRunlength)
+                throw new RunlengthV1EncodingNotSupported(msg("Runlength too long for Short ({} cast to {})", runlength, shortCastRunlength));
+            runLengthEncodedByteBuffer.putShort(shortCastRunlength);
         }
 
         byte[] array = runLengthEncodedByteBuffer.array();
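
The guard relies on Java unboxing the Integer and widening the short back to int for the != comparison, so any run length that doesn't survive the round-trip is rejected before it can be written. Checking the boundary:

public class ShortRoundTripDemo {
    public static void main(String[] args) {
        for (int runlength : new int[]{5, Short.MAX_VALUE, Short.MAX_VALUE + 1}) {
            short cast = (short) runlength;
            // same comparison as the commit: widen the short back to int
            boolean fits = (runlength == cast);
            System.out.println(runlength + " -> " + cast + " fits=" + fits);
            // 5 -> 5 fits=true
            // 32767 -> 32767 fits=true
            // 32768 -> -32768 fits=false (would now throw
            //                             RunlengthV1EncodingNotSupported)
        }
    }
}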

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RunlengthV1EncodingNotSupported.java

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+package io.confluent.parallelconsumer;
+
+public class RunlengthV1EncodingNotSupported extends EncodingNotSupportedException {
+    public RunlengthV1EncodingNotSupported(final String msg) {
+        super(msg);
+    }
+}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkManager.java

Lines changed: 13 additions & 7 deletions
@@ -458,10 +458,10 @@ boolean hasCommittableOffsets() {
 
     /**
      * TODO: This entire loop could be possibly redundant, if we instead track low water mark, and incomplete offsets as
-     * work is submitted and returned.
+     * work is submitted and returned.
+     * <p>
      * todo: refactor into smaller methods?
      */
-    @SneakyThrows
     <R> Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove(boolean remove) {
         Map<TopicPartition, OffsetAndMetadata> offsetsToSend = new HashMap<>();
         int count = 0;
@@ -526,10 +526,14 @@ <R> Map<TopicPartition, OffsetAndMetadata> findCompletedEligibleOffsetsAndRemove
             }
 
             OffsetMapCodecManager<K, V> om = new OffsetMapCodecManager<>(this, this.consumer);
-            String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, topicPartitionKey, incompleteOffsets);
-            totalOffsetMetaCharacterLength += offsetMapPayload.length();
-            OffsetAndMetadata offsetWithExtraMap = new OffsetAndMetadata(offsetOfNextExpectedMessage, offsetMapPayload);
-            offsetsToSend.put(topicPartitionKey, offsetWithExtraMap);
+            try {
+                String offsetMapPayload = om.makeOffsetMetadataPayload(offsetOfNextExpectedMessage, topicPartitionKey, incompleteOffsets);
+                totalOffsetMetaCharacterLength += offsetMapPayload.length();
+                OffsetAndMetadata offsetWithExtraMap = new OffsetAndMetadata(offsetOfNextExpectedMessage, offsetMapPayload);
+                offsetsToSend.put(topicPartitionKey, offsetWithExtraMap);
+            } catch (EncodingNotSupportedException e) {
+                log.warn("No encodings could be used to encode the offset map, skipping. Warning: messages might be replayed on rebalance", e);
+            }
         }
 
         if (remove) {
@@ -571,9 +575,11 @@ private void maybeStripOffsetPayload(Map<TopicPartition, OffsetAndMetadata> offs
         for (var entry : offsetsToSend.entrySet()) {
             TopicPartition key = entry.getKey();
             OffsetAndMetadata v = entry.getValue();
-            OffsetAndMetadata stripped = new OffsetAndMetadata(v.offset(), v.toString());
+            OffsetAndMetadata stripped = new OffsetAndMetadata(v.offset()); // meta data gone
             offsetsToSend.replace(key, stripped);
         }
+    } else {
+        log.debug("Offset map small enough to fit in payload: {} (max: {})", totalOffsetMetaCharacterLength, OffsetMapCodecManager.DefaultMaxMetadataSize);
     }
 }
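
This one-line change is the "offset drop" bug from the commit message: the old "strip" put v.toString() into the metadata field, so the supposedly stripped commit still carried a payload. Demonstrating with the Kafka client types:

import org.apache.kafka.clients.consumer.OffsetAndMetadata;

public class StripDemo {
    public static void main(String[] args) {
        OffsetAndMetadata withPayload = new OffsetAndMetadata(42L, "base64-encoded-map...");

        // Old (buggy) strip: stuffs the toString() into the metadata field,
        // so the commit payload grows instead of disappearing
        OffsetAndMetadata buggy = new OffsetAndMetadata(withPayload.offset(), withPayload.toString());
        System.out.println(buggy.metadata()); // something like "OffsetAndMetadata{offset=42, ...}"

        // Fixed strip: the single-arg constructor leaves metadata empty
        OffsetAndMetadata stripped = new OffsetAndMetadata(withPayload.offset());
        System.out.println(stripped.metadata().isEmpty()); // true
    }
}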

parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VeryLargeMessageVolumeTest.java

Lines changed: 0 additions & 1 deletion
@@ -60,7 +60,6 @@ public class VeryLargeMessageVolumeTest extends BrokerIntegrationTest<String, St
 
 
     @Test
-    // @Disabled("See #35, $37")
     public void shouldNotThrowBitsetTooLongException() {
         runTest(HIGH_MAX_POLL_RECORDS_CONFIG, CommitMode.CONSUMER_ASYNCHRONOUS, ProcessingOrder.UNORDERED);
     }
