Tuesday, 16 January 2018

Operator - Create



Create an Observable from scratch by means of a function



You can create an Observable from scratch by using the Create operator. You pass this operator a function that accepts the observer as its parameter. Write this function so that it behaves as an Observable — by calling the observer’s onNext, onError, and onCompleted methods appropriately.

A well-formed finite Observable must attempt to call either the observer’s onCompleted method exactly once or its onError method exactly once, and must not thereafter attempt to call any of the observer’s other

Check this link for observable : http://reactivex.io/RxJava/javadoc/rx/Observable.html#create(rx.Observable.OnSubscribe)


Simple Example
**********************************************
1. Create a simpe method
----------------------------------------------------------------------
static Observable<String> methodName(){
return Observable.<String>create(subscriber ->{

subscriber.onNext("One");
subscriber.onNext("Two");

subscriber.onComplete();

Throwable throwable = new Throwable("Oops some error");
subscriber.onError(throwable);
});
---------------------------------------------------------------------
2. Call method from main method
----------------------------------------------------------------------
methodName().subscribe(System.out::println);
---------------------------------------------------------------------
**********************************************


ObservableOnSubscribe
**********************************************
1. Create a observable
----------------------------------------------------------------------
    static Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {

            observableEmitter.onNext(1);
            observableEmitter.onNext(2);
            observableEmitter.onNext(3);

            System.out.println("Before Complete isDisposed()-"+observableEmitter.isDisposed()); // False

            observableEmitter.onComplete();
            System.out.println("After Complete isDisposed()-"+observableEmitter.isDisposed());// True
        }
    });
---------------------------------------------------------------------
2. Call method from main method
----------------------------------------------------------------------
observable.subscribe(System.out::println);
---------------------------------------------------------------------
**********************************************


Fibbonacci Series
**********************************************
1. Create a class
----------------------------------------------------------------------
import io.reactivex.Observable;

public class RxFibonacci {

    static Observable<Integer> fibs(){
        return Observable.create(subscriber -> {

            int prev = 0;
            int current = 1;
            subscriber.onNext(0);
            subscriber.onNext(1);

            while (!subscriber.isDisposed()){
                int oldprev = prev;
                prev = current;
                current += oldprev;
                subscriber.onNext(current);
            }
        });
    }
}

---------------------------------------------------------------------
2. Create a reader class
----------------------------------------------------------------------
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class RxReader {

   static Observable<String> lines(BufferedReader reader){
        return Observable.<String>create(subscriber -> {

            String line;

            while ((line = reader.readLine()) != null){
                subscriber.onNext(line);

                if(subscriber.isDisposed()){
                    break;
                }
            }

            subscriber.onComplete();

        }).subscribeOn(Schedulers.io());
    }

    static Observable<String> linesFromInput(){
        return lines(new BufferedReader(new InputStreamReader(System.in)));
    }
}

---------------------------------------------------------------------
3. call it from parent class main method
---------------------------------------------------------------------
RxReader.linesFromInput()
.map(line -> Integer.parseInt(line))
.observeOn(Schedulers.trampoline())
.flatMapMaybe(number -> RxFibonacci.fibs().elementAt(number))
.blockingSubscribe(System.out::println);
---------------------------------------------------------------------

**********************************************










































22

No comments:

Post a Comment

Calling method Sequencely

1. Execute multiple task sequencly (WAY-1) ------------------------------------------------------------------------------ import io.re...