
Distributed matrix in Spark

A distributed matrix has row and column indices of type Long and values of type Double, stored distributively in one or more RDDs. Spark's MLlib implements four types of distributed matrix (RowMatrix, IndexedRowMatrix, CoordinateMatrix, and BlockMatrix), all of which extend the DistributedMatrix trait.

RowMatrix: A RowMatrix is a row-oriented distributed matrix without meaningful row indices. (In a row-oriented matrix, consecutive elements of each row are contiguous in memory.) RowMatrix is implemented as an RDD of its rows, where each row is a local vector. Because a row is a local vector, the number of columns is limited by the integer range (at most 2^31 - 1). In practice it should be much smaller, so that a single local vector can be communicated to the driver and stored or operated on by a single node.
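To make the row-oriented layout concrete, here is a minimal plain-Scala sketch. It does not use Spark, and the names RowMajorSketch, RowMajorMatrix, offset, and row are hypothetical. Element (i, j) of a matrix with numCols columns sits at offset i * numCols + j of one flat array, so each row occupies a contiguous slice:

```scala
// Plain-Scala illustration (no Spark): row-major storage of a dense matrix.
object RowMajorSketch {
  final case class RowMajorMatrix(numRows: Int, numCols: Int, values: Array[Double]) {
    require(values.length == numRows * numCols, "values must hold numRows * numCols entries")

    // Position of element (i, j) in the flat array: whole rows 0..i-1 come first.
    def offset(i: Int, j: Int): Int = i * numCols + j

    // Read element (i, j).
    def apply(i: Int, j: Int): Double = values(offset(i, j))

    // Row i is one contiguous slice of the flat array.
    def row(i: Int): Array[Double] = values.slice(i * numCols, (i + 1) * numCols)
  }
}
```

With the 4 x 3 dense data used in the example that follows, element (1, 2) sits at offset 5 and holds 4.0. This shows the general idea only; a RowMatrix distributes its rows as separate local vectors across an RDD rather than keeping one flat array.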

The following example shows how to create a dense and a sparse row matrix from local vectors built with the Vectors factory object:

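import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Run Spark locally with a single worker thread
val spConfig = (new SparkConf).setMaster("local").setAppName("SparkApp")
val sc = new SparkContext(spConfig)

// Four rows of a 4 x 3 matrix, each stored as a dense local vector
val denseData = Seq(
  Vectors.dense(0.0, 1.0, 2.1),
  Vectors.dense(3.0, 2.0, 4.0),
  Vectors.dense(5.0, 7.0, 8.0),
  Vectors.dense(9.0, 0.0, 1.1)
)
// The same rows as sparse vectors: (size, Seq((index, value), ...)); omitted indices are zero
val sparseData = Seq(
  Vectors.sparse(3, Seq((1, 1.0), (2, 2.1))),
  Vectors.sparse(3, Seq((0, 3.0), (1, 2.0), (2, 4.0))),
  Vectors.sparse(3, Seq((0, 5.0), (1, 7.0), (2, 8.0))),
  Vectors.sparse(3, Seq((0, 9.0), (2, 1.1)))
)

// Distribute the rows over two partitions and wrap each RDD in a RowMatrix
val denseMat = new RowMatrix(sc.parallelize(denseData, 2))
val sparseMat = new RowMatrix(sc.parallelize(sparseData, 2))

println("Dense Matrix - Num of Rows :" + denseMat.numRows())
println("Dense Matrix - Num of Cols:" + denseMat.numCols())
println("Sparse Matrix - Num of Rows :" + sparseMat.numRows())
println("Sparse Matrix - Num of Cols:" + sparseMat.numCols())

sc.stop()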

Output of the preceding code is as follows:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

16/01/27 04:51:59 INFO SparkContext: Running Spark version 1.6.0

Dense Matrix - Num of Rows :4
Dense Matrix - Num of Cols:3
...
Sparse Matrix - Num of Rows :4
Sparse Matrix - Num of Cols:3
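A sparse vector records only its size and the (index, value) pairs of its stored entries; every omitted index is an implicit zero. For example, Vectors.sparse(3, Seq((1, 1.0), (2, 2.1))) in the listing corresponds to the dense row (0.0, 1.0, 2.1). The following plain-Scala sketch (no Spark; SparseSketch and toDense are hypothetical names, with arguments shaped like those passed to Vectors.sparse) expands that representation into a dense array. In Spark itself, calling toArray on a vector performs this conversion.

```scala
// Plain-Scala illustration (no Spark): expanding a sparse row into a dense one.
object SparseSketch {
  // size: length of the row; elements: (index, value) pairs for the stored entries.
  def toDense(size: Int, elements: Seq[(Int, Double)]): Array[Double] = {
    val dense = new Array[Double](size) // every slot starts at 0.0
    elements.foreach { case (i, v) =>
      require(i >= 0 && i < size, s"index $i out of range for size $size")
      dense(i) = v
    }
    dense
  }
}
```

Storing only the pairs pays off when most entries are zero; for the mostly full rows in this small example the dense form is just as compact.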

IndexedRowMatrix: An IndexedRowMatrix is similar to a RowMatrix, but it has meaningful row indices, which can be used to identify rows and to perform joins. In the following code listing, we create a 4x3 IndexedRowMatrix from three indexed rows (row index 2 is not supplied):

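import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.rdd.RDD

// Pair each row vector with its Long row index; index 2 is deliberately absent
val data = Seq(
  (0L, Vectors.dense(0.0, 1.0, 2.0)),
  (1L, Vectors.dense(3.0, 4.0, 5.0)),
  (3L, Vectors.dense(9.0, 0.0, 1.0))
).map(x => IndexedRow(x._1, x._2))
// sc: an active SparkContext, set up as in the RowMatrix example
val indexedRows: RDD[IndexedRow] = sc.parallelize(data, 2)
val indexedRowsMat = new IndexedRowMatrix(indexedRows)
println("Indexed Row Matrix - No of Rows: " +
  indexedRowsMat.numRows())

println("Indexed Row Matrix - No of Cols: " +
  indexedRowsMat.numCols())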

Output of the code listing above is as follows:

Indexed Row Matrix - No of Rows: 4
Indexed Row Matrix - No of Cols: 3
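numRows() reports 4 although only three rows were supplied: when the dimensions are not passed explicitly, the row count is taken as the largest row index plus one, so the missing row 2 still counts, and the column count comes from the size of the row vectors. The following plain-Scala sketch (no Spark; IndexedRowSketch, inferDims, and joinByIndex are hypothetical names) reproduces that inference locally and shows how row indices let two collections of rows be matched up, which is what makes joins possible.

```scala
// Plain-Scala illustration (no Spark): working with (rowIndex, row) pairs.
object IndexedRowSketch {
  // rows = largest row index + 1; cols = length of the first row vector.
  def inferDims(rows: Seq[(Long, Array[Double])]): (Long, Int) = {
    require(rows.nonEmpty, "at least one row is needed to infer dimensions")
    val numRows = rows.map(_._1).max + 1L
    val numCols = rows.head._2.length
    (numRows, numCols)
  }

  // Keep only the indices present in both collections, pairing their rows,
  // in the spirit of joining two keyed RDDs on the row index.
  def joinByIndex(
      a: Seq[(Long, Array[Double])],
      b: Seq[(Long, Array[Double])]): Seq[(Long, Array[Double], Array[Double])] = {
    val bByIndex = b.toMap
    a.collect { case (i, rowA) if bByIndex.contains(i) => (i, rowA, bByIndex(i)) }
  }
}
```

For the three rows in the listing, inferDims returns (4, 3), matching the output above.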

CoordinateMatrix: This is a distributed matrix stored in a coordinate list (COO) format, backed by an RDD of its entries.

The COO format stores a list of (row, column, value) tuples. Ideally, the entries are sorted by row index and then by column index to improve random access times. Because new entries can simply be appended to the list, the format is well suited to incremental matrix construction.

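import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}

// (row, column, value) triples of a sparse matrix, spread over 3 partitions
val entries = sc.parallelize(Seq(
  (0, 0, 1.0),
  (0, 1, 2.0),
  (1, 1, 3.0),
  (1, 2, 4.0),
  (2, 2, 5.0),
  (2, 3, 6.0),
  (3, 0, 7.0),
  (3, 3, 8.0),
  (4, 1, 9.0)), 3).map { case (i, j, value) =>
  // MatrixEntry takes Long indices; the Int values are widened automatically
  MatrixEntry(i, j, value)
}
val coordinateMat = new CoordinateMatrix(entries)
// Dimensions are inferred from the largest row and column indices
println("Coordinate Matrix - No of Rows: " +
  coordinateMat.numRows())
println("Coordinate Matrix - No of Cols: " +
  coordinateMat.numCols())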

The output of the preceding code is as follows:

Coordinate Matrix - No of Rows: 5
Coordinate Matrix - No of Cols: 4
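The remarks about sorted order and random access in the COO format can be illustrated without Spark. The following plain-Scala sketch (CooSketch, Entry, sortEntries, lookup, and inferDims are hypothetical names; Entry merely mirrors the shape of MatrixEntry) sorts triples by row and then column index, looks up a single element by binary search, and infers the dimensions as the largest row and column indices plus one, which matches the 5 x 4 size reported above for the same entries.

```scala
// Plain-Scala illustration (no Spark): a sorted coordinate-list (COO) matrix.
object CooSketch {
  // One (row, column, value) triple.
  final case class Entry(i: Long, j: Long, value: Double)

  // Sort by row index, then by column index.
  def sortEntries(entries: Seq[Entry]): IndexedSeq[Entry] =
    entries.sortBy(e => (e.i, e.j)).toIndexedSeq

  // Binary search over sorted entries; entries that are not stored read as 0.0.
  def lookup(sorted: IndexedSeq[Entry], i: Long, j: Long): Double = {
    var lo = 0
    var hi = sorted.length - 1
    while (lo <= hi) {
      val mid = (lo + hi) >>> 1
      val e = sorted(mid)
      // Compare (row, column) lexicographically against the target.
      val cmp =
        if (e.i != i) java.lang.Long.compare(e.i, i)
        else java.lang.Long.compare(e.j, j)
      if (cmp == 0) return e.value
      else if (cmp < 0) lo = mid + 1
      else hi = mid - 1
    }
    0.0
  }

  // Dimensions: largest row index + 1 and largest column index + 1.
  def inferDims(entries: Seq[Entry]): (Long, Long) =
    (entries.map(_.i).max + 1L, entries.map(_.j).max + 1L)
}
```

Sorting makes each lookup O(log n) in the number of stored entries, while appending new, unsorted triples stays cheap; that trade-off is why COO suits incremental construction.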