Skip to content

Commit 710de2d

Browse files
authored
Resume stream upload (#460)
* Adding the ChunkRecord interface * Adding more tests for stream resume * comments, assert conditions change * SliceRecord and comments
1 parent c7ac5a9 commit 710de2d

File tree

3 files changed

+125
-7
lines changed

3 files changed

+125
-7
lines changed

src/tasks/FileUploadTask/FileObjectClasses/StreamUpload.ts

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,30 @@ import { Readable } from "stream";
33
import { GraphClientError } from "../../../GraphClientError";
44
import { FileObject, SliceType } from "../../LargeFileUploadTask";
55
import { Range } from "../Range";
6+
7+
/**
8+
* @interface
9+
* Interface to store slice of a stream and range of the slice.
10+
* @property {Buffer} fileSlice - The slice of the stream
11+
* @property {Range} range - The range of the slice
12+
*/
13+
interface SliceRecord {
14+
fileSlice: Buffer;
15+
range: Range;
16+
}
17+
18+
/**
19+
* @class
20+
* FileObject class for Readable Stream upload
21+
*/
622
export class StreamUpload implements FileObject<Readable> {
23+
/**
24+
* @private
25+
* Represents a cache of the last attempted upload slice.
26+
* This can be used when resuming a previously failed slice upload.
27+
*/
28+
private previousSlice: SliceRecord;
29+
730
public constructor(public content: Readable, public name: string, public size: number) {
831
if (!content || !name || !size) {
932
throw new GraphClientError("Please provide the Readable Stream content, name of the file and size of the file");
@@ -17,19 +40,68 @@ export class StreamUpload implements FileObject<Readable> {
1740
* @returns The sliced file part
1841
*/
1942
public async sliceFile(range: Range): Promise<SliceType> {
20-
const rangeSize = range.maxValue - range.minValue + 1;
43+
let rangeSize = range.maxValue - range.minValue + 1;
2144
/* readable.readable Is true if it is safe to call readable.read(),
2245
* which means the stream has not been destroyed or emitted 'error' or 'end'
2346
*/
47+
const bufs = [];
48+
49+
/**
50+
* The sliceFile reads the first `rangeSize` number of bytes from the stream.
51+
* The previousSlice property is used to seek the range of bytes in the previous slice.
52+
* Suppose, the sliceFile reads bytes from `10 - 20` from the stream but the upload of this slice fails.
53+
* When the user resumes, the stream will have bytes from position 21.
54+
* The previousSlice.Range is used to compare if the requested range is cached in the previousSlice property or present in the Readable Stream.
55+
*/
56+
if (this.previousSlice) {
57+
if (range.minValue < this.previousSlice.range.minValue) {
58+
throw new GraphClientError("An error occurred while uploading the stream. Please restart the stream upload from the first byte of the file.");
59+
}
60+
61+
if (range.minValue < this.previousSlice.range.maxValue) {
62+
const previousRangeMin = this.previousSlice.range.minValue;
63+
const previousRangeMax = this.previousSlice.range.maxValue;
64+
65+
// Check if the requested range is same as previously sliced range
66+
if (range.minValue === previousRangeMin && range.maxValue === previousRangeMax) {
67+
return this.previousSlice.fileSlice;
68+
}
69+
70+
/**
71+
* The following check considers a possibility
72+
* of an upload failing after some of the bytes of the previous slice
73+
* were successfully uploaded.
74+
* Example - Previous slice range - `10 - 20`. Current requested range is `15 - 20`.
75+
*/
76+
if (range.maxValue === previousRangeMax) {
77+
return this.previousSlice.fileSlice.slice(range.minValue, range.maxValue + 1);
78+
}
79+
80+
/**
81+
* If an upload fails after some of the bytes of the previous slice
82+
* were successfully uploaded and the new Range.Maximum is greater than the previous Range.Maximum
83+
* Example - Previous slice range - `10 - 20`. Current requested range is `15 - 25`,
84+
* then read the bytes from position 15 to 20 from previousSlice.fileSlice and read bytes from position 21 to 25 from the Readable Stream
85+
*/
86+
bufs.push(this.previousSlice.fileSlice.slice(range.minValue, previousRangeMax + 1));
87+
88+
rangeSize = range.maxValue - previousRangeMax;
89+
}
90+
}
91+
2492
if (this.content && this.content.readable) {
2593
if (this.content.readableLength >= rangeSize) {
26-
return this.content.read(rangeSize);
94+
bufs.push(this.content.read(rangeSize));
2795
} else {
28-
return await this.readNBytesFromStream(rangeSize);
96+
bufs.push(await this.readNBytesFromStream(rangeSize));
2997
}
3098
} else {
3199
throw new GraphClientError("Stream is not readable.");
32100
}
101+
const slicedChunk = Buffer.concat(bufs);
102+
this.previousSlice = { fileSlice: slicedChunk, range };
103+
104+
return slicedChunk;
33105
}
34106

35107
/**

src/tasks/LargeFileUploadTask.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ interface UploadStatusResponse {
4141
* @interface
4242
* Signature to define options for upload task
4343
* @property {number} [rangeSize = LargeFileUploadTask.DEFAULT_FILE_SIZE] - Specifies the range chunk size
44+
* @property {UploadEventHandlers} uploadEventHandlers - UploadEventHandlers attached to an upload task
4445
*/
4546
export interface LargeFileUploadTaskOptions {
4647
rangeSize?: number;

test/node/tasks/StreamUpload.ts

Lines changed: 49 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ import * as fs from "fs";
1212

1313
import { StreamUpload } from "../../../src/tasks/FileUploadTask/FileObjectClasses/StreamUpload";
1414

15+
const fileName = "sample_image.jpg";
16+
const filePath = `./test/sample_files/${fileName}`;
17+
const stats = fs.statSync(`./test/sample_files/${fileName}`);
18+
const totalsize = stats.size;
19+
1520
describe("StreamUpload", () => {
16-
const fileName = "sample_image.jpg";
17-
const filePath = `./test/sample_files/${fileName}`;
18-
const stats = fs.statSync(`./test/sample_files/${fileName}`);
19-
const totalsize = stats.size;
2021
it("Stream size smaller than upload range size", async () => {
2122
const readStream = fs.createReadStream(`./test/sample_files/${fileName}`, { highWaterMark: 8 });
2223

@@ -51,3 +52,47 @@ describe("StreamUpload", () => {
5152
assert.equal(sliceSize, (slice as Buffer).length);
5253
});
5354
});
55+
56+
describe("Stream upload resume", () => {
57+
it("New range is equal to previous upload range", async () => {
58+
const readStream = fs.createReadStream(filePath, { highWaterMark: totalsize });
59+
const sliceSize = 20;
60+
61+
const upload = new StreamUpload(readStream, fileName, totalsize);
62+
63+
const slice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
64+
const retrySlice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
65+
assert.isDefined(slice);
66+
assert.isDefined(retrySlice);
67+
assert.equal(Buffer.compare(slice as Buffer, retrySlice as Buffer), 0);
68+
});
69+
70+
it("New Range.Minimum greater than previous Range.Minimum and new Range.Maximum is equal previous Range.Maximum", async () => {
71+
const readStream = fs.createReadStream(filePath, { highWaterMark: totalsize });
72+
const sliceSize = 20;
73+
74+
const upload = new StreamUpload(readStream, fileName, totalsize);
75+
const retryRangeMin = 15;
76+
const slice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
77+
const retrySlice = await upload.sliceFile({ minValue: 15, maxValue: sliceSize - 1 });
78+
assert.isDefined(slice);
79+
assert.isDefined(retrySlice);
80+
assert.equal(sliceSize, (slice as Buffer).length);
81+
assert.equal(Buffer.compare(slice.slice(retryRangeMin, sliceSize) as Buffer, retrySlice as Buffer), 0);
82+
});
83+
84+
it("New Range.Minimum greater than previous Range.Minimum and new Range.Maximum is greater than previous Range.Maximum", async () => {
85+
const readStream = fs.createReadStream(filePath, { highWaterMark: totalsize });
86+
const sliceSize = 20;
87+
const retryRangeMin = 15;
88+
const retryRangeMax = 21;
89+
const upload = new StreamUpload(readStream, fileName, totalsize);
90+
91+
const slice = await upload.sliceFile({ minValue: 0, maxValue: sliceSize - 1 });
92+
const retrySlice = (await upload.sliceFile({ minValue: retryRangeMin, maxValue: retryRangeMax })) as Buffer;
93+
assert.isDefined(slice);
94+
assert.isDefined(retrySlice);
95+
assert.equal(retrySlice.length, retryRangeMax - retryRangeMin + 1);
96+
assert.equal(Buffer.compare(slice.slice(retryRangeMin, sliceSize) as Buffer, retrySlice.slice(0, sliceSize - retryRangeMin)), 0);
97+
});
98+
});

0 commit comments

Comments
 (0)