- Hands-On Reactive Programming in Spring 5
- Oleh Dokuka Igor Lozynskyi
- 2021-07-23 16:36:21
Producing and consuming streams
At this point, we should be familiar enough with the RxJava library to create our first small application. Let's define a stream that is represented by the Observable class. For now, we can think of an Observable as a kind of generator that knows how to propagate events to subscribers as soon as they subscribe:
import rx.Observable;
import rx.Subscriber;

Observable<String> observable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) { // (1)
            sub.onNext("Hello, reactive world!");          // (2)
            sub.onCompleted();                             // (3)
        }
    }
);
So, here we create an Observable with a callback that is invoked as soon as a Subscriber appears (1). At that moment, the Observable produces a single string value (2) and then signals the end of the stream to the subscriber (3). We can also shorten this code using Java 8 lambdas:
Observable<String> observable = Observable.create(
    sub -> {
        sub.onNext("Hello, reactive world!");
        sub.onCompleted();
    }
);
In contrast with the Java Stream API, an Observable is reusable: each subscriber receives the "Hello, reactive world!" event right after subscribing.
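To see the Stream side of that contrast, the following is a minimal sketch that uses only the JDK (no RxJava). The class and method names are made up for illustration. It consumes the same Stream instance twice and reports whether the second attempt is rejected:

```java
import java.util.stream.Stream;

class StreamReuseDemo {
    /** Returns true if a second terminal operation on the same Stream is rejected. */
    static boolean secondUseFails() {
        Stream<String> stream = Stream.of("Hello, reactive world!");
        stream.forEach(s -> { });         // first terminal operation consumes the stream
        try {
            stream.forEach(s -> { });     // second terminal operation on the same instance
            return false;
        } catch (IllegalStateException e) {
            return true;                  // the stream has already been operated upon
        }
    }

    public static void main(String[] args) {
        System.out.println(secondUseFails()); // prints "true"
    }
}
```

A Stream is a one-shot pipeline, so a fresh one has to be built for every consumer, whereas the same Observable instance can be handed to any number of subscribers.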
So, now we need a Subscriber, as shown in the following code:
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {       // (1)
        System.out.println(s);
    }

    @Override
    public void onCompleted() {          // (2)
        System.out.println("Done!");
    }

    @Override
    public void onError(Throwable e) {   // (3)
        System.err.println(e);
    }
};
As we can see, the Subscriber has to implement the Observer methods and define the reactions for new events (1), stream completion (2), and errors (3). Now, let's hook the observable and subscriber instances together:
observable.subscribe(subscriber);
Running the code above produces the following output:
Hello, reactive world!
Done!
Hooray! We have just written a small and simple reactive hello-world application! As you might expect, we can rewrite this example using lambdas, as shown in the following code:
Observable.create(
    sub -> {
        sub.onNext("Hello, reactive world!");
        sub.onCompleted();
    }
).subscribe(
    System.out::println,
    System.err::println,
    () -> System.out.println("Done!")
);
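To make the mechanics behind create() and subscribe() a bit more tangible, here is a minimal sketch in plain Java. It is not RxJava's implementation: MiniObservable and MiniSubscriber are hypothetical, heavily simplified stand-ins (synchronous, with no unsubscription or error routing). The only point is that the observable merely stores the callback and runs it once for every subscriber:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniObservableDemo {
    /** Hypothetical, simplified counterpart of a subscriber. */
    interface MiniSubscriber<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable e);
    }

    /** Hypothetical, simplified counterpart of an observable: it only stores the callback. */
    static final class MiniObservable<T> {
        private final Consumer<MiniSubscriber<? super T>> onSubscribe;

        private MiniObservable(Consumer<MiniSubscriber<? super T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        static <T> MiniObservable<T> create(Consumer<MiniSubscriber<? super T>> onSubscribe) {
            return new MiniObservable<>(onSubscribe);
        }

        /** Nothing happens until someone subscribes; then the stored callback runs. */
        void subscribe(MiniSubscriber<? super T> subscriber) {
            onSubscribe.accept(subscriber);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> observable = MiniObservable.create(sub -> {
            sub.onNext("Hello, reactive world!");
            sub.onCompleted();
        });
        // Subscribe twice to the same instance: the callback runs once per subscriber.
        for (String name : new String[] {"first", "second"}) {
            observable.subscribe(new MiniSubscriber<String>() {
                @Override public void onNext(String s) { log.add(name + ": " + s); }
                @Override public void onCompleted() { log.add(name + ": Done!"); }
                @Override public void onError(Throwable e) { log.add(name + ": " + e); }
            });
        }
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints the greeting and Done! for the first subscriber and then again for the second, which mirrors the reusability noted earlier.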
The RxJava library offers a lot of flexibility for creating Observable and Subscriber instances. It is possible to create an Observable instance just by referencing elements, by using an old-style array, or from an Iterable collection, as follows:
Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());
It is also possible to reference a Callable (1) or even a Future (2), as shown in the following code:
Observable<String> hello = Observable.fromCallable(() -> "Hello "); // (1)
Future<String> future =
    Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future);                 // (2)
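The two sources differ in a way that is easy to miss: a Callable describes work that has not started yet, while a Future is a handle on work that has already been submitted. The following JDK-only sketch (the class name and counters are illustrative assumptions, and no RxJava is involved) counts how often each body actually runs:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

class CallableVsFutureDemo {
    /** Returns {number of Callable runs, number of submitted-task runs}. */
    static int[] run() {
        AtomicInteger callableRuns = new AtomicInteger();
        AtomicInteger taskRuns = new AtomicInteger();

        // A Callable is just a recipe: its body runs every time call() is invoked.
        Callable<String> hello = () -> {
            callableRuns.incrementAndGet();
            return "Hello ";
        };

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // submit() starts the task once; the Future only hands out its result.
            Future<String> world = pool.submit(() -> {
                taskRuns.incrementAndGet();
                return "World";
            });
            hello.call();
            hello.call();
            world.get();   // blocks until the task has finished
            world.get();   // returns the same result without re-running the task
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();   // let the JVM exit promptly
        }
        return new int[] {callableRuns.get(), taskRuns.get()};
    }

    public static void main(String[] args) {
        int[] counts = run();
        System.out.println(counts[0] + " " + counts[1]); // prints "2 1"
    }
}
```

The sketch also shuts its executor down explicitly. A cached thread pool that is never shut down keeps its idle worker threads alive for about a minute, which can delay JVM exit in small demo programs.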
Moreover, besides these plain factory methods, an Observable stream can be created by combining other Observable instances, which makes it easy to implement fairly complicated workflows. For example, the concat() operator consumes all items of each incoming stream and re-sends them to the downstream observer. Each incoming stream is processed until it signals a terminal event (onCompleted() or onError()), and the streams are processed in the same order as the concat() arguments. The following code demonstrates the use of concat():
Observable.concat(hello, world, Observable.just("!"))
    .forEach(System.out::print);
Here, as part of a straightforward combination of a few Observable instances that use different origins, we also iterate through the result with the Observable.forEach() method in a way that is similar to the Java 8 Stream API. Such a program generates the following output:
Hello World!
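The ordering rule behind concat() can be sketched in plain Java. The following is an illustration under simplifying assumptions, not RxJava's code: Source and Sink are hypothetical stand-ins for an observable and a subscriber, everything runs synchronously, and unsubscription is ignored. The next source is subscribed to only after the current one completes, and an error ends the whole sequence:

```java
class ConcatSketch {
    /** Hypothetical stand-in for a subscriber. */
    interface Sink {
        void onNext(String item);
        void onCompleted();
        void onError(Throwable e);
    }

    /** Hypothetical stand-in for an observable: pushes signals into a sink on subscribe. */
    interface Source {
        void subscribe(Sink sink);
    }

    /** Emits the given items and then completes. */
    static Source just(String... items) {
        return sink -> {
            for (String item : items) {
                sink.onNext(item);
            }
            sink.onCompleted();
        };
    }

    /** Replays the sources one after another, in argument order. */
    static Source concat(Source... sources) {
        return downstream -> subscribeNext(sources, 0, downstream);
    }

    private static void subscribeNext(Source[] sources, int index, Sink downstream) {
        if (index == sources.length) {
            downstream.onCompleted();          // every source has completed
            return;
        }
        sources[index].subscribe(new Sink() {
            public void onNext(String item) { downstream.onNext(item); }
            // Completion of one source is the cue to move on to the next one.
            public void onCompleted() { subscribeNext(sources, index + 1, downstream); }
            // An error is terminal for the combined sequence.
            public void onError(Throwable e) { downstream.onError(e); }
        });
    }

    static String run() {
        StringBuilder out = new StringBuilder();
        concat(just("Hello "), just("World"), just("!")).subscribe(new Sink() {
            public void onNext(String item) { out.append(item); }
            public void onCompleted() { out.append(" [done]"); }
            public void onError(Throwable e) { out.append(" [error]"); }
        });
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "Hello World! [done]"
    }
}
```

The downstream sink sees a single completion signal, emitted only after the last source has finished.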