官术网_书友最值得收藏!

Backpressure

Another issue we might be faced with is observables that produce items faster than we can consume them. The problem that arises in this scenario is to do with the ever-growing backlog of items.

As an example, think about zipping two observables together. The zip operator (or map in RxClojure) will only emit a new value when all observables have emitted an item.

So, if one of these observables is a lot faster at producing items than the others, map will need to buffer these items and wait for the others, which will most likely cause an error, as shown here:

(defn fast-producing-obs [] 
  (rx/map inc (Observable/interval 1 TimeUnit/MILLISECONDS))) 
 
(defn slow-producing-obs [] 
  (rx/map inc (Observable/interval 500 TimeUnit/MILLISECONDS))) 
 
(rx/subscribe (->> (rx/map vector 
                           (fast-producing-obs) 
                           (slow-producing-obs)) 
                   (rx/map (fn [[x y]] 
                             (+ x y))) 
                   (rx/take 10)) 
              prn-to-repl 
              (fn [e] (prn-to-repl "error is " e))) 
 
;; "error is " #<MissingBackpressureException rx.exceptions.MissingBackpressureException> 

As we can see in the preceding code, we have a fast-producing observable that emits items 500 times faster than the slower observable. Clearly, we can't keep up with it and, surely enough, Rx throws MissingBackpressureException.

What this exception is telling us is that the fast-producing observable doesn't support any type of backpressurewhat Rx calls Reactive pull backpressure—that is, consumers can't tell it to go slower. Thankfully, Rx provides us with combinators, which are helpful in these scenarios.

主站蜘蛛池模板: 稷山县| 新巴尔虎左旗| 蕉岭县| 遂平县| 云龙县| 青浦区| 莲花县| 莱州市| 樟树市| 杨浦区| 长治市| 东丽区| 喀喇| 彭山县| 灵川县| 太仓市| 桃园县| 吴旗县| 焦作市| 银川市| 柘荣县| 石景山区| 汤阴县| 清徐县| 大邑县| 汽车| 报价| 新津县| 崇明县| 封开县| 铜川市| 咸阳市| 革吉县| 通辽市| 龙游县| 抚松县| 衡山县| 宜都市| 咸丰县| 丰城市| 新巴尔虎左旗|