Skip to content

Commit f57a0e4

Browse files
jasnelldanielleadams
authored andcommitted
stream: utility consumers for web and node.js streams
Signed-off-by: James M Snell <[email protected]> PR-URL: #39594 Reviewed-By: Matteo Collina <[email protected]>
1 parent 4700f1e commit f57a0e4

File tree

3 files changed

+442
-0
lines changed

3 files changed

+442
-0
lines changed

doc/api/webstreams.md

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1219,5 +1219,129 @@ added: v16.6.0
12191219
12201220
* Type: {WritableStream}
12211221
1222+
### Class: `CompressionStream`
1223+
<!-- YAML
1224+
added: REPLACEME
1225+
-->
1226+
#### `new CompressionStream(format)`
1227+
<!-- YAML
1228+
added: REPLACEME
1229+
-->
1230+
1231+
* `format` {string} One of either `'deflate'` or `'gzip'`.
1232+
1233+
#### `compressionStream.readable`
1234+
<!-- YAML
1235+
added: REPLACEME
1236+
-->
1237+
1238+
* Type: {ReadableStream}
1239+
1240+
#### `compressionStream.writable`
1241+
<!-- YAML
1242+
added: REPLACEME
1243+
-->
1244+
1245+
* Type: {WritableStream}
1246+
1247+
### Class: `DecompressionStream`
1248+
<!-- YAML
1249+
added: REPLACEME
1250+
-->
1251+
1252+
#### `new DecompressionStream(format)`
1253+
<!-- YAML
1254+
added: REPLACEME
1255+
-->
1256+
1257+
* `format` {string} One of either `'deflate'` or `'gzip'`.
1258+
1259+
#### `decompressionStream.readable`
1260+
<!-- YAML
1261+
added: REPLACEME
1262+
-->
1263+
1264+
* Type: {ReadableStream}
1265+
1266+
#### `deccompressionStream.writable`
1267+
<!-- YAML
1268+
added: REPLACEME
1269+
-->
1270+
1271+
* Type: {WritableStream}
1272+
1273+
### Utility Consumers
1274+
<!-- YAML
1275+
added: REPLACEME
1276+
-->
1277+
1278+
The utility consumer functions provide common options for consuming
1279+
streams.
1280+
1281+
They are accessed using:
1282+
1283+
```mjs
1284+
import {
1285+
arrayBuffer,
1286+
blob,
1287+
json,
1288+
text,
1289+
} from 'node:stream/consumers';
1290+
```
1291+
1292+
```cjs
1293+
const {
1294+
arrayBuffer,
1295+
blob,
1296+
json,
1297+
text,
1298+
} = require('stream/consumers');
1299+
```
1300+
1301+
#### `streamConsumers.arrayBuffer(stream)`
1302+
<!-- YAML
1303+
added: REPLACEME
1304+
-->
1305+
1306+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1307+
* Returns: {Promise} Fulfills with an `ArrayBuffer` containing the full
1308+
contents of the stream.
1309+
1310+
#### `streamConsumers.blob(stream)`
1311+
<!-- YAML
1312+
added: REPLACEME
1313+
-->
1314+
1315+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1316+
* Returns: {Promise} Fulfills with a {Blob} containing the full contents
1317+
of the stream.
1318+
1319+
#### `streamConsumers.buffer(stream)`
1320+
<!-- YAML
1321+
added: REPLACEME
1322+
-->
1323+
1324+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1325+
* Returns: {Promise} Fulfills with a {Buffer} containing the full
1326+
contents of the stream.
1327+
1328+
#### `streamConsumers.json(stream)`
1329+
<!-- YAML
1330+
added: REPLACEME
1331+
-->
1332+
1333+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1334+
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
1335+
UTF-8 encoded string that is then passed through `JSON.parse()`.
1336+
1337+
#### `streamConsumers.text(stream)`
1338+
<!-- YAML
1339+
added: REPLACEME
1340+
-->
1341+
1342+
* `stream` {ReadableStream|stream.Readable|AsyncIterator}
1343+
* Returns: {Promise} Fulfills with the contents of the stream parsed as a
1344+
UTF-8 encoded string.
1345+
12221346
[Streams]: stream.md
12231347
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/

lib/stream/consumers.js

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
'use strict';
2+
3+
const {
4+
JSONParse,
5+
} = primordials;
6+
7+
const {
8+
TextDecoder,
9+
} = require('internal/encoding');
10+
11+
const {
12+
Blob,
13+
} = require('internal/blob');
14+
15+
const {
16+
Buffer,
17+
} = require('buffer');
18+
19+
/**
20+
* @typedef {import('../internal/webstreams/readablestream').ReadableStream
21+
* } ReadableStream
22+
* @typedef {import('../internal/streams/readable')} Readable
23+
*/
24+
25+
/**
26+
* @param {AsyncIterable|ReadableStream|Readable} stream
27+
* @returns {Promise<Blob>}
28+
*/
29+
async function blob(stream) {
30+
const chunks = [];
31+
for await (const chunk of stream)
32+
chunks.push(chunk);
33+
return new Blob(chunks);
34+
}
35+
36+
/**
37+
* @param {AsyncIterable|ReadableStream|Readable} stream
38+
* @returns {Promise<ArrayBuffer>}
39+
*/
40+
async function arrayBuffer(stream) {
41+
const ret = await blob(stream);
42+
return ret.arrayBuffer();
43+
}
44+
45+
/**
46+
* @param {AsyncIterable|ReadableStream|Readable} stream
47+
* @returns {Promise<Buffer>}
48+
*/
49+
async function buffer(stream) {
50+
return Buffer.from(await arrayBuffer(stream));
51+
}
52+
53+
/**
54+
* @param {AsyncIterable|ReadableStream|Readable} stream
55+
* @returns {Promise<string>}
56+
*/
57+
async function text(stream) {
58+
const dec = new TextDecoder();
59+
let str = '';
60+
for await (const chunk of stream) {
61+
if (typeof chunk === 'string')
62+
str += chunk;
63+
else
64+
str += dec.decode(chunk, { stream: true });
65+
}
66+
return str;
67+
}
68+
69+
/**
70+
* @param {AsyncIterable|ReadableStream|Readable} stream
71+
* @returns {Promise<any>}
72+
*/
73+
async function json(stream) {
74+
const str = await text(stream);
75+
return JSONParse(str);
76+
}
77+
78+
module.exports = {
79+
arrayBuffer,
80+
blob,
81+
buffer,
82+
text,
83+
json,
84+
};

0 commit comments

Comments
 (0)