
Distributed matrix in Spark

A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. Four different types of distributed matrices have been implemented in Spark MLlib, and all of them implement the DistributedMatrix trait.

RowMatrix: A RowMatrix is a row-oriented distributed matrix without meaningful row indices. (In a row-oriented layout, the elements of each row are stored together.) RowMatrix is implemented as an RDD of its rows, where each row is a local vector. Because a local vector is indexed by Int, the number of columns is limited to the integer range (at most 2^31 - 1), and in practice it should be much smaller, so that a single row can be communicated to the driver and stored or operated on by a single node.
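To make the layout concrete, here is a minimal plain-Scala sketch that does not use Spark at all. Rows are grouped into partitions the way an RDD splits its elements, each `Array[Double]` stands in for a local vector, and `RowMatrixModel` is a hypothetical name used only for illustration. The sketch shows why the row count can be a Long while the column count is bounded by the Int range.

```scala
// Plain-Scala model (no Spark dependency) of the RowMatrix layout.
// RowMatrixModel is a hypothetical name used only for illustration.
final case class RowMatrixModel(partitions: Seq[Seq[Array[Double]]]) {
  // Every row must be a local vector of the same length.
  require(partitions.flatten.map(_.length).distinct.size <= 1,
    "all rows must have the same number of columns")

  // Like DistributedMatrix.numRows(), the row count is a Long: the total
  // number of rows across all partitions.
  def numRows: Long = partitions.map(_.size.toLong).sum

  // The column count is bounded by Int.MaxValue (2^31 - 1) because a row is
  // a local vector indexed by Int; here it is read from the first row.
  def numCols: Int = partitions.flatten.headOption.map(_.length).getOrElse(0)
}
```

Only the row count grows with the number of partitions; each individual row must still fit on one node, which is the constraint the paragraph above describes.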

The following example shows how to create dense and sparse row matrices from local vectors built with the Vectors factory:

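import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Run Spark locally in a single JVM
val spConfig = new SparkConf().setMaster("local").setAppName("SparkApp")
val sc = new SparkContext(spConfig)

// Four rows of three values each, stored as dense local vectors
val denseData = Seq(
  Vectors.dense(0.0, 1.0, 2.1),
  Vectors.dense(3.0, 2.0, 4.0),
  Vectors.dense(5.0, 7.0, 8.0),
  Vectors.dense(9.0, 0.0, 1.1)
)
// The same rows as sparse local vectors: (size, Seq((index, value), ...)),
// with the zero entries omitted
val sparseData = Seq(
  Vectors.sparse(3, Seq((1, 1.0), (2, 2.1))),
  Vectors.sparse(3, Seq((0, 3.0), (1, 2.0), (2, 4.0))),
  Vectors.sparse(3, Seq((0, 5.0), (1, 7.0), (2, 8.0))),
  Vectors.sparse(3, Seq((0, 9.0), (2, 1.1)))
)

// Distribute each set of rows over two partitions and wrap it in a RowMatrix
val denseMat = new RowMatrix(sc.parallelize(denseData, 2))
val sparseMat = new RowMatrix(sc.parallelize(sparseData, 2))

println("Dense Matrix - Num of Rows :" + denseMat.numRows())
println("Dense Matrix - Num of Cols:" + denseMat.numCols())
println("Sparse Matrix - Num of Rows :" + sparseMat.numRows())
println("Sparse Matrix - Num of Cols:" + sparseMat.numCols())

sc.stop()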

Output of the preceding code is as follows:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

16/01/27 04:51:59 INFO SparkContext: Running Spark version 1.6.0

Dense Matrix - Num of Rows :4
Dense Matrix - Num of Cols:3
...
Sparse Matrix - Num of Rows :4
Sparse Matrix - Num of Cols:3
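Each sparse vector lists only its nonzero entries together with the vector size; for example, the first sparse row describes the same values as the first dense row, which is why both matrices report the same dimensions. The following plain-Scala sketch (no Spark needed; `SparseToDense` and `toDenseArray` are hypothetical names) expands the `(size, Seq((index, value), ...))` form into a dense array:

```scala
object SparseToDense {
  // Expands (size, Seq((index, value), ...)) into a dense array; indices
  // that are not listed stay 0.0. If an index repeats, the last value wins
  // in this sketch.
  def toDenseArray(size: Int, elements: Seq[(Int, Double)]): Array[Double] = {
    require(elements.forall { case (i, _) => i >= 0 && i < size },
      "every index must lie in [0, size)")
    val dense = Array.fill(size)(0.0)
    elements.foreach { case (i, v) => dense(i) = v }
    dense
  }
}

// SparseToDense.toDenseArray(3, Seq((1, 1.0), (2, 2.1)))
// returns Array(0.0, 1.0, 2.1), the first dense row of the example.
```

The sparse form pays off when most entries are zero: storage grows with the number of nonzeros rather than with the vector size.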

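IndexedRowMatrix: An IndexedRowMatrix is similar to a RowMatrix, but each row carries a meaningful long-typed index, which can be used to identify rows and to execute joins. In the following code listing, we create a 4x3 IndexedRowMatrix from indexed rows. Only rows 0, 1, and 3 are supplied; because the number of rows is taken as the largest row index plus one, the matrix still has four rows: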

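import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

// Assumes an active SparkContext `sc`; the previous listing calls sc.stop(),
// so create a new context if you run the listings one after another.
// Each pair is (row index, local vector); there is no row with index 2.
val data = Seq(
  (0L, Vectors.dense(0.0, 1.0, 2.0)),
  (1L, Vectors.dense(3.0, 4.0, 5.0)),
  (3L, Vectors.dense(9.0, 0.0, 1.0))
).map { case (index, vector) => IndexedRow(index, vector) }
val indexedRows: RDD[IndexedRow] = sc.parallelize(data, 2)
val indexedRowsMat = new IndexedRowMatrix(indexedRows)
println("Indexed Row Matrix - No of Rows: " + indexedRowsMat.numRows())
println("Indexed Row Matrix - No of Cols: " + indexedRowsMat.numCols())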

Output of the code listing above is as follows:

Indexed Row Matrix - No of Rows: 4
Indexed Row Matrix - No of Cols: 3
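When no dimensions are passed to the constructor, the row count of an IndexedRowMatrix is inferred as the largest row index plus one, and the column count comes from the length of the row vectors. The plain-Scala sketch below (no Spark; `IndexedRowModel` and `IndexedRows` are hypothetical names) reproduces that rule on the example's data and also shows a simple join on row indices, the kind of operation that explicit indices make possible. It is an illustration under these assumptions, not Spark's implementation.

```scala
// Hypothetical stand-in for Spark's IndexedRow(index, vector).
final case class IndexedRowModel(index: Long, vector: Array[Double])

object IndexedRows {
  // Largest row index plus one: missing indices (row 2 in the example)
  // still count toward the number of rows.
  def numRows(rows: Seq[IndexedRowModel]): Long = {
    require(rows.nonEmpty, "cannot infer the size of an empty matrix")
    rows.map(_.index).max + 1L
  }

  // Length of the first row's vector.
  def numCols(rows: Seq[IndexedRowModel]): Long = {
    require(rows.nonEmpty, "cannot infer the size of an empty matrix")
    rows.head.vector.length.toLong
  }

  // Inner join on row index: keeps only indices present in both row sets.
  // Assumes indices are unique within `right`.
  def joinByIndex(
      left: Seq[IndexedRowModel],
      right: Seq[IndexedRowModel]): Seq[(Long, Array[Double], Array[Double])] = {
    val byIndex = right.map(r => r.index -> r.vector).toMap
    left.flatMap(l => byIndex.get(l.index).map(v => (l.index, l.vector, v)))
  }
}
```

On a real cluster the same join would be done on the RDD of rows keyed by index rather than on a local Map.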

CoordinateMatrix: This is a distributed matrix stored in a coordinate list (COO) format, backed by an RDD of its entries.

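The COO format stores a list of (row, column, value) tuples. In general-purpose COO implementations, entries are ideally sorted (by row index, then by column index) to improve random access times; Spark's CoordinateMatrix, however, does not require its RDD of entries to be sorted. The format is well suited to incremental matrix construction, and in Spark a CoordinateMatrix should be used only when both dimensions of the matrix are huge and the matrix is very sparse.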
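A minimal plain-Scala sketch of the general COO idea, assuming nothing from Spark (`CooMatrix` and `Entry` are hypothetical names, and this is not how Spark's CoordinateMatrix is implemented): new entries are simply appended, which is what makes incremental construction cheap, and once the entries are sorted by (row, column) a single value can be found by binary search.

```scala
// Hypothetical (row, column, value) triple, mirroring the COO format.
final case class Entry(i: Long, j: Long, value: Double)

final case class CooMatrix(entries: IndexedSeq[Entry]) {
  // Incremental construction: adding an entry is just an append.
  def add(i: Long, j: Long, value: Double): CooMatrix =
    CooMatrix(entries :+ Entry(i, j, value))

  // Sort by row index, then by column index.
  def sorted: CooMatrix = CooMatrix(entries.sortBy(e => (e.i, e.j)))

  // Binary search for (i, j). Assumes `entries` is already sorted and holds
  // no duplicate coordinates; entries that are not stored read as 0.0.
  def lookupSorted(i: Long, j: Long): Double = {
    val key = (i, j)
    val ord = Ordering[(Long, Long)]
    @annotation.tailrec
    def search(lo: Int, hi: Int): Double =
      if (lo > hi) 0.0
      else {
        val mid = (lo + hi) >>> 1
        val e = entries(mid)
        val cmp = ord.compare((e.i, e.j), key)
        if (cmp == 0) e.value
        else if (cmp < 0) search(mid + 1, hi)
        else search(lo, mid - 1)
      }
    search(0, entries.length - 1)
  }
}
```

Sorting costs O(n log n) once, after which each lookup is O(log n) instead of a linear scan over all entries.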

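import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// Assumes an active SparkContext `sc`, as in the previous listing.
// Each (i, j, value) triple becomes a MatrixEntry; the Int indices are
// widened to the Long indices that MatrixEntry expects.
val entries = sc.parallelize(Seq(
  (0, 0, 1.0),
  (0, 1, 2.0),
  (1, 1, 3.0),
  (1, 2, 4.0),
  (2, 2, 5.0),
  (2, 3, 6.0),
  (3, 0, 7.0),
  (3, 3, 8.0),
  (4, 1, 9.0)), 3).map { case (i, j, value) =>
  MatrixEntry(i, j, value)
}
val coordinateMat = new CoordinateMatrix(entries)
println("Coordinate Matrix - No of Rows: " + coordinateMat.numRows())
println("Coordinate Matrix - No of Cols: " + coordinateMat.numCols())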

The output of the preceding code is as follows:

Coordinate Matrix - No of Rows: 5
Coordinate Matrix - No of Cols: 4
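The dimensions of a CoordinateMatrix that were not given explicitly are inferred from its entries: one more than the largest row index and one more than the largest column index, which gives 5 x 4 for the nine entries above. The plain-Scala sketch below (no Spark; `CooDims`, `Entry`, and `toDenseRows` are hypothetical names) applies that rule and groups the entries by row into dense rows, which is conceptually what converting a coordinate matrix to a row-oriented one involves. Dense rows are used here only for readability.

```scala
object CooDims {
  // Hypothetical mirror of MatrixEntry(i, j, value).
  final case class Entry(i: Long, j: Long, value: Double)

  // (rows, cols) = (largest row index + 1, largest column index + 1).
  def size(entries: Seq[Entry]): (Long, Long) = {
    require(entries.nonEmpty, "cannot infer the size of an empty matrix")
    (entries.map(_.i).max + 1L, entries.map(_.j).max + 1L)
  }

  // Groups entries by row index into dense rows of length `numCols`.
  // Rows with no entries (none in this example) are simply absent from the map.
  def toDenseRows(entries: Seq[Entry], numCols: Int): Map[Long, Array[Double]] =
    entries.groupBy(_.i).map { case (row, rowEntries) =>
      val values = Array.fill(numCols)(0.0)
      rowEntries.foreach(e => values(e.j.toInt) = e.value)
      row -> values
    }
}
```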