Tuesday, 8 May 2018

Calling Methods Sequentially


1. Execute multiple tasks sequentially (WAY-1)

------------------------------------------------------------------------------

import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
public class TestClass2 {

    public static void main(String[] args){

        Observable<String> myObservable = Observable.create(
                new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> sub) throws Exception {
                // "Emit" any data to the subscriber
                sub.onNext(showData("a"));
                sub.onNext(showData("b"));
                sub.onNext(showData("c"));
                // Trigger the completion of the event
                sub.onComplete();
            }
        });
        myObservable.subscribe();
    }

    public static String showData(String msg){
        System.out.println(msg);

        return msg;
    }

}

-------------------------------------------------------------------------------

2. Execute multiple tasks sequentially (WAY-2)

--------------------------------------------------------------------------------

import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;

public class TestClass3 {

    public static void main(String[] args){

        Observer<String> mySubscriber = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable disposable) {
                System.out.println("onSubscribe");
            }

            @Override
            public void onNext(String s) {
                System.out.println("onNext: " + s);
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Error");
            }

            @Override
            public void onComplete() {
                System.out.println("done!");
            }
        };

        mySubscriber.onNext("a");
        mySubscriber.onNext("b");
        mySubscriber.onNext("c");

        mySubscriber.onComplete();

    }
}

--------------------------------------------------------------------------------

Friday, 19 January 2018

Temp-4



        // ofType keeps only the items that are instances of the given class.
        Observable<Object> observableOne = Observable.just("java", "spring", "hibernate", 1, 2, 3);
        Observable<Integer> observableFinal = observableOne.ofType(Integer.class);
        // Or, to keep the strings instead:
        // Observable<String> observableFinal = observableOne.ofType(String.class);

        observableFinal.subscribe(s -> {
            System.out.print(s);
        });
----------------------------------------------------------------------------

        Observable<String> observableOne = Observable.just("java", "spring", "hibernate", "android", "rxjava");
        ObservableSource<String> observableTwo = Observable.just("c", "c plus", "perl", "c sharp", "dot net");

        Observable<String> observableFinal = observableOne.mergeWith(observableTwo);

        observableFinal.subscribe(s -> {
            System.out.print(s);
        });

-------------------------------------------------------------------------------

        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("STRUTS");
                e.onNext("SPRING");
                e.onNext("TILES");
                e.onNext("JSTL");
                e.onComplete();
            }
        });
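        ConnectableObservable<String> publishObservable = observable.publish();

        // Subscribers registered before connect() share one subscription to the source.
        publishObservable.subscribe(s -> { System.out.println("subscriber one  "+s);});
        publishObservable.subscribe(s -> { System.out.println("subscriber two  "+s);});
        publishObservable.connect();
        // Subscribed after connect(): this synchronous source has already finished, so nothing is printed here.
        publishObservable.subscribe(s -> { System.out.println("subscriber three  "+s);});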

--------------------------------------------------------------------------------------

        Observable<Integer> observable = Observable.range(1, 10);

        Maybe<Integer>  maybeObservable =  observable.reduce(new BiFunction<Integer, Integer, Integer>(){
            @Override
            public Integer apply(Integer t, Integer st) throws Exception {
                return t + st;
            }
        });

        maybeObservable.subscribe(s -> { System.out.println("reduce value  "+s);});

Operator - Interval



Interval

create an Observable that emits a sequence of integers spaced by a given time interval


The Interval operator returns an Observable that emits an infinite sequence of ascending integers, with a constant interval of time of your choosing between emissions.
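
The description above can be tried with a small sketch. It assumes RxJava 2.x (`io.reactivex`) on the classpath and uses a `TestScheduler` so that virtual time can be advanced instead of sleeping; the class and method names (`IntervalSketch`, `ticksWithin`) are made up for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class IntervalSketch {

    // Collects the values an interval emits during the first `seconds` of virtual time.
    static List<Long> ticksWithin(long seconds) {
        TestScheduler scheduler = new TestScheduler();
        List<Long> ticks = new ArrayList<>();

        // One emission per second, counting up from 0; the sequence never completes.
        Observable.interval(1, TimeUnit.SECONDS, scheduler)
                .subscribe(ticks::add);

        // Move the virtual clock forward instead of blocking the thread.
        scheduler.advanceTimeBy(seconds, TimeUnit.SECONDS);
        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(ticksWithin(3)); // [0, 1, 2]
    }
}
```

Without the scheduler argument, `interval` runs on the computation scheduler, so a plain `main` method would need to keep its thread alive to see any output.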

Operator - Ignore elements




IgnoreElements

do not emit any items from an Observable but mirror its termination notification



The IgnoreElements operator suppresses all of the items emitted by the source Observable, but allows its termination notification (either onError or onComplete) to pass through unchanged.
If you do not care about the items being emitted by an Observable, but you do want to be notified when it completes or when it terminates with an error, you can apply the ignoreElements operator to the Observable, which will ensure that it will never call its observers’ onNext handlers.

---------------------------------------------------------------------------

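// ignoreElements() returns a Completable, so only the terminal event is observed.
Observable.fromIterable(list2).ignoreElements()
        .subscribe(() -> System.out.println("completed"),
                error -> System.out.println("error: " + error));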

--------------------------------------------------------------------------


Operator - GroupBy



GroupBy

divide an Observable into a set of Observables that each emit a different subset of items from the original Observable


The GroupBy operator divides an Observable that emits items into an Observable that emits Observables, each one of which emits some subset of the items from the original source Observable. Which items end up on which Observable is typically decided by a discriminating function that evaluates each item and assigns it a key. All items with the same key are emitted by the same Observable.

-----------------------------------------------------------------------------------------

        Observable
                .just(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
                .doOnNext(item -> System.out.println("source emitting " + item))
                .groupBy(item -> {
                    System.out.println("groupBy called for " + item);
                    return item % 3;
                })
                .subscribe(observable -> {
                    System.out.println("got observable " + observable + " for key " + observable.getKey());
                    observable.subscribe(item -> {
                        System.out.println("key " + observable.getKey() + ", item " + item);
                    });

                });


-----------------------------------------------------------------------------------------


Operator - from



From

convert various other objects and data types into Observables


When you work with Observables, it can be more convenient if all of the data you mean to work with can be represented as Observables, rather than as a mixture of Observables and other types. This allows you to use a single set of operators to govern the entire lifespan of the data stream.
Iterables, for example, can be thought of as a sort of synchronous Observable; Futures, as a sort of Observable that always emits only a single item. By explicitly converting such objects to Observables, you allow them to interact as peers with other Observables.
For this reason, most ReactiveX implementations have methods that allow you to convert certain language-specific objects and data structures into Observables.

--> fromIterable
------------------------------------------------
List<Integer> list2 = new ArrayList<>();
list2.add(1);
list2.add(2);
list2.add(7);
list2.add(8);
    
Observable.fromIterable(list2).subscribe(System.out::println);
-----------------------------------------------

--> fromArray

---------------------------------------------

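// fromArray takes the elements themselves; passing the List directly would emit it as one item.
Observable.fromArray(list2.toArray(new Integer[0])).subscribe(System.out::println);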

---------------------------------------------


Thursday, 18 January 2018

Operator - Flatmap



FlatMap

transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable




The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable, where that function returns an Observable that itself emits items. FlatMap then merges the emissions of these resulting Observables, emitting these merged results as its own sequence.
This method is useful, for example, when you have an Observable that emits a series of items that themselves have Observable members or are in other ways transformable into Observables, so that you can create a new Observable that emits the complete collection of items emitted by the sub-Observables of these items.
Note that FlatMap merges the emissions of these Observables, so that they may interleave.
In several of the language-specific implementations there is also an operator that does not interleave the emissions from the transformed Observables, but instead emits these emissions in strict order, often called ConcatMap or something similar.
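
The difference in ordering can be made visible with a small sketch, assuming RxJava 2.x. The helper `delayed` and the class name are hypothetical; the helper gives the earliest item the longest delay so that the inner Observables finish in reverse order, and a `TestScheduler` supplies virtual time.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.TestScheduler;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class FlatMapVsConcatMap {

    // Hypothetical helper: "a" is delayed 3 s, "b" 2 s, anything else 1 s.
    static Observable<String> delayed(String item, TestScheduler scheduler) {
        long delay = item.equals("a") ? 3 : item.equals("b") ? 2 : 1;
        return Observable.just(item).delay(delay, TimeUnit.SECONDS, scheduler);
    }

    // flatMap subscribes to all inner Observables at once and merges their output.
    static List<String> withFlatMap() {
        TestScheduler scheduler = new TestScheduler();
        List<String> out = new ArrayList<>();
        Observable.just("a", "b", "c")
                .flatMap(item -> delayed(item, scheduler))
                .subscribe(out::add);
        scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        return out;
    }

    // concatMap subscribes to the next inner Observable only after the previous one completes.
    static List<String> withConcatMap() {
        TestScheduler scheduler = new TestScheduler();
        List<String> out = new ArrayList<>();
        Observable.just("a", "b", "c")
                .concatMap(item -> delayed(item, scheduler))
                .subscribe(out::add);
        scheduler.advanceTimeBy(1, TimeUnit.MINUTES);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(withFlatMap());   // [c, b, a]
        System.out.println(withConcatMap()); // [a, b, c]
    }
}
```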


-----------------------------------------------------------------------------
List<String> list = new ArrayList<>();
list.add("List-1");
list.add("List-2");
list.add("List-3");
list.add("List-4");
list.add("List-5");

// TestScheduler provides virtual time, so the random delays do not block the thread.
final TestScheduler scheduler = new TestScheduler();
Observable.fromIterable(list)
        .flatMap(s -> {
            final int delay = new Random().nextInt(10);
            return Observable.just(s + "x")
                    .delay(delay, TimeUnit.SECONDS, scheduler);
        })
        .toList()
        .subscribe(System.out::println);

// Advance the virtual clock past every delay so toList() can complete.
scheduler.advanceTimeBy(1, TimeUnit.MINUTES);

----------------------------------------------------------------------------

flatMapMaybe

-----------------------------------------------------------------------------

        Observable.range(1, 10)
                .flatMapMaybe(new Function<Integer, MaybeSource<Integer>>() {
                    @Override
                    public MaybeSource<Integer> apply(Integer integer) throws Exception {
                        return Maybe.just(integer);
                    }
                })
                .subscribe(System.out::println);

-----------------------------------------------------------------------------


Operator - first




emit only the first item (or the first item that meets some condition) emitted by an Observable


If you are only interested in the first item emitted by an Observable, or the first item that meets some criteria, you can filter the Observable with the First operator.
In some implementations, First is not implemented as a filtering operator that returns an Observable, but as a blocking function that returns a particular item at such time as the source Observable emits that item. In those implementations, if you instead want a filtering operator, you may have better luck with Take(1) or ElementAt(0).
In some implementations there is also a Single operator. It behaves similarly to First except that it waits until the source Observable terminates in order to guarantee that it only emits a single item (otherwise, rather than emitting that item, it terminates with an error). You can use this to not only take the first item from the source Observable but to also guarantee that there was only one item.
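
In RxJava 2.x these variants look roughly as follows. This is a sketch under that assumption; the class and method names are invented for illustration, and the blocking calls are used only to read the results.

```java
import io.reactivex.Observable;

import java.util.NoSuchElementException;

public class FirstVsSingle {

    // take(1) keeps the result an Observable; blockingFirst() is used here only to read it.
    static int viaTake(Observable<Integer> source) {
        return source.take(1).blockingFirst();
    }

    // filter + first(defaultItem) gives "the first item that meets a condition".
    static int firstGreaterThan(Observable<Integer> source, int limit) {
        return source.filter(i -> i > limit).first(-1).blockingGet();
    }

    // singleOrError() succeeds only if the source emits exactly one item.
    static String single(Observable<Integer> source) {
        try {
            return "single: " + source.singleOrError().blockingGet();
        } catch (IllegalArgumentException | NoSuchElementException e) {
            return "error: " + e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        Observable<Integer> source = Observable.just(1, 2, 3);
        System.out.println(viaTake(source));             // 1
        System.out.println(firstGreaterThan(source, 1)); // 2
        System.out.println(single(source));              // error: IllegalArgumentException
        System.out.println(single(Observable.just(42))); // single: 42
    }
}
```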

---------------------------------------------------

// first(defaultItem) returns a Single; the default (0) is used only if the source is empty.
Observable
        .just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .first(0)
        .subscribe(System.out::println);


----------------------------------------------------


Operator - Filter



Filter

emit only those items from an Observable that pass a predicate test



The Filter operator filters an Observable by only allowing items through that pass a test that you specify in the form of a predicate function.


-----------------------------------------------------------------


//        EVEN
        Observable
                .just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .filter(integer -> integer % 2 == 0)
                .subscribe(System.out::println);

//          ODD
        Observable
                .just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .filter(integer -> integer % 2 == 1)
                .subscribe(System.out::println);



----------------------------------------------------------------


Operator - elementAt



ElementAt

emit only item n emitted by an Observable

The ElementAt operator pulls an item located at a specified index location in the sequence of items emitted by the source Observable and emits that item as its own sole emission.



-----------------------------------------------------------------------------

        List<String> list = new ArrayList<>();
        list.add("List-1");
        list.add("List-2");
        list.add("List-3");
        list.add("List-4");
        list.add("List-5");


       Observable.fromIterable(list).elementAt(2).subscribe(System.out::println);

-------------------------------------------------------------------------------------------------------------
or Default

Observable.fromIterable(list).elementAt(15, list.get(1)).subscribe(System.out::println);

----------------------------------------------------------------------------------------------------------------
or Error

Observable.fromIterable(list).elementAtOrError(15)
        .subscribe(System.out::println, error -> System.out.println("error: " + error));


-----------------------------------------------------------------------------


Operator - Do




Do

register an action to take upon a variety of Observable lifecycle events





You can register callbacks that ReactiveX will call when certain events take place on an Observable, where those callbacks will be called independently from the normal set of notifications associated with an Observable cascade. There are a variety of operators that various ReactiveX implementations have designed to allow for this.


RxJava 2.x: doAfterTerminate, doOnComplete, doOnDispose, doOnEach, doOnError, doOnLifecycle, doOnNext, doOnSubscribe, doOnTerminate, onTerminateDetach


--------------------------------------------------------------------------------------------


       Observable.just("Hello").doOnNext(new Consumer<String>() {
                   @Override
                   public void accept(String s) throws Exception {
                       System.out.println("Consumer Test");
                   }
               })
                .doOnEach(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable disposable) {

                    }

                    @Override
                    public void onNext(String s) {
                        System.out.println("Observer Test");
                    }

                    @Override
                    public void onError(Throwable throwable) {

                    }

                    @Override
                    public void onComplete() {

                    }
                })
               .doOnComplete(new Action() {
                   @Override
                   public void run() throws Exception {
                       System.out.println("Action Test");
                   }
               })

               .subscribe();


--------------------------------------------------------------------------------------------


Operator - Distinct



Distinct

suppress duplicate items emitted by an Observable






The Distinct operator filters an Observable by only allowing items through that have not already been emitted.
In some implementations there are variants that allow you to adjust the criteria by which two items are considered “distinct.” In some, there is a variant of the operator that only compares an item against its immediate predecessor for distinctness, thereby filtering only consecutive duplicate items from the sequence.
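
In RxJava 2.x the variants mentioned above are `distinct(keySelector)` and `distinctUntilChanged()`. A short sketch, assuming RxJava 2.x on the classpath (class and method names are illustrative only):

```java
import io.reactivex.Observable;

import java.util.List;

public class DistinctVariants {

    // Drops every item that has been seen before.
    static List<Integer> distinct(Integer... items) {
        return Observable.fromArray(items).distinct().toList().blockingGet();
    }

    // Drops an item only when it equals its immediate predecessor.
    static List<Integer> distinctUntilChanged(Integer... items) {
        return Observable.fromArray(items).distinctUntilChanged().toList().blockingGet();
    }

    // Uses a key (here the first letter) to decide whether two items count as the same.
    static List<String> distinctByFirstLetter(String... words) {
        return Observable.fromArray(words)
                .distinct(word -> word.charAt(0))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(distinct(1, 1, 2, 1, 3, 3));             // [1, 2, 3]
        System.out.println(distinctUntilChanged(1, 1, 2, 1, 3, 3)); // [1, 2, 1, 3]
        System.out.println(distinctByFirstLetter(
                "apple", "avocado", "banana", "blueberry", "cherry")); // [apple, banana, cherry]
    }
}
```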

-------------------------------------------------------------------------

Observable.just(1,2,3,3,4,5,5).distinct().subscribe(System.out::println);

------------------------------------------------------------------------


Operator - Delay




Delay

shift the emissions from an Observable forward in time by a particular amount




The Delay operator modifies its source Observable by pausing for a particular increment of time (that you specify) before emitting each of the source Observable’s items. This has the effect of shifting the entire sequence of items emitted by the Observable forward in time by that specified increment.

-----------------------------------------------------

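// delay() schedules on the computation scheduler (daemon threads), so a plain
// main method may exit before the value arrives; blockingGet() waits for it.
System.out.println(Observable
        .fromIterable(list)
        .toList()
        .delay(50, TimeUnit.MILLISECONDS)
        .blockingGet());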

----------------------------------------------------


Operator - defaultIfEmpty



DefaultIfEmpty

emit items from the source Observable, or a default item if the source Observable emits nothing




The DefaultIfEmpty operator simply mirrors the source Observable exactly if the source Observable emits any items. If the source Observable terminates normally (with an onComplete) without emitting any items, the Observable returned from DefaultIfEmpty will instead emit a default item of your choosing before it too completes.


----------------------------------------------------

Observable.empty()
                .defaultIfEmpty(0)
                .subscribe(v -> System.out.println(v));

---------------------------------------------------


Operator - Debounce



Debounce

only emit an item from an Observable if a particular timespan has passed without it emitting another item



The Debounce operator filters out items emitted by the source Observable that are rapidly followed by another emitted item.


----------------------------------------------------------------------------

// All five items are emitted at once, so only the last one (5) gets through.
Observable.just(1,2,3,4,5).debounce(300, TimeUnit.MILLISECONDS).subscribe(System.out::println);

---------------------------------------------------------------------------
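
Debounce is easier to follow when the items are spread out in time. The sketch below assumes RxJava 2.x and simulates typing with a `PublishSubject` and a `TestScheduler`; the class name, method name and input strings are made up for the example.

```java
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class DebounceSketch {

    static List<String> debouncedInput() {
        TestScheduler scheduler = new TestScheduler();
        PublishSubject<String> input = PublishSubject.create();
        List<String> out = new ArrayList<>();

        // An item is passed on only after 300 ms of silence following it.
        input.debounce(300, TimeUnit.MILLISECONDS, scheduler)
                .subscribe(out::add);

        input.onNext("r");
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); // too soon, "r" is replaced
        input.onNext("rx");
        scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS); // too soon, "rx" is replaced
        input.onNext("rxj");
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS); // quiet period: "rxj" is emitted
        input.onNext("rxjava");
        scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS); // quiet period: "rxjava" is emitted
        return out;
    }

    public static void main(String[] args) {
        System.out.println(debouncedInput()); // [rxj, rxjava]
    }
}
```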


Operator - Count





count the number of items emitted by the source Observable and emit only this value


The Count operator transforms an Observable that emits items into an Observable that emits a single value that represents the number of items emitted by the source Observable.
If the source Observable terminates with an error, Count will pass this error notification along without emitting an item first. If the source Observable does not terminate at all, Count will neither emit an item nor terminate.


---------------------------------------------------------------
        List<Integer> list2 = Arrays.asList(1, 2, 7, 8, 8, 9, 10, 3, 11, 4, 5, 5, 6, 11, 12);

        // count() returns a Single<Long>; duplicates are counted too, so this prints 15.
        Observable.fromIterable(list2).count().subscribe(System.out::println);
---------------------------------------------------------------


Operator - Contains

determine whether an Observable emits a particular item or not


Pass the Contains operator a particular item, and the Observable it returns will emit true if that item is emitted by the source Observable, or false if the source Observable terminates without emitting that item.
A related operator, IsEmpty, returns an Observable that emits true if and only if the source Observable completes without emitting any items. It emits false if the source Observable emits an item.



--------------------------------------------------------------------------

 List<Integer> list2 = new ArrayList<>();
 list2.add(1);
 list2.add(2);
 list2.add(3);
 list2.add(4);
 list2.add(5);

 Observable.fromIterable(list2).contains(3).subscribe(System.out::println);

--------------------------------------------------------------------------------
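
The related IsEmpty operator mentioned above can be sketched the same way. This assumes RxJava 2.x, where both `contains` and `isEmpty` return a `Single<Boolean>`; the class and method names are illustrative.

```java
import io.reactivex.Observable;

public class IsEmptySketch {

    // True only if the source completes without emitting anything.
    static boolean emitsNothing(Observable<?> source) {
        return source.isEmpty().blockingGet();
    }

    // True if the source emits an item equal to `wanted`.
    static boolean emits(Observable<Integer> source, int wanted) {
        return source.contains(wanted).blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(emitsNothing(Observable.empty()));  // true
        System.out.println(emitsNothing(Observable.just(1)));  // false
        System.out.println(emits(Observable.range(1, 5), 3));  // true
        System.out.println(emits(Observable.range(1, 5), 9));  // false
    }
}
```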

Operator - Concat



emit the emissions from two or more Observables without interleaving them




The Concat operator concatenates the output of multiple Observables so that they act like a single Observable, with all of the items emitted by the first Observable being emitted before any of the items emitted by the second Observable (and so forth, if there are more than two).

Concat waits to subscribe to each additional Observable that you pass to it until the previous Observable completes. Note that because of this, if you try to concatenate a “hot” Observable, that is, one that begins emitting items immediately and before it is subscribed to, Concat will not see, and therefore will not emit, any items that Observable emits before all previous Observables complete and Concat subscribes to the “hot” Observable.
In some ReactiveX implementations there is also a ConcatMap operator (a.k.a. concat_all, concat_map, concatMapObserver, for, forIn/for_in, mapcat, selectConcat, or selectConcatObserver) that transforms the items emitted by a source Observable into corresponding Observables and then concatenates the items emitted by each of these Observables in the order in which they are observed and transformed.
The StartWith operator is similar to Concat, but prepends, rather than appends, items or emissions of items to those emitted by a source Observable.
The Merge operator is also similar. It combines the emissions of two or more Observables, but may interleave them, whereas Concat never interleaves the emissions from multiple Observables.


Code - 

-------------------------------------------------------------------

Observable<Integer> seq1 = Observable.range(0, 3);
Observable<Integer> seq2 = Observable.range(10, 3);

Observable.concat(seq1, seq2)
.subscribe(System.out::println);

-------------------------------------------------------------------

concatWith
------------------------------------------------------------------

 Observable<Integer> seq1 = Observable.range(0, 3);
 Observable<Integer> seq2 = Observable.range(10, 3);
 Observable<Integer> seq3 = Observable.just(20);

 seq1.concatWith(seq2)
 .concatWith(seq3)
 .subscribe(System.out::println);

------------------------------------------------------------------
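
The notes above about StartWith, Merge and "hot" Observables can be illustrated with one more sketch, assuming RxJava 2.x. Two `PublishSubject`s stand in for hot sources; the class name, method names and item labels are made up for the example.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ConcatRelatives {

    // startWith prepends an item to the source sequence.
    static List<Integer> prepend() {
        return Observable.range(10, 3).startWith(0).toList().blockingGet();
    }

    // Returns [merged, concatenated] for the same pair of hot sources.
    static List<List<String>> mergeVersusConcatOnHotSources() {
        PublishSubject<String> left = PublishSubject.create();
        PublishSubject<String> right = PublishSubject.create();
        List<String> merged = new ArrayList<>();
        List<String> concatenated = new ArrayList<>();

        Observable.merge(left, right).subscribe(merged::add);        // listens to both at once
        Observable.concat(left, right).subscribe(concatenated::add); // listens to right only after left completes

        left.onNext("L1");
        right.onNext("R1"); // concat has not subscribed to right yet, so it misses this item
        left.onNext("L2");
        left.onComplete();
        right.onNext("R2");
        right.onComplete();
        return Arrays.asList(merged, concatenated);
    }

    public static void main(String[] args) {
        System.out.println(prepend());                       // [0, 10, 11, 12]
        System.out.println(mergeVersusConcatOnHotSources()); // [[L1, R1, L2, R2], [L1, L2, R2]]
    }
}
```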


Wednesday, 17 January 2018

Operator - CombineLatest




CombineLatest

when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function





The CombineLatest operator behaves in a similar way to Zip, but while Zip emits items only when each of the zipped source Observables has emitted a previously unzipped item, CombineLatest emits an item whenever any of the source Observables emits an item (so long as each of the source Observables has emitted at least one item). When any of the source Observables emits an item, CombineLatest combines the most recently emitted items from each of the other source Observables, using a function you provide, and emits the return value from that function.



Code - 

-------------------------------------------------------------------------------------


        Observable<Integer> intObs = Observable.just(1, 2, 3, 4, 5);
        Observable<String> stringObs = Observable.just("One");
        Observable<String> stringObs2 = Observable.just("Two");

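        // intObs is subscribed first and emits everything before the strings arrive,
        // so only its last value is combined: one line, "Observable 1: 5 One Two".
        Observable.combineLatest(intObs, stringObs, stringObs2,
                (i, s, s2) -> "Observable 1: " + i + " " + s + " " + s2)
                .blockingIterable()
                .forEach(System.out::println);

        // Here both strings are already present when intObs emits, so five lines are printed.
        Observable.combineLatest(stringObs, stringObs2, intObs,
                (s, s2, i) -> "Observable 2: " + i + " " + s + " " + s2)
                .blockingIterable()
                .forEach(System.out::println);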


-------------------------------------------------------------------------------------
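
To compare CombineLatest with Zip directly, the following sketch feeds the same events to both, assuming RxJava 2.x. `PublishSubject`s are used so the order of emissions is explicit; the class and method names are illustrative only.

```java
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CombineLatestVsZip {

    // Returns [combined, zipped] for the same sequence of events.
    static List<List<String>> run() {
        PublishSubject<Integer> numbers = PublishSubject.create();
        PublishSubject<String> letters = PublishSubject.create();
        List<String> combined = new ArrayList<>();
        List<String> zipped = new ArrayList<>();

        Observable.combineLatest(numbers, letters, (n, l) -> n + l).subscribe(combined::add);
        Observable.zip(numbers, letters, (n, l) -> n + l).subscribe(zipped::add);

        numbers.onNext(1);   // nothing yet: letters has not emitted
        letters.onNext("x"); // combineLatest: 1x, zip: 1x
        numbers.onNext(2);   // combineLatest: 2x, zip waits for a second letter
        numbers.onNext(3);   // combineLatest: 3x
        letters.onNext("y"); // combineLatest: 3y, zip pairs the second number: 2y
        return Arrays.asList(combined, zipped);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1x, 2x, 3x, 3y], [1x, 2y]]
    }
}
```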