官术网_书友最值得收藏!

Reactor in action 

Let's learn more about the reactor API with a practical example. Create a new Maven project similar to what we created in the Anatomy of RxJava section. The current version of the Project Reactor at the time of writing  is 3.2.6. We need to provide a Maven dependency for the reactor as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>reactor-demo</groupId>
<artifactId>simple-reactor-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Smiple Reactor Dmo</name>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.6.RELEASE</version>
</dependency>
</dependencies>
</project>

When we define a Reactor dependency, Reactive Streams JAR will be added as a transitive dependency. Next, is to add a Java class as follows:

public class ReactorBasic {
private static List<String> carModels = Arrays.asList(
"Era","Magna","Sportz","Astha","Astha(O)");
public static void main(String args[]) {
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(carModels);
Mono<String> singleWord = Mono.just("Single value");
fewWords.subscribe(t->System.out.println(t));
System.out.println("-----------------------------");
manyWords.subscribe(System.out::println);
System.out.println("-----------------------------");
singleWord.subscribe(System.out::println);
}
}

We have used Flux and Mono to create various publishers. The  just() method is used to populate the stream. We can also reach the iterable types (like List, Set, n) to form a data stream with the fromIterable() method. A few other methods like from(), fromArray() , and fromStream() are used to construct data streams from other producers, arrays, and existing Java streams, respectively, and can be used as follows:

public class ReactorFromOtherPublisher {
public static void main(String[] args) {
Flux<String> fewWords = Flux.just("One","Two");
/* from array */
Flux<Integer> intFlux = Flux.fromArray(new Integer[]{1,2,3,4,5,6,7});
/* from Java 8 stream */
Flux<String> strFlux = Flux.fromStream(Stream.of(
"Ten", "Hundred", "Thousand", "Ten Thousands", "Lac","Ten Lac", "Crore"));
/* from other Publisher */
Flux<String> fromOtherPublisherFlux = Flux.from(fewWords);
intFlux.subscribe(System.out::println);
strFlux.subscribe(System.out::println);
fromOtherPublisherFlux.subscribe(System.out::println);
}
}

The subscriber can be plugged with the subscribe() method. This is similar to what we have done with Observable in RxJava. With Flux, we can create a publisher with the finite or infinite stream.

We can also control to generate a stream with a value or just an empty stream. All of that can be done with a few utility methods provided by the Flux class as follows:

  • Flux.empty(): It is used to generate an empty stream having no values and only executes completion events.
  • Flux.error(): It is used to signal the error condition by generating an error stream with no any value but only errors.
  • Flux.never(): As its name suggests, it generates a stream with no events of any type.
  • Flux.defer(): It is used to construct a publisher when a subscriber makes the subscription to Flux. In short, it is lazy in nature.
主站蜘蛛池模板: 日照市| 贺兰县| 济宁市| 进贤县| 云安县| 江孜县| 阿拉善盟| 全椒县| 济宁市| 湖南省| 开阳县| 昭通市| 彰化市| 金塔县| 楚雄市| 柘城县| 武义县| 勃利县| 沙雅县| 永寿县| 大关县| 资阳市| 沾化县| 温泉县| 西青区| 彩票| 海南省| 麻江县| 灵山县| 岱山县| 罗江县| 广灵县| 建瓯市| 柏乡县| 郯城县| 军事| 邵阳县| 准格尔旗| 准格尔旗| 江川县| 固始县|