官术网_书友最值得收藏!

Backpressure strategies

If an observable doesn't support backpressure but we are still interested in all of the items it emits, we can use one of the built-in backpressure combinators provided by Rx.

As an example, we will look at one such combinator, onBackpressureBuffer:

(rx/subscribe (->> (rx/map vector 
(.onBackpressureBuffer (fast-producing-obs))
(slow-producing-obs))
(rx/map (fn [[x y]]
(+ x y)))
(rx/take 10))
prn-to-repl
(fn [e] (prn-to-repl "error is " e))) ;; 2 ;; 4 ;; 6 ;; 8 ;; 10 ;; 12 ;; 14 ;; 16 ;; 18 ;; 20

This example is very similar to the one where we used sample, but the output is fairly different. This time, we get all of the items that have been emitted by both observables.

The onBackpressureBuffer strategy implements a strategy that simply buffers all of the items that are emitted by the slower Observable, emitting them whenever the consumer is ready. In our case, this happens every 500 milliseconds.

Other strategies include onBackpressureDrop and onBackpressureBlock.

It's worth noting that Reactive pull backpressure is still a work in progress and the best way to keep up to date with progress is on the RxJava wiki on the subject: https://github.com/ReactiveX/RxJava/wiki/Backpressure.

主站蜘蛛池模板: 临沭县| 江源县| 北辰区| 海宁市| 双辽市| 弥渡县| 福清市| 资源县| 宁城县| 衡阳市| 青海省| 翁牛特旗| 开远市| 米泉市| 洪湖市| 巴塘县| 广西| 敦化市| 昭通市| 迭部县| 育儿| 临漳县| 拉孜县| 改则县| 四子王旗| 闸北区| 明溪县| 揭东县| 普格县| 抚州市| 壶关县| 岢岚县| 桦南县| 绥江县| 兖州市| 洛宁县| 南和县| 凤台县| 安康市| 潢川县| 叶城县|