官术网_书友最值得收藏!

Backpressure strategies

If an observable doesn't support backpressure but we are still interested in all of the items it emits, we can use one of the built-in backpressure combinators provided by Rx.

As an example, we will look at one such combinator, onBackpressureBuffer:

(rx/subscribe (->> (rx/map vector 
(.onBackpressureBuffer (fast-producing-obs))
(slow-producing-obs))
(rx/map (fn [[x y]]
(+ x y)))
(rx/take 10))
prn-to-repl
(fn [e] (prn-to-repl "error is " e))) ;; 2 ;; 4 ;; 6 ;; 8 ;; 10 ;; 12 ;; 14 ;; 16 ;; 18 ;; 20

This example is very similar to the one where we used sample, but the output is fairly different. This time, we get all of the items that have been emitted by both observables.

The onBackpressureBuffer strategy implements a strategy that simply buffers all of the items that are emitted by the slower Observable, emitting them whenever the consumer is ready. In our case, this happens every 500 milliseconds.

Other strategies include onBackpressureDrop and onBackpressureBlock.

It's worth noting that Reactive pull backpressure is still a work in progress and the best way to keep up to date with progress is on the RxJava wiki on the subject: https://github.com/ReactiveX/RxJava/wiki/Backpressure.

主站蜘蛛池模板: 宁蒗| 睢宁县| 闸北区| 石城县| 巴东县| 田阳县| 莱芜市| 比如县| 镇巴县| 贵州省| 吴桥县| 大姚县| 新野县| 房产| 吉隆县| 友谊县| 九龙县| 兴海县| 泰顺县| 荥经县| 新巴尔虎左旗| 称多县| 天长市| 错那县| 阜城县| 乌鲁木齐市| 新郑市| 禹城市| 金阳县| 沾益县| 乌兰县| 丰都县| 柞水县| 隆林| 吉木萨尔县| 青铜峡市| 西贡区| 武宣县| 丹阳市| 台中市| 弋阳县|