Create an Observable from scratch by means of a function
You can create an Observable from scratch by using the Create operator. You pass this operator a function that accepts the observer as its parameter. Write this function so that it behaves as an Observable — by calling the observer’s onNext, onError, and onCompleted methods appropriately.
A well-formed finite Observable must attempt to call either the observer’s onCompleted method exactly once or its onError method exactly once, and must not thereafter attempt to call any of the observer’s other
Check this link for observable : http://reactivex.io/RxJava/javadoc/rx/Observable.html#create(rx.Observable.OnSubscribe)
Simple Example
**********************************************
1. Create a simpe method
----------------------------------------------------------------------
static Observable<String> methodName(){
return Observable.<String>create(subscriber ->{
subscriber.onNext("One");
subscriber.onNext("Two");
subscriber.onComplete();
Throwable throwable = new Throwable("Oops some error");
subscriber.onError(throwable);
});
---------------------------------------------------------------------
2. Call method from main method
----------------------------------------------------------------------
methodName().subscribe(System.out::println);
---------------------------------------------------------------------
**********************************************
ObservableOnSubscribe
**********************************************
1. Create a observable
----------------------------------------------------------------------
static Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onNext(3);
System.out.println("Before Complete isDisposed()-"+observableEmitter.isDisposed()); // False
observableEmitter.onComplete();
System.out.println("After Complete isDisposed()-"+observableEmitter.isDisposed());// True
}
});
---------------------------------------------------------------------
2. Call method from main method
----------------------------------------------------------------------
observable.subscribe(System.out::println);
---------------------------------------------------------------------
**********************************************
Fibbonacci Series
**********************************************
1. Create a class
----------------------------------------------------------------------
import io.reactivex.Observable;
public class RxFibonacci {
static Observable<Integer> fibs(){
return Observable.create(subscriber -> {
int prev = 0;
int current = 1;
subscriber.onNext(0);
subscriber.onNext(1);
while (!subscriber.isDisposed()){
int oldprev = prev;
prev = current;
current += oldprev;
subscriber.onNext(current);
}
});
}
}
2. Create a reader class
----------------------------------------------------------------------
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;
import java.io.BufferedReader;
import java.io.InputStreamReader;
public class RxReader {
static Observable<String> lines(BufferedReader reader){
return Observable.<String>create(subscriber -> {
String line;
while ((line = reader.readLine()) != null){
subscriber.onNext(line);
if(subscriber.isDisposed()){
break;
}
}
subscriber.onComplete();
}).subscribeOn(Schedulers.io());
}
static Observable<String> linesFromInput(){
return lines(new BufferedReader(new InputStreamReader(System.in)));
}
}
3. call it from parent class main method
---------------------------------------------------------------------
RxReader.linesFromInput()
.map(line -> Integer.parseInt(line))
.observeOn(Schedulers.trampoline())
.flatMapMaybe(number -> RxFibonacci.fibs().elementAt(number))
.blockingSubscribe(System.out::println);
---------------------------------------------------------------------
22
No comments:
Post a Comment