Skip to content

Commit 77506ca

Browse files
committed
stream: add forEach method
Add a `forEach` method to readable streams to enable concurrent iteration and align with the iterator-helpers proposal. Co-Authored-By: Robert Nagy <[email protected]> PR-URL: #41445 core review + tests
1 parent 6b7b0b7 commit 77506ca

File tree

4 files changed

+182
-10
lines changed

4 files changed

+182
-10
lines changed

doc/api/stream.md

Lines changed: 61 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1751,7 +1751,7 @@ added: REPLACEME
17511751
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
17521752
abort the `fn` call early.
17531753
* `options` {Object}
1754-
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1754+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
17551755
on the stream at once. **Default:** `1`.
17561756
* `signal` {AbortSignal} allows destroying the stream if the signal is
17571757
aborted.
@@ -1795,7 +1795,7 @@ added: REPLACEME
17951795
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
17961796
abort the `fn` call early.
17971797
* `options` {Object}
1798-
* `concurrency` {number} the maximal concurrent invocation of `fn` to call
1798+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
17991799
on the stream at once. **Default:** `1`.
18001800
* `signal` {AbortSignal} allows destroying the stream if the signal is
18011801
aborted.
@@ -1830,6 +1830,65 @@ for await (const result of dnsResults) {
18301830
}
18311831
```
18321832

1833+
### `readable.forEach(fn[, options])`
1834+
1835+
<!-- YAML
1836+
added: REPLACEME
1837+
-->
1838+
1839+
> Stability: 1 - Experimental
1840+
1841+
* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
1842+
* `data` {any} a chunk of data from the stream.
1843+
* `options` {Object}
1844+
* `signal` {AbortSignal} aborted if the stream is destroyed allowing to
1845+
abort the `fn` call early.
1846+
* `options` {Object}
1847+
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
1848+
on the stream at once. **Default:** `1`.
1849+
* `signal` {AbortSignal} allows destroying the stream if the signal is
1850+
aborted.
1851+
* Returns: {Promise} a promise for when the stream has finished.
1852+
1853+
This method allows iterating a stream. For each item in the stream the
1854+
`fn` function will be called. If the `fn` function returns a promise - that
1855+
promise will be `await`ed.
1856+
1857+
This method is different from `for await...of` loops in that it can optionally
1858+
process items concurrently. In addition, a `forEach` iteration can only be
1859+
stopped by having passed a `signal` option and aborting the related
1860+
`AbortController` while `for await...of` can be stopped with `break` or
1861+
`return`. In either case the stream will be destroyed.
1862+
1863+
This method is different from listening to the [`'data'`][] event in that it
1864+
uses the [`readable`][] event in the underlying machinary and can limit the
1865+
number of concurrent `fn` calls.
1866+
1867+
```mjs
1868+
import { Readable } from 'stream';
1869+
import { Resolver } from 'dns/promises';
1870+
1871+
// With a synchronous predicate.
1872+
for await (const item of Readable.from([1, 2, 3, 4]).filter((x) => x > 2)) {
1873+
console.log(item); // 3, 4
1874+
}
1875+
// With an asynchronous predicate, making at most 2 queries at a time.
1876+
const resolver = new Resolver();
1877+
const dnsResults = await Readable.from([
1878+
'nodejs.org',
1879+
'openjsf.org',
1880+
'www.linuxfoundation.org',
1881+
]).map(async (domain) => {
1882+
const { address } = await resolver.resolve4(domain, { ttl: true });
1883+
return address;
1884+
}, { concurrency: 2 });
1885+
await dnsResults.forEach((result) => {
1886+
// Logs result, similar to `for await (const result of dnsResults)`
1887+
console.log(result);
1888+
});
1889+
console.log('done'); // Stream has finished
1890+
```
1891+
18331892
### Duplex and transform streams
18341893

18351894
#### Class: `stream.Duplex`

lib/internal/streams/operators.js

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ const kEof = Symbol('kEof');
2323
async function * map(fn, options) {
2424
if (typeof fn !== 'function') {
2525
throw new ERR_INVALID_ARG_TYPE(
26-
'fn', ['Function', 'AsyncFunction'], this);
26+
'fn', ['Function', 'AsyncFunction'], fn);
2727
}
2828

2929
if (options != null && typeof options !== 'object') {
@@ -147,10 +147,23 @@ async function * map(fn, options) {
147147
}
148148
}
149149

150+
async function forEach(fn, options) {
151+
if (typeof fn !== 'function') {
152+
throw new ERR_INVALID_ARG_TYPE(
153+
'fn', ['Function', 'AsyncFunction'], fn);
154+
}
155+
async function forEachFn(value, options) {
156+
await fn(value, options);
157+
return kEmpty;
158+
}
159+
// eslint-disable-next-line no-unused-vars
160+
for await (const unused of this.map(forEachFn, options));
161+
}
162+
150163
async function * filter(fn, options) {
151164
if (typeof fn !== 'function') {
152-
throw (new ERR_INVALID_ARG_TYPE(
153-
'fn', ['Function', 'AsyncFunction'], this));
165+
throw new ERR_INVALID_ARG_TYPE(
166+
'fn', ['Function', 'AsyncFunction'], fn);
154167
}
155168
async function filterFn(value, options) {
156169
if (await fn(value, options)) {
@@ -160,7 +173,12 @@ async function * filter(fn, options) {
160173
}
161174
yield* this.map(filterFn, options);
162175
}
163-
module.exports = {
176+
177+
module.exports.streamReturningOperators = {
178+
filter,
164179
map,
165-
filter
180+
};
181+
182+
module.exports.promiseReturningOperators = {
183+
forEach,
166184
};

lib/stream.js

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ const {
3131
promisify: { custom: customPromisify },
3232
} = require('internal/util');
3333

34-
const operators = require('internal/streams/operators');
34+
const {
35+
streamReturningOperators,
36+
promiseReturningOperators,
37+
} = require('internal/streams/operators');
3538
const compose = require('internal/streams/compose');
3639
const { pipeline } = require('internal/streams/pipeline');
3740
const { destroyer } = require('internal/streams/destroy');
@@ -46,12 +49,18 @@ Stream.isDisturbed = utils.isDisturbed;
4649
Stream.isErrored = utils.isErrored;
4750
Stream.isReadable = utils.isReadable;
4851
Stream.Readable = require('internal/streams/readable');
49-
for (const key of ObjectKeys(operators)) {
50-
const op = operators[key];
52+
for (const key of ObjectKeys(streamReturningOperators)) {
53+
const op = streamReturningOperators[key];
5154
Stream.Readable.prototype[key] = function(...args) {
5255
return Stream.Readable.from(ReflectApply(op, this, args));
5356
};
5457
}
58+
for (const key of ObjectKeys(promiseReturningOperators)) {
59+
const op = promiseReturningOperators[key];
60+
Stream.Readable.prototype[key] = function(...args) {
61+
return ReflectApply(op, this, args);
62+
};
63+
}
5564
Stream.Writable = require('internal/streams/writable');
5665
Stream.Duplex = require('internal/streams/duplex');
5766
Stream.Transform = require('internal/streams/transform');

test/parallel/test-stream-forEach.js

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const {
5+
Readable,
6+
} = require('stream');
7+
const assert = require('assert');
8+
const { setTimeout } = require('timers/promises');
9+
10+
{
11+
// forEach works on synchronous streams with a synchronous predicate
12+
const stream = Readable.from([1, 2, 3]);
13+
const result = [1, 2, 3];
14+
(async () => {
15+
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
16+
})().then(common.mustCall());
17+
}
18+
19+
{
20+
// forEach works an asynchronous streams
21+
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
22+
await Promise.resolve();
23+
return true;
24+
});
25+
const result = [1, 2, 3];
26+
(async () => {
27+
await stream.forEach((value) => assert.strictEqual(value, result.shift()));
28+
})().then(common.mustCall());
29+
}
30+
31+
{
32+
// forEach works on asynchronous streams with a asynchronous forEach fn
33+
const stream = Readable.from([1, 2, 3]).filter(async (x) => {
34+
await Promise.resolve();
35+
return true;
36+
});
37+
const result = [1, 2, 3];
38+
(async () => {
39+
await stream.forEach(async (value) => {
40+
await Promise.resolve();
41+
assert.strictEqual(value, result.shift());
42+
});
43+
})().then(common.mustCall());
44+
}
45+
46+
{
47+
// Concurrency + AbortSignal
48+
const ac = new AbortController();
49+
let calls = 0;
50+
const forEachPromise =
51+
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
52+
calls++;
53+
await setTimeout(100, { signal });
54+
}, { signal: ac.signal, concurrency: 2 });
55+
// pump
56+
assert.rejects(async () => {
57+
await forEachPromise;
58+
}, {
59+
name: 'AbortError',
60+
}).then(common.mustCall());
61+
62+
setImmediate(() => {
63+
ac.abort();
64+
assert.strictEqual(calls, 2);
65+
});
66+
}
67+
68+
{
69+
// Error cases
70+
assert.rejects(async () => {
71+
await Readable.from([1]).forEach(1);
72+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
73+
assert.rejects(async () => {
74+
await Readable.from([1]).forEach((x) => x, {
75+
concurrency: 'Foo'
76+
});
77+
}, /ERR_OUT_OF_RANGE/).then(common.mustCall());
78+
assert.rejects(async () => {
79+
await Readable.from([1]).forEach((x) => x, 1);
80+
}, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
81+
}
82+
{
83+
// Test result is a Promise
84+
const stream = Readable.from([1, 2, 3, 4, 5]).forEach((_) => true);
85+
assert.strictEqual(typeof stream.then, 'function');
86+
}

0 commit comments

Comments
 (0)