It would be nice if functions like `Stream.firstWhere` all had a cancellable version returning a `CancelableOperation`. The existing versions cannot be cancelled: they only cancel their subscriptions when they receive the relevant event or an error.
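To illustrate the limitation, here is a minimal sketch using only `dart:async`. The `StreamController` and the `value > 10` predicate are hypothetical stand-ins for a real event source; the point is only that the `Future` returned by `firstWhere` offers no way to release the subscription early.

```dart
import 'dart:async';

Future<void> main() async {
  // Hypothetical stream for illustration; onCancel reports when the
  // subscription is actually released.
  final controller = StreamController<int>(
    onCancel: () => print('subscription cancelled'),
  );

  // firstWhere returns a plain Future, which has no cancel() method.
  final match = controller.stream.firstWhere((value) => value > 10);

  // Suppose the caller loses interest here. There is nothing to call:
  // the listener stays attached until a match or an error arrives.
  print(controller.hasListener); // true

  controller.add(1); // no match, so the subscription stays active
  await Future<void>.delayed(Duration.zero);
  print(controller.hasListener); // true

  controller.add(42); // match: only now is the subscription cancelled
  print(await match); // 42
  await controller.close();
}
```

If no matching event or error ever arrives, the subscription in this sketch would stay active for the lifetime of the stream.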
A cancellable implementation could look something like:
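```dart
import 'dart:async';

import 'package:async/async.dart';

extension _CancellableFirstWhereEx<T> on Stream<T> {
  /// Like [Stream.firstWhere], but cancelling the returned operation also
  /// cancels the underlying subscription.
  CancelableOperation<T> cancellableFirstWhere(bool Function(T) predicate) {
    late final CancelableCompleter<T> completer;
    final subscription = listen(
      null,
      // Forward the stack trace together with the error.
      // ignore: avoid_types_on_closure_parameters
      onError: (Object e, StackTrace s) => completer.completeError(e, s),
      // Mirror Stream.firstWhere (without orElse): fail if the stream ends
      // with no match, instead of leaving the operation pending forever.
      onDone: () => completer.completeError(StateError('No element')),
      cancelOnError: true,
    );
    completer = CancelableCompleter<T>(
      // Cancelling the operation releases the subscription.
      onCancel: subscription.cancel,
    );
    subscription.onData((value) {
      if (predicate(value)) {
        subscription.cancel();
        completer.complete(value);
      }
    });
    return completer.operation;
  }
}
```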
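A usage sketch of the proposed extension might look like the following. It assumes the function lives in the same file as the extension (the leading underscore makes the extension private to its library) and that the file imports `dart:async` and `package:async/async.dart`. The `example` function and the `StreamController` are hypothetical and only serve the illustration.

```dart
// Assumes the extension above is declared in this same file, along with
// imports of dart:async and package:async/async.dart.
Future<void> example() async {
  // Hypothetical event source.
  final controller = StreamController<int>();

  final operation =
      controller.stream.cancellableFirstWhere((value) => value > 10);

  // The caller loses interest before any matching event arrives.
  await operation.cancel();

  print(operation.isCanceled); // true
  print(controller.hasListener); // false
}
```

Unlike the plain `firstWhere` future, the operation here gives the caller a handle for releasing the subscription without waiting for a matching event or an error.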