Tuesday, 8 May 2018

Calling Methods Sequentially


1. Execute multiple tasks sequentially (WAY-1)

------------------------------------------------------------------------------

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
public class TestClass2 {

    public static void main(String[] args){

        Observable<String> myObservable = Observable.create(
                new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> sub) throws Exception {
                // "Emit" any data to the subscriber
                sub.onNext(showData("a"));
                sub.onNext(showData("b"));
                sub.onNext(showData("c"));
                // Trigger the completion of the event
                sub.onComplete();
            }
        });
        myObservable.subscribe();
    }

    public static String showData(String msg){
        System.out.println(msg);

        return msg;
    }

}

-------------------------------------------------------------------------------

2. Execute multiple tasks sequentially (WAY-2)

--------------------------------------------------------------------------------

import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class TestClass3 {

    public static void main(String[] args){

        Observer<String> mySubscriber = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Error");
            }

            @Override
            public void onComplete() {
                System.out.println("done!");
            }
        };

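        // The callbacks are invoked directly and in order; no Observable is involved,
        // so onSubscribe is never called here.
        mySubscriber.onNext("a");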
        mySubscriber.onNext("b");
        mySubscriber.onNext("c");

        mySubscriber.onComplete();

    }
}

--------------------------------------------------------------------------------




















Friday, 19 January 2018

Temp-4



        // ofType keeps only the items that are instances of the given class.
        Observable<Object> observableOne = Observable.just("java", "spring", "hibernate", 1, 2, 3);
        Observable<Integer> observableFinal = observableOne.ofType(Integer.class);
//        OR
//        Observable<String> observableFinal = observableOne.ofType(String.class);

        observableFinal.subscribe(s -> {
            System.out.print(s);
        });
----------------------------------------------------------------------------

        Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
        ObservableSource<String> observableTwo = Observable.just("c", "c plus", "perl", "c sharp", "dot net");

        Observable<String> observableFinal = observableOne.mergeWith(observableTwo);

        observableFinal.subscribe(s -> {
            System.out.print(s);
        });

-------------------------------------------------------------------------------

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("STRUTS");
                e.onNext("SPRING");
                e.onNext("TILES");
                e.onNext("JSTL");
                e.onComplete();
            }
        });
        ConnectableObservable<String> publishObservable = observable.publish();

        publishObservable.subscribe(s -> { System.out.println("subscriber one  "+s);});
        publishObservable.subscribe(s -> { System.out.println("subscriber two  "+s);});
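        // connect() starts the emissions; this synchronous source runs to completion inside the call.
        publishObservable.connect();
        // This subscriber arrives after the source has finished, so it receives no items.
        publishObservable.subscribe(s -> { System.out.println("subscriber three  "+s);});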

--------------------------------------------------------------------------------------

        Observable<Integer> observable = Observable.range(1, 10);

        Maybe<Integer>  maybeObservable =  observable.reduce(new BiFunction<Integer, Integer, Integer>(){
            @Override
            public Integer apply(Integer t, Integer st) throws Exception {
                return t + st;
            }
        });

        maybeObservable.subscribe(s -> { System.out.println("reduce value  "+s);});
























Operator - Interval



Interval

create an Observable that emits a sequence of integers spaced by a given time interval


The Interval operator returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between emissions.
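
The post gives no code for this operator, so here is a minimal sketch of how it might be used. It assumes RxJava 2 (the `io.reactivex` packages used elsewhere in these notes); the class name `IntervalSketch` and the helper `firstTicks` are hypothetical names chosen for illustration. A `TestScheduler` stands in for real time so the example finishes immediately, and `take` is applied because the interval sequence never completes on its own.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class IntervalSketch {

    // Hypothetical helper: collects the first `count` ticks of a one-second interval.
    public static List<Long> firstTicks(int count) {
        TestScheduler scheduler = new TestScheduler(); // virtual clock instead of real time
        List<Long> ticks = new ArrayList<>();

        Observable.interval(1, TimeUnit.SECONDS, scheduler)
                .take(count)            // interval is infinite, so limit it explicitly
                .subscribe(ticks::add);

        // Advance the virtual clock far enough for `count` ticks to fire.
        scheduler.advanceTimeBy(count, TimeUnit.SECONDS);
        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(firstTicks(3)); // prints [0, 1, 2]
    }
}
```

Without the scheduler argument, interval runs on the computation scheduler, so a plain main method would need to block (for example with Thread.sleep) to see any output.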


































Operator - Ignore elements




IgnoreElements

do not emit any items from an Observable but mirror its termination notification



The IgnoreElements operator suppresses all of the items emitted by the source Observable, but allows its termination notification (either onError or onComplete) to pass through unchanged.
If you do not care about the items being emitted by an Observable, but you do want to be notified when it completes or when it terminates with an error, you can apply the ignoreElements operator to the Observable, which will ensure that it will never call its observers’ onNext handlers.

---------------------------------------------------------------------------

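// ignoreElements() returns a Completable, so only completion or an error reaches the subscriber.
// list2 is the List<Integer> built in the "from" notes.
Observable.just(list2).ignoreElements()
.subscribe(() -> System.out.println("completed"), Throwable::printStackTrace);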

--------------------------------------------------------------------------
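
To make both termination paths described above visible, the following sketch records what a subscriber sees for a source that completes and for one that fails. It assumes RxJava 2; the class name `IgnoreElementsSketch`, the method `events`, and the "boom" exception are hypothetical and exist only for this illustration.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;

public class IgnoreElementsSketch {

    // Hypothetical helper: returns the notifications seen after ignoreElements().
    public static List<String> events() {
        List<String> events = new ArrayList<>();

        // The three items are dropped; only the completion signal gets through.
        Observable.just(1, 2, 3)
                .ignoreElements()
                .subscribe(
                        () -> events.add("complete"),
                        e -> events.add("error: " + e.getMessage()));

        // An error from the source is passed through unchanged.
        Observable.<Integer>error(new IllegalStateException("boom"))
                .ignoreElements()
                .subscribe(
                        () -> events.add("complete"),
                        e -> events.add("error: " + e.getMessage()));

        return events;
    }

    public static void main(String[] args) {
        System.out.println(events()); // prints [complete, error: boom]
    }
}
```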




















Operator - GroupBy



GroupBy

divide an Observable into a set of Observables that each emit a different subset of items from the original Observable


The GroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.

-----------------------------------------------------------------------------------------

        Observable
                .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
                .doOnNext(item -> System.out.println("source emitting " + item))
                .groupBy(item -> {
                    System.out.println("groupBy called for " + item);
                    return item % 3;
                })
                .subscribe(observable -> {
                    System.out.println("got observable " + observable + " for key " + observable.getKey());
                    observable.subscribe(item -> {
                        System.out.println("key " + observable.getKey() + ", item " + item);
                    });

                });


-----------------------------------------------------------------------------------------































Operator - from



From

convert various other objects and data types into Observables


When you work with Observables, it can be more convenient if all of the data you mean to work with can be represented as Observables, rather than as a mixture of Observables and other types. This allows you to use a single set of operators to govern the entire lifespan of the data stream.
Iterables, for example, can be thought of as a sort of synchronous Observable; Futures, as a sort of Observable that always emits only a single item. By explicitly converting such objects to Observables, you allow them to interact as peers with other Observables.
For this reason, most ReactiveX implementations have methods that allow you to convert certain language-specific objects and data structures into Observables.

--> fromIterable
------------------------------------------------
List<Integer> list2 = new ArrayList<>();
list2.add(1);
list2.add(2);
list2.add(7);
list2.add(8);
    
Observable.fromIterable(list2).subscribe(System.out::println);
-----------------------------------------------

--> fromArray

---------------------------------------------

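// fromArray expects the items themselves (varargs or an array).
// Passing list2 directly would emit the whole List as a single item.
Observable.fromArray(list2.toArray(new Integer[0])).subscribe(System.out::println);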

---------------------------------------------
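
The text above also mentions Futures as a source that emits a single item. A minimal sketch of that conversion, assuming RxJava 2's `Observable.fromFuture`, might look like this; the class name `FromFutureSketch` and the method `emitted` are hypothetical, and an already-completed `CompletableFuture` is used only to keep the example deterministic.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class FromFutureSketch {

    // Hypothetical helper: returns everything the Future-backed Observable emits.
    public static List<String> emitted() {
        // Assumption for the sketch: the Future is already complete, so get() returns at once.
        Future<String> future = CompletableFuture.completedFuture("result");
        List<String> items = new ArrayList<>();

        // On subscription, fromFuture waits for the Future's value, emits it once, then completes.
        Observable.fromFuture(future).subscribe(items::add);
        return items;
    }

    public static void main(String[] args) {
        System.out.println(emitted()); // prints [result]
    }
}
```

With a Future that is still running, the subscription blocks the subscribing thread until the value is ready, so in practice it is usually paired with subscribeOn.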

























Thursday, 18 January 2018

Operator - Flatmap



FlatMap

transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable




The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.
This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items.
Note that FlatMap merges the emissions of these Observables, so that they may interleave.
In several of the language-specific implementations there is also an operator that does not interleave the emissions from the transformed Observables, but instead emits these emissions in strict order, often called ConcatMap or something similar.


-----------------------------------------------------------------------------
List<String> list = new ArrayList<>();
list.add("List-1");
list.add("List-2");
list.add("List-3");
list.add("List-4");
list.add("List-5");

// TestScheduler uses virtual time, so the random delays elapse as soon as the clock is advanced.
final TestScheduler scheduler = new TestScheduler();
Observable.fromIterable(list)
        .flatMap(s -> {
            final int delay = new Random().nextInt(10);
            return Observable.just(s + "x")
                    .delay(delay, TimeUnit.SECONDS, scheduler);
        })
        .toList()
        .subscribe(System.out::println);

scheduler.advanceTimeBy(1, TimeUnit.MINUTES);

----------------------------------------------------------------------------
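
The difference between interleaving (FlatMap) and strict ordering (ConcatMap) described above can be sketched with fixed delays instead of random ones. This assumes RxJava 2; the class `ConcatMapSketch` and its methods are hypothetical names for illustration. Each item n is delayed by (4 - n) seconds on a `TestScheduler`, so later items become ready before earlier ones.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class ConcatMapSketch {

    // Hypothetical helper: item n becomes available (4 - n) virtual seconds after subscription.
    private static Observable<Integer> delayed(int n, TestScheduler scheduler) {
        return Observable.just(n).delay(4 - n, TimeUnit.SECONDS, scheduler);
    }

    // flatMap subscribes to all inner Observables at once, so results arrive as they become ready.
    public static List<Integer> withFlatMap() {
        TestScheduler scheduler = new TestScheduler();
        List<Integer> out = new ArrayList<>();
        Observable.just(1, 2, 3)
                .flatMap(n -> delayed(n, scheduler))
                .subscribe(out::add);
        scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        return out;
    }

    // concatMap waits for each inner Observable to complete before subscribing to the next.
    public static List<Integer> withConcatMap() {
        TestScheduler scheduler = new TestScheduler();
        List<Integer> out = new ArrayList<>();
        Observable.just(1, 2, 3)
                .concatMap(n -> delayed(n, scheduler))
                .subscribe(out::add);
        scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(withFlatMap());   // prints [3, 2, 1]
        System.out.println(withConcatMap()); // prints [1, 2, 3]
    }
}
```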

FlatMapMaybe

-----------------------------------------------------------------------------

        Observable.range(1, 10)
                .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
                    @Override
                    public MaybeSource<Integer> apply(Integer integer) throws Exception {
                        return Maybe.just(integer);
                    }
                })
                .subscribe(System.out::println);

-----------------------------------------------------------------------------













