官术网_书友最值得收藏!

Reactor in action 

Let's learn more about the reactor API with a practical example. Create a new Maven project similar to what we created in the Anatomy of RxJava section. The current version of the Project Reactor at the time of writing  is 3.2.6. We need to provide a Maven dependency for the reactor as follows:

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>reactor-demo</groupId>
<artifactId>simple-reactor-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>Smiple Reactor Dmo</name>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.2.6.RELEASE</version>
</dependency>
</dependencies>
</project>

When we define a Reactor dependency, Reactive Streams JAR will be added as a transitive dependency. Next, is to add a Java class as follows:

public class ReactorBasic {
private static List<String> carModels = Arrays.asList(
"Era","Magna","Sportz","Astha","Astha(O)");
public static void main(String args[]) {
Flux<String> fewWords = Flux.just("Hello", "World");
Flux<String> manyWords = Flux.fromIterable(carModels);
Mono<String> singleWord = Mono.just("Single value");
fewWords.subscribe(t->System.out.println(t));
System.out.println("-----------------------------");
manyWords.subscribe(System.out::println);
System.out.println("-----------------------------");
singleWord.subscribe(System.out::println);
}
}

We have used Flux and Mono to create various publishers. The  just() method is used to populate the stream. We can also reach the iterable types (like List, Set, n) to form a data stream with the fromIterable() method. A few other methods like from(), fromArray() , and fromStream() are used to construct data streams from other producers, arrays, and existing Java streams, respectively, and can be used as follows:

public class ReactorFromOtherPublisher {
public static void main(String[] args) {
Flux<String> fewWords = Flux.just("One","Two");
/* from array */
Flux<Integer> intFlux = Flux.fromArray(new Integer[]{1,2,3,4,5,6,7});
/* from Java 8 stream */
Flux<String> strFlux = Flux.fromStream(Stream.of(
"Ten", "Hundred", "Thousand", "Ten Thousands", "Lac","Ten Lac", "Crore"));
/* from other Publisher */
Flux<String> fromOtherPublisherFlux = Flux.from(fewWords);
intFlux.subscribe(System.out::println);
strFlux.subscribe(System.out::println);
fromOtherPublisherFlux.subscribe(System.out::println);
}
}

The subscriber can be plugged with the subscribe() method. This is similar to what we have done with Observable in RxJava. With Flux, we can create a publisher with the finite or infinite stream.

We can also control to generate a stream with a value or just an empty stream. All of that can be done with a few utility methods provided by the Flux class as follows:

  • Flux.empty(): It is used to generate an empty stream having no values and only executes completion events.
  • Flux.error(): It is used to signal the error condition by generating an error stream with no any value but only errors.
  • Flux.never(): As its name suggests, it generates a stream with no events of any type.
  • Flux.defer(): It is used to construct a publisher when a subscriber makes the subscription to Flux. In short, it is lazy in nature.
主站蜘蛛池模板: 海盐县| 方正县| 溆浦县| 霸州市| 定州市| 顺平县| 绥德县| 旺苍县| 宜春市| 藁城市| 奇台县| 武清区| 申扎县| 博罗县| 枣阳市| 东莞市| 扎鲁特旗| 喀喇| 全椒县| 富平县| 绩溪县| 天镇县| 荥阳市| 红桥区| 双鸭山市| 奈曼旗| 四川省| 秀山| 西乌珠穆沁旗| 新丰县| 称多县| 阜城县| 大连市| 汪清县| 大名县| 西宁市| 德州市| 邳州市| 临城县| 中卫市| 万州区|