- Hands-On Reactive Programming with Reactor
- Rahul Sharma
- 2021-08-13 15:22:57
Flux.create
Flux.create is another mechanism for generating events programmatically. It hands the producer a FluxSink, which can emit any number of events. The API is more general than the generate methods discussed in the previous section, and the FluxSink can also emit events asynchronously. However, the sink does not by itself stop the producer on cancellation or slow it down under backpressure. Even after the subscriber has cancelled its subscription, the code passed to create keeps generating events unless it checks for cancellation. Implementations must therefore listen for the cancel event and explicitly stop producing.
As for backpressure, the producer keeps generating events regardless of the demand signalled by the subscriber. By default, events that exceed the demand are buffered, and the buffered events are dropped if the subscription is cancelled.
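The buffering behaviour described above can be made visible with a subscriber that requests only a few items at a time. The following is a minimal sketch, not code from the book: the class name OverflowSketch and the method receive are hypothetical, and it assumes reactor-core is on the classpath. It uses the two-argument Flux.create overload to compare the default BUFFER strategy with DROP.

```java
import java.util.ArrayList;
import java.util.List;

import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

// Hypothetical helper class, for illustration only.
public class OverflowSketch {

    // Emits 1..10 eagerly, then lets the subscriber request 3 items followed by 7 more.
    static List<Integer> receive(FluxSink.OverflowStrategy strategy) {
        List<Integer> received = new ArrayList<>();

        Flux<Integer> source = Flux.create(sink -> {
            // The producer ignores demand and pushes everything at once.
            for (int i = 1; i <= 10; i++) {
                sink.next(i);
            }
            sink.complete();
        }, strategy);

        BaseSubscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                request(3); // initial demand of three items only
            }

            @Override
            protected void hookOnNext(Integer value) {
                received.add(value);
            }
        };

        source.subscribe(subscriber);
        // Ask for the rest after the producer has already finished emitting.
        subscriber.request(7);
        return received;
    }

    public static void main(String[] args) {
        System.out.println("BUFFER: " + receive(FluxSink.OverflowStrategy.BUFFER));
        System.out.println("DROP:   " + receive(FluxSink.OverflowStrategy.DROP));
    }
}
```

With BUFFER, the items emitted beyond the initial demand wait in the sink until the later request arrives. With DROP, they are discarded at the moment they are emitted, so the later request finds nothing left to deliver.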
To see how the two are different, let's modify our FibonacciGenerator to use a FluxSink. Some of the key differences are highlighted as follows:
- There is no initial seed state in the API
- The FluxSink keeps generating the events, irrespective of the subscription state
- We can generate any number of events in the sink
- The onDispose callback can be used to perform any cleanup, or to stop publishing events
- All events that are generated are buffered and dropped once the subscription is cancelled
It is important to note that the FluxSink provides lifecycle callback methods, which can be used to perform additional cleanup, or any other action, as follows:
- onCancel: This callback is invoked when the subscription is cancelled.
- onDispose: This callback is invoked when the subscription is closed due to a cancel, complete, or error event.
- onRequest: This callback is invoked with the number of items requested by the subscriber. It can be used to build a pull data model. When the callback is invoked, the next method can be called for the requested number of values:
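import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Flux;

// @Test comes from the project's JUnit dependency
@Test
public void testFibonacciFluxSink() {
    Flux<Long> fibonacciGenerator = Flux.create(e -> {
        long current = 1, prev = 0;
        AtomicBoolean stop = new AtomicBoolean(false);
        // Invoked when the subscription ends (cancel, complete, or error)
        e.onDispose(() -> {
            stop.set(true);
            System.out.println("******* Stop Received ****** ");
        });
        // The loop never reads the stop flag, so it keeps generating after
        // cancellation. It ends only when current overflows to a negative value.
        while (current > 0) {
            e.next(current);
            System.out.println("generated " + current);
            long next = current + prev;
            prev = current;
            current = next;
        }
        e.complete();
    });
    List<Long> fibonacciSeries = new LinkedList<>();
    // take(50) cancels the subscription after 50 values
    fibonacciGenerator.take(50).subscribe(t -> {
        System.out.println("consuming " + t);
        fibonacciSeries.add(t);
    });
    System.out.println(fibonacciSeries);
}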
Let's check the output that's generated, as follows:

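The test above sets the stop flag in onDispose but never reads it, which is why generation continues after take(50) cancels. Two ways of honouring cancellation and demand can be sketched as follows. This is a hedged illustration rather than the book's code: the class FibonacciSinkSketch and its two methods are hypothetical names, reactor-core is assumed to be on the classpath, and the pull-based variant assumes requests arrive from a single thread.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical class, for illustration only.
public class FibonacciSinkSketch {

    // Push style: the same loop as the test above, but it checks the sink
    // for cancellation before producing each value.
    static Flux<Long> cancelAware() {
        return Flux.create(sink -> {
            long current = 1, prev = 0;
            while (current > 0 && !sink.isCancelled()) {
                sink.next(current);
                long next = current + prev;
                prev = current;
                current = next;
            }
            sink.complete(); // has no effect if the sink was already cancelled
        });
    }

    // Pull style: values are produced only in response to onRequest.
    static Flux<Long> pullBased() {
        return Flux.create(sink -> {
            long[] state = {0L, 1L}; // {prev, current}, shared across requests
            sink.onRequest(n -> {
                for (long i = 0; i < n && !sink.isCancelled(); i++) {
                    if (state[1] < 0) { // long overflow: no more valid values
                        sink.complete();
                        return;
                    }
                    sink.next(state[1]);
                    long next = state[0] + state[1];
                    state[0] = state[1];
                    state[1] = next;
                }
            });
        });
    }

    public static void main(String[] args) {
        List<Long> pushed = cancelAware().take(10).collectList().block();
        List<Long> pulled = pullBased().take(10).collectList().block();
        System.out.println(pushed);
        System.out.println(pulled);
    }
}
```

Both variants stop producing once the downstream take operator has received its ten values, instead of running on until the long value overflows.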
Flux also provides a push method. It is similar to the create method, but it is intended for a single producer: the next, complete, and error events must all be invoked from one thread at a time, whereas create allows several threads to emit through the same sink.
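As a minimal sketch of the single-producer case, the Fibonacci loop can be written with Flux.push in the same way as with Flux.create, because everything is emitted from the subscribing thread. The class PushSketch and the count parameter are hypothetical, introduced only for illustration, and reactor-core is assumed to be on the classpath.

```java
import java.util.List;

import reactor.core.publisher.Flux;

// Hypothetical class, for illustration only.
public class PushSketch {

    // Emits the first `count` Fibonacci numbers from a single producer thread.
    static Flux<Long> fibonacci(int count) {
        return Flux.push(sink -> {
            long current = 1, prev = 0;
            for (int i = 0; i < count && !sink.isCancelled(); i++) {
                sink.next(current);
                long next = current + prev;
                prev = current;
                current = next;
            }
            // next and complete are called from the same thread, as push expects
            sink.complete();
        });
    }

    public static void main(String[] args) {
        List<Long> values = fibonacci(5).collectList().block();
        System.out.println(values);
    }
}
```

If several threads needed to emit into the same sink, Flux.create would be the appropriate choice instead.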