官术网_书友最值得收藏!

Implementing the application code

We already have a project depending on core.async, which we created in the previous section, so we'll be working off that. Let's start by adding an extra dependency on seesaw to our project.clj file:

  :dependencies [[org.clojure/clojure "1.9.0"] 
                 [org.clojure/core.async "0.4.474"] 
                 [seesaw "1.5.0"]] 

Next, we need to create a file called stock_market.clj in the src directory and add the following namespace declaration:

(ns core-async-playground.stock-market 
  (:require [clojure.core.async 
             :refer [go chan <! >! timeout go-loop map>] :as async]) 
  (:require [clojure.core.async.lab :refer [broadcast]]) 
  (:use [seesaw.core])) 

This might be a good point to restart your REPL if you haven't done so. Don't worry about any functions we haven't seen yet. We'll get a feel for them in this section.

The GUI code remains largely unchanged, so no explanation should be necessary for the following snippet:

(native!) 
 
(def main-frame (frame :title "Stock price monitor" 
                       :width 200 :height 100 
                       :on-close :exit)) 
 
(def price-label       (label "Price: -")) 
(def running-avg-label (label "Running average (5): -")) 
 
(config! main-frame :content 
         (border-panel 
          :north  price-label 
          :center running-avg-label 
          :border 5)) 
 
(defn share-price [company-code] 
  (Thread/sleep 200) 
  (rand-int 1000)) 
 
(defn avg [numbers] 
  (float (/ (reduce + numbers) 
            (count numbers)))) 
 
(defn roll-buffer [buffer val buffer-size] 
  (let [buffer (conj buffer val)] 
    (if (> (count buffer) buffer-size) 
      (pop buffer) 
      buffer)))
(defn make-sliding-buffer [buffer-size] 
  (let [buffer (atom clojure.lang.PersistentQueue/EMPTY)] 
    (fn [n] 
      (swap! buffer roll-buffer n buffer-size)))) 
 
(def sliding-buffer (make-sliding-buffer 5)) 

The only difference is that we now have a sliding-buffer function that returns a window of data. This is in contrast to our original application, where the rolling-avg function was responsible for both creating the window and calculating the average. This new design is more general, as it makes this function easier to reuse. The sliding logic is the same, however.

Next, we have our main application logic by using core.async:

(defn broadcast-at-interval [msecs task & ports] 
  (go-loop [out (apply broadcast ports)] 
    (<! (timeout msecs)) 
    (>! out (task)) 
    (recur out))) 
 
(defn -main [& args] 
  (show! main-frame) 
  (let [prices-ch         (chan) 
        sliding-buffer-ch (map> sliding-buffer (chan))] 
    (broadcast-at-interval 500 #(share-price "XYZ") prices-ch sliding-buffer-ch) 
    (go-loop [] 
      (when-let [price (<! prices-ch)] 
        (text! price-label (str "Price: " price)) 
        (recur))) 
    (go-loop [] 
      (when-let [buffer (<! sliding-buffer-ch)] 
        (text! running-avg-label (str "Running average: " (avg buffer))) 
        (recur))))) 

Let's walk through the code.

The first function, broadcast-at-interval, is responsible for creating the broadcasting channel. It receives a variable number of arguments: a number of milliseconds describing the interval, the function representing the task to be executed, and a sequence of one or more output channels. These channels are used to create the broadcasting channel to which the go loop will be writing prices.

Next, we have our main function. The let block is where the interesting bits are. As we discussed in our high-level diagrams, we need two output channels: one for prices and one for the sliding window. They are both created in the following code:

... 
  (let [prices-ch         (chan) 
        sliding-buffer-ch (map> sliding-buffer (chan))] 
... 

prices-ch should be self-explanatory; however, sliding-buffer-ch is using a function we haven't encountered before: map>. This is yet another useful channel constructor in core.async. It takes two arguments: a function and a target channel. It returns a channel that applies this function to each value before writing it to the target channel. An example will help illustrate how it works:

    (def c (map> sliding-buffer (chan 10)))
    (go (doseq [n (range 10)]
          (>! c n)))
    (go (doseq [n (range 10)]
          (prn  (vec (<! c)))))
    
    ;; [0]
    ;; [0 1]
    ;; [0 1 2]
    ;; [0 1 2 3]
    ;; [0 1 2 3 4]
    ;; [1 2 3 4 5]
    ;; [2 3 4 5 6]
    ;; [3 4 5 6 7]
    ;; [4 5 6 7 8]
    ;; [5 6 7 8 9]  

That is, we write a price to the channel and get a sliding window on the other end. Finally, we create the two go blocks containing the side effects. They loop indefinitely, getting values from both channels and updating the user interface.

You can see it in action by running the program from the Terminal:

$ lein run -m core-async-playground.stock-market  
主站蜘蛛池模板: 平度市| 武平县| 谷城县| 宽城| 石台县| 开平市| 乐山市| 昆明市| 腾冲县| 平塘县| 隆尧县| 滨海县| 镇平县| 延安市| 勐海县| 浦县| 阿克苏市| 黔南| 平江县| 蒙阴县| 东山县| 襄汾县| 南郑县| 新乡县| 武安市| 青海省| 伊宁市| 武平县| 鄂温| 临清市| 辛集市| 德昌县| 巴塘县| 永吉县| 弥勒县| 盐山县| 清丰县| 称多县| 巴彦县| 罗山县| 自贡市|