
Commit d4cc21a

Gracefully drop bitset encoding if offset difference too large
RunLength encoding will still be used. See confluentinc#37 (Support BitSet encoding lengths longer than Short.MAX_VALUE). A test did try to cover this, but the offset difference wasn't large enough.
1 parent 4c46dd0 commit d4cc21a
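
The behaviour this commit introduces can be sketched as follows (a minimal, self-contained illustration only, not the project's real classes; the actual changes are in BitsetEncoder and OffsetSimultaneousEncoder below): when the offset range is wider than the BitSet format's Short length header allows, the BitSet encoder is skipped rather than failing the whole encode, and run-length encoding is still produced.

    import java.util.HashSet;
    import java.util.Set;

    // Illustrative sketch only: drop the BitSet encoding when the offset range
    // overflows the 2-byte length header, but keep run-length encoding available.
    class EncoderSelectionSketch {

        static final int MAX_BITSET_LENGTH = Short.MAX_VALUE; // limit imposed by the short length header

        public static void main(String[] args) {
            long baseOffset = 0;
            long nextExpectedOffset = 100_000; // offset difference > Short.MAX_VALUE

            int length = (int) (nextExpectedOffset - baseOffset);
            Set<String> encoders = new HashSet<>();

            try {
                if (length > MAX_BITSET_LENGTH) {
                    throw new IllegalArgumentException("Bitset too long to encode: " + length);
                }
                encoders.add("BitSet");
            } catch (IllegalArgumentException e) {
                System.out.println("Dropping BitSet encoding: " + e.getMessage());
            }

            encoders.add("RunLength"); // still used regardless of range width

            System.out.println("Encoders in use: " + encoders); // prints [RunLength]
        }
    }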

14 files changed (+183 -112 lines changed)

CHANGELOG.adoc

Lines changed: 6 additions & 1 deletion
@@ -1,11 +1,16 @@
 = Change Log
 
+== v0.2.0.3
+
+* Fixes
+** https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)] - gracefully drop BitSet encoding as an option if offset different too large
+
 == v0.2.0.2
 
 * Fixes
 ** Turns back on the https://github.com/confluentinc/parallel-consumer/issues/35[Bitset overflow check (#35)]
 
-== v0.2.0.1 DO NOT USE
+== v0.2.0.1 DO NOT USE - has critical bug
 
 * Fixes
 ** Incorrectly turns off an over-flow check in https://github.com/confluentinc/parallel-consumer/issues/35[offset serialisation system (#35)]

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BitsetEncoder.java

Lines changed: 9 additions & 7 deletions
@@ -8,19 +8,21 @@
 
 class BitsetEncoder extends OffsetEncoder {
 
+    public static final Short MAX_LENGTH_ENCODABLE = Short.MAX_VALUE;
+
     private final ByteBuffer wrappedBitsetBytesBuffer;
     private final BitSet bitSet;
 
     private Optional<byte[]> encodedBytes = Optional.empty();
 
-    public BitsetEncoder(final int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
+    public BitsetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
         super(offsetSimultaneousEncoder);
-        // prep bit set buffer
-        this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Short.BYTES + ((length / 8) + 1));
-        if (length > Short.MAX_VALUE) {
+        if (length > MAX_LENGTH_ENCODABLE) {
             // need to upgrade to using Integer for the bitset length, but can't change serialisation format in-place
-            throw new RuntimeException("Bitset too long to encode, bitset length overflows Short.MAX_VALUE: " + length + ". (max: " + Short.MAX_VALUE + ")");
+            throw new IllegalArgumentException("Bitset too long to encode, bitset length overflows Short.MAX_VALUE: " + length + ". (max: " + Short.MAX_VALUE + ")");
         }
+        // prep bit set buffer
+        this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Short.BYTES + ((length / 8) + 1));
         // bitset doesn't serialise it's set capacity, so we have to as the unused capacity actually means something
         this.wrappedBitsetBytesBuffer.putShort((short) length);
         bitSet = new BitSet(length);
@@ -37,12 +39,12 @@ protected OffsetEncoding getEncodingTypeCompressed() {
     }
 
     @Override
-    public void containsIndex(final int rangeIndex) {
+    public void encodeIncompleteOffset(final int rangeIndex) {
         //noop
     }
 
     @Override
-    public void doesNotContainIndex(final int rangeIndex) {
+    public void encodeCompletedOffset(final int rangeIndex) {
         bitSet.set(rangeIndex);
     }
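
For scale, here is a quick, illustrative check of what the buffer allocation in the constructor above works out to at the new maximum length (this just re-applies the formula from the diff; it is not part of the change itself):

    public class BitsetBufferSizeExample {
        public static void main(String[] args) {
            int length = Short.MAX_VALUE; // largest length BitsetEncoder now accepts
            int bufferSize = Short.BYTES + ((length / 8) + 1);
            // 2-byte length header plus one bit per offset, rounded up: 2 + 4095 + 1
            System.out.println(bufferSize + " bytes"); // prints "4098 bytes"
        }
    }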

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BrokerPollSystem.java

Lines changed: 1 addition & 0 deletions
@@ -162,6 +162,7 @@ private boolean isResponsibleForCommits() {
 
     private ConsumerRecords<K, V> pollBrokerForRecords() {
         managePauseOfSubscription();
+        log.debug("Subscriptions are paused: {}", paused);
 
         Duration thisLongPollTimeout = (state == ParallelEoSStreamProcessor.State.running) ? BrokerPollSystem.longPollTimeout : Duration.ofMillis(1); // Can't use Duration.ZERO - this causes Object#wait to wait forever
 
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ByteBufferEncoder.java

Lines changed: 2 additions & 2 deletions
@@ -25,12 +25,12 @@ protected OffsetEncoding getEncodingTypeCompressed() {
     }
 
     @Override
-    public void containsIndex(final int rangeIndex) {
+    public void encodeIncompleteOffset(final int rangeIndex) {
         this.bytesBuffer.put((byte) 0);
     }
 
     @Override
-    public void doesNotContainIndex(final int rangeIndex) {
+    public void encodeCompletedOffset(final int rangeIndex) {
         this.bytesBuffer.put((byte) 1);
     }
 
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetBitSet.java

Lines changed: 4 additions & 3 deletions
@@ -4,6 +4,7 @@
  * Copyright (C) 2020 Confluent, Inc.
  */
 
+import io.confluent.parallelconsumer.ParallelConsumer.Tuple;
 import lombok.extern.slf4j.Slf4j;
 
 import java.nio.ByteBuffer;
@@ -38,18 +39,18 @@ static String deserialiseBitSet(int originalBitsetSize, ByteBuffer s) {
         return result.toString();
     }
 
-    static ParallelConsumer.Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
+    static Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
         wrap.rewind();
         short originalBitsetSize = wrap.getShort();
         ByteBuffer slice = wrap.slice();
         Set<Long> incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice);
         long highwaterMark = baseOffset + originalBitsetSize;
-        return ParallelConsumer.Tuple.pairOf(highwaterMark, incompletes);
+        return Tuple.pairOf(highwaterMark, incompletes);
     }
 
     static Set<Long> deserialiseBitSetToIncompletes(long baseOffset, int originalBitsetSize, ByteBuffer inputBuffer) {
         BitSet bitSet = BitSet.valueOf(inputBuffer);
-        var incompletes = new HashSet<Long>(1); // can't know how big this needs to be yet
+        var incompletes = new HashSet<Long>(); // can't know how big this needs to be yet
         for (var relativeOffset : range(originalBitsetSize)) {
             long offset = baseOffset + relativeOffset;
             if (bitSet.get(relativeOffset)) {

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetEncoder.java

Lines changed: 2 additions & 2 deletions
@@ -17,9 +17,9 @@ public OffsetEncoder(OffsetSimultaneousEncoder offsetSimultaneousEncoder) {
 
     protected abstract OffsetEncoding getEncodingTypeCompressed();
 
-    abstract void containsIndex(final int rangeIndex);
+    abstract void encodeIncompleteOffset(final int rangeIndex);
 
-    abstract void doesNotContainIndex(final int rangeIndex);
+    abstract void encodeCompletedOffset(final int rangeIndex);
 
     abstract byte[] serialise();
 
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetSimultaneousEncoder.java

Lines changed: 55 additions & 19 deletions
@@ -5,7 +5,6 @@
  */
 
 import lombok.Getter;
-import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
 import java.nio.ByteBuffer;
@@ -17,6 +16,8 @@
  * Encode with multiple strategies at the same time.
  * <p>
  * Have results in an accessible structure, easily selecting the highest compression.
+ *
+ * @see #invoke()
  */
 @Slf4j
 class OffsetSimultaneousEncoder {
@@ -42,6 +43,11 @@ class OffsetSimultaneousEncoder {
      */
     private final long nextExpectedOffset;
 
+    /**
+     * TODO docs
+     */
+    private final int length;
+
     /**
      * Map of different encoding types for the same offset data, used for retrieving the data for the encoding type
      */
@@ -56,10 +62,50 @@ class OffsetSimultaneousEncoder {
     @Getter
     TreeSet<EncodedOffsetPair> sortedEncodings = new TreeSet<>();
 
+    /**
+     * Force the encoder to also add the compressed versions. Useful for testing.
+     * <p>
+     * Visible for testing.
+     */
+    boolean compressionForced = false;
+
+    /**
+     * todo docs
+     */
+    private final Set<OffsetEncoder> encoders = new HashSet<>();
+
     public OffsetSimultaneousEncoder(long lowWaterMark, Long nextExpectedOffset, Set<Long> incompleteOffsets) {
         this.lowWaterMark = lowWaterMark;
         this.nextExpectedOffset = nextExpectedOffset;
         this.incompleteOffsets = incompleteOffsets;
+
+        length = (int) (this.nextExpectedOffset - this.lowWaterMark);
+
+        initEncoders();
+    }
+
+    private void initEncoders() {
+        if (length > LARGE_INPUT_MAP_SIZE_THRESHOLD) {
+            log.debug("~Large input map size: {} (start: {} end: {})", length, lowWaterMark, nextExpectedOffset);
+        }
+
+        try {
+            BitsetEncoder bitsetEncoder = new BitsetEncoder(length, this);
+            encoders.add(bitsetEncoder);
+        } catch (IllegalArgumentException a) {
+            log.warn("Cannot use {} encoder", BitsetEncoder.class.getSimpleName(), a);
+        }
+
+        encoders.add(new RunLengthEncoder(this));
+    }
+
+    /**
+     * Not enabled as byte buffer seems to always be beaten by BitSet, which makes sense
+     * <p>
+     * Visible for testing
+     */
+    void addByteBufferEncoder() {
+        encoders.add(new ByteBufferEncoder(length, this));
     }
 
     /**
@@ -84,32 +130,22 @@ public OffsetSimultaneousEncoder(long lowWaterMark, Long nextExpectedOffset, Set<Long> incompleteOffsets) {
      * TODO: optimisation - could double the run-length range from Short.MAX_VALUE (~33,000) to Short.MAX_VALUE * 2
      * (~66,000) by using unsigned shorts instead (higest representable relative offset is Short.MAX_VALUE because each
      * runlength entry is a Short)
+     * <p>
+     * TODO VERY large offests ranges are slow (Integer.MAX_VALUE) - encoding scans could be avoided if passing in map of incompletes which should already be known
      */
-    @SneakyThrows
     public OffsetSimultaneousEncoder invoke() {
-        log.trace("Starting encode of incompletes of {}, base offset is: {}", this.incompleteOffsets, lowWaterMark);
-
-        final int length = (int) (this.nextExpectedOffset - this.lowWaterMark);
-
-        if (length > LARGE_INPUT_MAP_SIZE_THRESHOLD) {
-            log.debug("~Large input map size: {}", length);
-        }
-
-        final Set<OffsetEncoder> encoders = new HashSet<>();
-        encoders.add(new BitsetEncoder(length, this));
-        encoders.add(new RunLengthEncoder(this));
-        // TODO: Remove? byte buffer seems to always be beaten by BitSet, which makes sense
-        // encoders.add(new ByteBufferEncoder(length));
+        log.debug("Starting encode of incompletes, base offset is: {}, end offset is: {}", lowWaterMark, nextExpectedOffset);
+        log.trace("Incompletes are: {}", this.incompleteOffsets);
 
         //
         log.debug("Encode loop offset start,end: [{},{}] length: {}", this.lowWaterMark, this.nextExpectedOffset, length);
         range(length).forEach(rangeIndex -> {
             final long offset = this.lowWaterMark + rangeIndex;
             if (this.incompleteOffsets.contains(offset)) {
                 log.trace("Found an incomplete offset {}", offset);
-                encoders.forEach(x -> x.containsIndex(rangeIndex));
+                encoders.forEach(x -> x.encodeIncompleteOffset(rangeIndex));
             } else {
-                encoders.forEach(x -> x.doesNotContainIndex(rangeIndex));
+                encoders.forEach(x -> x.encodeCompletedOffset(rangeIndex));
             }
         });
 
@@ -126,8 +162,8 @@ private void registerEncodings(final Set<? extends OffsetEncoder> encoders) {
 
         // compressed versions
         // sizes over LARGE_INPUT_MAP_SIZE_THRESHOLD bytes seem to benefit from compression
-        final boolean noEncodingsAreSmallEnough = encoders.stream().noneMatch(OffsetEncoder::quiteSmall);
-        if (noEncodingsAreSmallEnough) {
+        boolean noEncodingsAreSmallEnough = encoders.stream().noneMatch(OffsetEncoder::quiteSmall);
+        if (noEncodingsAreSmallEnough || compressionForced) {
            encoders.forEach(OffsetEncoder::registerCompressed);
         }
     }

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/ParallelEoSStreamProcessor.java

Lines changed: 1 addition & 1 deletion
@@ -435,7 +435,7 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionException {
         log.debug("Shutting down execution pool...");
         List<Runnable> unfinished = workerPool.shutdownNow();
         if (!unfinished.isEmpty()) {
-            log.warn("Threads not done: {}", unfinished);
+            log.warn("Threads not done count: {}", unfinished.size());
         }
 
         log.trace("Awaiting worker pool termination...");

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/RunLengthEncoder.java

Lines changed: 2 additions & 2 deletions
@@ -37,12 +37,12 @@ protected OffsetEncoding getEncodingTypeCompressed() {
     }
 
     @Override
-    public void containsIndex(final int rangeIndex) {
+    public void encodeIncompleteOffset(final int rangeIndex) {
         encodeRunLength(false);
     }
 
     @Override
-    public void doesNotContainIndex(final int rangeIndex) {
+    public void encodeCompletedOffset(final int rangeIndex) {
         encodeRunLength(true);
     }
 
parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/WorkManager.java

Lines changed: 0 additions & 43 deletions
@@ -60,15 +60,6 @@ public class WorkManager<K, V> implements ConsumerRebalanceListener {
     private final Map<Object, NavigableMap<Long, WorkContainer<K, V>>> processingShards = new ConcurrentHashMap<>();
     private final LinkedBlockingQueue<ConsumerRecords<K, V>> workInbox = new LinkedBlockingQueue<>();
 
-    /**
-     * Used to make sure when we're garbage collecting empty shards for KEY ordering, that we don't cause issues for
-     * contentious keys.
-     *
-     * @see #registerWork
-     * @see #success
-     */
-//    private final ReentrantReadWriteLock shardsLock = new ReentrantReadWriteLock(true);
-
     /**
      * Map of partitions to Map of offsets to WorkUnits
      * <p>
@@ -258,40 +249,6 @@ private void processInbox(ConsumerRecords<K, V> records) {
 
         processingShards.computeIfAbsent(shardKey, (ignore) -> new ConcurrentSkipListMap<>()).put(offset, wc);
 
-        /**
-         * This method gets called by {@link BrokerPollSystem}, so needs to be thread safe, and as
-         * {@link #processingShards} can be edited on {@link #success}, need to synchronise write access to
-         * computing shard keys
-         * @see #success
-         */
-//        ReentrantReadWriteLock.ReadLock readLock = shardsLock.readLock();
-//        readLock.lock();
-//        try {
-//            processingShards.computeIfAbsent(shardKey, (ignore) -> new ConcurrentSkipListMap<>()).put(offset, wc);
-//            if (!processingShards.containsKey(shardKey)) { // optimistic
-////                readLock.unlock();
-//                log.debug("Getting write lock");
-//                ReentrantReadWriteLock.WriteLock writeLock = shardsLock.writeLock();
-//                writeLock.lock();
-//                try {
-//                    if (!processingShards.containsKey(shardKey)) {
-//                        processingShards.put(shardKey, new ConcurrentSkipListMap<>());
-//                    }
-//                } finally {
-//                    writeLock.unlock();
-//                }
-//            }
-////            else {
-////                log.warn("shard key already exists {}", shardKey);
-////            }
-//            ReentrantReadWriteLock.ReadLock readLock = shardsLock.readLock();
-//            readLock.lock();
-//            try {
-//                processingShards.get(shardKey).put(offset, wc);
-//            } finally {
-//                readLock.unlock();
-//            }
-
         partitionCommitQueues.computeIfAbsent(tp, (ignore) -> new ConcurrentSkipListMap<>())
                 .put(offset, wc);
     }

parallel-consumer-core/src/test-integration/java/io/confluent/parallelconsumer/integrationTests/VeryLargeMessageVolumeTest.java

Lines changed: 3 additions & 2 deletions
@@ -60,7 +60,7 @@ public class VeryLargeMessageVolumeTest extends BrokerIntegrationTest<String, String> {
 
 
     @Test
-    @Disabled("See #35, $37")
+//    @Disabled("See #35, $37")
     public void shouldNotThrowBitsetTooLongException() {
         runTest(HIGH_MAX_POLL_RECORDS_CONFIG, CommitMode.CONSUMER_ASYNCHRONOUS, ProcessingOrder.UNORDERED);
     }
@@ -138,7 +138,8 @@ private void runTest(int maxPoll, CommitMode commitMode, ProcessingOrder order) {
         var failureMessage = StringUtils.msg("All keys sent to input-topic should be processed and produced, within time (expected: {} commit: {} order: {} max poll: {})",
                 expectedMessageCount, commitMode, order, maxPoll);
         try {
-            waitAtMost(ofSeconds(200))
+            waitAtMost(ofSeconds(2000))
+                    .failFast(()->pc.isClosedOrFailed())
                     .alias(failureMessage).untilAsserted(() -> {
                 log.info("Processed-count: {}, Produced-count: {}", processedCount.get(), producedCount.get());
                 SoftAssertions all = new SoftAssertions();
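
The failFast call added above makes the test abort as soon as the parallel consumer is closed or failed, instead of waiting out the full timeout. A minimal standalone sketch of the same Awaitility pattern (assuming Awaitility 4.x on the classpath and substituting an AtomicBoolean for the real pc.isClosedOrFailed() check):

    import static java.time.Duration.ofSeconds;
    import static org.awaitility.Awaitility.waitAtMost;

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    public class FailFastWaitExample {
        public static void main(String[] args) {
            AtomicInteger processedCount = new AtomicInteger(1); // pretend the work already completed
            AtomicBoolean closedOrFailed = new AtomicBoolean(false); // stand-in for pc.isClosedOrFailed()

            // Wait for the assertion to pass, but bail out immediately if the system
            // under test has shut down or failed rather than burning the whole timeout.
            waitAtMost(ofSeconds(30))
                    .failFast(closedOrFailed::get)
                    .alias("all records should be processed")
                    .untilAsserted(() -> {
                        if (processedCount.get() < 1) {
                            throw new AssertionError("not all records processed yet");
                        }
                    });
        }
    }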
