
SchedulerActor

The relevant parts of the code are displayed and explained. Now let us see how to obtain price data:

def constructUrl(exchange: String): String = {
  "https://min-api.cryptocompare.com/data/histominute?fsym=BTC&tsym=USD&limit=23&aggregate=1&e=" + exchange
}

constructUrl returns a completely formed URL for the request to the Cryptocompare API. More details are given in the section related to the API.
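As a rough sketch of how the same URL could be assembled from named parameters rather than one hard-coded string, the following helper builds the query string and URL-encodes each value. The object name HistoMinuteUrl, the build method, and its default arguments are hypothetical and are not part of the project code; only the endpoint and the parameter names come from the listing above.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object HistoMinuteUrl {
  // Endpoint taken from the constructUrl listing.
  private val BaseUrl = "https://min-api.cryptocompare.com/data/histominute"

  // Hypothetical helper: same parameters as constructUrl, but named and encoded.
  def build(exchange: String,
            fsym: String = "BTC",
            tsym: String = "USD",
            limit: Int = 23,
            aggregate: Int = 1): String = {
    val params = Seq(
      "fsym"      -> fsym,
      "tsym"      -> tsym,
      "limit"     -> limit.toString,
      "aggregate" -> aggregate.toString,
      "e"         -> exchange)
    // Encode every value so that spaces or symbols in a name cannot break the URL.
    val query = params.map { case (key, value) =>
      s"$key=${URLEncoder.encode(value, StandardCharsets.UTF_8.name())}"
    }.mkString("&")
    s"$BaseUrl?$query"
  }
}
```

For an exchange name without special characters, such as the sample value "Bitstamp", HistoMinuteUrl.build("Bitstamp") produces the same string as constructUrl("Bitstamp").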

final val predictionActor = system.actorOf(Props(new PredictionActor(configuration, db)))
final val traderActor = system.actorOf(Props(new TraderActor(ws)))

This creates instances of PredictionActor and TraderActor:

override def receive: Receive = {

The receive method is defined in the Actor trait and has to be implemented. It is triggered when someone passes a message to this actor (SchedulerActor in our case):

case _ =>
  val futureResponse = restClient.getPayload(constructUrl(exchange))

In the preceding code, case _ => means that we react to any message, whatever its type and content. The first thing we do is make an asynchronous call to the Cryptocompare API using the URL constructed before. This is done with the help of RestClient, which returns a Future containing the response JSON. After the response is received (inside the onComplete callback of futureResponse), its JSON is mapped into the custom case class CryptoCompareResponse:

case class CryptoCompareResponse(Response: String,
  Type: Int,
  Aggregated: Boolean,
  Data: List[OHLC],
  FirstValueInArray: Boolean,
  TimeTo: Long,
  TimeFrom: Long)

A case class is similar to a POJO (Plain Old Java Object), but without the need to write constructors and getters/setters:

object CryptoCompareResponse {
  implicit val cryptoCompareResponseReads = Json.reads[CryptoCompareResponse]
}

This companion object is required for mapping JSON into this class. The CryptoCompareResponse object stores the output of the API: a list of OHLC data, the time range of the data, and other fields that are not relevant to us. The OHLC class corresponds to the actual price data:

case class OHLC(time: Long,
  open: Double,
  high: Double,
  low: Double,
  close: Double,
  volumefrom: Double,
  volumeto: Double)
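To illustrate the earlier point that a case class needs no hand-written constructors or accessors, here is a small sketch that reuses the OHLC definition. The numeric values are made-up sample data, and the wrapper object CaseClassDemo is hypothetical.

```scala
object CaseClassDemo {
  // Same fields as the OHLC case class in the text.
  final case class OHLC(time: Long, open: Double, high: Double, low: Double,
                        close: Double, volumefrom: Double, volumeto: Double)

  // No `new` and no explicit constructor: the compiler generates apply.
  val candle: OHLC = OHLC(1514764800L, 100.0, 105.0, 99.0, 103.0, 2.5, 255.0)

  // Fields are readable without getters; copy returns a modified instance
  // and leaves the original untouched.
  val corrected: OHLC = candle.copy(close = 104.0)

  // Equality is structural, so two instances with equal fields are equal.
  val sameAsCandle: Boolean = candle == corrected.copy(close = 103.0)
}
```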

After the data is ready, prices are stored in the DB by calling storePriceData(cryptoCompareResponse). First, it does a batch insert (using Anorm's BatchSql) into the PRICE_STAGING table. Then it re-inserts the rows into PRICE, deduplicating by timestamp, because consecutive requests return overlapping price data:

val batch = BatchSql(
  """|INSERT INTO PRICE_STAGING(TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD)
     |VALUES({timestamp}, {exchange}, {priceOpen}, {priceClosed}, {volumeBTC}, {volumeUSD})""".stripMargin,
  transformedPriceDta.head,
  transformedPriceDta.tail: _*)
val res: Array[Int] = batch.execute() // array of update counts
// Copy into PRICE only those staging rows whose timestamp is not there yet
val reInsert = SQL(
  """
    |INSERT INTO PRICE(TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD)
    |SELECT TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD
    |FROM PRICE_STAGING AS s
    |WHERE NOT EXISTS (
    |  SELECT *
    |  FROM PRICE AS t
    |  WHERE t.TIMESTAMP = s.TIMESTAMP
    |)
  """.stripMargin).execute()
Logger.debug("reinsert " + reInsert)
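The effect of the INSERT ... WHERE NOT EXISTS statement can be sketched with in-memory collections. This is only an illustration of the deduplication rule, not how the project stores data; PriceRow and mergeStaging are hypothetical names, and the row type is reduced to a few columns.

```scala
object PriceDedup {
  // Hypothetical, simplified stand-in for a row of PRICE or PRICE_STAGING.
  final case class PriceRow(timestamp: Long, exchange: String,
                            priceOpen: Double, priceClosed: Double)

  // Keeps all existing rows and appends only those staging rows whose
  // timestamp is not already present in the target table.
  def mergeStaging(price: Seq[PriceRow], staging: Seq[PriceRow]): Seq[PriceRow] = {
    val knownTimestamps = price.map(_.timestamp).toSet
    price ++ staging.filterNot(row => knownTimestamps.contains(row.timestamp))
  }
}
```

As in the SQL statement, the check runs only against rows already in the target, so two staging rows that share a new timestamp would both be appended.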

After storing the prices in the DB, SchedulerActor transforms the OHLC data into (timestamp, delta) tuples, where delta is (closePrice - openPrice), so that the format is suitable for the ML model. The transformed data is passed as a message to PredictionActor, and we explicitly wait for a response. This is done by using the ? (ask) operator. We ask the prediction actor:

(predictionActor ? CryptoCompareDTOToPredictionModelTransformer.tranform(cryptoCompareResponse)).mapTo[CurrentDataWithShortTermPrediction].map {

Its response is mapped to the CurrentDataWithShortTermPrediction class and passed to TraderActor using the ! (tell) operator. Unlike ?, the ! operator does not wait for a response:

  predictedWithCurrent =>
    traderActor ! predictedWithCurrent
}
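The (timestamp, delta) transformation described above can be sketched as follows. The implementation of CryptoCompareDTOToPredictionModelTransformer is not shown in this section, so toDeltas is a hypothetical stand-in that only follows the stated rule, delta = close - open.

```scala
object DeltaTransform {
  // Same fields as the OHLC case class shown earlier.
  final case class OHLC(time: Long, open: Double, high: Double, low: Double,
                        close: Double, volumefrom: Double, volumeto: Double)

  // Hypothetical helper: one (timestamp, close - open) tuple per candle,
  // in the same order as the input list.
  def toDeltas(data: List[OHLC]): List[(Long, Double)] =
    data.map(candle => (candle.time, candle.close - candle.open))
}
```

A positive delta means the price rose during that minute, and a negative one means it fell.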

This was a basic walkthrough of SchedulerActor. We read data from the Cryptocompare API, store it in the database, send it to PredictionActor, and wait for its response. Then we forward that response to TraderActor.

Now let's see what happens inside PredictionActor.
