Introduction
Each language-specific implementation of ReactiveX implements a set of operators. Although there is much overlap between implementations, there are also some operators that are only implemented in certain implementations. Also, each implementation tends to name its operators to resemble those of similar methods that are already familiar from other contexts in that language.
Chaining Operators
Most operators operate on an Observable and return an Observable. This allows you to apply these operators one after the other, in a chain. Each operator in the chain modifies the Observable that results from the operation of the previous operator.
There are other patterns, like the Builder Pattern, in which a variety of methods of a particular class operate on an item of that same class by modifying that object through the operation of the method. These patterns also allow you to chain the methods in a similar way. But while in the Builder Pattern, the order in which the methods appear in the chain does not usually matter, with the Observable operators order matters.
A chain of Observable operators do not operate independently on the original Observable that originates the chain, but they operate in turn, each one operating on the Observable generated by the operator immediately previous in the chain.
The Operators of ReactiveX
This page first lists what could be considered the “core” operators in ReactiveX, and links to pages that have more in-depth information on how these operators work and how particular language-specific ReactiveX versions have implemented these operators.
Next is a “decision tree” that may help you choose the operator that is most appropriate to your use case.
Finally, there is an alphabetical list of most of the operators available in the many language-specific implementations of ReactiveX. These link to the page that documents the core operator that most closely resembles the language-specific operator (so, for instance, the Rx.NET “SelectMany” operator links to the documentation of the FlatMap ReactiveX operator, of which “SelectMany” is the Rx.NET implementation).
If you want to implement your own operator, see Implementing Your Own Operators.
Contents
- Operators By Category
- A Decision Tree of Observable Operators
- An Alphabetical List of Observable Operators
Operators By Category
Creating Observables
Operators that originate new Observables.
Create
— create an Observable from scratch by calling observer methods programmaticallyDefer
— do not create the Observable until the observer subscribes, and create a fresh Observable for each observerEmpty
/Never
/Throw
— create Observables that have very precise and limited behaviorFrom
— convert some other object or data structure into an ObservableInterval
— create an Observable that emits a sequence of integers spaced by a particular time intervalJust
— convert an object or a set of objects into an Observable that emits that or those objectsRange
— create an Observable that emits a range of sequential integersRepeat
— create an Observable that emits a particular item or sequence of items repeatedlyStart
— create an Observable that emits the return value of a functionTimer
— create an Observable that emits a single item after a given delay
Transforming Observables
Operators that transform items that are emitted by an Observable.
Buffer
— periodically gather items from an Observable into bundles and emit these bundles rather than emitting the items one at a timeFlatMap
— transform the items emitted by an Observable into Observables, then flatten the emissions from those into a single ObservableGroupBy
— divide an Observable into a set of Observables that each emit a different group of items from the original Observable, organized by keyMap
— transform the items emitted by an Observable by applying a function to each itemScan
— apply a function to each item emitted by an Observable, sequentially, and emit each successive valueWindow
— periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
Filtering Observables
Operators that selectively emit items from a source Observable.
Debounce
— only emit an item from an Observable if a particular timespan has passed without it emitting another itemDistinct
— suppress duplicate items emitted by an ObservableElementAt
— emit only item n emitted by an ObservableFilter
— emit only those items from an Observable that pass a predicate testFirst
— emit only the first item, or the first item that meets a condition, from an ObservableIgnoreElements
— do not emit any items from an Observable but mirror its termination notificationLast
— emit only the last item emitted by an ObservableSample
— emit the most recent item emitted by an Observable within periodic time intervalsSkip
— suppress the first n items emitted by an ObservableSkipLast
— suppress the last n items emitted by an ObservableTake
— emit only the first n items emitted by an ObservableTakeLast
— emit only the last n items emitted by an Observable
Combining Observables
Operators that work with multiple source Observables to create a single Observable
And
/Then
/When
— combine sets of items emitted by two or more Observables by means ofPattern
andPlan
intermediariesCombineLatest
— when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this functionJoin
— combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other ObservableMerge
— combine multiple Observables into one by merging their emissionsStartWith
— emit a specified sequence of items before beginning to emit the items from the source ObservableSwitch
— convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those ObservablesZip
— combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
Error Handling Operators
Operators that help to recover from error notifications from an Observable
Catch
— recover from anonError
notification by continuing the sequence without errorRetry
— if a source Observable sends anonError
notification, resubscribe to it in the hopes that it will complete without error
Observable Utility Operators
A toolbox of useful Operators for working with Observables
Delay
— shift the emissions from an Observable forward in time by a particular amountDo
— register an action to take upon a variety of Observable lifecycle eventsMaterialize
/Dematerialize
— represent both the items emitted and the notifications sent as emitted items, or reverse this processObserveOn
— specify the scheduler on which an observer will observe this ObservableSerialize
— force an Observable to make serialized calls and to be well-behavedSubscribe
— operate upon the emissions and notifications from an ObservableSubscribeOn
— specify the scheduler an Observable should use when it is subscribed toTimeInterval
— convert an Observable that emits items into one that emits indications of the amount of time elapsed between those emissionsTimeout
— mirror the source Observable, but issue an error notification if a particular period of time elapses without any emitted itemsTimestamp
— attach a timestamp to each item emitted by an ObservableUsing
— create a disposable resource that has the same lifespan as the Observable
Conditional and Boolean Operators
Operators that evaluate one or more Observables or items emitted by Observables
All
— determine whether all items emitted by an Observable meet some criteriaAmb
— given two or more source Observables, emit all of the items from only the first of these Observables to emit an itemContains
— determine whether an Observable emits a particular item or notDefaultIfEmpty
— emit items from the source Observable, or a default item if the source Observable emits nothingSequenceEqual
— determine whether two Observables emit the same sequence of itemsSkipUntil
— discard items emitted by an Observable until a second Observable emits an itemSkipWhile
— discard items emitted by an Observable until a specified condition becomes falseTakeUntil
— discard items emitted by an Observable after a second Observable emits an item or terminatesTakeWhile
— discard items emitted by an Observable after a specified condition becomes false
Mathematical and Aggregate Operators
Operators that operate on the entire sequence of items emitted by an Observable
Average
— calculates the average of numbers emitted by an Observable and emits this averageConcat
— emit the emissions from two or more Observables without interleaving themCount
— count the number of items emitted by the source Observable and emit only this valueMax
— determine, and emit, the maximum-valued item emitted by an ObservableMin
— determine, and emit, the minimum-valued item emitted by an ObservableReduce
— apply a function to each item emitted by an Observable, sequentially, and emit the final valueSum
— calculate the sum of numbers emitted by an Observable and emit this sum
Backpressure Operators
- backpressure operators — strategies for coping with Observables that produce items more rapidly than their observers consume them
Connectable Observable Operators
Specialty Observables that have more precisely-controlled subscription dynamics
Connect
— instruct a connectable Observable to begin emitting items to its subscribersPublish
— convert an ordinary Observable into a connectable ObservableRefCount
— make a Connectable Observable behave like an ordinary ObservableReplay
— ensure that all observers see the same sequence of emitted items, even if they subscribe after the Observable has begun emitting items
Operators to Convert Observables
To
— convert an Observable into another object or data structure
A Decision Tree of Observable Operators
This tree can help you find the ReactiveX Observable operator you’re looking for.
- I want to create a new Observable
-
- that emits a particular item
- Just
- that pulls its emissions from a particular
Array
,Iterable
, or something like that - From
- by retrieving it from a Future
- Start
- that obtains its sequence from a Future
- From
- that emits a sequence of items repeatedly
- Repeat
- from scratch, with custom logic
- Create
- for each observer that subscribes
- Defer
- that emits a sequence of integers
- Range
- that completes without emitting items
- Empty
- that does nothing at all
- Never
- I want to create an Observable by combining other Observables
-
- and emitting all of the items from all of the Observables in whatever order they are received
- Merge
- and emitting all of the items from all of the Observables, one Observable at a time
- Concat
- by combining the items from two or more Observables sequentially to come up with new items to emit
-
- whenever each of the Observables has emitted a new item
- Zip
- whenever any of the Observables has emitted a new item
- CombineLatest
- whenever an item is emitted by one Observable in a window defined by an item emitted by another
- Join
- by means of
Pattern
andPlan
intermediaries - And/Then/When
- and emitting the items from only the most-recently emitted of those Observables
- Switch
- I want to emit the items from an Observable after transforming them
-
- one at a time with a function
- Map
- by emitting all of the items emitted by corresponding Observables
- FlatMap
-
- one Observable at a time, in the order they are emitted
- ConcatMap
- based on all of the items that preceded them
- Scan
- by attaching a timestamp to them
- Timestamp
- into an indicator of the amount of time that lapsed before the emission of the item
- TimeInterval
- I want to shift the items emitted by an Observable forward in time before reemitting them
- Delay
- I want to transform items and notifications from an Observable into items and reemit them
-
- by wrapping them in
Notification
objects - Materialize
-
- which I can then unwrap again with
- Dematerialize
- by wrapping them in
- I want to ignore all items emitted by an Observable and only pass along its completed/error notification
- IgnoreElements
- I want to mirror an Observable but prefix items to its sequence
- StartWith
-
- only if its sequence is empty
- DefaultIfEmpty
- I want to collect items from an Observable and reemit them as buffers of items
- Buffer
-
- containing only the last items emitted
- TakeLastBuffer
- I want to split one Observable into multiple Observables
- Window
-
- so that similar items end up on the same Observable
- GroupBy
- I want to retrieve a particular item emitted by an Observable:
- I want to reemit only certain items from an Observable
-
- by filtering out those that do not match some predicate
- Filter
- that is, only the first item
- First
- that is, only the first items
- Take
- that is, only the last item
- Last
- that is, only item n
- ElementAt
- that is, only those items after the first items
- that is, those items except the last items
- by sampling the Observable periodically
- Sample
- by only emitting items that are not followed by other items within some duration
- Debounce
- by suppressing items that are duplicates of already-emitted items
- Distinct
-
- if they immediately follow the item they are duplicates of
- DistinctUntilChanged
- by delaying my subscription to it for some time after it begins emitting items
- DelaySubscription
- I want to reemit items from an Observable only on condition that it was the first of a collection of Observables to emit an item
- Amb
- I want to evaluate the entire sequence of items emitted by an Observable
-
- and emit a single boolean indicating if all of the items pass some test
- All
- and emit a single boolean indicating if the Observable emitted any item (that passes some test)
- Contains
- and emit a single boolean indicating if the Observable emitted no items
- IsEmpty
- and emit a single boolean indicating if the sequence is identical to one emitted by a second Observable
- SequenceEqual
- and emit the average of all of their values
- Average
- and emit the sum of all of their values
- Sum
- and emit a number indicating how many items were in the sequence
- Count
- and emit the item with the maximum value
- Max
- and emit the item with the minimum value
- Min
- by applying an aggregation function to each item in turn and emitting the result
- Scan
- I want to convert the entire sequence of items emitted by an Observable into some other data structure
- To
- I want an operator to operate on a particular Scheduler
- SubscribeOn
-
- when it notifies observers
- ObserveOn
- I want an Observable to invoke a particular action when certain events occur
- Do
- I want an Observable that will notify observers of an error
- Throw
-
- if a specified period of time elapses without it emitting an item
- Timeout
- I want an Observable to recover gracefully
- I want to create a resource that has the same lifespan as the Observable
- Using
- I want to subscribe to an Observable and receive a
Future
that blocks until the Observable completes - Start
- I want an Observable that does not start emitting items to subscribers until asked
- Publish
-
- and then only emits the last item in its sequence
- PublishLast
- and then emits the complete sequence, even to those who subscribe after the sequence has begun
- Replay
- but I want it to go away once all of its subscribers unsubscribe
- RefCount
- and then I want to ask it to start
- Connect
See Also
- Which Operator do I use? by Dennis Stoyanov (a similar decision tree, specific to RxJS operators)
An Alphabetical List of Observable Operators
Canonical, core operator names are in boldface. Other entries represent language-specific variants of these operators or specialty operators outside of the main ReactiveX core set of operators.
Aggregate
All
Amb
ambArray
ambWith
and_
And
Any
apply
as_blocking
asObservable
AssertEqual
asyncAction
asyncFunc
Average
averageDouble
averageFloat
averageInteger
averageLong
blocking
blockingFirst
blockingForEach
blockingIterable
blockingLast
blockingLatest
blockingMostRecent
blockingNext
blockingSingle
blockingSubscribe
Buffer
bufferWithCount
bufferWithTime
bufferWithTimeOrCount
byLine
cache
cacheWithInitialCapacity
case
Cast
Catch
catchError
catchException
collect
collect
(RxScala version ofFilter
)collectInto
CombineLatest
combineLatestDelayError
combineLatestWith
Concat
concat_all
concatAll
concatArray
concatArrayDelayError
concatArrayEager
concatDelayError
concatEager
concatMap
concatMapDelayError
concatMapEager
concatMapEagerDelayError
concatMapIterable
concatMapObserver
concatMapTo
concatWith
Connect
connect_forever
cons
Contains
controlled
Count
countLong
Create
cycle
Debounce
decode
DefaultIfEmpty
Defer
deferFuture
Delay
delaySubscription
delayWithSelector
Dematerialize
Distinct
distinctKey
distinctUntilChanged
distinctUntilKeyChanged
Do
doAction
doAfterTerminate
doOnComplete
doOnCompleted
doOnDispose
doOnEach
doOnError
doOnLifecycle
doOnNext
doOnRequest
doOnSubscribe
doOnTerminate
doOnUnsubscribe
doseq
doWhile
drop
dropRight
dropUntil
dropWhile
ElementAt
ElementAtOrDefault
Empty
emptyObservable
empty?
encode
ensures
error
every
exclusive
exists
expand
failWith
Filter
filterNot
Finally
finallyAction
finallyDo
find
findIndex
First
firstElement
FirstOrDefault
firstOrElse
FlatMap
flatMapFirst
flatMapIterable
flatMapIterableWith
flatMapLatest
flatMapObserver
flatMapWith
flatMapWithMaxConcurrent
flat_map_with_index
flatten
flattenDelayError
foldl
foldLeft
for
forall
ForEach
forEachFuture
forEachWhile
forIn
forkJoin
From
fromAction
fromArray
FromAsyncPattern
fromCallable
fromCallback
FromEvent
FromEventPattern
fromFunc0
fromFuture
fromIterable
fromIterator
from_list
fromNodeCallback
fromPromise
fromPublisher
fromRunnable
Generate
generateWithAbsoluteTime
generateWithRelativeTime
generator
GetEnumerator
getIterator
GroupBy
GroupByUntil
GroupJoin
head
headOption
headOrElse
if
ifThen
IgnoreElements
indexOf
interleave
interpose
Interval
intervalRange
into
isEmpty
items
Join
join
(string)jortSort
jortSortUntil
Just
keep
keep-indexed
Last
lastElement
lastOption
LastOrDefault
lastOrElse
Latest
latest
(Rx.rb version ofSwitch
)length
let
letBind
lift
limit
LongCount
ManySelect
Map
map
(RxClojure version ofZip
)MapCat
mapCat
(RxClojure version ofZip
)map-indexed
mapTo
mapWithIndex
Materialize
Max
MaxBy
Merge
mergeAll
mergeArray
mergeArrayDelayError
merge_concurrent
mergeDelayError
mergeObservable
mergeWith
Min
MinBy
MostRecent
Multicast
multicastWithSelector
nest
Never
Next
Next
(BlockingObservable version)none
nonEmpty
nth
ObserveOn
ObserveOnDispatcher
observeSingleOn
of
of_array
ofArrayChanges
of_enumerable
of_enumerator
ofObjectChanges
OfType
ofWithScheduler
onBackpressureBlock
onBackpressureBuffer
onBackpressureDrop
OnErrorResumeNext
onErrorReturn
onErrorReturnItem
onExceptionResumeNext
onTerminateDetach
orElse
pairs
pairwise
partition
partition-all
pausable
pausableBuffered
pluck
product
Publish
PublishLast
publish_synchronized
publishValue
raise_error
Range
Reduce
reduceWith
reductions
RefCount
Repeat
repeat_infinitely
repeatUntil
repeatWhen
Replay
rescue_error
rest
Retry
retry_infinitely
retryUntil
retryWhen
Return
returnElement
returnValue
runAsync
safeSubscribe
Sample
Scan
scanWith
scope
Select
(alternate name ofMap
)select
(alternate name ofFilter
)selectConcat
selectConcatObserver
SelectMany
selectManyObserver
select_switch
selectSwitch
selectSwitchFirst
selectWithMaxConcurrent
select_with_index
seq
SequenceEqual
sequence_eql?
SequenceEqualWith
Serialize
share
shareReplay
shareValue
Single
singleElement
SingleOrDefault
singleOption
singleOrElse
size
Skip
SkipLast
skipLastWithTime
SkipUntil
skipUntilWithTime
SkipWhile
skipWhileWithIndex
skip_with_time
slice
sliding
slidingBuffer
some
sort
sorted
sort-by
sorted-list-by
split
split-with
Start
startAsync
startFuture
StartWith
startWithArray
stringConcat
stopAndWait
subscribe
subscribeActual
SubscribeOn
SubscribeOnDispatcher
subscribeOnCompleted
subscribeOnError
subscribeOnNext
subscribeWith
Sum
sumDouble
sumFloat
sumInteger
sumLong
Switch
switchCase
switchIfEmpty
switchLatest
switchMap
switchMapDelayError
switchOnNext
switchOnNextDelayError
Synchronize
Take
take_with_time
takeFirst
TakeLast
takeLastBuffer
takeLastBufferWithTime
takeLastWithTime
takeRight
(see also:TakeLast
)TakeUntil
takeUntilWithTime
TakeWhile
takeWhileWithIndex
tail
tap
tapOnCompleted
tapOnError
tapOnNext
Then
thenDo
Throttle
throttleFirst
throttleLast
throttleWithSelector
throttleWithTimeout
Throw
throwError
throwException
TimeInterval
Timeout
timeoutWithSelector
Timer
Timestamp
To
to_a
ToArray
ToAsync
toBlocking
toBuffer
to_dict
ToDictionary
ToEnumerable
ToEvent
ToEventPattern
ToFlowable
ToFuture
to_h
toIndexedSeq
toIterable
toIterator
ToList
ToLookup
toMap
toMultiMap
ToObservable
toSet
toSortedList
toStream
ToTask
toTraversable
toVector
tumbling
tumblingBuffer
unsafeCreate
unsubscribeOn
Using
When
Where
while
whileDo
Window
windowWithCount
windowWithTime
windowWithTimeOrCount
windowed
withFilter
withLatestFrom
Zip
zipArray
zipIterable
zipWith
zipWithIndex
++
+:
:+
No comments:
Post a Comment