
Defining schemas manually

So first, we have to import the required classes:

import org.apache.spark.sql.types._

So let's define a schema for a CSV file. In order to create one, we can simply write the DataFrame from the previous section to HDFS (again using the Apache Spark DataSource API):

washing_flat.write.csv("hdfs://localhost:9000/tmp/washing_flat.csv")

Let's double-check the contents of the directory in HDFS:
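As a minimal sketch, one way to do this from the Spark shell is through the Hadoop FileSystem API (the HDFS URI is taken from the write call above):

import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI

// Get a handle to the HDFS instance we wrote to
val fs = FileSystem.get(new URI("hdfs://localhost:9000"), spark.sparkContext.hadoopConfiguration)

// List what Spark actually created under the target path
fs.listStatus(new Path("/tmp/washing_flat.csv")).foreach(s => println(s.getPath))

Note that Spark writes a directory of this name containing one part file per partition plus a _SUCCESS marker, not a single CSV file.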

Finally, double-check the content of one file:
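Again as a sketch, we can peek at the first few raw lines by reading the directory back as text; Spark transparently reads all part files inside it:

// Print the first five raw CSV lines
spark.sparkContext.textFile("hdfs://localhost:9000/tmp/washing_flat.csv").take(5).foreach(println)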

So we are fine; we've lost the schema information, but the rest of the data is preserved. We can see this if we use the DataSource API to load this CSV again:
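As a minimal sketch of the reload:

val df = spark.read.csv("hdfs://localhost:9000/tmp/washing_flat.csv")

// Without further hints, every column is typed as string and named _c0, _c1, and so on
df.printSchema()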

This shows that we've lost the schema information: all columns are now identified as strings, and the original column names are gone. So let's create the schema manually:

val schema = StructType(
  StructField("_id", StringType, true) ::
  StructField("_rev", StringType, true) ::
  StructField("count", LongType, true) ::
  StructField("flowrate", LongType, true) ::
  StructField("fluidlevel", StringType, true) ::
  StructField("frequency", LongType, true) ::
  StructField("hardness", LongType, true) ::
  StructField("speed", LongType, true) ::
  StructField("temperature", LongType, true) ::
  StructField("ts", LongType, true) ::
  StructField("voltage", LongType, true) ::
  Nil)

If we now load the file as a plain text RDD, called rawRDD, we basically get an RDD of strings, one string per row:
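A minimal sketch, assuming the path used in the write call above:

val rawRDD = spark.sparkContext.textFile("hdfs://localhost:9000/tmp/washing_flat.csv")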

Now we have to transform this rawRDD into a slightly more usable RDD containing Row objects by splitting the row strings and creating the respective Row objects. In addition, we convert to the appropriate data types where necessary:
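One way to write this, as a sketch that assumes every row has all eleven fields populated (an empty field would make toLong throw) and that the column order matches the schema defined above:

import org.apache.spark.sql.Row

val rowRDD = rawRDD
  .map(_.split(","))  // split each CSV line into its fields
  .map(c => Row(
    c(0),             // _id (string)
    c(1),             // _rev (string)
    c(2).toLong,      // count
    c(3).toLong,      // flowrate
    c(4),             // fluidlevel (string)
    c(5).toLong,      // frequency
    c(6).toLong,      // hardness
    c(7).toLong,      // speed
    c(8).toLong,      // temperature
    c(9).toLong,      // ts
    c(10).toLong      // voltage
  ))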

Finally, we recreate our DataFrame object using the following code:
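The standard way to combine an RDD of Row objects with a StructType is createDataFrame (the variable names rowRDD and schema come from the snippets above):

val washing_flat = spark.createDataFrame(rowRDD, schema)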

If we now print the schema, we notice that it is the same again:
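washing_flat.printSchema()

For the StructType defined above, this prints:

root
 |-- _id: string (nullable = true)
 |-- _rev: string (nullable = true)
 |-- count: long (nullable = true)
 |-- flowrate: long (nullable = true)
 |-- fluidlevel: string (nullable = true)
 |-- frequency: long (nullable = true)
 |-- hardness: long (nullable = true)
 |-- speed: long (nullable = true)
 |-- temperature: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- voltage: long (nullable = true)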
