官术网_书友最值得收藏!

  • Spring 5.0 Projects
  • Nilang Patel
  • 483字
  • 2021-07-02 12:35:03

Observer event calls

What we discussed so far is very high-level information about how we can use Observable in RxJava. It basically pushes (emits) the items (data or events) of a given type through a series of operators (if defined) until it reaches  Observer. Let's dig into more details to understand which mechanism works under the hood between this interaction and how RxJava complies with Reactive Streams specifications.

Observable interacts with Observers through the following event calls:

  • onNext: This is the call from where data/events are being sent, one at a time, down to all registered Observers.
  • onComplete: This event is used to signal completion of communication to all. 
  •  Observers: It simply denotes no more onNext calls happen.
  • onError: In case any error occurs before an onComplete() call, an onError() event is used to signal the error from Observable to Observers. Observable will stop emitting the data and Observers will handle the error.

These events are defined as an abstract method in the Observer type, and we will see the implementation type later in this chapter. First let's see how these event calls happen during the interaction with the following code:

public class RxJavaCreateDemo {

public static void main(String[] args) {
Observable<String> daysOfWeek = Observable.create(
sourceEmitter -> {
try {
sourceEmitter.onNext("Sunday");
sourceEmitter.onNext("Monday");
sourceEmitter.onNext("Tuesday");
sourceEmitter.onNext("Wednesday");
sourceEmitter.onNext("Thursday");
sourceEmitter.onNext("Friday");
sourceEmitter.onNext("Saturday");
sourceEmitter.onComplete();
}catch(Exception e) {
sourceEmitter.onError(e);
}
});
Observable<String> daysInUpperCase= daysOfWeek.map(day->day.toUpperCase())
.filter(day->day.startsWith("S"));
daysInUpperCase.subscribe(day->System.out.println("Day is -->"+day));
}
}

 Observable.create() is a factory method and used to create Observable with the emitter. The onNext() method of the emitter is used to emit (send) the data/events (one at a time) to the Observable chain (and finally to registered Observers). The onComplete() method is used to terminate further communication.

If you try to make an onNext() call after onComplete(), the data will not be transmitted. In case any error occurs, the onError() method is called. It is used to push up the error to the Observable chain where it is handled by Observer. In this code, there is no chance of any exception, but you can handle any error with onError().

We have used the map and filter operators to refine the data to uppercase and starting with D respectively. Finally, they are printed by Observer. The flow of data will happen from onNext()mapfilterObserver.  Each operator will return new Observable class in the chain. 

You notice that in the first example we used the Observable.just() method to emit the data. It internally invokes the onNext() method for each of the values pushed. On getting the last value, it will call onComplete(). So Observable.just() is equivalent to Observable.create() calling onNext() on each data and onComplete() on last one. The create() method is generally used for sources that are not reactive in nature. 

主站蜘蛛池模板: 大冶市| 恩施市| 南木林县| 华亭县| 正阳县| 岳西县| 开封县| 新竹县| 阿克陶县| 南城县| 彭州市| 新龙县| 成都市| 南京市| 宜都市| 彭阳县| 永年县| 武山县| 唐山市| 洛浦县| 开平市| 大竹县| 莎车县| 清苑县| 区。| 鸡泽县| 和田县| 佛学| 延长县| 乐至县| 南汇区| 新化县| 若尔盖县| 怀来县| 英超| 老河口市| 鞍山市| 安阳县| 三穗县| 铜陵市| 武定县|