Skip to content

Commit 1678e59

Browse files
committed
feature: confluentinc#37 Adds support for larger bitset and run-length encodings
1 parent c4dd89b commit 1678e59

File tree

13 files changed

+300
-127
lines changed

13 files changed

+300
-127
lines changed

CHANGELOG.adoc

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
= Change Log
22

3+
== next
4+
5+
* Fixes
6+
** https://github.com/confluentinc/parallel-consumer/issues/37[Support BitSet encoding lengths longer than Short.MAX_VALUE #37] - adds new serialisation formats that support a wider range of offsets (32,767 vs 2,147,483,647) for both BitSet and run-length encoding
7+
38
== v0.2.0.3
49

510
* Fixes

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/BitsetEncoder.java

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.util.BitSet;
77
import java.util.Optional;
88

9-
import static io.confluent.parallelconsumer.OffsetEncoding.BitSetCompressed;
9+
import static io.confluent.parallelconsumer.OffsetEncoding.*;
1010

1111
/**
1212
* Encodes a range of offsets, from an incompletes collection into a BitSet.
@@ -30,34 +30,77 @@
3030
*/
3131
class BitsetEncoder extends OffsetEncoder {
3232

33-
public static final Short MAX_LENGTH_ENCODABLE = Short.MAX_VALUE;
33+
private final Version version; // default to new version
3434

35-
private final ByteBuffer wrappedBitsetBytesBuffer;
35+
private static final Version DEFAULT_VERSION = Version.v2;
36+
37+
public static final Integer MAX_LENGTH_ENCODABLE = Integer.MAX_VALUE;
38+
39+
private ByteBuffer wrappedBitsetBytesBuffer;
3640
private final BitSet bitSet;
3741

3842
private Optional<byte[]> encodedBytes = Optional.empty();
3943

4044
public BitsetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder) throws BitSetEncodingNotSupportedException {
45+
this(length, offsetSimultaneousEncoder, DEFAULT_VERSION);
46+
}
47+
48+
public BitsetEncoder(int length, OffsetSimultaneousEncoder offsetSimultaneousEncoder, Version newVersion) throws BitSetEncodingNotSupportedException {
4149
super(offsetSimultaneousEncoder);
50+
51+
this.version = newVersion;
52+
53+
switch (newVersion) {
54+
case v1 -> initV1(length);
55+
case v2 -> initV2(length);
56+
}
57+
bitSet = new BitSet(length);
58+
}
59+
60+
/**
61+
* Switch from encoding bitset length as a short to an integer (a maximum length of roughly 32,000 was too short).
62+
* <p>
63+
* Integer.MAX_VALUE should always be good enough, as the system restricts very large ranges from being processed at once.
64+
*/
65+
private void initV2(int length) throws BitSetEncodingNotSupportedException {
4266
if (length > MAX_LENGTH_ENCODABLE) {
4367
// need to upgrade to using Integer for the bitset length, but can't change serialisation format in-place
44-
throw new BitSetEncodingNotSupportedException(StringUtils.msg("Bitset too long to encode, as length overflows Short.MAX_VALUE. Length: {}. (max: {})", length, Short.MAX_VALUE));
68+
throw new BitSetEncodingNotSupportedException(StringUtils.msg("Bitset V2 too long to encode, as length overflows Integer.MAX_VALUE. Length: {}. (max: {})", length, MAX_LENGTH_ENCODABLE));
69+
}
70+
// prep bit set buffer
71+
this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Integer.BYTES + ((length / 8) + 1));
72+
// bitset doesn't serialise its set capacity, so we have to, as the unused capacity actually means something
73+
this.wrappedBitsetBytesBuffer.putInt(length);
74+
}
75+
76+
/**
77+
* This was a bit "short" sighted of me....
78+
*/
79+
private void initV1(int length) throws BitSetEncodingNotSupportedException {
80+
if (length > Short.MAX_VALUE) {
81+
// need to upgrade to using Integer for the bitset length, but can't change serialisation format in-place
82+
throw new BitSetEncodingNotSupportedException("Bitset V1 too long to encode, bitset length overflows Short.MAX_VALUE: " + length + ". (max: " + Short.MAX_VALUE + ")");
4583
}
4684
// prep bit set buffer
4785
this.wrappedBitsetBytesBuffer = ByteBuffer.allocate(Short.BYTES + ((length / 8) + 1));
4886
// bitset doesn't serialise its set capacity, so we have to, as the unused capacity actually means something
4987
this.wrappedBitsetBytesBuffer.putShort((short) length);
50-
bitSet = new BitSet(length);
5188
}
5289

