Skip to content

Decouple switching logic from switchMap operator and consistent naming #1335

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
elizarov opened this issue Jul 15, 2019 · 9 comments
Closed
Milestone

Comments

@elizarov
Copy link
Contributor

We currently have a switchMap { ... } operator that is similar to the corresponding swithMap operator from Rx but from the standpoint of Kotlin coroutines looks like an amalgam of different operations that might need to be decoupled. The most basic operation seems to be this one, tentatively named as switchTransform:

fun <T, R> Flow<T>.switchTransform(transform: suspend FlowCollector<R>.(value: T) -> Unit): Flow<R>

It would be similar to Flow.transform but with a difference that transform waits for transformation to complete for each element while switchTransfrom would cancel the ongoing transformation on new incoming element and start it again.

Now, note that transform operator can be used to implement the host of basic operators:

filter(predicate) = transform { if (predicate(it)) emit(it) }
map(block) = transform { emit(block(it)) }
flattenConcat() = transform { emitAll(it) }
flatMapConcat(block) = map(block).flattenConcat() = transform  { emitAll(block(it)) } 
onEach(block) = transform { block(it); emit(it) }
collect(block) = transform { block(it) }.collect()

So, if we have a basic switchTransform operator, then it would be fitting to potentially have (at least reserve the names) the whole family of switchXxx operators that are similar to the above above when you replace transform with switchTransform in their implementations.

Moreover, there is a use case (see #1269) for the collectLatest terminal operator that should be called switchCollect under this proposed nomenclature, because:

swithCollect(block) = switchTransform { block(it) }.collect()

However, this switchXxx nomenclature has the following problem.

Under the proposed switchXxx nomenclature switchMap operator shall have the same signature as map (operates on Flow<T>) and should be equivalent to switchTransform { emit(block(it)) }. But we already have experimental switchMap operator that operates on Flow<Flow<T>> and does switchTransform { emitAll(block(it)) } which under the new nomenclature should have been called switchFlattenMapConcat. It is not clear how to proceed.

@LouisCAD
Copy link
Contributor

LouisCAD commented Jul 15, 2019

To me, "Latest" as a suffix is more meaningful than "switch" as a prefix, particularly regarding the cancellation behavior. I'd still like to have switchMap as an alias to flatMapConcatLatest for the conciseness and the less-mouthful spelling.

That's only my take, I'm interested to know what you and others think about switch prefix and Latest suffix, or if there are other naming alternatives.

@zach-klippenstein
Copy link
Contributor

zach-klippenstein commented Jul 15, 2019

switchTransform (or whatever it ends up getting called) feels like a really powerful operator, the FlowCollector API is so simple and powerful it is great to work with directly.

I agree with @LouisCAD, the switch term seems to confuse most people I've seen learning Rx. As prior art, Reactive Swift has also replaced it with latest. They also have taken a parameterized approach - their flattening operators take a FlattenStrategy parameter that controls merge/concat/latest. This is nice because you only have to learn one set of operator names. Documentation is here: https://github.com/ReactiveCocoa/ReactiveSwift/blob/master/Documentation/BasicOperators.md#flattening-event-streams

I think the names switchFlattenMapConcat and flatMapConcatLatest are also more confusing - "concat" is only meaningful when the streams aren't truncated. If the streams are truncated, as the "latest" implies, there's no different between concatenation and merging because concurrent emissions are prevented by definition.

@elizarov
Copy link
Contributor Author

Ok. So here is a take on the xxxLatest nomenclature. We can have transformLatest that cancels ongoing transformation and the following operators that are potentially derived from it:

mapLatest(block) = transformLatest { emit(block(it)) }
flattenLatest() = transformLatest { emitAll(it) } 
flatMapLatest(block) = map(block).flattenLatest() = transformLatest  { emitAll(block(it)) } 
collectLatest(block) = transformLatest { block(it) }.collect()

This would also mean that we deprecate switchMap and rename it to flatMapLatest.

This nomenclature produces the following distinctly named variants of flatMap operator with different merging strategies:

  • flatMapConcat/flattenConcat -- concatenates all flows.
  • flatMapLatest/flattenLatest -- cancels ongoing flow as soon as the new one appears.
  • flatMapMerge/flattenMerge -- concurrently run flows and merge their results.

Indeed, I like the suffix nomenclature more. I don't like introducing a strategy enum, though. It only makes subsequent dead-code elimination in Android via R8, in Kotiln/JS, and in Kotlin/Native harder.

P.S. There is might be some confusion with combineLatest operator that also ends with latest suffix but is otherwise completely unrelated to the abovexxxLatest family. Having thought about it a bit I don't immediately see it as a big issue, even though it still bugs me a little.

@elizarov elizarov added this to the 1.3 milestone Jul 16, 2019
@LouisCAD
Copy link
Contributor

I'm wondering if combineLatest shouldn't be renamed to combineLast as it doesn't cancel the combining lambda, and later, be complemented by a combineLatest, or mergeLatest operator that cancels the merging/combining lambda if a new value comes by.

@zach-klippenstein
Copy link
Contributor

zach-klippenstein commented Jul 16, 2019

I would interpret combineLast to mean it waits until all the flows complete, then combines the last values emitted before completion.

This is a general issue that affects all operators that take suspend functions isn't it? Every such operator could have a cancelling and non-cancelling variant. A more scalable solution might be to use a dedicated operator: assume that suspending operators will not cancel by default, and introduce a cancelOnNext() operator that cancels the downstream emit call whenever the upstream flow emits a new value. I think this could solve the transform/flatMap/collect cases too.

The implementation would basically be the same as switchMap's:

fun <T> Flow<T>.cancelOnNext(): Flow<T> = scopedFlow { downstream ->
    var previousFlow: Job? = null
    collect { value ->
        previousFlow?.cancel(ChildCancelledException())
        previousFlow?.join()
        // Undispatched to have better user experience in case of synchronous flows
        previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
            downstream.emit(value)
        }
    }
}

