Data preprocessing

Given the goals of data preparation, we chose Scala as an easy, interactive way to manipulate the data:

val priceDataFileName: String = "bitstampUSD_1-min_data_2012-01-01_to_2017-10-20.csv"

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "E:/Exp/")
  .appName("Bitcoin Preprocessing")
  .getOrCreate()

val data = spark.read.format("com.databricks.spark.csv").option("header", "true").load(priceDataFileName)
data.show(10)
>>>
Figure 5: A glimpse of the Bitcoin historical price dataset
println((data.count(), data.columns.size))

>>>

(3045857, 8)

In the preceding code, we load the data from the file downloaded from Kaggle and look at what is inside. The dataset has 3045857 rows and 8 columns, described before. Then we create the Delta column, containing the difference between the closing and opening prices of each row. The early rows, recorded before meaningful trading had started to occur, are discarded later in rollingWindow:

val dataWithDelta = data.withColumn("Delta", data("Close") - data("Open"))
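To make the Delta computation and the sign-based labeling used in this section concrete, here is a minimal sketch on plain Scala values rather than a Spark DataFrame. The `Bar` case class and the `DeltaLabelSketch` object are hypothetical names introduced only for illustration; they are not part of the project code.

```scala
object DeltaLabelSketch {
  // Hypothetical stand-in for one dataset row; only the two prices needed here.
  final case class Bar(open: Double, close: Double)

  // Delta is the closing price minus the opening price of the same row.
  def delta(bar: Bar): Double = bar.close - bar.open

  // Label is 1 when the price rose within the row, 0 otherwise.
  def label(bar: Bar): Int = if (delta(bar) > 0) 1 else 0
}
```

Note that under this rule a row whose price did not move (Delta equal to 0) gets the label 0, the same as a falling price.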

The following code labels our data by assigning 1 to the rows whose Delta value is positive and 0 to all other rows:

import org.apache.spark.sql.functions._
import spark.implicits._

val dataWithLabels = dataWithDelta.withColumn("label", when($"Close" - $"Open" > 0, 1).otherwise(0))
rollingWindow(dataWithLabels, 22, outputDataFilePath, outputLabelFilePath)

The rollingWindow call transforms the original dataset into time series data. It takes the Delta values of WINDOW_SIZE consecutive rows (22 in this experiment) and makes a new row out of them. In this way, the first row has Delta values from t0 to t21, and the second one has values from t1 to t22. Then we create the corresponding array of labels (1 or 0), where the label for each window is taken from the row that immediately follows it.
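The windowing scheme described above can be sketched on in-memory collections as follows. This is only an illustration under simplifying assumptions: `RollingWindowSketch` and `windows` are hypothetical names, the input is a plain `Vector` rather than collected Spark rows, and nothing is written to disk.

```scala
object RollingWindowSketch {
  // For every start index i, pair the `window` Delta values starting at i
  // with the label of the row that directly follows the window.
  def windows(deltas: Vector[Double],
              labels: Vector[Int],
              window: Int): Vector[(Vector[Double], Int)] = {
    require(deltas.length == labels.length, "deltas and labels must align")
    require(window > 0, "window must be positive")
    (0 until (deltas.length - window)).toVector.map { i =>
      (deltas.slice(i, i + window), labels(i + window))
    }
  }
}
```

With five rows and a window of 3, this yields two samples: rows 0 to 2 paired with the label of row 3, and rows 1 to 3 paired with the label of row 4.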

Finally, we save X and Y to files. The first 612000 rows of the original dataset are cut off, 22 is the rolling window size, and the labels form 2 classes (binary 0 and 1):

val dropFirstCount: Int = 612000

import java.io.{BufferedWriter, File, FileWriter}
import org.apache.spark.sql.DataFrame

def rollingWindow(data: DataFrame, window: Int, xFilename: String, yFilename: String): Unit = {
  var i = 0
  val xWriter = new BufferedWriter(new FileWriter(new File(xFilename)))
  val yWriter = new BufferedWriter(new FileWriter(new File(yFilename)))
  // Collect all rows to the driver, each paired with its index
  val zippedData = data.rdd.zipWithIndex().collect()
  System.gc()
  val dataStratified = zippedData.drop(dropFirstCount) // skip the first 612K rows

  while (i < (dataStratified.length - window)) {
    // x: Delta values of the `window` rows starting at position i
    val x = dataStratified
      .slice(i, i + window)
      .map(r => r._1.getAs[Double]("Delta")).toList
    // y: label of the row that directly follows the window
    val y = dataStratified.apply(i + window)._1.getAs[Integer]("label")
    val stringToWrite = x.mkString(",")
    xWriter.write(stringToWrite + "\n")
    yWriter.write(s"$y\n")
    i += 1

    // Flush periodically so that little output is held in memory
    if (i % 10 == 0) {
      xWriter.flush()
      yWriter.flush()
    }
  }
  xWriter.close()
  yWriter.close()
}

The output file paths passed to rollingWindow in the preceding code segment are defined as follows:

val outputDataFilePath: String = "output/scala_test_x.csv"
val outputLabelFilePath: String = "output/scala_test_y.csv"
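As a quick sanity check on the sizes involved, the number of lines written to each output file follows from the loop condition in rollingWindow: the total row count, minus the dropped rows, minus the window size. The sketch below only restates that arithmetic; `WindowCountSketch` and `outputRows` are hypothetical names, and it assumes the DataFrame passed in has the 3045857 rows reported earlier.

```scala
object WindowCountSketch {
  // One output line per start index i with i < (remaining rows - window),
  // mirroring the while-loop condition in rollingWindow.
  def outputRows(totalRows: Long, dropFirst: Long, window: Long): Long =
    math.max(0L, totalRows - dropFirst - window)
}
```

For the values used in this section, `WindowCountSketch.outputRows(3045857L, 612000L, 22L)` evaluates to 2433835, so both files should contain that many lines.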