Flow error handling and launchIn #1263

Closed
elizarov opened this issue Jun 8, 2019 · 23 comments

@elizarov
Contributor

elizarov commented Jun 8, 2019

Here is a proposal to address a pattern that is commonly found in UI code that needs to launch a coroutine collecting a flow and updating the UI, such as:

uiScope.launch { 
    responseDataFlow().collect { updateDisplay(it) } 
}

launchIn

The first piece is the launchIn(scope) operator, which launches a coroutine collecting a flow and returns a Job. I propose that this operator does not take any lambda, so that the replacement for the original code pattern is:

responseDataFlow()
    .onEach { updateDisplay(it) }
    .launchIn(uiScope)

This usage pattern maintains a consistent rule that execution context specifications in flows always work on "upstream" flows (like in flowOn). Another reason for such a syntactic form will be apparent in the next sections.
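To make the pattern concrete, here is a minimal runnable sketch, assuming the `launchIn` and `onEach` operators from `kotlinx.coroutines.flow` behave as described above. The body of `responseDataFlow()`, the `displayAll` helper, and the use of a `runBlocking` scope in place of `uiScope` are illustrative stand-ins, not part of the proposal.

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for responseDataFlow() from the post.
fun responseDataFlow(): Flow<String> = flowOf("a", "b", "c")

// Collects the flow in a separate coroutine and records what the "UI" would display.
fun displayAll(): List<String> {
    val displayed = mutableListOf<String>()
    runBlocking {
        // `this` (the runBlocking scope) plays the role of uiScope here.
        val job: Job = responseDataFlow()
            .onEach { displayed += it } // stand-in for updateDisplay(it)
            .launchIn(this)
        // launchIn returns immediately with a Job; wait for the collection to finish.
        job.join()
    }
    return displayed
}

fun main() {
    println(displayAll()) // prints [a, b, c]
}
```

Because the operator returns a `Job`, the caller can also cancel the collection explicitly instead of joining it.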

onError

The most generic basic error-handling operator is onError. It catches any exception that happens in the flow before this operator is applied and passes the caught exception to the supplied lambda. Example:

responseDataFlow()
    .onEach { updateDisplay(it) }
    .onError { e -> showErrorMessage(e) } // catch errors in response flow and updateDisplay
    .launchIn(uiScope)

Notice how onError is written after onEach to resemble regular try/catch code. Here it is important that launchIn takes no lambda and cannot fail, so writing onError before launchIn always catches all exceptions.

Implementation note: the onError operator is already implemented in the flow code, but for now it is a private function called collectSafely that is used internally to implement other error-handling operators.

onCompletion

This operator calls its lambda whenever a flow completes for any reason, essentially working as try/finally:

responseDataFlow()
    .onEach { updateDisplay(it) }
    .onError { e -> showErrorMessage(e) }
    .onCompletion { enableActionButtons() }
    .launchIn(uiScope)
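The try/catch/finally analogy can be sketched as follows. This is an illustration under assumptions: it uses `catch`, the alternative name for `onError` that comes up later in this thread, together with `onCompletion`, both as found in `kotlinx.coroutines.flow`. The `failingResponses` flow, the `runWithHandlers` helper, and the event strings are hypothetical stand-ins for the UI calls.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical flow that emits one value and then fails.
fun failingResponses(): Flow<String> = flow {
    emit("first")
    throw IllegalStateException("backend down")
}

// Returns the UI events in the order they happened.
fun runWithHandlers(): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        failingResponses()
            .onEach { events += "display:$it" }            // updateDisplay(it)
            .catch { e -> events += "error:${e.message}" } // showErrorMessage(e)
            .onCompletion { events += "buttons enabled" }  // enableActionButtons()
            .launchIn(this)
            .join()
    }
    return events
}

fun main() {
    println(runWithHandlers())
    // prints [display:first, error:backend down, buttons enabled]
}
```

The completion handler runs after the error handler, which matches the order of a finally block following a catch block.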

Advanced error handling

In addition to passing in an exception, the onError operator also has a FlowCollector receiver, which enables concise encoding of common error-handling patterns that replace an error with a value emitted to the flow. Some of those patterns are already provided as ready-to-use operators:

onErrorCollect(fallback: Flow<T>) = onError { emitAll(fallback) } 
onErrorReturn(fallback: T) = onError { emit(fallback) }

TBD: Shall we rename them to onErrorEmitAll and onErrorEmit or leave them as is?

But onError can be used directly for more advanced cases. For example, if there is a flow of some Response objects that support a Response.Failure case that wraps an exception, then one can easily translate an exception in the upstream flow to a failed response:

responseDataFlow()
    .onError { e -> emit(Response.Failure(e)) }
    .onEach { updateDisplay(it) } // failures in the response are encoded as values in the flow
    .launchIn(uiScope)
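A runnable sketch of this translation, under stated assumptions: the `Response` hierarchy, the contents of `responseDataFlow()`, and the `collectResponses` helper are hypothetical, and the operator is written as `catch` (the name discussed later in this thread) because that is the operator assumed to be available in `kotlinx.coroutines.flow`.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical Response type with a Failure case that wraps the exception.
sealed class Response {
    data class Success(val body: String) : Response()
    data class Failure(val cause: Throwable) : Response()
}

// Hypothetical upstream: one successful response, then an exception.
fun responseDataFlow(): Flow<Response> = flow {
    emit(Response.Success("page 1"))
    throw RuntimeException("connection lost")
}

fun collectResponses(): List<Response> = runBlocking {
    responseDataFlow()
        .catch { e -> emit(Response.Failure(e)) } // the exception becomes a value
        .toList()
}

fun main() {
    collectResponses().forEach { response ->
        when (response) {
            is Response.Success -> println("ok: ${response.body}")
            is Response.Failure -> println("failed: ${response.cause.message}")
        }
    }
}
```

Downstream code then only deals with values and can branch on the `Response` type instead of handling exceptions.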

Retry

For consistency I also propose to rename retry to onErrorRetry.

Open questions

  1. Shall onError be configured with a (Throwable)->Boolean predicate that defaults to "always true", just like the other onErrorXxx operators? It is not ideal to have a two-lambda function, but the predicate here is a "strategy" that might be stored in a separate variable, which might actually result in quite readable code, like onError(applicationErrors) { e -> displayAppErrorMessage(e) }

  2. onError vs onCompletion. One might envision a separate set of onCompletionXxx operators that are similar to onErrorXxx but also perform the corresponding action on the normal completion of the flow, optionally accepting a (Throwable?)->Boolean predicate. It is an open question if there are any use-cases to that.

@LouisCAD
Contributor

LouisCAD commented Jun 8, 2019

launchIn example snippet doesn't have launchIn inside. Did you mean collectIn? I'm confused.
Also, the text in onError section is a little unclear to me (look at the sentence where "it's" is used in place of "its").

@streetsofboston

Notice how onError is written after onEach to resemble the regular try/catch code.

Question:
Is the order in which onError and onEach must be written enforced by onError returning a different type than onEach? Or is there another way the order can be enforced by the compiler.

If the order cannot be enforced, I'm a little worried about coding mistakes that are easy to make but hard to figure out.

@zach-klippenstein
Contributor

zach-klippenstein commented Jun 8, 2019

Agree with @streetsofboston, I think this inconsistency could potentially be confusing:

  • onEach and onCompletion are both just for performing side effects - they don't transform the source flow in any way.
  • onError, by default, doesn't just allow performing side effects but also consumes the exception. This is more consistent with try/catch (where you need to explicitly re-throw) but inconsistent with the other two on* operators.

I would rather keep onError just for side effects, and call the proposed operator something like catchError. That said, I don't think this operator is actually needed at all: its use cases are all covered by onErrorCollect already (implement onErrorReturn, rethrow a different exception, etc.).

@elizarov
Contributor Author

elizarov commented Jun 8, 2019

launchIn example snippet doesn't have launchIn inside. Did you mean collectIn? I'm confused.
Also, the text in onError section is a little unclear to me (look at the sentence where "it's" is used in place of "its").

Oops. I've corrected the text. It was supposed to be launchIn in all examples. I also made error handling examples more explicit.

@elizarov
Contributor Author

elizarov commented Jun 8, 2019

Is the order in which onError and onEach must be written enforced by onError returning a different type than onEach? Or is there another way the order can be enforced by the compiler.

I don't see any easy way to enforce an order and I don't see why it needs to be enforced. It's like try/catch: you catch errors from the code in the try block above. We can tweak the naming to make the "modifying" nature of onError more explicit. We can even use catch (instead of onError) and finally (instead of onCompletion):

responseDataFlow()
    .onEach { updateDisplay(it) }
    .catch { e -> showErrorMessage(e) }
    .finally { enableActionButtons() }
    .launchIn(uiScope)

(If we go with this naming, we need to change the names of all the other onErrorXxx operators for consistency.)

Note that you can use "error catching" operators multiple times to catch errors in different parts of the flow:

flow
    .catch { e -> ... } // catch flow errors
    .map { first(it) } 
    .catch { e -> ... } // catch first(it) failures
    .map { second(it) } 
    .catch { e -> ... } // catch second(it) failures
    .collect { ... } // do something with elements, can also use launchIn
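The staged handling above can be tried with a small sketch. It assumes the `catch` operator from `kotlinx.coroutines.flow`; `firstStep`, `secondStep`, `runPipeline`, and the log strings are hypothetical names introduced for illustration.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical transformation that fails for one particular input.
fun firstStep(x: Int): Int =
    if (x == 2) throw IllegalArgumentException("firstStep failed on $x") else x * 10

// Hypothetical transformation that never fails.
fun secondStep(x: Int): Int = x + 1

fun runPipeline(): List<String> {
    val log = mutableListOf<String>()
    runBlocking {
        flowOf(1, 2, 3)
            .catch { e -> log += "source: ${e.message}" } // source errors only (none here)
            .map { firstStep(it) }
            .catch { e -> log += "first: ${e.message}" }  // catches the firstStep failure
            .map { secondStep(it) }
            .catch { e -> log += "second: ${e.message}" } // secondStep errors only (none here)
            .collect { log += "value: $it" }
    }
    return log
}

fun main() {
    println(runPipeline())
    // prints [value: 11, first: firstStep failed on 2]
}
```

Only the handler placed directly after the failing stage fires, and the flow completes at that point, so the element 3 is never processed.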

@evant

evant commented Jun 9, 2019

Maybe I'm missing something, but why are we trying to simulate try/catch instead of using it? Ex: your ui code could be

uiScope.launch {
  try {
    responseDataFlow().collect { updateDisplay(it) }
  } catch (e: Exception) {
    showErrorMessage(e)
  } finally {
    enableActionButtons()
  }
}

I mean, maybe the proposed operators would be useful in other situations, but for the UI use-case this seems way cleaner to me.

@elizarov
Contributor Author

elizarov commented Jun 9, 2019

... are we trying to simulate try/catch instead of using it?

Yes. The goal is to have a more concise and more composable way to do try/catch/finally so that, for example, you can encapsulate error-handling logic specific to your application as a flow operator and reuse it everywhere in a declarative way.

Generally, using "bare" try/catch/finally in Kotlin is not very idiomatic. It is a kind of "low-level" primitive that you usually find encapsulated inside higher-level operations. So far, coroutines are missing those "high-level" error-handling operations, so you have to use try/catch/finally and the resulting code just looks "out-of-place": a piece of imperative error handling in a sea of declarative code.

qwwdfsad added a commit that referenced this issue Jun 25, 2019
@antanas-arvasevicius

Hi,
If there is a need for a more concise and composable way to do try/catch/finally, why not just port Scala's Try monad and design from there? ( https://www.scala-lang.org/api/2.9.3/scala/util/Try.html )
These flow methods "catch" and "finally" look very un-composable and are just a copy of the try/catch/finally keywords as methods.
It is also hard to follow: does "catch" also do a map (convert the exception to a value and pass it on to the channel)?

A better way would be to just transfer a monadic Try as a value over the channel and remove the "catch" and "finally" methods from the API.

So:
A. If a user wants nice composable error handling, they would use something like this:

val <T> Try<T>.value get() = when (this) { is Success -> value; is Failure -> throw e }

// Uses an imaginary future Kotlin keyword `try` as an expression that returns a `Try<T>` object

try {
  responseDataFlow()
    .map { try { first(it) } }
    .map { when (it) {
          is Success -> try { second(it.value) }
          is Failure -> // <-- handle failure 
        }
     }
    .collect { it ->
         println(it.value) 
    }
} catch (e: Exception) { globalHandler(e) }

B. If there is a need for global handling (using the example by @evant):

uiScope.launch {
  try {
    responseDataFlow().collect { updateDisplay(it) }
  } catch (e: Exception) {
    showErrorMessage(e)
  } finally {
    enableActionButtons()
  }
}

@fvasco
Contributor

fvasco commented Jul 5, 2019

@antanas-arvasevicius see https://kotlinlang.org/api/latest/jvm/stdlib/kotlin/run-catching.html

@antanas-arvasevicius

Are there other alternatives for this error handling proposal?

It feels like this contradicts the design of coroutines, which replaced Future/Promise-like APIs for error handling so that we can now just use try/catch/finally instead.
This proposal adds a new API for error handling and negates the effort coroutines made to simplify it.

An exception is an imperative construct that bubbles up the call stack until caught, so why do we need a "declarative way" of handling it? Maybe we should instead make it harder to use exceptions inside a flow stream, rather than adding APIs that are hard to reason about.
And if we try to think purely in terms of data flow programming, what then is an exception, and what is a call stack?

@elizarov
Contributor Author

elizarov commented Jul 8, 2019

@antanas-arvasevicius I'm not sure I understand your concerns. This new error handling approach is not a replacement, but an addition. Please check out the topic-starter post. I think it gives a pretty compelling use-case for this new mechanism.

@antanas-arvasevicius

@elizarov My concern is that this addition will become "best practice" and we will end up in the Promise API hell we had before coroutines were implemented; it seems like Flow is becoming just an implementation of the ReactiveStreams specification.
I'm questioning whether these APIs are really needed and whether there is no other way to handle the cases described in this topic.
We have a medium-sized Android application implemented with reactive streams APIs, and it is very hard to read and to understand how the streams interact with the GUI: all the exception paths, side effects, subscribeOn, observeOn. I can see that Flow will also have "launchIn", and it will become (or already is becoming) the same bloatware API as ReactiveStreams.
I hope streams can also be simplified, just as Futures/Promises were with coroutines.
Maybe we can remove capabilities from Flow to make it simpler, maybe by not allowing any context switching with launchIn / observeOn / subscribeOn. All streams would carry Result, with a lower-level API where somebody can access the Result and transform/map it, leaving a higher-level API that unwraps Result to T for normal use.

From practice I can say that operators like the ones planned in this topic get overused and do harm to codebases. When programmers see these APIs they tend to want to be cool and do everything with cool stream operators; over time the whole system becomes stream-oriented, with a stream DSL and no "legacy sh*t".

It would be better if Flow were just a kind of async iterator with for (item in flow) {} instead of many custom APIs that mimic default language constructs.

With the goal you stated ("you can encapsulate error handling logic specific to your application as a flow operator and reuse it everywhere in a declarative way"), it seems that Flow is heading toward exactly the style I described: Flow is everything, and if you start with a Flow you'll end up with a Flow. The same as with Futures/Promises, which is what I'm concerned about at large.

@elizarov
Contributor Author

elizarov commented Jul 8, 2019

@antanas-arvasevicius I frankly believe that in flow-UI interaction the main problem is not the Flow, but the highly verbose nature of state-based UI frameworks. If you combine flow with any kind of reactive framework where you don't have to "manually" worry about synchronizing your state to the UI, then it is a much more natural fit.

As for replacing flow with some language mechanism, in the same way that suspending functions replaced futures, this is indeed one possible direction of future progress.

@antanas-arvasevicius

Plus I don't see how these proposed APIs are more composable than plain try/catch/finally. Why not just Try<T> or Result<T>?

If someone needs a flow operator that handles errors globally, there is no need for extra catch, finally, or onNext; it is already possible to write it simply:

fun Flow.withErrorHandler() = flow {
  uiScope.launch {
    try {
      collect { emit(it) }
    } catch (e: Exception) {
      showErrorMessage(e)
    } finally {
      enableActionButtons()
    }
  }
}

also

responseDataFlow()
    .onEach { updateDisplay(it) }
    .launchIn(uiScope)

When and why this should be used instead of just launch(Dispatchers.Main) { collect { updateDisplay(it) } } ?

Flow / streams in general are not simple for a new programmer to understand, and having multiple ways to accomplish the same thing in the library makes it more difficult to learn new concepts.

P.S. When we think of a stream as a chain of transformations (filtering, mapping), APIs such as onNext and onError look out of place.

I understand that for current Android Java-ish developers this is cool and nice and similar to RxJava, but please review this design and look for more opinions from people who are not influenced by RxJava / Android, or at least defer it for later.

P.P.S. These are not stream operators: they do not work on the items of a stream, they are just "terminating" callbacks. And a stream (Flow) is designed for sequences of items, not for a single item.

@LouisCAD
Contributor

LouisCAD commented Jul 8, 2019

@antanas-arvasevicius Using launchIn can decrease the indentation level by 1 level, improving readability.
The catch function, which is similar to an onError, doesn't get called if the scope of the collection of elements is cancelled by an operator.

In Flow, there's only flowOn. The rest works just like regular coroutines, you can switch dispatcher using withContext (or the experimental invoke operator extension they have), or keep the one from current context, that may come from the root CoroutineScope.

IMHO, Flow is like many things in life: good as long as you don't abuse it. If something can be done with a suspending function only, then no need to use Flow. I started to use Flow, but that didn't stop me from using plain suspending functions, with no plans to over-engineer it with Flow.

Oh, and if you want to handle errors through a Result type, you can do it with runCatching { … } from Kotlin's stdlib, or your own alternative; then there is no need to use the catch function, nor a try/catch pair.
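As a rough illustration of the Result-based alternative, the sketch below wraps a failing step in `runCatching` from the Kotlin stdlib so that failures travel through the flow as values. The `parse` and `parseAll` functions and the sample inputs are hypothetical. One caveat worth knowing: `runCatching` catches every Throwable, so wrapping suspending calls with it would also swallow cancellation; the step here is deliberately non-suspending.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical parsing step that throws NumberFormatException on bad input.
fun parse(raw: String): Int = raw.toInt()

fun parseAll(): List<String> = runBlocking {
    flowOf("1", "x", "3")
        .map { raw -> runCatching { parse(raw) } } // Flow<Result<Int>>
        .map { result ->
            result.fold(
                onSuccess = { "parsed $it" },
                onFailure = { "failed: ${it.javaClass.simpleName}" }
            )
        }
        .toList()
}

fun main() {
    println(parseAll())
    // prints [parsed 1, failed: NumberFormatException, parsed 3]
}
```

Unlike an exception propagating through the flow, a failed `Result` does not end the stream, so the element after the bad input is still processed.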

@elizarov
Contributor Author

elizarov commented Jul 8, 2019

@antanas-arvasevicius One can indeed write their own withErrorHandler operator, but the implementation you have given violates the exception-transparency property of the Flow (see here for more details), which makes it an extremely dangerous operator in a larger app, as it completely denies the ability to reason locally about errors in the flow. That is one of the reasons we provide the catch operator, so a correct exception-transparent implementation is:

fun <T> Flow<T>.withErrorHandler(): Flow<T> =
    catch { e -> showErrorMessage(e) }
        .onCompletion { enableActionButtons() }

This implementation is clear, concise, and correct.
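Exception transparency can be observed with a short experiment. This is a sketch, assuming the `catch` operator from `kotlinx.coroutines.flow` only handles failures from the flow it is applied to; the `downstreamFailure` helper and its messages are hypothetical.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Reports where an exception thrown by the collector ended up.
fun downstreamFailure(): String = runBlocking {
    var handledByCatch = false
    try {
        flowOf(1, 2, 3)
            .catch { handledByCatch = true } // only sees upstream failures
            .collect { value ->
                // The failure happens downstream of catch, in the collector itself.
                if (value == 2) throw IllegalStateException("collector failed on $value")
            }
        "no failure"
    } catch (e: IllegalStateException) {
        "escaped to caller: ${e.message}, handledByCatch=$handledByCatch"
    }
}

fun main() {
    println(downstreamFailure())
    // prints escaped to caller: collector failed on 2, handledByCatch=false
}
```

The collector's exception reaches the surrounding try/catch untouched, which is what allows local reasoning: an operator never silently swallows errors raised by code that runs after it.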

@elizarov
Contributor Author

elizarov commented Jul 8, 2019

@antanas-arvasevicius

responseDataFlow()
    .onEach { updateDisplay(it) }
    .launchIn(uiScope)

When and why this should be used instead of just launch(Dispatchers.Main) { collect { updateDisplay(it) } } ?

These two are completely equivalent. You should use the one which better fits your coding style. If most of your code is written in a declarative way with flow operators, then you'd naturally prefer the former, but if most of your code is imperative without much use of operators, then you'd naturally prefer the latter.

@antanas-arvasevicius

Thanks @elizarov for the explanation of exception transparency.
Now I get why operators written with a simple collect, try/catch, and emit won't work: they would unintentionally handle downstream exceptions too.

To be clear, how does catch behave? Does it create a new flow that collects the previous flow and, when an exception is caught, calls the lambda and terminates the downstream flow? As I understand it, a flow cannot proceed further once an exception is thrown, the same as in RxJava and Rx.NET? It just ends/completes with an exception?

Can catch re-throw or let through an exception it is not interested in, similar to the keyword form catch (e: NeededException) { }?

Are there special exceptions, similar to coroutines' cancellation exceptions?

@elizarov
Contributor Author

elizarov commented Jul 8, 2019

@zach-klippenstein
Contributor

catch doesn't terminate the downstream flow. It gets a FlowCollector and can emit zero, one, or more values in response to an exception. It can also rethrow exceptions just like a regular catch block. This is already much simpler than the RxJava API which requires multiple operators to do these things. E.g. onErrorReturnItem(foo) is catch { emit(foo) }. onErrorResumeNext(fooObservable) is catch { emitAll(fooFlow) }.
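The three behaviours described above (emit a value, continue with another flow, rethrow) can be sketched like this, assuming `catch` and `emitAll` from `kotlinx.coroutines.flow`. The `failingFlow` helper, the function names, and the fallback values are hypothetical.

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical upstream that emits one element and then fails with the given error.
fun failingFlow(error: Throwable): Flow<Int> = flow {
    emit(1)
    throw error
}

// Replace the error with a single fallback value.
fun withFallbackValue(): List<Int> = runBlocking {
    failingFlow(IOException("io")).catch { emit(-1) }.toList()
}

// Continue with another flow after the error.
fun withFallbackFlow(): List<Int> = runBlocking {
    failingFlow(IOException("io")).catch { emitAll(flowOf(10, 20)) }.toList()
}

// Handle only IOException and rethrow everything else, like a typed catch block.
fun handleOnlyIo(error: Throwable): List<Int> = runBlocking {
    failingFlow(error)
        .catch { e -> if (e is IOException) emit(-1) else throw e }
        .toList()
}

fun main() {
    println(withFallbackValue())                  // prints [1, -1]
    println(withFallbackFlow())                   // prints [1, 10, 20]
    println(handleOnlyIo(IOException("timeout"))) // prints [1, -1]
}
```

Calling `handleOnlyIo` with any other exception type lets that exception propagate to the caller, which mirrors an ordinary catch block that only matches a specific type.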

Flow solves a very specific problem: working with asynchronous streams of data. Treating everything as such streams is one way to write a reactive application, but it's not the only way. I think it's become the popular thing to do for Android because RxJava was much nicer to work with and compose than a bunch of simple callback APIs, but in a lot of cases simple coroutine features are nicer still. As @LouisCAD said, you can still use all the other, simpler coroutine APIs when the functionality they provide is all you need. However, there are times when you actually have a stream of data, and in those cases you want something as expressive and powerful as Flow or Reactive Streams. Every app is different, choose an architecture that makes sense for your app.

@lamba92

lamba92 commented Oct 22, 2019

~~Is it possible that errors that happen in child jobs while .await()ing in, say, a .map { } do not get intercepted by .catch { }?~~

In my case, I have a flow that needs some other data from a suspending function. To do so, I expose the current coroutineScope { }, then async { } some stuff into variables and await them all before returning. Sometimes one of those child jobs may throw, making the .map { } throw a JobCancellationException while on await(). The catch block just after it does not intercept it, and the error gets propagated up to the collecting function, in my case a toList().

I suspect that is intended, because JobCancellationException is used in some logic inside the catch operator and/or in Flow itself.

How should I handle this issue?

Should I wrap each await() with a try/catch and re-throw the getCancellationException()? That would be very ugly!

Here is some example code

Pointless stuff
```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.*

// Runs each transform inside its own coroutineScope so that it can start child coroutines.
inline fun <T, R> Flow<T>.scopedMap(crossinline transform: suspend CoroutineScope.(value: T) -> R) =
    map { coroutineScope { transform(it) } }


myFlow()
    .scopedMap { json: MyJson ->
        // some async calls using fields of json
        val data1 = async { mySuspendingCallThatMayThrowHorribly(json.field1) }
        val data2 = async { mySuspendingCallThatMayThrowHorribly(json.field2) }
        Triple(json, data1.await(), data2.await())
    }
    .catch { error: Throwable ->
        println("I expect an error here but I get none! e: $error")
    }
    .toList()
```

@fvasco
Contributor

fvasco commented Oct 22, 2019

@lamba92 You should create the coroutineScope inside the scopedMap function

@lamba92

lamba92 commented Oct 22, 2019

The CoroutineScope is exposed using scopedMap, but that was not the problem.

I was putting the catch in a flow I was not testing 😅 Turns out that 6 hours straight of programming at night is not a good idea...

Sorry to have bothered for such a trivial oversight.
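For anyone landing here with the same question, a small self-contained sketch suggests that catch does intercept a failure of an async child started inside such a scoped map. It assumes that coroutineScope rethrows the original exception of a failed child, which is how structured concurrency is documented to behave. `loadOrThrow`, `collectWithCatch`, and the sample values are hypothetical.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Same shape as the scopedMap helper above.
inline fun <T, R> Flow<T>.scopedMap(
    crossinline transform: suspend CoroutineScope.(value: T) -> R
): Flow<R> = map { coroutineScope { transform(it) } }

// Hypothetical suspending call that fails for negative input.
suspend fun loadOrThrow(x: Int): Int {
    delay(10)
    if (x < 0) throw IllegalStateException("cannot load $x")
    return x * 2
}

fun collectWithCatch(): List<String> = runBlocking {
    flowOf(1, -1, 3)
        .scopedMap { x ->
            val a = async { loadOrThrow(x) }       // fails when x is negative
            val b = async { loadOrThrow(x + 100) }
            "${a.await()}/${b.await()}"
        }
        // The failing child cancels the scope; the scope then fails with the child's exception.
        .catch { e -> emit("caught ${e.javaClass.simpleName}") }
        .toList()
}

fun main() {
    println(collectWithCatch())
}
```

With these inputs the first element is mapped normally, the second one fails inside the scope, and the catch block emits a replacement value before the flow completes.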
