官术网_书友最值得收藏!

Sample

One such combinator is sample, which allows us to sample an observable at a given interval, thus throttling the source observable's output. Let's apply it to our previous example:

(rx/subscribe (->> (rx/map vector 
                           (.sample (fast-producing-obs) 200 
                                    TimeUnit/MILLISECONDS) 
                           (slow-producing-obs)) 
                   (rx/map (fn [[x y]] 
                             (+ x y))) 
                   (rx/take 10)) 
              prn-to-repl 
              (fn [e] (prn-to-repl "error is " e))) 
 
;; 204 
;; 404 
;; 604 
;; 807 
;; 1010 
;; 1206 
;; 1407 
;; 1613 
;; 1813 
;; 2012 

The only change is that we call sample on our fast producing observable before calling map. We will sample it every 200 milliseconds.

By ignoring all other items emitted in this time slice, we have mitigated our initial problem, even though the original observable doesn't support any form of backpressure.

The sample combinator is only one of the combinators that's useful in such cases. Others include throttleFirst, debounce, buffer, and window. One drawback of this approach, however, is that a lot of the items generated end up being ignored.

Depending on the type of application we are building, this might be an acceptable compromise. But what if we are interested in all of the items?

主站蜘蛛池模板: 阿克| 呼伦贝尔市| 枞阳县| 洛阳市| 丹寨县| 故城县| 嘉定区| 鄱阳县| 延安市| 文安县| 池州市| 武宣县| 石河子市| 万年县| 黄梅县| 辉县市| 汶川县| 巴东县| 和平县| 嘉义市| 买车| 丰宁| 名山县| 名山县| 潜江市| 深泽县| 澜沧| 鲜城| 左权县| 体育| 扶风县| 罗源县| 正定县| 武宣县| 台东县| 邵阳县| 恭城| 来宾市| 彰武县| 夹江县| 盐池县|