Observer event calls

What we have discussed so far is a very high-level view of how to use Observable in RxJava. An Observable pushes (emits) items (data or events) of a given type through a series of operators (if any are defined) until they reach an Observer. Let's dig into the details to understand the mechanism that works under the hood during this interaction and how RxJava complies with the Reactive Streams specification.

Observable interacts with Observers through the following event calls:

  • onNext: This is the call through which data/events are sent, one at a time, down to all registered Observers.
  • onComplete: This event is used to signal completion of communication to all Observers. It simply denotes that no more onNext calls will happen.
  • onError: In case any error occurs before an onComplete() call, an onError() event is used to signal the error from Observable to Observers. Observable will stop emitting the data and Observers will handle the error.
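The order of these event calls can be sketched with plain JDK code. This is only an illustrative model, not RxJava itself: the names EventCallSketch, SimpleObserver, emitAll, and record are hypothetical and exist only for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, JDK-only model of the three event calls; these are NOT RxJava types.
class EventCallSketch {

    /** Mirrors the three callbacks described in the list above. */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onComplete();
    }

    /** Pushes each item to the observer, then signals completion or an error. */
    static <T> void emitAll(List<T> items, SimpleObserver<T> observer) {
        try {
            for (T item : items) {
                observer.onNext(item);   // one item at a time
            }
        } catch (RuntimeException e) {
            observer.onError(e);         // terminal: no onComplete after an error
            return;
        }
        observer.onComplete();           // terminal: no more onNext calls
    }

    /** Records every event call as a string so that the order is visible. */
    static List<String> record(List<String> items) {
        List<String> log = new ArrayList<>();
        emitAll(items, new SimpleObserver<String>() {
            @Override public void onNext(String item) { log.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable error) { log.add("onError(" + error.getMessage() + ")"); }
            @Override public void onComplete() { log.add("onComplete"); }
        });
        return log;
    }

    public static void main(String[] args) {
        // Prints onNext(Sunday), onNext(Monday), onComplete (one per line)
        record(List.of("Sunday", "Monday")).forEach(System.out::println);
    }
}
```

The sketch emits either onComplete or onError, never both, which matches the description of both events as terminal signals.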

These events are defined as abstract methods in the Observer type, and we will see the implementation type later in this chapter. First, let's see how these event calls happen during the interaction with the following code:

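// Assumption: RxJava 2.x package name; for RxJava 3.x use io.reactivex.rxjava3.core.Observable
import io.reactivex.Observable;

public class RxJavaCreateDemo {

    public static void main(String[] args) {
        // Create an Observable that emits the days of the week through the emitter
        Observable<String> daysOfWeek = Observable.create(
            sourceEmitter -> {
                try {
                    sourceEmitter.onNext("Sunday");
                    sourceEmitter.onNext("Monday");
                    sourceEmitter.onNext("Tuesday");
                    sourceEmitter.onNext("Wednesday");
                    sourceEmitter.onNext("Thursday");
                    sourceEmitter.onNext("Friday");
                    sourceEmitter.onNext("Saturday");
                    sourceEmitter.onComplete();   // no more items will follow
                } catch (Exception e) {
                    sourceEmitter.onError(e);     // signal the error to Observers
                }
            });

        // Convert each day to uppercase, then keep only the days starting with "S"
        Observable<String> daysInUpperCase = daysOfWeek
            .map(day -> day.toUpperCase())
            .filter(day -> day.startsWith("S"));

        // The lambda acts as the Observer's onNext handler
        daysInUpperCase.subscribe(day -> System.out.println("Day is -->" + day));
    }
}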

Observable.create() is a factory method used to create an Observable with an emitter. The onNext() method of the emitter is used to emit (send) the data/events, one at a time, to the Observable chain (and finally to the registered Observers). The onComplete() method is used to terminate further communication.

If you try to make an onNext() call after onComplete(), the data will not be transmitted. If an error occurs, the onError() method is called; it pushes the error down the Observable chain, where it is handled by the Observer. In this code there is no chance of an exception, but the try-catch block shows how any error can be routed through onError().
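The terminal behavior described here can be modeled with a small guard flag. The following is a simplified JDK-only sketch, not RxJava's actual emitter; TerminalEventSketch and GuardedEmitter are hypothetical names, and the sketch simply drops any signal that arrives after a terminal event.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only sketch of an emitter that ignores signals after a terminal event.
class TerminalEventSketch {

    /** Records signals and drops anything sent after onComplete() or onError(). */
    static final class GuardedEmitter {
        private final List<String> delivered = new ArrayList<>();
        private boolean terminated;

        void onNext(String item) {
            if (terminated) return;              // late items are not transmitted
            delivered.add("onNext(" + item + ")");
        }

        void onComplete() {
            if (terminated) return;
            terminated = true;
            delivered.add("onComplete");
        }

        void onError(Throwable error) {
            if (terminated) return;
            terminated = true;
            delivered.add("onError(" + error.getMessage() + ")");
        }

        List<String> delivered() { return delivered; }
    }

    /** onNext() after onComplete(): the late item never reaches the log. */
    static List<String> lateOnNext() {
        GuardedEmitter emitter = new GuardedEmitter();
        emitter.onNext("Sunday");
        emitter.onComplete();
        emitter.onNext("Monday");                // ignored
        return emitter.delivered();              // [onNext(Sunday), onComplete]
    }

    /** A failing source: the exception is routed through onError(). */
    static List<String> failure() {
        GuardedEmitter emitter = new GuardedEmitter();
        try {
            emitter.onNext("Sunday");
            throw new IllegalStateException("source failed");
        } catch (Exception e) {
            emitter.onError(e);
        }
        emitter.onNext("Monday");                // ignored, the stream already failed
        return emitter.delivered();              // [onNext(Sunday), onError(source failed)]
    }
}
```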

We have used the map and filter operators to convert the data to uppercase and to keep only the values starting with S, respectively. Finally, the values are printed by the Observer. The data flows as onNext() → map → filter → Observer. Each operator returns a new Observable instance in the chain.
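To make the idea of "each operator returns a new Observable" concrete, here is a minimal JDK-only sketch. MiniObservable and OperatorChainSketch are hypothetical names and do not reflect how RxJava implements its operators; the sketch only models the onNext path and omits onError and onComplete.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical JDK-only sketch; MiniObservable is not an RxJava type.
class OperatorChainSketch {

    /** A source that pushes items to whichever consumer subscribes. */
    interface MiniObservable<T> {
        void subscribe(Consumer<? super T> onNext);

        /** Returns a NEW stage that transforms each item before passing it on. */
        default <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            MiniObservable<T> upstream = this;
            return downstream ->
                upstream.subscribe(item -> downstream.accept(mapper.apply(item)));
        }

        /** Returns a NEW stage that passes on only the items matching the predicate. */
        default MiniObservable<T> filter(Predicate<? super T> predicate) {
            MiniObservable<T> upstream = this;
            return downstream -> upstream.subscribe(item -> {
                if (predicate.test(item)) {
                    downstream.accept(item);
                }
            });
        }
    }

    /** Builds source -> map -> filter and collects what reaches the final consumer. */
    static List<String> run() {
        MiniObservable<String> daysOfWeek = onNext -> {
            for (String day : List.of("Sunday", "Monday", "Tuesday", "Wednesday",
                                      "Thursday", "Friday", "Saturday")) {
                onNext.accept(day);
            }
        };
        MiniObservable<String> upper = daysOfWeek.map(day -> day.toUpperCase());
        MiniObservable<String> startingWithS = upper.filter(day -> day.startsWith("S"));

        List<String> received = new ArrayList<>();
        startingWithS.subscribe(received::add);
        return received;                         // [SUNDAY, SATURDAY]
    }

    public static void main(String[] args) {
        run().forEach(day -> System.out.println("Day is -->" + day));
    }
}
```

Nothing is emitted until subscribe() is called on the last stage; each stage subscribes to the one before it, so every item travels source → map → filter → final consumer.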

You may have noticed that in the first example we used the Observable.just() method to emit the data. Internally, it invokes onNext() for each of the values passed to it and calls onComplete() after the last value. So Observable.just() is equivalent to an Observable.create() that calls onNext() for each item and onComplete() after the last one. The create() method is generally used for sources that are not reactive in nature.
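The equivalence between just() and create() can be sketched as follows. This is a JDK-only illustration under simplifying assumptions: the create() and just() methods below are hypothetical stand-ins that run the source immediately and return the recorded event calls, which is not how the RxJava factory methods behave.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical JDK-only sketch; create() and just() here are stand-ins, not RxJava methods.
class JustViaCreateSketch {

    interface Emitter<T> {
        void onNext(T item);
        void onComplete();
    }

    /** Runs the source logic against a recording emitter and returns the event calls. */
    static <T> List<String> create(Consumer<Emitter<T>> source) {
        List<String> events = new ArrayList<>();
        source.accept(new Emitter<T>() {
            @Override public void onNext(T item) { events.add("onNext(" + item + ")"); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    /** just(): one onNext() per value, then onComplete() after the last one. */
    @SafeVarargs
    static <T> List<String> just(T... items) {
        return create((Emitter<T> emitter) -> {
            for (T item : items) {
                emitter.onNext(item);
            }
            emitter.onComplete();
        });
    }

    /** The same sequence written out by hand with create(). */
    static List<String> manual() {
        return create((Emitter<String> emitter) -> {
            emitter.onNext("Sunday");
            emitter.onNext("Monday");
            emitter.onComplete();
        });
    }
}
```

Calling just("Sunday", "Monday") and manual() yields the same event sequence: onNext(Sunday), onNext(Monday), onComplete.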