So then transformLatest would be cancelOnNext().transform(block).

@elizarov
Copy link
Contributor Author

elizarov commented Jul 16, 2019

I'm wondering if combineLatest shouldn't be renamed to combineLast

@LouisCAD That is indeed something to ponder about. It did not occur to me that combineLatest can be viewed at in this way.

I planned to have separate design discussion on the extraction of basic functionality that drive both (current) combineLatest and zip operators, that is the core functionality that allows to collect from multiple flows at the same time, and the name for this basic operator that does it. And this is additional consideration into that bucket.

@elizarov
Copy link
Contributor Author

@zach-klippenstein Terrific. This (tentative) cancelOnNext is the core primitive. So we can have:

flattenLatest() = cancelOnNext().flattenConcat()
// Note: block in swtichMap is cancelled on emission too, so this is the correct impl: 
flatMapLatest(block) = cancelOnNext().flatMapConcat(block)
collectLatest(block) = cancelOnNext().collect(block)

The trick is how to name it. Note, that we don't have the concept of OnNext. On the other hand, it is quite a basic primitive and it looks somewhat similar to conflate:

  • conflate adapts fast emitter to slow collector by skipping emitted values.
  • cancelOnNext adapts fast emitter to slow collector by cancelling collector.

@zach-klippenstein
Copy link
Contributor

Yes that name was terrible, didn't think too much about it.

Maybe a bit verbose, but conflateByDropping vs conflateByCancelling?

@circusmagnus
Copy link

Just "switch"? "switchOver"? Conflate has a concrete meaning for me in coroutines, that is applied to upstream, not "sidestream".

qwwdfsad added a commit that referenced this issue Jul 19, 2019
* Promote the bare minimum Flow API to stable for incoming 1.3.0-RC
* Extract SafeFlow for nicer stacktraces
* Demote switchMap and combineLatest to preview features as we may want to rework in #1262 and #1335
* Make unsafeFlow less explicit, return preview status to AbstractFlow
qwwdfsad added a commit that referenced this issue Jul 30, 2019
…nd to have a consistent and meaningful naming scheme for the rest of the 'latest' operators

    * Make flatMapLatest pure, do not leak cancellation behaviour to downstream
    * Make *latest buffered by default to amortize constant re-dispatch cost
    * Introducing transformLatest
    * Introducing mapLatest

Fixes #1335
qwwdfsad added a commit that referenced this issue Aug 6, 2019
…nd to have a consistent and meaningful naming scheme for the rest of the 'latest' operators

    * Make flatMapLatest pure, do not leak cancellation behaviour to downstream
    * Make *latest buffered by default to amortize constant re-dispatch cost
    * Introducing transformLatest
    * Introducing mapLatest

Fixes #1335
qwwdfsad added a commit that referenced this issue Aug 9, 2019
…nd to have a consistent and meaningful naming scheme for the rest of the 'latest' operators

    * Make flatMapLatest pure, do not leak cancellation behaviour to downstream
    * Make *latest buffered by default to amortize constant re-dispatch cost
    * Introducing transformLatest
    * Introducing mapLatest

Fixes #1335
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants