Skip to content

Resume stream upload #460

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 75 additions & 3 deletions src/tasks/FileUploadTask/FileObjectClasses/StreamUpload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,30 @@ import { Readable } from "stream";
import { GraphClientError } from "../../../GraphClientError";
import { FileObject, SliceType } from "../../LargeFileUploadTask";
import { Range } from "../Range";

/**
* @interface
* Interface to store slice of a stream and range of the slice.
* @property {Buffer} fileSlice - The slice of the stream
* @property {Range} range - The range of the slice
*/
interface SliceRecord {
fileSlice: Buffer;
range: Range;
}

/**
* @class
* FileObject class for Readable Stream upload
*/
export class StreamUpload implements FileObject<Readable> {
/**
* @private
* Represents a cache of the last attempted upload slice.
* This can be used when resuming a previously failed slice upload.
*/
private previousSlice: SliceRecord;

public constructor(public content: Readable, public name: string, public size: number) {
if (!content || !name || !size) {
throw new GraphClientError("Please provide the Readable Stream content, name of the file and size of the file");
Expand All @@ -17,19 +40,68 @@ export class StreamUpload implements FileObject<Readable> {
* @returns The sliced file part
*/
public async sliceFile(range: Range): Promise<SliceType> {
const rangeSize = range.maxValue - range.minValue + 1;
let rangeSize = range.maxValue - range.minValue + 1;
/* readable.readable Is true if it is safe to call readable.read(),
* which means the stream has not been destroyed or emitted 'error' or 'end'
*/
const bufs = [];

/**
* The sliceFile reads the first `rangeSize` number of bytes from the stream.
* The previousSlice property is used to seek the range of bytes in the previous slice.
* Suppose, the sliceFile reads bytes from `10 - 20` from the stream but the upload of this slice fails.
* When the user resumes, the stream will have bytes from position 21.
* The previousSlice.Range is used to compare if the requested range is cached in the previousSlice property or present in the Readable Stream.
*/
if (this.previousSlice) {
if (range.minValue < this.previousSlice.range.minValue) {
throw new GraphClientError("An error occurred while uploading the stream. Please restart the stream upload from the first byte of the file.");
}

if (range.minValue < this.previousSlice.range.maxValue) {
const previousRangeMin = this.previousSlice.range.minValue;
const previousRangeMax = this.previousSlice.range.maxValue;

// Check if the requested range is same as previously sliced range
if (range.minValue === previousRangeMin && range.maxValue === previousRangeMax) {
return this.previousSlice.fileSlice;
}

/**
* The following check considers a possibility
* of an upload failing after some of the bytes of the previous slice
* were successfully uploaded.
* Example - Previous slice range - `10 - 20`. Current requested range is `15 - 20`.
*/
if (range.maxValue === previousRangeMax) {
return this.previousSlice.fileSlice.slice(range.minValue, range.maxValue + 1);
}

/**
* If an upload fails after some of the bytes of the previous slice
* were successfully uploaded and the new Range.Maximum is greater than the previous Range.Maximum
* Example - Previous slice range - `10 - 20`. Current requested range is `15 - 25`,
* then read the bytes from position 15 to 20 from previousSlice.fileSlice and read bytes from position 21 to 25 from the Readable Stream
*/
bufs.push(this.previousSlice.fileSlice.slice(range.minValue, previousRangeMax + 1));

rangeSize = range.maxValue - previousRangeMax;
}
}

if (this.content && this.content.readable) {
if (this.content.readableLength >= rangeSize) {
return this.content.read(rangeSize);
bufs.push(this.content.read(rangeSize));
} else {
return await this.readNBytesFromStream(rangeSize);
bufs.push(await this.readNBytesFromStream(rangeSize));
}
} else {
throw new GraphClientError("Stream is not readable.");
}
const slicedChunk = Buffer.concat(bufs);
this.previousSlice = { fileSlice: slicedChunk, range };

return slicedChunk;
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/tasks/LargeFileUploadTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ interface UploadStatusResponse {
* @interface
* Signature to define options for upload task
* @property {number} [rangeSize = LargeFileUploadTask.DEFAULT_FILE_SIZE] - Specifies the range chunk size
* @property {UploadEventHandlers} uploadEventHandlers - UploadEventHandlers attached to an upload task
*/
export interface LargeFileUploadTaskOptions {
rangeSize?: number;
Expand Down
53 changes: 49 additions & 4 deletions test/node/tasks/StreamUpload.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import * as fs from "fs";

import { StreamUpload } from "../../../src/tasks/FileUploadTask/FileObjectClasses/StreamUpload";

const fileName = "sample_image.jpg";
const filePath = `./test/sample_files/${fileName}`;
const stats = fs.statSync(`./test/sample_files/${fileName}`);
const totalsize = stats.size;

describe("StreamUpload", () => {
const fileName = "sample_image.jpg";
const filePath = `./test/sample_files/${fileName}`;
const stats = fs.statSync(`./test/sample_files/${fileName}`);
const totalsize = stats.size;
it("Stream size smaller than upload range size", async () => {
const readStream = fs.createReadStream(`./test/sample_files/${fileName}`, { highWaterMark: 8 });

Expand Down Expand Up @@ -51,3 +52,47 @@ describe("StreamUpload", () => {
assert.equal(sliceSize, (slice as Buffer).length);
});
});

describe("Stream upload resume", () => {
it("New range is equal to previous upload range", async () => {
const readStream = fs.createReadStream(filePath, { highWaterMark: totalsize });
const sliceSize = 20;

const upload = new StreamUpload(readStream, fileName, totalsize);

const slice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
const retrySlice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
assert.isDefined(slice);
assert.isDefined(retrySlice);
assert.equal(Buffer.compare(slice as Buffer, retrySlice as Buffer), 0);
});

it("New Range.Minimum greater than previous Range.Minimum and new Range.Maximum is equal previous Range.Maximum", async () => {
const readStream = fs.createReadStream(filePath, { highWaterMark: totalsize });
const sliceSize = 20;

const upload = new StreamUpload(readStream, fileName, totalsize);
const retryRangeMin = 15;
const slice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
const retrySlice = await upload.sliceFile({ minValue: 15, maxValue: sliceSize - 1 });
assert.isDefined(slice);
assert.isDefined(retrySlice);
assert.equal(sliceSize, (slice as Buffer).length);
assert.equal(Buffer.compare(slice.slice(retryRangeMin, sliceSize) as Buffer, retrySlice as Buffer), 0);
});

it("New Range.Minimum greater than previous Range.Minimum and new Range.Maximum is greater than previous Range.Maximum", async () => {
const readStream = fs.createReadStream(filePath, { highWaterMark: totalsize });
const sliceSize = 20;
const retryRangeMin = 15;
const retryRangeMax = 21;
const upload = new StreamUpload(readStream, fileName, totalsize);

const slice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
const retrySlice = (await upload.sliceFile({ minValue: retryRangeMin, maxValue: retryRangeMax })) as Buffer;
assert.isDefined(slice);
assert.isDefined(retrySlice);
assert.equal(retrySlice.length, retryRangeMax - retryRangeMin + 1);
assert.equal(Buffer.compare(slice.slice(retryRangeMin, sliceSize) as Buffer, retrySlice.slice(0, sliceSize - retryRangeMin)), 0);
});
});