官术网_书友最值得收藏!

  • Spring 5.0 Projects
  • Nilang Patel
  • 483字
  • 2021-07-02 12:35:03

Observer event calls

What we discussed so far is very high-level information about how we can use Observable in RxJava. It basically pushes (emits) the items (data or events) of a given type through a series of operators (if defined) until it reaches  Observer. Let's dig into more details to understand which mechanism works under the hood between this interaction and how RxJava complies with Reactive Streams specifications.

Observable interacts with Observers through the following event calls:

  • onNext: This is the call from where data/events are being sent, one at a time, down to all registered Observers.
  • onComplete: This event is used to signal completion of communication to all. 
  •  Observers: It simply denotes no more onNext calls happen.
  • onError: In case any error occurs before an onComplete() call, an onError() event is used to signal the error from Observable to Observers. Observable will stop emitting the data and Observers will handle the error.

These events are defined as an abstract method in the Observer type, and we will see the implementation type later in this chapter. First let's see how these event calls happen during the interaction with the following code:

public class RxJavaCreateDemo {

public static void main(String[] args) {
Observable<String> daysOfWeek = Observable.create(
sourceEmitter -> {
try {
sourceEmitter.onNext("Sunday");
sourceEmitter.onNext("Monday");
sourceEmitter.onNext("Tuesday");
sourceEmitter.onNext("Wednesday");
sourceEmitter.onNext("Thursday");
sourceEmitter.onNext("Friday");
sourceEmitter.onNext("Saturday");
sourceEmitter.onComplete();
}catch(Exception e) {
sourceEmitter.onError(e);
}
});
Observable<String> daysInUpperCase= daysOfWeek.map(day->day.toUpperCase())
.filter(day->day.startsWith("S"));
daysInUpperCase.subscribe(day->System.out.println("Day is -->"+day));
}
}

 Observable.create() is a factory method and used to create Observable with the emitter. The onNext() method of the emitter is used to emit (send) the data/events (one at a time) to the Observable chain (and finally to registered Observers). The onComplete() method is used to terminate further communication.

If you try to make an onNext() call after onComplete(), the data will not be transmitted. In case any error occurs, the onError() method is called. It is used to push up the error to the Observable chain where it is handled by Observer. In this code, there is no chance of any exception, but you can handle any error with onError().

We have used the map and filter operators to refine the data to uppercase and starting with D respectively. Finally, they are printed by Observer. The flow of data will happen from onNext()mapfilterObserver.  Each operator will return new Observable class in the chain. 

You notice that in the first example we used the Observable.just() method to emit the data. It internally invokes the onNext() method for each of the values pushed. On getting the last value, it will call onComplete(). So Observable.just() is equivalent to Observable.create() calling onNext() on each data and onComplete() on last one. The create() method is generally used for sources that are not reactive in nature. 

主站蜘蛛池模板: 辽阳县| 福泉市| 梓潼县| 宣恩县| 肥西县| 仙游县| 临邑县| 西平县| 六枝特区| 渭源县| 永新县| 盐城市| 长子县| 邢台市| 青浦区| 会宁县| 虎林市| 昌宁县| 元朗区| 获嘉县| 溆浦县| 聂荣县| 炉霍县| 松溪县| 冀州市| 衡南县| 乌恰县| 兴业县| 新泰市| 海淀区| 玛纳斯县| 揭阳市| 清水县| 台安县| 鄢陵县| 宿州市| 南宁市| 虞城县| 东乌珠穆沁旗| 扶沟县| 静乐县|