Flux.create

Flux.create is another mechanism for generating events programmatically. It takes a Consumer of FluxSink, and the sink can emit any number of events. The API is more general than the generate methods discussed in the previous section, because the FluxSink can also emit events asynchronously. However, the emitter code is not stopped by subscription cancellation or by a lack of demand. Even after the subscriber has cancelled its subscription, the code passed to create keeps running and keeps calling next. Implementations must therefore listen for the cancel event and explicitly stop producing.
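
The effect of ignoring cancellation can be made visible with a small counter. The sketch below is illustrative only: the class name CancelAwareSketch and the method iterations are hypothetical, and it assumes reactor-core is on the classpath. It counts how often the emitter loop runs for a subscriber that takes three values, with and without a check of FluxSink.isCancelled().

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class CancelAwareSketch {

    // Returns how many times the emitter loop ran for a subscriber taking 3 values.
    // Hypothetical helper, not part of the FibonacciGenerator example.
    static int iterations(boolean checkCancelled) {
        AtomicInteger loops = new AtomicInteger();
        Flux<Integer> flux = Flux.create(sink -> {
            for (int i = 0; i < 100; i++) {
                // Without this check the loop runs to the end even after cancellation.
                if (checkCancelled && sink.isCancelled()) {
                    break;
                }
                loops.incrementAndGet();
                sink.next(i);
            }
            sink.complete();
        });
        // take(3) cancels the upstream once it has received three values.
        flux.take(3).subscribe();
        return loops.get();
    }

    public static void main(String[] args) {
        System.out.println("without check: " + iterations(false));
        System.out.println("with check:    " + iterations(true));
    }
}
```

Because everything here runs on the subscribing thread, the cancellation triggered by take(3) is already visible to the emitter when it reaches the next loop iteration.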

As for backpressure, the producer keeps generating events without consulting the subscriber's demand. By default, events that exceed the demand are buffered, and events emitted after the subscription is cancelled are dropped.
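
The buffering behaviour can be compared with an alternative by using the create overload that accepts a FluxSink.OverflowStrategy. The following is a minimal sketch, assuming reactor-core is available; the class CreateOverflowSketch and its receive method are hypothetical names. The subscriber first requests three values, the emitter eagerly pushes ten, and only afterwards does the subscriber request more.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

public class CreateOverflowSketch {

    // Collects what a slow subscriber receives under the given overflow strategy.
    static List<Integer> receive(FluxSink.OverflowStrategy strategy) {
        List<Integer> received = new ArrayList<>();

        // The emitter ignores demand and pushes 1..10 immediately.
        Flux<Integer> eager = Flux.create(sink -> {
            for (int i = 1; i <= 10; i++) {
                sink.next(i);
            }
            sink.complete();
        }, strategy);

        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(3); // initial demand is smaller than what the emitter produces
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        eager.subscribe(subscriber);
        // Late demand: only values that were buffered can still be delivered.
        subscriber.request(10);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("BUFFER: " + receive(FluxSink.OverflowStrategy.BUFFER));
        System.out.println("DROP:   " + receive(FluxSink.OverflowStrategy.DROP));
    }
}
```

In this sketch BUFFER, which is the strategy create uses when none is given, should hand over all ten values once the late request arrives, whereas DROP should deliver only the three values that were in demand at the time they were emitted.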

To see how the two are different, let's modify our FibonacciGenerator to use a FluxSink. Some of the key differences are highlighted as follows:

  • There is no initial seed state in the API
  • The FluxSink keeps generating the events, irrespective of the subscription state
  • We can generate any number of events in the sink
  • The onDispose callback can be used to perform any cleanup, or to stop publishing events
  • Generated events are buffered, and are dropped once the subscription is cancelled

It is important to note that the FluxSink provides lifecycle callback methods, which can be used to perform additional cleanups, or any other action, as follows:

  • onCancel: This callback is invoked when the subscription is cancelled.
  • onDispose: This callback is invoked when the sink is disposed, whether due to a cancel, complete, or error event.
  • onRequest: This callback is invoked with the number of values requested by the subscriber. It can be used to build a pull data model: each time the callback is invoked, the next method can be called for up to the requested number of values.
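
The following test builds the Fibonacci generator with Flux.create and uses onDispose to stop the loop:

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Test; // assumption: JUnit 4; use org.junit.jupiter.api.Test for JUnit 5
import reactor.core.publisher.Flux;

@Test
public void testFibonacciFluxSink() {
    Flux<Long> fibonacciGenerator = Flux.create(e -> {
        long current = 1, prev = 0;
        AtomicBoolean stop = new AtomicBoolean(false);
        // Raise the stop flag once the sink is disposed (cancel, complete, or error).
        e.onDispose(() -> {
            stop.set(true);
            System.out.println("******* Stop Received ****** ");
        });
        // Emit until the long overflows or the sink has been disposed.
        while (current > 0 && !stop.get()) {
            e.next(current);
            System.out.println("generated " + current);
            long next = current + prev;
            prev = current;
            current = next;
        }
        e.complete();
    });
    List<Long> fibonacciSeries = new LinkedList<>();
    // take(50) cancels the subscription after 50 values.
    fibonacciGenerator.take(50).subscribe(t -> {
        System.out.println("consuming " + t);
        fibonacciSeries.add(t);
    });
    System.out.println(fibonacciSeries);
}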
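
The onRequest callback from the list above lends itself to a pull model, in which values are only produced when the subscriber asks for them. The following is a minimal sketch under a few assumptions: the class PullFibonacciSketch and its fibonacci method are hypothetical, requests arrive on a single thread, and reactor-core is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class PullFibonacciSketch {

    // Emits Fibonacci numbers only in response to downstream requests.
    static Flux<Long> fibonacci() {
        return Flux.create(sink -> {
            long[] state = {0, 1}; // {prev, current}
            sink.onRequest(requested -> {
                // Emit at most 'requested' values, and stop early on cancellation.
                for (long i = 0; i < requested && !sink.isCancelled(); i++) {
                    long value = state[1];
                    if (value < 0) { // the long has overflowed, so end the stream
                        sink.complete();
                        return;
                    }
                    // Advance the state before emitting the value.
                    state[1] = state[0] + value;
                    state[0] = value;
                    sink.next(value);
                }
            });
        });
    }

    public static void main(String[] args) {
        List<Long> firstTen = fibonacci().take(10).collectList().block();
        System.out.println(firstTen);
    }
}
```

Compared with the loop in the test above, no cleanup flag is needed here, because nothing is produced unless there is outstanding demand and the sink is still active.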

Let's check the output that's generated, as follows:

Flux also provides a push method. It is similar to the create method, but it differs in how the next, error, and complete events may be invoked: they must be issued sequentially, from a single producer thread.
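
As a rough illustration of the single-producer constraint, the sketch below issues every signal from one dedicated thread. The class PushSketch, the method events, and the thread name "producer" are hypothetical, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;

public class PushSketch {

    // All signals (next and complete) come from the same producer thread.
    static Flux<String> events() {
        return Flux.push(sink -> {
            Thread producer = new Thread(() -> {
                sink.next("a");
                sink.next("b");
                sink.complete();
            }, "producer");
            producer.start();
        });
    }

    public static void main(String[] args) {
        // block() waits until the producer thread has completed the stream.
        List<String> values = events().collectList().block();
        System.out.println(values);
    }
}
```

If several threads needed to call next on the same sink, create would be the safer choice, since push is intended for a single-threaded producer.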
