官术网_书友最值得收藏!

Backpressure strategies

If an observable doesn't support backpressure but we are still interested in all of the items it emits, we can use one of the built-in backpressure combinators provided by Rx.

As an example, we will look at one such combinator, onBackpressureBuffer:

(rx/subscribe (->> (rx/map vector 
(.onBackpressureBuffer (fast-producing-obs))
(slow-producing-obs))
(rx/map (fn [[x y]]
(+ x y)))
(rx/take 10))
prn-to-repl
(fn [e] (prn-to-repl "error is " e))) ;; 2 ;; 4 ;; 6 ;; 8 ;; 10 ;; 12 ;; 14 ;; 16 ;; 18 ;; 20

This example is very similar to the one where we used sample, but the output is fairly different. This time, we get all of the items that have been emitted by both observables.

The onBackpressureBuffer strategy implements a strategy that simply buffers all of the items that are emitted by the slower Observable, emitting them whenever the consumer is ready. In our case, this happens every 500 milliseconds.

Other strategies include onBackpressureDrop and onBackpressureBlock.

It's worth noting that Reactive pull backpressure is still a work in progress and the best way to keep up to date with progress is on the RxJava wiki on the subject: https://github.com/ReactiveX/RxJava/wiki/Backpressure.

主站蜘蛛池模板: 鄢陵县| 襄樊市| 青海省| 五家渠市| 无棣县| 淮阳县| 临夏市| 二连浩特市| 青岛市| 龙口市| 舒城县| 正阳县| 横山县| 梅河口市| 南澳县| 庆云县| 桃园县| 长顺县| 福鼎市| 南乐县| 繁昌县| 华容县| 平江县| 桃园县| 清涧县| 民乐县| 迁西县| 乐安县| 连城县| 凌云县| 金山区| 炉霍县| 河池市| 九龙城区| 瑞金市| 曲水县| 铜山县| 阳曲县| 祁东县| 葫芦岛市| 云南省|