Error handling in RxJava
RxJava makes error handling a snap. Just use onError, right?
That's fine and dandy if you're the subscriber, but what about exceptions that occur inside your operators? How do you ensure that an Exception in, say, flatMap() gets passed along to onError? And what if you don't want the exception to terminate the sequence?
Unchecked Exceptions
For the most part, unchecked exceptions are automatically forwarded to onError. For example, the following code prints out "Error!":
Observable.just("Hello!")
    .map(input -> { throw new RuntimeException(); })
    .subscribe(
        System.out::println,
        error -> System.out.println("Error!")
    );
Fatal exceptions are the... uh... exception. These are exceptions so severe that RxJava itself cannot keep operating, such as StackOverflowError. In these situations, the exception is re-thrown instead of being forwarded to onError, which typically crashes the thread (and usually the application with it). You can find a full list of fatal exceptions in Exceptions.throwIfFatal().
Checked Exceptions
Suppose you've got this method signature:
String transform(String input) throws IOException;
What if we want to call this method in an operator? Even though RxJava has its own system for handling errors, checked exceptions still must be handled by your code. That means you have to add your own try-catch block.
In this example, we take checked exceptions and convert them to unchecked exceptions:
Observable.just("Hello!")
    .map(input -> {
        try {
            return transform(input);
        } catch (Throwable t) {
            throw Exceptions.propagate(t);
        }
    })
I find it's easiest to deal with exceptions when they have the fewest wrappers around them. Exceptions.propagate() helps; it only wraps the exception if it's checked. Along those same lines, flatMap() (or any other operator that returns an Observable) can simply return Observable.error() instead of wrapping/throwing:
Observable.just("Hello!")
    .flatMap(input -> {
        try {
            return Observable.just(transform(input));
        } catch (Throwable t) {
            return Observable.error(t);
        }
    })
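To make the "only wraps if it's checked" idea concrete, here's a minimal plain-Java sketch of that behavior. PropagateSketch, its propagate() helper, and the always-failing transform() are hypothetical stand-ins written for illustration; this is not RxJava's source, just a rough model of the rule described above.

```java
import java.io.IOException;

// Hypothetical stand-in (not RxJava's own class): models "wrap only if checked".
final class PropagateSketch {
    private PropagateSketch() {}

    // Rethrows unchecked throwables untouched; wraps checked ones in a RuntimeException.
    // The RuntimeException return type only exists so callers can write "throw propagate(t);".
    static RuntimeException propagate(Throwable t) {
        if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        }
        if (t instanceof Error) {
            throw (Error) t;
        }
        throw new RuntimeException(t);
    }

    // Hypothetical transform that always fails with a checked exception.
    static String transform(String input) throws IOException {
        throw new IOException("cannot transform " + input);
    }

    // Same shape as the body of the map() lambda above, minus the Observable.
    static String safeTransform(String input) {
        try {
            return transform(input);
        } catch (Throwable t) {
            throw propagate(t);
        }
    }
}
```

With this sketch, an IOException comes out wrapped exactly once (reachable via getCause()), while something like an IllegalStateException passes through as the very same object.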
A checked exception doesn't have to end up in onError, either. You can always handle it yourself and keep the sequence going. For example, map() could return an error value that onNext knows how to handle.
Masking Exceptions
I want to clear up something that many RxJava beginners get wrong: onError is an extreme event that should be reserved for times when sequences cannot continue. It means that there was a problem in processing the current item such that no future processing of any items can occur.
It's the Rx version of try-catch: it skips all code between the exception and onError. Sometimes you want this behavior: suppose you're downloading data then saving it to the disk. If the download fails, you want to skip saving. But what about if you're, say, polling data from a source? If one poll fails you still want to continue polling. Or maybe, in the previous example, you want to save dummy data in the case that the download fails.
In other words: oftentimes, instead of calling onError and skipping code, you actually want to call onNext with an error state. It's much easier to handle problems in onNext since you still get to run all your code and the stream isn't terminated. [1]
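One way to picture "onNext with an error state" is a small wrapper that carries either a value or the error that prevented it. The Outcome class below is a hypothetical type invented for this sketch (it is not part of RxJava), and attempt() only captures unchecked exceptions; treat it as one possible shape under those assumptions rather than the way to do it.

```java
import java.util.function.Function;

// Hypothetical wrapper (not part of RxJava): holds either a value or the error that prevented one.
final class Outcome<T> {
    private final T value;
    private final Throwable error;

    private Outcome(T value, Throwable error) {
        this.value = value;
        this.error = error;
    }

    static <T> Outcome<T> success(T value) {
        return new Outcome<>(value, null);
    }

    static <T> Outcome<T> failure(Throwable error) {
        return new Outcome<>(null, error);
    }

    // Applies fn to input, capturing any unchecked exception as an error value instead of throwing.
    static <I, O> Outcome<O> attempt(I input, Function<I, O> fn) {
        try {
            return success(fn.apply(input));
        } catch (RuntimeException e) {
            return failure(e);
        }
    }

    boolean isError() { return error != null; }
    T value() { return value; }
    Throwable error() { return error; }
}
```

Inside a stream, an operator could then emit something like Outcome.attempt(input, someFunction) from map(), and the subscriber would check isError() in onNext instead of losing the whole sequence to onError.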
How do we cover up exceptions instead of passing them along to onError? For that you have a couple of options:
- onErrorReturn(), which replaces onError with a single onNext(value) (followed by onCompleted(), which logically follows the final emission).
- onErrorResumeNext(), which replaces the current stream with an entirely new Observable.
You might use it like this:
Observable.just("Request data...")
    .map(this::dangerousOperation)
    .onErrorReturn(error -> "Empty result")
If dangerousOperation() throws an exception, the sequence now emits "Empty result" and completes instead of calling onError.
It's worth noting that, when using these operators, the upstream Observables are still going to shut down! They've already seen a terminal event (onError); all that onError[Return|ResumeNext] does is replace the onError notification with a different sequence downstream.
For example, if you have this sequence:
Observable.interval(1, TimeUnit.SECONDS)
    .map(input -> { throw new RuntimeException(); })
    .onErrorReturn(error -> "Uh oh")
    .subscribe(System.out::println);
You might expect the interval to continue emitting after the map throws an exception, but it doesn't! Only the downstream subscribers avoid onError. The code above would just print "Uh oh" once.
If you want to fix things upstream, then you can look into using retry(). It re-subscribes to the source when it receives onError, giving your sequence a second chance.
Here's a stream that fails every once in a while, but every time it does, retry() re-subscribes to it:
Observable.interval(1, TimeUnit.SECONDS)
    .map(input -> {
        if (Math.random() < .5) {
            throw new RuntimeException();
        }
        return "Success " + input;
    })
    .retry()
    .subscribe(System.out::println);
One has to be careful when using retry() because it's just re-calling subscribe() when it encounters an error, which creates a fresh new subscription (and all that entails). You might expect the above sequence to output "Success 0", "Success 1", "Success 2", etc., but it actually restarts at 0 every time an error occurs. That's because interval() starts counting at 0 each time it gets a new subscriber.
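The restart-at-0 effect can be modeled without RxJava at all. The toy below is a sketch under loud assumptions: RetrySketch, its subscribe() method, and collectWithRetry() are hypothetical names, the "source" is just a loop that counts from 0 on every call, and the failure is hard-coded to happen once, on the first subscription. It only illustrates why re-subscribing replays the start of a cold source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical toy model (no RxJava involved) of retry-by-resubscribing.
final class RetrySketch {
    private int subscriptions = 0;

    // Hypothetical cold source: every call starts counting at 0 again.
    // The first subscription fails after two items; later ones emit three items and complete.
    void subscribe(Consumer<String> onNext) {
        subscriptions++;
        for (long i = 0; i < 3; i++) {
            if (subscriptions == 1 && i == 2) {
                throw new RuntimeException("simulated failure");
            }
            onNext.accept("Success " + i);
        }
    }

    // Toy retry: on failure, simply subscribe again from scratch.
    List<String> collectWithRetry() {
        List<String> seen = new ArrayList<>();
        while (true) {
            try {
                subscribe(seen::add);
                return seen;
            } catch (RuntimeException e) {
                // Swallow the error and loop around to resubscribe.
            }
        }
    }
}
```

Calling new RetrySketch().collectWithRetry() collects "Success 0", "Success 1", then "Success 0", "Success 1", "Success 2": the items seen before the failure show up a second time because the new subscription starts over.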
Thanks to Erik Hellman and Jake Wharton for proofreading this article.
[1] Using onError incorrectly is one of the core problems with Observables in Retrofit 1. Any non-2xx response resulted in onError, which made composition difficult. This problem has been fixed in Retrofit 2; now you can use Observable<Response<Type>> or Observable<Result<Type>>, which allows you to handle non-2xx responses in onNext.