
Flux.create

Flux.create is another mechanism for generating events programmatically. It takes a consumer of a FluxSink, which is capable of generating any number of events. The API is more generic than the generate methods discussed in the previous section, and the FluxSink is also capable of generating events asynchronously. Moreover, the emitter does not take subscription cancellation or backpressure into account on its own. This means that even if the subscriber has cancelled its subscription, the code passed to create will continue to generate events. Implementations must therefore listen for the cancel event and explicitly stop producing.

As for backpressure, the producer keeps generating events without looking at the demand from the subscriber. By default, events that exceed the demand are buffered, and events emitted after the subscription has been cancelled are dropped.
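The buffering described above is one of several overflow strategies. The following is a minimal sketch, assuming the two-argument Flux.create overload and the FluxSink.OverflowStrategy enum from reactor-core; the class and method names (OverflowSketch, run) are hypothetical and exist only for this illustration. The subscriber first requests two items, and asks for the rest only after the producer has finished.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

// Hypothetical helper class, for illustration only.
class OverflowSketch {

    // Pushes 1..6 eagerly at a subscriber whose initial demand is 2.
    static List<Integer> run(FluxSink.OverflowStrategy strategy) {
        Flux<Integer> source = Flux.<Integer>create(sink -> {
            for (int i = 1; i <= 6; i++) {
                sink.next(i); // no demand check: the producer just pushes
            }
            sink.complete();
        }, strategy);

        List<Integer> received = new ArrayList<>();
        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(2); // initial demand, signalled before the producer runs
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        source.subscribe(subscriber);       // the producer loop runs here, synchronously
        subscriber.request(Long.MAX_VALUE); // late demand, after the producer has finished
        return received;
    }
}
```

With this setup, BUFFER should deliver all six values once the second request arrives, whereas DROP should deliver only the two values that had been requested when they were emitted.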

To see how the two are different, let's modify our FibonacciGenerator to use a FluxSink. Some of the key differences are highlighted as follows:

  • There is no initial seed state in the API
  • The FluxSink keeps generating the events, irrespective of the subscription state
  • We can generate any number of events in the sink
  • The onDispose callback can be used to perform any cleanup, or to stop publishing events
  • Generated events are buffered until they are requested, and dropped once the subscription is cancelled
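The point about stopping publication can be sketched as follows. This is a minimal illustration rather than the FibonacciGenerator itself: it assumes a simple counter as the data source, and the names StopOnDisposeSketch and counter are hypothetical. The producer loop reads a flag that the onDispose callback sets.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Flux;

// Hypothetical helper class, for illustration only.
class StopOnDisposeSketch {

    // Emits 1, 2, 3, ... (at most 1000 values) until the sink is disposed.
    // The 'emitted' counter records how many values the producer generated.
    static Flux<Integer> counter(AtomicInteger emitted) {
        return Flux.<Integer>create(sink -> {
            AtomicBoolean stop = new AtomicBoolean(false);
            // Runs on cancel, complete, or error.
            sink.onDispose(() -> stop.set(true));

            int i = 0;
            while (!stop.get() && i < 1_000) {
                sink.next(++i);
                emitted.incrementAndGet();
            }
            sink.complete();
        });
    }
}
```

Subscribing through take(3) cancels the subscription after the third value, so the loop should exit at its next check instead of running on to the upper bound.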

It is important to note that the FluxSink provides lifecycle callback methods, which can be used to perform additional cleanups, or any other action, as follows:

  • onCancel: This callback gets invoked when the subscription is cancelled.
  • onDispose: This callback gets invoked when the subscription is terminated by a cancel, complete, or error event.
  • onRequest: This callback is invoked with the number of items requested by the subscriber. It can be used to build a pull data model. When it is invoked, the next method can be called for up to the requested number of values:
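    // Requires reactor.core.publisher.Flux, java.util.List, java.util.LinkedList,
    // java.util.concurrent.atomic.AtomicBoolean, and a JUnit @Test annotation.
    @Test
    public void testFibonacciFluxSink() {
        Flux<Long> fibonacciGenerator = Flux.create(e -> {
            long current = 1, prev = 0;
            AtomicBoolean stop = new AtomicBoolean(false);
            // Lifecycle callback: runs when the subscription is cancelled or terminated.
            e.onDispose(() -> {
                stop.set(true);
                System.out.println("******* Stop Received ****** ");
            });
            // The loop deliberately ignores the stop flag, so it keeps generating
            // values until the long overflows, even after the subscriber is gone.
            while (current > 0) {
                e.next(current);
                System.out.println("generated " + current);
                long next = current + prev;
                prev = current;
                current = next;
            }
            e.complete();
        });
        List<Long> fibonacciSeries = new LinkedList<>();
        // take(50) cancels the subscription after 50 values.
        fibonacciGenerator.take(50).subscribe(t -> {
            System.out.println("consuming " + t);
            fibonacciSeries.add(t);
        });
        System.out.println(fibonacciSeries);
    }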
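The test above pushes values eagerly. By contrast, the pull model mentioned for onRequest could be sketched as shown here. This is a minimal sketch, assuming that the callback registered with onRequest receives each requested amount, including demand signalled before the callback was registered; PullFibonacciSketch, fibonacciOnDemand, and demo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical helper class, for illustration only.
class PullFibonacciSketch {

    // Generates Fibonacci numbers only in response to demand.
    static Flux<Long> fibonacciOnDemand(AtomicInteger generated) {
        return Flux.<Long>create(sink -> {
            long[] state = {0L, 1L}; // {prev, current}
            sink.onRequest(n -> {
                for (long i = 0; i < n && !sink.isCancelled(); i++) {
                    if (state[1] < 0) { // long overflow: end the series
                        sink.complete();
                        return;
                    }
                    sink.next(state[1]);
                    generated.incrementAndGet();
                    long next = state[0] + state[1];
                    state[0] = state[1];
                    state[1] = next;
                }
            });
        });
    }

    // Requests 3 values on subscription and 2 more afterwards, then cancels.
    static List<Long> demo(AtomicInteger generated) {
        List<Long> received = new ArrayList<>();
        BaseSubscriber<Long> subscriber = new BaseSubscriber<Long>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(3);
            }

            @Override
            protected void hookOnNext(Long value) {
                received.add(value);
            }
        };
        fibonacciOnDemand(generated).subscribe(subscriber);
        subscriber.request(2);
        subscriber.cancel();
        return received;
    }
}
```

In this variant, the number of generated values should match the number requested, so nothing needs to be buffered or dropped.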

Let's check the output that's generated, as follows:

Flux also provides a push method. This is similar to the create method, but the threading contract differs: with push, the next, error, and complete events must all be invoked from a single producer thread, whereas create also allows events to be emitted from multiple threads.
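As a minimal sketch of the push variant, assuming a producer that runs entirely on the subscribing thread, the following emits the elements of a list and then completes. The names PushSketch and fromSingleThread are hypothetical.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical helper class, for illustration only.
class PushSketch {

    // All next and complete calls are made from one thread, as push expects.
    static Flux<String> fromSingleThread(List<String> items) {
        return Flux.<String>push(sink -> {
            for (String item : items) {
                sink.next(item);
            }
            sink.complete();
        });
    }
}
```

A caller could collect the result with fromSingleThread(items).collectList().block(), which should return the items in their original order.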
