Wednesday, 17 January 2018

Operator of Error - 1



Advanced error handling

We've already seen how we can handle an error in the observer. However, by that time, we are practically outside of the monad. There can be many kinds of errors and not every error is worth pushing all the way to the top. In standard Java, you can catch an exception at any level and decide if you want to handle it there or throw it further. Similarly in Rx, you can define behaviour based on errors without terminating the observable and forcing the observer to deal with everything.

Resume

onErrorReturn

The onErrorReturn operator allows you to ignore an error and emit one final value before terminating (successfully this time).

In the next example, we will convert an error into a normal value to be printed:
Observable<String> values = Observable.create(o -> {
 o.onNext("Rx");
 o.onNext("is");
 o.onError(new Exception("adjective unknown"));
});

values
 .onErrorReturn(e -> "Error: " + e.getMessage())
 .subscribe(v -> System.out.println(v));

Output:
Rx
is
Error: adjective unknown

onErrorResumeNext

The onErrorResumeNext operator allows you to resume a failed sequence with another sequence. The error will not appear in the resulting observable.
public final Observable<T> onErrorResumeNext(
    Observable<? extends T> resumeSequence)
public final Observable<T> onErrorResumeNext(
    Func1<java.lang.Throwable,? extends Observable<? extends T>> resumeFunction)

The first overload uses the same followup observable in every case. The second overload allows you to decide what the resume sequence should be based on the error that occurred.
Observable<Integer> values = Observable.create(o -> {
 o.onNext(1);
 o.onNext(2);
 o.onError(new Exception("Oops"));
});

values
 .onErrorResumeNext(Observable.just(Integer.MAX_VALUE))
 .subscribe(new PrintSubscriber("with onError: "));

Output:
with onError: 1
with onError: 2
with onError: 2147483647
with onError: Completed

There's nothing stopping the resumeSequence from failing as well. In fact, if you want to change the type of the error, you can return an observable that fails immediately. In standard Java, components may decide they can't handle an error and that they should re-throw it. In such cases, it is common to wrap a new exception around the original error, thus providing additional context. You can do the same in Rx:
.onErrorResumeNext(e -> Observable.error(new UnsupportedOperationException(e)))
Now the sequence still fails, but you've wrapped the original error in a new error.
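
For comparison, the standard-Java habit of wrapping an exception that the paragraph above refers to might look like the sketch below. This is plain Java, not RxJava; the class WrapSketch and the methods readConfig and loadSettings are hypothetical names made up for illustration.

```java
import java.io.IOException;

class WrapSketch {
    // Hypothetical low-level operation that always fails.
    static String readConfig() throws IOException {
        throw new IOException("disk unavailable");
    }

    // Catches the low-level failure and re-throws it wrapped in a new
    // exception, keeping the original error as the cause.
    static String loadSettings() {
        try {
            return readConfig();
        } catch (IOException e) {
            throw new UnsupportedOperationException("cannot load settings", e);
        }
    }

    public static void main(String[] args) {
        try {
            loadSettings();
        } catch (UnsupportedOperationException e) {
            // The original error is still reachable through getCause().
            System.out.println(e.getMessage() + " <- " + e.getCause().getMessage());
        }
    }
}
```

The caller sees the new exception type, and the original error travels along as its cause, which is what the onErrorResumeNext line above achieves inside an observable.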

onExceptionResumeNext

onExceptionResumeNext differs from onErrorResumeNext in one respect: it only catches errors that are Exceptions.
Observable<String> values = Observable.create(o -> {
 o.onNext("Rx");
 o.onNext("is");
 //o.onError(new Throwable() {}); // this won't be caught
 o.onError(new Exception()); // this will be caught
});

values
 .onExceptionResumeNext(Observable.just("hard"))
 .subscribe(v -> System.out.println(v));

Retry

If the error is non-deterministic, it may make sense to retry. retry re-subscribes to the source and emits everything again from the start.
public final Observable<T> retry()
public final Observable<T> retry(long count)


If the error doesn't go away, retry() will lock us in an infinite loop of retries. The second overload limits the number of retries. If errors persist and the sequence still fails after n retries (n+1 failures in total), retry(n) will fail too. Let's see this in an example:
Random random = new Random();
Observable<Integer> values = Observable.create(o -> {
 o.onNext(random.nextInt() % 20);
 o.onNext(random.nextInt() % 20);
 o.onError(new Exception());
});

values
 .retry(1)
 .subscribe(v -> System.out.println(v));

Output:
0
13
9
15
java.lang.Exception

Here we've specified that we want to retry once. Our observable fails after two values, then tries again and fails again. The second time it fails, the exception is allowed to pass through.

In this example, we have done something naughty: we have made our subscription stateful to demonstrate that the observable is restarted from the source: it produced different values the second time around. retry does not cache any elements like replay, nor would it make sense to do so. Retrying makes sense only if there are side effects, or if the observable is hot.
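
The counting behaviour of retry(n) can be mimicked outside Rx with a small loop. The following is a plain-Java analogy only, assuming a hypothetical helper named RetrySketch.retry; it is not how RxJava implements the operator, but it shows the same rule: one initial attempt plus at most n further attempts, after which the last error is passed on.

```java
import java.util.concurrent.Callable;

class RetrySketch {
    // Runs the task once, then re-runs it from scratch up to `retries`
    // more times while it keeps failing. The last failure is re-thrown.
    static <T> T retry(Callable<T> task, long retries) throws Exception {
        long failures = 0;
        while (true) {
            try {
                return task.call();
            } catch (Exception e) {
                // Give up once the allowed number of retries is used up.
                if (failures++ >= retries) throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // Fails on the first two attempts and succeeds on the third.
        String result = retry(() -> {
            if (++attempts[0] < 3) throw new Exception("attempt " + attempts[0] + " failed");
            return "succeeded on attempt " + attempts[0];
        }, 2);
        System.out.println(result);
    }
}
```

As with the Rx operator, nothing is cached between attempts: every retry runs the task again from the start.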

retryWhen

retry will restart the subscription as soon as the failure happens. If we need more control over this, we can use retryWhen.
public final Observable<T> retryWhen(
 Func1<? super Observable<? extends java.lang.Throwable>,? extends Observable<?>> notificationHandler)
The argument to retryWhen is a function that takes an observable and returns another. The input observable emits all the errors that retryWhen encounters. The resulting observable signals when to retry:
  • if it emits a value, retryWhen will retry;
  • if it terminates with an error, retryWhen will emit the error and not retry;
  • if it terminates successfully, retryWhen will terminate successfully.
Note that the type of the signaling observable and the actual values emitted don't matter. The values are discarded and the observable is only used for timing.
In the next example, we will construct a retrying policy where we wait 100ms before retrying.
Observable<Integer> source = Observable.create(o -> {
 o.onNext(1);
 o.onNext(2);
 o.onError(new Exception("Failed"));
});

source.retryWhen((o) -> o
  .take(2)
  .delay(100, TimeUnit.MILLISECONDS))
 .timeInterval()
 .subscribe(
  System.out::println,
  System.out::println);

Output:
TimeInterval [intervalInMilliseconds=21, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
TimeInterval [intervalInMilliseconds=104, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]
TimeInterval [intervalInMilliseconds=103, value=1]
TimeInterval [intervalInMilliseconds=0, value=2]

Our source observable emits 2 values and immediately fails. When that happens, the observable of failures inside retryWhen emits the error. We delay that emission by 100ms and send it back to signal a retry. take(2) guarantees that our signaling observable will terminate after it has received two errors. retryWhen sees the termination and stops retrying, so the source is retried at most twice.

using

The using operator is for creating observables from resources that need to be managed. It guarantees that your resources will be managed regardless of when and how subscriptions are terminated. If you were to just use create, you would have to do the managing in the traditional Java paradigm and inject it into Rx. using is a more natural way of managing resources in Rx.
public static final <T,Resource> Observable<T> using(
    Func0<Resource> resourceFactory,
    Func1<? super Resource,? extends Observable<? extends T>> observableFactory,
    Action1<? super Resource> disposeAction)
When a new subscription begins, resourceFactory leases the necessary resource. observableFactory uses that resource to produce items. When the resource is no longer needed, it is disposed of with the disposeAction. The dispose action is executed regardless of the way the subscription terminates (successfully or with a failure).
In the next example, we pretend that a string is a resource that needs managing.
Observable<Character> values = Observable.using(
 () -> {
  String resource = "MyResource";
  System.out.println("Leased: " + resource);
  return resource;
 },
 (resource) -> {
  return Observable.create(o -> {
   for (Character c : resource.toCharArray())
    o.onNext(c);
   o.onCompleted();
  });
 },
 (resource) -> System.out.println("Disposed: " + resource));

values
 .subscribe(
  v -> System.out.println(v),
  e -> System.out.println(e));

Output:
Leased: MyResource
M
y
R
e
s
o
u
r
c
e
Disposed: MyResource

When we subscribe to values, the resource factory function is called, which returns "MyResource". That string is used to produce an observable which emits all of the characters in the string. Once the subscription ends, the resource is disposed of. A String doesn't need any more managing than what the garbage collector will do. Real resources, such as database connections or open files, may actually need such managing.

It is important to note here that we are responsible for terminating the observable, just like we were when using the create method. With create, terminating is a matter of semantics. With using, not terminating defeats the point of using it in the first place. The resources are released only upon termination. If we had not called o.onCompleted(), the sequence would be assumed to be still active and needing its resources.
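
The lease, use, dispose lifecycle described above corresponds to try/finally in ordinary Java. The sketch below is a plain-Java analogy under that assumption, not the RxJava operator itself; UsingSketch and its using method are hypothetical names that only borrow the parameter names from the signature shown earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class UsingSketch {
    // Leases a resource, runs the body with it, and always disposes of the
    // resource afterwards, whether the body returns or throws.
    static <R, T> T using(Supplier<R> resourceFactory,
                          Function<? super R, ? extends T> body,
                          Consumer<? super R> disposeAction) {
        R resource = resourceFactory.get();
        try {
            return body.apply(resource);
        } finally {
            disposeAction.accept(resource);
        }
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        try {
            using(
                () -> { log.add("Leased"); return "MyResource"; },
                r -> { throw new IllegalStateException("failed while using " + r); },
                r -> log.add("Disposed"));
        } catch (IllegalStateException e) {
            log.add("Error: " + e.getMessage());
        }
        // The dispose step is logged before the error reaches the caller.
        System.out.println(log);
    }
}
```

The difference from the Rx operator is that here the body ends when the method returns, whereas an observable created with using holds its resource until the sequence terminates or is unsubscribed.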