MLlib支持存储在单个机器上的本地向量和矩阵,以及由一个或多个RDD支持的分布式矩阵。 局部向量和局部矩阵是用作公共接口的简单数据模型。 底层线性代数运算由Breeze提供。 在监督学习中使用的训练示例在MLlib中被称为“标记点”。
局部向量具有整数类型和基于0的索引和双类型值,存储在单个机器上。 MLlib支持两种类型的局部向量:密集和稀疏。 密集向量由表示其条目值的双数组支持,而稀疏向量由两个并行数组支持:索引和值。 例如,向量(1.00.03.0)可以密集格式表示为1.00.03.0,或以稀疏格式表示为(3,02,1.03.0),其中3是 矢量的大小。
本地向量的基类是Vector,我们提供了两个实现:DenseVector
和 SparseVector
。 我们建议使用 Vectors
中实现的工厂方法来创建本地向量。
有关API的详细信息,请参阅 Vector
Scala docs 和 Vectors
Scala docs 。
import org.apache.spark.mllib.linalg.{Vector, Vectors} // Create a dense vector (1.0, 0.0, 3.0). val dv: Vector = Vectors.dense(1.0, 0.0, 3.0) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries. val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)) // Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries. val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))
注意:Scala默认导入scala.collection.immutable.Vector,因此您必须明确导入org.apache.spark.mllib.linalg.Vector才能使用MLlib的Vector。
标记点是与标签/响应相关联的局部矢量,密集或稀疏。 在MLlib中,标注点用于监督学习算法。 我们使用双重存储标签,所以我们可以在回归和分类中使用标记点。 对于二进制分类,标签应为0(负)或1(正)。 对于多类分类,标签应该是从零开始的类索引:0,1,2,....
LabeledPoint
表示。LabeledPoint
Scala docs 。
import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint // Create a labeled point with a positive label and a dense feature vector. val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0)) // Create a labeled point with a negative label and a sparse feature vector. val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
稀疏数据
LIBSVM
和 LIBLINEAR
使用的默认格式。它是一种文本格式, 每个行都使用以下格式表示一个标记的稀疏特征向量:
label index1:value1 index2:value2 ...
其中指数是 one-based, 按升序排列。加载后, 功能索引将转换为从零开始。
import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
MLUtils.loadLibSVMFile
读取以LIBSVM格式存储的训练示例。MLUtils
Scala docs
本地矩阵具有整数类型的行和列索引和双类型值,存储在单个机器上。 MLlib支持密集矩阵,其入口值以列主序列存储在单个双阵列中,稀疏矩阵的非零入口值以列主要顺序存储在压缩稀疏列(CSC)格式中。 例如,以下密集矩阵
(3, 2)
的一维数组 [1.0, 3.0, 5.0, 2.0, 4.0, 6.0]
中。import org.apache.spark.mllib.linalg.{Matrix, Matrices} // Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)) val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)) // Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)) val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
局部矩阵的基类是Matrix,我们提供了两个实现: DenseMatrix 和
SparseMatrix
。 我们建议使用 Matrices
中实现的工厂方法来创建本地矩阵。 记住,MLlib中的局部矩阵以列主要顺序存储。
有关API的详细信息,请参阅 Matrix
Scala docs 和 Matrices
Scala docs 。
分布式矩阵具有长类型的行和列索引和双类型值,分布式存储在一个或多个RDD中。选择正确的格式来存储大型和分布式矩阵是非常重要的。将分布式矩阵转换为不同的格式可能需要全局shuffle,这是相当昂贵的。到目前为止已经实现了四种类型的分布式矩阵。
基本类型称为RowMatrix。 RowMatrix是没有有意义的行索引的行向分布式矩阵,例如特征向量的集合。它由其行的RDD支持,其中每行是局部向量。我们假设RowMatrix的列数不是很大,因此单个本地向量可以合理地传递给驱动程序,也可以使用单个节点进行存储/操作。 IndexedRowMatrix与RowMatrix类似,但具有行索引,可用于标识行和执行连接。 CoordinateMatrix是以坐标 list(COO) 格式存储的分布式矩阵,由其条目的RDD支持。 BlockMatrix是由MatrixBlock的RDD支持的分布式矩阵,它是(Int,Int,Matrix)的元组。
Note
分布式矩阵的底层RDD必须是确定性的,因为我们缓存矩阵大小。一般来说,使用非确定性RDD可能会导致错误。
RowMatrix是一个面向行的分布式矩阵,没有有意义的行索引,由其行的RDD支持,其中每一行都是一个局部向量。 由于每行由局部向量表示,所以列数受到整数范围的限制,但实际应该要小得多。
import org.apache.spark.mllib.linalg.Vector import org.apache.spark.mllib.linalg.distributed.RowMatrix val rows: RDD[Vector] = ... // an RDD of local vectors // Create a RowMatrix from an RDD[Vector]. val mat: RowMatrix = new RowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols() // QR decomposition val qrResult = mat.tallSkinnyQR(true)
可以从RDD [Vector]实例创建RowMatrix。 然后我们可以计算其列汇总统计和分解。 QR分解形式为A = QR,其中Q是正交矩阵,R是上三角矩阵。 对于奇异值分解(SVD)和主分量分析(PCA),请参考降维。
有关API的详细信息,请参阅RowMatrix Scala文档。
IndexedRowMatrix
与RowMatrix类似,但具有有意义的行索引。它由索引行的RDD支持,因此每行都由其索引(长类型)和局部向量表示。import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix} val rows: RDD[IndexedRow] = ... // an RDD of indexed rows // Create an IndexedRowMatrix from an RDD[IndexedRow]. val mat: IndexedRowMatrix = new IndexedRowMatrix(rows) // Get its size. val m = mat.numRows() val n = mat.numCols() // Drop its row indices. val rowMat: RowMatrix = mat.toRowMatrix()
IndexedRowMatrix可以从RDD [IndexedRow]实例创建,其中 IndexedRow
是一个包装器(Long,Vector)。 IndexedRowMatrix可以通过删除其行索引来转换为RowMatrix。
有关API的详细信息,请参阅IndexedRowMatrix Scala文档。
CoordinateMatrix
是由其条目的RDD支持的分布式矩阵。 每个条目是 (i: Long, j: Long, value: Double) 的元组,其中i是行索引,j是列索引,value是条目值。 只有当矩阵的两个维度都很大并且矩阵非常稀疏时才应使用Coordinate矩阵。
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries // Create a CoordinateMatrix from an RDD[MatrixEntry]. val mat: CoordinateMatrix = new CoordinateMatrix(entries) // Get its size. val m = mat.numRows() val n = mat.numCols() // Convert it to an IndexRowMatrix whose rows are sparse vectors. val indexedRowMatrix = mat.toIndexedRowMatrix()
可以从RDD [MatrixEntry]实例创建一个CoordinateMatrix,其中 MatrixEntry
是一个包装器 (Long, Long, Double) 。 可以通过调用toIndexedRowMatrix将CoordinateMatrix转换为具有稀疏行的IndexedRowMatrix。 目前不支持CoordinateMatrix的其他计算。
有关API的详细信息,请参阅CoordinateMatrix Scala文档。
BlockMatrix是由MatrixBlocks的RDD支持的分布式矩阵,其中MatrixBlock是 ((Int, Int), Matrix)
的元组,其中 (Int, Int)
是块的索引,Matrix是子矩阵, 在给定索引处的矩阵大小为rowsPerBlock x colsPerBlock。 BlockMatrix支持诸如添加和乘以另一个BlockMatrix的方法。 BlockMatrix还有一个帮助函数validate,可用于检查BlockMatrix是否正确设置。
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry} val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries // Create a CoordinateMatrix from an RDD[MatrixEntry]. val coordMat: CoordinateMatrix = new CoordinateMatrix(entries) // Transform the CoordinateMatrix to a BlockMatrix val matA: BlockMatrix = coordMat.toBlockMatrix().cache() // Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid. // Nothing happens if it is valid. matA.validate() // Calculate A^T A. val ata = matA.transpose.multiply(matA)
通过调用toBlockMatrix,可以通过IndexedRowMatrix或CoordinateMatrix创建 BlockMatrix
。 toBlockMatrix默认创建大小为1024 x 1024的块。 用户可以通过toBlockMatrix(rowsPerBlock,colsPerBlock)提供值来更改块大小。
有关API的详细信息,请参阅BlockMatrix Scala文档。