
Observer plus iterator equals Reactive Stream

In this chapter, we have talked a lot about the Observer pattern, which cleanly separates the event Producer from the event Consumer. Let's recap the interfaces defined by that pattern, as shown in the following code:

public interface Observer<T> {
    void notify(T event);
}

public interface Subject<T> {
    void registerObserver(Observer<T> observer);
    void unregisterObserver(Observer<T> observer);
    void notifyObservers(T event);
}
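As a reminder of how these two interfaces cooperate, here is a minimal sketch of one possible Subject implementation. The SimpleSubject and ObserverDemo class names are assumptions made for illustration, and the two interfaces are repeated only so that the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;

// Interfaces repeated from the text so the sketch is self-contained.
interface Observer<T> {
    void notify(T event);
}

interface Subject<T> {
    void registerObserver(Observer<T> observer);
    void unregisterObserver(Observer<T> observer);
    void notifyObservers(T event);
}

// Hypothetical implementation: keeps observers in a thread-safe set
// and forwards every event to each of them.
class SimpleSubject<T> implements Subject<T> {
    private final Set<Observer<T>> observers = new CopyOnWriteArraySet<>();

    @Override
    public void registerObserver(Observer<T> observer) {
        observers.add(observer);
    }

    @Override
    public void unregisterObserver(Observer<T> observer) {
        observers.remove(observer);
    }

    @Override
    public void notifyObservers(T event) {
        for (Observer<T> observer : observers) {
            observer.notify(event);
        }
    }
}

class ObserverDemo {
    // Returns the events that the single registered observer received.
    static List<String> run() {
        List<String> received = new ArrayList<>();
        Subject<String> subject = new SimpleSubject<>();
        Observer<String> observer = event -> received.add(event);

        subject.registerObserver(observer);
        subject.notifyObservers("a");        // delivered
        subject.unregisterObserver(observer);
        subject.notifyObservers("b");        // nobody is listening anymore
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a]
    }
}
```

Note that nothing in this sketch lets the subject tell its observers that no more events will follow.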

As we saw previously, this approach works well for infinite streams of data, but it would be great to be able to signal the end of the data stream. Also, we do not want the Producer to generate events before any consumers appear. In the synchronous world, there is a pattern for that: the Iterator pattern. It may be described using the following code:

public interface Iterator<T> {
    T next();
    boolean hasNext();
}

To retrieve items one by one, Iterator provides the next() method, and it signals the end of the sequence by returning false from the hasNext() call. So what would happen if we tried to mix this idea with the asynchronous execution provided by the Observer pattern? The result would look like the following:

public interface RxObserver<T> {
    void onNext(T next);
    void onComplete();
}

The RxObserver is pretty similar to the Iterator, but instead of the consumer calling the next() method of Iterator, RxObserver is notified of a new value through the onNext() callback. And instead of checking whether the hasNext() method returns true, RxObserver is informed about the end of the stream through the onComplete() callback. That is fine, but what about errors? The Iterator may throw an Exception while processing the next() method, so it would be great to have a mechanism for propagating errors from the Producer to RxObserver. Let's add a special callback for that: onError(). So, the final solution will look like the following:

public interface RxObserver<T> {
    void onNext(T next);
    void onComplete();
    void onError(Exception e);
}

With that, we have just designed an Observer interface, the foundational concept of RxJava. This interface defines how data flows between every part of a reactive stream. As the smallest building block of the library, the Observer interface is found everywhere. The RxObserver is similar to the Observer from the Observer pattern, as previously described.
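To make the three callbacks concrete, the following is a minimal sketch, not RxJava code, of a producer that pulls values from an Iterator and pushes them into the RxObserver interface defined above. The IteratorProducer and RecordingObserver names and the emit() helper are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Interface repeated from the text so the sketch is self-contained.
interface RxObserver<T> {
    void onNext(T next);
    void onComplete();
    void onError(Exception e);
}

// Hypothetical observer that records every signal it receives.
class RecordingObserver<T> implements RxObserver<T> {
    final List<String> log = new ArrayList<>();

    @Override public void onNext(T next) { log.add("onNext " + next); }
    @Override public void onComplete() { log.add("onComplete"); }
    @Override public void onError(Exception e) { log.add("onError " + e.getMessage()); }
}

class IteratorProducer {
    // Hypothetical helper: turns the pull-style Iterator into push-style signals.
    static <T> void emit(Iterator<T> source, RxObserver<T> observer) {
        try {
            while (source.hasNext()) {
                observer.onNext(source.next());
            }
        } catch (Exception e) {
            observer.onError(e);   // terminal signal: failure
            return;
        }
        observer.onComplete();     // terminal signal: success
    }

    // A finite source ends with onComplete().
    static List<String> runFinite() {
        RecordingObserver<Integer> observer = new RecordingObserver<>();
        emit(List.of(1, 2, 3).iterator(), observer);
        return observer.log;
    }

    // A source that throws on its second next() call ends with onError().
    static List<String> runFailing() {
        Iterator<Integer> failing = new Iterator<Integer>() {
            private int calls = 0;
            @Override public boolean hasNext() { return true; }
            @Override public Integer next() {
                if (++calls > 1) {
                    throw new IllegalStateException("boom");
                }
                return calls;
            }
        };
        RecordingObserver<Integer> observer = new RecordingObserver<>();
        emit(failing, observer);
        return observer.log;
    }

    public static void main(String[] args) {
        System.out.println(runFinite());  // prints [onNext 1, onNext 2, onNext 3, onComplete]
        System.out.println(runFailing()); // prints [onNext 1, onError boom]
    }
}
```

In this sketch the loop runs on the caller's thread; an asynchronous producer would invoke the same callbacks from another thread.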

The Observable class is the counterpart to the Subject from the Observer pattern. Consequently, Observable acts as an event source, as it emits items. It has hundreds of stream transformation methods, as well as dozens of factory methods to initialize a reactive stream.

The Subscriber abstract class implements the Observer interface and consumes items. It is also used as a base for actual Subscriber implementations. The runtime relationship between Observable and Subscriber is controlled by a Subscription, which makes it possible to check the subscription status and cancel it if needed. This relationship is illustrated in the following diagram:

Diagram 2.6 Observable-Observer contract

RxJava defines rules about emitting items. The Observable is allowed to send any number of elements (including zero). It then signals the end of the execution either by signaling success or by raising an error. So, for each attached Subscriber, the Observable invokes onNext() any number of times and then calls onComplete() or onError() (but not both). Consequently, it is prohibited from calling onNext() after onComplete() or onError().
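These emission rules can be sketched as a small wrapper that drops every signal arriving after a terminal one. This is an illustration written against the RxObserver interface from this section rather than RxJava's own classes; the SafeObserver and ContractDemo names here are hypothetical, and the wrapper assumes single-threaded use.

```java
import java.util.ArrayList;
import java.util.List;

// Interface repeated from the text so the sketch is self-contained.
interface RxObserver<T> {
    void onNext(T next);
    void onComplete();
    void onError(Exception e);
}

// Hypothetical wrapper enforcing: onNext* then (onComplete | onError), nothing after.
// Not thread-safe; assumes all signals arrive on one thread.
class SafeObserver<T> implements RxObserver<T> {
    private final RxObserver<T> delegate;
    private boolean terminated = false;

    SafeObserver(RxObserver<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void onNext(T next) {
        if (!terminated) {
            delegate.onNext(next);
        }
    }

    @Override
    public void onComplete() {
        if (!terminated) {
            terminated = true;
            delegate.onComplete();
        }
    }

    @Override
    public void onError(Exception e) {
        if (!terminated) {
            terminated = true;
            delegate.onError(e);
        }
    }
}

class ContractDemo {
    // Simulates a misbehaving producer and returns what the delegate actually saw.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        RxObserver<Integer> recorder = new RxObserver<Integer>() {
            @Override public void onNext(Integer next) { log.add("onNext " + next); }
            @Override public void onComplete() { log.add("onComplete"); }
            @Override public void onError(Exception e) { log.add("onError " + e.getMessage()); }
        };
        RxObserver<Integer> safe = new SafeObserver<>(recorder);

        safe.onNext(1);
        safe.onComplete();
        safe.onNext(2);                                      // dropped: stream already completed
        safe.onError(new IllegalStateException("too late")); // dropped: only one terminal signal
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [onNext 1, onComplete]
    }
}
```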
