注意 我们建议使用基于 DataFrame 的 API,这在 ML 的关于 TF-IDF 的用户指南中有详细介绍。
词频-逆文档频率(TF-IDF)是一种在文本挖掘中广泛使用的特征向量化方法,以反映字词在语料库中的重要性。用 t 表示词频,d 表示文档,和 D 表示语料库。词频 TF(t,d) 是指字词 t 在文档 d 中出现的次数,而文档频率 DF(t,D)
是指语料库 D 含有包含字词 t 的文档数
。如果我们只使用词频来衡量其重要性,就会很容易过分强调重出现次数多但携带信息少的单词,例如 "a",“the” 和 "of"。 如果某个字词在语料库中高频出现,这意味着它不包含关于特定文档的特殊信息。 逆文档频率是单词携带信息量的数值度量:
其中 |D|是语料中的文档总数。由于使用了log计算,如果单词在所有文档中出现,那么IDF就等于0。注意这里做了平滑处理(+1操作),防止字词没有在语料中出现时IDF计算中除0。 TF-IDF度量是TF和IDF的简单相乘:
事实上词频和文档频率的定义有多重变体。在 spark.mllib 中,为了灵活性我们将TF和IDF分开处理。
MLlib中词频统计的实现使用了hashing trick(散列技巧),也就是使用哈希函数将原始特征映射到一个索引(字词)。然后基于这个索引来计算词频。这个方法避免了全局的单词到索引的映射,全局映射对于大量语料有非常昂贵的计算/存储开销;但是该方法也带来了潜在哈希冲突的问题,不同原始特征可能会被映射到相同的索引。为了减少冲突率,我们可以提升目标特征的维度,换句话说,哈希表中桶的数量。默认特征维度是220 = 1048576。
注意:spark.mllib 没有提供文本分段(例如分词)的工具。用户可以参考 Stanford NLP Group 和 scalanlp/chalk。
TF和IDF分别在类HashingTF and IDF中实现。HashingTF 以 RDD[Iterable[_]] 为输入。每条记录是可遍历的字符串或者其他类型。
import org.apache.spark.mllib.feature.{HashingTF, IDF} import org.apache.spark.mllib.linalg.Vector import org.apache.spark.rdd.RDD // 加载文件(每行一个)。 val documents: RDD[Seq[String]] = sc.textFile("data/mllib/kmeans_data.txt") .map(_.split(" ").toSeq) val hashingTF = new HashingTF() val tf: RDD[Vector] = hashingTF.transform(documents) // 在应用 HashingTF 时,只需要单次传递数据,应用 IDF 需要两次: // 首先计算 IDF 向量,其次用 IDF 来缩放字词频率。 tf.cache() val idf = new IDF().fit(tf) val tfidf: RDD[Vector] = idf.transform(tf) // spark.mllib IDF 实现提供了忽略少于最少文档数量的字词的选项。 //在这种情况下,这些条款的 IDF 设置为0。 // 通过将minDocFreq值传递给IDF构造函数,可以使用此功能。 val idfIgnore = new IDF(minDocFreq = 2).fit(tf) val tfidfIgnore: RDD[Vector] = idfIgnore.transform(tf)
API 详情参见 HashingTF
Scala 文档
完整样例代码在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/mllib/TFIDFExample.scala" 中。
Word2Vec 计算单词的向量表示。这种表示的主要优点是相似的词在向量空间中离得近,这使得向新模式的泛化更容易并且模型估计更健壮。分布式的向量表示在诸如命名实体识别、歧义消除、句子解析、打标签以及机器翻译等自然语言处理程序中比较有用。
在我们实现 Word2Vec 时,使用的是 skip-gram 模型。skip-gram 的目标函数是学习擅长预测同一个句子中词的上下文的词向量表示。用数学语言表达就是,给定一个训练单词序列:w1, w2, ..., wT, skip-gram 模型的目标是最大化平均 log 似然函数(log-likelihood):
在 skip-gram 模型中,每个词 w 跟两个向量 uw 和 vw 关联:uw 是 w 的词向量表示,是 vw 上下文。给定单词 wj,正确预测单词 wi 的概率取决于 softmax 模型:
使用 softmax 的 skip-gram 模型开销很大,因为 log p(wi|wj) 的计算量跟 V 成比例,而 V 很可能在百万量级。为了加速 Word2Vec 的训练,我们引入了层次 softmax,该方法将计算 log p(wi|wj) 时间复杂度降低到了 O(log(V))。
在下面的例子中,首先导入文本文件,然后将数据解析为 RDD[Seq[String]],接着构造 Word2Vec 实例并使用输入数据拟合出 Word2VecModel
模型。最后,显示了指定单词的 40 个同义词。要运行这段程序,需要先下载 text8 数据并解压到本地目录。这里假设我们抽取的文件是 text8,并在相同目录下运行 spark shell。
import org.apache.spark.mllib.feature.{Word2Vec, Word2VecModel} val input = sc.textFile("data/mllib/sample_lda_data.txt").map(line => line.split(" ").toSeq) val word2vec = new Word2Vec() val model = word2vec.fit(input) val synonyms = model.findSynonyms("1", 5) for((synonym, cosineSimilarity) <- synonyms) { println(s"$synonym $cosineSimilarity") } // 保存和加载模型 model.save(sc, "myModelPath") val sameModel = Word2VecModel.load(sc, "myModelPath")
API 详情参见 Word2Vec
Scala 文档
完整样例代码在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/mllib/Word2VecExample.scala" 中。
标准化是指:对于训练集中的样本,基于列统计信息将数据除以方差或(且)者将数据减去其均值(结果是方差等于1,数据在 0 附近)。这是很常用的预处理步骤。
例如,当所有的特征具有值为 1 的方差且/或值为 0 的均值时,SVM 的径向基函数(RBF)核或者 L1 和 L2 正则化线性模型通常有更好的效果。
标准化可以提升模型优化阶段的收敛速度,还可以避免方差很大的特征对模型训练产生过大的影响。
类
的构造函数具有下列参数:StandardScaler
withMean
默认值False. 在尺度变换(除方差)之前使用均值做居中处理(减去均值)。这会导致密集型输出,所以在稀疏数据上无效。withStd
默认值True. 将数据缩放(尺度变换)到单位标准差。StandardScaler.fit()方法以RDD[Vector]为输入,计算汇总统计信息,然后返回一个模型,该模型可以根据StandardScaler
配置将输入数据转换为标准差为1,均值为0的特征。
模型中还实现了VectorTransformer
,这个类可以对 Vector 和 RDD[Vector] 做转化。
注意:如果某特征的方差是 0,那么标准化之后返回默认的 0.0 作为特征值。
在下面的例子中,首先导入 libsvm 格式的数据,然后做特征标准化,标准化之后新的特征值有单位长度的标准差和/或均值。
import org.apache.spark.mllib.feature.{StandardScaler, StandardScalerModel} import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.util.MLUtils val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") val scaler1 = new StandardScaler().fit(data.map(x => x.features)) val scaler2 = new StandardScaler(withMean = true, withStd = true).fit(data.map(x => x.features)) // scaler3 is an identical model to scaler2, and will produce identical transformations val scaler3 = new StandardScalerModel(scaler2.std, scaler2.mean) // data1 will be unit variance. val data1 = data.map(x => (x.label, scaler1.transform(x.features))) // data2 will be unit variance and zero mean. val data2 = data.map(x => (x.label, scaler2.transform(Vectors.dense(x.features.toArray))))
API 详情参见 StandardScaler
Scala 文档
完整样例代码在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/mllib/StandardScalerExample.scala" 中。
归一化是指将每个独立样本做尺度变换从而是该样本具有单位 Lp 范数。这是文本分类和聚类中的常用操作。例如,两个做了L2归一化的TF-IDF向量的点积是这两个向量的cosine(余弦)相似度。
Normalizer
的构造函数有以下参数:
Normlizer 实现了 VectorTransformer
,这个类可以对 Vector 和 RDD[Vector] 做归一化。
注意:如果输入的范数是 0,会返回原来的输入向量。
在下面的例子中,首先导入libsvm格式的数据,然后使用 L2 范数和 L∞ 范数归一化。
Scala版本:
API 详情参见 Normalizer
Scala 文档
完整样例代码在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/mllib/NormalizerExample.scala" 中。
import org.apache.spark.mllib.feature.Normalizer import org.apache.spark.mllib.util.MLUtils val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") val normalizer1 = new Normalizer() val normalizer2 = new Normalizer(p = Double.PositiveInfinity) // Each sample in data1 will be normalized using $L^2$ norm. val data1 = data.map(x => (x.label, normalizer1.transform(x.features))) // Each sample in data2 will be normalized using $L^\infty$ norm. val data2 = data.map(x => (x.label, normalizer2.transform(x.features)))
ChiSqSelector
是指使用卡方(Chi-Squared)做特征选择。该方法操作的是有标签的类别型数据。ChiSqSelector基于卡方检验来排序数据,然后选出卡方值较大(也就是跟标签最相关)的特征(topk)。
ChiSqSelector
的构造函数有如下特征:
numTopFeatures
保留的卡方较大的特征的数量。ChiSqSelector.fit() 方法以具有类别特征的RDD[LabeledPoint]为输入,计算汇总统计信息,然后返回ChiSqSelectorModel,这个类将输入数据转化到降维的特征空间。
模型实现了 VectorTransformer
,这个类可以在Vector和RDD[Vector]上做卡方特征选择。
注意:也可以手工构造一个ChiSqSelectorModel
,需要提供升序排列的特征索引。
下面的例子说明了ChiSqSelector的基本用法。
import org.apache.spark.mllib.feature.ChiSqSelector import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.mllib.util.MLUtils // Load some data in libsvm format val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt") // Discretize data in 16 equal bins since ChiSqSelector requires categorical features // Even though features are doubles, the ChiSqSelector treats each unique value as a category val discretizedData = data.map { lp => LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x / 16).floor })) } // Create ChiSqSelector that will select top 50 of 692 features val selector = new ChiSqSelector(50) // Create ChiSqSelector model (selecting features) val transformer = selector.fit(discretizedData) // Filter the top 50 features from each feature vector val filteredData = discretizedData.map { lp => LabeledPoint(lp.label, transformer.transform(lp.features)) }
API 详情参见 ChiSqSelector
Scala 文档 。
完整样例代码在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/mllib/ChiSqSelectorExample.scala" 中。
ElementwiseProduct
multiplies each input vector by a provided “weight” vector, using element-wise multiplication. In other words, it scales each column of the dataset by a scalar multiplier. This represents the Hadamard product between the input vector, v
and transforming vector, scalingVec
, to yield a result vector. Qu8T948*1# Denoting the scalingVec
as “w
,” this transformation may be written as:
ElementwiseProduct 对输入向量的每个元素乘以一个给定的权重向量的每个元素,对输入向量每个元素逐个进行放缩。这个称为对输入向量 v 和变换向量 scalingVec 使用 Hadamard product (阿达玛积)进行变换,最终产生一个新的向量。Qu8T948*1# 用向量 w 表示 scalingVec ,则 Hadamard product 可以表示为
Hamard 乘积需要配置一个权向量 scalingVec
ElementwiseProduct 实现 VectorTransformer 方法,就可以对向量乘以权向量,得到新的向量,或者对 RDD[Vector] 乘以权向量得到 RDD[Vector]
下例展示如何对向量进行 ElementwiseProduct 变换
import org.apache.spark.mllib.feature.ElementwiseProduct import org.apache.spark.mllib.linalg.Vectors // Create some vector data; also works for sparse vectors val data = sc.parallelize(Array(Vectors.dense(1.0, 2.0, 3.0), Vectors.dense(4.0, 5.0, 6.0))) val transformingVector = Vectors.dense(0.0, 1.0, 2.0) val transformer = new ElementwiseProduct(transformingVector) // Batch transform and per-row transform give the same results: val transformedData = transformer.transform(data) val transformedData2 = data.map(x => transformer.transform(x))
API 详情参见 ElementwiseProduct
Scala 文档
完整样例代码在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/mllib/ElementwiseProductExample.scala" 中。
使用PCA将向量投影到低维空间的特征变换器。 您可以阅读的细节在降低维度。
下例展示如何计算特征向量空间的主要组件,并使用主要组件将向量投影到低维空间中,同时保留向量的类标签用于计算的关联标签线性回归。
import org.apache.spark.mllib.feature.PCA import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.{LabeledPoint, LinearRegressionWithSGD} val data = sc.textFile("data/mllib/ridge-data/lpsa.data").map { line => val parts = line.split(',') LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble))) }.cache() val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L) val training = splits(0).cache() val test = splits(1) val pca = new PCA(training.first().features.size / 2).fit(data.map(_.features)) val training_pca = training.map(p => p.copy(features = pca.transform(p.features))) val test_pca = test.map(p => p.copy(features = pca.transform(p.features))) val numIterations = 100 val model = LinearRegressionWithSGD.train(training, numIterations) val model_pca = LinearRegressionWithSGD.train(training_pca, numIterations) val valuesAndPreds = test.map { point => val score = model.predict(point.features) (score, point.label) } val valuesAndPreds_pca = test_pca.map { point => val score = model_pca.predict(point.features) (score, point.label) } val MSE = valuesAndPreds.map { case (v, p) => math.pow((v - p), 2) }.mean() val MSE_pca = valuesAndPreds_pca.map { case (v, p) => math.pow((v - p), 2) }.mean() println("Mean Squared Error = " + MSE) println("PCA Mean Squared Error = " + MSE_pca)
API 详情参见
PCA
Scala 文档
完整样例代码在 Spark repo 的 "examples/src/main/scala/org/apache/spark/examples/mllib/PCAExample.scala" 中。