5390
@Override
5491
protected OffsetEncoding getEncodingType() {
55-
return OffsetEncoding.BitSet;
92+
return switch (version) {
93+
case v1 -> BitSet;
94+
case v2 -> BitSetV2;
95+
};
5696
}
5797

5898
@Override
5999
protected OffsetEncoding getEncodingTypeCompressed() {
60-
return BitSetCompressed;
100+
return switch (version) {
101+
case v1 -> BitSetCompressed;
102+
case v2 -> BitSetV2Compressed;
103+
};
61104
}
62105

63106
@Override

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/EncodedOffsetPair.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@
1414

1515
import static io.confluent.parallelconsumer.OffsetBitSet.deserialiseBitSetWrap;
1616
import static io.confluent.parallelconsumer.OffsetBitSet.deserialiseBitSetWrapToIncompletes;
17+
import static io.confluent.parallelconsumer.OffsetEncoding.*;
18+
import static io.confluent.parallelconsumer.OffsetEncoding.Version.v1;
19+
import static io.confluent.parallelconsumer.OffsetEncoding.Version.v2;
1720
import static io.confluent.parallelconsumer.OffsetRunLength.*;
1821
import static io.confluent.parallelconsumer.OffsetSimpleSerialisation.decompressZstd;
1922
import static io.confluent.parallelconsumer.OffsetSimpleSerialisation.deserialiseByteArrayToBitMapString;
@@ -67,7 +70,7 @@ private static byte[] copyBytesOutOfBufferForDebug(ByteBuffer bbData) {
6770
static EncodedOffsetPair unwrap(byte[] input) {
6871
ByteBuffer wrap = ByteBuffer.wrap(input).asReadOnlyBuffer();
6972
byte magic = wrap.get();
70-
OffsetEncoding decode = OffsetEncoding.decode(magic);
73+
OffsetEncoding decode = decode(magic);
7174
ByteBuffer slice = wrap.slice();
7275

7376
return new EncodedOffsetPair(decode, slice);
@@ -78,10 +81,15 @@ public String getDecodedString() {
7881
String binaryArrayString = switch (encoding) {
7982
case ByteArray -> deserialiseByteArrayToBitMapString(data);
8083
case ByteArrayCompressed -> deserialiseByteArrayToBitMapString(decompressZstd(data));
81-
case BitSet -> deserialiseBitSetWrap(data);
82-
case BitSetCompressed -> deserialiseBitSetWrap(decompressZstd(data));
84+
case BitSet -> deserialiseBitSetWrap(data, v1);
85+
case BitSetCompressed -> deserialiseBitSetWrap(decompressZstd(data), v1);
8386
case RunLength -> runLengthDecodeToString(runLengthDeserialise(data));
8487
case RunLengthCompressed -> runLengthDecodeToString(runLengthDeserialise(decompressZstd(data)));
88+
case BitSetV2-> deserialiseBitSetWrap(data, v2);
89+
case BitSetV2Compressed-> deserialiseBitSetWrap(data, v2);
90+
case RunLengthV2-> deserialiseBitSetWrap(data, v2);
91+
case RunLengthV2Compressed-> deserialiseBitSetWrap(data, v2);
92+
default -> throw new InternalRuntimeError("Invalid state"); // todo why is this needed? what's not covered?
8593
};
8694
return binaryArrayString;
8795
}
@@ -91,11 +99,15 @@ public Tuple<Long, Set<Long>> getDecodedIncompletes(long baseOffset) {
9199
Tuple<Long, Set<Long>> binaryArrayString = switch (encoding) {
92100
// case ByteArray -> deserialiseByteArrayToBitMapString(data);
93101
// case ByteArrayCompressed -> deserialiseByteArrayToBitMapString(decompressZstd(data));
94-
case BitSet -> deserialiseBitSetWrapToIncompletes(baseOffset, data);
95-
case BitSetCompressed -> deserialiseBitSetWrapToIncompletes(baseOffset, decompressZstd(data));
96-
case RunLength -> runLengthDecodeToIncompletes(baseOffset, data);
97-
case RunLengthCompressed -> runLengthDecodeToIncompletes(baseOffset, decompressZstd(data));
98-
default -> throw new UnsupportedOperationException("Encoding (" + encoding.name() + ") not supported");
102+
case BitSet -> deserialiseBitSetWrapToIncompletes(encoding, baseOffset, data);
103+
case BitSetCompressed -> deserialiseBitSetWrapToIncompletes(BitSet, baseOffset, decompressZstd(data));
104+
case RunLength -> runLengthDecodeToIncompletes(encoding, baseOffset, data);
105+
case RunLengthCompressed -> runLengthDecodeToIncompletes(RunLength, baseOffset, decompressZstd(data));
106+
case BitSetV2 -> deserialiseBitSetWrapToIncompletes(encoding, baseOffset, data);
107+
case BitSetV2Compressed -> deserialiseBitSetWrapToIncompletes(BitSetV2, baseOffset, decompressZstd(data));
108+
case RunLengthV2 -> runLengthDecodeToIncompletes(encoding, baseOffset, data);
109+
case RunLengthV2Compressed -> runLengthDecodeToIncompletes(RunLengthV2, baseOffset, decompressZstd(data));
110+
default -> throw new UnsupportedOperationException("Encoding (" + encoding.description() + ") not supported");
99111
};
100112
return binaryArrayString;
101113
}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetBitSet.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,23 @@
1515
import static io.confluent.csid.utils.Range.range;
1616

1717
/**
18+
* Deserialisation tools for {@link BitsetEncoder}.
19+
* <p>
20+
* todo unify or refactor with {@link BitsetEncoder}. Why was it ever separate?
21+
*
1822
* @see BitsetEncoder
1923
*/
2024
@Slf4j
2125
public class OffsetBitSet {
2226

23-
static String deserialiseBitSetWrap(ByteBuffer wrap) {
27+
static String deserialiseBitSetWrap(ByteBuffer wrap, OffsetEncoding.Version version) {
2428
wrap.rewind();
25-
short originalBitsetSize = wrap.getShort();
29+
30+
int originalBitsetSize = switch (version) {
31+
case v1 -> (int)wrap.getShort(); // up cast ok
32+
case v2 -> wrap.getInt();
33+
};
34+
2635
ByteBuffer slice = wrap.slice();
2736
return deserialiseBitSet(originalBitsetSize, slice);
2837
}
@@ -42,9 +51,13 @@ static String deserialiseBitSet(int originalBitsetSize, ByteBuffer s) {
4251
return result.toString();
4352
}
4453

45-
static Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(long baseOffset, ByteBuffer wrap) {
54+
static Tuple<Long, Set<Long>> deserialiseBitSetWrapToIncompletes(OffsetEncoding encoding, long baseOffset, ByteBuffer wrap) {
4655
wrap.rewind();
47-
short originalBitsetSize = wrap.getShort();
56+
int originalBitsetSize = switch(encoding) {
57+
case BitSet -> wrap.getShort();
58+
case BitSetV2 -> wrap.getInt();
59+
default -> throw new InternalRuntimeError("Invalid state");
60+
};
4861
ByteBuffer slice = wrap.slice();
4962
Set<Long> incompletes = deserialiseBitSetToIncompletes(baseOffset, originalBitsetSize, slice);
5063
long highwaterMark = baseOffset + originalBitsetSize;

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetEncoder.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
package io.confluent.parallelconsumer;
22

33
import lombok.SneakyThrows;
4+
import lombok.extern.slf4j.Slf4j;
45

56
import java.io.IOException;
67
import java.nio.ByteBuffer;
78

9+
/**
10+
* Base OffsetEncoder
11+
*/
12+
@Slf4j
813
abstract class OffsetEncoder {
914

1015
private final OffsetSimultaneousEncoder offsetSimultaneousEncoder;
@@ -40,6 +45,7 @@ void register() throws EncodingNotSupportedException {
4045
}
4146

4247
private void register(final OffsetEncoding type, final byte[] bytes) {
48+
log.debug("Registering {}, with site {}", type, bytes.length);
4349
offsetSimultaneousEncoder.sortedEncodings.add(new EncodedOffsetPair(type, ByteBuffer.wrap(bytes)));
4450
offsetSimultaneousEncoder.encodingMap.put(type, bytes);
4551
}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetEncoding.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,34 @@
1313
import java.util.function.Function;
1414
import java.util.stream.Collectors;
1515

16+
import static io.confluent.parallelconsumer.OffsetEncoding.Version.v1;
17+
import static io.confluent.parallelconsumer.OffsetEncoding.Version.v2;
18+
1619
@ToString
1720
@RequiredArgsConstructor
1821
enum OffsetEncoding {
19-
ByteArray((byte) 'L'),
20-
ByteArrayCompressed((byte) 'î'),
21-
BitSet((byte) 'l'),
22-
BitSetCompressed((byte) 'a'),
23-
RunLength((byte) 'n'),
24-
RunLengthCompressed((byte) 'J');
22+
ByteArray(v1, (byte) 'L'),
23+
ByteArrayCompressed(v1, (byte) 'î'),
24+
BitSet(v1, (byte) 'l'),
25+
BitSetCompressed(v1, (byte) 'a'),
26+
RunLength(v1, (byte) 'n'),
27+
RunLengthCompressed(v1, (byte) 'J'),
28+
/**
29+
* Switch from encoding bitset length as a short to an integer (a maximum length of roughly 32,000 was too short)
30+
*/
31+
BitSetV2(v2, (byte) 'o'),
32+
BitSetV2Compressed(v2, (byte) 's'),
33+
/**
34+
* switch from encoding run lengths as Shorts to Integers
35+
*/
36+
RunLengthV2(v2, (byte) 'e'),
37+
RunLengthV2Compressed(v2, (byte) 'p');
38+
39+
enum Version {
40+
v1, v2
41+
}
42+
43+
public final Version version;
2544

2645
@Getter
2746
public final byte magicByte;
@@ -36,4 +55,8 @@ public static OffsetEncoding decode(byte magic) {
3655
return encoding;
3756
}
3857
}
58+
59+
public String description() {
60+
return name() + ":" + version;
61+
}
3962
}

parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/OffsetMapCodecManager.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -89,13 +89,13 @@ void loadOffsetMapForPartition(final Set<TopicPartition> assignment) {
8989
}
9090

9191
static ParallelConsumer.Tuple<Long, TreeSet<Long>> deserialiseIncompleteOffsetMapFromBase64(long finalBaseComittedOffsetForPartition, String incompleteOffsetMap) throws OffsetDecodingError {
92-
byte[] decode;
92+
byte[] decodedBytes;
9393
try {
94-
decode = OffsetSimpleSerialisation.decodeBase64(incompleteOffsetMap);
94+
decodedBytes = OffsetSimpleSerialisation.decodeBase64(incompleteOffsetMap);
9595
} catch (IllegalArgumentException a) {
9696
throw new OffsetDecodingError(msg("Error decoding offset metadata, input was: {}", incompleteOffsetMap), a);
9797
}
98-
ParallelConsumer.Tuple<Long, Set<Long>> incompleteOffsets = decodeCompressedOffsets(finalBaseComittedOffsetForPartition, decode);
98+
ParallelConsumer.Tuple<Long, Set<Long>> incompleteOffsets = decodeCompressedOffsets(finalBaseComittedOffsetForPartition, decodedBytes);
9999
TreeSet<Long> longs = new TreeSet<>(incompleteOffsets.getRight());
100100
return ParallelConsumer.Tuple.pairOf(incompleteOffsets.getLeft(), longs);
101101
}
@@ -147,13 +147,13 @@ byte[] encodeOffsetsCompressed(long finalOffsetForPartition, TopicPartition tp,
147147
*
148148
* @return Set of offsets which are not complete.
149149
*/
150-
static ParallelConsumer.Tuple<Long, Set<Long>> decodeCompressedOffsets(long finalOffsetForPartition, byte[] s) {
151-
if (s.length == 0) {
150+
static ParallelConsumer.Tuple<Long, Set<Long>> decodeCompressedOffsets(long finalOffsetForPartition, byte[] decodedBytes) {
151+
if (decodedBytes.length == 0) {
152152
// no offset bitmap data
153153
return ParallelConsumer.Tuple.pairOf(finalOffsetForPartition, UniSets.of());
154154
}
155155

156-
EncodedOffsetPair result = EncodedOffsetPair.unwrap(s);
156+
EncodedOffsetPair result = EncodedOffsetPair.unwrap(decodedBytes);
157157

158158
ParallelConsumer.Tuple<Long, Set<Long>> incompletesTuple = result.getDecodedIncompletes(finalOffsetForPartition);
159159

0 commit comments

Comments
 (0)