diff --git a/src/tasks/FileUploadTask/FileObjectClasses/StreamUpload.ts b/src/tasks/FileUploadTask/FileObjectClasses/StreamUpload.ts
index eddc257ba..3204795d8 100644
--- a/src/tasks/FileUploadTask/FileObjectClasses/StreamUpload.ts
+++ b/src/tasks/FileUploadTask/FileObjectClasses/StreamUpload.ts
@@ -3,7 +3,30 @@ import { Readable } from "stream";
 import { GraphClientError } from "../../../GraphClientError";
 import { FileObject, SliceType } from "../../LargeFileUploadTask";
 import { Range } from "../Range";
+
+/**
+ * @interface
+ * Interface to store slice of a stream and range of the slice.
+ * @property {Buffer} fileSlice - The slice of the stream
+ * @property {Range} range - The range of the slice
+ */
+interface SliceRecord {
+	fileSlice: Buffer;
+	range: Range;
+}
+
+/**
+ * @class
+ * FileObject class for Readable Stream upload
+ */
 export class StreamUpload implements FileObject<Readable> {
+	/**
+	 * @private
+	 * Represents a cache of the last attempted upload slice.
+	 * This can be used when resuming a previously failed slice upload.
+	 */
+	private previousSlice: SliceRecord;
+
 	public constructor(public content: Readable, public name: string, public size: number) {
 		if (!content || !name || !size) {
 			throw new GraphClientError("Please provide the Readable Stream content, name of the file and size of the file");
@@ -17,19 +40,68 @@ export class StreamUpload implements FileObject<Readable> {
 	 * @returns The sliced file part
 	 */
 	public async sliceFile(range: Range): Promise<SliceType> {
-		const rangeSize = range.maxValue - range.minValue + 1;
+		let rangeSize = range.maxValue - range.minValue + 1;
 		/* readable.readable Is true if it is safe to call readable.read(),
 		 * which means the stream has not been destroyed or emitted 'error' or 'end'
 		 */
+		const bufs = [];
+
+		/**
+		 * The sliceFile reads the first `rangeSize` number of bytes from the stream.
+		 * The previousSlice property is used to seek the range of bytes in the previous slice.
+		 * Suppose the sliceFile reads bytes from `10 - 20` from the stream but the upload of this slice fails.
+		 * When the user resumes, the stream will have bytes from position 21.
+		 * The previousSlice.range is used to compare if the requested range is cached in the previousSlice property or present in the Readable Stream.
+		 */
+		if (this.previousSlice) {
+			if (range.minValue < this.previousSlice.range.minValue) {
+				throw new GraphClientError("An error occurred while uploading the stream. Please restart the stream upload from the first byte of the file.");
+			}
+
+			if (range.minValue < this.previousSlice.range.maxValue) {
+				const previousRangeMin = this.previousSlice.range.minValue;
+				const previousRangeMax = this.previousSlice.range.maxValue;
+
+				// Check if the requested range is same as previously sliced range
+				if (range.minValue === previousRangeMin && range.maxValue === previousRangeMax) {
+					return this.previousSlice.fileSlice;
+				}
+
+				/**
+				 * The following check considers a possibility
+				 * of an upload failing after some of the bytes of the previous slice
+				 * were successfully uploaded. Note - fileSlice buffer offsets are relative to previousRangeMin, not absolute file offsets.
+				 * Example - Previous slice range - `10 - 20`. Current requested range is `15 - 20`.
+				 */
+				if (range.maxValue === previousRangeMax) {
+					return this.previousSlice.fileSlice.slice(range.minValue - previousRangeMin, range.maxValue - previousRangeMin + 1);
+				}
+
+				/**
+				 * If an upload fails after some of the bytes of the previous slice
+				 * were successfully uploaded and the new Range.Maximum is greater than the previous Range.Maximum
+				 * Example - Previous slice range - `10 - 20`. Current requested range is `15 - 25`,
+				 * then read the bytes from position 15 to 20 from previousSlice.fileSlice and read bytes from position 21 to 25 from the Readable Stream
+				 */
+				bufs.push(this.previousSlice.fileSlice.slice(range.minValue - previousRangeMin, previousRangeMax - previousRangeMin + 1));
+
+				rangeSize = range.maxValue - previousRangeMax;
+			}
+		}
+
 		if (this.content && this.content.readable) {
 			if (this.content.readableLength >= rangeSize) {
-				return this.content.read(rangeSize);
+				bufs.push(this.content.read(rangeSize));
 			} else {
-				return await this.readNBytesFromStream(rangeSize);
+				bufs.push(await this.readNBytesFromStream(rangeSize));
 			}
 		} else {
 			throw new GraphClientError("Stream is not readable.");
 		}
+		const slicedChunk = Buffer.concat(bufs);
+		this.previousSlice = { fileSlice: slicedChunk, range };
+
+		return slicedChunk;
 	}
 
 	/**
diff --git a/src/tasks/LargeFileUploadTask.ts b/src/tasks/LargeFileUploadTask.ts
index f0e09fbdf..6299d3e9a 100644
--- a/src/tasks/LargeFileUploadTask.ts
+++ b/src/tasks/LargeFileUploadTask.ts
@@ -41,6 +41,7 @@ interface UploadStatusResponse {
  * @interface
  * Signature to define options for upload task
  * @property {number} [rangeSize = LargeFileUploadTask.DEFAULT_FILE_SIZE] - Specifies the range chunk size
+ * @property {UploadEventHandlers} uploadEventHandlers - UploadEventHandlers attached to an upload task
  */
 export interface LargeFileUploadTaskOptions {
 	rangeSize?: number;
diff --git a/test/node/tasks/StreamUpload.ts b/test/node/tasks/StreamUpload.ts
index cf869d649..21dc5aee2 100644
--- a/test/node/tasks/StreamUpload.ts
+++ b/test/node/tasks/StreamUpload.ts
@@ -12,11 +12,12 @@ import * as fs from "fs";
 
 import { StreamUpload } from "../../../src/tasks/FileUploadTask/FileObjectClasses/StreamUpload";
 
+const fileName = "sample_image.jpg";
+const filePath = `./test/sample_files/${fileName}`;
+const stats = fs.statSync(`./test/sample_files/${fileName}`);
+const totalsize = stats.size;
+
 describe("StreamUpload", () => {
-	const fileName = "sample_image.jpg";
-	const filePath = `./test/sample_files/${fileName}`;
-	const stats = fs.statSync(`./test/sample_files/${fileName}`);
-	const totalsize = stats.size;
 
 	it("Stream size smaller than upload range size", async () => {
 		const readStream = fs.createReadStream(`./test/sample_files/${fileName}`, { highWaterMark: 8 });
@@ -51,3 +52,47 @@
 		assert.equal(sliceSize, (slice as Buffer).length);
 	});
 });
+
+describe("Stream upload resume", () => {
+	it("New range is equal to previous upload range", async () => {
+		const readStream = fs.createReadStream(filePath, { highWaterMark: totalsize });
+		const sliceSize = 20;
+
+		const upload = new StreamUpload(readStream, fileName, totalsize);
+
+		const slice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
+		const retrySlice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
+		assert.isDefined(slice);
+		assert.isDefined(retrySlice);
+		assert.equal(Buffer.compare(slice as Buffer, retrySlice as Buffer), 0);
+	});
+
+	it("New Range.Minimum greater than previous Range.Minimum and new Range.Maximum is equal previous Range.Maximum", async () => {
+		const readStream = fs.createReadStream(filePath, { highWaterMark: totalsize });
+		const sliceSize = 20;
+
+		const upload = new StreamUpload(readStream, fileName, totalsize);
+		const retryRangeMin = 15;
+		const slice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
+		const retrySlice = await upload.sliceFile({ minValue: 15, maxValue: sliceSize - 1 });
+		assert.isDefined(slice);
+		assert.isDefined(retrySlice);
+		assert.equal(sliceSize, (slice as Buffer).length);
+		assert.equal(Buffer.compare(slice.slice(retryRangeMin, sliceSize) as Buffer, retrySlice as Buffer), 0);
+	});
+
+	it("New Range.Minimum greater than previous Range.Minimum and new Range.Maximum is greater than previous Range.Maximum", async () => {
+		const readStream = fs.createReadStream(filePath, { highWaterMark: totalsize });
+		const sliceSize = 20;
+		const retryRangeMin = 15;
+		const retryRangeMax = 21;
+		const upload = new StreamUpload(readStream, fileName, totalsize);
+
+		const slice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
+		const retrySlice = (await upload.sliceFile({ minValue: retryRangeMin, maxValue: retryRangeMax })) as Buffer;
+		assert.isDefined(slice);
+		assert.isDefined(retrySlice);
+		assert.equal(retrySlice.length, retryRangeMax - retryRangeMin + 1);
+		assert.equal(Buffer.compare(slice.slice(retryRangeMin, sliceSize) as Buffer, retrySlice.slice(0, sliceSize - retryRangeMin)), 0);
+	});
+});