官术网_书友最值得收藏!

SchedulerActor

The relevant parts of the code are displayed and explained. Now let us see how to obtain price data:

def constructUrl(exchange: String): String =
{
"https://min-api.cryptocompare.com/data/histominute?fsym=BTC&tsym=USD&limit=23&aggregate=1&e=" + exchange
}

ConstructUrl returns a completely formed URL for the request to the Cryptocompare API. More details are given in section related to the API:

final val predictionActor = system.actorOf(Props(new PredictionActor(configuration, db)))
final val traderActor = system.actorOf(Props(new TraderActor(ws)))

Creates instances of PredictionActor and TraderActor:

override def receive: Receive = {

The Receive method is defined in the actor trait and has to be implemented. It is triggered when someone passes a message to this actor (Scheduler in our case):

case _ =>
val futureResponse=restClient.getPayload(constructUrl(exchange))

In the preceding code, case _ => means that we react to any message of any type and content. The first thing that is done is an async call to the Cryptocompare API by the URL specified before. This is done with the help of RestClient, which returns Future with the response JSON. After receiving the response (inside futureResponse on complete callback), .json is mapped into the custom case class CryptoCompareResponse:

case class CryptoCompareResponse(Response: String, Type: Int, Aggregated: Boolean, Data: List[OHLC],     FirstValueInArray: Boolean, TimeTo: Long,TimeFrom: Long)

The case class is similar to POJO (Plain Old Java Object) without the need to write constructors and getters/setters:

object CryptoCompareResponse {
implicit val cryptoCompareResponseReads = Json.reads[CryptoCompareResponse]
}

This companion object is required for mapping JSON into this class. The CryptocompareResponse object stores the output of the API—a list of OHLC data, time range of data and others which that are not relevant to us. The OHLC class corresponds to actual price data:

case class OHLC(time: Long, open: Double, 
high: Double,
low: Double,
close: Double,
volumefrom: Double,
volumeto: Double)

After the data is ready, prices are stored in the DB by calling storePriceData(cryptoCompareResponse). At first, it does a batch insert (using Anorm's BatchSQL) into the PRICE_STAGING table and re-inserts into PRICE with deduplication with respect to timestamp, as we are receiving overlapping price data:

val batch = BatchSql(
"""|INSERT INTO PRICE_STAGING(TIMESTAMP,EXCHANGE,PRICE_OPEN,PRICE_CLOSED,VOLUME_BTC,
VOLUME_USD)| VALUES({timestamp}, {exchange}, {priceOpen}, {priceClosed}, {volumeBTC}, {volumeUSD})""".stripMargin,transformedPriceDta.head,transformedPriceDta.tail:_*)
val res: Array[Int] = batch.execute() // array of update count
val reInsert = SQL(
"""
|INSERT INTO PRICE(TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD)
|SELECT TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD
|FROM PRICE_STAGING AS s
|WHERE NOT EXISTS (
|SELECT *
|FROM PRICE As t
|WHERE t.TIMESTAMP = s.TIMESTAMP
|)
""".stripMargin).execute()
Logger.debug("reinsert " + reInsert)

After storing into the DB, SchedulerActor transforms OHLC data into (timestamp, delta) tuples, where delta is (closePrice-openPrice). So the format is suitable for the ML model. The transformed data is passed as a message to PredictionActor with explicit waiting for a response. This is done by using the ? operator. We ask the prediction actor:

(predictionActor ? CryptoCompareDTOToPredictionModelTransformer.tranform(cryptoCompareResponse)).mapTo[CurrentDataWithShortTermPrediction].map {

Its response is mapped to the CurrentDataWithShortTermPrediction class and passed to TraderActor using the ! operator. Unlike ?, the ! operator does not require a response:

predictedWithCurrent =>
traderActor ! predictedWithCurrent}

This was basic a walkthrough of SchedulerActor. We read data from the Cryptocompare API, store it into the database, send to PredictionActor and wait for its response. Then we forward its response to TraderActor.

Now let's see what happens inside PredictionActor.

主站蜘蛛池模板: 红河县| 宕昌县| 梁山县| 奉贤区| 南昌县| 济源市| 高密市| 若羌县| 东明县| 齐齐哈尔市| 宣恩县| 张家口市| 湖州市| 武宁县| 蒙山县| 阳曲县| 莆田市| 永顺县| 开远市| 海盐县| 遂昌县| 辰溪县| 伊宁县| 乐昌市| 景洪市| 且末县| 丹东市| 新密市| 漳平市| 兰考县| 新野县| 诸城市| 甘南县| 石台县| 信宜市| 天等县| 桂东县| 瑞安市| 拉萨市| 那坡县| 建昌县|