
Flux

Let's describe how data flows through the Flux class with the following marble diagram:

Diagram 4.2 An example of the Flux stream transformed into another Flux stream

Flux defines an ordinary reactive stream that can produce zero, one, or many elements, potentially even an infinite number of them. Its signals follow this formula:

onNext x 0..N [onError | onComplete]
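To make the formula concrete, here is a minimal sketch that records the signals a subscriber receives from a few small streams. The class name FluxSignals and the helper signals are hypothetical names introduced only for this illustration. The sketch assumes reactor-core is on the classpath and relies on these particular sources emitting synchronously on the calling thread.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of Reactor.
public class FluxSignals {

    // Subscribes to the source and records every signal in arrival order.
    // The sources used below emit synchronously, so the log is complete
    // by the time subscribe(...) returns.
    static List<String> signals(Flux<Integer> source) {
        List<String> log = new ArrayList<>();
        source.subscribe(
            value -> log.add("onNext(" + value + ")"),
            error -> log.add("onError(" + error.getMessage() + ")"),
            () -> log.add("onComplete"));
        return log;
    }

    public static void main(String[] args) {
        // Many elements, then completion.
        System.out.println(signals(Flux.just(1, 2, 3)));
        // Zero elements, then completion.
        System.out.println(signals(Flux.empty()));
        // One element, then an error instead of completion.
        System.out.println(signals(
            Flux.just(1).concatWith(Flux.error(new IllegalStateException("boom")))));
    }
}
```

In each case the stream ends with at most one terminal signal, either onError or onComplete, and never both.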

It is not very common to work with infinite data containers in the imperative world, but it is pretty common with functional programming. The following code may produce a simple endless Reactive Stream:

Flux.range(1, 5).repeat()

This stream repeatedly produces the numbers from 1 to 5 (the sequence looks like 1, 2, 3, 4, 5, 1, 2, ...). This is not a problem and does not exhaust memory, because each element can be transformed and consumed without the whole stream having to be created first. Furthermore, the subscriber can cancel the subscription at any time, effectively turning an endless stream into a finite one.
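The cancellation described above can be sketched with Reactor's BaseSubscriber, which exposes a cancel() method to the subscriber itself. The class name CancelEndlessFlux and the helper firstElements are hypothetical. The sketch assumes reactor-core is on the classpath and that the source emits synchronously on the subscribing thread, as range and repeat do when no scheduler is involved.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of Reactor.
public class CancelEndlessFlux {

    // Consumes elements until `limit` of them have arrived, then cancels
    // the subscription so the endless source stops emitting.
    static List<Integer> firstElements(Flux<Integer> source, int limit) {
        List<Integer> received = new ArrayList<>();
        source.subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
                if (received.size() >= limit) {
                    cancel(); // no further onNext signals are delivered
                }
            }
        });
        return received;
    }

    public static void main(String[] args) {
        // prints [1, 2, 3, 4, 5, 1, 2]
        System.out.println(firstElements(Flux.range(1, 5).repeat(), 7));
    }
}
```

Only seven elements are ever held in memory, even though the source itself never completes.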

Beware: an attempt to collect all elements emitted by an endless stream may cause an OutOfMemoryError. Doing so is not recommended in production applications, but the simplest way to reproduce this behavior is probably the following code:

Flux.range(1, 100)       // (1)
    .repeat()            // (2)
    .collectList()       // (3)
    .block();            // (4)

In the preceding code, we do the following:

  1. The range operator creates a sequence of integers starting from 1 up to 100 (inclusive).
  2. The repeat operator subscribes to the source reactive stream again and again after the source stream finishes. So, the repeat operator subscribes to the results of the range operator, receives elements 1 to 100 and the onComplete signal, and then subscribes again, receives elements 1 to 100, and so on, without stopping.
  3. With the collectList operator, we are trying to gather all produced elements into a single list. Of course, because the repeat operator generates an endless stream, elements keep arriving and the list keeps growing until it consumes all available memory and the application fails with the following error: java.lang.OutOfMemoryError: Java heap space. Our application has just run out of free heap memory.
  4. The block operator triggers an actual subscription and blocks the running thread until the final result arrives, which, in the current case, cannot happen as the reactive stream is endless.
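For contrast, one way to keep the same pipeline finite is to bound the stream before collecting it. This is an illustrative variation rather than something the text prescribes. The class name BoundedCollect and the helper collectBounded are hypothetical, and the sketch assumes reactor-core is on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical helper class, not part of Reactor.
public class BoundedCollect {

    // Same pipeline as above, except that take(limit) cancels the endless
    // source after `limit` elements, so collectList() receives onComplete
    // and block() can return.
    static List<Integer> collectBounded(long limit) {
        return Flux.range(1, 100)
                   .repeat()
                   .take(limit)
                   .collectList()
                   .block();
    }

    public static void main(String[] args) {
        List<Integer> items = collectBounded(250);
        System.out.println(items.size());   // prints 250
        System.out.println(items.get(249)); // prints 50
    }
}
```

Here the list holds two full passes over 1 to 100 followed by the first 50 elements of the third pass, and memory use stays proportional to the chosen limit.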