官术网_书友最值得收藏!

Backpressure

Another issue we might be faced with is observables that produce items faster than we can consume them. The problem that arises in this scenario is to do with the ever-growing backlog of items.

As an example, think about zipping two observables together. The zip operator (or map in RxClojure) will only emit a new value when all observables have emitted an item.

So, if one of these observables is a lot faster at producing items than the others, map will need to buffer these items and wait for the others, which will most likely cause an error, as shown here:

(defn fast-producing-obs [] 
  (rx/map inc (Observable/interval 1 TimeUnit/MILLISECONDS))) 
 
(defn slow-producing-obs [] 
  (rx/map inc (Observable/interval 500 TimeUnit/MILLISECONDS))) 
 
(rx/subscribe (->> (rx/map vector 
                           (fast-producing-obs) 
                           (slow-producing-obs)) 
                   (rx/map (fn [[x y]] 
                             (+ x y))) 
                   (rx/take 10)) 
              prn-to-repl 
              (fn [e] (prn-to-repl "error is " e))) 
 
;; "error is " #<MissingBackpressureException rx.exceptions.MissingBackpressureException> 

As we can see in the preceding code, we have a fast-producing observable that emits items 500 times faster than the slower observable. Clearly, we can't keep up with it and, surely enough, Rx throws MissingBackpressureException.

What this exception is telling us is that the fast-producing observable doesn't support any type of backpressurewhat Rx calls Reactive pull backpressure—that is, consumers can't tell it to go slower. Thankfully, Rx provides us with combinators, which are helpful in these scenarios.

主站蜘蛛池模板: 岑溪市| 文安县| 定州市| 邯郸县| 同江市| 南汇区| 天台县| 汨罗市| 乐陵市| 宝坻区| 肥西县| 手机| 禹州市| 黄浦区| 山丹县| 惠安县| 麟游县| 南漳县| 石嘴山市| 常山县| 古蔺县| 海林市| 西峡县| 宜昌市| 辛集市| 拜泉县| 鸡西市| 虞城县| 开江县| 通海县| 门源| 秦皇岛市| 广饶县| 阳信县| 蒙阴县| 海南省| 开封县| 开化县| 凉城县| 凤台县| 辽阳县|