- Hands-On Reactive Programming in Spring 5
- Oleh Dokuka Igor Lozynskyi
- 2021-07-23 16:36:21
Producing and consuming streams
At this point, we should be familiar enough with the RxJava library to create our first small application. Let's define a stream, which is represented by the Observable class. For now, we may think of an Observable as a sort of generator that knows how to propagate events to subscribers as soon as they subscribe:
Observable<String> observable = Observable.create(
    new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> sub) { // (1)
            sub.onNext("Hello, reactive world!");          // (2)
            sub.onCompleted();                             // (3)
        }
    }
);
Here we create an Observable with a callback that is invoked as soon as a Subscriber appears (1). At that moment, our Observable produces a single string value (2) and then signals the end of the stream to the subscriber (3). We can simplify this code with a Java 8 lambda:
Observable<String> observable = Observable.create(
    sub -> {
        sub.onNext("Hello, reactive world!");
        sub.onCompleted();
    }
);
In contrast with a Java Stream, which can be consumed only once, an Observable is reusable: each subscriber receives the Hello, reactive world! event right after it subscribes.
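The Stream side of this contrast can be checked with the JDK alone. The following is a minimal sketch (the class and method names are made up for illustration) showing that a java.util.stream.Stream rejects a second terminal operation with an IllegalStateException, so reusing it requires a Supplier that builds a fresh stream on every call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Illustrative class name; not part of the original example.
class StreamReuseDemo {

    // Runs two terminal operations on the same Stream and reports
    // whether the second one was rejected.
    static boolean secondTraversalFails() {
        Stream<String> stream = Stream.of("Hello, reactive world!");
        stream.forEach(s -> { });         // first traversal consumes the stream
        try {
            stream.forEach(s -> { });     // second traversal of the same instance
            return false;
        } catch (IllegalStateException e) {
            return true;                  // the stream has already been operated upon
        }
    }

    // Works around the restriction by asking a Supplier for a new Stream each time.
    static List<String> collectTwice(Supplier<Stream<String>> source) {
        List<String> seen = new ArrayList<>();
        source.get().forEach(seen::add);
        source.get().forEach(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(secondTraversalFails());
        System.out.println(collectTwice(() -> Stream.of("Hello, reactive world!")));
    }
}
```

An Observable built with Observable.create() needs no such wrapper, because its callback runs again for every new subscriber.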
So, now we need a Subscriber, as shown in the following code:
Subscriber<String> subscriber = new Subscriber<String>() {
    @Override
    public void onNext(String s) {       // (1)
        System.out.println(s);
    }

    @Override
    public void onCompleted() {          // (2)
        System.out.println("Done!");
    }

    @Override
    public void onError(Throwable e) {   // (3)
        System.err.println(e);
    }
};
As we can see, the Subscriber has to implement the Observer methods and define the reactions for new events (1), stream completion (2), and errors (3). Now, let's hook the observable and subscriber instances together:
observable.subscribe(subscriber);
Running this code produces the following output:
Hello, reactive world!
Done!
Hooray! We have just written a small and simple reactive hello-world application! As you might expect, we can rewrite this example using lambdas, as shown in the following code:
Observable.create(
    sub -> {
        sub.onNext("Hello, reactive world!");
        sub.onCompleted();
    }
).subscribe(
    System.out::println,
    System.err::println,
    () -> System.out.println("Done!")
);
The RxJava library offers a lot of flexibility for creating Observable and Subscriber instances. It is possible to create an Observable instance just by listing its elements, from an old-style array, or from an Iterable collection, as follows:
Observable.just("1", "2", "3", "4");
Observable.from(new String[]{"A", "B", "C"});
Observable.from(Collections.emptyList());
It is also possible to reference a Callable (1) or even a Future (2), as shown in the following code:
Observable<String> hello = Observable.fromCallable(() -> "Hello "); // (1)
Future<String> future =
    Executors.newCachedThreadPool().submit(() -> "World");
Observable<String> world = Observable.from(future); // (2)
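One practical difference between these two origins is worth keeping in mind: a Callable is only a description of work, whereas a Future returned by submit() represents work that the executor has already been asked to run. The following JDK-only sketch (no RxJava involved; the class name and the call counter are illustrative assumptions) makes this visible, and it also shuts the executor down, which the short snippet above leaves out:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative class name; not part of the original example.
class CallableVsFutureDemo {

    static String run() {
        AtomicInteger calls = new AtomicInteger();

        // Defining a Callable does not execute it.
        Callable<String> hello = () -> {
            calls.incrementAndGet();
            return "Hello ";
        };
        int callsBeforeInvocation = calls.get();

        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // submit() hands the task to the executor right away.
            Future<String> world = pool.submit(() -> "World");

            // call() runs the Callable now; get() waits for the Future's result.
            String text = hello.call() + world.get();
            return callsBeforeInvocation + ":" + calls.get() + ":" + text;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown(); // release the worker thread
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The returned string records that the Callable had not run before it was explicitly invoked, while the Future's value only had to be waited for.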
Moreover, along with the plain creational functionality, an Observable stream may be created by combining other Observable instances, which allows for easy implementation of fairly complicated workflows. For example, the concat() operator subscribes to the incoming streams one at a time and re-sends every item to the downstream observer. Each incoming stream is processed until it signals a terminal event (onCompleted() or onError()), and the streams are processed in the same order as the concat() arguments. The following code demonstrates an example of concat() usage:
Observable.concat(hello, world, Observable.just("!"))
    .forEach(System.out::print);
Here, we combine a few Observable instances that come from different origins and iterate through the result with the Observable.forEach() method, in a way that is similar to the Java 8 Stream API. The program generates the following output:
Hello World!
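To make the ordering guarantee of concat() more concrete, here is a deliberately simplified, synchronous model written with the JDK only. It is a sketch under stated assumptions, not RxJava's implementation: the Source interface and the just() and concat() helpers are hypothetical stand-ins, and error handling and unsubscription are left out. The point it illustrates is that the next source is subscribed to only after the current one has signaled completion:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative model only; names are assumptions, not RxJava types.
class ConcatSketch {

    // Hypothetical stand-in for an Observable: pushes items, then signals completion.
    interface Source<T> {
        void subscribe(Consumer<? super T> onNext, Runnable onCompleted);
    }

    // Emits the given items in order, then completes.
    @SafeVarargs
    static <T> Source<T> just(T... items) {
        return (onNext, onCompleted) -> {
            for (T item : items) {
                onNext.accept(item);
            }
            onCompleted.run();
        };
    }

    // Re-sends the items of every source downstream, one source after another.
    static <T> Source<T> concat(List<Source<T>> sources) {
        return (onNext, onCompleted) -> subscribeFrom(sources, 0, onNext, onCompleted);
    }

    private static <T> void subscribeFrom(List<Source<T>> sources, int index,
                                          Consumer<? super T> onNext, Runnable onCompleted) {
        if (index == sources.size()) {
            onCompleted.run(); // all sources are done: complete the combined stream
            return;
        }
        // Move on to the next source only when the current one completes.
        sources.get(index).subscribe(
            onNext,
            () -> subscribeFrom(sources, index + 1, onNext, onCompleted));
    }

    static String demo() {
        List<Source<String>> parts = new ArrayList<>();
        parts.add(just("Hello "));
        parts.add(just("World"));
        parts.add(just("!"));

        StringBuilder out = new StringBuilder();
        concat(parts).subscribe(s -> out.append(s), () -> out.append(" [done]"));
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the completion marker is appended exactly once, after the last source finishes, which mirrors how the combined stream terminates only after its final argument does.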