官术网_书友最值得收藏!

SynchronousSink

The sink gets bounded to a subscriber of the publisher. It gets invoked via the consumer function, when a subscriber asks for data. For each invocation, the sink can be used to generate a maximum of one value event at a time. The sink can raise additional onCompletion or error events during the invocation.

It is important to note that the events generated by sink are synchronously consumed at the subscriber end. Let's reflect on the Fibonacci test that we wrote in the previous chapter:

Flux<Long> fibonacciGenerator = Flux.generate(
() -> Tuples.<Long, Long>of(0L, 1L),
(state, sink) -> {
sink.next(state.getT1());
System.out.println("generated "+state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
});
fibonacciGenerator.take(size).subscribe(t -> {
System.out.println("consuming "+t);
fibonacciSeries.add(t);
});
Generating more that one event in the sink leads to java.lang.IllegalStateException: More than one call to onNext.

We have added additional print statements while generating and consuming numbers. Let's run our tests to see the output, as follows:

The consumer and producer statements are generated in an alternative manner. We can easily deduce that each number is consumed before the next number is generated. The Generate API is offered in multiple variants, and the sink can be used with or without an initial state. In our FibonacciGenerator, we used this with a state that is initialized on a per-subscriber basis. Optionally, we can also provide a terminal function, which gets invoked upon the termination of the events stream. This means that it will occur after the sink invokes an error or completion event. The terminal function can be used to perform any cleanup associated with the state.

主站蜘蛛池模板: 祁东县| 洮南市| 瓮安县| 阳朔县| 新邵县| 新乐市| 云南省| 绥化市| 宁陕县| 名山县| 平邑县| 广灵县| 子洲县| 建瓯市| 改则县| 龙井市| 陇南市| 平顶山市| 鄂州市| 都江堰市| 墨江| 应城市| 金沙县| 无极县| 漠河县| 明溪县| 巴东县| 罗山县| 华宁县| 共和县| 中宁县| 印江| 云霄县| 漠河县| 肃宁县| 阿拉善右旗| 民权县| 临澧县| 莆田市| 内乡县| 扶余县|