官术网_书友最值得收藏!

Backpressure strategies

If an observable doesn't support backpressure but we are still interested in all of the items it emits, we can use one of the built-in backpressure combinators provided by Rx.

As an example, we will look at one such combinator, onBackpressureBuffer:

(rx/subscribe (->> (rx/map vector 
(.onBackpressureBuffer (fast-producing-obs))
(slow-producing-obs))
(rx/map (fn [[x y]]
(+ x y)))
(rx/take 10))
prn-to-repl
(fn [e] (prn-to-repl "error is " e))) ;; 2 ;; 4 ;; 6 ;; 8 ;; 10 ;; 12 ;; 14 ;; 16 ;; 18 ;; 20

This example is very similar to the one where we used sample, but the output is fairly different. This time, we get all of the items that have been emitted by both observables.

The onBackpressureBuffer strategy implements a strategy that simply buffers all of the items that are emitted by the slower Observable, emitting them whenever the consumer is ready. In our case, this happens every 500 milliseconds.

Other strategies include onBackpressureDrop and onBackpressureBlock.

It's worth noting that Reactive pull backpressure is still a work in progress and the best way to keep up to date with progress is on the RxJava wiki on the subject: https://github.com/ReactiveX/RxJava/wiki/Backpressure.

主站蜘蛛池模板: 都昌县| 慈溪市| 淮滨县| 健康| 延津县| 新宁县| 资溪县| 金溪县| 建昌县| 津南区| 十堰市| 丰县| 新绛县| 邯郸市| 瓦房店市| 玉田县| 昆明市| 卢氏县| 南和县| 巴塘县| 新邵县| 隆昌县| 涡阳县| 贵南县| 色达县| 新建县| 海晏县| 永清县| 光山县| 百色市| 馆陶县| 康平县| 岐山县| 延寿县| 布尔津县| 瑞金市| 新宁县| 台江县| 青河县| 阿拉善右旗| 大丰市|