
Generating an asynchronous sequence

RxJava makes it possible to generate not only one event in the future, but an asynchronous sequence of events based, for example, on a time interval, as shown in the following code:

Observable.interval(1, TimeUnit.SECONDS)
          .subscribe(e -> System.out.println("Received: " + e));
Thread.sleep(5000);                                              // (1)

In this case, the output is as follows:

Received: 0
Received: 1
Received: 2
Received: 3
Received: 4

If we remove Thread.sleep(...) (1), the application exits without printing anything. This happens because the events are generated, and therefore consumed, on a separate daemon thread, and a daemon thread does not keep the JVM alive once the main thread has finished. So, to prevent the main thread from finishing too early, we can call sleep() or keep it busy with some other useful work.
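The daemon-thread effect can be reproduced without RxJava. The following is a minimal JDK-only sketch, not RxJava's actual implementation: the class name DaemonThreadSketch and the helper startTicker are hypothetical, introduced only for illustration.

```java
import java.util.function.LongConsumer;

class DaemonThreadSketch {

    // Hypothetical helper: emits 0, 1, 2, ... on a daemon thread until interrupted.
    static Thread startTicker(long periodMillis, LongConsumer onTick) {
        Thread ticker = new Thread(() -> {
            try {
                for (long i = 0; ; i++) {
                    Thread.sleep(periodMillis);
                    onTick.accept(i);
                }
            } catch (InterruptedException e) {
                // Interrupted: stop emitting and let the thread end.
            }
        }, "ticker");
        ticker.setDaemon(true); // a daemon thread does not keep the JVM alive
        ticker.start();
        return ticker;
    }

    public static void main(String[] args) throws InterruptedException {
        startTicker(1000, i -> System.out.println("Received: " + i));
        // Without this line, main returns immediately and the JVM exits
        // before the daemon thread has emitted anything.
        Thread.sleep(5000);
    }
}
```

Commenting out the final Thread.sleep(5000) in this sketch should make the program exit silently, which mirrors the behavior described above.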

Of course, something has to control the cooperation between the Observable and the Subscriber. This is called a Subscription, and it has the following interface declaration:

interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

The unsubscribe() method allows the Subscriber to inform the Observable that there is no need to send new events. In other words, calling it cancels the subscription. On the other hand, the Observable uses isUnsubscribed() to check whether the Subscriber is still waiting for events.
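To make the two sides of this contract concrete, here is a minimal JDK-only sketch. It declares its own Subscription with the same shape as the interface shown above; FlagSubscription and emit are hypothetical names, and the synchronous loop is a simplification chosen for illustration, not a description of how RxJava produces events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

class SubscriptionContractSketch {

    // Same shape as the interface declared in the text.
    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    // Hypothetical implementation: the subscription is just a thread-safe flag.
    static final class FlagSubscription implements Subscription {
        private final AtomicBoolean unsubscribed = new AtomicBoolean(false);

        @Override public void unsubscribe() { unsubscribed.set(true); }

        @Override public boolean isUnsubscribed() { return unsubscribed.get(); }
    }

    // Hypothetical producer: emits 0, 1, 2, ... below `limit`, checking
    // isUnsubscribed() before every event, as the producer side is expected to.
    static void emit(Subscription subscription, long limit, LongConsumer onNext) {
        for (long i = 0; i < limit && !subscription.isUnsubscribed(); i++) {
            onNext.accept(i);
        }
    }

    public static void main(String[] args) {
        FlagSubscription subscription = new FlagSubscription();
        List<Long> received = new ArrayList<>();
        emit(subscription, 1_000, value -> {
            received.add(value);
            if (value == 3) {
                subscription.unsubscribe(); // the consumer has seen enough
            }
        });
        System.out.println(received); // prints [0, 1, 2, 3]
    }
}
```

The consumer cancels from inside its own callback, and the producer stops at its next isUnsubscribed() check, so no event after 3 is delivered.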

To understand the unsubscribe functionality, let's consider the case where a subscriber is the only party interested in the events, and consumes them until an external signal is propagated by a CountDownLatch (1). The incoming stream generates a new event every 100 milliseconds, producing the endless sequence 0, 1, 2, 3... (3). The following code demonstrates how to get a Subscription (2) when defining a reactive stream. It also shows how to unsubscribe from a stream (4):

CountDownLatch externalSignal = ...;                    // (1)

Subscription subscription = Observable                  // (2)
        .interval(100, MILLISECONDS)                    // (3)
        .subscribe(System.out::println);

externalSignal.await();
subscription.unsubscribe();                             // (4)

So here, the subscriber receives events (for example 0, 1, 2, 3) until the externalSignal latch reaches zero. At that point await() returns, and the subscription is cancelled.
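The interplay between the latch and the cancellation can be tried out with the JDK alone. The sketch below is a stand-in under stated assumptions, not RxJava code: the interval helper imitates Observable.interval(...).subscribe(...) with a scheduled executor, and the latch is counted down by the subscriber itself after a fixed number of events, which is just one possible source for the external signal that the listing above leaves elided.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

class LatchCancellationSketch {

    interface Subscription {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    // Hypothetical stand-in for Observable.interval(...).subscribe(...):
    // emits 0, 1, 2, ... on a daemon thread until the subscription is cancelled.
    static Subscription interval(long periodMillis, LongConsumer onNext) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "interval");
            t.setDaemon(true);
            return t;
        });
        AtomicLong counter = new AtomicLong();
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(
                () -> onNext.accept(counter.getAndIncrement()),
                periodMillis, periodMillis, TimeUnit.MILLISECONDS);
        return new Subscription() {
            @Override public void unsubscribe() {
                task.cancel(false);   // no further events are scheduled
                scheduler.shutdown();
            }
            @Override public boolean isUnsubscribed() {
                return task.isCancelled();
            }
        };
    }

    // Consumes events until the latch opens, then cancels the subscription.
    static List<Long> runUntilSignal(int eventsBeforeSignal, long periodMillis) {
        CountDownLatch externalSignal = new CountDownLatch(eventsBeforeSignal); // assumption
        List<Long> received = new CopyOnWriteArrayList<>();
        Subscription subscription = interval(periodMillis, value -> {
            received.add(value);
            externalSignal.countDown();
        });
        try {
            externalSignal.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        subscription.unsubscribe();
        return List.copyOf(received); // snapshot taken after cancellation
    }

    public static void main(String[] args) {
        System.out.println(runUntilSignal(4, 100));
    }
}
```

Because the signal and the cancellation happen on different threads, one more event may occasionally arrive between await() returning and unsubscribe() taking effect, so the received list starts with 0, 1, 2, 3 but is not guaranteed to end there.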

At this point, we have already learned that reactive programming consists of an Observable stream, a Subscriber, and some sort of Subscription that communicates the intention of the Subscriber to receive events from the Observable producer. It is now time to transform the data flowing through the reactive streams.
