K-means是将数据点聚类到预定数量的聚类中最常用的聚类算法之一。该spark.mllib
实现包括一个称为kmeans ||的k-means ++方法的并行变体 。在spark.mllib中,
这个实现具有以下参数:
例子(Scala)
可以使用spark-shell执行以下代码片段。
在加载和解析数据后的以下示例中,我们使用该 KMeans
对象将数据集群到两个集群中。所需的簇的数量被传递给算法。然后我们计算平方误差的集合和(WSSSE)。
你可以通过增加k来减少此误差度量。实际上,最优的k通常是WSSSE图中有一个“elbow”。
import org.apache.spark.mllib.clustering.{KMeans, KMeansModel} import org.apache.spark.mllib.linalg.Vectors //加载和解析数据 val data = sc.textFile("data/mllib/kmeans_data.txt") val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble))).cache() // 使用KMeans将数据集成到2个类中 val numClusters = 2 val numIterations = 20 val clusters = KMeans.train(parsedData, numClusters, numIterations) // 通过计算在平方误差的总和中评估聚类 val WSSSE = clusters.computeCost(parsedData) println("Within Set Sum of Squared Errors = " + WSSSE) // 保存和加载模型 clusters.save(sc, "target/org/apache/spark/KMeansExample/KMeansModel") val sameModel = KMeansModel.load(sc, "target/org/apache/spark/KMeansExample/KMeansModel")
有关API的详细信息,请参阅KMeans
Scala文档和KMeansModel
Scala文档
在Spark repo中的"examples/src/main/scala/org/apache/spark/examples/mllib/KMeansExample.scala"中查找完整示例代码。
注:kmeans_data.txt 链接
高斯混合模型代表一个复合分布,由此点是从一个绘制ķ高斯子分布,每个具有其自己的概率。该spark.mllib实现使用 期望最大化算法来给出给定一组样本的最大似然模型。这个实现具有以下参数:
例子(Scala)
在加载和解析数据之后的以下示例中,我们使用 GaussianMixture对象将数据集群到两个集群中。所需的簇的数量被传递给算法。然后我们输出混合模型的参数。
import org.apache.spark.mllib.clustering.{GaussianMixture, GaussianMixtureModel} import org.apache.spark.mllib.linalg.Vectors // 加载和解析数据 val data = sc.textFile("data/mllib/gmm_data.txt") val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))).cache() // 使用GaussianMixture将数据分成两类 val gmm = new GaussianMixture().setK(2).run(parsedData) // 保存和加载模型 gmm.save(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel") val sameModel = GaussianMixtureModel.load(sc, "target/org/apache/spark/GaussianMixtureExample/GaussianMixtureModel") // 输出最大似然模型的参数 for (i <- 0 until gmm.k) { println("weight=%f\nmu=%s\nsigma=\n%s\n" format (gmm.weights(i), gmm.gaussians(i).mu, gmm.gaussians(i).sigma)) }
有关API的详细信息,请参阅GaussianMixture
Scala文档和GaussianMixtureModel
Scala文档。
可以在Spark repo中找到"examples/src/main/scala/org/apache/spark/examples/mllib/GaussianMixtureExample.scala"完整的示例代码
注:gmm_data.txt 链接
幂迭代聚类(PIC)是一种可扩展和高效的算法,用于将图中的顶点聚类为给定的边缘属性的成对相似性,Lin和Cohen,Power Iteration Clustering中描述。它通过功率迭代计算图的归一化亲和度矩阵的伪特征向量,并将 其用于聚类顶点。spark.mllib
包括使用GraphX作为其后端的PIC的实现。它需要一个RDD
的(srcId, dstId, similarity)
元组,并输出与该聚类分配的模型。相似之处必须是非负的。PIC假定相似性度量是对称的。(srcId, dstId)
输入数据中最多只能出现一对,而不管排序如何。如果输入中缺少一对,则将其相似性视为零。 spark.mllib
k
:簇数maxIterations
:最大功率迭代次数initializationMode
:初始化模型 这可以是使用随机向量作为顶点属性的“随机”,它是默认的,或者“度”使用归一化的相似度。例子(Scala)
下面我们来看一下代码片段演示如何在spark.mllib使用PIC 。
PowerIterationClustering
实现PIC算法。它需要代表亲和度矩阵RDD
的(srcId: Long, dstId: Long, similarity: Double)
元组。
调用PowerIterationClustering.run
返回a PowerIterationClusteringModel
,其中包含计算的聚类分配。
import org.apache.spark.mllib.clustering.PowerIterationClustering val circlesRdd = generateCirclesRdd(sc, params.k, params.numPoints) val model = new PowerIterationClustering() .setK(params.k) .setMaxIterations(params.maxIterations) .setInitializationMode("degree") .run(circlesRdd) val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id)) val assignments = clusters.toList.sortBy { case (k, v) => v.length } val assignmentsStr = assignments .map { case (k, v) => s"$k -> ${v.sorted.mkString("[", ",", "]")}" }.mkString(", ") val sizesStr = assignments.map { _._2.length }.sorted.mkString("(", ",", ")") println(s"Cluster assignments: $assignmentsStr\ncluster sizes: $sizesStr")
有关API的详细信息,请参阅PowerIterationClustering
Scala文档和PowerIterationClusteringModel
Scala文档
可以在Spark repo中找到 "examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala"完整的示例代码。
潜在的Dirichlet分配(LDA) 是一个主题模型,从一组文本文档推断主题。LDA可以被认为是聚类算法如下:
LDA通过setOptimizer
功能支持不同的推理算法。 EMLDAOptimizer
使用 期望最大化 对似然函数进行聚类,并产生综合结果,
同时 OnlineLDAOptimizer
使用迭代小批量抽样进行在线变分推理 ,通常内存友好。
LDA将文档集合作为字计数的向量和以下参数(使用构建器模式设置):
k
:主题数量(即集群中心)optimizer
:用于学习LDA模型的优化器,或者 EMLDAOptimizer
或OnlineLDAOptimizer
docConcentration
:Dirichlet参数,用于事先通过主题分发的文档。较大的值会鼓励更平稳的推断分布。topicConcentration
:Dirichlet参数,用于事先通过术语(词)分配主题。较大的值会鼓励更平稳的推断分布。maxIterations
:限制迭代次数。checkpointInterval
:如果使用检查点(在Spark配置中设置),则此参数指定将创建检查点的频率。如果maxIterations
大,使用检查点可以帮助减少磁盘上的随机文件大小,并帮助恢复故障。所有的spark.mllib的
LDA型号都支持:
describeTopics
:将主题作为最重要术语和术语权重的数组返回topicsMatrix
:返回一个vocabSize
由k
矩阵,其中各列是一个主题注意:LDA仍然是积极开发的实验功能。因此,某些功能仅在优化程序生成的两个优化器/模型之一中可用。目前,分布式模型可以转换为本地模型,但反之亦然。
以下讨论将分别描述每个优化器/模型对。
期望最大化
在 EMLDAOptimizer
和 DistributedLDAModel中的实现
。
为LDA
提供的参数:
docConcentration
:只支持对称先验,所以提供的k
维度向量中的所有值必须相同。所有值也必须为$ 1.0 $。Vector(-1)
k
(50/k)+1 (50/k)+1topicConcentration
:只支持对称先验。值必须为$ 1.0 $。提供结果,默认值为$ 0.1 + 1 $。>1.0 >1.0-1
0.1+1 0.1+1maxIterations
:EM迭代的最大数量。注意:重复执行足够的迭代。在早期迭代中,EM通常有无用的主题,但是这些主题在更多的迭代之后会显着改善。使用至少20次和可能的50-100次迭代通常是合理的,具体取决于您的数据集。
EMLDAOptimizer
产生一个DistributedLDAModel
,它不仅存储推断的主题,而且存储训练语料库中每个文档的完整的训练语料库和主题分布。一个 DistributedLDAModel的
支持:
topTopicsPerDocument
:训练语料库中每个文档的主题及其权重topDocumentsPerTopic
:每个主题的顶部文档以及文档中主题的相应权重。logPrior
:考虑到超参数的估计主题和文档的主题分布数概率 docConcentration
和topicConcentration
logLikelihood
:训练语料库的对数可能性,给出推断的主题和文档主题分布在线变异贝叶斯
在 OnlineLDAOptimizer
和 LocalLDAModel中的实现
。
为LDA提供的参数:
docConcentration
k
>=0 >=0Vector(-1)
k
(1.0/k) (1.0/k)topicConcentration
>=0 >=0-1
(1.0/k) (1.0/k)maxIterations
:要提交的最大批量数量。 另外,OnlineLDAOptimizer
接受以下参数:
miniBatchFraction
:每次迭代采样和使用语料库的分数optimizeDocConcentration
:如果设置为true,则在每个minibatch之后执行超参数docConcentration
(aka alpha
)的最大似然估计docConcentration
,并将返回的优化LocalLDAModel
tau0
kappa
(τ 0 +iter) −κ (τ0+iter)−κiter iter OnlineLDAOptimizer
产生一个LocalLDAModel
,它只存储推断的主题。一个 LocalLDAModel
支持:
logLikelihood(documents)
:计算所提供documents
给定推断主题的下限 。logPerplexity(documents)
:计算提供的documents
给定推断主题的困惑度的上限。例子(Scala)
在以下示例中,我们加载表示文档语料库的字数向量。然后,我们使用LDA 从文档中推断出三个主题。所需的簇的数量被传递给算法。然后我们输出主题,表示为词的概率分布。
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} import org.apache.spark.mllib.linalg.Vectors // 加载和解析数据 val data = sc.textFile("data/mllib/sample_lda_data.txt") val parsedData = data.map(s => Vectors.dense(s.trim.split(' ').map(_.toDouble))) // 具有唯一ID的索引文档 val corpus = parsedData.zipWithIndex.map(_.swap).cache() // 使用LDA将文档集成到三个主题中 val ldaModel = new LDA().setK(3).run(corpus) // 输出主题。每个都是一个分配的单词(匹配的单词向量) println("Learned topics (as distributions over vocab of " + ldaModel.vocabSize + " words):") val topics = ldaModel.topicsMatrix for (topic <- Range(0, 3)) { print("Topic " + topic + ":") for (word <- Range(0, ldaModel.vocabSize)) { print(" " + topics(word, topic)); } println() } // 保存和加载模型 ldaModel.save(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel") val sameModel = DistributedLDAModel.load(sc, "target/org/apache/spark/LatentDirichletAllocationExample/LDAModel")
有关API的详细信息,请参阅LDA
Scala文档和DistributedLDAModel
Scala文档
可以在Spark repo中找到"examples/src/main/scala/org/apache/spark/examples/mllib/LatentDirichletAllocationExample.scala" 的完整示例代码。
注:sample_lda_data.txt 链接
平均K均值通常比常规K均值快得多,但通常会产生不同的聚类。
平分k均值是一种层次聚类。分层聚类是集群分析中最常用的方法之一,它旨在构建集群的层次结构。层次聚类的策略通常分为两种:
二分法k均值算法是一种分裂算法。MLlib中的实现具有以下参数:
例子(Scala)
import org.apache.spark.mllib.clustering.BisectingKMeans import org.apache.spark.mllib.linalg.{Vector, Vectors} // 加载和解析数据 def parse(line: String): Vector = Vectors.dense(line.split(" ").map(_.toDouble)) val data = sc.textFile("data/mllib/kmeans_data.txt").map(parse).cache() // 通过BisectingKMeans将数据聚类成6个簇。 val bkm = new BisectingKMeans().setK(6) val model = bkm.run(data) // 显示计算成本和集群中心 println(s"Compute Cost: ${model.computeCost(data)}") model.clusterCenters.zipWithIndex.foreach { case (center, idx) => println(s"Cluster Center ${idx}: ${center}") }
有关API的详细信息,请参阅BisectingKMeans
Scala文档和BisectingKMeansModel
Scala文档
可以在Spark repo中找到"examples/src/main/scala/org/apache/spark/examples/mllib/BisectingKMeansExample.scala"完整的示例代码。
注:kmeans_data.txt 链接
当数据到达流时,我们可能需要动态地估计集群,并在新数据到达时进行更新。spark.mllib
提供对流式k均值聚类的支持,其中包含用于控制估计的衰减(或“健忘”)的参数。该算法使用小批量k-means更新规则的泛化。对于每批数据,我们将所有点分配给最近的集群,计算新的集群中心,然后使用以下方式更新每个集群:
例子
此示例显示如何估计流数据上的集群。
import org.apache.spark.mllib.clustering.StreamingKMeans import org.apache.spark.mllib.linalg.Vectors import org.apache.spark.mllib.regression.LabeledPoint import org.apache.spark.streaming.{Seconds, StreamingContext} val conf = new SparkConf().setAppName("StreamingKMeansExample") val ssc = new StreamingContext(conf, Seconds(args(2).toLong)) val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse) val testData = ssc.textFileStream(args(1)).map(LabeledPoint.parse) val model = new StreamingKMeans() .setK(args(3).toInt) .setDecayFactor(1.0) .setRandomCenters(args(4).toInt, 0.0) model.trainOn(trainingData) model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() ssc.start() ssc.awaitTermination()
可以在Spark repo中找到"examples/src/main/scala/org/apache/spark/examples/mllib/StreamingKMeansExample.scala" 完整的示例代码。
当您添加具有数据的新文本文件时,集群中心将更新。 每个训练点应该格式化为[x1,x2,x3],每个测试数据点应该被格式化为(y,[x1,x2,x3]),其中y是一些有用的标签或标识符(例如真正的类别分配 )。 任何时候,文本文件放在/training/data/dir中,模型将会更新。 任何时候,文本文件都放在/testing/data/dir中,您将看到预测。 使用新的数据,集群中心将改变!