
SchedulerActor

The relevant parts of the code are displayed and explained. Now let us see how to obtain price data:

def constructUrl(exchange: String): String =
{
"https://min-api.cryptocompare.com/data/histominute?fsym=BTC&tsym=USD&limit=23&aggregate=1&e=" + exchange
}

constructUrl returns a fully formed URL for the request to the Cryptocompare API. More details are given in the section on the API:

final val predictionActor = system.actorOf(Props(new PredictionActor(configuration, db)))
final val traderActor = system.actorOf(Props(new TraderActor(ws)))

This creates instances of PredictionActor and TraderActor:

override def receive: Receive = {

The receive method is declared in the Actor trait and has to be implemented. It is triggered when someone passes a message to this actor (SchedulerActor in our case):

case _ =>
val futureResponse=restClient.getPayload(constructUrl(exchange))

In the preceding code, case _ => means that we react to any message, regardless of its type and content. The first step is an asynchronous call to the Cryptocompare API, using the URL constructed earlier. This is done with the help of RestClient, which returns a Future holding the response JSON. Once the response arrives (inside the on-complete callback of futureResponse), its .json is mapped into the custom case class CryptoCompareResponse:

case class CryptoCompareResponse(Response: String, Type: Int, Aggregated: Boolean, Data: List[OHLC], FirstValueInArray: Boolean, TimeTo: Long, TimeFrom: Long)

A case class is similar to a POJO (Plain Old Java Object), but without the need to write constructors and getters/setters:

object CryptoCompareResponse {
implicit val cryptoCompareResponseReads = Json.reads[CryptoCompareResponse]
}

This companion object is required for mapping JSON into this class. The CryptoCompareResponse object stores the output of the API: a list of OHLC data, the time range of the data, and other fields that are not relevant to us. The OHLC class corresponds to the actual price data:

case class OHLC(time: Long, open: Double, 
high: Double,
low: Double,
close: Double,
volumefrom: Double,
volumeto: Double)
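
As a brief illustration of what a case class provides without boilerplate, the following sketch reuses the OHLC definition above. The sample values are invented for illustration and are not real market data, and CaseClassDemo is a hypothetical name. It shows construction without new, generated accessors, structural equality, and copy:

```scala
// Same fields as the OHLC case class above; all values below are made up.
case class OHLC(time: Long, open: Double, high: Double, low: Double,
                close: Double, volumefrom: Double, volumeto: Double)

object CaseClassDemo {
  // No `new` and no hand-written constructor are needed.
  val bar = OHLC(1500000000L, 100.0, 105.0, 99.0, 103.0, 12.5, 1280.0)
  val sameBar = OHLC(1500000000L, 100.0, 105.0, 99.0, 103.0, 12.5, 1280.0)

  // copy creates a modified instance; the original stays unchanged.
  val nextBar = bar.copy(time = bar.time + 60, open = bar.close)

  def main(args: Array[String]): Unit = {
    println(bar.close)       // generated accessor
    println(bar == sameBar)  // structural equality, compares field by field
    println(nextBar)         // generated toString
  }
}
```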

After the data is ready, prices are stored in the DB by calling storePriceData(cryptoCompareResponse). First, it does a batch insert (using Anorm's BatchSql) into the PRICE_STAGING table. It then re-inserts the rows into PRICE, deduplicating by timestamp, because we receive overlapping price data:

val batch = BatchSql(
"""|INSERT INTO PRICE_STAGING(TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD)
|VALUES({timestamp}, {exchange}, {priceOpen}, {priceClosed}, {volumeBTC}, {volumeUSD})""".stripMargin,
transformedPriceDta.head, transformedPriceDta.tail: _*)
val res: Array[Int] = batch.execute() // array of update counts
val reInsert = SQL(
"""
|INSERT INTO PRICE(TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD)
|SELECT TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD
|FROM PRICE_STAGING AS s
|WHERE NOT EXISTS (
|SELECT *
|FROM PRICE As t
|WHERE t.TIMESTAMP = s.TIMESTAMP
|)
""".stripMargin).execute()
Logger.debug("reinsert " + reInsert)
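
The staging-then-re-insert step can be illustrated in memory. The sketch below is not the project's code: it models the two tables as plain Scala collections, and the names PriceRow, DedupSketch, and reInsert, as well as the sample rows, are hypothetical. It mirrors the WHERE NOT EXISTS condition by keeping only those staging rows whose timestamp is not yet present in the target:

```scala
// Hypothetical row type, reduced to a few columns for the illustration.
case class PriceRow(timestamp: Long, exchange: String, priceOpen: Double, priceClosed: Double)

object DedupSketch {
  // Returns the contents of PRICE after copying over only unseen timestamps.
  // Note: this sketch does not deduplicate rows within staging itself.
  def reInsert(price: Vector[PriceRow], staging: Vector[PriceRow]): Vector[PriceRow] = {
    val known = price.map(_.timestamp).toSet
    price ++ staging.filterNot(row => known.contains(row.timestamp))
  }

  def main(args: Array[String]): Unit = {
    // Two overlapping batches: timestamp 120 appears in both.
    val price = Vector(
      PriceRow(60L, "exampleExchange", 100.0, 101.0),
      PriceRow(120L, "exampleExchange", 101.0, 102.0))
    val staging = Vector(
      PriceRow(120L, "exampleExchange", 101.0, 102.0),
      PriceRow(180L, "exampleExchange", 102.0, 101.5))

    println(reInsert(price, staging).map(_.timestamp)) // Vector(60, 120, 180)
  }
}
```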

After storing the prices in the DB, SchedulerActor transforms the OHLC data into (timestamp, delta) tuples, where delta is closePrice - openPrice, so that the format is suitable for the ML model. The transformed data is passed as a message to PredictionActor, and we explicitly wait for a response by using the ? operator. We ask the prediction actor:

(predictionActor ? CryptoCompareDTOToPredictionModelTransformer.tranform(cryptoCompareResponse)).mapTo[CurrentDataWithShortTermPrediction].map {

Its response is mapped to the CurrentDataWithShortTermPrediction class and passed to TraderActor using the ! operator. Unlike ?, the ! operator does not expect a response:

predictedWithCurrent =>
traderActor ! predictedWithCurrent}
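
The (timestamp, delta) transformation described earlier can be sketched as follows. This is an assumption about the shape of the transformation, not the body of CryptoCompareDTOToPredictionModelTransformer, which is not shown here. Bar, DeltaSketch, and toDeltas are hypothetical names, and only the fields needed for the delta are modeled:

```scala
// Hypothetical, reduced stand-in for OHLC with only the fields the delta needs.
case class Bar(time: Long, open: Double, close: Double)

object DeltaSketch {
  // Maps each bar to (timestamp, close - open), preserving input order.
  def toDeltas(bars: List[Bar]): List[(Long, Double)] =
    bars.map(b => (b.time, b.close - b.open))

  def main(args: Array[String]): Unit = {
    // Sample values are invented for illustration.
    val bars = List(Bar(60L, 100.0, 102.5), Bar(120L, 102.5, 101.0))
    println(toDeltas(bars)) // List((60,2.5), (120,-1.5))
  }
}
```

A positive delta means the price closed above its open within that minute, and a negative delta means it closed below.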

This was a basic walkthrough of SchedulerActor. We read data from the Cryptocompare API, store it in the database, send it to PredictionActor, and wait for its response. Then we forward that response to TraderActor.

Now let's see what happens inside PredictionActor.
