From 57b1b90fd473ebca6cd50f6c70fd560fed712b42 Mon Sep 17 00:00:00 2001 From: Armin Date: Fri, 25 Apr 2025 17:59:49 +0200 Subject: [PATCH] Add safer conversion from RecyclerBytesStreamOutput to ReleasableBytesReference We have a couple of places in the codebase where we do the transition from the stream to the reference. We can save some code and make this a little less error-prone by having a conversion method with move-style semantics and enabling the use of try-with-resources. Also, this enables a couple of optimizations down the line and unlinking the list of pages and moving it to the reference instead of nulling it out is a bit nicer to the CPU caches also. --- .../SearchQueryThenFetchAsyncAction.java | 3 +- .../coordination/JoinValidationService.java | 14 ++------ .../PublicationTransportHandler.java | 34 ++++++------------- .../io/stream/RecyclerBytesStreamOutput.java | 22 +++++++++--- .../rest/ChunkedRestResponseBodyPart.java | 5 +-- 5 files changed, 33 insertions(+), 45 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index 39e1c30f658d8..9aee331bb106b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.Client; import org.elasticsearch.cluster.ClusterState; -import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.RecyclerBytesStreamOutput; import org.elasticsearch.common.io.stream.StreamInput; @@ -820,7 +819,7 @@ void onShardDone() { out.close(); } } - ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(new ReleasableBytesReference(out.bytes(), out))); + ActionListener.respondAndRelease(channelListener, new BytesTransportResponse(out.moveToBytesReference())); } private void maybeFreeContext(SearchPhaseResult result, BitSet relevantShardIndices) { diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java index 32f57d1cbd1db..79e1654aae758 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinValidationService.java @@ -391,9 +391,7 @@ private ReleasableBytesReference maybeSerializeClusterState( } assert clusterState.nodes().isLocalNodeElectedMaster(); - final var bytesStream = transportService.newNetworkBytesStream(); - var success = false; - try { + try (var bytesStream = transportService.newNetworkBytesStream()) { try ( var stream = new OutputStreamStreamOutput( CompressorFactory.COMPRESSOR.threadLocalOutputStream(Streams.flushOnCloseStream(bytesStream)) @@ -404,22 +402,16 @@ private ReleasableBytesReference maybeSerializeClusterState( } catch (IOException e) { throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, discoveryNode); } - final var newBytes = new ReleasableBytesReference(bytesStream.bytes(), bytesStream); logger.trace( "serialized join validation cluster state version [{}] for transport version [{}] with size [{}]", clusterState.version(), version, - newBytes.length() + bytesStream.position() ); + var newBytes = bytesStream.moveToBytesReference(); final var previousBytes = statesByVersion.put(version, newBytes); assert previousBytes == null; - success = true; return newBytes; - } finally { - if (success == false) { - bytesStream.close(); - assert false; - } } } } diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java index af3fdc317c8a7..1d4ab1346ef36 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/PublicationTransportHandler.java @@ -262,9 +262,7 @@ public PublicationContext newPublicationContext(ClusterStatePublicationEvent clu } private ReleasableBytesReference serializeFullClusterState(ClusterState clusterState, DiscoveryNode node, TransportVersion version) { - final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream(); - boolean success = false; - try { + try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) { final long uncompressedBytes; try ( StreamOutput stream = new PositionTrackingOutputStreamStreamOutput( @@ -278,20 +276,15 @@ private ReleasableBytesReference serializeFullClusterState(ClusterState clusterS } catch (IOException e) { throw new ElasticsearchException("failed to serialize cluster state for publishing to node {}", e, node); } - final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream); - serializationStatsTracker.serializedFullState(uncompressedBytes, result.length()); + final int size = bytesStream.size(); + serializationStatsTracker.serializedFullState(uncompressedBytes, size); logger.trace( "serialized full cluster state version [{}] using transport version [{}] with size [{}]", clusterState.version(), version, - result.length() + size ); - success = true; - return result; - } finally { - if (success == false) { - bytesStream.close(); - } + return bytesStream.moveToBytesReference(); } } @@ -302,9 +295,7 @@ private ReleasableBytesReference serializeDiffClusterState( TransportVersion version ) { final long clusterStateVersion = newState.version(); - final RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream(); - boolean success = false; - try { + try (RecyclerBytesStreamOutput bytesStream = transportService.newNetworkBytesStream()) { final long uncompressedBytes; try ( StreamOutput stream = new PositionTrackingOutputStreamStreamOutput( @@ -322,20 +313,15 @@ private ReleasableBytesReference serializeDiffClusterState( } catch (IOException e) { throw new ElasticsearchException("failed to serialize cluster state diff for publishing to node {}", e, node); } - final ReleasableBytesReference result = new ReleasableBytesReference(bytesStream.bytes(), bytesStream); - serializationStatsTracker.serializedDiff(uncompressedBytes, result.length()); + final int size = bytesStream.size(); + serializationStatsTracker.serializedDiff(uncompressedBytes, size); logger.trace( "serialized cluster state diff for version [{}] using transport version [{}] with size [{}]", clusterStateVersion, version, - result.length() + size ); - success = true; - return result; - } finally { - if (success == false) { - bytesStream.close(); - } + return bytesStream.moveToBytesReference(); } } diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java index ddf97a5151690..1dabad2d62e4c 100644 --- a/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java +++ b/server/src/main/java/org/elasticsearch/common/io/stream/RecyclerBytesStreamOutput.java @@ -13,6 +13,7 @@ import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.bytes.CompositeBytesReference; +import org.elasticsearch.common.bytes.ReleasableBytesReference; import org.elasticsearch.common.recycler.Recycler; import org.elasticsearch.core.Releasable; import org.elasticsearch.core.Releasables; @@ -37,7 +38,7 @@ public class RecyclerBytesStreamOutput extends BytesStream implements Releasable static final VarHandle VH_BE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.BIG_ENDIAN); static final VarHandle VH_LE_LONG = MethodHandles.byteArrayViewVarHandle(long[].class, ByteOrder.LITTLE_ENDIAN); - private final ArrayList> pages = new ArrayList<>(); + private ArrayList> pages = new ArrayList<>(); private final Recycler recycler; private final int pageSize; private int pageIndex = -1; @@ -237,13 +238,26 @@ public void skip(int length) { @Override public void close() { - try { + var pages = this.pages; + if (pages != null) { + this.pages = null; Releasables.close(pages); - } finally { - pages.clear(); } } + /** + * Move the contents written to this stream to a {@link ReleasableBytesReference}. Closing this instance becomes a noop after + * this method returns successfully and its buffers need to be released by releasing the returned bytes reference. + * + * @return a {@link ReleasableBytesReference} that must be released once no longer needed + */ + public ReleasableBytesReference moveToBytesReference() { + var bytes = bytes(); + var pages = this.pages; + this.pages = null; + return new ReleasableBytesReference(bytes, () -> Releasables.close(pages)); + } + /** * Returns the current size of the buffer. * diff --git a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBodyPart.java b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBodyPart.java index 694af7e1606cb..89e73673cacc0 100644 --- a/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBodyPart.java +++ b/server/src/main/java/org/elasticsearch/rest/ChunkedRestResponseBodyPart.java @@ -166,10 +166,7 @@ public ReleasableBytesReference encodeChunk(int sizeHint, Recycler rec if (serialization.hasNext() == false) { builder.close(); } - final var result = new ReleasableBytesReference( - chunkStream.bytes(), - () -> Releasables.closeExpectNoException(chunkStream) - ); + final var result = chunkStream.moveToBytesReference(); target = null; return result; } catch (Exception e) {