近些年来,分析大规模数据集的第一步,就是挖掘频繁项,频繁项集,子序列或者是子结构,并且在数据挖掘领域,这已经是一项热点研究了。关于更多的相关的信息,用户可以去维基百科上了解association rule learning(关联规则的学习)这部分。
FP-growth 算法是由韩家炜在 Han et al., Mining frequent patterns without candidate generation 提出的,“FP”代表频繁模式的意思。该算法的基本思路如下,给予一个事务数据库,FP-Growth 算法的第一步是每一项出现的次数,通过最小支持度进行筛选确定频繁项。不同于另一种关联算法Apriori 算法,FP-growth 算法第二步是通过产生后缀树( FP-tree )来存储所有的事务数据库中的项,而不是像 Apriori 算法那样花费大量的内存去产生候选项集。然后,通过遍历 FP-Tree 可以挖掘出频繁项集。在 spark.mllib 中,我们实现了并行的 FP-growth 算法叫做 PFP,正如论文Li et al., PFP: Parallel FP-growth for query recommendation 中描述的,PFP 将基于相同后缀的事务分发到相同的机器上,因此相比的单台机器的实现,这样有更好的扩展性。我们推荐用户读 Li et al., PFP: Parallel FP-growth for query recommendation这篇论文去理解更多的信息。
spark mllib 中 FP-growth 算法的实现要使用到以下两个超参数:
• minSupport (最小支持度):一个项集被认为是频繁项集的最小支持度。例如,如果一个项总共有5个事务的数据集中,出现了3次,那么它的支持度为3/5=0.6
• numPartitions (分区数):分区的个数,同于将事务分配到几个分区。
例子
import org.apache.spark.mllib.fpm.FPGrowth import org.apache.spark.rdd.RDD //读取文件 val data = sc.textFile("data/mllib/sample_fpgrowth.txt") //预处理并转换为RDD[Array[String]]存储 val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' ')) //调用FPGrowth()方法,设定最小支持度与分区数 val fpg = new FPGrowth() .setMinSupport(0.2) .setNumPartitions(10) //得到FPGrowthModel val model = fpg.run(transactions) //遍历频繁项集 model.freqItemsets.collect().foreach { itemset => println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq) } //设定最小置信度 val minConfidence = 0.8 //产生关联规则 model.generateAssociationRules(minConfidence).collect().foreach { rule => println( rule.antecedent.mkString("[", ",", "]") + " => " + rule.consequent .mkString("[", ",", "]") + ", " + rule.confidence) }
有关 API 的详细信息,请参考 FPGrowth 的 Scala 文档
整个例子的源码在spark源码包"examples/src/main/scala/org/apache/spark/examples/mllib/SimpleFPGrowth.scala"下
AssociationRules(关联规则)类实现了一个并行的规则生成算法去构建产生一个单项作为结果的规则。
import org.apache.spark.mllib.fpm.AssociationRules import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset val freqItemsets = sc.parallelize(Seq( new FreqItemset(Array("a"), 15L), new FreqItemset(Array("b"), 35L), new FreqItemset(Array("a", "b"), 12L) )) val ar = new AssociationRules() .setMinConfidence(0.8) val results = ar.run(freqItemsets) results.collect().foreach { rule => println("[" + rule.antecedent.mkString(",") + "=>" + rule.consequent.mkString(",") + "]," + rule.confidence) }
有关 API 的详细信息,请参考AssociationRules的 Scala 文档
整个例子的源码在 spark 源码包"examples/src/main/scala/org/apache/spark/examples/mllib/AssociationRulesExample.scala"下
PrefixSpan 是一种序列模式挖掘算法, 具体描述见论文 Pei et al., Mining Sequential Patterns by Pattern-Growth: The PrefixSpan Approach。我们推荐读者去读相关的论文去更加深入的理解的序列模式挖掘问题。
spark.mllib 中的 PrefixSpan 要使用到以下的参数:
• minSupport (最小支持度):最小支持度要求能够反映频繁序列模式。
• maxPatternLength (最大频繁序列的长度):频繁序列模式的最大长度。结果中将不包含任何频繁序列长度如果超过这个长度的频繁序列。
•maxLocalProjDBSize (最大单机投影数据库的项数):这个参数应该与你的 spark 集群中 executors 参数设置一致。
例子:
下面的例子介绍了 PrefixSpan 在序列上运行情况(与Pei et al的论文中使用相同的标记)
<(12)3> <1(32)(12)> <(12)5> <6>
PrefixSpan 类实现了 PrefixSpan 算法。调用 PrefixSpan.run 返回存储带有频率的频繁序列的模型 PrefixSpanModel。
import org.apache.spark.mllib.fpm.PrefixSpan val sequences = sc.parallelize(Seq( Array(Array(1, 2), Array(3)), Array(Array(1), Array(3, 2), Array(1, 2)), Array(Array(1, 2), Array(5)), Array(Array(6)) ), 2).cache() val prefixSpan = new PrefixSpan() .setMinSupport(0.5) .setMaxPatternLength(5) val model = prefixSpan.run(sequences) model.freqSequences.collect().foreach { freqSequence => println( freqSequence.sequence.map(_.mkString("[", ", ", "]")).mkString("[", ", ", "]") + ", " + freqSequence.freq) }
有关 API 的详细信息,请参考 PrefixScan 的 Scala 文档和 PrefixScanModel 的 Scala 文档
整个例子的源码在spark源码包 "examples/src/main/scala/org/apache/spark/examples/mllib/PrefixSpanExample.scala" 下。