官术网_书友最值得收藏!

Backpressure strategies

If an observable doesn't support backpressure but we are still interested in all of the items it emits, we can use one of the built-in backpressure combinators provided by Rx.

As an example, we will look at one such combinator, onBackpressureBuffer:

(rx/subscribe (->> (rx/map vector 
(.onBackpressureBuffer (fast-producing-obs))
(slow-producing-obs))
(rx/map (fn [[x y]]
(+ x y)))
(rx/take 10))
prn-to-repl
(fn [e] (prn-to-repl "error is " e))) ;; 2 ;; 4 ;; 6 ;; 8 ;; 10 ;; 12 ;; 14 ;; 16 ;; 18 ;; 20

This example is very similar to the one where we used sample, but the output is fairly different. This time, we get all of the items that have been emitted by both observables.

The onBackpressureBuffer strategy implements a strategy that simply buffers all of the items that are emitted by the slower Observable, emitting them whenever the consumer is ready. In our case, this happens every 500 milliseconds.

Other strategies include onBackpressureDrop and onBackpressureBlock.

It's worth noting that Reactive pull backpressure is still a work in progress and the best way to keep up to date with progress is on the RxJava wiki on the subject: https://github.com/ReactiveX/RxJava/wiki/Backpressure.

主站蜘蛛池模板: 海盐县| 金乡县| 怀来县| 哈尔滨市| 太康县| 聊城市| 南丰县| 高邑县| 西吉县| 木里| 马龙县| 美姑县| 忻城县| 绥滨县| 麦盖提县| 湘乡市| 元朗区| 衡南县| 长治市| 武宁县| 土默特右旗| 营口市| 惠东县| 青岛市| 吴川市| 东辽县| 金乡县| 大悟县| 犍为县| 体育| 德化县| 崇阳县| 凌云县| 手游| 五原县| 武邑县| 台中县| 陵水| 蚌埠市| 建阳市| 吴江市|