官术网_书友最值得收藏!

Sample

One such combinator is sample, which allows us to sample an observable at a given interval, thus throttling the source observable's output. Let's apply it to our previous example:

(rx/subscribe (->> (rx/map vector 
                           (.sample (fast-producing-obs) 200 
                                    TimeUnit/MILLISECONDS) 
                           (slow-producing-obs)) 
                   (rx/map (fn [[x y]] 
                             (+ x y))) 
                   (rx/take 10)) 
              prn-to-repl 
              (fn [e] (prn-to-repl "error is " e))) 
 
;; 204 
;; 404 
;; 604 
;; 807 
;; 1010 
;; 1206 
;; 1407 
;; 1613 
;; 1813 
;; 2012 

The only change is that we call sample on our fast producing observable before calling map. We will sample it every 200 milliseconds.

By ignoring all other items emitted in this time slice, we have mitigated our initial problem, even though the original observable doesn't support any form of backpressure.

The sample combinator is only one of the combinators that's useful in such cases. Others include throttleFirst, debounce, buffer, and window. One drawback of this approach, however, is that a lot of the items generated end up being ignored.

Depending on the type of application we are building, this might be an acceptable compromise. But what if we are interested in all of the items?

主站蜘蛛池模板: 山丹县| 邢台市| 阿瓦提县| 托克托县| 观塘区| 江川县| 乡宁县| 余江县| SHOW| 岳西县| 温宿县| 星子县| 利川市| 马尔康县| 英德市| 肃南| 靖西县| 临邑县| 雷州市| 台南县| 都安| 曲松县| 德兴市| 绥中县| 永济市| 东丽区| 重庆市| 湖口县| 陆川县| 尼玛县| 双鸭山市| 滕州市| 南昌市| 武强县| 安龙县| 北安市| 石首市| 遂川县| 石景山区| 石城县| 木兰县|