官术网_书友最值得收藏!

Observer event calls

What we discussed so far is very high-level information about how we can use Observable in RxJava. It basically pushes (emits) the items (data or events) of a given type through a series of operators (if defined) until it reaches  Observer. Let's dig into more details to understand which mechanism works under the hood between this interaction and how RxJava complies with Reactive Streams specifications.

Observable interacts with Observers through the following event calls:

  • onNext: This is the call from where data/events are being sent, one at a time, down to all registered Observers.
  • onComplete: This event is used to signal completion of communication to all. 
  •  Observers: It simply denotes no more onNext calls happen.
  • onError: In case any error occurs before an onComplete() call, an onError() event is used to signal the error from Observable to Observers. Observable will stop emitting the data and Observers will handle the error.

These events are defined as an abstract method in the Observer type, and we will see the implementation type later in this chapter. First let's see how these event calls happen during the interaction with the following code:

public class RxJavaCreateDemo {

public static void main(String[] args) {
Observable<String> daysOfWeek = Observable.create(
sourceEmitter -> {
try {
sourceEmitter.onNext("Sunday");
sourceEmitter.onNext("Monday");
sourceEmitter.onNext("Tuesday");
sourceEmitter.onNext("Wednesday");
sourceEmitter.onNext("Thursday");
sourceEmitter.onNext("Friday");
sourceEmitter.onNext("Saturday");
sourceEmitter.onComplete();
}catch(Exception e) {
sourceEmitter.onError(e);
}
});
Observable<String> daysInUpperCase= daysOfWeek.map(day->day.toUpperCase())
.filter(day->day.startsWith("S"));
daysInUpperCase.subscribe(day->System.out.println("Day is -->"+day));
}
}

 Observable.create() is a factory method and used to create Observable with the emitter. The onNext() method of the emitter is used to emit (send) the data/events (one at a time) to the Observable chain (and finally to registered Observers). The onComplete() method is used to terminate further communication.

If you try to make an onNext() call after onComplete(), the data will not be transmitted. In case any error occurs, the onError() method is called. It is used to push up the error to the Observable chain where it is handled by Observer. In this code, there is no chance of any exception, but you can handle any error with onError().

We have used the map and filter operators to refine the data to uppercase and starting with D respectively. Finally, they are printed by Observer. The flow of data will happen from onNext()mapfilterObserver.  Each operator will return new Observable class in the chain. 

You notice that in the first example we used the Observable.just() method to emit the data. It internally invokes the onNext() method for each of the values pushed. On getting the last value, it will call onComplete(). So Observable.just() is equivalent to Observable.create() calling onNext() on each data and onComplete() on last one. The create() method is generally used for sources that are not reactive in nature. 

主站蜘蛛池模板: 东源县| 华容县| 怀安县| 长岭县| 金山区| 庄河市| 阳山县| 剑河县| 阜阳市| 曲水县| 沁水县| 鱼台县| 伊宁市| 平原县| 舟曲县| 呼玛县| 喜德县| 青河县| 互助| 玉田县| 廊坊市| 永清县| 洪湖市| 绥阳县| 广河县| 益阳市| 南部县| 雷州市| 建水县| 无棣县| 南皮县| 永福县| 公安县| 定襄县| 区。| 通渭县| 长宁县| 陆良县| 鹿邑县| 长子县| 利津县|