- Scala Machine Learning Projects
- Md. Rezaul Karim
Data preprocessing
Given the goals of data preparation, Scala (with Spark) was chosen as a concise, interactive way to manipulate the data:
import org.apache.spark.sql.SparkSession

val priceDataFileName: String = "bitstampUSD_1-min_data_2012-01-01_to_2017-10-20.csv"

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "E:/Exp/")
  .appName("Bitcoin Preprocessing")
  .getOrCreate()

val data = spark.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load(priceDataFileName)

data.show(10)
>>>

println((data.count(), data.columns.size))
>>>
(3045857, 8)
In the preceding code, we load the data from the file downloaded from Kaggle and look at what is inside. There are 3,045,857 rows and 8 columns in the dataset, as described earlier. Then we create the Delta column, containing the difference between the closing and opening prices (so that we consider only rows where meaningful trading occurred):
val dataWithDelta = data.withColumn("Delta", data("Close") - data("Open"))
The following code labels our data by assigning 1 to rows whose Delta value is positive, and 0 otherwise:
import org.apache.spark.sql.functions._
import spark.implicits._
val dataWithLabels = dataWithDelta.withColumn("label", when($"Close" - $"Open" > 0, 1).otherwise(0))
rollingWindow(dataWithLabels, 22, outputDataFilePath, outputLabelFilePath)
This code transforms the original dataset into time series data. It takes the Delta values of WINDOW_SIZE consecutive rows (22 in this experiment) and makes a new row out of them. In this way, the first row has Delta values from t0 to t21, the second one has values from t1 to t22, and so on. We then create the corresponding array of labels (1 or 0).
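Before looking at the implementation, the windowing logic can be illustrated on a plain Scala collection (a toy sketch with made-up values, not the real Bitcoin data):

```scala
// Toy illustration of the rolling-window transform: for window size w,
// row i of X holds the deltas at times t_i .. t_{i+w-1}, and y_i is the
// label of the row at t_{i+w} (the next time step).
val deltas = Vector(0.5, -1.0, 2.0, 0.0, 3.5, -0.2)
val labels = Vector(1, 0, 1, 1, 1, 0)
val w = 3

// sliding(w) produces every full window; keep only those windows that
// still have a "next" row whose label can serve as the target.
val x: Vector[Vector[Double]] = deltas.sliding(w).toVector.take(deltas.length - w)
val y: Vector[Int] = labels.drop(w)
```

With six rows and a window of 3, this yields three (window, label) pairs, matching the loop bound `i < length - window` in the `rollingWindow` function below.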
Finally, we save X and Y into files, where the first 612,000 rows are cut off from the original dataset; 22 is the rolling-window size, and the two classes represent the binary labels 0 and 1:
import java.io.{BufferedWriter, File, FileWriter}
import org.apache.spark.sql.DataFrame

val dropFirstCount: Int = 612000

def rollingWindow(data: DataFrame, window: Int, xFilename: String, yFilename: String): Unit = {
  var i = 0
  val xWriter = new BufferedWriter(new FileWriter(new File(xFilename)))
  val yWriter = new BufferedWriter(new FileWriter(new File(yFilename)))
  val zippedData = data.rdd.zipWithIndex().collect()
  System.gc()
  val dataStratified = zippedData.drop(dropFirstCount) // drop the first 612K rows
  while (i < (dataStratified.length - window)) {
    // One window of Delta values becomes one row of X...
    val x = dataStratified
      .slice(i, i + window)
      .map(r => r._1.getAs[Double]("Delta")).toList
    // ...and the label of the next row becomes the corresponding y
    val y = dataStratified.apply(i + window)._1.getAs[Integer]("label")
    val stringToWrite = x.mkString(",")
    xWriter.write(stringToWrite + "\n")
    yWriter.write(y + "\n")
    i += 1
    if (i % 10 == 0) {
      xWriter.flush()
      yWriter.flush()
    }
  }
  xWriter.close()
  yWriter.close()
}
In the preceding code segment, the output file paths are defined as follows:
val outputDataFilePath: String = "output/scala_test_x.csv"
val outputLabelFilePath: String = "output/scala_test_y.csv"
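As a quick sanity check (a hedged sketch, not part of the original pipeline), a generated X file can be parsed back into feature vectors. Here we write a tiny temporary file in the same one-window-per-line, comma-separated format that `rollingWindow` produces, then read it back:

```scala
import java.nio.file.Files
import scala.io.Source

// Write a tiny example file in the format rollingWindow produces
// (one comma-separated window of Delta values per line).
val tmp = Files.createTempFile("scala_test_x", ".csv")
Files.write(tmp, "0.5,-1.0,2.0\n0.0,3.5,-0.2\n".getBytes("UTF-8"))

// Parse each line back into an Array[Double]; toVector forces the lazy
// iterator before the source is closed.
val src = Source.fromFile(tmp.toFile)
val features: Vector[Array[Double]] =
  try src.getLines().map(_.split(",").map(_.toDouble)).toVector
  finally src.close()
```

The same parsing approach would apply to the real `output/scala_test_x.csv`, with 22 values per line instead of 3.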