Skip to content

Small changes in concurrent multipart upload interfaces #128977

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -116,7 +115,7 @@ public void writeBlobAtomic(
OperationPurpose purpose,
String blobName,
long blobSize,
CheckedBiFunction<Long, Long, InputStream, IOException> provider,
BlobMultiPartInputStreamProvider provider,
boolean failIfAlreadyExists
) throws IOException {
blobStore.writeBlobAtomic(purpose, buildKey(blobName), blobSize, provider, failIfAlreadyExists);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.core.util.Throwables;
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobPath;
Expand Down Expand Up @@ -477,7 +476,7 @@ void writeBlobAtomic(
final OperationPurpose purpose,
final String blobName,
final long blobSize,
final CheckedBiFunction<Long, Long, InputStream, IOException> provider,
final BlobContainer.BlobMultiPartInputStreamProvider provider,
final boolean failIfAlreadyExists
) throws IOException {
try {
Expand Down Expand Up @@ -559,7 +558,7 @@ private static Mono<String> stageBlock(
BlockBlobAsyncClient asyncClient,
String blobName,
MultiPart multiPart,
CheckedBiFunction<Long, Long, InputStream, IOException> provider
BlobContainer.BlobMultiPartInputStreamProvider provider
) {
logger.debug(
"{}: staging part [{}] of size [{}] from offset [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
package org.elasticsearch.common.blobstore;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.CheckedBiFunction;
import org.elasticsearch.common.blobstore.support.BlobMetadata;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -153,11 +152,42 @@ default boolean supportsConcurrentMultipartUploads() {
return false;
}

/**
 * Supplies {@link InputStream} instances for reading individual parts of a blob's content,
 * used to split a blob into parts for concurrent multipart uploads.
 */
@FunctionalInterface
interface BlobMultiPartInputStreamProvider {
    /**
     * Opens an {@link InputStream} positioned to read a single part of the blob content.
     *
     * @param offset the offset in the blob content to start reading bytes from
     * @param length the number of bytes to read
     * @return an {@link InputStream} to read a part of the blob content
     * @throws IOException if something goes wrong opening the input stream
     */
    InputStream apply(long offset, long length) throws IOException;
}

/**
* Reads the blob's content by calling an input stream provider multiple times, in order to split the blob's content into multiple
* parts that can be written to the container concurrently before being assembled into the final blob, using an atomic write operation
if the implementation supports it. The number and the size of the parts depend on the implementation.
*
Note: the method {@link #supportsConcurrentMultipartUploads()} must be checked before calling this method.
*
* @param purpose The purpose of the operation
* @param blobName The name of the blob to write the contents of the input stream to.
* @param provider The input stream provider that is used to read the blob content
* @param blobSize The size of the blob to be written, in bytes. Must be the amount of bytes in the input stream. It is
* implementation dependent whether this value is used in writing the blob to the repository.
* @param failIfAlreadyExists whether to throw a FileAlreadyExistsException if the given blob already exists
* @throws FileAlreadyExistsException if failIfAlreadyExists is true and a blob by the same name already exists
* @throws IOException if the input stream could not be read, or the target blob could not be written to.
*/
default void writeBlobAtomic(
OperationPurpose purpose,
String blobName,
long blobSize,
CheckedBiFunction<Long, Long, InputStream, IOException> provider,
BlobMultiPartInputStreamProvider provider,
boolean failIfAlreadyExists
) throws IOException {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ public FsBlobContainer(FsBlobStore blobStore, BlobPath blobPath, Path path) {
this.path = path;
}

/**
 * Returns the filesystem {@link Path} backing this blob container.
 * The path is the one supplied to the constructor; presumably the directory
 * holding this container's blobs — confirm against {@code FsBlobStore}.
 */
public Path getPath() {
    return this.path;
}

@Override
public Map<String, BlobMetadata> listBlobs(OperationPurpose purpose) throws IOException {
return listBlobsByPrefix(purpose, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,22 @@ public void writeMetadataBlob(
delegate.writeMetadataBlob(purpose, blobName, failIfAlreadyExists, atomic, writer);
}

/**
 * Reports whether concurrent multipart uploads are supported, by asking the wrapped container.
 *
 * @return whatever the delegate container reports
 */
@Override
public boolean supportsConcurrentMultipartUploads() {
    return this.delegate.supportsConcurrentMultipartUploads();
}

/**
 * Forwards the concurrent multipart atomic write to the wrapped container, unchanged.
 *
 * @param purpose             the purpose of the operation
 * @param blobName            the name of the blob to write
 * @param blobSize            the total size of the blob, in bytes
 * @param provider            supplies input streams for the individual blob parts
 * @param failIfAlreadyExists whether to fail if a blob with the same name already exists
 * @throws IOException if the delegate fails to write the blob
 */
@Override
public void writeBlobAtomic(
    OperationPurpose purpose,
    String blobName,
    long blobSize,
    BlobMultiPartInputStreamProvider provider,
    boolean failIfAlreadyExists
) throws IOException {
    this.delegate.writeBlobAtomic(purpose, blobName, blobSize, provider, failIfAlreadyExists);
}

@Override
public void writeBlobAtomic(
OperationPurpose purpose,
Expand Down