Skip to content

Add safer conversion from RecyclerBytesStreamOutput to ReleasableBytesReference #127404

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -820,7 +819,7 @@ void onShardDone() {
out.close();
}
}
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out)));
ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference()));
}

private void maybeFreeContext(SearchPhaseResult result, BitSet relevantShardIndices) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,7 @@ private ReleasableBytesReference maybeSerializeClusterState(
}
assert clusterState.nodes().isLocalNodeElectedMaster();

final var bytesStream = transportService.newNetworkBytesStream();
var success = false;
try {
try (var bytesStream = transportService.newNetworkBytesStream()) {
try (
var stream = new OutputStreamStreamOutput(
CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream))
Expand All @@ -404,22 +402,16 @@ private ReleasableBytesReference maybeSerializeClusterState(
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, discoveryNode);
}
final var newBytes = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
logger.trace(
"serialized join validation cluster state version [{}] for transport version [{}] with size [{}]",
clusterState.version(),
version,
newBytes.length()
bytesStream.position()
);
var newBytes = bytesStream.moveToBytesReference();
final var previousBytes = statesByVersion.put(version, newBytes);
assert previousBytes == null;
success = true;
return newBytes;
} finally {
if (success == false) {
bytesStream.close();
assert false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -262,9 +262,7 @@ public PublicationContext newPublicationContext(ClusterStatePublicationEvent clu
}

private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode node, TransportVersion version) {
final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream();
boolean success = false;
try {
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
final long uncompressedBytes;
try (
StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
Expand All @@ -278,20 +276,15 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node);
}
final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
serializationStatsTracker.serializedFullState(uncompressedBytes, result.length());
final int size = bytesStream.size();
serializationStatsTracker.serializedFullState(uncompressedBytes, size);
logger.trace(
"serialized full cluster state version [{}] using transport version [{}] with size [{}]",
clusterState.version(),
version,
result.length()
size
);
success = true;
return result;
} finally {
if (success == false) {
bytesStream.close();
}
return bytesStream.moveToBytesReference();
}
}

Expand All @@ -302,9 +295,7 @@ private ReleasableBytesReference serializeDiffClusterState(
TransportVersion version
) {
final long clusterStateVersion = newState.version();
final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream();
boolean success = false;
try {
try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) {
final long uncompressedBytes;
try (
StreamOutput stream = new PositionTrackingOutputStreamStreamOutput(
Expand All @@ -322,20 +313,15 @@ private ReleasableBytesReference serializeDiffClusterState(
} catch (IOException e) {
throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node);
}
final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream);
serializationStatsTracker.serializedDiff(uncompressedBytes, result.length());
final int size = bytesStream.size();
serializationStatsTracker.serializedDiff(uncompressedBytes, size);
logger.trace(
"serialized cluster state diff for version [{}] using transport version [{}] with size [{}]",
clusterStateVersion,
version,
result.length()
size
);
success = true;
return result;
} finally {
if (success == false) {
bytesStream.close();
}
return bytesStream.moveToBytesReference();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.CompositeBytesReference;
import org.elasticsearch.common.bytes.ReleasableBytesReference;
import org.elasticsearch.common.recycler.Recycler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Releasables;
Expand All @@ -37,7 +38,7 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable
static final VarHandle VH_BE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN);
static final VarHandle VH_LE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN);

private final ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>();
private ArrayList<Recycler.V<BytesRef>> pages = new ArrayList<>();
private final Recycler<BytesRef> recycler;
private final int pageSize;
private int pageIndex = -1;
Expand Down Expand Up @@ -237,13 +238,26 @@ public void skip(int length) {

@Override
public void close() {
try {
var pages = this.pages;
if (pages != null) {
this.pages = null;
Releasables.close(pages);
} finally {
pages.clear();
}
}

/**
* Move the contents written to this stream to a {@link ReleasableBytesReference}. Closing this instance becomes a noop after
* this method returns successfully and its buffers need to be released by releasing the returned bytes reference.
*
* @return a {@link ReleasableBytesReference} that must be released once no longer needed
*/
public ReleasableBytesReference moveToBytesReference() {
var bytes = bytes();
var pages = this.pages;
this.pages = null;
return new ReleasableBytesReference(bytes, () -> Releasables.close(pages));
}

/**
* Returns the current size of the buffer.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler<BytesRef> rec
if (serialization.hasNext() == false) {
builder.close();
}
final var result = new ReleasableBytesReference(
chunkStream.bytes(),
() -> Releasables.closeExpectNoException(chunkStream)
);
final var result = chunkStream.moveToBytesReference();
target = null;
return result;
} catch (Exception e) {
Expand Down