官术网_书友最值得收藏!

Producing and consuming streams

At this point, we should be familiar enough with the RxJava library to create our first small application. Let's define a stream that is represented by the Observable class. At the moment, we may assume that the Observable is a sort of generator that knows how to propagate events for subscribers as soon as they subscribe:

Observable<String> observale = Observable.create(
new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> sub) { // (1)
sub.onNext("Hello, reactive world!"); // (2)
sub.onCompleted(); // (3)
}
}
);

So, here we create an Observable with a callback that will be applied as soon as the Subscriber appears (1). At that moment, our Observer will produce a one string value (2) and then signal the end of the stream to the subscriber (3). We can also improve this code using the Java 8 lambdas:

Observable<String> observable = Observable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
);

In contrast with the Java Stream API, Observable is reusable, and each subscriber will receive the Hello, reactive world! event just after the subscription.

Note that, from RxJava 1.2.7 onward, the  Observable creation has been deprecated and treated as unsafe because it may generate too many elements and overload the subscriber. In other words, this approach does not support backpressure, a concept that we are going to examine later in detail. However, that code is still valid for the sake of introduction.

So, now we need a Subscriber, as shown in the following code:

Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) { // (1)
System.out.println(s);
}

@Override
public void onCompleted() { // (2)
System.out.println("Done!");
}

@Override
public void onError(Throwable e) { // (3)
System.err.println(e);
}
};

As we can see, the Subscriber has to implement the Observer methods and define the reactions for new events (1), stream completion (2), and errors (3). Now, let's hook the observable and subscriber instances together:

observable.subscribe(subscriber);

When running the mentioned code, the program generates the following output:

Hello, reactive world!
Done!

Hooray! We have just written a small and simple reactive hello-world application! As we may suspect, we may rewrite this example using lambdas, as shown in the following code:

Observable.create(
sub -> {
sub.onNext("Hello, reactive world!");
sub.onCompleted();
}
).subscribe(
System.out::println,
System.err::println,
() -> System.out.println("Done!")
);

The RxJava library gives a lot of flexibility in order to create Observable and Subscriber instances. It is possible to create an Observable instance just by referencing elements, by using an old-style array, or from the Iterable collection, as follows:

Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());

It is also possible to reference a Callable (1) or even a Future (2), as shown in the following code:

Observable<String> hello = Observable.fromCallable(() -> "Hello ");  // (1)
Future<String> future =
Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future); // (2)

Moreover, along with the plain creational functionality, the Observable stream may be created by combining other Observable instances, which allows for easy implementation of pretty complicated workflows. For example, the concat() operator for each of the incoming streams consumes all items by re-sending them to the downstream observer. Incoming streams will then be processed until a terminal operation (onComplete(), onError()) occurs, and the order of processing is the same as the order of the concat() arguments. The following code demonstrates an example of the concat() usage:

Observable.concat(hello, world, Observable.just("!"))
.forEach(System.out::print);

Here, as part of a straightforward combination of a few Observable instances that use different origins, we also iterate through the result with the Observable.forEach() method in a way that is similar to the Java 8 Stream API. Such a program generates the following output:

Hello World!
Note that even though it is convenient to not define handlers for exceptions, in the case where an error occurs, the default Subscriber implementation throws rx.exceptions.OnErrorNotImplementedException .

主站蜘蛛池模板: 胶州市| 河西区| 新田县| 自贡市| 三明市| 嘉义县| 江西省| 阿拉尔市| 富民县| 宜章县| 白玉县| 孟州市| 陇川县| 沧州市| 平度市| 博罗县| 色达县| 广灵县| 聊城市| 万州区| 观塘区| 葫芦岛市| 扶绥县| 洛隆县| 岢岚县| 巴塘县| 昌宁县| 霍州市| 普兰店市| 建瓯市| 中方县| 邵东县| 石屏县| 陇西县| 潮州市| 伊川县| 宣城市| 那坡县| 梁山县| 吉林市| 阳信县|